diff --git a/README_WEBSOCKET.md b/README_WEBSOCKET.md new file mode 100644 index 00000000..6b2dae50 --- /dev/null +++ b/README_WEBSOCKET.md @@ -0,0 +1,139 @@ +# ADXP WebSocket 实时消息传输系统 + +## 系统架构 + +本系统实现了基于WebSocket的实时消息传输机制,用于替代原有的轮询方式,提高数据传输的实时性和系统效率。 + +### 组件构成 + +1. **adxp-adapter服务** + - 作为ADXP SDK与QAUP系统的桥梁 + - 提供WebSocket服务端点 + - 持续监听数据中台消息并通过WebSocket广播 + +2. **QAUP系统** + - 作为WebSocket客户端连接adxp-adapter + - 实时接收航班通知消息 + - 处理消息并触发后续业务逻辑 + +## 部署和启动 + +### 1. 启动adxp-adapter服务 + +```bash +cd /Users/tianjianyong/apps/Company/QAUP-Management/adxp-adapter +./start.sh +``` + +或者手动启动: + +```bash +cd /Users/tianjianyong/apps/Company/QAUP-Management/adxp-adapter +mvn spring-boot:run +``` + +### 2. 启动QAUP系统 + +```bash +cd /Users/tianjianyong/apps/Company/QAUP-Management/qaup-admin +mvn spring-boot:run +``` + +## WebSocket端点 + +- **服务端点**: `ws://localhost:8086/ws/flight-notifications` +- **健康检查**: `http://localhost:8086/actuator/health` + +## 配置说明 + +### adxp-adapter配置 (application.yml) + +```yaml +adxp: + host: localhost # 数据中台地址 + port: 7001 # 数据中台端口 + +server: + port: 8086 # 适配器服务端口 +``` + +### QAUP系统配置 (application-dev.yml) + +```yaml +data: + collector: + adxp-adapter: + host: localhost # adxp-adapter地址 + port: 8086 # adxp-adapter端口 + username: dianxin # 登录用户名 + password: dianxin@123 # 登录密码 + websocket: + enabled: true # 启用WebSocket连接 +``` + +## 测试工具 + +### 1. 系统集成测试 + +```bash +cd /Users/tianjianyong/apps/Company/QAUP-Management/tools +./test_adxp_websocket_system.py +``` + +### 2. WebSocket连接测试 + +```bash +cd /Users/tianjianyong/apps/Company/QAUP-Management/tools +./test_adxp_websocket_integration.py +``` + +### 3. 简单HTML测试页面 + +```bash +cd /Users/tianjianyong/apps/Company/QAUP-Management/tools +open test_adxp_websocket.html +``` + +## 日志监控 + +### adxp-adapter日志 + +- WebSocket连接建立/断开 +- 消息广播统计 +- 错误信息记录 + +### QAUP系统日志 + +- WebSocket客户端连接状态 +- 消息接收和处理 +- 错误和异常处理 + +## 性能优势 + +1. **实时性提升**: 消息从数据中台到QAUP系统的延迟从250ms降低到几乎实时 +2. **资源消耗降低**: 避免频繁的HTTP请求,减少网络和CPU开销 +3. **系统稳定性增强**: 避免HTTP超时和轮询堆积问题 +4. **扩展性改善**: 支持多客户端订阅同一数据流 + +## 故障排除 + +### 1. 连接失败 + +检查以下配置: +- adxp-adapter服务是否正常运行 +- 端口配置是否正确 +- 网络连接是否正常 + +### 2. 消息接收异常 + +检查以下方面: +- 数据中台连接是否正常 +- ADXP SDK是否正常工作 +- 消息解析逻辑是否正确 + +### 3. 性能问题 + +监控以下指标: +- WebSocket连接数 +- 消息处理速率 +- 内存和CPU使用情况 \ No newline at end of file diff --git a/adxp-adapter/pom.xml b/adxp-adapter/pom.xml index cae0b552..15035e7d 100644 --- a/adxp-adapter/pom.xml +++ b/adxp-adapter/pom.xml @@ -40,6 +40,12 @@ spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-websocket + + org.springframework.boot diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java new file mode 100644 index 00000000..47d76b78 --- /dev/null +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java @@ -0,0 +1,30 @@ +package com.qaup.adxp.adapter.config; + +import com.qaup.adxp.adapter.service.AdxpSdkService; +import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + @Autowired + private AdxpSdkService adxpSdkService; + + @Bean + public AdxpWebSocketHandler adxpWebSocketHandler() { + return new AdxpWebSocketHandler(adxpSdkService); + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + // 注册WebSocket处理器 + registry.addHandler(adxpWebSocketHandler(), "/ws/flight-notifications") + .setAllowedOrigins("*"); + } +} \ No newline at end of file diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/controller/AdxpController.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/controller/AdxpController.java index 33a8cc07..bb1d77e8 100644 --- a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/controller/AdxpController.java +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/controller/AdxpController.java @@ -2,6 +2,7 @@ package com.qaup.adxp.adapter.controller; import com.qaup.adxp.adapter.dto.*; import com.qaup.adxp.adapter.service.AdxpSdkService; +import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -20,6 +21,9 @@ public class AdxpController { @Autowired private AdxpSdkService adxpSdkService; + + @Autowired(required = false) + private AdxpWebSocketHandler adxpWebSocketHandler; /** * 登录接口 @@ -79,6 +83,9 @@ public class AdxpController { Map health = new HashMap(); health.put("status", "UP"); health.put("activeSessions", adxpSdkService.getActiveSessionCount()); + if (adxpWebSocketHandler != null) { + health.put("websocketConnections", adxpWebSocketHandler.getSessionCount()); + } return ResponseEntity.ok(health); } } diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java index 7537e58e..da1c7b8a 100644 --- a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java @@ -30,7 +30,11 @@ public class AdxpSdkService { // Session 管理: sessionId -> SessionInfo private final Map sessions = new ConcurrentHashMap<>(); - private static class SessionInfo { + public Map getSessions() { + return sessions; + } + + public static class SessionInfo { ADXPClient client; String username; String password; @@ -40,6 +44,18 @@ public class AdxpSdkService { this.username = username; this.password = password; } + + public ADXPClient getClient() { + return client; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } } /** diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java new file mode 100644 index 00000000..40d9729a --- /dev/null +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java @@ -0,0 +1,119 @@ +package com.qaup.adxp.adapter.service; + +import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +@Service +public class MessageListenerService { + + private static final Logger log = LoggerFactory.getLogger(MessageListenerService.class); + + @Autowired + private AdxpSdkService adxpSdkService; + + @Autowired + private AdxpWebSocketHandler adxpWebSocketHandler; + + private ExecutorService executorService; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + + @PostConstruct + public void start() { + if (isRunning.compareAndSet(false, true)) { + executorService = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "ADXP-Message-Listener"); + t.setDaemon(true); + return t; + }); + + executorService.submit(this::listenForMessages); + log.info("消息监听服务已启动"); + } + } + + @PreDestroy + public void stop() { + if (isRunning.compareAndSet(true, false)) { + if (executorService != null) { + executorService.shutdownNow(); + } + log.info("消息监听服务已停止"); + } + } + + private void listenForMessages() { + log.info("开始监听数据中台消息"); + + // 获取所有活跃的会话ID + while (isRunning.get()) { + try { + // 获取当前会话数量 + int sessionCount = adxpSdkService.getSessions().size(); + if (sessionCount == 0) { + log.debug("当前没有活跃的会话,等待连接..."); + Thread.sleep(1000); + continue; + } + + // 遍历所有会话并接收消息 + adxpSdkService.getSessions().forEach((sessionId, sessionInfo) -> { + try { + // 调用SDK接收消息(这可能会阻塞直到有消息到达) + java.util.List messages = + adxpSdkService.receiveMessages(sessionId); + + // 如果有消息,广播给所有WebSocket客户端 + if (messages != null && !messages.isEmpty()) { + adxpWebSocketHandler.broadcastMessages(messages); + log.info("接收到 {} 条航班消息并广播给 {} 个WebSocket客户端", + messages.size(), adxpWebSocketHandler.getSessionCount()); + } + } catch (Exception e) { + log.error("处理会话消息失败: sessionId={}", sessionId, e); + } + }); + + // 短暂休眠避免过于频繁的轮询 + Thread.sleep(100); + } catch (InterruptedException e) { + log.info("消息监听线程被中断"); + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + log.error("消息监听过程中发生错误", e); + + // 出错后短暂休眠再重试 + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + + log.info("消息监听服务已停止监听"); + } + + /** + * 获取服务统计信息 + */ + public String getStats() { + return String.format("MessageListenerService Stats: " + + "isRunning=%s, " + + "sessions=%d, " + + "webSocketClients=%d", + isRunning.get(), + adxpSdkService.getSessions().size(), + adxpWebSocketHandler.getSessionCount()); + } +} \ No newline at end of file diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java new file mode 100644 index 00000000..871455e7 --- /dev/null +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java @@ -0,0 +1,138 @@ +package com.qaup.adxp.adapter.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qaup.adxp.adapter.dto.FlightMessage; +import com.qaup.adxp.adapter.service.AdxpSdkService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AdxpWebSocketHandler extends TextWebSocketHandler { + + private static final Logger log = LoggerFactory.getLogger(AdxpWebSocketHandler.class); + + private final List sessions = new CopyOnWriteArrayList<>(); + private final AdxpSdkService adxpSdkService; + private final ObjectMapper objectMapper; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + + public AdxpWebSocketHandler(AdxpSdkService adxpSdkService) { + this.adxpSdkService = adxpSdkService; + this.objectMapper = new ObjectMapper(); + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + sessions.add(session); + log.info("WebSocket连接已建立: sessionId={}, 当前连接数={}", session.getId(), sessions.size()); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + // 处理客户端发送的消息(如果需要) + log.debug("收到客户端消息: sessionId={}, message={}", session.getId(), message.getPayload()); + + // 可以处理一些控制命令,比如心跳 + if ("ping".equals(message.getPayload())) { + session.sendMessage(new TextMessage("pong")); + log.debug("发送心跳响应到客户端: sessionId={}", session.getId()); + } + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + sessions.remove(session); + log.info("WebSocket连接已关闭: sessionId={}, reason={}, 当前连接数={}", + session.getId(), status.getReason(), sessions.size()); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.error("WebSocket传输错误: sessionId={}", session.getId(), exception); + sessions.remove(session); + } + + /** + * 向所有连接的客户端广播消息 + */ + public void broadcastMessages(List messages) { + if (messages == null || messages.isEmpty()) { + return; + } + + try { + String jsonMessage = objectMapper.writeValueAsString(messages); + TextMessage textMessage = new TextMessage(jsonMessage); + + int successCount = 0; + int failedCount = 0; + + for (WebSocketSession session : sessions) { + if (session.isOpen()) { + try { + session.sendMessage(textMessage); + successCount++; + } catch (Exception e) { + log.error("发送消息失败: sessionId={}", session.getId(), e); + // 如果发送失败,移除会话 + sessions.remove(session); + failedCount++; + } + } else { + sessions.remove(session); + failedCount++; + } + } + + log.debug("广播消息完成: 消息数量={}, 连接数={}, 成功发送={}, 失败={}", + messages.size(), sessions.size(), successCount, failedCount); + } catch (Exception e) { + log.error("序列化或广播消息失败", e); + } + } + + /** + * 获取当前连接数 + */ + public int getSessionCount() { + return sessions.size(); + } + + /** + * 向指定会话发送消息 + */ + public void sendMessage(WebSocketSession session, List messages) { + if (messages == null || messages.isEmpty() || session == null || !session.isOpen()) { + return; + } + + try { + String jsonMessage = objectMapper.writeValueAsString(messages); + TextMessage textMessage = new TextMessage(jsonMessage); + session.sendMessage(textMessage); + log.debug("向会话 {} 发送 {} 条消息", session.getId(), messages.size()); + } catch (Exception e) { + log.error("向会话 {} 发送消息失败", session.getId(), e); + } + } + + /** + * 获取连接详细信息 + */ + public List getConnectionDetails() { + return sessions.stream() + .map(session -> String.format("ID: %s, Remote Address: %s, Open: %s", + session.getId(), + session.getRemoteAddress(), + session.isOpen())) + .collect(java.util.stream.Collectors.toList()); + } +} \ No newline at end of file diff --git a/adxp-adapter/start.sh b/adxp-adapter/start.sh index 99c508ac..b5239066 100755 --- a/adxp-adapter/start.sh +++ b/adxp-adapter/start.sh @@ -2,32 +2,50 @@ # ADXP Adapter 启动脚本 -echo "🚀 启动 ADXP SDK 适配器服务..." -echo "" +# 项目路径 +PROJECT_DIR="/Users/tianjianyong/apps/Company/QAUP-Management/adxp-adapter" +JAR_FILE="$PROJECT_DIR/target/adxp-adapter.jar" -# 自动查找 Java 8 -if [ -z "$JAVA_8_HOME" ]; then - JAVA_8_HOME=$(/usr/libexec/java_home -v 1.8 2>/dev/null) -fi - -if [ -z "$JAVA_8_HOME" ]; then - echo "❌ 错误: 未找到 Java 8" - echo "" - echo "请先安装 Java 8:" - echo " brew install --cask temurin8" - echo "" - echo "或者手动设置 JAVA_8_HOME:" - echo " export JAVA_8_HOME=/path/to/jdk8" +# 检查项目目录是否存在 +if [ ! -d "$PROJECT_DIR" ]; then + echo "❌ 项目目录不存在: $PROJECT_DIR" exit 1 fi -echo "Java 版本: $($JAVA_8_HOME/bin/java -version 2>&1 | head -1)" -echo "" -echo "配置:" -echo " - 适配器端口: 8086" -echo " - ADXP 数据中台地址: ${ADXP_HOST:-localhost}:${ADXP_PORT:-8086}" -echo "" +# 进入项目目录 +cd "$PROJECT_DIR" -$JAVA_8_HOME/bin/java -jar target/adxp-adapter.jar \ - --adxp.host=${ADXP_HOST:-localhost} \ - --adxp.port=${ADXP_PORT:-8086} +# 检查JAR文件是否存在,如果不存在则编译 +if [ ! -f "$JAR_FILE" ]; then + echo "🔨 正在编译adxp-adapter项目..." + mvn clean package + if [ $? -ne 0 ]; then + echo "❌ 编译失败" + exit 1 + fi +fi + +# 检查JAR文件是否存在 +if [ ! -f "$JAR_FILE" ]; then + echo "❌ JAR文件不存在: $JAR_FILE" + exit 1 +fi + +echo "🚀 正在启动ADXP Adapter服务..." +echo "📄 JAR文件: $JAR_FILE" + +# 启动服务 +java -jar "$JAR_FILE" & + +# 等待几秒钟让服务启动 +sleep 5 + +# 检查服务是否启动成功 +if pgrep -f "adxp-adapter" > /dev/null; then + echo "✅ ADXP Adapter服务启动成功" + echo "🌐 WebSocket端点: ws://localhost:8086/ws/flight-notifications" + echo "📊 健康检查: http://localhost:8086/actuator/health" +else + echo "❌ ADXP Adapter服务启动失败" + exit 1 +fi \ No newline at end of file diff --git a/qaup-admin/src/main/resources/application-dev.yml b/qaup-admin/src/main/resources/application-dev.yml index ebd12656..115795cb 100644 --- a/qaup-admin/src/main/resources/application-dev.yml +++ b/qaup-admin/src/main/resources/application-dev.yml @@ -105,6 +105,9 @@ data: password: ${ADXP_PASSWORD:dianxin@123} # 重连延迟(毫秒) reconnect-delay-millis: 3000 + # WebSocket连接启用 + websocket: + enabled: true # 无人车厂商数据源配置 - 开发环境 vehicle-api: diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/FlightSdkProperties.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/FlightSdkProperties.java index a48a4eb3..81496d4d 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/FlightSdkProperties.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/FlightSdkProperties.java @@ -33,9 +33,22 @@ public class FlightSdkProperties { * 重连延迟(毫秒) */ private long reconnectDelayMillis = 3000L; + + /** + * WebSocket配置 + */ + private WebSocketConfig websocket = new WebSocketConfig(); public boolean isConfigurationReady() { return host != null && port != null && port > 0 && username != null && password != null; } + + @Data + public static class WebSocketConfig { + /** + * 是否启用WebSocket连接 + */ + private boolean enabled = true; + } } \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/WebSocketConfig.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/WebSocketConfig.java new file mode 100644 index 00000000..80c9256c --- /dev/null +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/config/WebSocketConfig.java @@ -0,0 +1,23 @@ +package com.qaup.collision.datacollector.config; + +import com.qaup.collision.datacollector.websocket.AdxpFlightServiceWebSocketClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.client.WebSocketClient; +import org.springframework.web.socket.client.standard.StandardWebSocketClient; + +@Configuration +public class WebSocketConfig { + + @Bean + public WebSocketClient webSocketClient() { + return new StandardWebSocketClient(); + } + + @Bean + public AdxpFlightServiceWebSocketClient adxpFlightServiceWebSocketClient( + WebSocketClient webSocketClient, + FlightSdkProperties flightSdkProperties) { + return new AdxpFlightServiceWebSocketClient(webSocketClient, flightSdkProperties); + } +} \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/sdk/AdxpFlightServiceHttpClient.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/sdk/AdxpFlightServiceHttpClient.java index 59bb3a8c..f4994234 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/sdk/AdxpFlightServiceHttpClient.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/sdk/AdxpFlightServiceHttpClient.java @@ -49,6 +49,10 @@ public class AdxpFlightServiceHttpClient implements org.springframework.beans.fa this.properties = properties; this.restTemplate = new RestTemplate(); } + + public FlightSdkProperties getProperties() { + return properties; + } @Override public void afterPropertiesSet() { diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java index dc47b80a..6303f9e9 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java @@ -84,6 +84,9 @@ public class DataCollectorService { @Autowired(required = false) private AdxpFlightServiceHttpClient adxpFlightServiceClient; + + @Autowired(required = false) + private com.qaup.collision.datacollector.websocket.AdxpFlightServiceWebSocketClient adxpFlightServiceWebSocketClient; private final GeometryFactory geometryFactory = new GeometryFactory(new PrecisionModel(), 4326); // SRID 4326 for WGS84 @@ -95,10 +98,72 @@ public class DataCollectorService { private final Map flightNotificationCache = new ConcurrentHashMap<>(); // 初始化时将缓存引用传递给数据处理服务 + /** + * 初始化时将缓存引用传递给数据处理服务 + */ @PostConstruct public void init() { dataProcessingService.setActiveMovingObjectsCache(activeMovingObjectsCache); log.info("DataCollectorService 初始化完成,缓存引用已传递给DataProcessingService"); + + // 初始化WebSocket客户端监听器 + initWebSocketClient(); + } + + /** + * 初始化WebSocket客户端监听器 + */ + private void initWebSocketClient() { + if (adxpFlightServiceWebSocketClient != null && + adxpFlightServiceClient != null && + adxpFlightServiceClient.getProperties() != null && + adxpFlightServiceClient.getProperties().getWebsocket().isEnabled()) { + try { + adxpFlightServiceWebSocketClient.addMessageListener(this::handleFlightNotifications); + // WebSocket客户端会在PostConstruct时自动连接 + log.info("已注册WebSocket航班通知客户端"); + } catch (Exception e) { + log.error("初始化WebSocket客户端失败", e); + } + } else { + log.info("WebSocket客户端未启用或未配置"); + } + } + + /** + * 处理通过WebSocket接收到的航班通知 + */ + private void handleFlightNotifications(List notifications) { + if (notifications == null || notifications.isEmpty()) { + return; + } + + log.info("通过WebSocket接收到 {} 条航班进出港通知", notifications.size()); + + // 将DTO转换为业务对象并处理 + for (FlightNotificationDTO dto : notifications) { + try { + FlightNotification notification = convertToFlightNotification(dto); + if (notification != null && notification.isValid()) { + log.info("🛬 处理航班进出港通知: 航班号={}, 类型={}, 跑道={}, 机位={}, 事件时间={}", + notification.getFlightNo(), + notification.getType(), + notification.getRunway(), + notification.getSeat(), + notification.getEventDateTime()); + + // 缓存航班通知供DataProcessingService处理 + cacheFlightNotification(notification); + + } else { + log.warn("⚠️ 航班进出港通知数据无效,跳过处理: {}", dto); + } + } catch (Exception e) { + log.error("❌ 处理航班进出港通知异常: flightNo={}", dto.getFlightNo(), e); + } + } + + log.info("✅ WebSocket航班进出港通知数据处理完成,处理数量: {}", notifications.size()); } @@ -613,6 +678,14 @@ public class DataCollectorService { } try { + // 优先使用WebSocket客户端 + if (adxpFlightServiceWebSocketClient != null && adxpFlightServiceWebSocketClient.isConnected()) { + // WebSocket客户端已连接,消息通过WebSocket实时接收 + log.debug("使用WebSocket客户端接收航班通知"); + return; + } + + // 如果WebSocket客户端不可用,回退到HTTP客户端 if (adxpFlightServiceClient == null || !adxpFlightServiceClient.isEnabled()) { log.warn("数据中台航班 SDK 未启用,跳过航班通知采集"); return; diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java new file mode 100644 index 00000000..1cdffcc3 --- /dev/null +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java @@ -0,0 +1,492 @@ +package com.qaup.collision.datacollector.websocket; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qaup.collision.datacollector.config.FlightSdkProperties; +import com.qaup.collision.datacollector.dto.FlightNotificationDTO; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.client.WebSocketClient; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { + + private final WebSocketClient webSocketClient; + private final FlightSdkProperties properties; + private final ObjectMapper objectMapper; + private final List>> messageListeners; + private final AtomicBoolean isConnected = new AtomicBoolean(false); + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicLong messageCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); + + private WebSocketSession session; + private String sessionId; + private Thread reconnectThread; + + public AdxpFlightServiceWebSocketClient(WebSocketClient webSocketClient, + FlightSdkProperties properties) { + this.webSocketClient = webSocketClient; + this.properties = properties; + this.objectMapper = new ObjectMapper(); + this.messageListeners = new CopyOnWriteArrayList<>(); + } + + /** + * 启动WebSocket客户端 + */ + @PostConstruct + public void start() { + if (isRunning.compareAndSet(false, true)) { + log.info("启动ADXP航班通知WebSocket客户端"); + connect(); + } + } + + /** + * 停止WebSocket客户端 + */ + @PreDestroy + public void stop() { + if (isRunning.compareAndSet(true, false)) { + log.info("停止ADXP航班通知WebSocket客户端"); + disconnect(); + } + } + + /** + * 连接到ADXP适配器的WebSocket服务 + */ + public void connect() { + if (!properties.isConfigurationReady()) { + log.warn("数据中台航班 SDK 配置不完整,WebSocket客户端将无法正常工作"); + return; + } + + try { + // 首先通过HTTP登录获取sessionId + sessionId = loginAndGetSessionId(); + if (sessionId == null) { + log.error("无法获取sessionId,WebSocket连接失败"); + scheduleReconnect(); + return; + } + + // 构建WebSocket URL + String wsUrl = String.format("ws://%s:%d/ws/flight-notifications", + properties.getHost(), properties.getPort()); + + log.info("正在连接到ADXP适配器WebSocket服务: url={}", wsUrl); + + // 连接WebSocket + session = webSocketClient.doHandshake(this, URI.create(wsUrl).toString()).get(); + isConnected.set(true); + log.info("✅ 已连接到ADXP适配器WebSocket服务"); + + } catch (Exception e) { + log.error("❌ 连接ADXP适配器WebSocket服务失败", e); + errorCount.incrementAndGet(); + isConnected.set(false); + scheduleReconnect(); + } + } + + /** + * 通过HTTP登录获取sessionId + */ + private String loginAndGetSessionId() { + try { + String baseUrl = String.format("http://%s:%d/api/adxp", + properties.getHost(), properties.getPort()); + + // 创建登录请求 + java.util.Map loginRequest = new java.util.HashMap<>(); + loginRequest.put("username", properties.getUsername()); + loginRequest.put("password", properties.getPassword()); + + // 发送HTTP POST请求 + java.net.http.HttpClient httpClient = java.net.http.HttpClient.newHttpClient(); + String loginUrl = baseUrl + "/login"; + + String requestBody = objectMapper.writeValueAsString(loginRequest); + java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder() + .uri(URI.create(loginUrl)) + .header("Content-Type", "application/json") + .POST(java.net.http.HttpRequest.BodyPublishers.ofString(requestBody)) + .build(); + + java.net.http.HttpResponse response = httpClient.send(request, + java.net.http.HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() == 200) { + java.util.Map responseBody = objectMapper.readValue( + response.body(), new TypeReference>() {}); + + if (Boolean.TRUE.equals(responseBody.get("success"))) { + String sessionId = (String) responseBody.get("sessionId"); + log.info("✅ 登录ADXP适配器成功: sessionId={}", sessionId); + return sessionId; + } else { + String message = (String) responseBody.get("message"); + log.warn("❌ 登录ADXP适配器失败: {}", message); + } + } else { + log.warn("❌ 登录ADXP适配器HTTP请求失败: status={}", response.statusCode()); + } + } catch (Exception e) { + log.error("❌ 登录ADXP适配器时发生异常", e); + errorCount.incrementAndGet(); + } + + return null; + } + + /** + * 断开WebSocket连接 + */ + public void disconnect() { + try { + if (session != null && session.isOpen()) { + session.close(); + log.info("WebSocket连接已关闭"); + } + + // 尝试登出 + logout(); + } catch (Exception e) { + log.error("断开WebSocket连接时发生异常", e); + errorCount.incrementAndGet(); + } finally { + isConnected.set(false); + session = null; + } + } + + /** + * 登出 + */ + private void logout() { + if (sessionId == null) { + return; + } + + try { + String baseUrl = String.format("http://%s:%d/api/adxp", + properties.getHost(), properties.getPort()); + + // 创建登出请求 + java.util.Map logoutRequest = new java.util.HashMap<>(); + logoutRequest.put("sessionId", sessionId); + + // 发送HTTP POST请求 + java.net.http.HttpClient httpClient = java.net.http.HttpClient.newHttpClient(); + String logoutUrl = baseUrl + "/logout"; + + String requestBody = objectMapper.writeValueAsString(logoutRequest); + java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder() + .uri(URI.create(logoutUrl)) + .header("Content-Type", "application/json") + .POST(java.net.http.HttpRequest.BodyPublishers.ofString(requestBody)) + .build(); + + httpClient.send(request, java.net.http.HttpResponse.BodyHandlers.ofString()); + log.info("✅ 已登出ADXP适配器服务"); + } catch (Exception e) { + log.warn("登出ADXP适配器服务失败", e); + errorCount.incrementAndGet(); + } + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + log.info("🟢 WebSocket连接已建立: sessionId={}", session.getId()); + this.session = session; + isConnected.set(true); + messageCount.set(0); // 重置消息计数 + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + if (message instanceof TextMessage) { + handleTextMessage(session, (TextMessage) message); + } + } + + public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + try { + // 解析接收到的消息 + String payload = message.getPayload(); + log.debug("📥 收到WebSocket消息 ({} bytes)", payload.length()); + + // 增加消息计数 + messageCount.incrementAndGet(); + + // 将FlightMessage转换为FlightNotificationDTO + List notifications = parseMessages(payload); + + if (!notifications.isEmpty()) { + log.info("🛬 接收到 {} 条航班通知", notifications.size()); + + // 通知所有监听器 + for (Consumer> listener : messageListeners) { + try { + listener.accept(notifications); + } catch (Exception e) { + log.error("处理消息监听器时发生异常", e); + errorCount.incrementAndGet(); + } + } + } else { + log.debug("📭 收到空消息或无法解析的消息"); + } + } catch (Exception e) { + log.error("❌ 处理WebSocket消息时发生异常", e); + errorCount.incrementAndGet(); + } + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.error("❌ WebSocket传输错误", exception); + errorCount.incrementAndGet(); + isConnected.set(false); + this.session = null; + + // 尝试重连 + if (isRunning.get()) { + scheduleReconnect(); + } + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + log.info("🟡 WebSocket连接已关闭: reason={}, code={}", closeStatus.getReason(), closeStatus.getCode()); + isConnected.set(false); + this.session = null; + + // 尝试重连(除非是正常关闭) + if (isRunning.get() && closeStatus.getCode() != 1000) { + scheduleReconnect(); + } + } + + @Override + public boolean supportsPartialMessages() { + return false; + } + + /** + * 解析消息 + */ + private List parseMessages(String jsonContent) { + try { + // 解析FlightMessage列表 + List flightMessages = + objectMapper.readValue(jsonContent, new TypeReference>() {}); + + List notifications = new java.util.ArrayList<>(); + + for (FlightMessage message : flightMessages) { + try { + FlightNotificationDTO dto = parseXmlMessage(message.getServiceCode(), message.getContent()); + if (dto != null) { + notifications.add(dto); + } + } catch (Exception e) { + log.warn("解析消息失败: serviceCode={}", message.getServiceCode(), e); + errorCount.incrementAndGet(); + } + } + + return notifications; + } catch (Exception e) { + log.error("解析消息失败", e); + errorCount.incrementAndGet(); + return new java.util.ArrayList<>(); + } + } + + private FlightNotificationDTO parseXmlMessage(String serviceCode, String xmlContent) { + try { + javax.xml.parsers.DocumentBuilderFactory factory = javax.xml.parsers.DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(false); + javax.xml.parsers.DocumentBuilder builder = factory.newDocumentBuilder(); + org.w3c.dom.Document doc = builder.parse(new org.xml.sax.InputSource(new java.io.StringReader(xmlContent))); + + org.w3c.dom.Element root = doc.getDocumentElement(); + + switch (serviceCode) { + case "ADXP_NAOMS_O_DYN_ARR": + return parseArrival(root); + case "ADXP_NAOMS_O_CDM_AXOT": + return parsePushback(root); + case "ADXP_NAOMS_O_CDM_RUNWAY": + return parseRunway(root); + case "ADXP_NAOMS_O_DYN_CRAFTSEAT": + return parseGate(root); + default: + log.debug("未知的服务代码: {}", serviceCode); + return null; + } + } catch (Exception e) { + log.warn("解析 XML 失败: serviceCode={}", serviceCode, e); + errorCount.incrementAndGet(); + return null; + } + } + + private FlightNotificationDTO parseArrival(org.w3c.dom.Element root) { + String flightNo = getTextContent(root, "FlightNumber"); + String estimatedArrival = getTextContent(root, "EstimatedArrival"); + + FlightNotificationDTO dto = new FlightNotificationDTO(); + dto.setFlightNo(flightNo); + dto.setType("IN"); + + if (estimatedArrival != null && !estimatedArrival.isEmpty()) { + try { + // 解析日期时间 + java.time.format.DateTimeFormatter formatter = java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); + java.time.LocalDateTime arrivalTime = java.time.LocalDateTime.parse(estimatedArrival, formatter); + dto.setTime(arrivalTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli()); + } catch (Exception e) { + log.warn("无法解析到达时间: {}", estimatedArrival); + errorCount.incrementAndGet(); + } + } + + return dto; + } + + private FlightNotificationDTO parsePushback(org.w3c.dom.Element root) { + String flightNo = getTextContent(root, "FlightNumber"); + String actualPushback = getTextContent(root, "ActualPushback"); + + FlightNotificationDTO dto = new FlightNotificationDTO(); + dto.setFlightNo(flightNo); + dto.setType("OUT"); + + if (actualPushback != null && !actualPushback.isEmpty()) { + try { + // 解析日期时间 + java.time.format.DateTimeFormatter formatter = java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); + java.time.LocalDateTime pushbackTime = java.time.LocalDateTime.parse(actualPushback, formatter); + dto.setTime(pushbackTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli()); + } catch (Exception e) { + log.warn("无法解析推出时间: {}", actualPushback); + errorCount.incrementAndGet(); + } + } + + return dto; + } + + private FlightNotificationDTO parseRunway(org.w3c.dom.Element root) { + String flightNo = getTextContent(root, "FlightNumber"); + String runway = getTextContent(root, "Runway"); + + FlightNotificationDTO dto = new FlightNotificationDTO(); + dto.setFlightNo(flightNo); + dto.setRunway(runway); + return dto; + } + + private FlightNotificationDTO parseGate(org.w3c.dom.Element root) { + String flightNo = getTextContent(root, "FlightNumber"); + String seat = getTextContent(root, "Gate"); + + FlightNotificationDTO dto = new FlightNotificationDTO(); + dto.setFlightNo(flightNo); + dto.setSeat(seat); + return dto; + } + + private String getTextContent(org.w3c.dom.Element parent, String tagName) { + org.w3c.dom.NodeList nodeList = parent.getElementsByTagName(tagName); + if (nodeList.getLength() > 0) { + return nodeList.item(0).getTextContent(); + } + return null; + } + + /** + * 添加消息监听器 + */ + public void addMessageListener(Consumer> listener) { + messageListeners.add(listener); + log.info("添加消息监听器,当前监听器数量: {}", messageListeners.size()); + } + + /** + * 移除消息监听器 + */ + public void removeMessageListener(Consumer> listener) { + messageListeners.remove(listener); + log.info("移除消息监听器,当前监听器数量: {}", messageListeners.size()); + } + + /** + * 检查是否已连接 + */ + public boolean isConnected() { + return isConnected.get(); + } + + /** + * 安排重新连接 + */ + private void scheduleReconnect() { + if (!isRunning.get()) { + return; + } + + log.info("安排{}毫秒后重新连接WebSocket服务", properties.getReconnectDelayMillis()); + + if (reconnectThread != null && reconnectThread.isAlive()) { + reconnectThread.interrupt(); + } + + reconnectThread = new Thread(() -> { + try { + Thread.sleep(properties.getReconnectDelayMillis()); + if (isRunning.get()) { + connect(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("重连被中断"); + } + }, "WebSocket-Reconnect-Thread"); + + reconnectThread.setDaemon(true); + reconnectThread.start(); + } + + /** + * 获取客户端统计信息 + */ + public String getStats() { + return String.format("WebSocketClient Stats: " + + "connected=%s, " + + "messages=%d, " + + "errors=%d, " + + "running=%s", + isConnected.get(), + messageCount.get(), + errorCount.get(), + isRunning.get()); + } +} \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/FlightMessage.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/FlightMessage.java new file mode 100644 index 00000000..7508f589 --- /dev/null +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/FlightMessage.java @@ -0,0 +1,29 @@ +package com.qaup.collision.datacollector.websocket; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 用于WebSocket传输的航班消息格式 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class FlightMessage { + + /** + * 服务代码 + */ + private String serviceCode; + + /** + * 操作代码 + */ + private String actionCode; + + /** + * 消息内容(XML格式) + */ + private String content; +} \ No newline at end of file diff --git a/tools/test_adxp_websocket.html b/tools/test_adxp_websocket.html new file mode 100644 index 00000000..d9ccf75e --- /dev/null +++ b/tools/test_adxp_websocket.html @@ -0,0 +1,146 @@ + + + + + ADXP WebSocket测试工具 + + + +
+

