fix: 修复path_executor.py文件的编码和语法错误

- 修复文件中的中文乱码问题
- 删除重复的pybullet导入
- 修复第182行被注释掉的关键代码
- 清理所有空注释,保持代码整洁

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
sladro 2025-09-12 16:46:05 +08:00
parent 23aff4077a
commit 62497af8be

View File

@ -2,125 +2,103 @@
"""
Path Executor Module
緞鎵ц潡锛屾墽琛岃鍒掑ソ鐨勮矾寰勩?
MVP鐗堟湰锛氬彧瀹炵幇鏈鍩烘湰鐨勮矾寰勬墽琛屽姛鑳姐?
路径执行模块执行规划好的路径
MVP版本只实现最基本的路径执行功能
閰嶇疆椹卞姩锛氭墍鏈夊弬鏁颁粠config.json璇诲彇
閿欒澶勭悊锛氬け璐ョ珛鍗虫姏鍑哄紓甯革紝鏃犲悗澶囨柟妗?
配置驱动所有参数从config.json读取
错误处理失败立即抛出异常无后备方案
"""
import numpy as np
from typing import List, Dict, Any
import time
import pybullet as p
import pybullet as p
class PathExecutor:
"""璺緞鎵ц鍣?""
"""路径执行器"""
def __init__(self, arm_controller, config_loader):
"""
鍒濆鍖栬矾寰勬墽琛屽櫒
初始化路径执行器
Args:
arm_controller: 鏈烘鑷傛帶鍒跺櫒
config_loader: 閰嶇疆鍔犺浇鍣?
arm_controller: 机械臂控制器
config_loader: 配置加载器
"""
self.arm_controller = arm_controller
self.config_loader = config_loader
# 浠庨厤缃鍙栨墽琛屽弬鏁?
#
config = config_loader.get_full_config()
execution_config = config['path_planning']['execution']
self.velocity_scaling = execution_config['velocity_scaling']
self.position_tolerance = execution_config['position_tolerance']
# 浠庨厤缃鍙栦豢鐪熷弬鏁?
#
simulation_config = config['simulation']
self.timestep = simulation_config['timestep']
def execute_path(self, path: List[List[float]], gripper_action: str = None) -> bool:
"""
鎵ц
Args:
path: 緞鐐瑰垪琛紙鍏宠妭閰嶇疆锛?
gripper_action: 澶圭埅鍔ㄤ綔锛?open', 'close', None锛?
Returns:
True if execution successful
Raises:
ValueError: 緞鏃犳晥
RuntimeError: 鎵ц澶辫触
"""
if len(path) < 2:
raise ValueError("Path must have at least 2 points")
# 楠岃瘉璺緞鐐?
#
for i, config in enumerate(path):
if not self.arm_controller.validate_joint_configuration(config):
raise ValueError(f"Invalid joint configuration at point {i}")
# 鎵ц璺緞
for i, target_config in enumerate(path):
# 绉诲姩鍒扮洰鏍囬厤缃?
success = self._move_to_configuration(target_config)
if not success:
raise RuntimeError(f"Failed to reach waypoint {i}")
# 鍦ㄧ涓€涓偣鎵ц澶圭埅鍔ㄤ綔
if i == 0 and gripper_action == 'close':
self._close_gripper()
# 鍦ㄦ渶鍚庝竴涓偣鎵ц澶圭埅鍔ㄤ綔
#
if i == len(path) - 1 and gripper_action == 'open':
self._open_gripper()
return True
def _move_to_configuration(self, target_config: List[float]) -> bool:
"""
绉诲姩鍒扮洰鏍囬厤缃?
Args:
target_config: 爣鍏宠妭閰嶇疆
Returns:
True if reached target
"""
current_config = self.arm_controller.get_current_joint_positions()
# 璁$畻璺濈
distance = np.linalg.norm(
np.array(target_config) - np.array(current_config)
)
# 濡傛灉宸茬粡鍒拌揪锛岀洿鎺ヨ繑鍥?
if distance < self.position_tolerance:
return True
# 璁$畻绉诲姩姝ユ暟锛堝熀浜庨€熷害鍜屾椂闂存锛?
move_time = distance / self.velocity_scaling
num_steps = int(move_time / self.timestep) + 1
# 閫愭绉诲姩
#
for step in range(1, num_steps + 1):
# 绾挎€ф彃鍊?
ratio = step / num_steps
interpolated = (
np.array(current_config) * (1 - ratio) +
np.array(target_config) * ratio
)
# 璁剧疆鍏宠妭浣嶇疆
self.arm_controller.set_joint_positions(interpolated.tolist())
# 绛夊緟涓€涓椂闂存
time.sleep(self.timestep)
# 璁惧畾鐩爣鏈偣锛屽苟绛夊緟鏀舵暃鍒板宸唴锛堣秴鏃跺垯澶辫触锛屾毚闇查棶棰橈級
self.arm_controller.set_joint_positions(target_config)
wait_start = time.time()
@ -134,40 +112,23 @@ class PathExecutor:
return True
time.sleep(self.timestep)
# 瓒呮椂浠嶆湭鍒拌揪锛屾姏閿欐毚闇查棶棰? final_config = self.arm_controller.get_current_joint_positions()
# final_config = self.arm_controller.get_current_joint_positions()
final_distance = np.linalg.norm(np.array(target_config) - np.array(final_config))
raise RuntimeError(f"Failed to reach target, error: {final_distance}")
def _close_gripper(self):
"""鍏抽棴澶圭埅"""
if hasattr(self.arm_controller, 'close_gripper'):
self.arm_controller.close_gripper()
def _open_gripper(self):
"""鎵撳紑澶圭埅"""
if hasattr(self.arm_controller, 'open_gripper'):
self.arm_controller.open_gripper()
def execute_task_sequence(self, point_a: List[float], point_b: List[float],
paths: Dict[str, List[List[float]]]) -> bool:
"""
鎵ц瀹屾暣鐨勪换鍔簭鍒楋細鍙栫墿 ?绌胯秺 ?鏀剧疆
Args:
point_a: A鐐逛綅缃紙鍙栫墿鐐癸級
point_b: B鐐逛綅缃紙鏀剧疆鐐癸級
paths: 鍖呭惈鍚勯樁娈佃矾寰勭殑瀛楀吀
- 'to_a': 鍒癆鐐圭殑璺
- 'through_hole': 绌胯秺娲炲彛鐨勮矾寰?
- 'to_b': 鍒癇鐐圭殑璺
Returns:
True if task completed successfully
Raises:
RuntimeError: 浠诲姟鎵ц澶辫触
"""
# 闃舵1: 绉诲姩鍒癆鐐?
if 'to_a' not in paths:
raise ValueError("Missing path to point A")
@ -175,11 +136,11 @@ class PathExecutor:
if not success:
raise RuntimeError("Failed to reach point A")
# 闃舵2: 鎶撳彇鐗╀綋
self._close_gripper()
time.sleep(self.timestep * 10) # 绛夊緟澶圭埅鍏抽棴
# 闃舵3: 绌胯秺娲炲彛鍒癇鐐?
self._close_gripper()
time.sleep(self.timestep * 10)
if 'through_hole' in paths:
success = self.execute_path(paths['through_hole'])
elif 'to_b' in paths:
@ -190,15 +151,14 @@ class PathExecutor:
if not success:
raise RuntimeError("Failed to reach point B")
# 闃舵4: 閲婃斁鐗╀綋
self._open_gripper()
time.sleep(self.timestep * 10) # 绛夊緟澶圭埅鎵撳紑
time.sleep(self.timestep * 10) # 等待夹爪打开
return True
def _move_to_configuration_reset(self, target_config: List[float]) -> bool:
"""
浠ユ彃鍊?+ resetJointState 鐨勬柟寮忔墽琛屽埌鐩爣閰嶇疆銆? 鏍稿績锛氱淇濆彲瑙嗗寲涓兘鐪嬪埌鏈烘鑷傛寜璺緞绉诲姩锛屼笉渚濊禆鐢垫満鍔涚煩閰嶇疆銆? """
current_config = self.arm_controller.get_current_joint_positions()
distance = np.linalg.norm(np.array(target_config) - np.array(current_config))
@ -218,7 +178,8 @@ class PathExecutor:
interpolated = (
np.array(current_config) * (1 - ratio) + np.array(target_config) * ratio
).tolist()
# 灏?KDL 椤哄簭杞崲涓?PyBullet 椤哄簭鍐嶄笅鍙? q_pb = self.arm_controller._to_pb_order(interpolated)
# 将KDL顺序转换为PyBullet顺序
q_pb = self.arm_controller._to_pb_order(interpolated)
for j, jid in enumerate(joint_indices):
p.resetJointState(robot_id, jid, float(q_pb[j]), physicsClientId=cid)
time.sleep(self.timestep)