#!/usr/bin/env python3 """ ROS TCP连接测试脚本 / ROS TCP Connection Test Script 用于验证Unity与ROS2之间的TCP通信 / For testing TCP communication between Unity and ROS2 """ import socket import struct import json import time import sys class ROSTCPTester: """ROS TCP连接测试器 / ROS TCP Connection Tester""" def __init__(self, host='127.0.0.1', port=10000): """ 初始化测试器 / Initialize tester Args: host: ROS2服务器地址 / ROS2 server address port: ROS2服务器端口 / ROS2 server port """ self.host = host self.port = port self.socket = None def connect(self): """连接到ROS2 TCP端点 / Connect to ROS2 TCP Endpoint""" try: print(f"[测试器] 正在连接到 {self.host}:{self.port}...") print(f"[Tester] Connecting to {self.host}:{self.port}...") self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(5.0) self.socket.connect((self.host, self.port)) print("[测试器] ✓ 成功建立TCP连接") print("[Tester] ✓ TCP connection established successfully") return True except socket.timeout: print("[测试器] ✗ 连接超时") print("[Tester] ✗ Connection timeout") return False except ConnectionRefusedError: print("[测试器] ✗ 连接被拒绝 - 请确认ROS2服务正在运行") print("[Tester] ✗ Connection refused - Make sure ROS2 service is running") return False except Exception as e: print(f"[测试器] ✗ 连接错误: {e}") print(f"[Tester] ✗ Connection error: {e}") return False def send_handshake(self): """ 发送ROS-TCP-Connector握手消息 / Send ROS-TCP-Connector handshake message 根据ros-tcp-endpoint协议发送初始握手 """ try: # ROS-TCP-Connector握手格式: # 1. 发送字符串 "ROS_TCP_UNITY_HANDSHAKE" handshake_msg = b"ROS_TCP_UNITY_HANDSHAKE" print(f"[测试器] 正在发送握手消息: {handshake_msg}") print(f"[Tester] Sending handshake message: {handshake_msg}") self.socket.sendall(handshake_msg) # 等待响应 response = self.socket.recv(1024) print(f"[测试器] ✓ 收到握手响应: {response}") print(f"[Tester] ✓ Received handshake response: {response}") return True except socket.timeout: print("[测试器] ✗ 握手超时") print("[Tester] ✗ Handshake timeout") return False except Exception as e: print(f"[测试器] ✗ 握手错误: {e}") print(f"[Tester] ✗ Handshake error: {e}") return False def send_simple_message(self): """发送简单测试消息 / Send simple test message""" try: # 简单的JSON测试消息 test_message = { "op": "test", "type": "std_msgs/String", "data": "Hello from Python Tester" } message_json = json.dumps(test_message).encode('utf-8') message_length = len(message_json) # 发送消息长度 (4字节,大端序) length_header = struct.pack('>I', message_length) print(f"[测试器] 正在发送消息 (长度: {message_length})") print(f"[Tester] Sending message (length: {message_length})") self.socket.sendall(length_header + message_json) print("[测试器] ✓ 消息已发送") print("[Tester] ✓ Message sent") return True except Exception as e: print(f"[测试器] ✗ 发送消息错误: {e}") print(f"[Tester] ✗ Send message error: {e}") return False def keep_alive(self, duration=5): """ 保持连接活跃 / Keep connection alive Args: duration: 保持连接的秒数 / Seconds to keep connection alive """ try: print(f"[测试器] 保持连接活跃 {duration} 秒...") print(f"[Tester] Keeping connection alive for {duration} seconds...") time.sleep(duration) print("[测试器] ✓ 连接保持成功") print("[Tester] ✓ Connection kept alive successfully") return True except Exception as e: print(f"[测试器] ✗ 保持连接错误: {e}") print(f"[Tester] ✗ Keep alive error: {e}") return False def disconnect(self): """断开连接 / Disconnect""" if self.socket: try: self.socket.close() print("[测试器] ✓ 连接已关闭") print("[Tester] ✓ Connection closed") except Exception as e: print(f"[测试器] 关闭连接时出错: {e}") print(f"[Tester] Error closing connection: {e}") def run_full_test(self): """运行完整测试 / Run full test""" print("=" * 60) print("ROS TCP连接完整测试 / ROS TCP Connection Full Test") print("=" * 60) # 测试1: 基本TCP连接 print("\n[测试1/3] 基本TCP连接测试") print("[Test 1/3] Basic TCP Connection Test") if not self.connect(): print("\n[测试结果] ✗ 测试失败 - 无法建立TCP连接") print("[Test Result] ✗ Test failed - Cannot establish TCP connection") return False # 测试2: 保持连接 print("\n[测试2/3] 连接保持测试") print("[Test 2/3] Connection Keep-Alive Test") if not self.keep_alive(3): self.disconnect() print("\n[测试结果] ✗ 测试失败 - 无法保持连接") print("[Test Result] ✗ Test failed - Cannot keep connection alive") return False # 测试3: 断开连接 print("\n[测试3/3] 断开连接测试") print("[Test 3/3] Disconnect Test") self.disconnect() print("\n" + "=" * 60) print("[测试结果] ✓ 所有测试通过!") print("[Test Result] ✓ All tests passed!") print("=" * 60) return True def main(): """主函数 / Main function""" # 解析命令行参数 host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1' port = int(sys.argv[2]) if len(sys.argv) > 2 else 10000 # 创建测试器并运行测试 tester = ROSTCPTester(host, port) success = tester.run_full_test() sys.exit(0 if success else 1) if __name__ == "__main__": main()