QDAirPortBackend0122/tools/test_adxp_websocket_integration.py
2026-01-22 13:19:47 +08:00

133 lines
4.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket集成测试脚本
用于测试整个ADXP WebSocket消息传输流程
"""
import websocket
import json
import time
import threading
class AdxpWebSocketTestClient:
def __init__(self, url):
self.url = url
self.ws = None
self.connected = False
self.messages_received = 0
self.start_time = None
def on_message(self, ws, message):
"""处理接收到的消息"""
self.messages_received += 1
print(f"📥 [{self.messages_received}] 收到消息: {message[:100]}...")
try:
# 尝试解析JSON消息
data = json.loads(message)
if isinstance(data, list):
print(f" 解析到 {len(data)} 条航班消息")
for i, msg in enumerate(data):
print(f" [{i+1}] 服务代码: {msg.get('serviceCode', 'N/A')}")
else:
print(f" 消息内容: {data}")
except json.JSONDecodeError:
print(f" 非JSON消息: {message}")
# 显示性能统计
if self.start_time:
elapsed = time.time() - self.start_time
if elapsed > 0:
rate = self.messages_received / elapsed
print(f" 📊 接收速率: {rate:.2f} 条消息/秒")
def on_error(self, ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
self.connected = False
def on_open(self, ws):
"""处理连接打开"""
self.connected = True
self.start_time = time.time()
print("✅ WebSocket连接已建立")
print(f"🔗 连接地址: {self.url}")
# 启动心跳线程
def heartbeat():
while self.connected:
time.sleep(30) # 每30秒发送一次心跳
if self.connected and self.ws and self.ws.sock.connected:
self.ws.send("ping")
print("💓 发送心跳")
else:
break
threading.Thread(target=heartbeat, daemon=True).start()
def connect(self):
"""连接到WebSocket服务器"""
print(f"🚀 正在连接到ADXP适配器WebSocket服务: {self.url}")
# 创建WebSocket连接
self.ws = websocket.WebSocketApp(self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# 启动连接(阻塞)
self.ws.run_forever()
def disconnect(self):
"""断开连接"""
if self.ws:
self.ws.close()
print("🚫 主动断开连接")
self.connected = False
def get_stats(self):
"""获取统计信息"""
if self.start_time:
elapsed = time.time() - self.start_time
return {
'messages_received': self.messages_received,
'elapsed_time': elapsed,
'rate': self.messages_received / elapsed if elapsed > 0 else 0
}
return None
def main():
"""主函数"""
# WebSocket URL
ws_url = "ws://localhost:8086/ws/flight-notifications"
# 创建测试客户端
client = AdxpWebSocketTestClient(ws_url)
try:
# 连接并开始监听
client.connect()
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
client.disconnect()
# 显示最终统计
stats = client.get_stats()
if stats:
print(f"\n📊 最终统计:")
print(f" 接收消息总数: {stats['messages_received']}")
print(f" 运行时间: {stats['elapsed_time']:.2f}")
print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒")
except Exception as e:
print(f"❌ 测试过程中发生错误: {e}")
client.disconnect()
if __name__ == "__main__":
main()