diff --git a/src/gui/main_window.py b/src/gui/main_window.py index bb046d6..648f024 100644 --- a/src/gui/main_window.py +++ b/src/gui/main_window.py @@ -23,6 +23,10 @@ from src.planning.path_executor import PathExecutor # GUI设置 DEFAULT_WINDOW_SIZE = "1200x800" +# 按钮文本常量 +EXECUTE_BUTTON_TEXT = "执行三阶段任务" +EXECUTE_BUTTON_BG = "#00BCD4" + # 相机默认设置 DEFAULT_CAMERA_DISTANCE = 4.0 DEFAULT_CAMERA_YAW = 45 @@ -50,7 +54,6 @@ class MainWindow: self.collision_checker = None self.path_optimizer = None self.path_executor = None - self.planned_path = None # Simulation state self.simulation_running = False @@ -131,19 +134,13 @@ class MainWindow: # Path planning section planning_frame = tk.LabelFrame(left_panel, text="Path Planning", padx=10, pady=10) planning_frame.pack(fill=tk.X, pady=(0, 10)) - - self.plan_btn = tk.Button(planning_frame, text="Plan Path (AI RRT*)", - command=self.plan_path, - bg="#00BCD4", fg="white", padx=10, pady=5) - self.plan_btn.pack(fill=tk.X) - - self.execute_btn = tk.Button(planning_frame, text="Execute Path", - command=self.execute_path, - bg="#8BC34A", fg="white", padx=10, pady=5, - state=tk.DISABLED) - self.execute_btn.pack(fill=tk.X, pady=(5, 0)) - - tk.Label(planning_frame, text="Plan and execute path through hole", + + self.execute_btn = tk.Button(planning_frame, text=EXECUTE_BUTTON_TEXT, + command=self.execute_three_stages, + bg=EXECUTE_BUTTON_BG, fg="white", padx=10, pady=5) + self.execute_btn.pack(fill=tk.X) + + tk.Label(planning_frame, text="Execute three-stage path: Current→Object→A→B", font=("Arial", 9)).pack(pady=(5, 0)) # Status section @@ -364,10 +361,6 @@ class MainWindow: p.removeBody(marker_id) self.point_markers = [] - # Reset planned path - self.planned_path = None - self.execute_btn.config(state=tk.DISABLED) - # Reset camera self._reset_camera_view() @@ -471,284 +464,256 @@ class MainWindow: except Exception as e: self.update_status(f"Failed to initialize planning components: {e}") - def plan_path(self): - """Plan path from A to B through hole using AI RRT*""" - self.update_status("Starting AI RRT* path planning...") - # 暂停仿真线程,避免规划期间并发 step 引入副作用 + def execute_three_stages(self): + """执行三阶段任务:当前位置→物体→A点→B点""" + self.update_status("Starting three-stage execution...") + + # 暂停仿真以避免规划期间的副作用 was_running = self.simulation_running if was_running: self.pause_simulation() - + try: - # Get task points and object position + # 获取配置中的任务点和物体位置 task_points = self.config_loader.get_task_points() point_a = task_points['point_A']['position'] point_b = task_points['point_B']['position'] - - # Get transport object position from config transport_object_config = self.config_loader.get_full_config()['transport_object'] object_position = transport_object_config['initial_position'] - - # Get current joint configuration + + # 清除旧的可视化 + self._clear_visualizations() + + # =================== + # 第一阶段:当前位置 → 物体 + # =================== + self.update_status("Stage 1: Planning path from current position to object...") + + # 获取当前关节配置 current_config = self.arm_controller.get_current_joint_positions() - - # Convert points to joint configurations using inverse kinematics - self.update_status("Computing target configurations...") - - # Object position configuration + + # 计算物体位置的逆运动学 config_object = self.arm_controller.inverse_kinematics(object_position, seed_angles=current_config) if config_object is None: raise RuntimeError("Cannot find joint configuration for object position") - - # A point configuration - config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object) - if config_a is None: - raise RuntimeError("Cannot find joint configuration for point A") - - # B point configuration - config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a) - if config_b is None: - raise RuntimeError("Cannot find joint configuration for point B") - - # Plan three path segments - # Segment 1: Current position to object (ignore transport object collision) - self.update_status("Planning path from current position to object...") - # Temporarily ignore transport object for approaching it + + # 规划路径(忽略物体碰撞) self.collision_checker.ignore_transport_object = True try: path_to_object = self.path_planner.plan_auto(current_config, config_object) - finally: - # Restore collision checking for transport object - self.collision_checker.ignore_transport_object = False - - # Segment 2: Object to A - self.update_status("Planning path from object to A...") - path_object_to_a = self.path_planner.plan_auto(config_object, config_a) - - # Segment 3: A to B (通用避障;若配置要求强制穿洞,内部会选择穿洞策略) - self.update_status("Planning path from A to B...") - path_a_to_b = self.path_planner.plan_auto(config_a, config_b) - - # Optimize paths separately to maintain segment information - self.update_status("Optimizing path segments...") - # Optimize path to object (ignore transport object collision) - self.collision_checker.ignore_transport_object = True - try: optimized_path_to_object = self.path_optimizer.optimize_path(path_to_object, self.collision_checker) finally: self.collision_checker.ignore_transport_object = False - - optimized_path_object_to_a = self.path_optimizer.optimize_path(path_object_to_a, self.collision_checker) - optimized_path_a_to_b = self.path_optimizer.optimize_path(path_a_to_b, self.collision_checker) - - # Merge optimized paths (remove duplicate points) - self.planned_path = (optimized_path_to_object[:-1] + - optimized_path_object_to_a[:-1] + - optimized_path_a_to_b) - - # Store segment boundaries - self.object_index = len(optimized_path_to_object) - 1 - self.point_a_index = self.object_index + len(optimized_path_object_to_a) - 1 - path_length = len(self.planned_path) - self.update_status(f"Path planning successful! Full path contains {path_length} waypoints") - - # Enable execute button - self.execute_btn.config(state=tk.NORMAL) - - # Visualize path with segments - self._visualize_path(self.planned_path) - except Exception as e: - import traceback - self.update_status(f"Path planning failed: {str(e)}") - print("Full traceback:") - traceback.print_exc() - self.planned_path = None - self.execute_btn.config(state=tk.DISABLED) - finally: - # 规划完成后恢复仿真运行状态 - if was_running: - self.start_simulation() - - def execute_path(self): - """Execute the planned path using pre-planned segments""" - self.update_status("Starting three-stage execution using planned path...") + # 可视化第一段路径(紫色) + self._visualize_path_segment(optimized_path_to_object, [0.8, 0, 0.8], "To Object") - try: - # Check if planned path exists - if self.planned_path is None: - raise RuntimeError("No planned path available. Please run path planning first.") - - # Stage 1: Execute path to object using pre-planned segment + # 执行第一段路径 self.update_status("Stage 1: Moving to object...") - path_to_object = self.planned_path[:self.object_index + 1] - self.collision_checker.ignore_transport_object = True try: - success = self.path_executor.execute_path(path_to_object, gripper_action=None) + success = self.path_executor.execute_path(optimized_path_to_object, gripper_action=None) finally: self.collision_checker.ignore_transport_object = False if not success: raise RuntimeError("Stage 1 failed: Could not reach object") - # Close gripper + # 抓取物体 self.update_status("Closing gripper at object...") if hasattr(self.arm_controller, 'close_gripper'): self.arm_controller.close_gripper() import time time.sleep(0.5) - # After gripping, ignore transport object collision + # 从现在开始忽略物体碰撞(因为已经抓取) self.collision_checker.ignore_transport_object = True - # Stage 2: Execute path from object to A using pre-planned segment - self.update_status("Stage 2: Moving to point A...") - path_to_a = self.planned_path[self.object_index:self.point_a_index + 1] + # =================== + # 第二阶段:物体 → A点 + # =================== + self.update_status("Stage 2: Planning path from object to point A...") - success = self.path_executor.execute_path(path_to_a, gripper_action=None) + # 计算A点的逆运动学 + config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object) + if config_a is None: + raise RuntimeError("Cannot find joint configuration for point A") + + # 规划路径 + path_object_to_a = self.path_planner.plan_auto(config_object, config_a) + optimized_path_object_to_a = self.path_optimizer.optimize_path(path_object_to_a, self.collision_checker) + + # 可视化第二段路径(绿色) + self._visualize_path_segment(optimized_path_object_to_a, [0, 1, 0], "To A") + + # 执行第二段路径 + self.update_status("Stage 2: Moving to point A...") + success = self.path_executor.execute_path(optimized_path_object_to_a, gripper_action=None) if not success: raise RuntimeError("Stage 2 failed: Could not reach point A") - # Stage 3: Execute path from A to B using pre-planned segment - self.update_status("Stage 3: Moving to point B...") - path_to_b = self.planned_path[self.point_a_index:] + # =================== + # 第三阶段:A点 → B点 + # =================== + self.update_status("Stage 3: Planning path from A to B...") - success = self.path_executor.execute_path(path_to_b, gripper_action=None) + # 计算B点的逆运动学 + config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a) + if config_b is None: + raise RuntimeError("Cannot find joint configuration for point B") + + # 规划路径 + path_a_to_b = self.path_planner.plan_auto(config_a, config_b) + optimized_path_a_to_b = self.path_optimizer.optimize_path(path_a_to_b, self.collision_checker) + + # 可视化第三段路径(蓝色) + self._visualize_path_segment(optimized_path_a_to_b, [0, 0, 1], "To B") + + # 执行第三段路径 + self.update_status("Stage 3: Moving to point B...") + success = self.path_executor.execute_path(optimized_path_a_to_b, gripper_action=None) if not success: raise RuntimeError("Stage 3 failed: Could not reach point B") - # Open gripper at B + # 释放物体 self.update_status("Opening gripper at point B...") if hasattr(self.arm_controller, 'open_gripper'): self.arm_controller.open_gripper() time.sleep(0.5) + # 添加任务点标记 + self._add_point_markers(object_position, point_a, point_b) + self.update_status("All three stages completed successfully!") except Exception as e: import traceback - self.update_status(f"Path execution error: {str(e)}") + self.update_status(f"Execution error: {str(e)}") print("Full traceback:") traceback.print_exc() finally: - # Restore collision checking + # 恢复碰撞检测 self.collision_checker.ignore_transport_object = False - - def _visualize_path(self, path): - """Visualize the planned path in PyBullet""" - try: - # Remove old visualization if exists - if hasattr(self, 'path_visualization'): - for line_id in self.path_visualization: - p.removeUserDebugItem(line_id) - - # Remove old markers if exists - if hasattr(self, 'point_markers'): - for marker_id in self.point_markers: - p.removeBody(marker_id) - + # 恢复仿真运行状态 + if was_running: + self.start_simulation() + + def _visualize_path_segment(self, path, color, label): + """可视化单个路径段 - 显示实际执行轨迹""" + if not hasattr(self, 'path_visualization'): self.path_visualization = [] - self.point_markers = [] - - # Get task points and object position for markers - task_points = self.config_loader.get_task_points() - point_a_pos = task_points['point_A']['position'] - point_b_pos = task_points['point_B']['position'] - object_pos = self.config_loader.get_full_config()['transport_object']['initial_position'] - - # Draw path as lines between waypoints(仅用 KDL FK 计算,不改仿真状态) - for i in range(len(path) - 1): - start_pos, _ = self.arm_controller.forward_kinematics(path[i]) - end_pos, _ = self.arm_controller.forward_kinematics(path[i + 1]) - - # Choose color based on segment - if hasattr(self, 'object_index') and i < self.object_index: - # Path from initial to object - Purple - color = [0.8, 0, 0.8] - elif hasattr(self, 'point_a_index') and i < self.point_a_index: - # Path from object to A - Green - color = [0, 1, 0] - else: - # Path from A to B - Blue - color = [0, 0, 1] - - # Draw line - line_id = p.addUserDebugLine( - start_pos, end_pos, - lineColorRGB=color, - lineWidth=2, - physicsClientId=self.physics_client + + # 导入numpy用于插值计算 + import numpy as np + + # 对每两个waypoint之间进行插值,显示实际轨迹 + for i in range(len(path) - 1): + current_config = path[i] + target_config = path[i + 1] + + # 计算插值步数(与path_executor相同的逻辑) + distance = np.linalg.norm(np.array(target_config) - np.array(current_config)) + if distance < 0.001: # 跳过太近的点 + continue + + # 使用与path_executor相同的参数 + velocity_scaling = self.config_loader.get_full_config()['path_planning']['execution']['velocity_scaling'] + timestep = self.config_loader.get_full_config()['simulation']['timestep'] + + move_time = distance / velocity_scaling + num_steps = max(5, int(move_time / timestep)) # 至少5步以保证平滑 + + # 插值并绘制 + prev_pos = None + for step in range(num_steps + 1): + ratio = step / num_steps + interpolated = ( + np.array(current_config) * (1 - ratio) + + np.array(target_config) * ratio ) - self.path_visualization.append(line_id) - - # Add markers for object, A and B points - # Marker for object - Orange sphere - sphere_obj = p.createVisualShape( - p.GEOM_SPHERE, - radius=0.05, - rgbaColor=[1, 0.5, 0, 0.8], # Orange - physicsClientId=self.physics_client - ) - marker_obj = p.createMultiBody( - baseVisualShapeIndex=sphere_obj, - basePosition=object_pos, - physicsClientId=self.physics_client - ) - self.point_markers.append(marker_obj) - - # Marker for point A - Yellow sphere - sphere_a = p.createVisualShape( - p.GEOM_SPHERE, - radius=0.05, - rgbaColor=[1, 1, 0, 0.8], # Yellow - physicsClientId=self.physics_client - ) - marker_a = p.createMultiBody( - baseVisualShapeIndex=sphere_a, - basePosition=point_a_pos, - physicsClientId=self.physics_client - ) - self.point_markers.append(marker_a) - - # Marker for point B - Red sphere - sphere_b = p.createVisualShape( - p.GEOM_SPHERE, - radius=0.05, - rgbaColor=[1, 0, 0, 0.8], # Red - physicsClientId=self.physics_client - ) - marker_b = p.createMultiBody( - baseVisualShapeIndex=sphere_b, - basePosition=point_b_pos, - physicsClientId=self.physics_client - ) - self.point_markers.append(marker_b) - - # Add text labels - p.addUserDebugText( - "OBJ", object_pos, - textColorRGB=[1, 0.5, 0], - textSize=1.5, - physicsClientId=self.physics_client - ) - p.addUserDebugText( - "A", point_a_pos, - textColorRGB=[1, 1, 0], - textSize=1.5, - physicsClientId=self.physics_client - ) - p.addUserDebugText( - "B", point_b_pos, - textColorRGB=[1, 0, 0], - textSize=1.5, - physicsClientId=self.physics_client - ) - - # 可选:不恢复任何机器人状态(可视化不改状态) - - except Exception as e: - self.update_status(f"Path visualization failed: {e}") + + # 计算插值点的末端位置 + pos, _ = self.arm_controller.forward_kinematics(interpolated.tolist()) + + if prev_pos is not None: + # 画线段 + line_id = p.addUserDebugLine( + prev_pos, pos, + lineColorRGB=color, + lineWidth=2, + physicsClientId=self.physics_client + ) + self.path_visualization.append(line_id) + + prev_pos = pos + + self.update_status(f"Visualized {label} path: {len(path)} waypoints with interpolation") + + def _clear_visualizations(self): + """清除所有可视化元素""" + # 清除路径线条 + if hasattr(self, 'path_visualization'): + for line_id in self.path_visualization: + p.removeUserDebugItem(line_id) + self.path_visualization = [] + + # 清除点标记 + if hasattr(self, 'point_markers'): + for marker_id in self.point_markers: + p.removeBody(marker_id) + self.point_markers = [] + + def _add_point_markers(self, object_pos, point_a_pos, point_b_pos): + """添加任务点标记""" + if not hasattr(self, 'point_markers'): + self.point_markers = [] + + # 物体标记 - 橙色 + sphere_obj = p.createVisualShape( + p.GEOM_SPHERE, + radius=0.05, + rgbaColor=[1, 0.5, 0, 0.8], + physicsClientId=self.physics_client + ) + marker_obj = p.createMultiBody( + baseVisualShapeIndex=sphere_obj, + basePosition=object_pos, + physicsClientId=self.physics_client + ) + self.point_markers.append(marker_obj) + + # A点标记 - 黄色 + sphere_a = p.createVisualShape( + p.GEOM_SPHERE, + radius=0.05, + rgbaColor=[1, 1, 0, 0.8], + physicsClientId=self.physics_client + ) + marker_a = p.createMultiBody( + baseVisualShapeIndex=sphere_a, + basePosition=point_a_pos, + physicsClientId=self.physics_client + ) + self.point_markers.append(marker_a) + + # B点标记 - 红色 + sphere_b = p.createVisualShape( + p.GEOM_SPHERE, + radius=0.05, + rgbaColor=[1, 0, 0, 0.8], + physicsClientId=self.physics_client + ) + marker_b = p.createMultiBody( + baseVisualShapeIndex=sphere_b, + basePosition=point_b_pos, + physicsClientId=self.physics_client + ) + self.point_markers.append(marker_b) + + # 添加文本标签 + p.addUserDebugText("OBJ", object_pos, textColorRGB=[1, 0.5, 0], textSize=1.5, physicsClientId=self.physics_client) + p.addUserDebugText("A", point_a_pos, textColorRGB=[1, 1, 0], textSize=1.5, physicsClientId=self.physics_client) + p.addUserDebugText("B", point_b_pos, textColorRGB=[1, 0, 0], textSize=1.5, physicsClientId=self.physics_client) def _reset_camera_view(self): """Reset camera to default view""" diff --git a/src/planning/ai_rrt_star.py b/src/planning/ai_rrt_star.py index a8ecd22..0abbf71 100644 --- a/src/planning/ai_rrt_star.py +++ b/src/planning/ai_rrt_star.py @@ -206,10 +206,14 @@ class AIRRTStarPlanner: if np.linalg.norm(new_config - np.array(goal_config)) < GOAL_TOLERANCE: # 找到路径,提取并返回 path = self._extract_path(new_node) - + + # 确保路径以精确的目标配置结束 + if not np.allclose(path[-1], goal_config, atol=1e-6): + path.append(goal_config.tolist() if isinstance(goal_config, np.ndarray) else goal_config) + # 缓存成功配置 self._cache_success_configs(path) - + return path else: self.params.update(success=False) diff --git a/src/planning/path_executor.py b/src/planning/path_executor.py index d41ff70..75b669b 100644 --- a/src/planning/path_executor.py +++ b/src/planning/path_executor.py @@ -70,19 +70,15 @@ class PathExecutor: return True def _move_to_configuration(self, target_config: List[float]) -> bool: - + current_config = self.arm_controller.get_current_joint_positions() - - + + distance = np.linalg.norm( np.array(target_config) - np.array(current_config) ) - - - if distance < self.position_tolerance: - return True - - + + move_time = distance / self.velocity_scaling num_steps = int(move_time / self.timestep) + 1