- Unity frontend with ROS-TCP-Connector for ROS2 communication - Docker-based ROS2 Jazzy backend with MoveIt2 integration - Support for 1-9 DOF manipulators - UR5 robot configuration and URDF files - Assembly task feasibility analysis tools - Comprehensive documentation and deployment guides 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
453 lines
15 KiB
Python
453 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Unity-MoveIt2 系统基本使用示例
|
||
|
||
此脚本演示了如何使用Unity-MoveIt2系统进行基本的机器人控制和路径规划。
|
||
包括连接设置、关节控制、末端执行器控制、轨迹规划等功能。
|
||
|
||
作者: Unity-MoveIt2 团队
|
||
版本: 1.0.0
|
||
"""
|
||
|
||
import rclpy
|
||
from rclpy.node import Node
|
||
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
|
||
import numpy as np
|
||
import time
|
||
from typing import List, Optional
|
||
|
||
# ROS2消息类型
|
||
from sensor_msgs.msg import JointState
|
||
from geometry_msgs.msg import Pose, PoseStamped, Point, Quaternion
|
||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||
from moveit_msgs.srv import GetMotionPlan
|
||
from moveit_msgs.msg import MotionPlanRequest, Constraints, JointConstraint
|
||
|
||
# 自定义消息类型(假设已定义)
|
||
try:
|
||
from unity_msgs.msg import RobotStatus, FeasibilityResult
|
||
from unity_msgs.srv import AnalyzeFeasibility
|
||
except ImportError:
|
||
print("警告: 自定义消息类型未找到,某些功能可能不可用")
|
||
|
||
|
||
class UnityRobotController(Node):
|
||
"""Unity机器人控制器类"""
|
||
|
||
def __init__(self):
|
||
super().__init__('unity_robot_controller')
|
||
|
||
# 机器人配置
|
||
self.robot_name = "niryo_one"
|
||
self.joint_names = [
|
||
"joint_1", "joint_2", "joint_3",
|
||
"joint_4", "joint_5", "joint_6"
|
||
]
|
||
|
||
# 当前状态
|
||
self.current_joint_state = None
|
||
self.current_pose = None
|
||
self.robot_status = "IDLE"
|
||
|
||
# 初始化通信
|
||
self._setup_communication()
|
||
|
||
# 等待连接
|
||
self._wait_for_connections()
|
||
|
||
self.get_logger().info("Unity机器人控制器初始化完成")
|
||
|
||
def _setup_communication(self):
|
||
"""设置ROS2通信"""
|
||
|
||
# QoS配置
|
||
self.real_time_qos = QoSProfile(
|
||
depth=10,
|
||
reliability=ReliabilityPolicy.RELIABLE,
|
||
durability=DurabilityPolicy.VOLATILE
|
||
)
|
||
|
||
self.state_qos = QoSProfile(
|
||
depth=10,
|
||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||
durability=DurabilityPolicy.VOLATILE
|
||
)
|
||
|
||
# 发布者
|
||
self.joint_trajectory_pub = self.create_publisher(
|
||
JointTrajectory,
|
||
'/joint_trajectory_controller/joint_trajectory',
|
||
self.real_time_qos
|
||
)
|
||
|
||
self.pose_target_pub = self.create_publisher(
|
||
PoseStamped,
|
||
'/moveit_servo/pose_target_cmds',
|
||
self.real_time_qos
|
||
)
|
||
|
||
# 订阅者
|
||
self.joint_state_sub = self.create_subscription(
|
||
JointState,
|
||
'/joint_states',
|
||
self.joint_state_callback,
|
||
self.state_qos
|
||
)
|
||
|
||
try:
|
||
self.robot_status_sub = self.create_subscription(
|
||
RobotStatus,
|
||
'/robot_status',
|
||
self.robot_status_callback,
|
||
self.state_qos
|
||
)
|
||
except NameError:
|
||
self.get_logger().warn("RobotStatus消息类型不可用")
|
||
|
||
# 服务客户端
|
||
self.motion_plan_client = self.create_client(
|
||
GetMotionPlan,
|
||
'/plan_kinematic_path'
|
||
)
|
||
|
||
try:
|
||
self.feasibility_client = self.create_client(
|
||
AnalyzeFeasibility,
|
||
'/analyze_assembly_feasibility'
|
||
)
|
||
except NameError:
|
||
self.get_logger().warn("AnalyzeFeasibility服务类型不可用")
|
||
|
||
def _wait_for_connections(self):
|
||
"""等待ROS2连接建立"""
|
||
self.get_logger().info("等待ROS2服务连接...")
|
||
|
||
# 等待运动规划服务
|
||
while not self.motion_plan_client.wait_for_service(timeout_sec=1.0):
|
||
self.get_logger().info('等待运动规划服务...')
|
||
|
||
self.get_logger().info("所有服务连接已建立")
|
||
|
||
def joint_state_callback(self, msg: JointState):
|
||
"""关节状态回调函数"""
|
||
self.current_joint_state = msg
|
||
|
||
# 记录关节位置(可选)
|
||
if len(msg.position) >= 6:
|
||
positions = [f"{pos:.3f}" for pos in msg.position[:6]]
|
||
self.get_logger().debug(f"关节位置: {positions}")
|
||
|
||
def robot_status_callback(self, msg):
|
||
"""机器人状态回调函数"""
|
||
self.robot_status = msg.status
|
||
self.get_logger().debug(f"机器人状态: {self.robot_status}")
|
||
|
||
def move_to_joint_positions(self, joint_positions: List[float],
|
||
duration: float = 3.0) -> bool:
|
||
"""移动到指定关节位置
|
||
|
||
Args:
|
||
joint_positions: 目标关节位置(弧度)
|
||
duration: 运动时间(秒)
|
||
|
||
Returns:
|
||
bool: 是否成功发送命令
|
||
"""
|
||
if len(joint_positions) != len(self.joint_names):
|
||
self.get_logger().error(
|
||
f"关节位置数量不匹配: 期望{len(self.joint_names)}, 实际{len(joint_positions)}"
|
||
)
|
||
return False
|
||
|
||
# 创建轨迹消息
|
||
trajectory = JointTrajectory()
|
||
trajectory.header.stamp = self.get_clock().now().to_msg()
|
||
trajectory.joint_names = self.joint_names
|
||
|
||
# 创建轨迹点
|
||
point = JointTrajectoryPoint()
|
||
point.positions = joint_positions
|
||
point.time_from_start.sec = int(duration)
|
||
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
|
||
|
||
trajectory.points = [point]
|
||
|
||
# 发布轨迹
|
||
self.joint_trajectory_pub.publish(trajectory)
|
||
self.get_logger().info(f"发送关节轨迹: {joint_positions}")
|
||
|
||
return True
|
||
|
||
def move_to_pose(self, target_pose: Pose) -> bool:
|
||
"""移动到指定末端执行器位姿
|
||
|
||
Args:
|
||
target_pose: 目标位姿
|
||
|
||
Returns:
|
||
bool: 是否成功发送命令
|
||
"""
|
||
# 创建位姿目标消息
|
||
pose_stamped = PoseStamped()
|
||
pose_stamped.header.stamp = self.get_clock().now().to_msg()
|
||
pose_stamped.header.frame_id = "base_link"
|
||
pose_stamped.pose = target_pose
|
||
|
||
# 发布位姿目标
|
||
self.pose_target_pub.publish(pose_stamped)
|
||
self.get_logger().info(
|
||
f"发送位姿目标: 位置({target_pose.position.x:.3f}, "
|
||
f"{target_pose.position.y:.3f}, {target_pose.position.z:.3f})"
|
||
)
|
||
|
||
return True
|
||
|
||
def plan_motion(self, target_joint_positions: List[float]) -> Optional[JointTrajectory]:
|
||
"""规划运动轨迹
|
||
|
||
Args:
|
||
target_joint_positions: 目标关节位置
|
||
|
||
Returns:
|
||
Optional[JointTrajectory]: 规划的轨迹,失败时返回None
|
||
"""
|
||
if not self.motion_plan_client.service_is_ready():
|
||
self.get_logger().error("运动规划服务不可用")
|
||
return None
|
||
|
||
# 创建规划请求
|
||
request = GetMotionPlan.Request()
|
||
|
||
# 设置运动规划请求
|
||
motion_request = MotionPlanRequest()
|
||
motion_request.group_name = "manipulator"
|
||
motion_request.num_planning_attempts = 10
|
||
motion_request.allowed_planning_time = 5.0
|
||
|
||
# 设置目标约束
|
||
joint_constraints = []
|
||
for i, (joint_name, position) in enumerate(zip(self.joint_names, target_joint_positions)):
|
||
constraint = JointConstraint()
|
||
constraint.joint_name = joint_name
|
||
constraint.position = position
|
||
constraint.tolerance_above = 0.01
|
||
constraint.tolerance_below = 0.01
|
||
constraint.weight = 1.0
|
||
joint_constraints.append(constraint)
|
||
|
||
goal_constraints = Constraints()
|
||
goal_constraints.joint_constraints = joint_constraints
|
||
motion_request.goal_constraints = [goal_constraints]
|
||
|
||
request.motion_plan_request = motion_request
|
||
|
||
# 发送规划请求
|
||
self.get_logger().info("发送运动规划请求...")
|
||
future = self.motion_plan_client.call_async(request)
|
||
|
||
# 等待响应
|
||
rclpy.spin_until_future_complete(self, future, timeout_sec=10.0)
|
||
|
||
if future.result() is not None:
|
||
response = future.result()
|
||
if response.motion_plan_response.error_code.val == 1: # SUCCESS
|
||
self.get_logger().info("运动规划成功")
|
||
return response.motion_plan_response.trajectory.joint_trajectory
|
||
else:
|
||
self.get_logger().error(
|
||
f"运动规划失败: {response.motion_plan_response.error_code.val}"
|
||
)
|
||
else:
|
||
self.get_logger().error("运动规划请求超时")
|
||
|
||
return None
|
||
|
||
def analyze_feasibility(self, task_description: str) -> bool:
|
||
"""分析装配任务可行性
|
||
|
||
Args:
|
||
task_description: 任务描述
|
||
|
||
Returns:
|
||
bool: 任务是否可行
|
||
"""
|
||
if not hasattr(self, 'feasibility_client'):
|
||
self.get_logger().warn("可行性分析服务不可用")
|
||
return True # 默认认为可行
|
||
|
||
if not self.feasibility_client.service_is_ready():
|
||
self.get_logger().error("可行性分析服务不可用")
|
||
return False
|
||
|
||
# 创建分析请求(这里需要根据实际消息定义调整)
|
||
try:
|
||
request = AnalyzeFeasibility.Request()
|
||
request.task_description = task_description
|
||
request.robot_name = self.robot_name
|
||
|
||
# 发送分析请求
|
||
self.get_logger().info(f"分析任务可行性: {task_description}")
|
||
future = self.feasibility_client.call_async(request)
|
||
|
||
# 等待响应
|
||
rclpy.spin_until_future_complete(self, future, timeout_sec=15.0)
|
||
|
||
if future.result() is not None:
|
||
response = future.result()
|
||
self.get_logger().info(f"可行性分析结果: {response.is_feasible}")
|
||
return response.is_feasible
|
||
else:
|
||
self.get_logger().error("可行性分析请求超时")
|
||
|
||
except Exception as e:
|
||
self.get_logger().error(f"可行性分析失败: {e}")
|
||
|
||
return False
|
||
|
||
def get_current_joint_positions(self) -> Optional[List[float]]:
|
||
"""获取当前关节位置
|
||
|
||
Returns:
|
||
Optional[List[float]]: 当前关节位置,未获取到时返回None
|
||
"""
|
||
if self.current_joint_state is None:
|
||
return None
|
||
|
||
return list(self.current_joint_state.position[:len(self.joint_names)])
|
||
|
||
def wait_for_motion_complete(self, timeout: float = 30.0) -> bool:
|
||
"""等待运动完成
|
||
|
||
Args:
|
||
timeout: 超时时间(秒)
|
||
|
||
Returns:
|
||
bool: 是否在超时前完成运动
|
||
"""
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < timeout:
|
||
if self.robot_status == "IDLE":
|
||
return True
|
||
|
||
time.sleep(0.1)
|
||
rclpy.spin_once(self, timeout_sec=0.0)
|
||
|
||
self.get_logger().warn("等待运动完成超时")
|
||
return False
|
||
|
||
|
||
def create_sample_poses() -> List[Pose]:
|
||
"""创建示例位姿列表"""
|
||
poses = []
|
||
|
||
# 位姿1: 正前方
|
||
pose1 = Pose()
|
||
pose1.position = Point(x=0.3, y=0.0, z=0.3)
|
||
pose1.orientation = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)
|
||
poses.append(pose1)
|
||
|
||
# 位姿2: 右侧
|
||
pose2 = Pose()
|
||
pose2.position = Point(x=0.2, y=-0.2, z=0.3)
|
||
pose2.orientation = Quaternion(x=0.0, y=0.0, z=0.707, w=0.707)
|
||
poses.append(pose2)
|
||
|
||
# 位姿3: 左侧
|
||
pose3 = Pose()
|
||
pose3.position = Point(x=0.2, y=0.2, z=0.3)
|
||
pose3.orientation = Quaternion(x=0.0, y=0.0, z=-0.707, w=0.707)
|
||
poses.append(pose3)
|
||
|
||
return poses
|
||
|
||
|
||
def main():
|
||
"""主函数 - 演示基本使用方法"""
|
||
|
||
# 初始化ROS2
|
||
rclpy.init()
|
||
|
||
try:
|
||
# 创建控制器
|
||
controller = UnityRobotController()
|
||
|
||
# 等待初始状态
|
||
print("等待机器人状态...")
|
||
time.sleep(2.0)
|
||
rclpy.spin_once(controller, timeout_sec=1.0)
|
||
|
||
# 示例1: 移动到预定义关节位置
|
||
print("\n=== 示例1: 关节位置控制 ===")
|
||
home_position = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||
if controller.move_to_joint_positions(home_position, duration=3.0):
|
||
print("发送回零位置命令")
|
||
controller.wait_for_motion_complete()
|
||
|
||
time.sleep(2.0)
|
||
|
||
# 示例2: 移动到不同关节位置
|
||
print("\n=== 示例2: 移动到工作位置 ===")
|
||
work_position = [0.5, -0.3, 0.8, 0.0, 0.5, 0.0]
|
||
if controller.move_to_joint_positions(work_position, duration=4.0):
|
||
print("发送工作位置命令")
|
||
controller.wait_for_motion_complete()
|
||
|
||
time.sleep(2.0)
|
||
|
||
# 示例3: 末端执行器位姿控制
|
||
print("\n=== 示例3: 末端执行器位姿控制 ===")
|
||
sample_poses = create_sample_poses()
|
||
|
||
for i, pose in enumerate(sample_poses):
|
||
print(f"移动到位姿 {i+1}")
|
||
controller.move_to_pose(pose)
|
||
time.sleep(3.0)
|
||
|
||
# 示例4: 运动规划
|
||
print("\n=== 示例4: 运动规划 ===")
|
||
target_joints = [1.0, -0.5, 1.2, 0.0, 0.8, -0.5]
|
||
trajectory = controller.plan_motion(target_joints)
|
||
|
||
if trajectory is not None:
|
||
print("规划成功,执行轨迹")
|
||
controller.joint_trajectory_pub.publish(trajectory)
|
||
controller.wait_for_motion_complete()
|
||
else:
|
||
print("规划失败")
|
||
|
||
# 示例5: 可行性分析
|
||
print("\n=== 示例5: 可行性分析 ===")
|
||
task_desc = "在狭窄空间内装配螺栓"
|
||
is_feasible = controller.analyze_feasibility(task_desc)
|
||
print(f"任务 '{task_desc}' 可行性: {is_feasible}")
|
||
|
||
# 示例6: 获取当前状态
|
||
print("\n=== 示例6: 状态查询 ===")
|
||
current_joints = controller.get_current_joint_positions()
|
||
if current_joints:
|
||
joint_str = [f"{pos:.3f}" for pos in current_joints]
|
||
print(f"当前关节位置: {joint_str}")
|
||
|
||
print(f"当前机器人状态: {controller.robot_status}")
|
||
|
||
# 回到初始位置
|
||
print("\n=== 返回初始位置 ===")
|
||
controller.move_to_joint_positions(home_position, duration=3.0)
|
||
controller.wait_for_motion_complete()
|
||
|
||
print("\n示例演示完成!")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n用户中断")
|
||
except Exception as e:
|
||
print(f"\n发生错误: {e}")
|
||
finally:
|
||
# 清理资源
|
||
if 'controller' in locals():
|
||
controller.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main() |