feat: 实现完整的三段路径规划和可视化

主要改进:
- 添加初始位置→物体→A点→B点的完整路径规划
- 实现分段路径可视化(紫色/绿色/蓝色区分不同段)
- 统一使用单一CollisionChecker实例,支持忽略运输物体
- 修复路径优化器类型错误和移除回退方案
- 调整配置参数(物体高度0.7m,安全边界0.01m)

技术细节:
- AIRRTStarPlanner接受外部collision_checker参数
- 路径执行器分段处理,正确控制夹爪动作
- 添加物体、A点、B点的3D标记和文字标签

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
sladro 2025-09-12 17:40:28 +08:00
parent 62497af8be
commit 2ad7049386
5 changed files with 231 additions and 28 deletions

View File

@ -67,7 +67,7 @@
"initial_position": [
1.0,
0.5,
0.5
0.7
],
"dimensions": {
"width": 0.1,
@ -124,7 +124,7 @@
},
"collision": {
"check_resolution": 0.05,
"safety_margin": 0.02
"safety_margin": 0.01
},
"optimization": {
"shortcut_iterations": 50,

View File

@ -348,6 +348,22 @@ class MainWindow:
if self.environment:
self.environment.reset_environment()
# Clear path visualization
if hasattr(self, 'path_visualization'):
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
self.path_visualization = []
# Clear point markers
if hasattr(self, 'point_markers'):
for marker_id in self.point_markers:
p.removeBody(marker_id)
self.point_markers = []
# Reset planned path
self.planned_path = None
self.execute_btn.config(state=tk.DISABLED)
# Reset camera
self._reset_camera_view()
@ -426,7 +442,8 @@ class MainWindow:
self.path_planner = AIRRTStarPlanner(
self.arm_controller,
self.environment,
self.config_loader
self.config_loader,
self.collision_checker # Pass the shared collision checker
)
self.path_optimizer = PathOptimizer(
self.arm_controller,
@ -449,40 +466,82 @@ class MainWindow:
self.pause_simulation()
try:
# Get task points
# Get task points and object position
task_points = self.config_loader.get_task_points()
point_a = task_points['point_A']['position']
point_b = task_points['point_B']['position']
# Get transport object position from config
transport_object_config = self.config_loader.get_full_config()['transport_object']
object_position = transport_object_config['initial_position']
# Get current joint configuration
current_config = self.arm_controller.get_current_joint_positions()
# Convert points to joint configurations using inverse kinematics
self.update_status("Computing target configurations...")
config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=current_config)
# Object position configuration
config_object = self.arm_controller.inverse_kinematics(object_position, seed_angles=current_config)
if config_object is None:
raise RuntimeError("Cannot find joint configuration for object position")
# A point configuration
config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object)
if config_a is None:
raise RuntimeError("Cannot find joint configuration for point A")
# B point configuration
config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a)
if config_b is None:
raise RuntimeError("Cannot find joint configuration for point B")
# Plan path (通用避障;若配置要求强制穿洞,内部会选择穿洞策略)
self.update_status("Planning path...")
raw_path = self.path_planner.plan_auto(config_a, config_b)
# Plan three path segments
# Segment 1: Current position to object (ignore transport object collision)
self.update_status("Planning path from current position to object...")
# Temporarily ignore transport object for approaching it
self.collision_checker.ignore_transport_object = True
try:
path_to_object = self.path_planner.plan_auto(current_config, config_object)
finally:
# Restore collision checking for transport object
self.collision_checker.ignore_transport_object = False
# Optimize path
self.update_status("Optimizing path...")
optimized_path = self.path_optimizer.optimize_path(raw_path, self.collision_checker)
# Segment 2: Object to A
self.update_status("Planning path from object to A...")
path_object_to_a = self.path_planner.plan_auto(config_object, config_a)
self.planned_path = optimized_path
# Segment 3: A to B (通用避障;若配置要求强制穿洞,内部会选择穿洞策略)
self.update_status("Planning path from A to B...")
path_a_to_b = self.path_planner.plan_auto(config_a, config_b)
# Optimize paths separately to maintain segment information
self.update_status("Optimizing path segments...")
# Optimize path to object (ignore transport object collision)
self.collision_checker.ignore_transport_object = True
try:
optimized_path_to_object = self.path_optimizer.optimize_path(path_to_object, self.collision_checker)
finally:
self.collision_checker.ignore_transport_object = False
optimized_path_object_to_a = self.path_optimizer.optimize_path(path_object_to_a, self.collision_checker)
optimized_path_a_to_b = self.path_optimizer.optimize_path(path_a_to_b, self.collision_checker)
# Merge optimized paths (remove duplicate points)
self.planned_path = (optimized_path_to_object[:-1] +
optimized_path_object_to_a[:-1] +
optimized_path_a_to_b)
# Store segment boundaries
self.object_index = len(optimized_path_to_object) - 1
self.point_a_index = self.object_index + len(optimized_path_object_to_a) - 1
path_length = len(self.planned_path)
self.update_status(f"Path planning successful! Path contains {path_length} waypoints")
self.update_status(f"Path planning successful! Full path contains {path_length} waypoints")
# Enable execute button
self.execute_btn.config(state=tk.NORMAL)
# Visualize path (optional)
# Visualize path with segments
self._visualize_path(self.planned_path)
except Exception as e:
@ -506,13 +565,57 @@ class MainWindow:
self.update_status("Executing planned path...")
try:
# Execute path with gripper control
success = self.path_executor.execute_path(self.planned_path, gripper_action='close')
if success:
# Split path into three segments
if hasattr(self, 'object_index') and hasattr(self, 'point_a_index'):
# Segment 1: Initial to object (no gripper action)
path_to_object = self.planned_path[:self.object_index + 1]
# Segment 2: Object to A (gripper closed)
path_object_to_a = self.planned_path[self.object_index:self.point_a_index + 1]
# Segment 3: A to B (gripper closed)
path_a_to_b = self.planned_path[self.point_a_index:]
# Execute path to object
self.update_status("Moving to object...")
success = self.path_executor.execute_path(path_to_object, gripper_action=None)
if not success:
self.update_status("Failed to reach object")
return
# Close gripper at object
self.update_status("Closing gripper at object...")
if hasattr(self.arm_controller, 'close_gripper'):
self.arm_controller.close_gripper()
import time
time.sleep(0.5) # Wait for gripper to close
# Execute path from object to A
self.update_status("Moving object to point A...")
success = self.path_executor.execute_path(path_object_to_a, gripper_action=None)
if not success:
self.update_status("Failed to reach point A")
return
# Execute path from A to B
self.update_status("Moving from A to B through hole...")
success = self.path_executor.execute_path(path_a_to_b, gripper_action=None)
if not success:
self.update_status("Failed to reach point B")
return
# Open gripper at B
self.update_status("Opening gripper at point B...")
if hasattr(self.arm_controller, 'open_gripper'):
self.arm_controller.open_gripper()
time.sleep(0.5) # Wait for gripper to open
self.update_status("Path execution completed successfully!")
else:
self.update_status("Path execution failed")
# Fallback to old behavior if no segment information
success = self.path_executor.execute_path(self.planned_path, gripper_action=None)
if success:
self.update_status("Path execution completed!")
else:
self.update_status("Path execution failed")
except Exception as e:
import traceback
@ -528,22 +631,108 @@ class MainWindow:
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
# Remove old markers if exists
if hasattr(self, 'point_markers'):
for marker_id in self.point_markers:
p.removeBody(marker_id)
self.path_visualization = []
self.point_markers = []
# Get task points and object position for markers
task_points = self.config_loader.get_task_points()
point_a_pos = task_points['point_A']['position']
point_b_pos = task_points['point_B']['position']
object_pos = self.config_loader.get_full_config()['transport_object']['initial_position']
# Draw path as lines between waypoints仅用 KDL FK 计算,不改仿真状态)
for i in range(len(path) - 1):
start_pos, _ = self.arm_controller.forward_kinematics(path[i])
end_pos, _ = self.arm_controller.forward_kinematics(path[i + 1])
# Choose color based on segment
if hasattr(self, 'object_index') and i < self.object_index:
# Path from initial to object - Purple
color = [0.8, 0, 0.8]
elif hasattr(self, 'point_a_index') and i < self.point_a_index:
# Path from object to A - Green
color = [0, 1, 0]
else:
# Path from A to B - Blue
color = [0, 0, 1]
# Draw line
line_id = p.addUserDebugLine(
start_pos, end_pos,
lineColorRGB=[0, 1, 0], # Green color
lineColorRGB=color,
lineWidth=2,
physicsClientId=self.physics_client
)
self.path_visualization.append(line_id)
# Add markers for object, A and B points
# Marker for object - Orange sphere
sphere_obj = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 0.5, 0, 0.8], # Orange
physicsClientId=self.physics_client
)
marker_obj = p.createMultiBody(
baseVisualShapeIndex=sphere_obj,
basePosition=object_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_obj)
# Marker for point A - Yellow sphere
sphere_a = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 1, 0, 0.8], # Yellow
physicsClientId=self.physics_client
)
marker_a = p.createMultiBody(
baseVisualShapeIndex=sphere_a,
basePosition=point_a_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_a)
# Marker for point B - Red sphere
sphere_b = p.createVisualShape(
p.GEOM_SPHERE,
radius=0.05,
rgbaColor=[1, 0, 0, 0.8], # Red
physicsClientId=self.physics_client
)
marker_b = p.createMultiBody(
baseVisualShapeIndex=sphere_b,
basePosition=point_b_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_b)
# Add text labels
p.addUserDebugText(
"OBJ", object_pos,
textColorRGB=[1, 0.5, 0],
textSize=1.5,
physicsClientId=self.physics_client
)
p.addUserDebugText(
"A", point_a_pos,
textColorRGB=[1, 1, 0],
textSize=1.5,
physicsClientId=self.physics_client
)
p.addUserDebugText(
"B", point_b_pos,
textColorRGB=[1, 0, 0],
textSize=1.5,
physicsClientId=self.physics_client
)
# 可选:不恢复任何机器人状态(可视化不改状态)
except Exception as e:

