RoboticArmTest/src/robot/arm_controller.py
sladro 21bf2bb4df feat: 实施PyBullet IK方案替代KDL,解决32.8cm定位偏差问题
主要修改:
1. arm_controller.py: 使用PyBullet的calculateInverseKinematics替代KDL IK
   - 精度从32.8cm误差降至0.566cm
   - 确保IK求解与仿真环境的一致性

2. main_window.py: 优化路径可视化,避免机械臂重复执行
   - 修复早期返回bug,确保三阶段任务完整执行
   - 改进可视化方法,减少对机械臂状态的影响
   - 使用保存-计算-恢复策略最小化视觉干扰

效果:
- 大幅提升系统精度
- 消除重复执行问题
- 保持KDL代码供对比研究

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-14 09:12:09 +08:00

593 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Robotic Arm Controller
Unified control interface that integrates KDL kinematics engine with PyBullet simulation.
Provides high-level arm control, workspace validation, and path planning support.
"""
import numpy as np
from typing import List, Tuple, Optional, Dict, Any
import time
import math
from .kinematics import KinematicsEngine, create_kinematics_engine
from .robot_loader import RobotLoader
from ..config_loader import ConfigLoader
class ArmController:
"""Unified robotic arm controller integrating KDL kinematics with PyBullet simulation"""
def __init__(self, config_loader: ConfigLoader, physics_client: int):
"""Initialize arm controller with KDL and PyBullet integration"""
self.config_loader = config_loader
self.physics_client = physics_client
# Core components
self.kinematics_engine: Optional[KinematicsEngine] = None
self.robot_loader: Optional[RobotLoader] = None
# Configuration
self.robot_config = config_loader.get_robot_config()
self.task_points_config = config_loader.get_task_points()
# State tracking
self.is_initialized = False
self.home_position: Optional[List[float]] = None
# Initialize components
self._initialize_components()
self._validate_consistency()
self._build_joint_mappings()
def _initialize_components(self) -> None:
"""Initialize KDL kinematics engine and PyBullet robot loader"""
try:
# Initialize KDL kinematics engine
self.kinematics_engine = create_kinematics_engine(self.config_loader)
# Initialize PyBullet robot loader
self.robot_loader = RobotLoader(self.config_loader, self.physics_client)
robot_id = self.robot_loader.load_robot()
# Store home position from config or use zeros
num_joints = self.kinematics_engine.get_num_joints()
robot_config = self.config_loader.get_robot_config()
self.home_position = robot_config.get('initial_joint_positions', [0.0] * num_joints)
# Ensure home position length matches DOF
if len(self.home_position) != num_joints:
self.home_position = [0.0] * num_joints
self.is_initialized = True
except Exception as e:
raise RuntimeError(f"Failed to initialize arm controller components: {e}")
def _validate_consistency(self) -> None:
"""Validate consistency between KDL and PyBullet components"""
if not self.is_initialized:
raise RuntimeError("Components not initialized")
# Check DOF consistency
kdl_dof = self.kinematics_engine.get_num_joints()
pybullet_dof = self.robot_loader.get_dof()
if kdl_dof != pybullet_dof:
raise RuntimeError(f"DOF mismatch: KDL={kdl_dof}, PyBullet={pybullet_dof}")
# Validate joint limits consistency
kdl_limits = self.kinematics_engine.get_joint_limits()
pybullet_limits = self.robot_loader.get_joint_limits()
if len(kdl_limits) != len(pybullet_limits):
raise RuntimeError("Joint limits count mismatch between KDL and PyBullet")
def _build_joint_mappings(self) -> None:
"""建立 KDL 顺序 与 PyBullet 顺序 的双向映射(基于关节名称)"""
kdl_names = self.kinematics_engine.get_joint_names()
pb_names = self.robot_loader.get_joint_names()
if len(kdl_names) != len(pb_names):
raise RuntimeError(f"DOF name count mismatch: KDL={len(kdl_names)}, PyBullet={len(pb_names)}")
name_to_pb = {n: i for i, n in enumerate(pb_names)}
kdl_to_pb = []
for n in kdl_names:
if n not in name_to_pb:
raise RuntimeError(f"Joint name not found in PyBullet: {n}")
kdl_to_pb.append(name_to_pb[n])
pb_to_kdl = [0] * len(kdl_to_pb)
for kdl_i, pb_i in enumerate(kdl_to_pb):
pb_to_kdl[pb_i] = kdl_i
self._kdl_to_pb = kdl_to_pb
self._pb_to_kdl = pb_to_kdl
def _to_pb_order(self, q_kdl: List[float]) -> List[float]:
if len(q_kdl) != len(self._kdl_to_pb):
raise ValueError("Input joint vector size mismatch (KDL->PB)")
q_pb = [0.0] * len(q_kdl)
for kdl_i, pb_i in enumerate(self._kdl_to_pb):
q_pb[pb_i] = q_kdl[kdl_i]
return q_pb
def _to_kdl_order(self, q_pb: List[float]) -> List[float]:
if len(q_pb) != len(self._pb_to_kdl):
raise ValueError("Input joint vector size mismatch (PB->KDL)")
q_kdl = [0.0] * len(q_pb)
for pb_i, kdl_i in enumerate(self._pb_to_kdl):
q_kdl[kdl_i] = q_pb[pb_i]
return q_kdl
def forward_kinematics(self, joint_positions: List[float]) -> Tuple[List[float], List[float]]:
"""Calculate forward kinematics from joint positions to end effector pose"""
self._ensure_initialized()
if not self.validate_joint_configuration(joint_positions):
raise ValueError("Joint positions exceed limits or invalid configuration")
try:
position, orientation = self.kinematics_engine.forward_kinematics(joint_positions)
return position, orientation
except Exception as e:
raise RuntimeError(f"Forward kinematics calculation failed: {e}")
def inverse_kinematics(self, target_position: List[float],
target_orientation: List[float] = None,
seed_angles: List[float] = None) -> Optional[List[float]]:
"""Calculate inverse kinematics from target pose to joint positions"""
self._ensure_initialized()
# Use current joint angles as seed if not provided
if seed_angles is None:
seed_angles = self.get_current_joint_positions()
try:
# 使用PyBullet的IK求解器以确保与仿真环境的一致性
import pybullet as p
pb_solution = p.calculateInverseKinematics(
self.robot_loader.robot_id,
self.robot_loader.end_effector_index,
target_position,
targetOrientation=target_orientation,
maxNumIterations=100,
residualThreshold=0.001,
physicsClientId=self.physics_client
)
# 转换为列表并截取正确的关节数量
joint_angles = list(pb_solution)[:self.kinematics_engine.get_num_joints()]
# Validate solution if found
if joint_angles is not None:
if not self.validate_joint_configuration(joint_angles):
return None
return joint_angles
except Exception as e:
raise RuntimeError(f"Inverse kinematics calculation failed: {e}")
def compute_jacobian(self, joint_positions: List[float]) -> np.ndarray:
"""Compute Jacobian matrix for given joint configuration"""
self._ensure_initialized()
if not self.validate_joint_configuration(joint_positions):
raise ValueError("Invalid joint configuration for Jacobian computation")
try:
return self.kinematics_engine.compute_jacobian(joint_positions)
except Exception as e:
raise RuntimeError(f"Jacobian computation failed: {e}")
def validate_joint_configuration(self, joint_positions: List[float]) -> bool:
"""Validate if joint configuration is within limits and feasible"""
self._ensure_initialized()
# Use KDL engine for primary validation (includes joint limits from URDF)
return self.kinematics_engine.validate_joint_configuration(joint_positions)
def set_joint_positions(self, joint_positions: List[float]) -> None:
"""Convenience: set robot joint targets in PyBullet with validation."""
self._ensure_initialized()
if not self.validate_joint_configuration(joint_positions):
raise ValueError("Target joint positions are invalid or exceed limits")
self.robot_loader.set_joint_positions(self._to_pb_order(joint_positions))
def move_to_joint_positions(self, joint_positions: List[float],
timeout: float = 5.0) -> bool:
"""Move robot to specified joint positions"""
self._ensure_initialized()
if not self.validate_joint_configuration(joint_positions):
raise ValueError("Target joint positions are invalid or exceed limits")
try:
# Set joint positions in PyBullet按 PyBullet 顺序)
self.robot_loader.set_joint_positions(self._to_pb_order(joint_positions))
# Wait for movement completion or timeout
start_time = time.time()
tolerance = 0.01 # 0.01 radians tolerance
while time.time() - start_time < timeout:
current_positions = self.get_current_joint_positions()
# Check if close enough to target
position_errors = [abs(current - target)
for current, target in zip(current_positions, joint_positions)]
if all(error < tolerance for error in position_errors):
return True
time.sleep(0.1) # Check every 100ms
return False # Timeout reached
except Exception as e:
raise RuntimeError(f"Failed to move to joint positions: {e}")
def move_to_cartesian_pose(self, target_position: List[float],
target_orientation: List[float] = None,
timeout: float = 5.0) -> bool:
"""Move robot end effector to specified Cartesian pose"""
self._ensure_initialized()
# Get current joint positions as seed for IK
current_joints = self.get_current_joint_positions()
# Solve inverse kinematics
target_joints = self.inverse_kinematics(target_position, target_orientation, current_joints)
if target_joints is None:
return False # No IK solution found
# Move to computed joint positions
return self.move_to_joint_positions(target_joints, timeout)
def get_current_pose(self) -> Tuple[List[float], List[float]]:
"""Get current end effector pose in world coordinates"""
self._ensure_initialized()
try:
# Get current joint positions from PyBullet
current_joints = self.get_current_joint_positions()
# Calculate forward kinematics with KDL
position, orientation = self.forward_kinematics(current_joints)
return position, orientation
except Exception as e:
raise RuntimeError(f"Failed to get current pose: {e}")
def get_current_joint_positions(self) -> List[float]:
"""Get current joint positions from PyBullet simulation"""
self._ensure_initialized()
try:
joint_states = self.robot_loader.get_joint_states()
pb_names = self.robot_loader.get_joint_names()
q_pb = []
for joint_name in pb_names:
if joint_name not in joint_states:
raise RuntimeError(f"Joint {joint_name} not found in joint states")
q_pb.append(joint_states[joint_name]['position'])
return self._to_kdl_order(q_pb)
except Exception as e:
raise RuntimeError(f"Failed to get current joint positions: {e}")
def get_current_joint_states(self) -> Dict[str, Dict[str, float]]:
"""Get complete current joint states from PyBullet"""
self._ensure_initialized()
try:
return self.robot_loader.get_joint_states()
except Exception as e:
raise RuntimeError(f"Failed to get joint states: {e}")
def check_workspace_reachability(self, position: List[float],
orientation: List[float] = None) -> bool:
"""Check if a Cartesian position is reachable by the robot"""
self._ensure_initialized()
# Get current joint positions as seed
current_joints = self.get_current_joint_positions()
# Try to solve inverse kinematics - this will expose KDL problems
solution = self.inverse_kinematics(position, orientation, current_joints)
return solution is not None
def check_joint_limits(self, joint_positions: List[float]) -> Tuple[bool, List[str]]:
"""Check joint limits and return violations"""
self._ensure_initialized()
violations = []
joint_limits = self.kinematics_engine.get_joint_limits()
joint_names = self.robot_loader.get_joint_names()
for i, (pos, (lower, upper), name) in enumerate(zip(joint_positions, joint_limits, joint_names)):
if pos < lower:
violations.append(f"{name}: {pos:.3f} < {lower:.3f}")
elif pos > upper:
violations.append(f"{name}: {pos:.3f} > {upper:.3f}")
is_valid = len(violations) == 0
return is_valid, violations
def check_collision(self) -> bool:
"""Check for robot self-collision"""
self._ensure_initialized()
try:
return self.robot_loader.check_self_collision()
except Exception as e:
raise RuntimeError(f"Collision check failed: {e}")
def reset_to_home_position(self, timeout: float = 5.0) -> bool:
"""Reset robot to home position"""
self._ensure_initialized()
if self.home_position is None:
raise RuntimeError("Home position not configured")
return self.move_to_joint_positions(self.home_position, timeout)
def get_kinematic_info(self) -> Dict[str, Any]:
"""Get comprehensive kinematic information"""
self._ensure_initialized()
return {
'num_joints': self.kinematics_engine.get_num_joints(),
'base_link': self.kinematics_engine.base_link,
'end_link': self.kinematics_engine.end_link,
'joint_limits': self.kinematics_engine.get_joint_limits(),
'joint_names': self.robot_loader.get_joint_names(),
'current_pose': self.get_current_pose(),
'current_joint_positions': self.get_current_joint_positions(),
'home_position': self.home_position
}
def get_workspace_bounds(self) -> Dict[str, Tuple[float, float]]:
"""Estimate workspace bounds by sampling reachable positions"""
self._ensure_initialized()
# This is a simplified implementation
# For precise bounds, would need comprehensive sampling
task_points = self.task_points_config
bounds = {
'x': (-2.0, 2.0), # Default bounds
'y': (-2.0, 2.0),
'z': (0.0, 2.5)
}
# Update bounds based on task points if available
if task_points:
positions = []
for point_info in task_points.values():
if 'position' in point_info:
positions.append(point_info['position'])
if positions:
x_coords = [pos[0] for pos in positions]
y_coords = [pos[1] for pos in positions]
z_coords = [pos[2] for pos in positions]
bounds['x'] = (min(x_coords) - 0.5, max(x_coords) + 0.5)
bounds['y'] = (min(y_coords) - 0.5, max(y_coords) + 0.5)
bounds['z'] = (min(z_coords) - 0.5, max(z_coords) + 0.5)
return bounds
def _ensure_initialized(self) -> None:
"""Ensure controller is properly initialized"""
if not self.is_initialized:
raise RuntimeError("ArmController not initialized. Call initialization first.")
if self.kinematics_engine is None:
raise RuntimeError("KDL kinematics engine not initialized")
if self.robot_loader is None:
raise RuntimeError("Robot loader not initialized")
def sample_valid_configuration(self, max_attempts: int = 100) -> Optional[List[float]]:
"""Sample a random valid joint configuration within limits"""
self._ensure_initialized()
joint_limits = self.kinematics_engine.get_joint_limits()
for _ in range(max_attempts):
# Generate random joint positions within limits
joint_positions = []
for lower, upper in joint_limits:
random_pos = np.random.uniform(lower, upper)
joint_positions.append(random_pos)
# Validate configuration
if self.validate_joint_configuration(joint_positions):
return joint_positions
return None # No valid configuration found
def interpolate_joint_path(self, start_joints: List[float],
end_joints: List[float],
num_steps: int = 50) -> List[List[float]]:
"""Interpolate linear path between two joint configurations"""
self._ensure_initialized()
if len(start_joints) != len(end_joints):
raise ValueError("Start and end joint arrays must have same length")
if not self.validate_joint_configuration(start_joints):
raise ValueError("Start joint configuration is invalid")
if not self.validate_joint_configuration(end_joints):
raise ValueError("End joint configuration is invalid")
path = []
for i in range(num_steps + 1):
t = i / num_steps # Parameter from 0 to 1
# Linear interpolation
interpolated = []
for start, end in zip(start_joints, end_joints):
interpolated.append(start + t * (end - start))
path.append(interpolated)
return path
def interpolate_cartesian_path(self, start_pose: Tuple[List[float], List[float]],
end_pose: Tuple[List[float], List[float]],
num_steps: int = 50) -> Optional[List[List[float]]]:
"""Interpolate Cartesian path between two poses"""
self._ensure_initialized()
start_pos, start_orient = start_pose
end_pos, end_orient = end_pose
# Get current joint positions as seed
current_joints = self.get_current_joint_positions()
joint_path = []
for i in range(num_steps + 1):
t = i / num_steps
# Linear interpolation for position
interpolated_pos = []
for start, end in zip(start_pos, end_pos):
interpolated_pos.append(start + t * (end - start))
# Linear interpolation for orientation (simplified)
# For proper orientation interpolation, should use slerp for quaternions
interpolated_orient = []
if start_orient and end_orient:
for start, end in zip(start_orient, end_orient):
interpolated_orient.append(start + t * (end - start))
else:
interpolated_orient = start_orient
# Solve IK for interpolated pose
joint_solution = self.inverse_kinematics(
interpolated_pos, interpolated_orient, current_joints
)
if joint_solution is None:
return None # Path not feasible
joint_path.append(joint_solution)
current_joints = joint_solution # Use as seed for next step
return joint_path
def compute_path_length(self, joint_path: List[List[float]]) -> float:
"""Compute total length of joint path"""
if len(joint_path) < 2:
return 0.0
total_length = 0.0
for i in range(1, len(joint_path)):
# Compute Euclidean distance between consecutive configurations
segment_length = 0.0
for j in range(len(joint_path[i])):
diff = joint_path[i][j] - joint_path[i-1][j]
segment_length += diff * diff
total_length += math.sqrt(segment_length)
return total_length
def validate_trajectory(self, joint_path: List[List[float]],
check_collision: bool = True) -> Tuple[bool, List[str]]:
"""Validate entire trajectory for feasibility"""
self._ensure_initialized()
errors = []
# Check each configuration in path
for i, joint_config in enumerate(joint_path):
# Check joint limits
is_valid, violations = self.check_joint_limits(joint_config)
if not is_valid:
errors.extend([f"Step {i}: {v}" for v in violations])
# Check collision if requested
if check_collision:
# Store original configuration only once
if i == 0:
original_joints = self.get_current_joint_positions()
# Set robot to this configuration for collision check
self.robot_loader.set_joint_positions(joint_config)
if self.check_collision():
errors.append(f"Step {i}: Self-collision detected")
# Restore original configuration only at the end
if i == len(joint_path) - 1:
self.robot_loader.set_joint_positions(original_joints)
is_valid = len(errors) == 0
return is_valid, errors
def diagnose_ik_failure(self, target_position: List[float],
target_orientation: List[float] = None) -> Dict[str, Any]:
"""Diagnose why inverse kinematics failed for given target"""
self._ensure_initialized()
diagnosis = {
'target_position': target_position,
'target_orientation': target_orientation,
'reachable': False,
'issues': []
}
# Check workspace bounds
bounds = self.get_workspace_bounds()
x, y, z = target_position
if not (bounds['x'][0] <= x <= bounds['x'][1]):
diagnosis['issues'].append(f"X coordinate {x:.3f} outside bounds {bounds['x']}")
if not (bounds['y'][0] <= y <= bounds['y'][1]):
diagnosis['issues'].append(f"Y coordinate {y:.3f} outside bounds {bounds['y']}")
if not (bounds['z'][0] <= z <= bounds['z'][1]):
diagnosis['issues'].append(f"Z coordinate {z:.3f} outside bounds {bounds['z']}")
# Try IK with multiple seeds
seeds_tried = []
for _ in range(5): # Try 5 different seeds
seed = self.sample_valid_configuration()
if seed is not None:
seeds_tried.append(seed)
solution = self.inverse_kinematics(target_position, target_orientation, seed)
if solution is not None:
diagnosis['reachable'] = True
diagnosis['solution'] = solution
break
diagnosis['seeds_attempted'] = len(seeds_tried)
if not diagnosis['reachable'] and len(diagnosis['issues']) == 0:
diagnosis['issues'].append("Target may be at singular configuration or unreachable")
return diagnosis
def create_arm_controller(config_loader: ConfigLoader, physics_client: int) -> ArmController:
"""Create ArmController instance from configuration"""
return ArmController(config_loader, physics_client)