133 lines
4.3 KiB
Python
133 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
ADXP WebSocket集成测试脚本
|
|
用于测试整个ADXP WebSocket消息传输流程
|
|
"""
|
|
|
|
import websocket
|
|
import json
|
|
import time
|
|
import threading
|
|
|
|
class AdxpWebSocketTestClient:
|
|
def __init__(self, url):
|
|
self.url = url
|
|
self.ws = None
|
|
self.connected = False
|
|
self.messages_received = 0
|
|
self.start_time = None
|
|
|
|
def on_message(self, ws, message):
|
|
"""处理接收到的消息"""
|
|
self.messages_received += 1
|
|
print(f"📥 [{self.messages_received}] 收到消息: {message[:100]}...")
|
|
|
|
try:
|
|
# 尝试解析JSON消息
|
|
data = json.loads(message)
|
|
if isinstance(data, list):
|
|
print(f" 解析到 {len(data)} 条航班消息")
|
|
for i, msg in enumerate(data):
|
|
print(f" [{i+1}] 服务代码: {msg.get('serviceCode', 'N/A')}")
|
|
else:
|
|
print(f" 消息内容: {data}")
|
|
except json.JSONDecodeError:
|
|
print(f" 非JSON消息: {message}")
|
|
|
|
# 显示性能统计
|
|
if self.start_time:
|
|
elapsed = time.time() - self.start_time
|
|
if elapsed > 0:
|
|
rate = self.messages_received / elapsed
|
|
print(f" 📊 接收速率: {rate:.2f} 条消息/秒")
|
|
|
|
def on_error(self, ws, error):
|
|
"""处理错误"""
|
|
print(f"❌ WebSocket错误: {error}")
|
|
self.connected = False
|
|
|
|
def on_close(self, ws, close_status_code, close_msg):
|
|
"""处理连接关闭"""
|
|
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
|
|
self.connected = False
|
|
|
|
def on_open(self, ws):
|
|
"""处理连接打开"""
|
|
self.connected = True
|
|
self.start_time = time.time()
|
|
print("✅ WebSocket连接已建立")
|
|
print(f"🔗 连接地址: {self.url}")
|
|
|
|
# 启动心跳线程
|
|
def heartbeat():
|
|
while self.connected:
|
|
time.sleep(30) # 每30秒发送一次心跳
|
|
if self.connected and self.ws and self.ws.sock.connected:
|
|
self.ws.send("ping")
|
|
print("💓 发送心跳")
|
|
else:
|
|
break
|
|
|
|
threading.Thread(target=heartbeat, daemon=True).start()
|
|
|
|
def connect(self):
|
|
"""连接到WebSocket服务器"""
|
|
print(f"🚀 正在连接到ADXP适配器WebSocket服务: {self.url}")
|
|
|
|
# 创建WebSocket连接
|
|
self.ws = websocket.WebSocketApp(self.url,
|
|
on_open=self.on_open,
|
|
on_message=self.on_message,
|
|
on_error=self.on_error,
|
|
on_close=self.on_close)
|
|
|
|
# 启动连接(阻塞)
|
|
self.ws.run_forever()
|
|
|
|
def disconnect(self):
|
|
"""断开连接"""
|
|
if self.ws:
|
|
self.ws.close()
|
|
print("🚫 主动断开连接")
|
|
self.connected = False
|
|
|
|
def get_stats(self):
|
|
"""获取统计信息"""
|
|
if self.start_time:
|
|
elapsed = time.time() - self.start_time
|
|
return {
|
|
'messages_received': self.messages_received,
|
|
'elapsed_time': elapsed,
|
|
'rate': self.messages_received / elapsed if elapsed > 0 else 0
|
|
}
|
|
return None
|
|
|
|
def main():
|
|
"""主函数"""
|
|
# WebSocket URL
|
|
ws_url = "ws://localhost:8086/ws/flight-notifications"
|
|
|
|
# 创建测试客户端
|
|
client = AdxpWebSocketTestClient(ws_url)
|
|
|
|
try:
|
|
# 连接并开始监听
|
|
client.connect()
|
|
except KeyboardInterrupt:
|
|
print("\n⚠️ 用户中断")
|
|
client.disconnect()
|
|
|
|
# 显示最终统计
|
|
stats = client.get_stats()
|
|
if stats:
|
|
print(f"\n📊 最终统计:")
|
|
print(f" 接收消息总数: {stats['messages_received']}")
|
|
print(f" 运行时间: {stats['elapsed_time']:.2f} 秒")
|
|
print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒")
|
|
except Exception as e:
|
|
print(f"❌ 测试过程中发生错误: {e}")
|
|
client.disconnect()
|
|
|
|
if __name__ == "__main__":
|
|
main() |