unity2moveit2/test_ros_unity_connection.py
ayuan9957 fe15edcbd5 Initial commit: Unity-MoveIt2 integrated robotic arm simulation system
- Unity frontend with ROS-TCP-Connector for ROS2 communication
- Docker-based ROS2 Jazzy backend with MoveIt2 integration
- Support for 1-9 DOF manipulators
- UR5 robot configuration and URDF files
- Assembly task feasibility analysis tools
- Comprehensive documentation and deployment guides

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 12:08:34 +08:00

156 lines
4.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ROS2-Unity 通信测试脚本
测试从外部客户端连接到ROS-TCP-Endpoint
"""
import socket
import json
import time
import sys
import io
# 设置UTF-8编码输出
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
class ROSUnityConnectionTester:
def __init__(self, host='127.0.0.1', port=10000):
self.host = host
self.port = port
self.socket = None
def test_tcp_connection(self):
"""测试TCP连接"""
print(f"\n{'='*50}")
print("测试1: TCP连接测试")
print(f"{'='*50}")
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(5)
print(f"正在连接到 {self.host}:{self.port}...")
self.socket.connect((self.host, self.port))
print("✅ TCP连接成功!")
return True
except socket.timeout:
print("❌ 连接超时!")
return False
except ConnectionRefusedError:
print("❌ 连接被拒绝! 请确认ROS2容器正在运行")
return False
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
def test_socket_alive(self):
"""测试socket保持连接"""
print(f"\n{'='*50}")
print("测试2: Socket保持连接测试")
print(f"{'='*50}")
if not self.socket:
print("❌ Socket未连接")
return False
try:
# 检查socket是否仍然连接
print("正在检测Socket连接状态...")
# 设置为非阻塞模式检测
self.socket.setblocking(0)
try:
# 尝试peek数据(不从缓冲区移除)
data = self.socket.recv(1, socket.MSG_PEEK)
# 如果能读到数据或没有抛出异常,说明连接正常
except BlockingIOError:
# Windows下没有数据可读,但连接正常
pass
except Exception as e:
print(f"❌ Socket连接已断开: {e}")
return False
finally:
# 恢复阻塞模式
self.socket.setblocking(1)
print("✅ Socket保持连接正常!")
return True
except Exception as e:
print(f"❌ Socket测试失败: {e}")
return False
def test_ros_endpoint_info(self):
"""获取ROS Endpoint信息"""
print(f"\n{'='*50}")
print("测试3: ROS Endpoint信息获取")
print(f"{'='*50}")
print("提示: 完整的ROS消息测试需要在Unity中运行")
print("当前测试验证了基础的TCP连接层")
return True
def close(self):
"""关闭连接"""
if self.socket:
self.socket.close()
print("\n连接已关闭")
def run_all_tests(self):
"""运行所有测试"""
print("\n" + "="*50)
print("ROS2-Unity 通信测试开始")
print("="*50)
results = []
# 测试1: TCP连接
tcp_result = self.test_tcp_connection()
results.append(("TCP连接", tcp_result))
if tcp_result:
# 测试2: Socket保持连接
socket_result = self.test_socket_alive()
results.append(("Socket保持连接", socket_result))
# 测试3: ROS Endpoint信息
endpoint_result = self.test_ros_endpoint_info()
results.append(("ROS Endpoint信息", endpoint_result))
# 输出汇总
print(f"\n{'='*50}")
print("测试结果汇总")
print(f"{'='*50}")
for test_name, result in results:
status = "✅ 通过" if result else "❌ 失败"
print(f"{test_name:.<30} {status}")
all_passed = all(result for _, result in results)
print(f"\n{'='*50}")
if all_passed:
print("🎉 所有测试通过!")
print("\n后续步骤:")
print("1. 在Unity中打开项目")
print("2. 运行ConnectionTest场景或脚本")
print("3. 验证完整的ROS消息传递功能")
else:
print("⚠️ 部分测试失败,请检查:")
print("1. Docker容器是否正在运行: docker ps")
print("2. ROS2服务是否启动: docker logs unity-ros2-system")
print("3. 端口10000是否被占用: netstat -ano | findstr 10000")
print(f"{'='*50}\n")
self.close()
return all_passed
if __name__ == "__main__":
tester = ROSUnityConnectionTester()
tester.run_all_tests()