#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ADXP WebSocket客户端测试脚本 用于测试与adxp-adapter的WebSocket连接 """ import websocket import json import threading import time def on_message(ws, message): """处理接收到的消息""" print(f"📥 收到消息: {message}") try: # 尝试解析JSON消息 data = json.loads(message) if isinstance(data, list): print(f" 解析到 {len(data)} 条航班消息") else: print(f" 消息内容: {data}") except json.JSONDecodeError: print(f" 非JSON消息: {message}") def on_error(ws, error): """处理错误""" print(f"❌ WebSocket错误: {error}") def on_close(ws, close_status_code, close_msg): """处理连接关闭""" print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}") def on_open(ws): """处理连接打开""" print("✅ WebSocket连接已建立") # 启动心跳线程 def run(*args): while True: time.sleep(30) # 每30秒发送一次心跳 if ws.sock.connected: ws.send("ping") print("💓 发送心跳") else: break threading.Thread(target=run, daemon=True).start() def main(): """主函数""" # WebSocket URL ws_url = "ws://localhost:8086/ws/flight-notifications" print(f"🚀 正在连接到ADXP适配器WebSocket服务: {ws_url}") # 启用调试日志 # websocket.enableTrace(True) # 创建WebSocket连接 ws = websocket.WebSocketApp(ws_url, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close) # 启动连接(阻塞) ws.run_forever() if __name__ == "__main__": main()