fix: 修复路径执行精度问题

- 修改可视化方法显示真实的插值轨迹
- 修复RRT*算法路径终点精度,确保以精确目标结束
- 删除路径执行器的垃圾提前返回逻辑

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
sladro 2025-09-13 17:24:13 +08:00
parent 8fcd1c1d8c
commit c6a637d9b3
3 changed files with 211 additions and 246 deletions

View File

@ -23,6 +23,10 @@ from src.planning.path_executor import PathExecutor
# GUI设置
DEFAULT_WINDOW_SIZE = "1200x800"
# 按钮文本常量
EXECUTE_BUTTON_TEXT = "执行三阶段任务"
EXECUTE_BUTTON_BG = "#00BCD4"
# 相机默认设置
DEFAULT_CAMERA_DISTANCE = 4.0
DEFAULT_CAMERA_YAW = 45
@ -50,7 +54,6 @@ class MainWindow:
self.collision_checker = None
self.path_optimizer = None
self.path_executor = None
self.planned_path = None
# Simulation state
self.simulation_running = False
@ -131,19 +134,13 @@ class MainWindow:
# Path planning section
planning_frame = tk.LabelFrame(left_panel, text="Path Planning", padx=10, pady=10)
planning_frame.pack(fill=tk.X, pady=(0, 10))
self.plan_btn = tk.Button(planning_frame, text="Plan Path (AI RRT*)",
command=self.plan_path,
bg="#00BCD4", fg="white", padx=10, pady=5)
self.plan_btn.pack(fill=tk.X)
self.execute_btn = tk.Button(planning_frame, text="Execute Path",
command=self.execute_path,
bg="#8BC34A", fg="white", padx=10, pady=5,
state=tk.DISABLED)
self.execute_btn.pack(fill=tk.X, pady=(5, 0))
tk.Label(planning_frame, text="Plan and execute path through hole",
self.execute_btn = tk.Button(planning_frame, text=EXECUTE_BUTTON_TEXT,
command=self.execute_three_stages,
bg=EXECUTE_BUTTON_BG, fg="white", padx=10, pady=5)
self.execute_btn.pack(fill=tk.X)
tk.Label(planning_frame, text="Execute three-stage path: Current→Object→A→B",
font=("Arial", 9)).pack(pady=(5, 0))
# Status section
@ -364,10 +361,6 @@ class MainWindow:
p.removeBody(marker_id)
self.point_markers = []
# Reset planned path
self.planned_path = None
self.execute_btn.config(state=tk.DISABLED)
# Reset camera
self._reset_camera_view()
@ -471,284 +464,256 @@ class MainWindow:
except Exception as e:
self.update_status(f"Failed to initialize planning components: {e}")
def plan_path(self):
"""Plan path from A to B through hole using AI RRT*"""
self.update_status("Starting AI RRT* path planning...")
# 暂停仿真线程,避免规划期间并发 step 引入副作用
def execute_three_stages(self):
"""执行三阶段任务当前位置→物体→A点→B点"""
self.update_status("Starting three-stage execution...")
# 暂停仿真以避免规划期间的副作用
was_running = self.simulation_running
if was_running:
self.pause_simulation()
try:
# Get task points and object position
# 获取配置中的任务点和物体位置
task_points = self.config_loader.get_task_points()
point_a = task_points['point_A']['position']
point_b = task_points['point_B']['position']
# Get transport object position from config
transport_object_config = self.config_loader.get_full_config()['transport_object']
object_position = transport_object_config['initial_position']
# Get current joint configuration
# 清除旧的可视化
self._clear_visualizations()
# ===================
# 第一阶段:当前位置 → 物体
# ===================
self.update_status("Stage 1: Planning path from current position to object...")
# 获取当前关节配置
current_config = self.arm_controller.get_current_joint_positions()
# Convert points to joint configurations using inverse kinematics
self.update_status("Computing target configurations...")
# Object position configuration
# 计算物体位置的逆运动学
config_object = self.arm_controller.inverse_kinematics(object_position, seed_angles=current_config)
if config_object is None:
raise RuntimeError("Cannot find joint configuration for object position")
# A point configuration
config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object)
if config_a is None:
raise RuntimeError("Cannot find joint configuration for point A")
# B point configuration
config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a)
if config_b is None:
raise RuntimeError("Cannot find joint configuration for point B")
# Plan three path segments
# Segment 1: Current position to object (ignore transport object collision)
self.update_status("Planning path from current position to object...")
# Temporarily ignore transport object for approaching it
# 规划路径(忽略物体碰撞)
self.collision_checker.ignore_transport_object = True
try:
path_to_object = self.path_planner.plan_auto(current_config, config_object)
finally:
# Restore collision checking for transport object
self.collision_checker.ignore_transport_object = False
# Segment 2: Object to A
self.update_status("Planning path from object to A...")
path_object_to_a = self.path_planner.plan_auto(config_object, config_a)
# Segment 3: A to B (通用避障;若配置要求强制穿洞,内部会选择穿洞策略)
self.update_status("Planning path from A to B...")
path_a_to_b = self.path_planner.plan_auto(config_a, config_b)
# Optimize paths separately to maintain segment information
self.update_status("Optimizing path segments...")
# Optimize path to object (ignore transport object collision)
self.collision_checker.ignore_transport_object = True
try:
optimized_path_to_object = self.path_optimizer.optimize_path(path_to_object, self.collision_checker)
finally:
self.collision_checker.ignore_transport_object = False
optimized_path_object_to_a = self.path_optimizer.optimize_path(path_object_to_a, self.collision_checker)
optimized_path_a_to_b = self.path_optimizer.optimize_path(path_a_to_b, self.collision_checker)
# Merge optimized paths (remove duplicate points)
self.planned_path = (optimized_path_to_object[:-1] +
optimized_path_object_to_a[:-1] +
optimized_path_a_to_b)
# Store segment boundaries
self.object_index = len(optimized_path_to_object) - 1
self.point_a_index = self.object_index + len(optimized_path_object_to_a) - 1
path_length = len(self.planned_path)
self.update_status(f"Path planning successful! Full path contains {path_length} waypoints")
# Enable execute button
self.execute_btn.config(state=tk.NORMAL)
# Visualize path with segments
self._visualize_path(self.planned_path)
except Exception as e:
import traceback
self.update_status(f"Path planning failed: {str(e)}")
print("Full traceback:")
traceback.print_exc()
self.planned_path = None
self.execute_btn.config(state=tk.DISABLED)
finally:
# 规划完成后恢复仿真运行状态
if was_running:
self.start_simulation()
def execute_path(self):
"""Execute the planned path using pre-planned segments"""
self.update_status("Starting three-stage execution using planned path...")
# 可视化第一段路径(紫色)
self._visualize_path_segment(optimized_path_to_object, [0.8, 0, 0.8], "To Object")
try:
# Check if planned path exists
if self.planned_path is None:
raise RuntimeError("No planned path available. Please run path planning first.")
# Stage 1: Execute path to object using pre-planned segment
# 执行第一段路径
self.update_status("Stage 1: Moving to object...")
path_to_object = self.planned_path[:self.object_index + 1]
self.collision_checker.ignore_transport_object = True
try:
success = self.path_executor.execute_path(path_to_object, gripper_action=None)
success = self.path_executor.execute_path(optimized_path_to_object, gripper_action=None)
finally:
self.collision_checker.ignore_transport_object = False
if not success:
raise RuntimeError("Stage 1 failed: Could not reach object")
# Close gripper
# 抓取物体
self.update_status("Closing gripper at object...")
if hasattr(self.arm_controller, 'close_gripper'):
self.arm_controller.close_gripper()
import time
time.sleep(0.5)
# After gripping, ignore transport object collision
# 从现在开始忽略物体碰撞(因为已经抓取)
self.collision_checker.ignore_transport_object = True
# Stage 2: Execute path from object to A using pre-planned segment
self.update_status("Stage 2: Moving to point A...")
path_to_a = self.planned_path[self.object_index:self.point_a_index + 1]
# ===================
# 第二阶段:物体 → A点
# ===================
self.update_status("Stage 2: Planning path from object to point A...")
success = self.path_executor.execute_path(path_to_a, gripper_action=None)
# 计算A点的逆运动学
config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object)
if config_a is None:
raise RuntimeError("Cannot find joint configuration for point A")
# 规划路径
path_object_to_a = self.path_planner.plan_auto(config_object, config_a)
optimized_path_object_to_a = self.path_optimizer.optimize_path(path_object_to_a, self.collision_checker)
# 可视化第二段路径(绿色)
self._visualize_path_segment(optimized_path_object_to_a, [0, 1, 0], "To A")
# 执行第二段路径
self.update_status("Stage 2: Moving to point A...")
success = self.path_executor.execute_path(optimized_path_object_to_a, gripper_action=None)
if not success:
raise RuntimeError("Stage 2 failed: Could not reach point A")
# Stage 3: Execute path from A to B using pre-planned segment
self.update_status("Stage 3: Moving to point B...")
path_to_b = self.planned_path[self.point_a_index:]
# ===================
# 第三阶段A点 → B点
# ===================
self.update_status("Stage 3: Planning path from A to B...")
success = self.path_executor.execute_path(path_to_b, gripper_action=None)
# 计算B点的逆运动学
config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a)
if config_b is None:
raise RuntimeError("Cannot find joint configuration for point B")
# 规划路径
path_a_to_b = self.path_planner.plan_auto(config_a, config_b)
optimized_path_a_to_b = self.path_optimizer.optimize_path(path_a_to_b, self.collision_checker)
# 可视化第三段路径(蓝色)
self._visualize_path_segment(optimized_path_a_to_b, [0, 0, 1], "To B")
# 执行第三段路径
self.update_status("Stage 3: Moving to point B...")
success = self.path_executor.execute_path(optimized_path_a_to_b, gripper_action=None)
if not success:
raise RuntimeError("Stage 3 failed: Could not reach point B")
# Open gripper at B
# 释放物体
self.update_status("Opening gripper at point B...")
if hasattr(self.arm_controller, 'open_gripper'):
self.arm_controller.open_gripper()
time.sleep(0.5)
# 添加任务点标记
self._add_point_markers(object_position, point_a, point_b)
self.update_status("All three stages completed successfully!")
except Exception as e:
import traceback
self.update_status(f"Path execution error: {str(e)}")
self.update_status(f"Execution error: {str(e)}")
print("Full traceback:")
traceback.print_exc()
finally:
# Restore collision checking
# 恢复碰撞检测
self.collision_checker.ignore_transport_object = False
def _visualize_path(self, path):
"""Visualize the planned path in PyBullet"""
try:
# Remove old visualization if exists
if hasattr(self, 'path_visualization'):
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
# Remove old markers if exists
if hasattr(self, 'point_markers'):
for marker_id in self.point_markers:
p.removeBody(marker_id)
# 恢复仿真运行状态
if was_running:
self.start_simulation()
def _visualize_path_segment(self, path, color, label):
"""可视化单个路径段 - 显示实际执行轨迹"""
if not hasattr(self, 'path_visualization'):
self.path_visualization = []
self.point_markers = []
# Get task points and object position for markers
task_points = self.config_loader.get_task_points()
point_a_pos = task_points['point_A']['position']
point_b_pos = task_points['point_B']['position']
object_pos = self.config_loader.get_full_config()['transport_object']['initial_position']
# Draw path as lines between waypoints仅用 KDL FK 计算,不改仿真状态)
for i in range(len(path) - 1):
start_pos, _ = self.arm_controller.forward_kinematics(path[i])
end_pos, _ = self.arm_controller.forward_kinematics(path[i + 1])
# Choose color based on segment
if hasattr(self, 'object_index') and i < self.object_index:
# Path from initial to object - Purple
color = [0.8, 0, 0.8]
elif hasattr(self, 'point_a_index') and i < self.point_a_index:
# Path from object to A - Green
color = [0, 1, 0]
else:
# Path from A to B - Blue
color = [0, 0, 1]
# Draw line
line_id = p.addUserDebugLine(
start_pos, end_pos,
lineColorRGB=color,
lineWidth=2,
physicsClientId=self.physics_client
# 导入numpy用于插值计算
import numpy as np
# 对每两个waypoint之间进行插值显示实际轨迹
for i in range(len(path) - 1):
current_config = path[i]
target_config = path[i + 1]
# 计算插值步数与path_executor相同的逻辑
distance = np.linalg.norm(np.array(target_config) - np.array(current_config))
if distance < 0.001: # 跳过太近的点
continue
# 使用与path_executor相同的参数
velocity_scaling = self.config_loader.get_full_config()['path_planning']['execution']['velocity_scaling']
timestep = self.config_loader.get_full_config()['simulation']['timestep']
move_time = distance / velocity_scaling
num_steps = max(5, int(move_time / timestep)) # 至少5步以保证平滑
# 插值并绘制
prev_pos = None
for step in range(num_steps + 1):
ratio = step / num_steps
interpolated = (
np.array(current_config) * (1 - ratio) +
np.array(target_config) * ratio
)
self.path_visualization.append(line_id)
# Add markers for object, A and B points
# Marker for object - Orange sphere
sphere_obj = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 0.5, 0, 0.8], # Orange
physicsClientId=self.physics_client
)
marker_obj = p.createMultiBody(
baseVisualShapeIndex=sphere_obj,
basePosition=object_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_obj)
# Marker for point A - Yellow sphere
sphere_a = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 1, 0, 0.8], # Yellow
physicsClientId=self.physics_client
)
marker_a = p.createMultiBody(
baseVisualShapeIndex=sphere_a,
basePosition=point_a_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_a)
# Marker for point B - Red sphere
sphere_b = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 0, 0, 0.8], # Red
physicsClientId=self.physics_client
)
marker_b = p.createMultiBody(
baseVisualShapeIndex=sphere_b,
basePosition=point_b_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_b)
# Add text labels
p.addUserDebugText(
"OBJ", object_pos,
textColorRGB=[1, 0.5, 0],
textSize=1.5,
physicsClientId=self.physics_client
)
p.addUserDebugText(
"A", point_a_pos,
textColorRGB=[1, 1, 0],
textSize=1.5,
physicsClientId=self.physics_client
)
p.addUserDebugText(
"B", point_b_pos,
textColorRGB=[1, 0, 0],
textSize=1.5,
physicsClientId=self.physics_client
)
# 可选:不恢复任何机器人状态(可视化不改状态)
except Exception as e:
self.update_status(f"Path visualization failed: {e}")
# 计算插值点的末端位置
pos, _ = self.arm_controller.forward_kinematics(interpolated.tolist())
if prev_pos is not None:
# 画线段
line_id = p.addUserDebugLine(
prev_pos, pos,
lineColorRGB=color,
lineWidth=2,
physicsClientId=self.physics_client
)
self.path_visualization.append(line_id)
prev_pos = pos
self.update_status(f"Visualized {label} path: {len(path)} waypoints with interpolation")
def _clear_visualizations(self):
"""清除所有可视化元素"""
# 清除路径线条
if hasattr(self, 'path_visualization'):
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
self.path_visualization = []
# 清除点标记
if hasattr(self, 'point_markers'):
for marker_id in self.point_markers:
p.removeBody(marker_id)
self.point_markers = []
def _add_point_markers(self, object_pos, point_a_pos, point_b_pos):
"""添加任务点标记"""
if not hasattr(self, 'point_markers'):
self.point_markers = []
# 物体标记 - 橙色
sphere_obj = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 0.5, 0, 0.8],
physicsClientId=self.physics_client
)
marker_obj = p.createMultiBody(
baseVisualShapeIndex=sphere_obj,
basePosition=object_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_obj)
# A点标记 - 黄色
sphere_a = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 1, 0, 0.8],
physicsClientId=self.physics_client
)
marker_a = p.createMultiBody(
baseVisualShapeIndex=sphere_a,
basePosition=point_a_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_a)
# B点标记 - 红色
sphere_b = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 0, 0, 0.8],
physicsClientId=self.physics_client
)
marker_b = p.createMultiBody(
baseVisualShapeIndex=sphere_b,
basePosition=point_b_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_b)
# 添加文本标签
p.addUserDebugText("OBJ", object_pos, textColorRGB=[1, 0.5, 0], textSize=1.5, physicsClientId=self.physics_client)
p.addUserDebugText("A", point_a_pos, textColorRGB=[1, 1, 0], textSize=1.5, physicsClientId=self.physics_client)
p.addUserDebugText("B", point_b_pos, textColorRGB=[1, 0, 0], textSize=1.5, physicsClientId=self.physics_client)
def _reset_camera_view(self):
"""Reset camera to default view"""

View File

@ -206,10 +206,14 @@ class AIRRTStarPlanner:
if np.linalg.norm(new_config - np.array(goal_config)) < GOAL_TOLERANCE:
# 找到路径,提取并返回
path = self._extract_path(new_node)
# 确保路径以精确的目标配置结束
if not np.allclose(path[-1], goal_config, atol=1e-6):
path.append(goal_config.tolist() if isinstance(goal_config, np.ndarray) else goal_config)
# 缓存成功配置
self._cache_success_configs(path)
return path
else:
self.params.update(success=False)

View File

@ -70,19 +70,15 @@ class PathExecutor:
return True
def _move_to_configuration(self, target_config: List[float]) -> bool:
current_config = self.arm_controller.get_current_joint_positions()
distance = np.linalg.norm(
np.array(target_config) - np.array(current_config)
)
if distance < self.position_tolerance:
return True
move_time = distance / self.velocity_scaling
num_steps = int(move_time / self.timestep) + 1