QDAirPortBackend0122/tools/test_adxp_websocket_system.py
2026-01-22 13:19:47 +08:00

191 lines
6.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket系统集成测试脚本
用于测试整个ADXP WebSocket消息传输流程
"""
import websocket
import json
import time
import threading
import requests
class AdxpSystemTest:
def __init__(self):
self.adxp_adapter_url = "http://localhost:8086"
self.qaup_system_url = "http://localhost:8080"
self.ws_url = "ws://localhost:8086/ws/flight-notifications"
self.ws = None
self.connected = False
self.messages_received = 0
self.start_time = None
def test_health_endpoints(self):
"""测试健康检查端点"""
print("🔍 测试健康检查端点...")
try:
# 测试adxp-adapter健康检查
response = requests.get(f"{self.adxp_adapter_url}/actuator/health")
if response.status_code == 200:
print("✅ adxp-adapter健康检查: OK")
print(f" 状态: {response.json()}")
else:
print(f"❌ adxp-adapter健康检查失败: {response.status_code}")
except Exception as e:
print(f"❌ adxp-adapter健康检查异常: {e}")
try:
# 测试QAUP系统健康检查
response = requests.get(f"{self.qaup_system_url}/actuator/health")
if response.status_code == 200:
print("✅ QAUP系统健康检查: OK")
print(f" 状态: {response.json()}")
else:
print(f"❌ QAUP系统健康检查失败: {response.status_code}")
except Exception as e:
print(f"❌ QAUP系统健康检查异常: {e}")
def on_message(self, ws, message):
"""处理接收到的消息"""
self.messages_received += 1
print(f"📥 [{self.messages_received}] 收到WebSocket消息")
try:
# 尝试解析JSON消息
data = json.loads(message)
if isinstance(data, list):
print(f" 解析到 {len(data)} 条航班消息")
for i, msg in enumerate(data):
service_code = msg.get('serviceCode', 'N/A')
print(f" [{i+1}] 服务代码: {service_code}")
if service_code in ['ADXP_NAOMS_O_DYN_ARR', 'ADXP_NAOMS_O_CDM_AXOT']:
print(f" 航班号: {self.extract_flight_number(msg.get('content', ''))}")
else:
print(f" 消息内容: {data}")
except json.JSONDecodeError:
print(f" 非JSON消息: {message}")
# 显示性能统计
if self.start_time:
elapsed = time.time() - self.start_time
if elapsed > 0:
rate = self.messages_received / elapsed
print(f" 📊 接收速率: {rate:.2f} 条消息/秒")
def extract_flight_number(self, xml_content):
"""从XML内容中提取航班号"""
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_content)
flight_number = root.find('.//FlightNumber')
return flight_number.text if flight_number is not None else 'N/A'
except Exception:
return 'N/A'
def on_error(self, ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 WebSocket连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
self.connected = False
def on_open(self, ws):
"""处理连接打开"""
self.connected = True
self.start_time = time.time()
print("✅ WebSocket连接已建立")
print(f"🔗 连接地址: {self.ws_url}")
# 启动心跳线程
def heartbeat():
while self.connected:
time.sleep(30) # 每30秒发送一次心跳
if self.connected and self.ws and self.ws.sock.connected:
self.ws.send("ping")
print("💓 发送心跳")
else:
break
threading.Thread(target=heartbeat, daemon=True).start()
def test_websocket_connection(self):
"""测试WebSocket连接"""
print("\n🚀 测试WebSocket连接...")
try:
# 创建WebSocket连接
self.ws = websocket.WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# 在单独线程中运行WebSocket
ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
ws_thread.start()
# 等待连接建立
time.sleep(2)
if self.connected:
print("✅ WebSocket连接测试成功")
# 保持连接10秒以接收消息
print("⏳ 保持连接10秒以接收消息...")
time.sleep(10)
self.ws.close()
else:
print("❌ WebSocket连接测试失败")
except Exception as e:
print(f"❌ WebSocket连接测试异常: {e}")
def get_final_stats(self):
"""获取最终统计信息"""
if self.start_time:
elapsed = time.time() - self.start_time
return {
'messages_received': self.messages_received,
'elapsed_time': elapsed,
'rate': self.messages_received / elapsed if elapsed > 0 else 0
}
return None
def run_all_tests(self):
"""运行所有测试"""
print("🧪 开始ADXP WebSocket系统集成测试")
print("=" * 50)
# 测试健康检查端点
self.test_health_endpoints()
# 测试WebSocket连接
self.test_websocket_connection()
# 显示最终统计
stats = self.get_final_stats()
if stats:
print(f"\n📊 最终统计:")
print(f" 接收消息总数: {stats['messages_received']}")
print(f" 运行时间: {stats['elapsed_time']:.2f}")
print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒")
print("\n✅ 系统集成测试完成")
def main():
"""主函数"""
tester = AdxpSystemTest()
try:
tester.run_all_tests()
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
except Exception as e:
print(f"❌ 测试过程中发生错误: {e}")
if __name__ == "__main__":
main()