unity2moveit2/scripts/test_ros_tcp_connection.py
ayuan9957 fe15edcbd5 Initial commit: Unity-MoveIt2 integrated robotic arm simulation system
- Unity frontend with ROS-TCP-Connector for ROS2 communication
- Docker-based ROS2 Jazzy backend with MoveIt2 integration
- Support for 1-9 DOF manipulators
- UR5 robot configuration and URDF files
- Assembly task feasibility analysis tools
- Comprehensive documentation and deployment guides

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 12:08:34 +08:00

201 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ROS TCP连接测试脚本 / ROS TCP Connection Test Script
用于验证Unity与ROS2之间的TCP通信 / For testing TCP communication between Unity and ROS2
"""
import socket
import struct
import json
import time
import sys
class ROSTCPTester:
"""ROS TCP连接测试器 / ROS TCP Connection Tester"""
def __init__(self, host='127.0.0.1', port=10000):
"""
初始化测试器 / Initialize tester
Args:
host: ROS2服务器地址 / ROS2 server address
port: ROS2服务器端口 / ROS2 server port
"""
self.host = host
self.port = port
self.socket = None
def connect(self):
"""连接到ROS2 TCP端点 / Connect to ROS2 TCP Endpoint"""
try:
print(f"[测试器] 正在连接到 {self.host}:{self.port}...")
print(f"[Tester] Connecting to {self.host}:{self.port}...")
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(5.0)
self.socket.connect((self.host, self.port))
print("[测试器] ✓ 成功建立TCP连接")
print("[Tester] ✓ TCP connection established successfully")
return True
except socket.timeout:
print("[测试器] ✗ 连接超时")
print("[Tester] ✗ Connection timeout")
return False
except ConnectionRefusedError:
print("[测试器] ✗ 连接被拒绝 - 请确认ROS2服务正在运行")
print("[Tester] ✗ Connection refused - Make sure ROS2 service is running")
return False
except Exception as e:
print(f"[测试器] ✗ 连接错误: {e}")
print(f"[Tester] ✗ Connection error: {e}")
return False
def send_handshake(self):
"""
发送ROS-TCP-Connector握手消息 / Send ROS-TCP-Connector handshake message
根据ros-tcp-endpoint协议发送初始握手
"""
try:
# ROS-TCP-Connector握手格式:
# 1. 发送字符串 "ROS_TCP_UNITY_HANDSHAKE"
handshake_msg = b"ROS_TCP_UNITY_HANDSHAKE"
print(f"[测试器] 正在发送握手消息: {handshake_msg}")
print(f"[Tester] Sending handshake message: {handshake_msg}")
self.socket.sendall(handshake_msg)
# 等待响应
response = self.socket.recv(1024)
print(f"[测试器] ✓ 收到握手响应: {response}")
print(f"[Tester] ✓ Received handshake response: {response}")
return True
except socket.timeout:
print("[测试器] ✗ 握手超时")
print("[Tester] ✗ Handshake timeout")
return False
except Exception as e:
print(f"[测试器] ✗ 握手错误: {e}")
print(f"[Tester] ✗ Handshake error: {e}")
return False
def send_simple_message(self):
"""发送简单测试消息 / Send simple test message"""
try:
# 简单的JSON测试消息
test_message = {
"op": "test",
"type": "std_msgs/String",
"data": "Hello from Python Tester"
}
message_json = json.dumps(test_message).encode('utf-8')
message_length = len(message_json)
# 发送消息长度 (4字节大端序)
length_header = struct.pack('>I', message_length)
print(f"[测试器] 正在发送消息 (长度: {message_length})")
print(f"[Tester] Sending message (length: {message_length})")
self.socket.sendall(length_header + message_json)
print("[测试器] ✓ 消息已发送")
print("[Tester] ✓ Message sent")
return True
except Exception as e:
print(f"[测试器] ✗ 发送消息错误: {e}")
print(f"[Tester] ✗ Send message error: {e}")
return False
def keep_alive(self, duration=5):
"""
保持连接活跃 / Keep connection alive
Args:
duration: 保持连接的秒数 / Seconds to keep connection alive
"""
try:
print(f"[测试器] 保持连接活跃 {duration} 秒...")
print(f"[Tester] Keeping connection alive for {duration} seconds...")
time.sleep(duration)
print("[测试器] ✓ 连接保持成功")
print("[Tester] ✓ Connection kept alive successfully")
return True
except Exception as e:
print(f"[测试器] ✗ 保持连接错误: {e}")
print(f"[Tester] ✗ Keep alive error: {e}")
return False
def disconnect(self):
"""断开连接 / Disconnect"""
if self.socket:
try:
self.socket.close()
print("[测试器] ✓ 连接已关闭")
print("[Tester] ✓ Connection closed")
except Exception as e:
print(f"[测试器] 关闭连接时出错: {e}")
print(f"[Tester] Error closing connection: {e}")
def run_full_test(self):
"""运行完整测试 / Run full test"""
print("=" * 60)
print("ROS TCP连接完整测试 / ROS TCP Connection Full Test")
print("=" * 60)
# 测试1: 基本TCP连接
print("\n[测试1/3] 基本TCP连接测试")
print("[Test 1/3] Basic TCP Connection Test")
if not self.connect():
print("\n[测试结果] ✗ 测试失败 - 无法建立TCP连接")
print("[Test Result] ✗ Test failed - Cannot establish TCP connection")
return False
# 测试2: 保持连接
print("\n[测试2/3] 连接保持测试")
print("[Test 2/3] Connection Keep-Alive Test")
if not self.keep_alive(3):
self.disconnect()
print("\n[测试结果] ✗ 测试失败 - 无法保持连接")
print("[Test Result] ✗ Test failed - Cannot keep connection alive")
return False
# 测试3: 断开连接
print("\n[测试3/3] 断开连接测试")
print("[Test 3/3] Disconnect Test")
self.disconnect()
print("\n" + "=" * 60)
print("[测试结果] ✓ 所有测试通过!")
print("[Test Result] ✓ All tests passed!")
print("=" * 60)
return True
def main():
"""主函数 / Main function"""
# 解析命令行参数
host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
port = int(sys.argv[2]) if len(sys.argv) > 2 else 10000
# 创建测试器并运行测试
tester = ROSTCPTester(host, port)
success = tester.run_full_test()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()