unity2moveit2/examples/basic_usage.py
ayuan9957 fe15edcbd5 Initial commit: Unity-MoveIt2 integrated robotic arm simulation system
- Unity frontend with ROS-TCP-Connector for ROS2 communication
- Docker-based ROS2 Jazzy backend with MoveIt2 integration
- Support for 1-9 DOF manipulators
- UR5 robot configuration and URDF files
- Assembly task feasibility analysis tools
- Comprehensive documentation and deployment guides

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 12:08:34 +08:00

453 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Unity-MoveIt2 系统基本使用示例
此脚本演示了如何使用Unity-MoveIt2系统进行基本的机器人控制和路径规划。
包括连接设置、关节控制、末端执行器控制、轨迹规划等功能。
作者: Unity-MoveIt2 团队
版本: 1.0.0
"""
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
import numpy as np
import time
from typing import List, Optional
# ROS2消息类型
from sensor_msgs.msg import JointState
from geometry_msgs.msg import Pose, PoseStamped, Point, Quaternion
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from moveit_msgs.srv import GetMotionPlan
from moveit_msgs.msg import MotionPlanRequest, Constraints, JointConstraint
# 自定义消息类型(假设已定义)
try:
from unity_msgs.msg import RobotStatus, FeasibilityResult
from unity_msgs.srv import AnalyzeFeasibility
except ImportError:
print("警告: 自定义消息类型未找到,某些功能可能不可用")
class UnityRobotController(Node):
"""Unity机器人控制器类"""
def __init__(self):
super().__init__('unity_robot_controller')
# 机器人配置
self.robot_name = "niryo_one"
self.joint_names = [
"joint_1", "joint_2", "joint_3",
"joint_4", "joint_5", "joint_6"
]
# 当前状态
self.current_joint_state = None
self.current_pose = None
self.robot_status = "IDLE"
# 初始化通信
self._setup_communication()
# 等待连接
self._wait_for_connections()
self.get_logger().info("Unity机器人控制器初始化完成")
def _setup_communication(self):
"""设置ROS2通信"""
# QoS配置
self.real_time_qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.VOLATILE
)
self.state_qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE
)
# 发布者
self.joint_trajectory_pub = self.create_publisher(
JointTrajectory,
'/joint_trajectory_controller/joint_trajectory',
self.real_time_qos
)
self.pose_target_pub = self.create_publisher(
PoseStamped,
'/moveit_servo/pose_target_cmds',
self.real_time_qos
)
# 订阅者
self.joint_state_sub = self.create_subscription(
JointState,
'/joint_states',
self.joint_state_callback,
self.state_qos
)
try:
self.robot_status_sub = self.create_subscription(
RobotStatus,
'/robot_status',
self.robot_status_callback,
self.state_qos
)
except NameError:
self.get_logger().warn("RobotStatus消息类型不可用")
# 服务客户端
self.motion_plan_client = self.create_client(
GetMotionPlan,
'/plan_kinematic_path'
)
try:
self.feasibility_client = self.create_client(
AnalyzeFeasibility,
'/analyze_assembly_feasibility'
)
except NameError:
self.get_logger().warn("AnalyzeFeasibility服务类型不可用")
def _wait_for_connections(self):
"""等待ROS2连接建立"""
self.get_logger().info("等待ROS2服务连接...")
# 等待运动规划服务
while not self.motion_plan_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待运动规划服务...')
self.get_logger().info("所有服务连接已建立")
def joint_state_callback(self, msg: JointState):
"""关节状态回调函数"""
self.current_joint_state = msg
# 记录关节位置(可选)
if len(msg.position) >= 6:
positions = [f"{pos:.3f}" for pos in msg.position[:6]]
self.get_logger().debug(f"关节位置: {positions}")
def robot_status_callback(self, msg):
"""机器人状态回调函数"""
self.robot_status = msg.status
self.get_logger().debug(f"机器人状态: {self.robot_status}")
def move_to_joint_positions(self, joint_positions: List[float],
duration: float = 3.0) -> bool:
"""移动到指定关节位置
Args:
joint_positions: 目标关节位置(弧度)
duration: 运动时间(秒)
Returns:
bool: 是否成功发送命令
"""
if len(joint_positions) != len(self.joint_names):
self.get_logger().error(
f"关节位置数量不匹配: 期望{len(self.joint_names)}, 实际{len(joint_positions)}"
)
return False
# 创建轨迹消息
trajectory = JointTrajectory()
trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory.joint_names = self.joint_names
# 创建轨迹点
point = JointTrajectoryPoint()
point.positions = joint_positions
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
trajectory.points = [point]
# 发布轨迹
self.joint_trajectory_pub.publish(trajectory)
self.get_logger().info(f"发送关节轨迹: {joint_positions}")
return True
def move_to_pose(self, target_pose: Pose) -> bool:
"""移动到指定末端执行器位姿
Args:
target_pose: 目标位姿
Returns:
bool: 是否成功发送命令
"""
# 创建位姿目标消息
pose_stamped = PoseStamped()
pose_stamped.header.stamp = self.get_clock().now().to_msg()
pose_stamped.header.frame_id = "base_link"
pose_stamped.pose = target_pose
# 发布位姿目标
self.pose_target_pub.publish(pose_stamped)
self.get_logger().info(
f"发送位姿目标: 位置({target_pose.position.x:.3f}, "
f"{target_pose.position.y:.3f}, {target_pose.position.z:.3f})"
)
return True
def plan_motion(self, target_joint_positions: List[float]) -> Optional[JointTrajectory]:
"""规划运动轨迹
Args:
target_joint_positions: 目标关节位置
Returns:
Optional[JointTrajectory]: 规划的轨迹失败时返回None
"""
if not self.motion_plan_client.service_is_ready():
self.get_logger().error("运动规划服务不可用")
return None
# 创建规划请求
request = GetMotionPlan.Request()
# 设置运动规划请求
motion_request = MotionPlanRequest()
motion_request.group_name = "manipulator"
motion_request.num_planning_attempts = 10
motion_request.allowed_planning_time = 5.0
# 设置目标约束
joint_constraints = []
for i, (joint_name, position) in enumerate(zip(self.joint_names, target_joint_positions)):
constraint = JointConstraint()
constraint.joint_name = joint_name
constraint.position = position
constraint.tolerance_above = 0.01
constraint.tolerance_below = 0.01
constraint.weight = 1.0
joint_constraints.append(constraint)
goal_constraints = Constraints()
goal_constraints.joint_constraints = joint_constraints
motion_request.goal_constraints = [goal_constraints]
request.motion_plan_request = motion_request
# 发送规划请求
self.get_logger().info("发送运动规划请求...")
future = self.motion_plan_client.call_async(request)
# 等待响应
rclpy.spin_until_future_complete(self, future, timeout_sec=10.0)
if future.result() is not None:
response = future.result()
if response.motion_plan_response.error_code.val == 1: # SUCCESS
self.get_logger().info("运动规划成功")
return response.motion_plan_response.trajectory.joint_trajectory
else:
self.get_logger().error(
f"运动规划失败: {response.motion_plan_response.error_code.val}"
)
else:
self.get_logger().error("运动规划请求超时")
return None
def analyze_feasibility(self, task_description: str) -> bool:
"""分析装配任务可行性
Args:
task_description: 任务描述
Returns:
bool: 任务是否可行
"""
if not hasattr(self, 'feasibility_client'):
self.get_logger().warn("可行性分析服务不可用")
return True # 默认认为可行
if not self.feasibility_client.service_is_ready():
self.get_logger().error("可行性分析服务不可用")
return False
# 创建分析请求(这里需要根据实际消息定义调整)
try:
request = AnalyzeFeasibility.Request()
request.task_description = task_description
request.robot_name = self.robot_name
# 发送分析请求
self.get_logger().info(f"分析任务可行性: {task_description}")
future = self.feasibility_client.call_async(request)
# 等待响应
rclpy.spin_until_future_complete(self, future, timeout_sec=15.0)
if future.result() is not None:
response = future.result()
self.get_logger().info(f"可行性分析结果: {response.is_feasible}")
return response.is_feasible
else:
self.get_logger().error("可行性分析请求超时")
except Exception as e:
self.get_logger().error(f"可行性分析失败: {e}")
return False
def get_current_joint_positions(self) -> Optional[List[float]]:
"""获取当前关节位置
Returns:
Optional[List[float]]: 当前关节位置未获取到时返回None
"""
if self.current_joint_state is None:
return None
return list(self.current_joint_state.position[:len(self.joint_names)])
def wait_for_motion_complete(self, timeout: float = 30.0) -> bool:
"""等待运动完成
Args:
timeout: 超时时间(秒)
Returns:
bool: 是否在超时前完成运动
"""
start_time = time.time()
while time.time() - start_time < timeout:
if self.robot_status == "IDLE":
return True
time.sleep(0.1)
rclpy.spin_once(self, timeout_sec=0.0)
self.get_logger().warn("等待运动完成超时")
return False
def create_sample_poses() -> List[Pose]:
"""创建示例位姿列表"""
poses = []
# 位姿1: 正前方
pose1 = Pose()
pose1.position = Point(x=0.3, y=0.0, z=0.3)
pose1.orientation = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)
poses.append(pose1)
# 位姿2: 右侧
pose2 = Pose()
pose2.position = Point(x=0.2, y=-0.2, z=0.3)
pose2.orientation = Quaternion(x=0.0, y=0.0, z=0.707, w=0.707)
poses.append(pose2)
# 位姿3: 左侧
pose3 = Pose()
pose3.position = Point(x=0.2, y=0.2, z=0.3)
pose3.orientation = Quaternion(x=0.0, y=0.0, z=-0.707, w=0.707)
poses.append(pose3)
return poses
def main():
"""主函数 - 演示基本使用方法"""
# 初始化ROS2
rclpy.init()
try:
# 创建控制器
controller = UnityRobotController()
# 等待初始状态
print("等待机器人状态...")
time.sleep(2.0)
rclpy.spin_once(controller, timeout_sec=1.0)
# 示例1: 移动到预定义关节位置
print("\n=== 示例1: 关节位置控制 ===")
home_position = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
if controller.move_to_joint_positions(home_position, duration=3.0):
print("发送回零位置命令")
controller.wait_for_motion_complete()
time.sleep(2.0)
# 示例2: 移动到不同关节位置
print("\n=== 示例2: 移动到工作位置 ===")
work_position = [0.5, -0.3, 0.8, 0.0, 0.5, 0.0]
if controller.move_to_joint_positions(work_position, duration=4.0):
print("发送工作位置命令")
controller.wait_for_motion_complete()
time.sleep(2.0)
# 示例3: 末端执行器位姿控制
print("\n=== 示例3: 末端执行器位姿控制 ===")
sample_poses = create_sample_poses()
for i, pose in enumerate(sample_poses):
print(f"移动到位姿 {i+1}")
controller.move_to_pose(pose)
time.sleep(3.0)
# 示例4: 运动规划
print("\n=== 示例4: 运动规划 ===")
target_joints = [1.0, -0.5, 1.2, 0.0, 0.8, -0.5]
trajectory = controller.plan_motion(target_joints)
if trajectory is not None:
print("规划成功,执行轨迹")
controller.joint_trajectory_pub.publish(trajectory)
controller.wait_for_motion_complete()
else:
print("规划失败")
# 示例5: 可行性分析
print("\n=== 示例5: 可行性分析 ===")
task_desc = "在狭窄空间内装配螺栓"
is_feasible = controller.analyze_feasibility(task_desc)
print(f"任务 '{task_desc}' 可行性: {is_feasible}")
# 示例6: 获取当前状态
print("\n=== 示例6: 状态查询 ===")
current_joints = controller.get_current_joint_positions()
if current_joints:
joint_str = [f"{pos:.3f}" for pos in current_joints]
print(f"当前关节位置: {joint_str}")
print(f"当前机器人状态: {controller.robot_status}")
# 回到初始位置
print("\n=== 返回初始位置 ===")
controller.move_to_joint_positions(home_position, duration=3.0)
controller.wait_for_motion_complete()
print("\n示例演示完成!")
except KeyboardInterrupt:
print("\n用户中断")
except Exception as e:
print(f"\n发生错误: {e}")
finally:
# 清理资源
if 'controller' in locals():
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()