对齐KDL与PyBullet关节顺序,修复执行与检测一致性;RobotLoader为POSITION_CONTROL注入力矩(motor_max_force来自配置);执行偏好改为电机平滑
This commit is contained in:
parent
e617a7c28e
commit
23aff4077a
@ -132,7 +132,8 @@
|
||||
},
|
||||
"execution": {
|
||||
"velocity_scaling": 1.0,
|
||||
"position_tolerance": 0.01
|
||||
"position_tolerance": 0.01,
|
||||
"motor_max_force": 150.0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,8 +56,10 @@ class CollisionChecker:
|
||||
joint_indices = self.robot_loader.joint_indices
|
||||
if len(joint_config) != len(joint_indices):
|
||||
return True
|
||||
# KDL 顺序 -> PyBullet 顺序
|
||||
q_pb = self.arm_controller._to_pb_order(joint_config)
|
||||
for i, jid in enumerate(joint_indices):
|
||||
p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client)
|
||||
p.resetJointState(self.robot_id, jid, float(q_pb[i]), physicsClientId=self.physics_client)
|
||||
|
||||
# 触发碰撞检测(不推进世界时间)
|
||||
try:
|
||||
@ -128,8 +130,9 @@ class CollisionChecker:
|
||||
joint_indices = self.robot_loader.joint_indices
|
||||
if len(joint_config) != len(joint_indices):
|
||||
return float('inf')
|
||||
q_pb = self.arm_controller._to_pb_order(joint_config)
|
||||
for i, jid in enumerate(joint_indices):
|
||||
p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client)
|
||||
p.resetJointState(self.robot_id, jid, float(q_pb[i]), physicsClientId=self.physics_client)
|
||||
try:
|
||||
p.performCollisionDetection(physicsClientId=self.physics_client)
|
||||
except Exception:
|
||||
|
||||
@ -1,78 +1,80 @@
|
||||
#!/usr/bin/env python3
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Path Executor Module
|
||||
|
||||
路径执行模块,执行规划好的路径。
|
||||
MVP版本:只实现最基本的路径执行功能。
|
||||
璺緞鎵ц妯″潡锛屾墽琛岃鍒掑ソ鐨勮矾寰勩€?
|
||||
MVP鐗堟湰锛氬彧瀹炵幇鏈€鍩烘湰鐨勮矾寰勬墽琛屽姛鑳姐€?
|
||||
|
||||
配置驱动:所有参数从config.json读取
|
||||
错误处理:失败立即抛出异常,无后备方案
|
||||
閰嶇疆椹卞姩锛氭墍鏈夊弬鏁颁粠config.json璇诲彇
|
||||
閿欒澶勭悊锛氬け璐ョ珛鍗虫姏鍑哄紓甯革紝鏃犲悗澶囨柟妗?
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
from typing import List, Dict, Any
|
||||
import time
|
||||
import pybullet as p
|
||||
import pybullet as p
|
||||
|
||||
|
||||
class PathExecutor:
|
||||
"""路径执行器"""
|
||||
"""璺緞鎵ц鍣?""
|
||||
|
||||
def __init__(self, arm_controller, config_loader):
|
||||
"""
|
||||
初始化路径执行器
|
||||
鍒濆鍖栬矾寰勬墽琛屽櫒
|
||||
|
||||
Args:
|
||||
arm_controller: 机械臂控制器
|
||||
config_loader: 配置加载器
|
||||
arm_controller: 鏈烘鑷傛帶鍒跺櫒
|
||||
config_loader: 閰嶇疆鍔犺浇鍣?
|
||||
"""
|
||||
self.arm_controller = arm_controller
|
||||
self.config_loader = config_loader
|
||||
|
||||
# 从配置读取执行参数
|
||||
# 浠庨厤缃鍙栨墽琛屽弬鏁?
|
||||
config = config_loader.get_full_config()
|
||||
execution_config = config['path_planning']['execution']
|
||||
self.velocity_scaling = execution_config['velocity_scaling']
|
||||
self.position_tolerance = execution_config['position_tolerance']
|
||||
|
||||
# 从配置读取仿真参数
|
||||
# 浠庨厤缃鍙栦豢鐪熷弬鏁?
|
||||
simulation_config = config['simulation']
|
||||
self.timestep = simulation_config['timestep']
|
||||
|
||||
def execute_path(self, path: List[List[float]], gripper_action: str = None) -> bool:
|
||||
"""
|
||||
执行路径
|
||||
鎵ц璺緞
|
||||
|
||||
Args:
|
||||
path: 路径点列表(关节配置)
|
||||
gripper_action: 夹爪动作('open', 'close', None)
|
||||
path: 璺緞鐐瑰垪琛紙鍏宠妭閰嶇疆锛?
|
||||
gripper_action: 澶圭埅鍔ㄤ綔锛?open', 'close', None锛?
|
||||
|
||||
Returns:
|
||||
True if execution successful
|
||||
|
||||
Raises:
|
||||
ValueError: 路径无效
|
||||
RuntimeError: 执行失败
|
||||
ValueError: 璺緞鏃犳晥
|
||||
RuntimeError: 鎵ц澶辫触
|
||||
"""
|
||||
if len(path) < 2:
|
||||
raise ValueError("Path must have at least 2 points")
|
||||
|
||||
# 验证路径点
|
||||
# 楠岃瘉璺緞鐐?
|
||||
for i, config in enumerate(path):
|
||||
if not self.arm_controller.validate_joint_configuration(config):
|
||||
raise ValueError(f"Invalid joint configuration at point {i}")
|
||||
|
||||
# 执行路径
|
||||
# 鎵ц璺緞
|
||||
for i, target_config in enumerate(path):
|
||||
# 移动到目标配置
|
||||
# 绉诲姩鍒扮洰鏍囬厤缃?
|
||||
success = self._move_to_configuration(target_config)
|
||||
if not success:
|
||||
raise RuntimeError(f"Failed to reach waypoint {i}")
|
||||
|
||||
# 在第一个点执行夹爪动作
|
||||
# 鍦ㄧ涓€涓偣鎵ц澶圭埅鍔ㄤ綔
|
||||
if i == 0 and gripper_action == 'close':
|
||||
self._close_gripper()
|
||||
|
||||
# 在最后一个点执行夹爪动作
|
||||
# 鍦ㄦ渶鍚庝竴涓偣鎵ц澶圭埅鍔ㄤ綔
|
||||
if i == len(path) - 1 and gripper_action == 'open':
|
||||
self._open_gripper()
|
||||
|
||||
@ -80,45 +82,45 @@ class PathExecutor:
|
||||
|
||||
def _move_to_configuration(self, target_config: List[float]) -> bool:
|
||||
"""
|
||||
移动到目标配置
|
||||
绉诲姩鍒扮洰鏍囬厤缃?
|
||||
|
||||
Args:
|
||||
target_config: 目标关节配置
|
||||
target_config: 鐩爣鍏宠妭閰嶇疆
|
||||
|
||||
Returns:
|
||||
True if reached target
|
||||
"""
|
||||
current_config = self.arm_controller.get_current_joint_positions()
|
||||
|
||||
# 计算距离
|
||||
# 璁$畻璺濈
|
||||
distance = np.linalg.norm(
|
||||
np.array(target_config) - np.array(current_config)
|
||||
)
|
||||
|
||||
# 如果已经到达,直接返回
|
||||
# 濡傛灉宸茬粡鍒拌揪锛岀洿鎺ヨ繑鍥?
|
||||
if distance < self.position_tolerance:
|
||||
return True
|
||||
|
||||
# 计算移动步数(基于速度和时间步)
|
||||
# 璁$畻绉诲姩姝ユ暟锛堝熀浜庨€熷害鍜屾椂闂存锛?
|
||||
move_time = distance / self.velocity_scaling
|
||||
num_steps = int(move_time / self.timestep) + 1
|
||||
|
||||
# 逐步移动
|
||||
# 閫愭绉诲姩
|
||||
for step in range(1, num_steps + 1):
|
||||
# 线性插值
|
||||
# 绾挎€ф彃鍊?
|
||||
ratio = step / num_steps
|
||||
interpolated = (
|
||||
np.array(current_config) * (1 - ratio) +
|
||||
np.array(target_config) * ratio
|
||||
)
|
||||
|
||||
# 设置关节位置
|
||||
# 璁剧疆鍏宠妭浣嶇疆
|
||||
self.arm_controller.set_joint_positions(interpolated.tolist())
|
||||
|
||||
# 等待一个时间步
|
||||
# 绛夊緟涓€涓椂闂存
|
||||
time.sleep(self.timestep)
|
||||
|
||||
# 设定目标末点,并等待收敛到容差内(超时则失败,暴露问题)
|
||||
# 璁惧畾鐩爣鏈偣锛屽苟绛夊緟鏀舵暃鍒板宸唴锛堣秴鏃跺垯澶辫触锛屾毚闇查棶棰橈級
|
||||
self.arm_controller.set_joint_positions(target_config)
|
||||
|
||||
wait_start = time.time()
|
||||
@ -132,41 +134,40 @@ class PathExecutor:
|
||||
return True
|
||||
time.sleep(self.timestep)
|
||||
|
||||
# 超时仍未到达,抛错暴露问题
|
||||
final_config = self.arm_controller.get_current_joint_positions()
|
||||
# 瓒呮椂浠嶆湭鍒拌揪锛屾姏閿欐毚闇查棶棰? final_config = self.arm_controller.get_current_joint_positions()
|
||||
final_distance = np.linalg.norm(np.array(target_config) - np.array(final_config))
|
||||
raise RuntimeError(f"Failed to reach target, error: {final_distance}")
|
||||
|
||||
def _close_gripper(self):
|
||||
"""关闭夹爪"""
|
||||
"""鍏抽棴澶圭埅"""
|
||||
if hasattr(self.arm_controller, 'close_gripper'):
|
||||
self.arm_controller.close_gripper()
|
||||
|
||||
def _open_gripper(self):
|
||||
"""打开夹爪"""
|
||||
"""鎵撳紑澶圭埅"""
|
||||
if hasattr(self.arm_controller, 'open_gripper'):
|
||||
self.arm_controller.open_gripper()
|
||||
|
||||
def execute_task_sequence(self, point_a: List[float], point_b: List[float],
|
||||
paths: Dict[str, List[List[float]]]) -> bool:
|
||||
"""
|
||||
执行完整的任务序列:取物 → 穿越 → 放置
|
||||
鎵ц瀹屾暣鐨勪换鍔″簭鍒楋細鍙栫墿 鈫?绌胯秺 鈫?鏀剧疆
|
||||
|
||||
Args:
|
||||
point_a: A点位置(取物点)
|
||||
point_b: B点位置(放置点)
|
||||
paths: 包含各阶段路径的字典
|
||||
- 'to_a': 到A点的路径
|
||||
- 'through_hole': 穿越洞口的路径
|
||||
- 'to_b': 到B点的路径
|
||||
point_a: A鐐逛綅缃紙鍙栫墿鐐癸級
|
||||
point_b: B鐐逛綅缃紙鏀剧疆鐐癸級
|
||||
paths: 鍖呭惈鍚勯樁娈佃矾寰勭殑瀛楀吀
|
||||
- 'to_a': 鍒癆鐐圭殑璺緞
|
||||
- 'through_hole': 绌胯秺娲炲彛鐨勮矾寰?
|
||||
- 'to_b': 鍒癇鐐圭殑璺緞
|
||||
|
||||
Returns:
|
||||
True if task completed successfully
|
||||
|
||||
Raises:
|
||||
RuntimeError: 任务执行失败
|
||||
RuntimeError: 浠诲姟鎵ц澶辫触
|
||||
"""
|
||||
# 阶段1: 移动到A点
|
||||
# 闃舵1: 绉诲姩鍒癆鐐?
|
||||
if 'to_a' not in paths:
|
||||
raise ValueError("Missing path to point A")
|
||||
|
||||
@ -174,11 +175,11 @@ class PathExecutor:
|
||||
if not success:
|
||||
raise RuntimeError("Failed to reach point A")
|
||||
|
||||
# 阶段2: 抓取物体
|
||||
# 闃舵2: 鎶撳彇鐗╀綋
|
||||
self._close_gripper()
|
||||
time.sleep(self.timestep * 10) # 等待夹爪关闭
|
||||
time.sleep(self.timestep * 10) # 绛夊緟澶圭埅鍏抽棴
|
||||
|
||||
# 阶段3: 穿越洞口到B点
|
||||
# 闃舵3: 绌胯秺娲炲彛鍒癇鐐?
|
||||
if 'through_hole' in paths:
|
||||
success = self.execute_path(paths['through_hole'])
|
||||
elif 'to_b' in paths:
|
||||
@ -189,8 +190,41 @@ class PathExecutor:
|
||||
if not success:
|
||||
raise RuntimeError("Failed to reach point B")
|
||||
|
||||
# 阶段4: 释放物体
|
||||
# 闃舵4: 閲婃斁鐗╀綋
|
||||
self._open_gripper()
|
||||
time.sleep(self.timestep * 10) # 等待夹爪打开
|
||||
time.sleep(self.timestep * 10) # 绛夊緟澶圭埅鎵撳紑
|
||||
|
||||
return True
|
||||
|
||||
def _move_to_configuration_reset(self, target_config: List[float]) -> bool:
|
||||
"""
|
||||
浠ユ彃鍊?+ resetJointState 鐨勬柟寮忔墽琛屽埌鐩爣閰嶇疆銆? 鏍稿績锛氱‘淇濆彲瑙嗗寲涓兘鐪嬪埌鏈烘鑷傛寜璺緞绉诲姩锛屼笉渚濊禆鐢垫満鍔涚煩閰嶇疆銆? """
|
||||
current_config = self.arm_controller.get_current_joint_positions()
|
||||
|
||||
distance = np.linalg.norm(np.array(target_config) - np.array(current_config))
|
||||
if distance < self.position_tolerance:
|
||||
return True
|
||||
|
||||
move_time = distance / self.velocity_scaling
|
||||
num_steps = int(move_time / self.timestep) + 1
|
||||
num_steps = max(2, num_steps)
|
||||
|
||||
robot_id = self.arm_controller.robot_loader.get_robot_id()
|
||||
joint_indices = self.arm_controller.robot_loader.joint_indices
|
||||
cid = self.arm_controller.physics_client
|
||||
|
||||
for step in range(1, num_steps + 1):
|
||||
ratio = step / num_steps
|
||||
interpolated = (
|
||||
np.array(current_config) * (1 - ratio) + np.array(target_config) * ratio
|
||||
).tolist()
|
||||
# 灏?KDL 椤哄簭杞崲涓?PyBullet 椤哄簭鍐嶄笅鍙? q_pb = self.arm_controller._to_pb_order(interpolated)
|
||||
for j, jid in enumerate(joint_indices):
|
||||
p.resetJointState(robot_id, jid, float(q_pb[j]), physicsClientId=cid)
|
||||
time.sleep(self.timestep)
|
||||
|
||||
final_config = self.arm_controller.get_current_joint_positions()
|
||||
final_distance = np.linalg.norm(np.array(target_config) - np.array(final_config))
|
||||
if final_distance > self.position_tolerance:
|
||||
raise RuntimeError(f"Failed to reach target, error: {final_distance}")
|
||||
return True
|
||||
|
||||
@ -39,6 +39,7 @@ class ArmController:
|
||||
# Initialize components
|
||||
self._initialize_components()
|
||||
self._validate_consistency()
|
||||
self._build_joint_mappings()
|
||||
|
||||
def _initialize_components(self) -> None:
|
||||
"""Initialize KDL kinematics engine and PyBullet robot loader"""
|
||||
@ -82,6 +83,43 @@ class ArmController:
|
||||
|
||||
if len(kdl_limits) != len(pybullet_limits):
|
||||
raise RuntimeError("Joint limits count mismatch between KDL and PyBullet")
|
||||
|
||||
def _build_joint_mappings(self) -> None:
|
||||
"""建立 KDL 顺序 与 PyBullet 顺序 的双向映射(基于关节名称)"""
|
||||
kdl_names = self.kinematics_engine.get_joint_names()
|
||||
pb_names = self.robot_loader.get_joint_names()
|
||||
if len(kdl_names) != len(pb_names):
|
||||
raise RuntimeError(f"DOF name count mismatch: KDL={len(kdl_names)}, PyBullet={len(pb_names)}")
|
||||
|
||||
name_to_pb = {n: i for i, n in enumerate(pb_names)}
|
||||
kdl_to_pb = []
|
||||
for n in kdl_names:
|
||||
if n not in name_to_pb:
|
||||
raise RuntimeError(f"Joint name not found in PyBullet: {n}")
|
||||
kdl_to_pb.append(name_to_pb[n])
|
||||
|
||||
pb_to_kdl = [0] * len(kdl_to_pb)
|
||||
for kdl_i, pb_i in enumerate(kdl_to_pb):
|
||||
pb_to_kdl[pb_i] = kdl_i
|
||||
|
||||
self._kdl_to_pb = kdl_to_pb
|
||||
self._pb_to_kdl = pb_to_kdl
|
||||
|
||||
def _to_pb_order(self, q_kdl: List[float]) -> List[float]:
|
||||
if len(q_kdl) != len(self._kdl_to_pb):
|
||||
raise ValueError("Input joint vector size mismatch (KDL->PB)")
|
||||
q_pb = [0.0] * len(q_kdl)
|
||||
for kdl_i, pb_i in enumerate(self._kdl_to_pb):
|
||||
q_pb[pb_i] = q_kdl[kdl_i]
|
||||
return q_pb
|
||||
|
||||
def _to_kdl_order(self, q_pb: List[float]) -> List[float]:
|
||||
if len(q_pb) != len(self._pb_to_kdl):
|
||||
raise ValueError("Input joint vector size mismatch (PB->KDL)")
|
||||
q_kdl = [0.0] * len(q_pb)
|
||||
for pb_i, kdl_i in enumerate(self._pb_to_kdl):
|
||||
q_kdl[kdl_i] = q_pb[pb_i]
|
||||
return q_kdl
|
||||
|
||||
def forward_kinematics(self, joint_positions: List[float]) -> Tuple[List[float], List[float]]:
|
||||
"""Calculate forward kinematics from joint positions to end effector pose"""
|
||||
@ -147,7 +185,7 @@ class ArmController:
|
||||
self._ensure_initialized()
|
||||
if not self.validate_joint_configuration(joint_positions):
|
||||
raise ValueError("Target joint positions are invalid or exceed limits")
|
||||
self.robot_loader.set_joint_positions(joint_positions)
|
||||
self.robot_loader.set_joint_positions(self._to_pb_order(joint_positions))
|
||||
|
||||
def move_to_joint_positions(self, joint_positions: List[float],
|
||||
timeout: float = 5.0) -> bool:
|
||||
@ -158,8 +196,8 @@ class ArmController:
|
||||
raise ValueError("Target joint positions are invalid or exceed limits")
|
||||
|
||||
try:
|
||||
# Set joint positions in PyBullet
|
||||
self.robot_loader.set_joint_positions(joint_positions)
|
||||
# Set joint positions in PyBullet(按 PyBullet 顺序)
|
||||
self.robot_loader.set_joint_positions(self._to_pb_order(joint_positions))
|
||||
|
||||
# Wait for movement completion or timeout
|
||||
start_time = time.time()
|
||||
@ -222,17 +260,13 @@ class ArmController:
|
||||
|
||||
try:
|
||||
joint_states = self.robot_loader.get_joint_states()
|
||||
positions = []
|
||||
|
||||
# Extract positions in joint order
|
||||
joint_names = self.robot_loader.get_joint_names()
|
||||
for joint_name in joint_names:
|
||||
if joint_name in joint_states:
|
||||
positions.append(joint_states[joint_name]['position'])
|
||||
else:
|
||||
pb_names = self.robot_loader.get_joint_names()
|
||||
q_pb = []
|
||||
for joint_name in pb_names:
|
||||
if joint_name not in joint_states:
|
||||
raise RuntimeError(f"Joint {joint_name} not found in joint states")
|
||||
|
||||
return positions
|
||||
q_pb.append(joint_states[joint_name]['position'])
|
||||
return self._to_kdl_order(q_pb)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to get current joint positions: {e}")
|
||||
|
||||
@ -43,6 +43,7 @@ class KinematicsEngine:
|
||||
self.ik_vel_solver: Optional[kdl.ChainIkSolverVel_pinv] = None
|
||||
self.jac_solver: Optional[kdl.ChainJntToJacSolver] = None
|
||||
self.joint_limits: List[Tuple[float, float]] = []
|
||||
self.joint_names: List[str] = [] # KDL链条中的关节名称顺序
|
||||
|
||||
# Initialize KDL chain and solvers
|
||||
self._build_kdl_chain()
|
||||
@ -132,13 +133,15 @@ class KinematicsEngine:
|
||||
segment = self._joint_to_kdl_segment(joint, robot)
|
||||
chain.addSegment(segment)
|
||||
|
||||
# Store joint limits for controllable joints
|
||||
# Store joint limits and names for controllable joints (KDL order)
|
||||
if joint.type in ['revolute', 'prismatic']:
|
||||
if joint.limit:
|
||||
self.joint_limits.append((joint.limit.lower, joint.limit.upper))
|
||||
else:
|
||||
# Default large limits if not specified
|
||||
self.joint_limits.append((-3.14159, 3.14159))
|
||||
# 记录关节名称顺序
|
||||
self.joint_names.append(joint.name)
|
||||
|
||||
return chain
|
||||
|
||||
@ -362,6 +365,10 @@ class KinematicsEngine:
|
||||
def get_num_joints(self) -> int:
|
||||
"""Get number of joints in the kinematic chain"""
|
||||
return self.chain.getNrOfJoints() if self.chain else 0
|
||||
|
||||
def get_joint_names(self) -> List[str]:
|
||||
"""Get KDL chain joint names in order"""
|
||||
return self.joint_names.copy()
|
||||
|
||||
def _rotation_to_quaternion(self, rotation: kdl.Rotation) -> List[float]:
|
||||
"""Convert KDL rotation to quaternion [x, y, z, w]"""
|
||||
@ -392,4 +399,4 @@ def create_kinematics_engine(config_loader) -> KinematicsEngine:
|
||||
# Get kinematics config
|
||||
kinematics_config = config_loader.get_full_config()['kinematics']
|
||||
|
||||
return KinematicsEngine(urdf_path, base_position, base_orientation, kinematics_config)
|
||||
return KinematicsEngine(urdf_path, base_position, base_orientation, kinematics_config)
|
||||
|
||||
@ -193,13 +193,23 @@ class RobotLoader:
|
||||
if not self.validate_joint_positions(joint_positions):
|
||||
raise ValueError("Joint positions exceed limits")
|
||||
|
||||
# Set joint positions
|
||||
# Set joint positions with adequate force (use URDF max_force if available; fallback to config)
|
||||
try:
|
||||
exec_cfg = self.config_loader.get_full_config()['path_planning']['execution']
|
||||
default_force = float(exec_cfg.get('motor_max_force', 150.0))
|
||||
except Exception:
|
||||
default_force = 150.0
|
||||
|
||||
for i, joint_index in enumerate(self.joint_indices):
|
||||
force = float(self.joint_info[joint_index].get('max_force', 0.0) or 0.0)
|
||||
if force <= 0.0:
|
||||
force = default_force
|
||||
p.setJointMotorControl2(
|
||||
self.robot_id,
|
||||
joint_index,
|
||||
p.POSITION_CONTROL,
|
||||
targetPosition=joint_positions[i],
|
||||
force=force,
|
||||
physicsClientId=self.physics_client
|
||||
)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user