RoboticArmTest/src/gui/main_window.py
sladro 91a6821c0c fix: 修复录制回放时路径可视化AttributeError
- 修复play_recording方法中错误调用clear_all_paths()的问题
- path_visualization是列表类型,应使用遍历方式清除PyBullet debug线条
- 更新CLAUDE.md文档记录此次修复

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-14 16:36:06 +08:00

1195 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
import pybullet as p
import pybullet_data
import threading
import time
import numpy as np
import traceback
from typing import Dict, Any
import sys
import os
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from src.config_loader import ConfigLoader
from src.robot.arm_controller import create_arm_controller
from src.simulation.environment import Environment
from src.gui.config_window import ConfigWindow
from src.planning.ai_rrt_star import AIRRTStarPlanner
from src.planning.collision_checker import CollisionChecker
from src.planning.path_optimizer import PathOptimizer
from src.planning.path_executor import PathExecutor
from src.simulation.recording import SimulationRecorder
from src.simulation.playback import SimulationPlayer
# GUI设置
DEFAULT_WINDOW_SIZE = "1200x800"
# 按钮文本常量
EXECUTE_BUTTON_TEXT = "执行三阶段任务"
EXECUTE_BUTTON_BG = "#00BCD4"
# 相机默认设置
DEFAULT_CAMERA_DISTANCE = 4.0
DEFAULT_CAMERA_YAW = 45
DEFAULT_CAMERA_PITCH = -30
DEFAULT_CAMERA_TARGET = [0, 0, 0]
# 路径执行参数
PATH_EXECUTION_STEPS = 30 # 每个路径点的仿真步数(增加以提高精度)
PATH_EXECUTION_DELAY = 0.03 # 每个路径点的延时(秒)
KEYPOINT_PAUSE_TIME = 1.0 # 关键点停留时间(秒)
PATH_COLOR = [0.2, 0.5, 0.2] # 路径颜色(暗绿色)
# 物体检测和抓取参数
TASK_MARKER_RADIUS = 0.05 # 任务标记球体半径(米)
OBJECT_ATTACHMENT_OFFSET = 0.05 # 物体附着时的偏移量(米)- 物体在末端下方5cm
class MainWindow:
def __init__(self):
self.root = tk.Tk()
self.root.title("Robotic Arm Simulation - Feasibility Test")
# Get window dimensions from config or use defaults
self._setup_window_geometry()
# Simulation components
self.physics_client = None
self.config_loader = None
self.arm_controller = None
self.environment = None
self.config_window = None
# Path planning components
self.path_planner = None
self.collision_checker = None
self.path_optimizer = None
self.path_executor = None
# Recording and playback components
self.recorder = SimulationRecorder()
self.player = SimulationPlayer()
# Simulation state
self.simulation_running = False
self.simulation_thread = None
# Create GUI
self._create_control_panel()
self._create_log_panel()
self._create_bottom_panel()
# Initialize simulation
self.initialize_simulation()
def _setup_window_geometry(self):
"""Setup window geometry from defaults"""
self.root.geometry(DEFAULT_WINDOW_SIZE)
def _create_control_panel(self):
"""Create control panel on the left side"""
# Create main frame
self.main_frame = tk.Frame(self.root)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Left panel - Control panel
left_panel = tk.Frame(self.main_frame, width=300)
left_panel.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))
# Title
title_label = tk.Label(left_panel, text="Control Panel",
font=("Arial", 14, "bold"))
title_label.pack(pady=(0, 10))
# Configuration section
config_frame = tk.LabelFrame(left_panel, text="Configuration", padx=10, pady=10)
config_frame.pack(fill=tk.X, pady=(0, 10))
self.config_btn = tk.Button(config_frame, text="Open Config Manager",
command=self.open_config_window,
bg="#2196F3", fg="white", padx=10, pady=5)
self.config_btn.pack(fill=tk.X)
self.reload_btn = tk.Button(config_frame, text="Reload Config",
command=self.reload_configuration,
padx=10, pady=5)
self.reload_btn.pack(fill=tk.X, pady=(5, 0))
# Simulation control section
control_frame = tk.LabelFrame(left_panel, text="Simulation Control", padx=10, pady=10)
control_frame.pack(fill=tk.X, pady=(0, 10))
self.start_btn = tk.Button(control_frame, text="Start Simulation",
command=self.start_simulation,
bg="#4CAF50", fg="white", padx=10, pady=5)
self.start_btn.pack(fill=tk.X)
self.pause_btn = tk.Button(control_frame, text="Pause Simulation",
command=self.pause_simulation,
state=tk.DISABLED, padx=10, pady=5)
self.pause_btn.pack(fill=tk.X, pady=(5, 0))
self.reset_btn = tk.Button(control_frame, text="Reset Environment",
command=self.reset_environment,
bg="#FF9800", fg="white", padx=10, pady=5)
self.reset_btn.pack(fill=tk.X, pady=(5, 0))
# Reachability test section
test_frame = tk.LabelFrame(left_panel, text="Reachability Test", padx=10, pady=10)
test_frame.pack(fill=tk.X, pady=(0, 10))
self.test_btn = tk.Button(test_frame, text="Test Reachability",
command=self.test_reachability,
bg="#9C27B0", fg="white", padx=10, pady=5)
self.test_btn.pack(fill=tk.X)
tk.Label(test_frame, text="Check if A and B points are reachable",
font=("Arial", 9)).pack(pady=(5, 0))
# Path planning section
planning_frame = tk.LabelFrame(left_panel, text="Path Planning", padx=10, pady=10)
planning_frame.pack(fill=tk.X, pady=(0, 10))
self.execute_btn = tk.Button(planning_frame, text=EXECUTE_BUTTON_TEXT,
command=self.execute_three_stages,
bg=EXECUTE_BUTTON_BG, fg="white", padx=10, pady=5)
self.execute_btn.pack(fill=tk.X)
tk.Label(planning_frame, text="Execute three-stage path: Current→Object→A→B",
font=("Arial", 9)).pack(pady=(5, 0))
# Recording and Playback section
recording_frame = tk.LabelFrame(left_panel, text="Recording & Playback", padx=10, pady=10)
recording_frame.pack(fill=tk.X, pady=(0, 10))
# Recording controls
recording_controls = tk.Frame(recording_frame)
recording_controls.pack(fill=tk.X, pady=(0, 5))
self.record_btn = tk.Button(recording_controls, text="Start Recording",
command=self.toggle_recording,
bg="#E91E63", fg="white", padx=10, pady=5)
self.record_btn.pack(fill=tk.X)
# Playback controls
playback_controls = tk.Frame(recording_frame)
playback_controls.pack(fill=tk.X, pady=(5, 0))
self.load_btn = tk.Button(playback_controls, text="Load Recording",
command=self.load_recording_file,
padx=10, pady=5)
self.load_btn.pack(fill=tk.X, pady=(0, 2))
playback_buttons = tk.Frame(playback_controls)
playback_buttons.pack(fill=tk.X)
self.play_btn = tk.Button(playback_buttons, text="Play",
command=self.play_recording,
state=tk.DISABLED, width=8)
self.play_btn.pack(side=tk.LEFT, padx=(0, 2), fill=tk.X, expand=True)
self.pause_play_btn = tk.Button(playback_buttons, text="Pause",
command=self.pause_recording,
state=tk.DISABLED, width=8)
self.pause_play_btn.pack(side=tk.LEFT, padx=(2, 0), fill=tk.X, expand=True)
# Status section
status_frame = tk.LabelFrame(left_panel, text="Status", padx=10, pady=10)
status_frame.pack(fill=tk.X, pady=(0, 10))
# Robot status
tk.Label(status_frame, text="Robot:", font=("Arial", 9, "bold")).grid(row=0, column=0, sticky="w")
self.robot_status = tk.Label(status_frame, text="Not Loaded", fg="red")
self.robot_status.grid(row=0, column=1, sticky="w", padx=(5, 0))
# Environment status
tk.Label(status_frame, text="Environment:", font=("Arial", 9, "bold")).grid(row=1, column=0, sticky="w")
self.env_status = tk.Label(status_frame, text="Not Loaded", fg="red")
self.env_status.grid(row=1, column=1, sticky="w", padx=(5, 0))
# Simulation status
tk.Label(status_frame, text="Simulation:", font=("Arial", 9, "bold")).grid(row=2, column=0, sticky="w")
self.sim_status = tk.Label(status_frame, text="Stopped", fg="orange")
self.sim_status.grid(row=2, column=1, sticky="w", padx=(5, 0))
def _create_log_panel(self):
"""Create log panel with copyable text"""
from tkinter import scrolledtext
# Right panel - Log display
right_panel = tk.Frame(self.main_frame)
right_panel.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# Log display frame
log_label = tk.Label(right_panel, text="System Log", font=("Arial", 12, "bold"))
log_label.pack(pady=(0, 5))
# Scrollable text widget for logs (can select and copy)
self.log_text = scrolledtext.ScrolledText(right_panel, height=25, wrap=tk.WORD)
self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Make it read-only initially but still selectable
self.log_text.config(state=tk.DISABLED)
def _create_bottom_panel(self):
"""Create bottom panel with viewer info and exit button"""
bottom_frame = tk.Frame(self.root)
bottom_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
tk.Label(bottom_frame, text="PyBullet Viewer:", font=("Arial", 9, "bold")).pack(side=tk.LEFT, padx=(0, 5))
tk.Label(bottom_frame, text="Use external PyBullet GUI window for 3D visualization",
fg="gray").pack(side=tk.LEFT)
# Exit button
exit_btn = tk.Button(bottom_frame, text="Exit", command=self.on_closing,
bg="#f44336", fg="white", padx=20)
exit_btn.pack(side=tk.RIGHT)
def initialize_simulation(self):
"""Initialize PyBullet simulation"""
try:
# Connect to PyBullet in GUI mode
self.physics_client = p.connect(p.GUI)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
# Configure visualization
p.configureDebugVisualizer(p.COV_ENABLE_GUI, 1)
p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 1)
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
# Set camera from config or defaults
self._reset_camera_view()
# Load configuration
self.config_loader = ConfigLoader()
# Setup environment
self.environment = Environment(self.config_loader, self.physics_client)
self.environment.setup_environment()
self.env_status.config(text="Loaded", fg="green")
# Initialize arm controller
self.arm_controller = create_arm_controller(self.config_loader, self.physics_client)
self.robot_status.config(text="Loaded", fg="green")
# Initialize path planning components
self._initialize_planning_components()
self.update_status("System ready")
except Exception as e:
self.update_status(f"Initialization failed: {e}")
messagebox.showerror("Initialization Error", f"Failed to initialize simulation:\n{e}")
def open_config_window(self):
"""Open configuration management window"""
if not self.config_window:
self.config_window = ConfigWindow(
self.root,
on_apply_callback=self.on_config_applied
)
self.config_window.show()
def on_config_applied(self, new_config: Dict[str, Any]):
"""Handle configuration changes from config window"""
self.update_status("Applying configuration...")
try:
# Clear current environment
if self.environment:
self.environment.cleanup()
# Clear current robot
if self.arm_controller and hasattr(self.arm_controller, 'robot_loader') and self.arm_controller.robot_loader:
if self.arm_controller.robot_loader.robot_id is not None:
try:
p.removeBody(self.arm_controller.robot_loader.robot_id, physicsClientId=self.physics_client)
except:
pass # Robot might already be removed
# Create new ConfigLoader with updated configuration
self.config_loader = ConfigLoader()
# Update the config loader's internal config directly
self.config_loader._config = new_config
# Recreate environment
self.environment = Environment(self.config_loader, self.physics_client)
self.environment.setup_environment()
# Recreate arm controller
self.arm_controller = create_arm_controller(self.config_loader, self.physics_client)
# Reinitialize path planning components with new arm controller
self._initialize_planning_components()
self.update_status("Configuration applied")
except Exception as e:
self.update_status(f"Configuration error: {e}")
messagebox.showerror("Configuration Error", f"Failed to apply configuration:\n{e}")
def reload_configuration(self):
"""Reload configuration from file"""
try:
self.config_loader = ConfigLoader()
self.on_config_applied(self.config_loader.get_full_config())
self.update_status("Configuration reloaded")
except Exception as e:
self.update_status(f"Reload failed: {e}")
def start_simulation(self):
"""Start simulation loop"""
if not self.simulation_running:
self.simulation_running = True
self.sim_status.config(text="Running", fg="green")
self.start_btn.config(state=tk.DISABLED)
self.pause_btn.config(state=tk.NORMAL)
# Start simulation thread
self.simulation_thread = threading.Thread(target=self.simulation_loop, daemon=True)
self.simulation_thread.start()
self.update_status("Simulation running")
def pause_simulation(self):
"""Pause simulation loop"""
if self.simulation_running:
self.simulation_running = False
self.sim_status.config(text="Paused", fg="orange")
self.start_btn.config(state=tk.NORMAL)
self.pause_btn.config(state=tk.DISABLED)
self.update_status("Simulation paused")
def simulation_loop(self):
"""Main simulation loop (runs in separate thread)"""
while self.simulation_running:
try:
p.stepSimulation(physicsClientId=self.physics_client)
# Get timestep from config
timestep = self.config_loader.get_full_config().get('simulation', {}).get('timestep')
time.sleep(timestep)
except Exception as e:
self.update_status(f"Simulation error: {e}")
self.simulation_running = False
break
def reset_environment(self):
"""Reset environment to initial state"""
try:
# Reset robot to home position
if self.arm_controller and self.arm_controller.is_initialized:
self.arm_controller.reset_to_home_position()
# Reset environment objects
if self.environment:
self.environment.reset_environment()
# Clear path visualization
if hasattr(self, 'path_visualization'):
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
self.path_visualization = []
# Clear point markers
if hasattr(self, 'point_markers'):
for marker_id in self.point_markers:
p.removeBody(marker_id)
self.point_markers = []
# Reset camera
self._reset_camera_view()
self.update_status("Environment reset")
except Exception as e:
self.update_status(f"Reset failed: {e}")
def test_reachability(self):
"""Test reachability of task points"""
self.update_status("Testing reachability...")
try:
# Get task points
task_points = self.config_loader.get_task_points()
point_a = task_points['point_A']['position']
point_b = task_points['point_B']['position']
# Get transport object position
transport_object = self.config_loader.get_transport_object_config()
object_position = transport_object['initial_position']
# Perform reachability check - now testing three points
self._check_reachability(object_position, point_a, point_b)
except Exception as e:
error_msg = f"Reachability test failed: {str(e)}"
self.update_status(error_msg)
print(f"ERROR: {error_msg}")
print("Full traceback:")
traceback.print_exc()
def _check_reachability(self, object_position, point_a, point_b):
"""Check if points are reachable by the robot using KDL kinematics"""
try:
# Use ArmController's precise reachability checking with KDL
# This will call KDL inverse kinematics and expose any problems
object_reachable = self.arm_controller.check_workspace_reachability(object_position)
a_reachable = self.arm_controller.check_workspace_reachability(point_a)
b_reachable = self.arm_controller.check_workspace_reachability(point_b)
# Get detailed results
results = []
if object_reachable:
results.append("Object: Reachable")
else:
results.append("Object: Not reachable")
if a_reachable:
results.append("Point A: Reachable")
else:
results.append("Point A: Not reachable")
if b_reachable:
results.append("Point B: Reachable")
else:
results.append("Point B: Not reachable")
# Update status based on results
if object_reachable and a_reachable and b_reachable:
self.update_status("All three points are reachable")
else:
self.update_status(f"Reachability: {', '.join(results)}")
except Exception as e:
error_msg = f"Reachability check failed: {str(e)}"
self.update_status(error_msg)
print(f"ERROR in _check_reachability: {error_msg}")
print("Full traceback:")
traceback.print_exc()
def _calculate_distance(self, point):
"""Calculate Euclidean distance from origin"""
return (point[0]**2 + point[1]**2 + point[2]**2) ** 0.5
def _initialize_planning_components(self):
"""Initialize path planning components"""
try:
self.collision_checker = CollisionChecker(
self.arm_controller,
self.environment,
self.config_loader
)
self.path_planner = AIRRTStarPlanner(
self.arm_controller,
self.environment,
self.config_loader,
self.collision_checker # Pass the shared collision checker
)
self.path_optimizer = PathOptimizer(
self.arm_controller,
self.config_loader
)
self.path_executor = PathExecutor(
self.arm_controller,
self.config_loader
)
self.update_status("Path planning components initialized")
except Exception as e:
self.update_status(f"Failed to initialize planning components: {e}")
def _print_debug_coordinates(self, stage_name):
"""打印KDL、PyBullet和真实机械臂末端坐标"""
print(f"\n=== {stage_name} 末端坐标调试信息 ===")
# 获取当前关节角度
current_joints = self.arm_controller.get_current_joint_positions()
print(f"当前关节角度: {[f'{j:.4f}' for j in current_joints]}")
# KDL正运动学计算
kdl_result = self.arm_controller.forward_kinematics(current_joints)
kdl_position = kdl_result[0] if kdl_result else [0, 0, 0]
print(f"KDL计算末端坐标: [{kdl_position[0]:.4f}, {kdl_position[1]:.4f}, {kdl_position[2]:.4f}]")
# PyBullet仿真中的实际末端坐标
end_link_state = p.getLinkState(self.arm_controller.robot_loader.robot_id,
self.arm_controller.robot_loader.end_effector_index,
physicsClientId=self.arm_controller.physics_client)
pybullet_position = list(end_link_state[0])
print(f"PyBullet末端坐标: [{pybullet_position[0]:.4f}, {pybullet_position[1]:.4f}, {pybullet_position[2]:.4f}]")
# 真实机械臂末端坐标与PyBullet相同因为这是仿真环境
print(f"真实机械臂末端坐标: [{pybullet_position[0]:.4f}, {pybullet_position[1]:.4f}, {pybullet_position[2]:.4f}]")
# 计算误差(如果可能的话)
try:
if hasattr(kdl_position, '__len__') and len(kdl_position) >= 3:
kdl_error = ((kdl_position[0] - pybullet_position[0])**2 +
(kdl_position[1] - pybullet_position[1])**2 +
(kdl_position[2] - pybullet_position[2])**2)**0.5
print(f"KDL与PyBullet误差: {kdl_error:.6f}m")
else:
print("无法计算误差KDL返回格式不匹配")
except Exception as e:
print(f"误差计算失败: {e}")
print("=" * 50)
def execute_three_stages(self):
"""执行完整路径当前位置→物体→A点→B点"""
self.update_status("Starting full path execution...")
# 暂停仿真以避免规划期间的副作用
was_running = self.simulation_running
if was_running:
self.pause_simulation()
try:
# 获取配置中的任务点和物体位置
task_points = self.config_loader.get_task_points()
point_a = task_points['point_A']['position']
point_b = task_points['point_B']['position']
transport_object_config = self.config_loader.get_full_config()['transport_object']
object_position = transport_object_config['initial_position']
# 获取位置容差配置
position_tolerance = self.config_loader.get_full_config()['path_planning']['execution']['position_tolerance']
# 清除旧的可视化
self._clear_visualizations()
self.update_status("Planning complete path: Current → Object → A → B...")
# 保存当前机器人状态
saved_joint_positions = self.arm_controller.get_current_joint_positions()
# 计算各点的逆运动学
config_object = self.arm_controller.inverse_kinematics(object_position, seed_angles=saved_joint_positions)
if config_object is None:
raise RuntimeError("Cannot find joint configuration for object position")
config_a = self.arm_controller.inverse_kinematics(point_a, seed_angles=config_object)
if config_a is None:
raise RuntimeError("Cannot find joint configuration for point A")
config_b = self.arm_controller.inverse_kinematics(point_b, seed_angles=config_a)
if config_b is None:
raise RuntimeError("Cannot find joint configuration for point B")
# 立即恢复机器人状态
self.arm_controller.set_joint_positions(saved_joint_positions)
# 规划完整路径(忽略物体碰撞,因为要抓取它)
self.collision_checker.ignore_transport_object = True
# 分段规划然后合并(允许部分路径失败)
try:
path1 = self.path_planner.plan_auto(saved_joint_positions, config_object)
except Exception as e:
self.update_status(f"Warning: Cannot plan path to object: {e}")
# 检查碰撞对
has_collision, collision_pairs = self.collision_checker.check_collision_detailed(config_object)
if has_collision and collision_pairs:
self._visualize_collision_points(collision_pairs)
path1 = [saved_joint_positions, config_object] # 直接尝试移动
try:
path2 = self.path_planner.plan_auto(config_object, config_a)
except Exception as e:
self.update_status(f"Warning: Cannot plan path to A: {e}")
# 检查碰撞对
has_collision, collision_pairs = self.collision_checker.check_collision_detailed(config_a)
if has_collision and collision_pairs:
self._visualize_collision_points(collision_pairs)
path2 = [config_object, config_a] # 直接尝试移动
try:
path3 = self.path_planner.plan_auto(config_a, config_b)
except Exception as e:
self.update_status(f"Warning: Cannot plan path to B: {e}")
# 检查碰撞对
has_collision, collision_pairs = self.collision_checker.check_collision_detailed(config_b)
if has_collision and collision_pairs:
self._visualize_collision_points(collision_pairs)
path3 = [config_a, config_b] # 直接尝试移动
# 合并路径(去除重复点)
full_path = path1[:-1] + path2[:-1] + path3
# 优化完整路径
optimized_path = self.path_optimizer.optimize_path(full_path, self.collision_checker)
# 可视化完整路径
self._visualize_path_segment(optimized_path, PATH_COLOR, "Complete Path")
# 记录规划路径到录制数据(如果正在录制)
if self.recorder.is_recording:
self.recorder.record_planned_path(optimized_path)
# 添加任务点标记
self._add_point_markers(object_position, point_a, point_b)
# 执行完整路径
self.update_status("Executing complete path...")
# 记录关键点在路径中的位置
object_idx = None
a_idx = None
# 找到最接近物体和A点的路径索引
for i, config in enumerate(optimized_path):
pos, _ = self.arm_controller.forward_kinematics(config)
# 检查是否接近物体位置
if object_idx is None:
dist_to_object = ((pos[0] - object_position[0])**2 +
(pos[1] - object_position[1])**2 +
(pos[2] - object_position[2])**2)**0.5
if dist_to_object < position_tolerance:
object_idx = i
# 检查是否接近A点
if a_idx is None:
dist_to_a = ((pos[0] - point_a[0])**2 +
(pos[1] - point_a[1])**2 +
(pos[2] - point_a[2])**2)**0.5
if dist_to_a < position_tolerance:
a_idx = i
# 执行路径,在关键点停留
object_attached = False # 标记物体是否已吸附
for i, config in enumerate(optimized_path):
# 直接尝试移动,物理引擎会自动处理碰撞
self.arm_controller.set_joint_positions(config)
# 执行仿真步进
for _ in range(PATH_EXECUTION_STEPS):
p.stepSimulation(physicsClientId=self.arm_controller.physics_client)
# 如果物体已吸附,让它跟随末端执行器
if object_attached and self.environment.transport_object_id is not None:
# 获取末端执行器位置
end_effector_state = p.getLinkState(
self.arm_controller.robot_loader.robot_id,
self.arm_controller.robot_loader.end_effector_index,
physicsClientId=self.physics_client
)
# 更新物体位置(稍微偏移以模拟抓取)
object_pos = list(end_effector_state[0])
object_pos[2] -= OBJECT_ATTACHMENT_OFFSET
p.resetBasePositionAndOrientation(
self.environment.transport_object_id,
object_pos,
[0, 0, 0, 1],
physicsClientId=self.physics_client
)
time.sleep(PATH_EXECUTION_DELAY)
# 记录当前帧数据(如果正在录制)
self._integrate_recording_into_execution()
# 在物体位置停留并"吸附"
if i == object_idx and not object_attached:
self.update_status("Reached object, attaching...")
object_attached = True
time.sleep(KEYPOINT_PAUSE_TIME)
# 在A点停留
elif i == a_idx:
self.update_status("Reached point A with object...")
time.sleep(KEYPOINT_PAUSE_TIME)
# 在B点最后释放
elif i == len(optimized_path) - 1:
self.update_status("Reached point B, releasing object...")
object_attached = False
# 将物体放置在B点位置
if self.environment.transport_object_id is not None:
p.resetBasePositionAndOrientation(
self.environment.transport_object_id,
point_b,
[0, 0, 0, 1],
physicsClientId=self.physics_client
)
time.sleep(KEYPOINT_PAUSE_TIME)
# 显示最终位置和误差
final_joints = self.arm_controller.get_current_joint_positions()
final_pos, _ = self.arm_controller.forward_kinematics(final_joints)
# 计算误差
error = ((final_pos[0] - point_b[0])**2 +
(final_pos[1] - point_b[1])**2 +
(final_pos[2] - point_b[2])**2)**0.5
# 控制台输出
print(f"\n=== 执行完成 ===")
print(f"最终到达位置: [{final_pos[0]:.3f}, {final_pos[1]:.3f}, {final_pos[2]:.3f}]")
print(f"目标B点位置: [{point_b[0]:.3f}, {point_b[1]:.3f}, {point_b[2]:.3f}]")
print(f"执行误差: {error*100:.1f}cm")
# GUI日志输出详细信息
self.update_status(f"Path execution completed! Final error: {error*100:.1f}cm")
self.update_status(f"Target B: [{point_b[0]:.3f}, {point_b[1]:.3f}, {point_b[2]:.3f}]")
self.update_status(f"Actual: [{final_pos[0]:.3f}, {final_pos[1]:.3f}, {final_pos[2]:.3f}]")
# 检查误差是否超过容差
tolerance = self.config_loader.get_full_config()['path_planning']['execution']['position_tolerance']
if error > tolerance:
self.update_status(f"WARNING: Error {error*100:.1f}cm exceeds tolerance {tolerance*100:.1f}cm")
except Exception as e:
self.update_status(f"Execution error: {str(e)}")
print("Full traceback:")
traceback.print_exc()
finally:
# 恢复碰撞检测
self.collision_checker.ignore_transport_object = False
# 恢复仿真运行状态
if was_running:
self.start_simulation()
def _visualize_path_segment(self, path, color, label):
"""可视化单个路径段 - 使用简单的线段连接路径点"""
if not hasattr(self, 'path_visualization'):
self.path_visualization = []
# 直接使用路径点的末端位置来绘制,避免移动机械臂
# 使用PyBullet IK来获取每个配置的末端位置
positions = []
for config in path:
# 使用PyBullet的正向运动学计算末端位置
# 通过IK的副作用获取末端位置而不实际移动机械臂
# 使用calculateInverseKinematics的restPoses参数来设置关节配置
# 获取当前末端执行器的目标位置
# 这里我们使用一个技巧用IK求解当前配置对应的末端位置
# 首先保存当前状态
saved_states = []
for j in range(len(config)):
state = p.getJointState(
self.arm_controller.robot_loader.robot_id,
j,
physicsClientId=self.physics_client
)
saved_states.append(state[0])
# 临时设置关节状态来计算末端位置
for j, angle in enumerate(config):
p.resetJointState(
self.arm_controller.robot_loader.robot_id,
j,
angle,
targetVelocity=0,
physicsClientId=self.physics_client
)
# 获取末端位置
link_state = p.getLinkState(
self.arm_controller.robot_loader.robot_id,
self.arm_controller.robot_loader.end_effector_index,
physicsClientId=self.physics_client
)
positions.append(list(link_state[0]))
# 立即恢复原始状态
for j, angle in enumerate(saved_states):
p.resetJointState(
self.arm_controller.robot_loader.robot_id,
j,
angle,
targetVelocity=0,
physicsClientId=self.physics_client
)
# 绘制路径线段
for i in range(len(positions) - 1):
line_id = p.addUserDebugLine(
positions[i],
positions[i + 1],
lineColorRGB=color,
lineWidth=3,
physicsClientId=self.physics_client
)
self.path_visualization.append(line_id)
self.update_status(f"Visualized {label} path: {len(path)} waypoints")
def _visualize_collision_points(self, collision_pairs):
"""将碰撞的物体双方显示为红色"""
if not hasattr(self, 'original_colors'):
self.original_colors = {}
# 获取机器人ID
robot_id = self.arm_controller.robot_loader.robot_id
# 遍历碰撞对
for robot_link, obstacle_id, obstacle_link in collision_pairs:
# 标记机器人link为红色
if robot_link >= 0:
# 保存原始颜色
key = (robot_id, robot_link)
if key not in self.original_colors:
visual_data = p.getVisualShapeData(robot_id, physicsClientId=self.physics_client)
for data in visual_data:
if data[1] == robot_link: # data[1]是linkIndex
self.original_colors[key] = data[7] # data[7]是rgbaColor
break
# 设置为红色
p.changeVisualShape(
robot_id,
robot_link,
rgbaColor=[1, 0, 0, 1], # 红色
physicsClientId=self.physics_client
)
# 标记障碍物为红色
if obstacle_id >= 0:
# 保存原始颜色
key = (obstacle_id, obstacle_link)
if key not in self.original_colors:
visual_data = p.getVisualShapeData(obstacle_id, physicsClientId=self.physics_client)
if visual_data:
self.original_colors[key] = visual_data[0][7] # 保存基体颜色
# 设置为红色
if obstacle_link < 0: # 基体
p.changeVisualShape(
obstacle_id,
-1,
rgbaColor=[1, 0, 0, 1], # 红色
physicsClientId=self.physics_client
)
else: # 特定link
p.changeVisualShape(
obstacle_id,
obstacle_link,
rgbaColor=[1, 0, 0, 1], # 红色
physicsClientId=self.physics_client
)
self.update_status(f"Highlighted {len(collision_pairs)} collision pairs in red")
def _clear_visualizations(self):
"""清除所有可视化元素"""
# 清除路径线条
if hasattr(self, 'path_visualization'):
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
self.path_visualization = []
# 清除点标记
if hasattr(self, 'point_markers'):
for marker_id in self.point_markers:
p.removeBody(marker_id)
self.point_markers = []
# 恢复原始颜色
if hasattr(self, 'original_colors'):
for (body_id, link_index), color in self.original_colors.items():
try:
p.changeVisualShape(
body_id,
link_index,
rgbaColor=color,
physicsClientId=self.physics_client
)
except:
pass # 物体可能已被删除
self.original_colors = {}
def _add_point_markers(self, object_pos, point_a_pos, point_b_pos):
"""添加任务点标记"""
if not hasattr(self, 'point_markers'):
self.point_markers = []
# 物体标记 - 橙色
sphere_obj = p.createVisualShape(
p.GEOM_SPHERE,
radius=TASK_MARKER_RADIUS,
rgbaColor=[1, 0.5, 0, 0.8],
physicsClientId=self.physics_client
)
marker_obj = p.createMultiBody(
baseVisualShapeIndex=sphere_obj,
basePosition=object_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_obj)
# A点标记 - 黄色
sphere_a = p.createVisualShape(
p.GEOM_SPHERE,
radius=TASK_MARKER_RADIUS,
rgbaColor=[1, 1, 0, 0.8],
physicsClientId=self.physics_client
)
marker_a = p.createMultiBody(
baseVisualShapeIndex=sphere_a,
basePosition=point_a_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_a)
# B点标记 - 红色
sphere_b = p.createVisualShape(
p.GEOM_SPHERE,
radius=TASK_MARKER_RADIUS,
rgbaColor=[1, 0, 0, 0.8],
physicsClientId=self.physics_client
)
marker_b = p.createMultiBody(
baseVisualShapeIndex=sphere_b,
basePosition=point_b_pos,
physicsClientId=self.physics_client
)
self.point_markers.append(marker_b)
# 添加文本标签
p.addUserDebugText("OBJ", object_pos, textColorRGB=[1, 0.5, 0], textSize=1.5, physicsClientId=self.physics_client)
p.addUserDebugText("A", point_a_pos, textColorRGB=[1, 1, 0], textSize=1.5, physicsClientId=self.physics_client)
p.addUserDebugText("B", point_b_pos, textColorRGB=[1, 0, 0], textSize=1.5, physicsClientId=self.physics_client)
def _reset_camera_view(self):
"""Reset camera to default view"""
try:
# 使用文件内常量作为默认相机设置
distance = DEFAULT_CAMERA_DISTANCE
yaw = DEFAULT_CAMERA_YAW
pitch = DEFAULT_CAMERA_PITCH
target = DEFAULT_CAMERA_TARGET
p.resetDebugVisualizerCamera(
cameraDistance=distance,
cameraYaw=yaw,
cameraPitch=pitch,
cameraTargetPosition=target,
physicsClientId=self.physics_client
)
except Exception as e:
# Re-raise exception for proper error handling
raise Exception(f"Failed to reset camera view: {e}")
def update_status(self, message: str):
"""Update status display with copyable text"""
if hasattr(self, 'log_text'):
# Enable text widget to add content
self.log_text.config(state=tk.NORMAL)
# Add timestamp and message
timestamp = time.strftime("%H:%M:%S")
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
# Auto-scroll to bottom
self.log_text.see(tk.END)
# Make read-only again but still selectable
self.log_text.config(state=tk.DISABLED)
def toggle_recording(self):
"""切换录制状态"""
if self.recorder.is_recording:
self.stop_recording()
else:
self.start_recording()
def start_recording(self):
"""开始录制"""
try:
config = self.config_loader.get_full_config()
task_points = self.config_loader.get_task_points()
self.recorder.start_recording(config, task_points)
self.record_btn.config(text="Stop Recording", bg="#FF5722")
self.update_status("录制已开始")
except Exception as e:
messagebox.showerror("录制错误", f"无法开始录制: {str(e)}")
def stop_recording(self):
"""停止录制"""
try:
recording_data = self.recorder.stop_recording()
if recording_data:
# 自动保存录制文件
filename = self.recorder.save_to_file()
self.update_status(f"录制已停止,保存为: {filename}")
else:
self.update_status("录制停止,无数据保存")
self.record_btn.config(text="Start Recording", bg="#E91E63")
except Exception as e:
messagebox.showerror("保存错误", f"无法保存录制文件: {str(e)}")
def load_recording_file(self):
"""加载录制文件"""
filename = filedialog.askopenfilename(
title="选择录制文件",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
)
if filename:
if self.player.load_recording(filename):
self.play_btn.config(state=tk.NORMAL)
self.pause_play_btn.config(state=tk.NORMAL)
metadata = self.player.get_recording_metadata()
timestamp = metadata.get('timestamp', '未知时间')
duration = metadata.get('duration', 0)
self.update_status(f"录制文件已加载: {timestamp}, 时长: {duration:.1f}")
else:
messagebox.showerror("加载错误", "无法加载录制文件,请检查文件格式")
def play_recording(self):
"""开始回放录制"""
if self.player.start_playback():
self.play_btn.config(text="Stop", command=self.stop_playback)
self.pause_play_btn.config(state=tk.NORMAL)
# 显示录制的路径(如果存在)
planned_path = self.player.get_planned_path()
if planned_path:
# 清除旧的路径可视化
if hasattr(self, 'path_visualization'):
for line_id in self.path_visualization:
p.removeUserDebugItem(line_id)
self.path_visualization = []
# 显示录制的规划路径
self._visualize_path_segment(planned_path, [0.2, 0.5, 0.2], "Recorded Path")
# 重建任务点标记(从录制元数据)
metadata = self.player.get_recording_metadata()
if metadata and 'task_points' in metadata:
task_points = metadata['task_points']
object_pos = task_points.get('transport_object', {}).get('initial_position')
point_a = task_points.get('point_a')
point_b = task_points.get('point_b')
if object_pos and point_a and point_b:
self._add_point_markers(object_pos, point_a, point_b)
self.update_status("开始回放录制(已显示规划路径)")
else:
self.update_status("开始回放录制(无路径数据)")
# 开始回放循环
self._playback_loop()
else:
messagebox.showerror("回放错误", "无法开始回放,请先加载录制文件")
def pause_recording(self):
"""暂停/恢复回放"""
if self.player.is_paused:
self.player.resume_playback()
self.pause_play_btn.config(text="Pause")
self.update_status("回放已恢复")
else:
self.player.pause_playback()
self.pause_play_btn.config(text="Resume")
self.update_status("回放已暂停")
def stop_playback(self):
"""停止回放"""
self.player.stop_playback()
self.play_btn.config(text="Play", command=self.play_recording)
self.pause_play_btn.config(text="Pause", state=tk.DISABLED)
self.update_status("回放已停止")
def _playback_loop(self):
"""回放循环(在主线程中运行)"""
if not self.player.is_playing:
return
# 应用当前帧到仿真环境
try:
transport_object_id = getattr(self.environment, 'transport_object_id', None)
self.player.apply_frame_to_simulation(
self.arm_controller,
self.physics_client,
self.arm_controller.robot_loader.robot_id if self.arm_controller and self.arm_controller.robot_loader else None,
transport_object_id
)
# 执行仿真步进
p.stepSimulation(physicsClientId=self.physics_client)
except Exception as e:
self.update_status(f"回放错误: {str(e)}")
self.stop_playback()
return
# 继续下一帧
if self.player.is_playing:
self.root.after(33, self._playback_loop) # 约30fps
else:
self.stop_playback()
def _integrate_recording_into_execution(self):
"""将录制功能集成到路径执行中"""
if not self.recorder.is_recording:
return
# 记录当前帧数据
try:
joint_positions = self.arm_controller.get_current_joint_positions()
transport_object_config = self.config_loader.get_full_config()['transport_object']
object_position = transport_object_config['initial_position']
# 如果物体已被吸附,获取其当前位置
if hasattr(self, 'environment') and hasattr(self.environment, 'transport_object_id'):
if self.environment.transport_object_id is not None:
object_state = p.getBasePositionAndOrientation(
self.environment.transport_object_id,
physicsClientId=self.physics_client
)
object_position = list(object_state[0])
self.recorder.record_frame(
joint_positions,
object_position,
self.physics_client,
self.arm_controller.robot_loader.robot_id,
self.arm_controller.robot_loader.end_effector_index
)
except Exception as e:
# 静默处理录制错误,不影响主执行流程
pass
def on_closing(self):
"""Handle window closing"""
if messagebox.askokcancel("Quit", "Do you want to quit the simulation?"):
try:
# Stop simulation
self.simulation_running = False
# Cleanup
if self.environment:
self.environment.cleanup()
# Disconnect PyBullet
if self.physics_client is not None:
p.disconnect(self.physics_client)
except Exception as e:
# Log cleanup failure but still close
print(f"Cleanup error: {e}")
self.root.destroy()
def run(self):
"""Start the main window"""
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.root.mainloop()
if __name__ == "__main__":
# Test the main window directly
app = MainWindow()
app.run()