#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ADXP WebSocket集成测试脚本 用于测试整个ADXP WebSocket消息传输流程 """ import websocket import json import time import threading class AdxpWebSocketTestClient: def __init__(self, url): self.url = url self.ws = None self.connected = False self.messages_received = 0 self.start_time = None def on_message(self, ws, message): """处理接收到的消息""" self.messages_received += 1 print(f"📥 [{self.messages_received}] 收到消息: {message[:100]}...") try: # 尝试解析JSON消息 data = json.loads(message) if isinstance(data, list): print(f" 解析到 {len(data)} 条航班消息") for i, msg in enumerate(data): print(f" [{i+1}] 服务代码: {msg.get('serviceCode', 'N/A')}") else: print(f" 消息内容: {data}") except json.JSONDecodeError: print(f" 非JSON消息: {message}") # 显示性能统计 if self.start_time: elapsed = time.time() - self.start_time if elapsed > 0: rate = self.messages_received / elapsed print(f" 📊 接收速率: {rate:.2f} 条消息/秒") def on_error(self, ws, error): """处理错误""" print(f"❌ WebSocket错误: {error}") self.connected = False def on_close(self, ws, close_status_code, close_msg): """处理连接关闭""" print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}") self.connected = False def on_open(self, ws): """处理连接打开""" self.connected = True self.start_time = time.time() print("✅ WebSocket连接已建立") print(f"🔗 连接地址: {self.url}") # 启动心跳线程 def heartbeat(): while self.connected: time.sleep(30) # 每30秒发送一次心跳 if self.connected and self.ws and self.ws.sock.connected: self.ws.send("ping") print("💓 发送心跳") else: break threading.Thread(target=heartbeat, daemon=True).start() def connect(self): """连接到WebSocket服务器""" print(f"🚀 正在连接到ADXP适配器WebSocket服务: {self.url}") # 创建WebSocket连接 self.ws = websocket.WebSocketApp(self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) # 启动连接(阻塞) self.ws.run_forever() def disconnect(self): """断开连接""" if self.ws: self.ws.close() print("🚫 主动断开连接") self.connected = False def get_stats(self): """获取统计信息""" if self.start_time: elapsed = time.time() - self.start_time return { 'messages_received': self.messages_received, 'elapsed_time': elapsed, 'rate': self.messages_received / elapsed if elapsed > 0 else 0 } return None def main(): """主函数""" # WebSocket URL ws_url = "ws://localhost:8086/ws/flight-notifications" # 创建测试客户端 client = AdxpWebSocketTestClient(ws_url) try: # 连接并开始监听 client.connect() except KeyboardInterrupt: print("\n⚠️ 用户中断") client.disconnect() # 显示最终统计 stats = client.get_stats() if stats: print(f"\n📊 最终统计:") print(f" 接收消息总数: {stats['messages_received']}") print(f" 运行时间: {stats['elapsed_time']:.2f} 秒") print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒") except Exception as e: print(f"❌ 测试过程中发生错误: {e}") client.disconnect() if __name__ == "__main__": main()