QAUP_Management/tools/generate_flight_messages.py

201 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket消息生成器
用于模拟ADXP适配器向WebSocket客户端发送航班消息
"""
import json
import time
import random
from datetime import datetime
try:
# 尝试导入websocket-client库
from websocket import WebSocketApp
print("✅ 成功导入websocket库")
except ImportError:
try:
# 备用导入方式
import websocket
from websocket import WebSocketApp
print("✅ 通过备用方式导入websocket库")
except ImportError:
print("❌ 未找到websocket库请安装: pip install websocket-client")
exit(1)
class AdxpFlightMessageGenerator:
def __init__(self, ws_url):
self.ws_url = ws_url
self.ws = None
self.connected = False
# 航班号前缀列表
self.flight_prefixes = ['CA', 'MU', 'CZ', 'HU', 'FM', 'HO', 'JD', 'SC']
# 跑道和机位列表
self.runways = ['01', '02', '03', '04', '05', '06', '07', '08']
self.gates = ['A1', 'A2', 'A3', 'A4', 'B1', 'B2', 'B3', 'B4', 'C1', 'C2', 'C3', 'C4']
def generate_random_flight_number(self):
"""生成随机航班号"""
prefix = random.choice(self.flight_prefixes)
number = random.randint(1000, 9999)
return f"{prefix}{number}"
def generate_arrival_message(self):
"""生成进港航班消息"""
flight_no = self.generate_random_flight_number()
service_code = "ADXP_NAOMS_O_DYN_ARR"
# 生成XML内容
xml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<FlightArrival>
<FlightNumber>{flight_no}</FlightNumber>
<EstimatedArrival>{datetime.now().strftime('%Y%m%d%H%M%S')}</EstimatedArrival>
<Runway>{random.choice(self.runways)}</Runway>
<Gate>{random.choice(self.gates)}</Gate>
</FlightArrival>"""
return {
"serviceCode": service_code,
"content": xml_content
}
def generate_pushback_message(self):
"""生成出港航班消息"""
flight_no = self.generate_random_flight_number()
service_code = "ADXP_NAOMS_O_CDM_AXOT"
# 生成XML内容
xml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<FlightPushback>
<FlightNumber>{flight_no}</FlightNumber>
<ActualPushback>{datetime.now().strftime('%Y%m%d%H%M%S')}</ActualPushback>
<Runway>{random.choice(self.runways)}</Runway>
<Gate>{random.choice(self.gates)}</Gate>
</FlightPushback>"""
return {
"serviceCode": service_code,
"content": xml_content
}
def generate_test_messages(self):
"""生成测试消息列表"""
messages = []
# 生成几条进港消息
for _ in range(2):
messages.append(self.generate_arrival_message())
# 生成几条出港消息
for _ in range(2):
messages.append(self.generate_pushback_message())
return messages
def on_message(self, ws, message):
"""处理接收到的消息"""
print(f"📥 收到消息: {message}")
# 如果收到ping回复pong
if message == "ping":
try:
ws.send("pong")
print("📤 发送心跳响应: pong")
except Exception as e:
print(f"❌ 发送心跳响应失败: {e}")
def on_error(self, ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
self.connected = False
def on_open(self, ws):
"""处理连接打开"""
self.connected = True
print("✅ WebSocket连接已建立")
print(f"🔗 连接地址: {self.ws_url}")
# 启动消息发送循环
self.send_test_messages()
def send_test_messages(self):
"""发送测试消息"""
if not self.connected:
print("❌ WebSocket未连接无法发送消息")
return
try:
print("🚀 开始发送测试消息...")
# 发送几轮测试消息
for i in range(3):
print(f"\n--- 第 {i+1} 轮消息 ---")
# 生成并发送消息
messages = self.generate_test_messages()
json_message = json.dumps(messages, ensure_ascii=False)
print(f"📤 发送 {len(messages)} 条消息")
self.ws.send(json_message)
# 等待一段时间再发送下一轮
time.sleep(5)
print("\n✅ 所有测试消息发送完成")
except Exception as e:
print(f"❌ 发送测试消息时发生错误: {e}")
finally:
self.disconnect()
def connect(self):
"""连接到WebSocket服务器"""
print(f"🚀 正在连接到WebSocket服务: {self.ws_url}")
# 创建WebSocket连接
self.ws = WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# 启动连接(阻塞)
self.ws.run_forever()
def disconnect(self):
"""断开连接"""
if self.ws:
self.ws.close()
print("🚫 主动断开连接")
self.connected = False
def main():
"""主函数"""
# WebSocket URL (模拟ADXP适配器的WebSocket端点)
ws_url = "ws://localhost:8086/ws/flight-notifications"
# 创建消息生成器
generator = AdxpFlightMessageGenerator(ws_url)
try:
# 连接并开始发送消息
generator.connect()
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
generator.disconnect()
except Exception as e:
print(f"❌ 测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
generator.disconnect()
if __name__ == "__main__":
main()