#!/usr/bin/env python3 """ Unity-MoveIt2 系统基本使用示例 此脚本演示了如何使用Unity-MoveIt2系统进行基本的机器人控制和路径规划。 包括连接设置、关节控制、末端执行器控制、轨迹规划等功能。 作者: Unity-MoveIt2 团队 版本: 1.0.0 """ import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy import numpy as np import time from typing import List, Optional # ROS2消息类型 from sensor_msgs.msg import JointState from geometry_msgs.msg import Pose, PoseStamped, Point, Quaternion from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint from moveit_msgs.srv import GetMotionPlan from moveit_msgs.msg import MotionPlanRequest, Constraints, JointConstraint # 自定义消息类型(假设已定义) try: from unity_msgs.msg import RobotStatus, FeasibilityResult from unity_msgs.srv import AnalyzeFeasibility except ImportError: print("警告: 自定义消息类型未找到,某些功能可能不可用") class UnityRobotController(Node): """Unity机器人控制器类""" def __init__(self): super().__init__('unity_robot_controller') # 机器人配置 self.robot_name = "niryo_one" self.joint_names = [ "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6" ] # 当前状态 self.current_joint_state = None self.current_pose = None self.robot_status = "IDLE" # 初始化通信 self._setup_communication() # 等待连接 self._wait_for_connections() self.get_logger().info("Unity机器人控制器初始化完成") def _setup_communication(self): """设置ROS2通信""" # QoS配置 self.real_time_qos = QoSProfile( depth=10, reliability=ReliabilityPolicy.RELIABLE, durability=DurabilityPolicy.VOLATILE ) self.state_qos = QoSProfile( depth=10, reliability=ReliabilityPolicy.BEST_EFFORT, durability=DurabilityPolicy.VOLATILE ) # 发布者 self.joint_trajectory_pub = self.create_publisher( JointTrajectory, '/joint_trajectory_controller/joint_trajectory', self.real_time_qos ) self.pose_target_pub = self.create_publisher( PoseStamped, '/moveit_servo/pose_target_cmds', self.real_time_qos ) # 订阅者 self.joint_state_sub = self.create_subscription( JointState, '/joint_states', self.joint_state_callback, self.state_qos ) try: self.robot_status_sub = self.create_subscription( RobotStatus, '/robot_status', self.robot_status_callback, self.state_qos ) except NameError: self.get_logger().warn("RobotStatus消息类型不可用") # 服务客户端 self.motion_plan_client = self.create_client( GetMotionPlan, '/plan_kinematic_path' ) try: self.feasibility_client = self.create_client( AnalyzeFeasibility, '/analyze_assembly_feasibility' ) except NameError: self.get_logger().warn("AnalyzeFeasibility服务类型不可用") def _wait_for_connections(self): """等待ROS2连接建立""" self.get_logger().info("等待ROS2服务连接...") # 等待运动规划服务 while not self.motion_plan_client.wait_for_service(timeout_sec=1.0): self.get_logger().info('等待运动规划服务...') self.get_logger().info("所有服务连接已建立") def joint_state_callback(self, msg: JointState): """关节状态回调函数""" self.current_joint_state = msg # 记录关节位置(可选) if len(msg.position) >= 6: positions = [f"{pos:.3f}" for pos in msg.position[:6]] self.get_logger().debug(f"关节位置: {positions}") def robot_status_callback(self, msg): """机器人状态回调函数""" self.robot_status = msg.status self.get_logger().debug(f"机器人状态: {self.robot_status}") def move_to_joint_positions(self, joint_positions: List[float], duration: float = 3.0) -> bool: """移动到指定关节位置 Args: joint_positions: 目标关节位置(弧度) duration: 运动时间(秒) Returns: bool: 是否成功发送命令 """ if len(joint_positions) != len(self.joint_names): self.get_logger().error( f"关节位置数量不匹配: 期望{len(self.joint_names)}, 实际{len(joint_positions)}" ) return False # 创建轨迹消息 trajectory = JointTrajectory() trajectory.header.stamp = self.get_clock().now().to_msg() trajectory.joint_names = self.joint_names # 创建轨迹点 point = JointTrajectoryPoint() point.positions = joint_positions point.time_from_start.sec = int(duration) point.time_from_start.nanosec = int((duration - int(duration)) * 1e9) trajectory.points = [point] # 发布轨迹 self.joint_trajectory_pub.publish(trajectory) self.get_logger().info(f"发送关节轨迹: {joint_positions}") return True def move_to_pose(self, target_pose: Pose) -> bool: """移动到指定末端执行器位姿 Args: target_pose: 目标位姿 Returns: bool: 是否成功发送命令 """ # 创建位姿目标消息 pose_stamped = PoseStamped() pose_stamped.header.stamp = self.get_clock().now().to_msg() pose_stamped.header.frame_id = "base_link" pose_stamped.pose = target_pose # 发布位姿目标 self.pose_target_pub.publish(pose_stamped) self.get_logger().info( f"发送位姿目标: 位置({target_pose.position.x:.3f}, " f"{target_pose.position.y:.3f}, {target_pose.position.z:.3f})" ) return True def plan_motion(self, target_joint_positions: List[float]) -> Optional[JointTrajectory]: """规划运动轨迹 Args: target_joint_positions: 目标关节位置 Returns: Optional[JointTrajectory]: 规划的轨迹,失败时返回None """ if not self.motion_plan_client.service_is_ready(): self.get_logger().error("运动规划服务不可用") return None # 创建规划请求 request = GetMotionPlan.Request() # 设置运动规划请求 motion_request = MotionPlanRequest() motion_request.group_name = "manipulator" motion_request.num_planning_attempts = 10 motion_request.allowed_planning_time = 5.0 # 设置目标约束 joint_constraints = [] for i, (joint_name, position) in enumerate(zip(self.joint_names, target_joint_positions)): constraint = JointConstraint() constraint.joint_name = joint_name constraint.position = position constraint.tolerance_above = 0.01 constraint.tolerance_below = 0.01 constraint.weight = 1.0 joint_constraints.append(constraint) goal_constraints = Constraints() goal_constraints.joint_constraints = joint_constraints motion_request.goal_constraints = [goal_constraints] request.motion_plan_request = motion_request # 发送规划请求 self.get_logger().info("发送运动规划请求...") future = self.motion_plan_client.call_async(request) # 等待响应 rclpy.spin_until_future_complete(self, future, timeout_sec=10.0) if future.result() is not None: response = future.result() if response.motion_plan_response.error_code.val == 1: # SUCCESS self.get_logger().info("运动规划成功") return response.motion_plan_response.trajectory.joint_trajectory else: self.get_logger().error( f"运动规划失败: {response.motion_plan_response.error_code.val}" ) else: self.get_logger().error("运动规划请求超时") return None def analyze_feasibility(self, task_description: str) -> bool: """分析装配任务可行性 Args: task_description: 任务描述 Returns: bool: 任务是否可行 """ if not hasattr(self, 'feasibility_client'): self.get_logger().warn("可行性分析服务不可用") return True # 默认认为可行 if not self.feasibility_client.service_is_ready(): self.get_logger().error("可行性分析服务不可用") return False # 创建分析请求(这里需要根据实际消息定义调整) try: request = AnalyzeFeasibility.Request() request.task_description = task_description request.robot_name = self.robot_name # 发送分析请求 self.get_logger().info(f"分析任务可行性: {task_description}") future = self.feasibility_client.call_async(request) # 等待响应 rclpy.spin_until_future_complete(self, future, timeout_sec=15.0) if future.result() is not None: response = future.result() self.get_logger().info(f"可行性分析结果: {response.is_feasible}") return response.is_feasible else: self.get_logger().error("可行性分析请求超时") except Exception as e: self.get_logger().error(f"可行性分析失败: {e}") return False def get_current_joint_positions(self) -> Optional[List[float]]: """获取当前关节位置 Returns: Optional[List[float]]: 当前关节位置,未获取到时返回None """ if self.current_joint_state is None: return None return list(self.current_joint_state.position[:len(self.joint_names)]) def wait_for_motion_complete(self, timeout: float = 30.0) -> bool: """等待运动完成 Args: timeout: 超时时间(秒) Returns: bool: 是否在超时前完成运动 """ start_time = time.time() while time.time() - start_time < timeout: if self.robot_status == "IDLE": return True time.sleep(0.1) rclpy.spin_once(self, timeout_sec=0.0) self.get_logger().warn("等待运动完成超时") return False def create_sample_poses() -> List[Pose]: """创建示例位姿列表""" poses = [] # 位姿1: 正前方 pose1 = Pose() pose1.position = Point(x=0.3, y=0.0, z=0.3) pose1.orientation = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0) poses.append(pose1) # 位姿2: 右侧 pose2 = Pose() pose2.position = Point(x=0.2, y=-0.2, z=0.3) pose2.orientation = Quaternion(x=0.0, y=0.0, z=0.707, w=0.707) poses.append(pose2) # 位姿3: 左侧 pose3 = Pose() pose3.position = Point(x=0.2, y=0.2, z=0.3) pose3.orientation = Quaternion(x=0.0, y=0.0, z=-0.707, w=0.707) poses.append(pose3) return poses def main(): """主函数 - 演示基本使用方法""" # 初始化ROS2 rclpy.init() try: # 创建控制器 controller = UnityRobotController() # 等待初始状态 print("等待机器人状态...") time.sleep(2.0) rclpy.spin_once(controller, timeout_sec=1.0) # 示例1: 移动到预定义关节位置 print("\n=== 示例1: 关节位置控制 ===") home_position = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] if controller.move_to_joint_positions(home_position, duration=3.0): print("发送回零位置命令") controller.wait_for_motion_complete() time.sleep(2.0) # 示例2: 移动到不同关节位置 print("\n=== 示例2: 移动到工作位置 ===") work_position = [0.5, -0.3, 0.8, 0.0, 0.5, 0.0] if controller.move_to_joint_positions(work_position, duration=4.0): print("发送工作位置命令") controller.wait_for_motion_complete() time.sleep(2.0) # 示例3: 末端执行器位姿控制 print("\n=== 示例3: 末端执行器位姿控制 ===") sample_poses = create_sample_poses() for i, pose in enumerate(sample_poses): print(f"移动到位姿 {i+1}") controller.move_to_pose(pose) time.sleep(3.0) # 示例4: 运动规划 print("\n=== 示例4: 运动规划 ===") target_joints = [1.0, -0.5, 1.2, 0.0, 0.8, -0.5] trajectory = controller.plan_motion(target_joints) if trajectory is not None: print("规划成功,执行轨迹") controller.joint_trajectory_pub.publish(trajectory) controller.wait_for_motion_complete() else: print("规划失败") # 示例5: 可行性分析 print("\n=== 示例5: 可行性分析 ===") task_desc = "在狭窄空间内装配螺栓" is_feasible = controller.analyze_feasibility(task_desc) print(f"任务 '{task_desc}' 可行性: {is_feasible}") # 示例6: 获取当前状态 print("\n=== 示例6: 状态查询 ===") current_joints = controller.get_current_joint_positions() if current_joints: joint_str = [f"{pos:.3f}" for pos in current_joints] print(f"当前关节位置: {joint_str}") print(f"当前机器人状态: {controller.robot_status}") # 回到初始位置 print("\n=== 返回初始位置 ===") controller.move_to_joint_positions(home_position, duration=3.0) controller.wait_for_motion_complete() print("\n示例演示完成!") except KeyboardInterrupt: print("\n用户中断") except Exception as e: print(f"\n发生错误: {e}") finally: # 清理资源 if 'controller' in locals(): controller.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()