diff --git a/config.json b/config.json index 312afb1..9f2eb4b 100644 --- a/config.json +++ b/config.json @@ -67,7 +67,7 @@ "initial_position": [ 1.0, 0.5, - 0.5 + 0.7 ], "dimensions": { "width": 0.1, @@ -124,7 +124,7 @@ }, "collision": { "check_resolution": 0.05, - "safety_margin": 0.02 + "safety_margin": 0.01 }, "optimization": { "shortcut_iterations": 50, diff --git a/src/gui/main_window.py b/src/gui/main_window.py index b7b593d..ca5141f 100644 --- a/src/gui/main_window.py +++ b/src/gui/main_window.py @@ -348,6 +348,22 @@ class MainWindow: if self.environment: self.environment.reset_environment() + # Clear path visualization + if hasattr(self, 'path_visualization'): + for line_id in self.path_visualization: + p.removeUserDebugItem(line_id) + self.path_visualization = [] + + # Clear point markers + if hasattr(self, 'point_markers'): + for marker_id in self.point_markers: + p.removeBody(marker_id) + self.point_markers = [] + + # Reset planned path + self.planned_path = None + self.execute_btn.config(state=tk.DISABLED) + # Reset camera self._reset_camera_view() @@ -426,7 +442,8 @@ class MainWindow: self.path_planner = AIRRTStarPlanner( self.arm_controller, self.environment, - self.config_loader + self.config_loader, + self.collision_checker # Pass the shared collision checker ) self.path_optimizer = PathOptimizer( self.arm_controller, @@ -449,40 +466,82 @@ class MainWindow: self.pause_simulation() try: - # Get task points + # Get task points and object position task_points = self.config_loader.get_task_points() point_a = task_points['point_A']['position'] point_b = task_points['point_B']['position'] + # Get transport object position from config + transport_object_config = self.config_loader.get_full_config()['transport_object'] + object_position = transport_object_config['initial_position'] + # Get current joint configuration current_config = self.arm_controller.get_current_joint_positions() # Convert points to joint configurations using inverse kinematics self.update_status("Computing target configurations...") - config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=current_config) + + # Object position configuration + config_object = self.arm_controller.inverse_kinematics(object_position, seed_angles=current_config) + if config_object is None: + raise RuntimeError("Cannot find joint configuration for object position") + + # A point configuration + config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object) if config_a is None: raise RuntimeError("Cannot find joint configuration for point A") + # B point configuration config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a) if config_b is None: raise RuntimeError("Cannot find joint configuration for point B") - # Plan path (通用避障;若配置要求强制穿洞,内部会选择穿洞策略) - self.update_status("Planning path...") - raw_path = self.path_planner.plan_auto(config_a, config_b) + # Plan three path segments + # Segment 1: Current position to object (ignore transport object collision) + self.update_status("Planning path from current position to object...") + # Temporarily ignore transport object for approaching it + self.collision_checker.ignore_transport_object = True + try: + path_to_object = self.path_planner.plan_auto(current_config, config_object) + finally: + # Restore collision checking for transport object + self.collision_checker.ignore_transport_object = False - # Optimize path - self.update_status("Optimizing path...") - optimized_path = self.path_optimizer.optimize_path(raw_path, self.collision_checker) + # Segment 2: Object to A + self.update_status("Planning path from object to A...") + path_object_to_a = self.path_planner.plan_auto(config_object, config_a) - self.planned_path = optimized_path + # Segment 3: A to B (通用避障;若配置要求强制穿洞,内部会选择穿洞策略) + self.update_status("Planning path from A to B...") + path_a_to_b = self.path_planner.plan_auto(config_a, config_b) + + # Optimize paths separately to maintain segment information + self.update_status("Optimizing path segments...") + # Optimize path to object (ignore transport object collision) + self.collision_checker.ignore_transport_object = True + try: + optimized_path_to_object = self.path_optimizer.optimize_path(path_to_object, self.collision_checker) + finally: + self.collision_checker.ignore_transport_object = False + + optimized_path_object_to_a = self.path_optimizer.optimize_path(path_object_to_a, self.collision_checker) + optimized_path_a_to_b = self.path_optimizer.optimize_path(path_a_to_b, self.collision_checker) + + # Merge optimized paths (remove duplicate points) + self.planned_path = (optimized_path_to_object[:-1] + + optimized_path_object_to_a[:-1] + + optimized_path_a_to_b) + + # Store segment boundaries + self.object_index = len(optimized_path_to_object) - 1 + self.point_a_index = self.object_index + len(optimized_path_object_to_a) - 1 path_length = len(self.planned_path) - self.update_status(f"Path planning successful! Path contains {path_length} waypoints") + self.update_status(f"Path planning successful! Full path contains {path_length} waypoints") # Enable execute button self.execute_btn.config(state=tk.NORMAL) - # Visualize path (optional) + # Visualize path with segments self._visualize_path(self.planned_path) except Exception as e: @@ -506,13 +565,57 @@ class MainWindow: self.update_status("Executing planned path...") try: - # Execute path with gripper control - success = self.path_executor.execute_path(self.planned_path, gripper_action='close') - - if success: + # Split path into three segments + if hasattr(self, 'object_index') and hasattr(self, 'point_a_index'): + # Segment 1: Initial to object (no gripper action) + path_to_object = self.planned_path[:self.object_index + 1] + # Segment 2: Object to A (gripper closed) + path_object_to_a = self.planned_path[self.object_index:self.point_a_index + 1] + # Segment 3: A to B (gripper closed) + path_a_to_b = self.planned_path[self.point_a_index:] + + # Execute path to object + self.update_status("Moving to object...") + success = self.path_executor.execute_path(path_to_object, gripper_action=None) + if not success: + self.update_status("Failed to reach object") + return + + # Close gripper at object + self.update_status("Closing gripper at object...") + if hasattr(self.arm_controller, 'close_gripper'): + self.arm_controller.close_gripper() + import time + time.sleep(0.5) # Wait for gripper to close + + # Execute path from object to A + self.update_status("Moving object to point A...") + success = self.path_executor.execute_path(path_object_to_a, gripper_action=None) + if not success: + self.update_status("Failed to reach point A") + return + + # Execute path from A to B + self.update_status("Moving from A to B through hole...") + success = self.path_executor.execute_path(path_a_to_b, gripper_action=None) + if not success: + self.update_status("Failed to reach point B") + return + + # Open gripper at B + self.update_status("Opening gripper at point B...") + if hasattr(self.arm_controller, 'open_gripper'): + self.arm_controller.open_gripper() + time.sleep(0.5) # Wait for gripper to open + self.update_status("Path execution completed successfully!") else: - self.update_status("Path execution failed") + # Fallback to old behavior if no segment information + success = self.path_executor.execute_path(self.planned_path, gripper_action=None) + if success: + self.update_status("Path execution completed!") + else: + self.update_status("Path execution failed") except Exception as e: import traceback @@ -528,22 +631,108 @@ class MainWindow: for line_id in self.path_visualization: p.removeUserDebugItem(line_id) + # Remove old markers if exists + if hasattr(self, 'point_markers'): + for marker_id in self.point_markers: + p.removeBody(marker_id) + self.path_visualization = [] + self.point_markers = [] + + # Get task points and object position for markers + task_points = self.config_loader.get_task_points() + point_a_pos = task_points['point_A']['position'] + point_b_pos = task_points['point_B']['position'] + object_pos = self.config_loader.get_full_config()['transport_object']['initial_position'] # Draw path as lines between waypoints(仅用 KDL FK 计算,不改仿真状态) for i in range(len(path) - 1): start_pos, _ = self.arm_controller.forward_kinematics(path[i]) end_pos, _ = self.arm_controller.forward_kinematics(path[i + 1]) + # Choose color based on segment + if hasattr(self, 'object_index') and i < self.object_index: + # Path from initial to object - Purple + color = [0.8, 0, 0.8] + elif hasattr(self, 'point_a_index') and i < self.point_a_index: + # Path from object to A - Green + color = [0, 1, 0] + else: + # Path from A to B - Blue + color = [0, 0, 1] + # Draw line line_id = p.addUserDebugLine( start_pos, end_pos, - lineColorRGB=[0, 1, 0], # Green color + lineColorRGB=color, lineWidth=2, physicsClientId=self.physics_client ) self.path_visualization.append(line_id) + # Add markers for object, A and B points + # Marker for object - Orange sphere + sphere_obj = p.createVisualShape( + p.GEOM_SPHERE, + radius=0.05, + rgbaColor=[1, 0.5, 0, 0.8], # Orange + physicsClientId=self.physics_client + ) + marker_obj = p.createMultiBody( + baseVisualShapeIndex=sphere_obj, + basePosition=object_pos, + physicsClientId=self.physics_client + ) + self.point_markers.append(marker_obj) + + # Marker for point A - Yellow sphere + sphere_a = p.createVisualShape( + p.GEOM_SPHERE, + radius=0.05, + rgbaColor=[1, 1, 0, 0.8], # Yellow + physicsClientId=self.physics_client + ) + marker_a = p.createMultiBody( + baseVisualShapeIndex=sphere_a, + basePosition=point_a_pos, + physicsClientId=self.physics_client + ) + self.point_markers.append(marker_a) + + # Marker for point B - Red sphere + sphere_b = p.createVisualShape( + p.GEOM_SPHERE, + radius=0.05, + rgbaColor=[1, 0, 0, 0.8], # Red + physicsClientId=self.physics_client + ) + marker_b = p.createMultiBody( + baseVisualShapeIndex=sphere_b, + basePosition=point_b_pos, + physicsClientId=self.physics_client + ) + self.point_markers.append(marker_b) + + # Add text labels + p.addUserDebugText( + "OBJ", object_pos, + textColorRGB=[1, 0.5, 0], + textSize=1.5, + physicsClientId=self.physics_client + ) + p.addUserDebugText( + "A", point_a_pos, + textColorRGB=[1, 1, 0], + textSize=1.5, + physicsClientId=self.physics_client + ) + p.addUserDebugText( + "B", point_b_pos, + textColorRGB=[1, 0, 0], + textSize=1.5, + physicsClientId=self.physics_client + ) + # 可选:不恢复任何机器人状态(可视化不改状态) except Exception as e: diff --git a/src/planning/ai_rrt_star.py b/src/planning/ai_rrt_star.py index ee156ec..efb4dff 100644 --- a/src/planning/ai_rrt_star.py +++ b/src/planning/ai_rrt_star.py @@ -86,7 +86,7 @@ class AdaptiveParameters: class AIRRTStarPlanner: """AI增强的RRT*路径规划器""" - def __init__(self, arm_controller, environment, config_loader): + def __init__(self, arm_controller, environment, config_loader, collision_checker=None): """ 初始化AI RRT*规划器 @@ -94,6 +94,7 @@ class AIRRTStarPlanner: arm_controller: 机械臂控制器实例 environment: 环境管理实例 config_loader: 配置加载器 + collision_checker: 碰撞检测器实例(可选,如果不提供则创建新的) """ self.arm_controller = arm_controller self.environment = environment @@ -110,7 +111,10 @@ class AIRRTStarPlanner: self.enforce_hole = self.rrt_config.get('enforce_hole_crossing', False) # 初始化组件 - self.collision_checker = CollisionChecker(arm_controller, environment, config_loader) + if collision_checker is None: + self.collision_checker = CollisionChecker(arm_controller, environment, config_loader) + else: + self.collision_checker = collision_checker self.hole_strategy = HoleCrossingStrategy(arm_controller, environment, config_loader) # 获取关节限位 diff --git a/src/planning/collision_checker.py b/src/planning/collision_checker.py index 4d26cef..1621170 100644 --- a/src/planning/collision_checker.py +++ b/src/planning/collision_checker.py @@ -32,6 +32,9 @@ class CollisionChecker: self.robot_loader = arm_controller.robot_loader self.robot_id = self.robot_loader.get_robot_id() self.physics_client = self.robot_loader.physics_client + + # 碰撞检测时是否忽略运输物体 + self.ignore_transport_object = False def _iter_obstacles(self): """遍历需要考虑的环境障碍物 body ids(墙体各分段、可选 transport 物体)。""" @@ -41,8 +44,9 @@ class CollisionChecker: if wid is not None: yield wid # 可选地加入运输物体(若要把它也视为障碍) - if getattr(self.environment, 'transport_object_id', None) is not None: - yield self.environment.transport_object_id + transport_id = getattr(self.environment, 'transport_object_id', None) + if not self.ignore_transport_object and transport_id is not None: + yield transport_id def check_collision(self, joint_config: List[float]) -> bool: """检测给定关节配置是否发生碰撞(含安全边界)""" diff --git a/src/planning/path_optimizer.py b/src/planning/path_optimizer.py index 910148b..0aa2032 100644 --- a/src/planning/path_optimizer.py +++ b/src/planning/path_optimizer.py @@ -198,12 +198,18 @@ class PathOptimizer: point = (1 - alpha) * np.array(path[idx-1]) + alpha * np.array(path[idx]) # 验证点的有效性 - if not collision_checker.check_collision(point.tolist()): - smoothed_path.append(point.tolist()) + # 确保 point 是列表格式 + if isinstance(point, np.ndarray): + point = point.tolist() + elif not isinstance(point, list): + point = list(point) + + if not collision_checker.check_collision(point): + smoothed_path.append(point) - # 如果平滑失败,返回原路径 + # 验证平滑结果 if len(smoothed_path) < 2: - return path + raise RuntimeError(f"Path smoothing failed: only {len(smoothed_path)} valid points generated") return smoothed_path