- Unity frontend with ROS-TCP-Connector for ROS2 communication - Docker-based ROS2 Jazzy backend with MoveIt2 integration - Support for 1-9 DOF manipulators - UR5 robot configuration and URDF files - Assembly task feasibility analysis tools - Comprehensive documentation and deployment guides 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
156 lines
4.6 KiB
Python
156 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
ROS2-Unity 通信测试脚本
|
|
测试从外部客户端连接到ROS-TCP-Endpoint
|
|
"""
|
|
|
|
import socket
|
|
import json
|
|
import time
|
|
import sys
|
|
import io
|
|
|
|
# 设置UTF-8编码输出
|
|
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
|
|
|
class ROSUnityConnectionTester:
|
|
def __init__(self, host='127.0.0.1', port=10000):
|
|
self.host = host
|
|
self.port = port
|
|
self.socket = None
|
|
|
|
def test_tcp_connection(self):
|
|
"""测试TCP连接"""
|
|
print(f"\n{'='*50}")
|
|
print("测试1: TCP连接测试")
|
|
print(f"{'='*50}")
|
|
|
|
try:
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.settimeout(5)
|
|
|
|
print(f"正在连接到 {self.host}:{self.port}...")
|
|
self.socket.connect((self.host, self.port))
|
|
|
|
print("✅ TCP连接成功!")
|
|
return True
|
|
|
|
except socket.timeout:
|
|
print("❌ 连接超时!")
|
|
return False
|
|
except ConnectionRefusedError:
|
|
print("❌ 连接被拒绝! 请确认ROS2容器正在运行")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ 连接失败: {e}")
|
|
return False
|
|
|
|
def test_socket_alive(self):
|
|
"""测试socket保持连接"""
|
|
print(f"\n{'='*50}")
|
|
print("测试2: Socket保持连接测试")
|
|
print(f"{'='*50}")
|
|
|
|
if not self.socket:
|
|
print("❌ Socket未连接")
|
|
return False
|
|
|
|
try:
|
|
# 检查socket是否仍然连接
|
|
print("正在检测Socket连接状态...")
|
|
|
|
# 设置为非阻塞模式检测
|
|
self.socket.setblocking(0)
|
|
try:
|
|
# 尝试peek数据(不从缓冲区移除)
|
|
data = self.socket.recv(1, socket.MSG_PEEK)
|
|
# 如果能读到数据或没有抛出异常,说明连接正常
|
|
except BlockingIOError:
|
|
# Windows下没有数据可读,但连接正常
|
|
pass
|
|
except Exception as e:
|
|
print(f"❌ Socket连接已断开: {e}")
|
|
return False
|
|
finally:
|
|
# 恢复阻塞模式
|
|
self.socket.setblocking(1)
|
|
|
|
print("✅ Socket保持连接正常!")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ Socket测试失败: {e}")
|
|
return False
|
|
|
|
def test_ros_endpoint_info(self):
|
|
"""获取ROS Endpoint信息"""
|
|
print(f"\n{'='*50}")
|
|
print("测试3: ROS Endpoint信息获取")
|
|
print(f"{'='*50}")
|
|
|
|
print("提示: 完整的ROS消息测试需要在Unity中运行")
|
|
print("当前测试验证了基础的TCP连接层")
|
|
|
|
return True
|
|
|
|
def close(self):
|
|
"""关闭连接"""
|
|
if self.socket:
|
|
self.socket.close()
|
|
print("\n连接已关闭")
|
|
|
|
def run_all_tests(self):
|
|
"""运行所有测试"""
|
|
print("\n" + "="*50)
|
|
print("ROS2-Unity 通信测试开始")
|
|
print("="*50)
|
|
|
|
results = []
|
|
|
|
# 测试1: TCP连接
|
|
tcp_result = self.test_tcp_connection()
|
|
results.append(("TCP连接", tcp_result))
|
|
|
|
if tcp_result:
|
|
# 测试2: Socket保持连接
|
|
socket_result = self.test_socket_alive()
|
|
results.append(("Socket保持连接", socket_result))
|
|
|
|
# 测试3: ROS Endpoint信息
|
|
endpoint_result = self.test_ros_endpoint_info()
|
|
results.append(("ROS Endpoint信息", endpoint_result))
|
|
|
|
# 输出汇总
|
|
print(f"\n{'='*50}")
|
|
print("测试结果汇总")
|
|
print(f"{'='*50}")
|
|
|
|
for test_name, result in results:
|
|
status = "✅ 通过" if result else "❌ 失败"
|
|
print(f"{test_name:.<30} {status}")
|
|
|
|
all_passed = all(result for _, result in results)
|
|
|
|
print(f"\n{'='*50}")
|
|
if all_passed:
|
|
print("🎉 所有测试通过!")
|
|
print("\n后续步骤:")
|
|
print("1. 在Unity中打开项目")
|
|
print("2. 运行ConnectionTest场景或脚本")
|
|
print("3. 验证完整的ROS消息传递功能")
|
|
else:
|
|
print("⚠️ 部分测试失败,请检查:")
|
|
print("1. Docker容器是否正在运行: docker ps")
|
|
print("2. ROS2服务是否启动: docker logs unity-ros2-system")
|
|
print("3. 端口10000是否被占用: netstat -ano | findstr 10000")
|
|
print(f"{'='*50}\n")
|
|
|
|
self.close()
|
|
return all_passed
|
|
|
|
|
|
if __name__ == "__main__":
|
|
tester = ROSUnityConnectionTester()
|
|
tester.run_all_tests()
|