#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ADXP WebSocket消息生成器 用于模拟ADXP适配器向WebSocket客户端发送航班消息 """ import json import time import random from datetime import datetime try: # 尝试导入websocket-client库 from websocket import WebSocketApp print("✅ 成功导入websocket库") except ImportError: try: # 备用导入方式 import websocket from websocket import WebSocketApp print("✅ 通过备用方式导入websocket库") except ImportError: print("❌ 未找到websocket库,请安装: pip install websocket-client") exit(1) class AdxpFlightMessageGenerator: def __init__(self, ws_url): self.ws_url = ws_url self.ws = None self.connected = False # 航班号前缀列表 self.flight_prefixes = ['CA', 'MU', 'CZ', 'HU', 'FM', 'HO', 'JD', 'SC'] # 跑道和机位列表 self.runways = ['01', '02', '03', '04', '05', '06', '07', '08'] self.gates = ['A1', 'A2', 'A3', 'A4', 'B1', 'B2', 'B3', 'B4', 'C1', 'C2', 'C3', 'C4'] def generate_random_flight_number(self): """生成随机航班号""" prefix = random.choice(self.flight_prefixes) number = random.randint(1000, 9999) return f"{prefix}{number}" def generate_arrival_message(self): """生成进港航班消息""" flight_no = self.generate_random_flight_number() service_code = "ADXP_NAOMS_O_DYN_ARR" # 生成XML内容 xml_content = f""" {flight_no} {datetime.now().strftime('%Y%m%d%H%M%S')} {random.choice(self.runways)} {random.choice(self.gates)} """ return { "serviceCode": service_code, "content": xml_content } def generate_pushback_message(self): """生成出港航班消息""" flight_no = self.generate_random_flight_number() service_code = "ADXP_NAOMS_O_CDM_AXOT" # 生成XML内容 xml_content = f""" {flight_no} {datetime.now().strftime('%Y%m%d%H%M%S')} {random.choice(self.runways)} {random.choice(self.gates)} """ return { "serviceCode": service_code, "content": xml_content } def generate_test_messages(self): """生成测试消息列表""" messages = [] # 生成几条进港消息 for _ in range(2): messages.append(self.generate_arrival_message()) # 生成几条出港消息 for _ in range(2): messages.append(self.generate_pushback_message()) return messages def on_message(self, ws, message): """处理接收到的消息""" print(f"📥 收到消息: {message}") # 如果收到ping,回复pong if message == "ping": try: ws.send("pong") print("📤 发送心跳响应: pong") except Exception as e: print(f"❌ 发送心跳响应失败: {e}") def on_error(self, ws, error): """处理错误""" print(f"❌ WebSocket错误: {error}") self.connected = False def on_close(self, ws, close_status_code, close_msg): """处理连接关闭""" print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}") self.connected = False def on_open(self, ws): """处理连接打开""" self.connected = True print("✅ WebSocket连接已建立") print(f"🔗 连接地址: {self.ws_url}") # 启动消息发送循环 self.send_test_messages() def send_test_messages(self): """发送测试消息""" if not self.connected: print("❌ WebSocket未连接,无法发送消息") return try: print("🚀 开始发送测试消息...") # 发送几轮测试消息 for i in range(3): print(f"\n--- 第 {i+1} 轮消息 ---") # 生成并发送消息 messages = self.generate_test_messages() json_message = json.dumps(messages, ensure_ascii=False) print(f"📤 发送 {len(messages)} 条消息") self.ws.send(json_message) # 等待一段时间再发送下一轮 time.sleep(5) print("\n✅ 所有测试消息发送完成") except Exception as e: print(f"❌ 发送测试消息时发生错误: {e}") finally: self.disconnect() def connect(self): """连接到WebSocket服务器""" print(f"🚀 正在连接到WebSocket服务: {self.ws_url}") # 创建WebSocket连接 self.ws = WebSocketApp(self.ws_url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) # 启动连接(阻塞) self.ws.run_forever() def disconnect(self): """断开连接""" if self.ws: self.ws.close() print("🚫 主动断开连接") self.connected = False def main(): """主函数""" # WebSocket URL (模拟ADXP适配器的WebSocket端点) ws_url = "ws://localhost:8086/ws/flight-notifications" # 创建消息生成器 generator = AdxpFlightMessageGenerator(ws_url) try: # 连接并开始发送消息 generator.connect() except KeyboardInterrupt: print("\n⚠️ 用户中断") generator.disconnect() except Exception as e: print(f"❌ 测试过程中发生错误: {e}") import traceback traceback.print_exc() generator.disconnect() if __name__ == "__main__": main()