#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket消息生成器
用于模拟ADXP适配器向WebSocket客户端发送航班消息
"""
import json
import time
import random
from datetime import datetime
try:
# 尝试导入websocket-client库
from websocket import WebSocketApp
print("✅ 成功导入websocket库")
except ImportError:
try:
# 备用导入方式
import websocket
from websocket import WebSocketApp
print("✅ 通过备用方式导入websocket库")
except ImportError:
print("❌ 未找到websocket库,请安装: pip install websocket-client")
exit(1)
class AdxpFlightMessageGenerator:
def __init__(self, ws_url):
self.ws_url = ws_url
self.ws = None
self.connected = False
# 航班号前缀列表
self.flight_prefixes = ['CA', 'MU', 'CZ', 'HU', 'FM', 'HO', 'JD', 'SC']
# 跑道和机位列表
self.runways = ['01', '02', '03', '04', '05', '06', '07', '08']
self.gates = ['A1', 'A2', 'A3', 'A4', 'B1', 'B2', 'B3', 'B4', 'C1', 'C2', 'C3', 'C4']
def generate_random_flight_number(self):
"""生成随机航班号"""
prefix = random.choice(self.flight_prefixes)
number = random.randint(1000, 9999)
return f"{prefix}{number}"
def generate_arrival_message(self):
"""生成进港航班消息"""
flight_no = self.generate_random_flight_number()
service_code = "ADXP_NAOMS_O_DYN_ARR"
# 生成XML内容
xml_content = f"""
{flight_no}
{datetime.now().strftime('%Y%m%d%H%M%S')}
{random.choice(self.runways)}
{random.choice(self.gates)}
"""
return {
"serviceCode": service_code,
"content": xml_content
}
def generate_pushback_message(self):
"""生成出港航班消息"""
flight_no = self.generate_random_flight_number()
service_code = "ADXP_NAOMS_O_CDM_AXOT"
# 生成XML内容
xml_content = f"""
{flight_no}
{datetime.now().strftime('%Y%m%d%H%M%S')}
{random.choice(self.runways)}
{random.choice(self.gates)}
"""
return {
"serviceCode": service_code,
"content": xml_content
}
def generate_test_messages(self):
"""生成测试消息列表"""
messages = []
# 生成几条进港消息
for _ in range(2):
messages.append(self.generate_arrival_message())
# 生成几条出港消息
for _ in range(2):
messages.append(self.generate_pushback_message())
return messages
def on_message(self, ws, message):
"""处理接收到的消息"""
print(f"📥 收到消息: {message}")
# 如果收到ping,回复pong
if message == "ping":
try:
ws.send("pong")
print("📤 发送心跳响应: pong")
except Exception as e:
print(f"❌ 发送心跳响应失败: {e}")
def on_error(self, ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
self.connected = False
def on_open(self, ws):
"""处理连接打开"""
self.connected = True
print("✅ WebSocket连接已建立")
print(f"🔗 连接地址: {self.ws_url}")
# 启动消息发送循环
self.send_test_messages()
def send_test_messages(self):
"""发送测试消息"""
if not self.connected:
print("❌ WebSocket未连接,无法发送消息")
return
try:
print("🚀 开始发送测试消息...")
# 发送几轮测试消息
for i in range(3):
print(f"\n--- 第 {i+1} 轮消息 ---")
# 生成并发送消息
messages = self.generate_test_messages()
json_message = json.dumps(messages, ensure_ascii=False)
print(f"📤 发送 {len(messages)} 条消息")
self.ws.send(json_message)
# 等待一段时间再发送下一轮
time.sleep(5)
print("\n✅ 所有测试消息发送完成")
except Exception as e:
print(f"❌ 发送测试消息时发生错误: {e}")
finally:
self.disconnect()
def connect(self):
"""连接到WebSocket服务器"""
print(f"🚀 正在连接到WebSocket服务: {self.ws_url}")
# 创建WebSocket连接
self.ws = WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# 启动连接(阻塞)
self.ws.run_forever()
def disconnect(self):
"""断开连接"""
if self.ws:
self.ws.close()
print("🚫 主动断开连接")
self.connected = False
def main():
"""主函数"""
# WebSocket URL (模拟ADXP适配器的WebSocket端点)
ws_url = "ws://localhost:8086/ws/flight-notifications"
# 创建消息生成器
generator = AdxpFlightMessageGenerator(ws_url)
try:
# 连接并开始发送消息
generator.connect()
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
generator.disconnect()
except Exception as e:
print(f"❌ 测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
generator.disconnect()
if __name__ == "__main__":
main()