View File

@ -86,7 +86,7 @@ class AdaptiveParameters:
class AIRRTStarPlanner:
"""AI增强的RRT*路径规划器"""
def __init__(self, arm_controller, environment, config_loader):
def __init__(self, arm_controller, environment, config_loader, collision_checker=None):
"""
初始化AI RRT*规划器
@ -94,6 +94,7 @@ class AIRRTStarPlanner:
arm_controller: 机械臂控制器实例
environment: 环境管理实例
config_loader: 配置加载器
collision_checker: 碰撞检测器实例可选如果不提供则创建新的
"""
self.arm_controller = arm_controller
self.environment = environment
@ -110,7 +111,10 @@ class AIRRTStarPlanner:
self.enforce_hole = self.rrt_config.get('enforce_hole_crossing', False)
# 初始化组件
self.collision_checker = CollisionChecker(arm_controller, environment, config_loader)
if collision_checker is None:
self.collision_checker = CollisionChecker(arm_controller, environment, config_loader)
else:
self.collision_checker = collision_checker
self.hole_strategy = HoleCrossingStrategy(arm_controller, environment, config_loader)
# 获取关节限位

View File

@ -32,6 +32,9 @@ class CollisionChecker:
self.robot_loader = arm_controller.robot_loader
self.robot_id = self.robot_loader.get_robot_id()
self.physics_client = self.robot_loader.physics_client
# 碰撞检测时是否忽略运输物体
self.ignore_transport_object = False
def _iter_obstacles(self):
"""遍历需要考虑的环境障碍物 body ids墙体各分段、可选 transport 物体)。"""
@ -41,8 +44,9 @@ class CollisionChecker:
if wid is not None:
yield wid
# 可选地加入运输物体(若要把它也视为障碍)
if getattr(self.environment, 'transport_object_id', None) is not None:
yield self.environment.transport_object_id
transport_id = getattr(self.environment, 'transport_object_id', None)
if not self.ignore_transport_object and transport_id is not None:
yield transport_id
def check_collision(self, joint_config: List[float]) -> bool:
"""检测给定关节配置是否发生碰撞(含安全边界)"""

View File

@ -198,12 +198,18 @@ class PathOptimizer:
point = (1 - alpha) * np.array(path[idx-1]) + alpha * np.array(path[idx])
# 验证点的有效性
if not collision_checker.check_collision(point.tolist()):
smoothed_path.append(point.tolist())
# 确保 point 是列表格式
if isinstance(point, np.ndarray):
point = point.tolist()
elif not isinstance(point, list):
point = list(point)
if not collision_checker.check_collision(point):
smoothed_path.append(point)
# 如果平滑失败,返回原路径
# 验证平滑结果
if len(smoothed_path) < 2:
return path
raise RuntimeError(f"Path smoothing failed: only {len(smoothed_path)} valid points generated")
return smoothed_path