QDAirPortBackend0122/tools/test_adxp_websocket.py
2026-01-22 13:19:47 +08:00

70 lines
1.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket客户端测试脚本
用于测试与adxp-adapter的WebSocket连接
"""
import websocket
import json
import threading
import time
def on_message(ws, message):
"""处理接收到的消息"""
print(f"📥 收到消息: {message}")
try:
# 尝试解析JSON消息
data = json.loads(message)
if isinstance(data, list):
print(f" 解析到 {len(data)} 条航班消息")
else:
print(f" 消息内容: {data}")
except json.JSONDecodeError:
print(f" 非JSON消息: {message}")
def on_error(ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
def on_close(ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
def on_open(ws):
"""处理连接打开"""
print("✅ WebSocket连接已建立")
# 启动心跳线程
def run(*args):
while True:
time.sleep(30) # 每30秒发送一次心跳
if ws.sock.connected:
ws.send("ping")
print("💓 发送心跳")
else:
break
threading.Thread(target=run, daemon=True).start()
def main():
"""主函数"""
# WebSocket URL
ws_url = "ws://localhost:8086/ws/flight-notifications"
print(f"🚀 正在连接到ADXP适配器WebSocket服务: {ws_url}")
# 启用调试日志
# websocket.enableTrace(True)
# 创建WebSocket连接
ws = websocket.WebSocketApp(ws_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
# 启动连接(阻塞)
ws.run_forever()
if __name__ == "__main__":
main()