ADXP WebSocket测试工具

+ +
+ + + + + +
+ +
+ + + +
+ +
+

日志:

+
+
+
+ + + + \ No newline at end of file diff --git a/tools/test_adxp_websocket.py b/tools/test_adxp_websocket.py new file mode 100644 index 00000000..b29ad00f --- /dev/null +++ b/tools/test_adxp_websocket.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +ADXP WebSocket客户端测试脚本 +用于测试与adxp-adapter的WebSocket连接 +""" + +import websocket +import json +import threading +import time + +def on_message(ws, message): + """处理接收到的消息""" + print(f"📥 收到消息: {message}") + try: + # 尝试解析JSON消息 + data = json.loads(message) + if isinstance(data, list): + print(f" 解析到 {len(data)} 条航班消息") + else: + print(f" 消息内容: {data}") + except json.JSONDecodeError: + print(f" 非JSON消息: {message}") + +def on_error(ws, error): + """处理错误""" + print(f"❌ WebSocket错误: {error}") + +def on_close(ws, close_status_code, close_msg): + """处理连接关闭""" + print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}") + +def on_open(ws): + """处理连接打开""" + print("✅ WebSocket连接已建立") + + # 启动心跳线程 + def run(*args): + while True: + time.sleep(30) # 每30秒发送一次心跳 + if ws.sock.connected: + ws.send("ping") + print("💓 发送心跳") + else: + break + threading.Thread(target=run, daemon=True).start() + +def main(): + """主函数""" + # WebSocket URL + ws_url = "ws://localhost:8086/ws/flight-notifications" + + print(f"🚀 正在连接到ADXP适配器WebSocket服务: {ws_url}") + + # 启用调试日志 + # websocket.enableTrace(True) + + # 创建WebSocket连接 + ws = websocket.WebSocketApp(ws_url, + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close) + + # 启动连接(阻塞) + ws.run_forever() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tools/test_adxp_websocket_integration.py b/tools/test_adxp_websocket_integration.py new file mode 100644 index 00000000..8c780c01 --- /dev/null +++ b/tools/test_adxp_websocket_integration.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +ADXP WebSocket集成测试脚本 +用于测试整个ADXP WebSocket消息传输流程 +""" + +import websocket +import json +import time +import threading + +class AdxpWebSocketTestClient: + def __init__(self, url): + self.url = url + self.ws = None + self.connected = False + self.messages_received = 0 + self.start_time = None + + def on_message(self, ws, message): + """处理接收到的消息""" + self.messages_received += 1 + print(f"📥 [{self.messages_received}] 收到消息: {message[:100]}...") + + try: + # 尝试解析JSON消息 + data = json.loads(message) + if isinstance(data, list): + print(f" 解析到 {len(data)} 条航班消息") + for i, msg in enumerate(data): + print(f" [{i+1}] 服务代码: {msg.get('serviceCode', 'N/A')}") + else: + print(f" 消息内容: {data}") + except json.JSONDecodeError: + print(f" 非JSON消息: {message}") + + # 显示性能统计 + if self.start_time: + elapsed = time.time() - self.start_time + if elapsed > 0: + rate = self.messages_received / elapsed + print(f" 📊 接收速率: {rate:.2f} 条消息/秒") + + def on_error(self, ws, error): + """处理错误""" + print(f"❌ WebSocket错误: {error}") + self.connected = False + + def on_close(self, ws, close_status_code, close_msg): + """处理连接关闭""" + print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}") + self.connected = False + + def on_open(self, ws): + """处理连接打开""" + self.connected = True + self.start_time = time.time() + print("✅ WebSocket连接已建立") + print(f"🔗 连接地址: {self.url}") + + # 启动心跳线程 + def heartbeat(): + while self.connected: + time.sleep(30) # 每30秒发送一次心跳 + if self.connected and self.ws and self.ws.sock.connected: + self.ws.send("ping") + print("💓 发送心跳") + else: + break + + threading.Thread(target=heartbeat, daemon=True).start() + + def connect(self): + """连接到WebSocket服务器""" + print(f"🚀 正在连接到ADXP适配器WebSocket服务: {self.url}") + + # 创建WebSocket连接 + self.ws = websocket.WebSocketApp(self.url, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close) + + # 启动连接(阻塞) + self.ws.run_forever() + + def disconnect(self): + """断开连接""" + if self.ws: + self.ws.close() + print("🚫 主动断开连接") + self.connected = False + + def get_stats(self): + """获取统计信息""" + if self.start_time: + elapsed = time.time() - self.start_time + return { + 'messages_received': self.messages_received, + 'elapsed_time': elapsed, + 'rate': self.messages_received / elapsed if elapsed > 0 else 0 + } + return None + +def main(): + """主函数""" + # WebSocket URL + ws_url = "ws://localhost:8086/ws/flight-notifications" + + # 创建测试客户端 + client = AdxpWebSocketTestClient(ws_url) + + try: + # 连接并开始监听 + client.connect() + except KeyboardInterrupt: + print("\n⚠️ 用户中断") + client.disconnect() + + # 显示最终统计 + stats = client.get_stats() + if stats: + print(f"\n📊 最终统计:") + print(f" 接收消息总数: {stats['messages_received']}") + print(f" 运行时间: {stats['elapsed_time']:.2f} 秒") + print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒") + except Exception as e: + print(f"❌ 测试过程中发生错误: {e}") + client.disconnect() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tools/test_adxp_websocket_system.py b/tools/test_adxp_websocket_system.py new file mode 100755 index 00000000..2fe2096b --- /dev/null +++ b/tools/test_adxp_websocket_system.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +ADXP WebSocket系统集成测试脚本 +用于测试整个ADXP WebSocket消息传输流程 +""" + +import websocket +import json +import time +import threading +import requests + +class AdxpSystemTest: + def __init__(self): + self.adxp_adapter_url = "http://localhost:8086" + self.qaup_system_url = "http://localhost:8080" + self.ws_url = "ws://localhost:8086/ws/flight-notifications" + self.ws = None + self.connected = False + self.messages_received = 0 + self.start_time = None + + def test_health_endpoints(self): + """测试健康检查端点""" + print("🔍 测试健康检查端点...") + + try: + # 测试adxp-adapter健康检查 + response = requests.get(f"{self.adxp_adapter_url}/actuator/health") + if response.status_code == 200: + print("✅ adxp-adapter健康检查: OK") + print(f" 状态: {response.json()}") + else: + print(f"❌ adxp-adapter健康检查失败: {response.status_code}") + except Exception as e: + print(f"❌ adxp-adapter健康检查异常: {e}") + + try: + # 测试QAUP系统健康检查 + response = requests.get(f"{self.qaup_system_url}/actuator/health") + if response.status_code == 200: + print("✅ QAUP系统健康检查: OK") + print(f" 状态: {response.json()}") + else: + print(f"❌ QAUP系统健康检查失败: {response.status_code}") + except Exception as e: + print(f"❌ QAUP系统健康检查异常: {e}") + + def on_message(self, ws, message): + """处理接收到的消息""" + self.messages_received += 1 + print(f"📥 [{self.messages_received}] 收到WebSocket消息") + + try: + # 尝试解析JSON消息 + data = json.loads(message) + if isinstance(data, list): + print(f" 解析到 {len(data)} 条航班消息") + for i, msg in enumerate(data): + service_code = msg.get('serviceCode', 'N/A') + print(f" [{i+1}] 服务代码: {service_code}") + if service_code in ['ADXP_NAOMS_O_DYN_ARR', 'ADXP_NAOMS_O_CDM_AXOT']: + print(f" 航班号: {self.extract_flight_number(msg.get('content', ''))}") + else: + print(f" 消息内容: {data}") + except json.JSONDecodeError: + print(f" 非JSON消息: {message}") + + # 显示性能统计 + if self.start_time: + elapsed = time.time() - self.start_time + if elapsed > 0: + rate = self.messages_received / elapsed + print(f" 📊 接收速率: {rate:.2f} 条消息/秒") + + def extract_flight_number(self, xml_content): + """从XML内容中提取航班号""" + try: + import xml.etree.ElementTree as ET + root = ET.fromstring(xml_content) + flight_number = root.find('.//FlightNumber') + return flight_number.text if flight_number is not None else 'N/A' + except Exception: + return 'N/A' + + def on_error(self, ws, error): + """处理错误""" + print(f"❌ WebSocket错误: {error}") + self.connected = False + + def on_close(self, ws, close_status_code, close_msg): + """处理连接关闭""" + print(f"🔒 WebSocket连接已关闭: 状态码={close_status_code}, 消息={close_msg}") + self.connected = False + + def on_open(self, ws): + """处理连接打开""" + self.connected = True + self.start_time = time.time() + print("✅ WebSocket连接已建立") + print(f"🔗 连接地址: {self.ws_url}") + + # 启动心跳线程 + def heartbeat(): + while self.connected: + time.sleep(30) # 每30秒发送一次心跳 + if self.connected and self.ws and self.ws.sock.connected: + self.ws.send("ping") + print("💓 发送心跳") + else: + break + + threading.Thread(target=heartbeat, daemon=True).start() + + def test_websocket_connection(self): + """测试WebSocket连接""" + print("\n🚀 测试WebSocket连接...") + + try: + # 创建WebSocket连接 + self.ws = websocket.WebSocketApp(self.ws_url, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close) + + # 在单独线程中运行WebSocket + ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True) + ws_thread.start() + + # 等待连接建立 + time.sleep(2) + + if self.connected: + print("✅ WebSocket连接测试成功") + # 保持连接10秒以接收消息 + print("⏳ 保持连接10秒以接收消息...") + time.sleep(10) + self.ws.close() + else: + print("❌ WebSocket连接测试失败") + + except Exception as e: + print(f"❌ WebSocket连接测试异常: {e}") + + def get_final_stats(self): + """获取最终统计信息""" + if self.start_time: + elapsed = time.time() - self.start_time + return { + 'messages_received': self.messages_received, + 'elapsed_time': elapsed, + 'rate': self.messages_received / elapsed if elapsed > 0 else 0 + } + return None + + def run_all_tests(self): + """运行所有测试""" + print("🧪 开始ADXP WebSocket系统集成测试") + print("=" * 50) + + # 测试健康检查端点 + self.test_health_endpoints() + + # 测试WebSocket连接 + self.test_websocket_connection() + + # 显示最终统计 + stats = self.get_final_stats() + if stats: + print(f"\n📊 最终统计:") + print(f" 接收消息总数: {stats['messages_received']}") + print(f" 运行时间: {stats['elapsed_time']:.2f} 秒") + print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒") + + print("\n✅ 系统集成测试完成") + +def main(): + """主函数""" + tester = AdxpSystemTest() + try: + tester.run_all_tests() + except KeyboardInterrupt: + print("\n⚠️ 用户中断") + except Exception as e: + print(f"❌ 测试过程中发生错误: {e}") + +if __name__ == "__main__": + main()