#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ADXP WebSocket系统集成测试脚本 用于测试整个ADXP WebSocket消息传输流程 """ import websocket import json import time import threading import requests class AdxpSystemTest: def __init__(self): self.adxp_adapter_url = "http://localhost:8086" self.qaup_system_url = "http://localhost:8080" self.ws_url = "ws://localhost:8086/ws/flight-notifications" self.ws = None self.connected = False self.messages_received = 0 self.start_time = None def test_health_endpoints(self): """测试健康检查端点""" print("🔍 测试健康检查端点...") try: # 测试adxp-adapter健康检查 response = requests.get(f"{self.adxp_adapter_url}/actuator/health") if response.status_code == 200: print("✅ adxp-adapter健康检查: OK") print(f" 状态: {response.json()}") else: print(f"❌ adxp-adapter健康检查失败: {response.status_code}") except Exception as e: print(f"❌ adxp-adapter健康检查异常: {e}") try: # 测试QAUP系统健康检查 response = requests.get(f"{self.qaup_system_url}/actuator/health") if response.status_code == 200: print("✅ QAUP系统健康检查: OK") print(f" 状态: {response.json()}") else: print(f"❌ QAUP系统健康检查失败: {response.status_code}") except Exception as e: print(f"❌ QAUP系统健康检查异常: {e}") def on_message(self, ws, message): """处理接收到的消息""" self.messages_received += 1 print(f"📥 [{self.messages_received}] 收到WebSocket消息") try: # 尝试解析JSON消息 data = json.loads(message) if isinstance(data, list): print(f" 解析到 {len(data)} 条航班消息") for i, msg in enumerate(data): service_code = msg.get('serviceCode', 'N/A') print(f" [{i+1}] 服务代码: {service_code}") if service_code in ['ADXP_NAOMS_O_DYN_ARR', 'ADXP_NAOMS_O_CDM_AXOT']: print(f" 航班号: {self.extract_flight_number(msg.get('content', ''))}") else: print(f" 消息内容: {data}") except json.JSONDecodeError: print(f" 非JSON消息: {message}") # 显示性能统计 if self.start_time: elapsed = time.time() - self.start_time if elapsed > 0: rate = self.messages_received / elapsed print(f" 📊 接收速率: {rate:.2f} 条消息/秒") def extract_flight_number(self, xml_content): """从XML内容中提取航班号""" try: import xml.etree.ElementTree as ET root = ET.fromstring(xml_content) flight_number = root.find('.//FlightNumber') return flight_number.text if flight_number is not None else 'N/A' except Exception: return 'N/A' def on_error(self, ws, error): """处理错误""" print(f"❌ WebSocket错误: {error}") self.connected = False def on_close(self, ws, close_status_code, close_msg): """处理连接关闭""" print(f"🔒 WebSocket连接已关闭: 状态码={close_status_code}, 消息={close_msg}") self.connected = False def on_open(self, ws): """处理连接打开""" self.connected = True self.start_time = time.time() print("✅ WebSocket连接已建立") print(f"🔗 连接地址: {self.ws_url}") # 启动心跳线程 def heartbeat(): while self.connected: time.sleep(30) # 每30秒发送一次心跳 if self.connected and self.ws and self.ws.sock.connected: self.ws.send("ping") print("💓 发送心跳") else: break threading.Thread(target=heartbeat, daemon=True).start() def test_websocket_connection(self): """测试WebSocket连接""" print("\n🚀 测试WebSocket连接...") try: # 创建WebSocket连接 self.ws = websocket.WebSocketApp(self.ws_url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) # 在单独线程中运行WebSocket ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True) ws_thread.start() # 等待连接建立 time.sleep(2) if self.connected: print("✅ WebSocket连接测试成功") # 保持连接10秒以接收消息 print("⏳ 保持连接10秒以接收消息...") time.sleep(10) self.ws.close() else: print("❌ WebSocket连接测试失败") except Exception as e: print(f"❌ WebSocket连接测试异常: {e}") def get_final_stats(self): """获取最终统计信息""" if self.start_time: elapsed = time.time() - self.start_time return { 'messages_received': self.messages_received, 'elapsed_time': elapsed, 'rate': self.messages_received / elapsed if elapsed > 0 else 0 } return None def run_all_tests(self): """运行所有测试""" print("🧪 开始ADXP WebSocket系统集成测试") print("=" * 50) # 测试健康检查端点 self.test_health_endpoints() # 测试WebSocket连接 self.test_websocket_connection() # 显示最终统计 stats = self.get_final_stats() if stats: print(f"\n📊 最终统计:") print(f" 接收消息总数: {stats['messages_received']}") print(f" 运行时间: {stats['elapsed_time']:.2f} 秒") print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒") print("\n✅ 系统集成测试完成") def main(): """主函数""" tester = AdxpSystemTest() try: tester.run_all_tests() except KeyboardInterrupt: print("\n⚠️ 用户中断") except Exception as e: print(f"❌ 测试过程中发生错误: {e}") if __name__ == "__main__": main()