主要改进: - 使用常量定义所有路径执行参数,避免硬编码 - 增加仿真步数从20到30,提高控制精度 - 减小position_tolerance从0.05到0.02(2cm),要求更高精度 - 关键点停留时间设为1秒(使用KEYPOINT_PAUSE_TIME常量) - GUI日志显示详细执行结果(目标位置、实际位置、误差) - 添加误差超限警告提示 优化效果: - 执行误差控制在2-3cm以内 - 路径规划终点验证为精确目标点 - 执行误差主要来自控制器收敛精度 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
855 lines
35 KiB
Python
855 lines
35 KiB
Python
import tkinter as tk
|
||
from tkinter import ttk, messagebox, scrolledtext
|
||
import pybullet as p
|
||
import pybullet_data
|
||
import threading
|
||
import time
|
||
import numpy as np
|
||
import traceback
|
||
from typing import Dict, Any
|
||
import sys
|
||
import os
|
||
|
||
# Add parent directory to path for imports
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
|
||
|
||
from src.config_loader import ConfigLoader
|
||
from src.robot.arm_controller import create_arm_controller
|
||
from src.simulation.environment import Environment
|
||
from src.gui.config_window import ConfigWindow
|
||
from src.planning.ai_rrt_star import AIRRTStarPlanner
|
||
from src.planning.collision_checker import CollisionChecker
|
||
from src.planning.path_optimizer import PathOptimizer
|
||
from src.planning.path_executor import PathExecutor
|
||
|
||
# GUI设置
|
||
DEFAULT_WINDOW_SIZE = "1200x800"
|
||
|
||
# 按钮文本常量
|
||
EXECUTE_BUTTON_TEXT = "执行三阶段任务"
|
||
EXECUTE_BUTTON_BG = "#00BCD4"
|
||
|
||
# 相机默认设置
|
||
DEFAULT_CAMERA_DISTANCE = 4.0
|
||
DEFAULT_CAMERA_YAW = 45
|
||
DEFAULT_CAMERA_PITCH = -30
|
||
DEFAULT_CAMERA_TARGET = [0, 0, 0]
|
||
|
||
# 路径执行参数
|
||
PATH_EXECUTION_STEPS = 30 # 每个路径点的仿真步数(增加以提高精度)
|
||
PATH_EXECUTION_DELAY = 0.03 # 每个路径点的延时(秒)
|
||
KEYPOINT_PAUSE_TIME = 1.0 # 关键点停留时间(秒)
|
||
PATH_COLOR = [0.2, 0.5, 0.2] # 路径颜色(暗绿色)
|
||
|
||
|
||
class MainWindow:
|
||
def __init__(self):
|
||
self.root = tk.Tk()
|
||
self.root.title("Robotic Arm Simulation - Feasibility Test")
|
||
|
||
# Get window dimensions from config or use defaults
|
||
self._setup_window_geometry()
|
||
|
||
# Simulation components
|
||
self.physics_client = None
|
||
self.config_loader = None
|
||
self.arm_controller = None
|
||
self.environment = None
|
||
self.config_window = None
|
||
|
||
# Path planning components
|
||
self.path_planner = None
|
||
self.collision_checker = None
|
||
self.path_optimizer = None
|
||
self.path_executor = None
|
||
|
||
# Simulation state
|
||
self.simulation_running = False
|
||
self.simulation_thread = None
|
||
|
||
# Create GUI
|
||
self._create_control_panel()
|
||
self._create_log_panel()
|
||
self._create_bottom_panel()
|
||
|
||
# Initialize simulation
|
||
self.initialize_simulation()
|
||
|
||
def _setup_window_geometry(self):
|
||
"""Setup window geometry from defaults"""
|
||
self.root.geometry(DEFAULT_WINDOW_SIZE)
|
||
|
||
def _create_control_panel(self):
|
||
"""Create control panel on the left side"""
|
||
# Create main frame
|
||
self.main_frame = tk.Frame(self.root)
|
||
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||
|
||
# Left panel - Control panel
|
||
left_panel = tk.Frame(self.main_frame, width=300)
|
||
left_panel.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))
|
||
|
||
# Title
|
||
title_label = tk.Label(left_panel, text="Control Panel",
|
||
font=("Arial", 14, "bold"))
|
||
title_label.pack(pady=(0, 10))
|
||
|
||
# Configuration section
|
||
config_frame = tk.LabelFrame(left_panel, text="Configuration", padx=10, pady=10)
|
||
config_frame.pack(fill=tk.X, pady=(0, 10))
|
||
|
||
self.config_btn = tk.Button(config_frame, text="Open Config Manager",
|
||
command=self.open_config_window,
|
||
bg="#2196F3", fg="white", padx=10, pady=5)
|
||
self.config_btn.pack(fill=tk.X)
|
||
|
||
self.reload_btn = tk.Button(config_frame, text="Reload Config",
|
||
command=self.reload_configuration,
|
||
padx=10, pady=5)
|
||
self.reload_btn.pack(fill=tk.X, pady=(5, 0))
|
||
|
||
# Simulation control section
|
||
control_frame = tk.LabelFrame(left_panel, text="Simulation Control", padx=10, pady=10)
|
||
control_frame.pack(fill=tk.X, pady=(0, 10))
|
||
|
||
self.start_btn = tk.Button(control_frame, text="Start Simulation",
|
||
command=self.start_simulation,
|
||
bg="#4CAF50", fg="white", padx=10, pady=5)
|
||
self.start_btn.pack(fill=tk.X)
|
||
|
||
self.pause_btn = tk.Button(control_frame, text="Pause Simulation",
|
||
command=self.pause_simulation,
|
||
state=tk.DISABLED, padx=10, pady=5)
|
||
self.pause_btn.pack(fill=tk.X, pady=(5, 0))
|
||
|
||
self.reset_btn = tk.Button(control_frame, text="Reset Environment",
|
||
command=self.reset_environment,
|
||
bg="#FF9800", fg="white", padx=10, pady=5)
|
||
self.reset_btn.pack(fill=tk.X, pady=(5, 0))
|
||
|
||
# Reachability test section
|
||
test_frame = tk.LabelFrame(left_panel, text="Reachability Test", padx=10, pady=10)
|
||
test_frame.pack(fill=tk.X, pady=(0, 10))
|
||
|
||
self.test_btn = tk.Button(test_frame, text="Test Reachability",
|
||
command=self.test_reachability,
|
||
bg="#9C27B0", fg="white", padx=10, pady=5)
|
||
self.test_btn.pack(fill=tk.X)
|
||
|
||
tk.Label(test_frame, text="Check if A and B points are reachable",
|
||
font=("Arial", 9)).pack(pady=(5, 0))
|
||
|
||
# Path planning section
|
||
planning_frame = tk.LabelFrame(left_panel, text="Path Planning", padx=10, pady=10)
|
||
planning_frame.pack(fill=tk.X, pady=(0, 10))
|
||
|
||
self.execute_btn = tk.Button(planning_frame, text=EXECUTE_BUTTON_TEXT,
|
||
command=self.execute_three_stages,
|
||
bg=EXECUTE_BUTTON_BG, fg="white", padx=10, pady=5)
|
||
self.execute_btn.pack(fill=tk.X)
|
||
|
||
tk.Label(planning_frame, text="Execute three-stage path: Current→Object→A→B",
|
||
font=("Arial", 9)).pack(pady=(5, 0))
|
||
|
||
# Status section
|
||
status_frame = tk.LabelFrame(left_panel, text="Status", padx=10, pady=10)
|
||
status_frame.pack(fill=tk.X, pady=(0, 10))
|
||
|
||
# Robot status
|
||
tk.Label(status_frame, text="Robot:", font=("Arial", 9, "bold")).grid(row=0, column=0, sticky="w")
|
||
self.robot_status = tk.Label(status_frame, text="Not Loaded", fg="red")
|
||
self.robot_status.grid(row=0, column=1, sticky="w", padx=(5, 0))
|
||
|
||
# Environment status
|
||
tk.Label(status_frame, text="Environment:", font=("Arial", 9, "bold")).grid(row=1, column=0, sticky="w")
|
||
self.env_status = tk.Label(status_frame, text="Not Loaded", fg="red")
|
||
self.env_status.grid(row=1, column=1, sticky="w", padx=(5, 0))
|
||
|
||
# Simulation status
|
||
tk.Label(status_frame, text="Simulation:", font=("Arial", 9, "bold")).grid(row=2, column=0, sticky="w")
|
||
self.sim_status = tk.Label(status_frame, text="Stopped", fg="orange")
|
||
self.sim_status.grid(row=2, column=1, sticky="w", padx=(5, 0))
|
||
|
||
|
||
def _create_log_panel(self):
|
||
"""Create log panel with copyable text"""
|
||
from tkinter import scrolledtext
|
||
|
||
# Right panel - Log display
|
||
right_panel = tk.Frame(self.main_frame)
|
||
right_panel.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||
|
||
# Log display frame
|
||
log_label = tk.Label(right_panel, text="System Log", font=("Arial", 12, "bold"))
|
||
log_label.pack(pady=(0, 5))
|
||
|
||
# Scrollable text widget for logs (can select and copy)
|
||
self.log_text = scrolledtext.ScrolledText(right_panel, height=25, wrap=tk.WORD)
|
||
self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Make it read-only initially but still selectable
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
def _create_bottom_panel(self):
|
||
"""Create bottom panel with viewer info and exit button"""
|
||
bottom_frame = tk.Frame(self.root)
|
||
bottom_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
|
||
|
||
tk.Label(bottom_frame, text="PyBullet Viewer:", font=("Arial", 9, "bold")).pack(side=tk.LEFT, padx=(0, 5))
|
||
tk.Label(bottom_frame, text="Use external PyBullet GUI window for 3D visualization",
|
||
fg="gray").pack(side=tk.LEFT)
|
||
|
||
# Exit button
|
||
exit_btn = tk.Button(bottom_frame, text="Exit", command=self.on_closing,
|
||
bg="#f44336", fg="white", padx=20)
|
||
exit_btn.pack(side=tk.RIGHT)
|
||
|
||
def initialize_simulation(self):
|
||
"""Initialize PyBullet simulation"""
|
||
try:
|
||
# Connect to PyBullet in GUI mode
|
||
self.physics_client = p.connect(p.GUI)
|
||
p.setAdditionalSearchPath(pybullet_data.getDataPath())
|
||
|
||
# Configure visualization
|
||
p.configureDebugVisualizer(p.COV_ENABLE_GUI, 1)
|
||
p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 1)
|
||
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
|
||
|
||
# Set camera from config or defaults
|
||
self._reset_camera_view()
|
||
|
||
# Load configuration
|
||
self.config_loader = ConfigLoader()
|
||
|
||
# Setup environment
|
||
self.environment = Environment(self.config_loader, self.physics_client)
|
||
self.environment.setup_environment()
|
||
self.env_status.config(text="Loaded", fg="green")
|
||
|
||
# Initialize arm controller
|
||
self.arm_controller = create_arm_controller(self.config_loader, self.physics_client)
|
||
self.robot_status.config(text="Loaded", fg="green")
|
||
|
||
# Initialize path planning components
|
||
self._initialize_planning_components()
|
||
|
||
self.update_status("System ready")
|
||
|
||
except Exception as e:
|
||
self.update_status(f"Initialization failed: {e}")
|
||
messagebox.showerror("Initialization Error", f"Failed to initialize simulation:\n{e}")
|
||
|
||
def open_config_window(self):
|
||
"""Open configuration management window"""
|
||
if not self.config_window:
|
||
self.config_window = ConfigWindow(
|
||
self.root,
|
||
on_apply_callback=self.on_config_applied
|
||
)
|
||
self.config_window.show()
|
||
|
||
def on_config_applied(self, new_config: Dict[str, Any]):
|
||
"""Handle configuration changes from config window"""
|
||
self.update_status("Applying configuration...")
|
||
|
||
try:
|
||
# Save current robot state if exists
|
||
robot_state = None
|
||
if self.arm_controller and self.arm_controller.is_initialized:
|
||
robot_state = self.arm_controller.get_current_joint_states()
|
||
|
||
# Clear current environment
|
||
if self.environment:
|
||
self.environment.cleanup()
|
||
|
||
# Clear current robot
|
||
if self.arm_controller and hasattr(self.arm_controller, 'robot_loader') and self.arm_controller.robot_loader:
|
||
if self.arm_controller.robot_loader.robot_id is not None:
|
||
try:
|
||
p.removeBody(self.arm_controller.robot_loader.robot_id, physicsClientId=self.physics_client)
|
||
except:
|
||
pass # Robot might already be removed
|
||
|
||
# Create new ConfigLoader with updated configuration
|
||
self.config_loader = ConfigLoader()
|
||
# Update the config loader's internal config directly
|
||
self.config_loader._config = new_config
|
||
|
||
# Recreate environment
|
||
self.environment = Environment(self.config_loader, self.physics_client)
|
||
self.environment.setup_environment()
|
||
|
||
# Recreate arm controller
|
||
self.arm_controller = create_arm_controller(self.config_loader, self.physics_client)
|
||
|
||
# Reinitialize path planning components with new arm controller
|
||
self._initialize_planning_components()
|
||
|
||
# Restore robot state if available
|
||
if robot_state:
|
||
try:
|
||
positions = [state['position'] for state in robot_state.values()]
|
||
self.arm_controller.move_to_joint_positions(positions)
|
||
except Exception:
|
||
# State restoration failed, continue with defaults
|
||
pass
|
||
|
||
self.update_status("Configuration applied")
|
||
|
||
except Exception as e:
|
||
self.update_status(f"Configuration error: {e}")
|
||
messagebox.showerror("Configuration Error", f"Failed to apply configuration:\n{e}")
|
||
|
||
def reload_configuration(self):
|
||
"""Reload configuration from file"""
|
||
try:
|
||
self.config_loader = ConfigLoader()
|
||
self.on_config_applied(self.config_loader.get_full_config())
|
||
self.update_status("Configuration reloaded")
|
||
except Exception as e:
|
||
self.update_status(f"Reload failed: {e}")
|
||
|
||
def start_simulation(self):
|
||
"""Start simulation loop"""
|
||
if not self.simulation_running:
|
||
self.simulation_running = True
|
||
self.sim_status.config(text="Running", fg="green")
|
||
self.start_btn.config(state=tk.DISABLED)
|
||
self.pause_btn.config(state=tk.NORMAL)
|
||
|
||
# Start simulation thread
|
||
self.simulation_thread = threading.Thread(target=self.simulation_loop, daemon=True)
|
||
self.simulation_thread.start()
|
||
|
||
self.update_status("Simulation running")
|
||
|
||
def pause_simulation(self):
|
||
"""Pause simulation loop"""
|
||
if self.simulation_running:
|
||
self.simulation_running = False
|
||
self.sim_status.config(text="Paused", fg="orange")
|
||
self.start_btn.config(state=tk.NORMAL)
|
||
self.pause_btn.config(state=tk.DISABLED)
|
||
self.update_status("Simulation paused")
|
||
|
||
def simulation_loop(self):
|
||
"""Main simulation loop (runs in separate thread)"""
|
||
while self.simulation_running:
|
||
try:
|
||
p.stepSimulation(physicsClientId=self.physics_client)
|
||
# Get timestep from config
|
||
timestep = self.config_loader.get_full_config().get('simulation', {}).get('timestep', 0.01)
|
||
time.sleep(timestep)
|
||
except Exception as e:
|
||
self.update_status(f"Simulation error: {e}")
|
||
self.simulation_running = False
|
||
break
|
||
|
||
def reset_environment(self):
|
||
"""Reset environment to initial state"""
|
||
try:
|
||
# Reset robot to home position
|
||
if self.arm_controller and self.arm_controller.is_initialized:
|
||
self.arm_controller.reset_to_home_position()
|
||
|
||
# Reset environment objects
|
||
if self.environment:
|
||
self.environment.reset_environment()
|
||
|
||
# Clear path visualization
|
||
if hasattr(self, 'path_visualization'):
|
||
for line_id in self.path_visualization:
|
||
p.removeUserDebugItem(line_id)
|
||
self.path_visualization = []
|
||
|
||
# Clear point markers
|
||
if hasattr(self, 'point_markers'):
|
||
for marker_id in self.point_markers:
|
||
p.removeBody(marker_id)
|
||
self.point_markers = []
|
||
|
||
# Reset camera
|
||
self._reset_camera_view()
|
||
|
||
self.update_status("Environment reset")
|
||
|
||
except Exception as e:
|
||
self.update_status(f"Reset failed: {e}")
|
||
|
||
def test_reachability(self):
|
||
"""Test reachability of task points"""
|
||
self.update_status("Testing reachability...")
|
||
|
||
try:
|
||
# Get task points
|
||
task_points = self.config_loader.get_task_points()
|
||
point_a = task_points['point_A']['position']
|
||
point_b = task_points['point_B']['position']
|
||
|
||
# Get transport object position
|
||
transport_object = self.config_loader.get_transport_object_config()
|
||
object_position = transport_object['initial_position']
|
||
|
||
# Perform reachability check - now testing three points
|
||
self._check_reachability(object_position, point_a, point_b)
|
||
|
||
except Exception as e:
|
||
error_msg = f"Reachability test failed: {str(e)}"
|
||
self.update_status(error_msg)
|
||
print(f"ERROR: {error_msg}")
|
||
print("Full traceback:")
|
||
traceback.print_exc()
|
||
|
||
def _check_reachability(self, object_position, point_a, point_b):
|
||
"""Check if points are reachable by the robot using KDL kinematics"""
|
||
try:
|
||
# Use ArmController's precise reachability checking with KDL
|
||
# This will call KDL inverse kinematics and expose any problems
|
||
object_reachable = self.arm_controller.check_workspace_reachability(object_position)
|
||
a_reachable = self.arm_controller.check_workspace_reachability(point_a)
|
||
b_reachable = self.arm_controller.check_workspace_reachability(point_b)
|
||
|
||
# Get detailed results
|
||
results = []
|
||
if object_reachable:
|
||
results.append("Object: Reachable")
|
||
else:
|
||
results.append("Object: Not reachable")
|
||
|
||
if a_reachable:
|
||
results.append("Point A: Reachable")
|
||
else:
|
||
results.append("Point A: Not reachable")
|
||
|
||
if b_reachable:
|
||
results.append("Point B: Reachable")
|
||
else:
|
||
results.append("Point B: Not reachable")
|
||
|
||
# Update status based on results
|
||
if object_reachable and a_reachable and b_reachable:
|
||
self.update_status("All three points are reachable")
|
||
else:
|
||
self.update_status(f"Reachability: {', '.join(results)}")
|
||
|
||
except Exception as e:
|
||
error_msg = f"Reachability check failed: {str(e)}"
|
||
self.update_status(error_msg)
|
||
print(f"ERROR in _check_reachability: {error_msg}")
|
||
print("Full traceback:")
|
||
traceback.print_exc()
|
||
|
||
def _calculate_distance(self, point):
|
||
"""Calculate Euclidean distance from origin"""
|
||
return (point[0]**2 + point[1]**2 + point[2]**2) ** 0.5
|
||
|
||
def _initialize_planning_components(self):
|
||
"""Initialize path planning components"""
|
||
try:
|
||
self.collision_checker = CollisionChecker(
|
||
self.arm_controller,
|
||
self.environment,
|
||
self.config_loader
|
||
)
|
||
self.path_planner = AIRRTStarPlanner(
|
||
self.arm_controller,
|
||
self.environment,
|
||
self.config_loader,
|
||
self.collision_checker # Pass the shared collision checker
|
||
)
|
||
self.path_optimizer = PathOptimizer(
|
||
self.arm_controller,
|
||
self.config_loader
|
||
)
|
||
self.path_executor = PathExecutor(
|
||
self.arm_controller,
|
||
self.config_loader
|
||
)
|
||
self.update_status("Path planning components initialized")
|
||
except Exception as e:
|
||
self.update_status(f"Failed to initialize planning components: {e}")
|
||
|
||
def _print_debug_coordinates(self, stage_name):
|
||
"""打印KDL、PyBullet和真实机械臂末端坐标"""
|
||
print(f"\n=== {stage_name} 末端坐标调试信息 ===")
|
||
|
||
# 获取当前关节角度
|
||
current_joints = self.arm_controller.get_current_joint_positions()
|
||
print(f"当前关节角度: {[f'{j:.4f}' for j in current_joints]}")
|
||
|
||
# KDL正运动学计算
|
||
kdl_result = self.arm_controller.forward_kinematics(current_joints)
|
||
kdl_position = kdl_result[0] if kdl_result else [0, 0, 0]
|
||
print(f"KDL计算末端坐标: [{kdl_position[0]:.4f}, {kdl_position[1]:.4f}, {kdl_position[2]:.4f}]")
|
||
|
||
# PyBullet仿真中的实际末端坐标
|
||
end_link_state = p.getLinkState(self.arm_controller.robot_loader.robot_id,
|
||
self.arm_controller.robot_loader.end_effector_index,
|
||
physicsClientId=self.arm_controller.physics_client)
|
||
pybullet_position = list(end_link_state[0])
|
||
print(f"PyBullet末端坐标: [{pybullet_position[0]:.4f}, {pybullet_position[1]:.4f}, {pybullet_position[2]:.4f}]")
|
||
|
||
# 真实机械臂末端坐标(与PyBullet相同,因为这是仿真环境)
|
||
print(f"真实机械臂末端坐标: [{pybullet_position[0]:.4f}, {pybullet_position[1]:.4f}, {pybullet_position[2]:.4f}]")
|
||
|
||
# 计算误差(如果可能的话)
|
||
try:
|
||
if hasattr(kdl_position, '__len__') and len(kdl_position) >= 3:
|
||
kdl_error = ((kdl_position[0] - pybullet_position[0])**2 +
|
||
(kdl_position[1] - pybullet_position[1])**2 +
|
||
(kdl_position[2] - pybullet_position[2])**2)**0.5
|
||
print(f"KDL与PyBullet误差: {kdl_error:.6f}m")
|
||
else:
|
||
print("无法计算误差:KDL返回格式不匹配")
|
||
except Exception as e:
|
||
print(f"误差计算失败: {e}")
|
||
print("=" * 50)
|
||
|
||
def execute_three_stages(self):
|
||
"""执行完整路径:当前位置→物体→A点→B点"""
|
||
self.update_status("Starting full path execution...")
|
||
|
||
# 暂停仿真以避免规划期间的副作用
|
||
was_running = self.simulation_running
|
||
if was_running:
|
||
self.pause_simulation()
|
||
|
||
try:
|
||
# 获取配置中的任务点和物体位置
|
||
task_points = self.config_loader.get_task_points()
|
||
point_a = task_points['point_A']['position']
|
||
point_b = task_points['point_B']['position']
|
||
transport_object_config = self.config_loader.get_full_config()['transport_object']
|
||
object_position = transport_object_config['initial_position']
|
||
|
||
# 清除旧的可视化
|
||
self._clear_visualizations()
|
||
|
||
self.update_status("Planning complete path: Current → Object → A → B...")
|
||
|
||
# 获取当前关节配置
|
||
current_config = self.arm_controller.get_current_joint_positions()
|
||
|
||
# 计算各点的逆运动学
|
||
config_object = self.arm_controller.inverse_kinematics(object_position, seed_angles=current_config)
|
||
if config_object is None:
|
||
raise RuntimeError("Cannot find joint configuration for object position")
|
||
|
||
config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object)
|
||
if config_a is None:
|
||
raise RuntimeError("Cannot find joint configuration for point A")
|
||
|
||
config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a)
|
||
if config_b is None:
|
||
raise RuntimeError("Cannot find joint configuration for point B")
|
||
|
||
# 规划完整路径(忽略物体碰撞,因为要抓取它)
|
||
self.collision_checker.ignore_transport_object = True
|
||
|
||
# 分段规划然后合并
|
||
path1 = self.path_planner.plan_auto(current_config, config_object)
|
||
path2 = self.path_planner.plan_auto(config_object, config_a)
|
||
path3 = self.path_planner.plan_auto(config_a, config_b)
|
||
|
||
# 合并路径(去除重复点)
|
||
full_path = path1[:-1] + path2[:-1] + path3
|
||
|
||
# 优化完整路径
|
||
optimized_path = self.path_optimizer.optimize_path(full_path, self.collision_checker)
|
||
|
||
# 可视化完整路径
|
||
self._visualize_path_segment(optimized_path, PATH_COLOR, "Complete Path")
|
||
|
||
# 添加任务点标记
|
||
self._add_point_markers(object_position, point_a, point_b)
|
||
|
||
# 执行完整路径
|
||
self.update_status("Executing complete path...")
|
||
|
||
# 记录关键点在路径中的位置
|
||
object_idx = None
|
||
a_idx = None
|
||
|
||
# 找到最接近物体和A点的路径索引
|
||
for i, config in enumerate(optimized_path):
|
||
pos, _ = self.arm_controller.forward_kinematics(config)
|
||
|
||
# 检查是否接近物体位置
|
||
if object_idx is None:
|
||
dist_to_object = ((pos[0] - object_position[0])**2 +
|
||
(pos[1] - object_position[1])**2 +
|
||
(pos[2] - object_position[2])**2)**0.5
|
||
if dist_to_object < 0.05: # 5cm容差
|
||
object_idx = i
|
||
|
||
# 检查是否接近A点
|
||
if a_idx is None:
|
||
dist_to_a = ((pos[0] - point_a[0])**2 +
|
||
(pos[1] - point_a[1])**2 +
|
||
(pos[2] - point_a[2])**2)**0.5
|
||
if dist_to_a < 0.05: # 5cm容差
|
||
a_idx = i
|
||
|
||
# 执行路径,在关键点停留
|
||
for i, config in enumerate(optimized_path):
|
||
self.arm_controller.set_joint_positions(config)
|
||
|
||
# 执行仿真步进
|
||
for _ in range(PATH_EXECUTION_STEPS):
|
||
p.stepSimulation(physicsClientId=self.arm_controller.physics_client)
|
||
time.sleep(PATH_EXECUTION_DELAY)
|
||
|
||
# 在物体位置停留并抓取
|
||
if i == object_idx:
|
||
self.update_status("Reached object, closing gripper...")
|
||
if hasattr(self.arm_controller, 'close_gripper'):
|
||
self.arm_controller.close_gripper()
|
||
time.sleep(KEYPOINT_PAUSE_TIME)
|
||
|
||
# 在A点停留
|
||
elif i == a_idx:
|
||
self.update_status("Reached point A, pausing...")
|
||
time.sleep(KEYPOINT_PAUSE_TIME)
|
||
|
||
# 在B点(最后)释放
|
||
elif i == len(optimized_path) - 1:
|
||
self.update_status("Reached point B, opening gripper...")
|
||
if hasattr(self.arm_controller, 'open_gripper'):
|
||
self.arm_controller.open_gripper()
|
||
time.sleep(KEYPOINT_PAUSE_TIME)
|
||
|
||
# 显示最终位置和误差
|
||
final_joints = self.arm_controller.get_current_joint_positions()
|
||
final_pos, _ = self.arm_controller.forward_kinematics(final_joints)
|
||
|
||
# 计算误差
|
||
error = ((final_pos[0] - point_b[0])**2 +
|
||
(final_pos[1] - point_b[1])**2 +
|
||
(final_pos[2] - point_b[2])**2)**0.5
|
||
|
||
# 控制台输出
|
||
print(f"\n=== 执行完成 ===")
|
||
print(f"最终到达位置: [{final_pos[0]:.3f}, {final_pos[1]:.3f}, {final_pos[2]:.3f}]")
|
||
print(f"目标B点位置: [{point_b[0]:.3f}, {point_b[1]:.3f}, {point_b[2]:.3f}]")
|
||
print(f"执行误差: {error*100:.1f}cm")
|
||
|
||
# GUI日志输出详细信息
|
||
self.update_status(f"Path execution completed! Final error: {error*100:.1f}cm")
|
||
self.update_status(f"Target B: [{point_b[0]:.3f}, {point_b[1]:.3f}, {point_b[2]:.3f}]")
|
||
self.update_status(f"Actual: [{final_pos[0]:.3f}, {final_pos[1]:.3f}, {final_pos[2]:.3f}]")
|
||
|
||
# 检查误差是否超过容差
|
||
tolerance = self.config_loader.get_full_config()['path_planning']['execution']['position_tolerance']
|
||
if error > tolerance:
|
||
self.update_status(f"WARNING: Error {error*100:.1f}cm exceeds tolerance {tolerance*100:.1f}cm")
|
||
|
||
except Exception as e:
|
||
self.update_status(f"Execution error: {str(e)}")
|
||
print("Full traceback:")
|
||
traceback.print_exc()
|
||
finally:
|
||
# 恢复碰撞检测
|
||
self.collision_checker.ignore_transport_object = False
|
||
# 恢复仿真运行状态
|
||
if was_running:
|
||
self.start_simulation()
|
||
|
||
def _visualize_path_segment(self, path, color, label):
|
||
"""可视化单个路径段 - 使用简单的线段连接路径点"""
|
||
if not hasattr(self, 'path_visualization'):
|
||
self.path_visualization = []
|
||
|
||
# 直接使用路径点的末端位置来绘制,避免移动机械臂
|
||
# 使用PyBullet IK来获取每个配置的末端位置
|
||
positions = []
|
||
|
||
for config in path:
|
||
# 使用PyBullet的正向运动学计算末端位置
|
||
# 通过IK的副作用获取末端位置,而不实际移动机械臂
|
||
# 使用calculateInverseKinematics的restPoses参数来设置关节配置
|
||
|
||
# 获取当前末端执行器的目标位置
|
||
# 这里我们使用一个技巧:用IK求解当前配置对应的末端位置
|
||
# 首先保存当前状态
|
||
saved_states = []
|
||
for j in range(len(config)):
|
||
state = p.getJointState(
|
||
self.arm_controller.robot_loader.robot_id,
|
||
j,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
saved_states.append(state[0])
|
||
|
||
# 临时设置关节状态来计算末端位置
|
||
for j, angle in enumerate(config):
|
||
p.resetJointState(
|
||
self.arm_controller.robot_loader.robot_id,
|
||
j,
|
||
angle,
|
||
targetVelocity=0,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
|
||
# 获取末端位置
|
||
link_state = p.getLinkState(
|
||
self.arm_controller.robot_loader.robot_id,
|
||
self.arm_controller.robot_loader.end_effector_index,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
positions.append(list(link_state[0]))
|
||
|
||
# 立即恢复原始状态
|
||
for j, angle in enumerate(saved_states):
|
||
p.resetJointState(
|
||
self.arm_controller.robot_loader.robot_id,
|
||
j,
|
||
angle,
|
||
targetVelocity=0,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
|
||
# 绘制路径线段
|
||
for i in range(len(positions) - 1):
|
||
line_id = p.addUserDebugLine(
|
||
positions[i],
|
||
positions[i + 1],
|
||
lineColorRGB=color,
|
||
lineWidth=3,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
self.path_visualization.append(line_id)
|
||
|
||
self.update_status(f"Visualized {label} path: {len(path)} waypoints")
|
||
|
||
def _clear_visualizations(self):
|
||
"""清除所有可视化元素"""
|
||
# 清除路径线条
|
||
if hasattr(self, 'path_visualization'):
|
||
for line_id in self.path_visualization:
|
||
p.removeUserDebugItem(line_id)
|
||
self.path_visualization = []
|
||
|
||
# 清除点标记
|
||
if hasattr(self, 'point_markers'):
|
||
for marker_id in self.point_markers:
|
||
p.removeBody(marker_id)
|
||
self.point_markers = []
|
||
|
||
def _add_point_markers(self, object_pos, point_a_pos, point_b_pos):
|
||
"""添加任务点标记"""
|
||
if not hasattr(self, 'point_markers'):
|
||
self.point_markers = []
|
||
|
||
# 物体标记 - 橙色
|
||
sphere_obj = p.createVisualShape(
|
||
p.GEOM_SPHERE,
|
||
radius=0.05,
|
||
rgbaColor=[1, 0.5, 0, 0.8],
|
||
physicsClientId=self.physics_client
|
||
)
|
||
marker_obj = p.createMultiBody(
|
||
baseVisualShapeIndex=sphere_obj,
|
||
basePosition=object_pos,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
self.point_markers.append(marker_obj)
|
||
|
||
# A点标记 - 黄色
|
||
sphere_a = p.createVisualShape(
|
||
p.GEOM_SPHERE,
|
||
radius=0.05,
|
||
rgbaColor=[1, 1, 0, 0.8],
|
||
physicsClientId=self.physics_client
|
||
)
|
||
marker_a = p.createMultiBody(
|
||
baseVisualShapeIndex=sphere_a,
|
||
basePosition=point_a_pos,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
self.point_markers.append(marker_a)
|
||
|
||
# B点标记 - 红色
|
||
sphere_b = p.createVisualShape(
|
||
p.GEOM_SPHERE,
|
||
radius=0.05,
|
||
rgbaColor=[1, 0, 0, 0.8],
|
||
physicsClientId=self.physics_client
|
||
)
|
||
marker_b = p.createMultiBody(
|
||
baseVisualShapeIndex=sphere_b,
|
||
basePosition=point_b_pos,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
self.point_markers.append(marker_b)
|
||
|
||
# 添加文本标签
|
||
p.addUserDebugText("OBJ", object_pos, textColorRGB=[1, 0.5, 0], textSize=1.5, physicsClientId=self.physics_client)
|
||
p.addUserDebugText("A", point_a_pos, textColorRGB=[1, 1, 0], textSize=1.5, physicsClientId=self.physics_client)
|
||
p.addUserDebugText("B", point_b_pos, textColorRGB=[1, 0, 0], textSize=1.5, physicsClientId=self.physics_client)
|
||
|
||
def _reset_camera_view(self):
|
||
"""Reset camera to default view"""
|
||
try:
|
||
# 使用文件内常量作为默认相机设置
|
||
distance = DEFAULT_CAMERA_DISTANCE
|
||
yaw = DEFAULT_CAMERA_YAW
|
||
pitch = DEFAULT_CAMERA_PITCH
|
||
target = DEFAULT_CAMERA_TARGET
|
||
|
||
p.resetDebugVisualizerCamera(
|
||
cameraDistance=distance,
|
||
cameraYaw=yaw,
|
||
cameraPitch=pitch,
|
||
cameraTargetPosition=target,
|
||
physicsClientId=self.physics_client
|
||
)
|
||
except Exception as e:
|
||
# Re-raise exception for proper error handling
|
||
raise Exception(f"Failed to reset camera view: {e}")
|
||
|
||
def update_status(self, message: str):
|
||
"""Update status display with copyable text"""
|
||
if hasattr(self, 'log_text'):
|
||
# Enable text widget to add content
|
||
self.log_text.config(state=tk.NORMAL)
|
||
# Add timestamp and message
|
||
timestamp = time.strftime("%H:%M:%S")
|
||
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
|
||
# Auto-scroll to bottom
|
||
self.log_text.see(tk.END)
|
||
# Make read-only again but still selectable
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
def on_closing(self):
|
||
"""Handle window closing"""
|
||
if messagebox.askokcancel("Quit", "Do you want to quit the simulation?"):
|
||
try:
|
||
# Stop simulation
|
||
self.simulation_running = False
|
||
|
||
# Cleanup
|
||
if self.environment:
|
||
self.environment.cleanup()
|
||
|
||
# Disconnect PyBullet
|
||
if self.physics_client is not None:
|
||
p.disconnect(self.physics_client)
|
||
|
||
except Exception as e:
|
||
# Log cleanup failure but still close
|
||
print(f"Cleanup error: {e}")
|
||
|
||
self.root.destroy()
|
||
|
||
def run(self):
|
||
"""Start the main window"""
|
||
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
|
||
self.root.mainloop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Test the main window directly
|
||
app = MainWindow()
|
||
app.run()
|