- Unity frontend with ROS-TCP-Connector for ROS2 communication - Docker-based ROS2 Jazzy backend with MoveIt2 integration - Support for 1-9 DOF manipulators - UR5 robot configuration and URDF files - Assembly task feasibility analysis tools - Comprehensive documentation and deployment guides 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
201 lines
6.7 KiB
Python
201 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
ROS TCP连接测试脚本 / ROS TCP Connection Test Script
|
||
用于验证Unity与ROS2之间的TCP通信 / For testing TCP communication between Unity and ROS2
|
||
"""
|
||
|
||
import socket
|
||
import struct
|
||
import json
|
||
import time
|
||
import sys
|
||
|
||
class ROSTCPTester:
|
||
"""ROS TCP连接测试器 / ROS TCP Connection Tester"""
|
||
|
||
def __init__(self, host='127.0.0.1', port=10000):
|
||
"""
|
||
初始化测试器 / Initialize tester
|
||
|
||
Args:
|
||
host: ROS2服务器地址 / ROS2 server address
|
||
port: ROS2服务器端口 / ROS2 server port
|
||
"""
|
||
self.host = host
|
||
self.port = port
|
||
self.socket = None
|
||
|
||
def connect(self):
|
||
"""连接到ROS2 TCP端点 / Connect to ROS2 TCP Endpoint"""
|
||
try:
|
||
print(f"[测试器] 正在连接到 {self.host}:{self.port}...")
|
||
print(f"[Tester] Connecting to {self.host}:{self.port}...")
|
||
|
||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
self.socket.settimeout(5.0)
|
||
self.socket.connect((self.host, self.port))
|
||
|
||
print("[测试器] ✓ 成功建立TCP连接")
|
||
print("[Tester] ✓ TCP connection established successfully")
|
||
return True
|
||
|
||
except socket.timeout:
|
||
print("[测试器] ✗ 连接超时")
|
||
print("[Tester] ✗ Connection timeout")
|
||
return False
|
||
except ConnectionRefusedError:
|
||
print("[测试器] ✗ 连接被拒绝 - 请确认ROS2服务正在运行")
|
||
print("[Tester] ✗ Connection refused - Make sure ROS2 service is running")
|
||
return False
|
||
except Exception as e:
|
||
print(f"[测试器] ✗ 连接错误: {e}")
|
||
print(f"[Tester] ✗ Connection error: {e}")
|
||
return False
|
||
|
||
def send_handshake(self):
|
||
"""
|
||
发送ROS-TCP-Connector握手消息 / Send ROS-TCP-Connector handshake message
|
||
根据ros-tcp-endpoint协议发送初始握手
|
||
"""
|
||
try:
|
||
# ROS-TCP-Connector握手格式:
|
||
# 1. 发送字符串 "ROS_TCP_UNITY_HANDSHAKE"
|
||
handshake_msg = b"ROS_TCP_UNITY_HANDSHAKE"
|
||
|
||
print(f"[测试器] 正在发送握手消息: {handshake_msg}")
|
||
print(f"[Tester] Sending handshake message: {handshake_msg}")
|
||
|
||
self.socket.sendall(handshake_msg)
|
||
|
||
# 等待响应
|
||
response = self.socket.recv(1024)
|
||
print(f"[测试器] ✓ 收到握手响应: {response}")
|
||
print(f"[Tester] ✓ Received handshake response: {response}")
|
||
|
||
return True
|
||
|
||
except socket.timeout:
|
||
print("[测试器] ✗ 握手超时")
|
||
print("[Tester] ✗ Handshake timeout")
|
||
return False
|
||
except Exception as e:
|
||
print(f"[测试器] ✗ 握手错误: {e}")
|
||
print(f"[Tester] ✗ Handshake error: {e}")
|
||
return False
|
||
|
||
def send_simple_message(self):
|
||
"""发送简单测试消息 / Send simple test message"""
|
||
try:
|
||
# 简单的JSON测试消息
|
||
test_message = {
|
||
"op": "test",
|
||
"type": "std_msgs/String",
|
||
"data": "Hello from Python Tester"
|
||
}
|
||
|
||
message_json = json.dumps(test_message).encode('utf-8')
|
||
message_length = len(message_json)
|
||
|
||
# 发送消息长度 (4字节,大端序)
|
||
length_header = struct.pack('>I', message_length)
|
||
|
||
print(f"[测试器] 正在发送消息 (长度: {message_length})")
|
||
print(f"[Tester] Sending message (length: {message_length})")
|
||
|
||
self.socket.sendall(length_header + message_json)
|
||
|
||
print("[测试器] ✓ 消息已发送")
|
||
print("[Tester] ✓ Message sent")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[测试器] ✗ 发送消息错误: {e}")
|
||
print(f"[Tester] ✗ Send message error: {e}")
|
||
return False
|
||
|
||
def keep_alive(self, duration=5):
|
||
"""
|
||
保持连接活跃 / Keep connection alive
|
||
|
||
Args:
|
||
duration: 保持连接的秒数 / Seconds to keep connection alive
|
||
"""
|
||
try:
|
||
print(f"[测试器] 保持连接活跃 {duration} 秒...")
|
||
print(f"[Tester] Keeping connection alive for {duration} seconds...")
|
||
|
||
time.sleep(duration)
|
||
|
||
print("[测试器] ✓ 连接保持成功")
|
||
print("[Tester] ✓ Connection kept alive successfully")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[测试器] ✗ 保持连接错误: {e}")
|
||
print(f"[Tester] ✗ Keep alive error: {e}")
|
||
return False
|
||
|
||
def disconnect(self):
|
||
"""断开连接 / Disconnect"""
|
||
if self.socket:
|
||
try:
|
||
self.socket.close()
|
||
print("[测试器] ✓ 连接已关闭")
|
||
print("[Tester] ✓ Connection closed")
|
||
except Exception as e:
|
||
print(f"[测试器] 关闭连接时出错: {e}")
|
||
print(f"[Tester] Error closing connection: {e}")
|
||
|
||
def run_full_test(self):
|
||
"""运行完整测试 / Run full test"""
|
||
print("=" * 60)
|
||
print("ROS TCP连接完整测试 / ROS TCP Connection Full Test")
|
||
print("=" * 60)
|
||
|
||
# 测试1: 基本TCP连接
|
||
print("\n[测试1/3] 基本TCP连接测试")
|
||
print("[Test 1/3] Basic TCP Connection Test")
|
||
if not self.connect():
|
||
print("\n[测试结果] ✗ 测试失败 - 无法建立TCP连接")
|
||
print("[Test Result] ✗ Test failed - Cannot establish TCP connection")
|
||
return False
|
||
|
||
# 测试2: 保持连接
|
||
print("\n[测试2/3] 连接保持测试")
|
||
print("[Test 2/3] Connection Keep-Alive Test")
|
||
if not self.keep_alive(3):
|
||
self.disconnect()
|
||
print("\n[测试结果] ✗ 测试失败 - 无法保持连接")
|
||
print("[Test Result] ✗ Test failed - Cannot keep connection alive")
|
||
return False
|
||
|
||
# 测试3: 断开连接
|
||
print("\n[测试3/3] 断开连接测试")
|
||
print("[Test 3/3] Disconnect Test")
|
||
self.disconnect()
|
||
|
||
print("\n" + "=" * 60)
|
||
print("[测试结果] ✓ 所有测试通过!")
|
||
print("[Test Result] ✓ All tests passed!")
|
||
print("=" * 60)
|
||
|
||
return True
|
||
|
||
|
||
def main():
|
||
"""主函数 / Main function"""
|
||
# 解析命令行参数
|
||
host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
|
||
port = int(sys.argv[2]) if len(sys.argv) > 2 else 10000
|
||
|
||
# 创建测试器并运行测试
|
||
tester = ROSTCPTester(host, port)
|
||
success = tester.run_full_test()
|
||
|
||
sys.exit(0 if success else 1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|