191 lines
6.8 KiB
Python
191 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
ADXP WebSocket系统集成测试脚本
|
|
用于测试整个ADXP WebSocket消息传输流程
|
|
"""
|
|
|
|
import websocket
|
|
import json
|
|
import time
|
|
import threading
|
|
import requests
|
|
|
|
class AdxpSystemTest:
|
|
def __init__(self):
|
|
self.adxp_adapter_url = "http://localhost:8086"
|
|
self.qaup_system_url = "http://localhost:8080"
|
|
self.ws_url = "ws://localhost:8086/ws/flight-notifications"
|
|
self.ws = None
|
|
self.connected = False
|
|
self.messages_received = 0
|
|
self.start_time = None
|
|
|
|
def test_health_endpoints(self):
|
|
"""测试健康检查端点"""
|
|
print("🔍 测试健康检查端点...")
|
|
|
|
try:
|
|
# 测试adxp-adapter健康检查
|
|
response = requests.get(f"{self.adxp_adapter_url}/actuator/health")
|
|
if response.status_code == 200:
|
|
print("✅ adxp-adapter健康检查: OK")
|
|
print(f" 状态: {response.json()}")
|
|
else:
|
|
print(f"❌ adxp-adapter健康检查失败: {response.status_code}")
|
|
except Exception as e:
|
|
print(f"❌ adxp-adapter健康检查异常: {e}")
|
|
|
|
try:
|
|
# 测试QAUP系统健康检查
|
|
response = requests.get(f"{self.qaup_system_url}/actuator/health")
|
|
if response.status_code == 200:
|
|
print("✅ QAUP系统健康检查: OK")
|
|
print(f" 状态: {response.json()}")
|
|
else:
|
|
print(f"❌ QAUP系统健康检查失败: {response.status_code}")
|
|
except Exception as e:
|
|
print(f"❌ QAUP系统健康检查异常: {e}")
|
|
|
|
def on_message(self, ws, message):
|
|
"""处理接收到的消息"""
|
|
self.messages_received += 1
|
|
print(f"📥 [{self.messages_received}] 收到WebSocket消息")
|
|
|
|
try:
|
|
# 尝试解析JSON消息
|
|
data = json.loads(message)
|
|
if isinstance(data, list):
|
|
print(f" 解析到 {len(data)} 条航班消息")
|
|
for i, msg in enumerate(data):
|
|
service_code = msg.get('serviceCode', 'N/A')
|
|
print(f" [{i+1}] 服务代码: {service_code}")
|
|
if service_code in ['ADXP_NAOMS_O_DYN_ARR', 'ADXP_NAOMS_O_CDM_AXOT']:
|
|
print(f" 航班号: {self.extract_flight_number(msg.get('content', ''))}")
|
|
else:
|
|
print(f" 消息内容: {data}")
|
|
except json.JSONDecodeError:
|
|
print(f" 非JSON消息: {message}")
|
|
|
|
# 显示性能统计
|
|
if self.start_time:
|
|
elapsed = time.time() - self.start_time
|
|
if elapsed > 0:
|
|
rate = self.messages_received / elapsed
|
|
print(f" 📊 接收速率: {rate:.2f} 条消息/秒")
|
|
|
|
def extract_flight_number(self, xml_content):
|
|
"""从XML内容中提取航班号"""
|
|
try:
|
|
import xml.etree.ElementTree as ET
|
|
root = ET.fromstring(xml_content)
|
|
flight_number = root.find('.//FlightNumber')
|
|
return flight_number.text if flight_number is not None else 'N/A'
|
|
except Exception:
|
|
return 'N/A'
|
|
|
|
def on_error(self, ws, error):
|
|
"""处理错误"""
|
|
print(f"❌ WebSocket错误: {error}")
|
|
self.connected = False
|
|
|
|
def on_close(self, ws, close_status_code, close_msg):
|
|
"""处理连接关闭"""
|
|
print(f"🔒 WebSocket连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
|
|
self.connected = False
|
|
|
|
def on_open(self, ws):
|
|
"""处理连接打开"""
|
|
self.connected = True
|
|
self.start_time = time.time()
|
|
print("✅ WebSocket连接已建立")
|
|
print(f"🔗 连接地址: {self.ws_url}")
|
|
|
|
# 启动心跳线程
|
|
def heartbeat():
|
|
while self.connected:
|
|
time.sleep(30) # 每30秒发送一次心跳
|
|
if self.connected and self.ws and self.ws.sock.connected:
|
|
self.ws.send("ping")
|
|
print("💓 发送心跳")
|
|
else:
|
|
break
|
|
|
|
threading.Thread(target=heartbeat, daemon=True).start()
|
|
|
|
def test_websocket_connection(self):
|
|
"""测试WebSocket连接"""
|
|
print("\n🚀 测试WebSocket连接...")
|
|
|
|
try:
|
|
# 创建WebSocket连接
|
|
self.ws = websocket.WebSocketApp(self.ws_url,
|
|
on_open=self.on_open,
|
|
on_message=self.on_message,
|
|
on_error=self.on_error,
|
|
on_close=self.on_close)
|
|
|
|
# 在单独线程中运行WebSocket
|
|
ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
|
|
ws_thread.start()
|
|
|
|
# 等待连接建立
|
|
time.sleep(2)
|
|
|
|
if self.connected:
|
|
print("✅ WebSocket连接测试成功")
|
|
# 保持连接10秒以接收消息
|
|
print("⏳ 保持连接10秒以接收消息...")
|
|
time.sleep(10)
|
|
self.ws.close()
|
|
else:
|
|
print("❌ WebSocket连接测试失败")
|
|
|
|
except Exception as e:
|
|
print(f"❌ WebSocket连接测试异常: {e}")
|
|
|
|
def get_final_stats(self):
|
|
"""获取最终统计信息"""
|
|
if self.start_time:
|
|
elapsed = time.time() - self.start_time
|
|
return {
|
|
'messages_received': self.messages_received,
|
|
'elapsed_time': elapsed,
|
|
'rate': self.messages_received / elapsed if elapsed > 0 else 0
|
|
}
|
|
return None
|
|
|
|
def run_all_tests(self):
|
|
"""运行所有测试"""
|
|
print("🧪 开始ADXP WebSocket系统集成测试")
|
|
print("=" * 50)
|
|
|
|
# 测试健康检查端点
|
|
self.test_health_endpoints()
|
|
|
|
# 测试WebSocket连接
|
|
self.test_websocket_connection()
|
|
|
|
# 显示最终统计
|
|
stats = self.get_final_stats()
|
|
if stats:
|
|
print(f"\n📊 最终统计:")
|
|
print(f" 接收消息总数: {stats['messages_received']}")
|
|
print(f" 运行时间: {stats['elapsed_time']:.2f} 秒")
|
|
print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒")
|
|
|
|
print("\n✅ 系统集成测试完成")
|
|
|
|
def main():
|
|
"""主函数"""
|
|
tester = AdxpSystemTest()
|
|
try:
|
|
tester.run_all_tests()
|
|
except KeyboardInterrupt:
|
|
print("\n⚠️ 用户中断")
|
|
except Exception as e:
|
|
print(f"❌ 测试过程中发生错误: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|