70 lines
1.9 KiB
Python
70 lines
1.9 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
ADXP WebSocket客户端测试脚本
|
|
用于测试与adxp-adapter的WebSocket连接
|
|
"""
|
|
|
|
import websocket
|
|
import json
|
|
import threading
|
|
import time
|
|
|
|
def on_message(ws, message):
|
|
"""处理接收到的消息"""
|
|
print(f"📥 收到消息: {message}")
|
|
try:
|
|
# 尝试解析JSON消息
|
|
data = json.loads(message)
|
|
if isinstance(data, list):
|
|
print(f" 解析到 {len(data)} 条航班消息")
|
|
else:
|
|
print(f" 消息内容: {data}")
|
|
except json.JSONDecodeError:
|
|
print(f" 非JSON消息: {message}")
|
|
|
|
def on_error(ws, error):
|
|
"""处理错误"""
|
|
print(f"❌ WebSocket错误: {error}")
|
|
|
|
def on_close(ws, close_status_code, close_msg):
|
|
"""处理连接关闭"""
|
|
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
|
|
|
|
def on_open(ws):
|
|
"""处理连接打开"""
|
|
print("✅ WebSocket连接已建立")
|
|
|
|
# 启动心跳线程
|
|
def run(*args):
|
|
while True:
|
|
time.sleep(30) # 每30秒发送一次心跳
|
|
if ws.sock.connected:
|
|
ws.send("ping")
|
|
print("💓 发送心跳")
|
|
else:
|
|
break
|
|
threading.Thread(target=run, daemon=True).start()
|
|
|
|
def main():
|
|
"""主函数"""
|
|
# WebSocket URL
|
|
ws_url = "ws://localhost:8086/ws/flight-notifications"
|
|
|
|
print(f"🚀 正在连接到ADXP适配器WebSocket服务: {ws_url}")
|
|
|
|
# 启用调试日志
|
|
# websocket.enableTrace(True)
|
|
|
|
# 创建WebSocket连接
|
|
ws = websocket.WebSocketApp(ws_url,
|
|
on_open=on_open,
|
|
on_message=on_message,
|
|
on_error=on_error,
|
|
on_close=on_close)
|
|
|
|
# 启动连接(阻塞)
|
|
ws.run_forever()
|
|
|
|
if __name__ == "__main__":
|
|
main() |