From e617a7c28e41ebb6a859152a0dba909bc28fa4e1 Mon Sep 17 00:00:00 2001 From: sladro Date: Fri, 12 Sep 2025 15:21:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E5=9B=BA=E5=AE=9A?= =?UTF-8?q?=E5=BA=95=E5=BA=A7=E3=80=81=E6=97=A0=E5=89=AF=E4=BD=9C=E7=94=A8?= =?UTF-8?q?=E7=A2=B0=E6=92=9E=E6=A3=80=E6=B5=8B=E3=80=81=E9=80=9A=E7=94=A8?= =?UTF-8?q?=E9=81=BF=E9=9A=9C=E4=B8=8E=E6=89=A7=E8=A1=8C=E6=94=B6=E6=95=9B?= =?UTF-8?q?=E7=AD=89=E5=BE=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 5 +- src/gui/main_window.py | 27 +-- src/planning/ai_rrt_star.py | 10 +- src/planning/collision_checker.py | 308 ++++++++++++------------------ src/planning/path_executor.py | 29 +-- src/robot/arm_controller.py | 9 +- src/robot/robot_loader.py | 5 +- src/simulation/environment.py | 11 +- 8 files changed, 181 insertions(+), 223 deletions(-) diff --git a/config.json b/config.json index 1509218..cb061af 100644 --- a/config.json +++ b/config.json @@ -119,7 +119,8 @@ "step_size": 0.1, "goal_sample_rate": 0.15, "search_radius": 0.5, - "hole_bias_rate": 0.3 + "hole_bias_rate": 0.3, + "enforce_hole_crossing": false }, "collision": { "check_resolution": 0.05, @@ -134,4 +135,4 @@ "position_tolerance": 0.01 } } -} \ No newline at end of file +} diff --git a/src/gui/main_window.py b/src/gui/main_window.py index 098a69d..b7b593d 100644 --- a/src/gui/main_window.py +++ b/src/gui/main_window.py @@ -443,6 +443,10 @@ class MainWindow: def plan_path(self): """Plan path from A to B through hole using AI RRT*""" self.update_status("Starting AI RRT* path planning...") + # 暂停仿真线程,避免规划期间并发 step 引入副作用 + was_running = self.simulation_running + if was_running: + self.pause_simulation() try: # Get task points @@ -463,9 +467,9 @@ class MainWindow: if config_b is None: raise RuntimeError("Cannot find joint configuration for point B") - # Plan path through hole - self.update_status("Planning path through hole...") - raw_path = self.path_planner.plan_through_hole(config_a, config_b) + # Plan path (通用避障;若配置要求强制穿洞,内部会选择穿洞策略) + self.update_status("Planning path...") + raw_path = self.path_planner.plan_auto(config_a, config_b) # Optimize path self.update_status("Optimizing path...") @@ -480,7 +484,7 @@ class MainWindow: # Visualize path (optional) self._visualize_path(self.planned_path) - + except Exception as e: import traceback self.update_status(f"Path planning failed: {str(e)}") @@ -488,6 +492,10 @@ class MainWindow: traceback.print_exc() self.planned_path = None self.execute_btn.config(state=tk.DISABLED) + finally: + # 规划完成后恢复仿真运行状态 + if was_running: + self.start_simulation() def execute_path(self): """Execute the planned path""" @@ -522,13 +530,9 @@ class MainWindow: self.path_visualization = [] - # Draw path as lines between waypoints + # Draw path as lines between waypoints(仅用 KDL FK 计算,不改仿真状态) for i in range(len(path) - 1): - # Set robot to configuration to get end-effector position - self.arm_controller.set_joint_positions(path[i]) start_pos, _ = self.arm_controller.forward_kinematics(path[i]) - - self.arm_controller.set_joint_positions(path[i + 1]) end_pos, _ = self.arm_controller.forward_kinematics(path[i + 1]) # Draw line @@ -540,8 +544,7 @@ class MainWindow: ) self.path_visualization.append(line_id) - # Restore original configuration - self.arm_controller.set_joint_positions(path[0]) + # 可选:不恢复任何机器人状态(可视化不改状态) except Exception as e: self.update_status(f"Path visualization failed: {e}") @@ -615,4 +618,4 @@ class MainWindow: if __name__ == "__main__": # Test the main window directly app = MainWindow() - app.run() \ No newline at end of file + app.run() diff --git a/src/planning/ai_rrt_star.py b/src/planning/ai_rrt_star.py index 78f9b02..ee156ec 100644 --- a/src/planning/ai_rrt_star.py +++ b/src/planning/ai_rrt_star.py @@ -106,6 +106,8 @@ class AIRRTStarPlanner: # 初始化参数 self.max_iterations = self.rrt_config['max_iterations'] self.params = AdaptiveParameters(self.rrt_config) + # 是否强制穿洞(默认否) + self.enforce_hole = self.rrt_config.get('enforce_hole_crossing', False) # 初始化组件 self.collision_checker = CollisionChecker(arm_controller, environment, config_loader) @@ -409,4 +411,10 @@ class AIRRTStarPlanner: # 恢复原始偏向率 self.hole_strategy.hole_bias_rate = original_rate - return path \ No newline at end of file + return path + + def plan_auto(self, start_config: List[float], goal_config: List[float]) -> List[List[float]]: + """根据配置选择是否强制穿洞;默认进行通用避障路径规划。""" + if self.enforce_hole: + return self.plan_through_hole(start_config, goal_config) + return self.plan(start_config, goal_config) diff --git a/src/planning/collision_checker.py b/src/planning/collision_checker.py index 871f2a6..857534a 100644 --- a/src/planning/collision_checker.py +++ b/src/planning/collision_checker.py @@ -2,230 +2,160 @@ """ Collision Checker Module -碰撞检测封装模块,提供高效的碰撞检测接口。 -封装PyBullet碰撞检测功能,支持批量检测和缓存优化。 - -所有参数从config.json读取,无硬编码。 +路径规划阶段的碰撞检测工具。 +核心目标:任何碰撞检查都不改变真实仿真世界(无副作用)。 +做到这一点的方法是:在 save/restore 的状态下用 resetJointState 设置“试配置”, +只触发碰撞检测(performCollisionDetection),而不 stepSimulation。 """ import numpy as np -from typing import List, Tuple, Dict, Any, Optional +from typing import List, Tuple import pybullet as p class CollisionChecker: - """碰撞检测器""" - + """碰撞检测器(规划期无副作用)""" + def __init__(self, arm_controller, environment, config_loader): - """ - 初始化碰撞检测器 - - Args: - arm_controller: 机械臂控制器 - environment: 环境管理器 - config_loader: 配置加载器 - """ + # 依赖 self.arm_controller = arm_controller self.environment = environment self.config_loader = config_loader - - # 从配置读取碰撞检测参数 - config = config_loader.get_full_config() - collision_config = config['path_planning']['collision'] - self.check_resolution = collision_config['check_resolution'] - self.safety_margin = collision_config['safety_margin'] - - # 获取机器人和环境信息 + + # 配置 + cfg = config_loader.get_full_config() + c_cfg = cfg['path_planning']['collision'] + self.check_resolution = c_cfg['check_resolution'] + self.safety_margin = c_cfg['safety_margin'] + + # 资源句柄 self.robot_loader = arm_controller.robot_loader self.robot_id = self.robot_loader.get_robot_id() self.physics_client = self.robot_loader.physics_client - + + def _iter_obstacles(self): + """遍历需要考虑的环境障碍物 body ids(墙体各分段、可选 transport 物体)。""" + # 墙体可能由多个 body 组成 + wall_parts = getattr(self.environment, '_wall_part_ids', []) or [] + for wid in wall_parts: + if wid is not None: + yield wid + # 可选地加入运输物体(若要把它也视为障碍) + if getattr(self.environment, 'transport_object_id', None) is not None: + yield self.environment.transport_object_id + def check_collision(self, joint_config: List[float]) -> bool: - """ - 检测给定关节配置是否发生碰撞 - - Args: - joint_config: 关节配置 - - Returns: - True if collision detected, False otherwise - """ - # 验证关节配置 + """检测给定关节配置是否发生碰撞(含安全边界)""" + # 基本有效性 if not self.arm_controller.validate_joint_configuration(joint_config): - return True # 无效配置视为碰撞 - - # 保存当前配置 - original_config = self.arm_controller.get_current_joint_positions() - - # 设置机器人到指定配置 - self.robot_loader.set_joint_positions(joint_config) - - # 执行一步仿真以更新碰撞信息 - p.stepSimulation(physicsClientId=self.physics_client) - - # 检测自碰撞 - has_self_collision = self.robot_loader.check_self_collision() - - # 检测与环境碰撞 - has_env_collision = False - - # 检测与墙体的碰撞 - if self.environment.wall_id is not None: - contact_points = p.getContactPoints( - bodyA=self.robot_id, - bodyB=self.environment.wall_id, - physicsClientId=self.physics_client - ) - if len(contact_points) > 0: - # 检查接触距离是否超过安全边界 - for contact in contact_points: - contact_distance = contact[8] # Contact distance - if contact_distance < -self.safety_margin: - has_env_collision = True - break - - # 检测与地面的碰撞(排除基座) - if self.environment.ground_plane_id is not None: - contact_points = p.getContactPoints( - bodyA=self.robot_id, - bodyB=self.environment.ground_plane_id, - physicsClientId=self.physics_client - ) - # 过滤掉基座的接触点 - for contact in contact_points: - link_index = contact[3] # Link index on robot - if link_index > 0: # 非基座链接 - contact_distance = contact[8] - if contact_distance < -self.safety_margin: - has_env_collision = True - break - - # 恢复原始配置 - self.robot_loader.set_joint_positions(original_config) - - return has_self_collision or has_env_collision - + return True + + state_id = p.saveState(physicsClientId=self.physics_client) + try: + # 将机器人打到“试配置”(仅状态,不经电机/动力学) + joint_indices = self.robot_loader.joint_indices + if len(joint_config) != len(joint_indices): + return True + for i, jid in enumerate(joint_indices): + p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client) + + # 触发碰撞检测(不推进世界时间) + try: + p.performCollisionDetection(physicsClientId=self.physics_client) + except Exception: + pass + + # 自碰撞(严格接触) + if len(p.getContactPoints(self.robot_id, self.robot_id, physicsClientId=self.physics_client)) > 0: + return True + + # 环境碰撞(含安全边界,使用最近点) + # 与墙体各分段/其他障碍 + for obs_id in self._iter_obstacles(): + cps = p.getClosestPoints( + self.robot_id, + int(obs_id), + distance=float(max(0.0, self.safety_margin)), + physicsClientId=self.physics_client, + ) + if len(cps) > 0: + return True + + # 与地面(排除基座) + if self.environment.ground_plane_id is not None: + cps = p.getClosestPoints( + self.robot_id, + self.environment.ground_plane_id, + distance=float(max(0.0, self.safety_margin)), + physicsClientId=self.physics_client, + ) + for cp in cps: + # cp[3]: link index on robot + if int(cp[3]) > 0: + return True + + return False + finally: + p.restoreState(state_id, physicsClientId=self.physics_client) + def check_collision_batch(self, joint_configs: List[List[float]]) -> List[bool]: - """ - 批量检测多个关节配置的碰撞状态 - - Args: - joint_configs: 关节配置列表 - - Returns: - 碰撞检测结果列表 - """ - results = [] - - # 保存原始配置 - original_config = self.arm_controller.get_current_joint_positions() - - for config in joint_configs: - has_collision = self.check_collision(config) - results.append(has_collision) - - # 恢复原始配置 - self.robot_loader.set_joint_positions(original_config) - - return results - + """批量检查配置是否碰撞(无副作用)""" + return [self.check_collision(cfg) for cfg in joint_configs] + def check_path_collision(self, path: List[List[float]], resolution: float = None) -> bool: - """ - 检测整条路径是否存在碰撞 - - Args: - path: 路径点列表 - resolution: 检测分辨率 - - Returns: - True if any collision detected along path - """ + """检测整条路径是否存在碰撞""" if len(path) < 2: return False - if resolution is None: resolution = self.check_resolution - - # 保存原始配置 - original_config = self.arm_controller.get_current_joint_positions() - - # 检测路径上的每个段 + for i in range(len(path) - 1): start = np.array(path[i]) end = np.array(path[i + 1]) - - # 计算需要检测的中间点数量 distance = np.linalg.norm(end - start) num_checks = max(2, int(distance / resolution)) - - # 检测路径段上的点 for j in range(num_checks): t = j / (num_checks - 1) - interpolated = start + t * (end - start) - - if self.check_collision(interpolated.tolist()): - # 恢复原始配置 - self.robot_loader.set_joint_positions(original_config) + interpolated = (start + t * (end - start)).tolist() + if self.check_collision(interpolated): return True - - # 恢复原始配置 - self.robot_loader.set_joint_positions(original_config) return False - + def get_obstacle_distance(self, joint_config: List[float]) -> float: - """ - 获取机械臂到最近障碍物的距离 - - Args: - joint_config: 关节配置 - - Returns: - 最小距离值 - """ - # 保存原始配置 - original_config = self.arm_controller.get_current_joint_positions() - - # 设置机器人配置 - self.robot_loader.set_joint_positions(joint_config) - p.stepSimulation(physicsClientId=self.physics_client) - - min_distance = float('inf') - - # 计算到墙体的最近距离 - if self.environment.wall_id is not None: - closest_points = p.getClosestPoints( - bodyA=self.robot_id, - bodyB=self.environment.wall_id, - distance=1.0, # Maximum search distance - physicsClientId=self.physics_client - ) - - for point in closest_points: - distance = point[8] # Distance between points - min_distance = min(min_distance, distance) - - # 恢复原始配置 - self.robot_loader.set_joint_positions(original_config) - - return min_distance - + """估算到墙体的最近距离(米)""" + state_id = p.saveState(physicsClientId=self.physics_client) + try: + joint_indices = self.robot_loader.joint_indices + if len(joint_config) != len(joint_indices): + return float('inf') + for i, jid in enumerate(joint_indices): + p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client) + try: + p.performCollisionDetection(physicsClientId=self.physics_client) + except Exception: + pass + + min_distance = float('inf') + for obs_id in self._iter_obstacles(): + cps = p.getClosestPoints( + self.robot_id, + int(obs_id), + distance=1.0, + physicsClientId=self.physics_client, + ) + for cp in cps: + d = float(cp[8]) + if d < min_distance: + min_distance = d + return min_distance + finally: + p.restoreState(state_id, physicsClientId=self.physics_client) + def validate_configuration(self, joint_config: List[float]) -> Tuple[bool, str]: - """ - 验证关节配置的有效性和安全性 - - Args: - joint_config: 关节配置 - - Returns: - (是否有效, 错误信息) - """ - # 检查关节限位 + """验证配置是否在限位内且无碰撞""" is_valid, violations = self.arm_controller.check_joint_limits(joint_config) if not is_valid: return False, f"Joint limit violations: {', '.join(violations)}" - - # 检查碰撞 if self.check_collision(joint_config): return False, "Collision detected" - - return True, "" \ No newline at end of file + return True, "" diff --git a/src/planning/path_executor.py b/src/planning/path_executor.py index a558d06..83690af 100644 --- a/src/planning/path_executor.py +++ b/src/planning/path_executor.py @@ -118,19 +118,24 @@ class PathExecutor: # 等待一个时间步 time.sleep(self.timestep) - # 确保到达精确位置 + # 设定目标末点,并等待收敛到容差内(超时则失败,暴露问题) self.arm_controller.set_joint_positions(target_config) - - # 验证是否到达 + + wait_start = time.time() + max_wait = max(self.timestep, move_time) + while time.time() - wait_start < max_wait: + final_config = self.arm_controller.get_current_joint_positions() + final_distance = np.linalg.norm( + np.array(target_config) - np.array(final_config) + ) + if final_distance <= self.position_tolerance: + return True + time.sleep(self.timestep) + + # 超时仍未到达,抛错暴露问题 final_config = self.arm_controller.get_current_joint_positions() - final_distance = np.linalg.norm( - np.array(target_config) - np.array(final_config) - ) - - if final_distance > self.position_tolerance: - raise RuntimeError(f"Failed to reach target, error: {final_distance}") - - return True + final_distance = np.linalg.norm(np.array(target_config) - np.array(final_config)) + raise RuntimeError(f"Failed to reach target, error: {final_distance}") def _close_gripper(self): """关闭夹爪""" @@ -188,4 +193,4 @@ class PathExecutor: self._open_gripper() time.sleep(self.timestep * 10) # 等待夹爪打开 - return True \ No newline at end of file + return True diff --git a/src/robot/arm_controller.py b/src/robot/arm_controller.py index d9466cf..d74db80 100644 --- a/src/robot/arm_controller.py +++ b/src/robot/arm_controller.py @@ -141,6 +141,13 @@ class ArmController: # Use KDL engine for primary validation (includes joint limits from URDF) return self.kinematics_engine.validate_joint_configuration(joint_positions) + + def set_joint_positions(self, joint_positions: List[float]) -> None: + """Convenience: set robot joint targets in PyBullet with validation.""" + self._ensure_initialized() + if not self.validate_joint_configuration(joint_positions): + raise ValueError("Target joint positions are invalid or exceed limits") + self.robot_loader.set_joint_positions(joint_positions) def move_to_joint_positions(self, joint_positions: List[float], timeout: float = 5.0) -> bool: @@ -536,4 +543,4 @@ class ArmController: def create_arm_controller(config_loader: ConfigLoader, physics_client: int) -> ArmController: """Create ArmController instance from configuration""" - return ArmController(config_loader, physics_client) \ No newline at end of file + return ArmController(config_loader, physics_client) diff --git a/src/robot/robot_loader.py b/src/robot/robot_loader.py index 68b9aea..08b24c9 100644 --- a/src/robot/robot_loader.py +++ b/src/robot/robot_loader.py @@ -42,11 +42,12 @@ class RobotLoader: # Convert RPY to quaternion for pybullet base_orientation_quat = p.getQuaternionFromEuler(self.base_orientation) - # Load URDF model + # Load URDF model (fixed base avoids floating/"flying" during planning) self.robot_id = p.loadURDF( model_path, basePosition=self.base_position, baseOrientation=base_orientation_quat, + useFixedBase=True, physicsClientId=self.physics_client ) @@ -320,4 +321,4 @@ class RobotLoader: current_positions = [state['position'] for state in current_states.values()] # Set position control with current positions as targets - self.set_joint_positions(current_positions) \ No newline at end of file + self.set_joint_positions(current_positions) diff --git a/src/simulation/environment.py b/src/simulation/environment.py index ec8409b..f5347c9 100644 --- a/src/simulation/environment.py +++ b/src/simulation/environment.py @@ -328,12 +328,15 @@ class Environment: wall_pos = self.wall_config['position'] hole_pos_rel = self.hole_config['position_relative_to_wall'] hole_dims = self.hole_config['dimensions'] - - # Calculate absolute hole position + + # Align with create_wall_with_hole(): hole center sits at wall mid-height by default + wall_height = float(self.wall_config['dimensions']['height']) + + # Calculate absolute hole position (center) hole_center = [ wall_pos[0] + hole_pos_rel[0], wall_pos[1] + hole_pos_rel[1], - wall_pos[2] + hole_pos_rel[2] + wall_pos[2] + wall_height / 2.0 + hole_pos_rel[2] ] return { @@ -430,4 +433,4 @@ class Environment: try: p.removeBody(obj_id, physicsClientId=self.physics_client) except: - pass # Object might already be removed \ No newline at end of file + pass # Object might already be removed