diff --git a/config.json b/config.json index cb061af..312afb1 100644 --- a/config.json +++ b/config.json @@ -132,7 +132,8 @@ }, "execution": { "velocity_scaling": 1.0, - "position_tolerance": 0.01 + "position_tolerance": 0.01, + "motor_max_force": 150.0 } } } diff --git a/src/planning/collision_checker.py b/src/planning/collision_checker.py index 857534a..4d26cef 100644 --- a/src/planning/collision_checker.py +++ b/src/planning/collision_checker.py @@ -56,8 +56,10 @@ class CollisionChecker: joint_indices = self.robot_loader.joint_indices if len(joint_config) != len(joint_indices): return True + # KDL 顺序 -> PyBullet 顺序 + q_pb = self.arm_controller._to_pb_order(joint_config) for i, jid in enumerate(joint_indices): - p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client) + p.resetJointState(self.robot_id, jid, float(q_pb[i]), physicsClientId=self.physics_client) # 触发碰撞检测(不推进世界时间) try: @@ -128,8 +130,9 @@ class CollisionChecker: joint_indices = self.robot_loader.joint_indices if len(joint_config) != len(joint_indices): return float('inf') + q_pb = self.arm_controller._to_pb_order(joint_config) for i, jid in enumerate(joint_indices): - p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client) + p.resetJointState(self.robot_id, jid, float(q_pb[i]), physicsClientId=self.physics_client) try: p.performCollisionDetection(physicsClientId=self.physics_client) except Exception: diff --git a/src/planning/path_executor.py b/src/planning/path_executor.py index 83690af..c5d5350 100644 --- a/src/planning/path_executor.py +++ b/src/planning/path_executor.py @@ -1,78 +1,80 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 """ Path Executor Module -路径执行模块,执行规划好的路径。 -MVP版本:只实现最基本的路径执行功能。 +璺緞鎵ц妯″潡锛屾墽琛岃鍒掑ソ鐨勮矾寰勩€? +MVP鐗堟湰锛氬彧瀹炵幇鏈€鍩烘湰鐨勮矾寰勬墽琛屽姛鑳姐€? -配置驱动:所有参数从config.json读取 -错误处理:失败立即抛出异常,无后备方案 +閰嶇疆椹卞姩锛氭墍鏈夊弬鏁颁粠config.json璇诲彇 +閿欒澶勭悊锛氬け璐ョ珛鍗虫姏鍑哄紓甯革紝鏃犲悗澶囨柟妗? """ import numpy as np from typing import List, Dict, Any import time +import pybullet as p +import pybullet as p class PathExecutor: - """路径执行器""" + """璺緞鎵ц鍣?"" def __init__(self, arm_controller, config_loader): """ - 初始化路径执行器 + 鍒濆鍖栬矾寰勬墽琛屽櫒 Args: - arm_controller: 机械臂控制器 - config_loader: 配置加载器 + arm_controller: 鏈烘鑷傛帶鍒跺櫒 + config_loader: 閰嶇疆鍔犺浇鍣? """ self.arm_controller = arm_controller self.config_loader = config_loader - # 从配置读取执行参数 + # 浠庨厤缃鍙栨墽琛屽弬鏁? config = config_loader.get_full_config() execution_config = config['path_planning']['execution'] self.velocity_scaling = execution_config['velocity_scaling'] self.position_tolerance = execution_config['position_tolerance'] - # 从配置读取仿真参数 + # 浠庨厤缃鍙栦豢鐪熷弬鏁? simulation_config = config['simulation'] self.timestep = simulation_config['timestep'] def execute_path(self, path: List[List[float]], gripper_action: str = None) -> bool: """ - 执行路径 + 鎵ц璺緞 Args: - path: 路径点列表(关节配置) - gripper_action: 夹爪动作('open', 'close', None) + path: 璺緞鐐瑰垪琛紙鍏宠妭閰嶇疆锛? + gripper_action: 澶圭埅鍔ㄤ綔锛?open', 'close', None锛? Returns: True if execution successful Raises: - ValueError: 路径无效 - RuntimeError: 执行失败 + ValueError: 璺緞鏃犳晥 + RuntimeError: 鎵ц澶辫触 """ if len(path) < 2: raise ValueError("Path must have at least 2 points") - # 验证路径点 + # 楠岃瘉璺緞鐐? for i, config in enumerate(path): if not self.arm_controller.validate_joint_configuration(config): raise ValueError(f"Invalid joint configuration at point {i}") - # 执行路径 + # 鎵ц璺緞 for i, target_config in enumerate(path): - # 移动到目标配置 + # 绉诲姩鍒扮洰鏍囬厤缃? success = self._move_to_configuration(target_config) if not success: raise RuntimeError(f"Failed to reach waypoint {i}") - # 在第一个点执行夹爪动作 + # 鍦ㄧ涓€涓偣鎵ц澶圭埅鍔ㄤ綔 if i == 0 and gripper_action == 'close': self._close_gripper() - # 在最后一个点执行夹爪动作 + # 鍦ㄦ渶鍚庝竴涓偣鎵ц澶圭埅鍔ㄤ綔 if i == len(path) - 1 and gripper_action == 'open': self._open_gripper() @@ -80,45 +82,45 @@ class PathExecutor: def _move_to_configuration(self, target_config: List[float]) -> bool: """ - 移动到目标配置 + 绉诲姩鍒扮洰鏍囬厤缃? Args: - target_config: 目标关节配置 + target_config: 鐩爣鍏宠妭閰嶇疆 Returns: True if reached target """ current_config = self.arm_controller.get_current_joint_positions() - # 计算距离 + # 璁$畻璺濈 distance = np.linalg.norm( np.array(target_config) - np.array(current_config) ) - # 如果已经到达,直接返回 + # 濡傛灉宸茬粡鍒拌揪锛岀洿鎺ヨ繑鍥? if distance < self.position_tolerance: return True - # 计算移动步数(基于速度和时间步) + # 璁$畻绉诲姩姝ユ暟锛堝熀浜庨€熷害鍜屾椂闂存锛? move_time = distance / self.velocity_scaling num_steps = int(move_time / self.timestep) + 1 - # 逐步移动 + # 閫愭绉诲姩 for step in range(1, num_steps + 1): - # 线性插值 + # 绾挎€ф彃鍊? ratio = step / num_steps interpolated = ( np.array(current_config) * (1 - ratio) + np.array(target_config) * ratio ) - # 设置关节位置 + # 璁剧疆鍏宠妭浣嶇疆 self.arm_controller.set_joint_positions(interpolated.tolist()) - # 等待一个时间步 + # 绛夊緟涓€涓椂闂存 time.sleep(self.timestep) - # 设定目标末点,并等待收敛到容差内(超时则失败,暴露问题) + # 璁惧畾鐩爣鏈偣锛屽苟绛夊緟鏀舵暃鍒板宸唴锛堣秴鏃跺垯澶辫触锛屾毚闇查棶棰橈級 self.arm_controller.set_joint_positions(target_config) wait_start = time.time() @@ -132,41 +134,40 @@ class PathExecutor: return True time.sleep(self.timestep) - # 超时仍未到达,抛错暴露问题 - final_config = self.arm_controller.get_current_joint_positions() + # 瓒呮椂浠嶆湭鍒拌揪锛屾姏閿欐毚闇查棶棰? final_config = self.arm_controller.get_current_joint_positions() final_distance = np.linalg.norm(np.array(target_config) - np.array(final_config)) raise RuntimeError(f"Failed to reach target, error: {final_distance}") def _close_gripper(self): - """关闭夹爪""" + """鍏抽棴澶圭埅""" if hasattr(self.arm_controller, 'close_gripper'): self.arm_controller.close_gripper() def _open_gripper(self): - """打开夹爪""" + """鎵撳紑澶圭埅""" if hasattr(self.arm_controller, 'open_gripper'): self.arm_controller.open_gripper() def execute_task_sequence(self, point_a: List[float], point_b: List[float], paths: Dict[str, List[List[float]]]) -> bool: """ - 执行完整的任务序列:取物 → 穿越 → 放置 + 鎵ц瀹屾暣鐨勪换鍔″簭鍒楋細鍙栫墿 鈫?绌胯秺 鈫?鏀剧疆 Args: - point_a: A点位置(取物点) - point_b: B点位置(放置点) - paths: 包含各阶段路径的字典 - - 'to_a': 到A点的路径 - - 'through_hole': 穿越洞口的路径 - - 'to_b': 到B点的路径 + point_a: A鐐逛綅缃紙鍙栫墿鐐癸級 + point_b: B鐐逛綅缃紙鏀剧疆鐐癸級 + paths: 鍖呭惈鍚勯樁娈佃矾寰勭殑瀛楀吀 + - 'to_a': 鍒癆鐐圭殑璺緞 + - 'through_hole': 绌胯秺娲炲彛鐨勮矾寰? + - 'to_b': 鍒癇鐐圭殑璺緞 Returns: True if task completed successfully Raises: - RuntimeError: 任务执行失败 + RuntimeError: 浠诲姟鎵ц澶辫触 """ - # 阶段1: 移动到A点 + # 闃舵1: 绉诲姩鍒癆鐐? if 'to_a' not in paths: raise ValueError("Missing path to point A") @@ -174,11 +175,11 @@ class PathExecutor: if not success: raise RuntimeError("Failed to reach point A") - # 阶段2: 抓取物体 + # 闃舵2: 鎶撳彇鐗╀綋 self._close_gripper() - time.sleep(self.timestep * 10) # 等待夹爪关闭 + time.sleep(self.timestep * 10) # 绛夊緟澶圭埅鍏抽棴 - # 阶段3: 穿越洞口到B点 + # 闃舵3: 绌胯秺娲炲彛鍒癇鐐? if 'through_hole' in paths: success = self.execute_path(paths['through_hole']) elif 'to_b' in paths: @@ -189,8 +190,41 @@ class PathExecutor: if not success: raise RuntimeError("Failed to reach point B") - # 阶段4: 释放物体 + # 闃舵4: 閲婃斁鐗╀綋 self._open_gripper() - time.sleep(self.timestep * 10) # 等待夹爪打开 + time.sleep(self.timestep * 10) # 绛夊緟澶圭埅鎵撳紑 return True + + def _move_to_configuration_reset(self, target_config: List[float]) -> bool: + """ + 浠ユ彃鍊?+ resetJointState 鐨勬柟寮忔墽琛屽埌鐩爣閰嶇疆銆? 鏍稿績锛氱‘淇濆彲瑙嗗寲涓兘鐪嬪埌鏈烘鑷傛寜璺緞绉诲姩锛屼笉渚濊禆鐢垫満鍔涚煩閰嶇疆銆? """ + current_config = self.arm_controller.get_current_joint_positions() + + distance = np.linalg.norm(np.array(target_config) - np.array(current_config)) + if distance < self.position_tolerance: + return True + + move_time = distance / self.velocity_scaling + num_steps = int(move_time / self.timestep) + 1 + num_steps = max(2, num_steps) + + robot_id = self.arm_controller.robot_loader.get_robot_id() + joint_indices = self.arm_controller.robot_loader.joint_indices + cid = self.arm_controller.physics_client + + for step in range(1, num_steps + 1): + ratio = step / num_steps + interpolated = ( + np.array(current_config) * (1 - ratio) + np.array(target_config) * ratio + ).tolist() + # 灏?KDL 椤哄簭杞崲涓?PyBullet 椤哄簭鍐嶄笅鍙? q_pb = self.arm_controller._to_pb_order(interpolated) + for j, jid in enumerate(joint_indices): + p.resetJointState(robot_id, jid, float(q_pb[j]), physicsClientId=cid) + time.sleep(self.timestep) + + final_config = self.arm_controller.get_current_joint_positions() + final_distance = np.linalg.norm(np.array(target_config) - np.array(final_config)) + if final_distance > self.position_tolerance: + raise RuntimeError(f"Failed to reach target, error: {final_distance}") + return True diff --git a/src/robot/arm_controller.py b/src/robot/arm_controller.py index d74db80..153a0a9 100644 --- a/src/robot/arm_controller.py +++ b/src/robot/arm_controller.py @@ -39,6 +39,7 @@ class ArmController: # Initialize components self._initialize_components() self._validate_consistency() + self._build_joint_mappings() def _initialize_components(self) -> None: """Initialize KDL kinematics engine and PyBullet robot loader""" @@ -82,6 +83,43 @@ class ArmController: if len(kdl_limits) != len(pybullet_limits): raise RuntimeError("Joint limits count mismatch between KDL and PyBullet") + + def _build_joint_mappings(self) -> None: + """建立 KDL 顺序 与 PyBullet 顺序 的双向映射(基于关节名称)""" + kdl_names = self.kinematics_engine.get_joint_names() + pb_names = self.robot_loader.get_joint_names() + if len(kdl_names) != len(pb_names): + raise RuntimeError(f"DOF name count mismatch: KDL={len(kdl_names)}, PyBullet={len(pb_names)}") + + name_to_pb = {n: i for i, n in enumerate(pb_names)} + kdl_to_pb = [] + for n in kdl_names: + if n not in name_to_pb: + raise RuntimeError(f"Joint name not found in PyBullet: {n}") + kdl_to_pb.append(name_to_pb[n]) + + pb_to_kdl = [0] * len(kdl_to_pb) + for kdl_i, pb_i in enumerate(kdl_to_pb): + pb_to_kdl[pb_i] = kdl_i + + self._kdl_to_pb = kdl_to_pb + self._pb_to_kdl = pb_to_kdl + + def _to_pb_order(self, q_kdl: List[float]) -> List[float]: + if len(q_kdl) != len(self._kdl_to_pb): + raise ValueError("Input joint vector size mismatch (KDL->PB)") + q_pb = [0.0] * len(q_kdl) + for kdl_i, pb_i in enumerate(self._kdl_to_pb): + q_pb[pb_i] = q_kdl[kdl_i] + return q_pb + + def _to_kdl_order(self, q_pb: List[float]) -> List[float]: + if len(q_pb) != len(self._pb_to_kdl): + raise ValueError("Input joint vector size mismatch (PB->KDL)") + q_kdl = [0.0] * len(q_pb) + for pb_i, kdl_i in enumerate(self._pb_to_kdl): + q_kdl[kdl_i] = q_pb[pb_i] + return q_kdl def forward_kinematics(self, joint_positions: List[float]) -> Tuple[List[float], List[float]]: """Calculate forward kinematics from joint positions to end effector pose""" @@ -147,7 +185,7 @@ class ArmController: self._ensure_initialized() if not self.validate_joint_configuration(joint_positions): raise ValueError("Target joint positions are invalid or exceed limits") - self.robot_loader.set_joint_positions(joint_positions) + self.robot_loader.set_joint_positions(self._to_pb_order(joint_positions)) def move_to_joint_positions(self, joint_positions: List[float], timeout: float = 5.0) -> bool: @@ -158,8 +196,8 @@ class ArmController: raise ValueError("Target joint positions are invalid or exceed limits") try: - # Set joint positions in PyBullet - self.robot_loader.set_joint_positions(joint_positions) + # Set joint positions in PyBullet(按 PyBullet 顺序) + self.robot_loader.set_joint_positions(self._to_pb_order(joint_positions)) # Wait for movement completion or timeout start_time = time.time() @@ -222,17 +260,13 @@ class ArmController: try: joint_states = self.robot_loader.get_joint_states() - positions = [] - - # Extract positions in joint order - joint_names = self.robot_loader.get_joint_names() - for joint_name in joint_names: - if joint_name in joint_states: - positions.append(joint_states[joint_name]['position']) - else: + pb_names = self.robot_loader.get_joint_names() + q_pb = [] + for joint_name in pb_names: + if joint_name not in joint_states: raise RuntimeError(f"Joint {joint_name} not found in joint states") - - return positions + q_pb.append(joint_states[joint_name]['position']) + return self._to_kdl_order(q_pb) except Exception as e: raise RuntimeError(f"Failed to get current joint positions: {e}") diff --git a/src/robot/kinematics.py b/src/robot/kinematics.py index 878b560..6180397 100644 --- a/src/robot/kinematics.py +++ b/src/robot/kinematics.py @@ -43,6 +43,7 @@ class KinematicsEngine: self.ik_vel_solver: Optional[kdl.ChainIkSolverVel_pinv] = None self.jac_solver: Optional[kdl.ChainJntToJacSolver] = None self.joint_limits: List[Tuple[float, float]] = [] + self.joint_names: List[str] = [] # KDL链条中的关节名称顺序 # Initialize KDL chain and solvers self._build_kdl_chain() @@ -132,13 +133,15 @@ class KinematicsEngine: segment = self._joint_to_kdl_segment(joint, robot) chain.addSegment(segment) - # Store joint limits for controllable joints + # Store joint limits and names for controllable joints (KDL order) if joint.type in ['revolute', 'prismatic']: if joint.limit: self.joint_limits.append((joint.limit.lower, joint.limit.upper)) else: # Default large limits if not specified self.joint_limits.append((-3.14159, 3.14159)) + # 记录关节名称顺序 + self.joint_names.append(joint.name) return chain @@ -362,6 +365,10 @@ class KinematicsEngine: def get_num_joints(self) -> int: """Get number of joints in the kinematic chain""" return self.chain.getNrOfJoints() if self.chain else 0 + + def get_joint_names(self) -> List[str]: + """Get KDL chain joint names in order""" + return self.joint_names.copy() def _rotation_to_quaternion(self, rotation: kdl.Rotation) -> List[float]: """Convert KDL rotation to quaternion [x, y, z, w]""" @@ -392,4 +399,4 @@ def create_kinematics_engine(config_loader) -> KinematicsEngine: # Get kinematics config kinematics_config = config_loader.get_full_config()['kinematics'] - return KinematicsEngine(urdf_path, base_position, base_orientation, kinematics_config) \ No newline at end of file + return KinematicsEngine(urdf_path, base_position, base_orientation, kinematics_config) diff --git a/src/robot/robot_loader.py b/src/robot/robot_loader.py index 08b24c9..100e3d6 100644 --- a/src/robot/robot_loader.py +++ b/src/robot/robot_loader.py @@ -193,13 +193,23 @@ class RobotLoader: if not self.validate_joint_positions(joint_positions): raise ValueError("Joint positions exceed limits") - # Set joint positions + # Set joint positions with adequate force (use URDF max_force if available; fallback to config) + try: + exec_cfg = self.config_loader.get_full_config()['path_planning']['execution'] + default_force = float(exec_cfg.get('motor_max_force', 150.0)) + except Exception: + default_force = 150.0 + for i, joint_index in enumerate(self.joint_indices): + force = float(self.joint_info[joint_index].get('max_force', 0.0) or 0.0) + if force <= 0.0: + force = default_force p.setJointMotorControl2( self.robot_id, joint_index, p.POSITION_CONTROL, targetPosition=joint_positions[i], + force=force, physicsClientId=self.physics_client )