diff --git a/adxp-adapter/libs/com/taocares/adxp/AdxpConstants.class b/adxp-adapter/libs/com/taocares/adxp/AdxpConstants.class new file mode 100644 index 00000000..c4c44af9 Binary files /dev/null and b/adxp-adapter/libs/com/taocares/adxp/AdxpConstants.class differ diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java index 47d76b78..4ef255f6 100644 --- a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/config/WebSocketConfig.java @@ -18,7 +18,7 @@ public class WebSocketConfig implements WebSocketConfigurer { @Bean public AdxpWebSocketHandler adxpWebSocketHandler() { - return new AdxpWebSocketHandler(adxpSdkService); + return new AdxpWebSocketHandler(); } @Override diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java index 3292f6a9..ecff74cf 100644 --- a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/AdxpSdkService.java @@ -1,6 +1,7 @@ package com.qaup.adxp.adapter.service; import com.qaup.adxp.adapter.dto.FlightMessage; +import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler; import com.taocares.adxp.client.ADXPClient; import com.taocares.adxp.client.ADXPClientFactory; import com.taocares.adxp.model.LoginResult; @@ -8,14 +9,15 @@ import com.taocares.adxp.model.MessageResult; import com.taocares.adxp.model.MessageList; import com.taocares.adxp.model.MsgType; import com.taocares.adxp.model.HeadType; +import com.taocares.adxp.AdxpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -37,10 +39,12 @@ public class AdxpSdkService { @Value("${adxp.password:Dianxin#2025}") private String defaultPassword; - // 连接状态管理 + @Autowired + private AdxpWebSocketHandler adxpWebSocketHandler; + + // 单一连接状态管理 private volatile boolean connected = false; - private volatile String currentSessionId = null; - private ADXPClient currentClient = null; + private ADXPClient adxpClient = null; // 定时重连任务 private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor(); @@ -52,8 +56,6 @@ public class AdxpSdkService { log.info("ADXP适配器启动,主动连接数据中台..."); try { connectToADXP(); - // 启动重连监控 - startReconnectMonitor(); } catch (Exception e) { log.error("ADXP数据中台初始连接失败: {}", e.getMessage()); // 启动定时重连 @@ -77,30 +79,30 @@ public class AdxpSdkService { host, port, defaultUsername); // 创建客户端 - currentClient = ADXPClientFactory.createWSClient(host, port); + adxpClient = ADXPClientFactory.createWSClient(host, port); // 登录 - LoginResult result = currentClient.login(defaultUsername, defaultPassword); + LoginResult result = adxpClient.login(defaultUsername, defaultPassword); - currentSessionId = UUID.randomUUID().toString(); - connected = true; + boolean loginSuccess = result != null && Boolean.TRUE.equals(result.isSuccess()); - if (result == null || !Boolean.TRUE.equals(result.isSuccess())) { + if (loginSuccess) { + connected = true; + log.info("用户: {} 登录平台成功", defaultUsername); + // 启动消息监听 + startMessageListener(); + } else { + connected = false; String errorMsg = result != null ? String.format("code=%s, message=%s", result.getCode(), result.getMessage()) : "登录响应为空"; - log.warn("登录响应解析失败: {} - 但连接可能已建立", errorMsg); - } else { - log.info("ADXP数据中台连接成功! sessionId={}", currentSessionId); + log.error("用户: {} 登录平台失败: {}", defaultUsername, errorMsg); + throw new RuntimeException("ADXP登录失败: " + errorMsg); } - // 启动消息监听 - startMessageListener(); - } catch (Exception e) { connected = false; - currentClient = null; - currentSessionId = null; + adxpClient = null; log.error("ADXP数据中台连接失败: {}", e.getMessage()); throw new RuntimeException("ADXP连接失败", e); } @@ -111,12 +113,11 @@ public class AdxpSdkService { private void disconnectFromADXP() { synchronized (lock) { try { - if (currentClient != null) { - currentClient.logout(defaultUsername, defaultPassword); + if (adxpClient != null) { + adxpClient.logout(defaultUsername, defaultPassword); } connected = false; - currentSessionId = null; - currentClient = null; + adxpClient = null; log.info("已断开ADXP数据中台连接"); } catch (Exception e) { log.error("断开连接时发生错误", e); @@ -124,12 +125,6 @@ public class AdxpSdkService { } } - // 启动重连监控 - private void startReconnectMonitor() { - // 这里可以添加连接健康检查 - log.info("连接监控已启动"); - } - // 定时重连任务 private void scheduleReconnect() { reconnectScheduler.scheduleAtFixedRate(() -> { @@ -149,16 +144,17 @@ public class AdxpSdkService { * 检查连接状态 - 供外部使用 */ public boolean isConnected() { - return connected && currentClient != null; + return connected && adxpClient != null; } + // 旧的healthCheck方法已移除,使用返回Map的新版本 + /** * 获取当前连接信息 */ public String getConnectionInfo() { if (connected) { - return String.format("已连接到ADXP数据中台 (host=%s, port=%s, sessionId=%s)", - host, port, currentSessionId); + return String.format("已连接到ADXP数据中台 (host=%s, port=%s)", host, port); } else { return "未连接到ADXP数据中台"; } @@ -185,12 +181,12 @@ public class AdxpSdkService { private void startMessageListener() { Thread messageListener = new Thread(() -> { log.info("ADXP消息监听线程已启动"); - while (connected && currentClient != null) { + while (connected && adxpClient != null) { try { - List messages = receiveMessagesInternal(); + List messages = receiveMessages(); if (!messages.isEmpty()) { log.info("接收到 {} 条消息", messages.size()); - // 这里可以将消息发送到WebSocket或消息队列 + // 处理接收到的消息 processMessages(messages); } Thread.sleep(1000); // 每秒检查一次 @@ -214,15 +210,20 @@ public class AdxpSdkService { } /** - * 内部消息接收方法 + * 消息接收方法 */ - private List receiveMessagesInternal() { - if (!connected || currentClient == null) { + private List receiveMessages() { + if (!connected || adxpClient == null) { return Collections.emptyList(); } try { - MessageResult result = currentClient.receiveMessage(); + MessageResult result = adxpClient.receiveMessage(); + + // 使用统一的错误码处理方法 + if (handleMessageResultError(result)) { + return Collections.emptyList(); + } if (result == null || !Boolean.TRUE.equals(result.isSuccess())) { return Collections.emptyList(); @@ -250,33 +251,65 @@ public class AdxpSdkService { return messages; } catch (Exception e) { - log.error("接收消息异常", e); + handleConnectionError(e); return Collections.emptyList(); } } + private void handleConnectionError(Exception e) { + log.error("ADXP连接错误: {}", e.getMessage()); + // 对于所有连接错误,直接尝试重新连接 + log.info("尝试重新连接ADXP服务器"); + reconnect(); + } + /** - * 处理接收到的消息 + * 处理接收到的消息并广播到WebSocket客户端 */ private void processMessages(List messages) { - // TODO: 实现消息处理逻辑 - // 可以发送到消息队列、WebSocket推送等 - for (FlightMessage message : messages) { - log.debug("处理消息: serviceCode={}", message.getServiceCode()); + if (messages != null && !messages.isEmpty()) { + try { + log.info("接收到 {} 条消息,准备广播", messages.size()); + // 将消息广播到WebSocket客户端 + if (adxpWebSocketHandler != null) { + adxpWebSocketHandler.broadcastMessages(messages); + } + } catch (Exception e) { + log.error("广播消息失败: {}", e.getMessage()); + } } } + + /** + * 处理消息结果错误码 + */ + private boolean handleMessageResultError(MessageResult result) { + if (result == null) { + return false; + } + + int code = result.getCode(); + // 使用正确的常量类引用 + if (code == AdxpConstants.RC_CLIENT_NOT_LOGGED_IN || + code == AdxpConstants.RC_TOKEN_EXPIRED || + code == AdxpConstants.RC_CLIENT_EXCEPTION || + code == AdxpConstants.RC_REMOTE_EXCEPTION) { + log.info("遇到错误码 {},需要重新连接", code); + reconnect(); + return true; + } + return false; + } /** * 获取消息 - 供外部调用 */ public List getMessages() { - return receiveMessagesInternal(); + return receiveMessages(); } /** * 断开连接 - 主动断开与ADXP数据中台的连接 - * 注意:在ADXP主动连接架构下,适配器启动时自动连接 - * 该方法主要用于管理连接状态或强制断开连接 */ public void disconnectFromADXPServer() { log.info("主动断开ADXP数据中台连接"); @@ -284,20 +317,36 @@ public class AdxpSdkService { } /** - * 向后兼容的logout方法 - 调用disconnectFromADXPServer - * @deprecated 请使用 {@link #disconnectFromADXPServer()} 替代 + * 重新连接方法 */ - @Deprecated - public void logout() { - log.warn("logout()方法已废弃,请使用 disconnectFromADXPServer() 替代"); - disconnectFromADXPServer(); - } - - /** - * 获取连接状态统计 - */ - public int getActiveSessionCount() { - return connected ? 1 : 0; + private void reconnect() { + synchronized (lock) { + try { + log.info("尝试重新连接ADXP数据中台"); + // 先断开现有连接 + if (adxpClient != null) { + try { + adxpClient.logout(defaultUsername, defaultPassword); + } catch (Exception ignored) {} + } + + // 重新创建客户端并登录 + adxpClient = ADXPClientFactory.createWSClient(host, port); + LoginResult loginResult = adxpClient.login(defaultUsername, defaultPassword); + + if (loginResult != null && Boolean.TRUE.equals(loginResult.isSuccess())) { + connected = true; + log.info("重新连接成功"); + } else { + connected = false; + log.error("重新连接失败"); + } + } catch (Exception e) { + connected = false; + adxpClient = null; + log.error("重新连接异常: {}", e.getMessage()); + } + } } /** @@ -309,8 +358,6 @@ public class AdxpSdkService { health.put("connected", connected); health.put("host", host); health.put("port", port); - health.put("sessionId", currentSessionId); - health.put("activeConnections", getActiveSessionCount()); if (connected) { health.put("message", "ADXP数据中台连接正常"); diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java index 1ea333f6..7527237c 100644 --- a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/service/MessageListenerService.java @@ -8,8 +8,6 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @Service @@ -23,92 +21,26 @@ public class MessageListenerService { @Autowired private AdxpWebSocketHandler adxpWebSocketHandler; - private ExecutorService executorService; private final AtomicBoolean isRunning = new AtomicBoolean(false); + /** + * 服务启动时的初始化方法 + * 注意:消息监听逻辑已移至AdxpSdkService中,本服务仅负责管理和监控 + */ @PostConstruct public void start() { if (isRunning.compareAndSet(false, true)) { - executorService = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "ADXP-Message-Listener"); - t.setDaemon(true); - return t; - }); - - executorService.submit(this::listenForMessages); - log.info("消息监听服务已启动"); + log.info("消息监听服务已启动 - 消息监听逻辑已移至AdxpSdkService"); } } @PreDestroy public void stop() { if (isRunning.compareAndSet(true, false)) { - if (executorService != null) { - executorService.shutdownNow(); - } log.info("消息监听服务已停止"); } } - private void listenForMessages() { - log.info("开始监听数据中台消息"); - - // 获取所有活跃的会话ID - while (isRunning.get()) { - try { - // 检查ADXP连接状态 - if (!adxpSdkService.isConnected()) { - log.debug("ADXP数据中台连接未建立,等待重连..."); - Thread.sleep(2000); - continue; - } - - // 获取当前会话数量(单连接架构下为1或0) - int sessionCount = adxpSdkService.getActiveSessionCount(); - if (sessionCount == 0) { - log.debug("当前没有活跃的会话,等待连接..."); - Thread.sleep(1000); - continue; - } - - // 单连接架构下,直接接收消息 - try { - // 调用SDK接收消息 - java.util.List messages = - adxpSdkService.getMessages(); - - // 如果有消息,广播给所有WebSocket客户端 - if (messages != null && !messages.isEmpty()) { - adxpWebSocketHandler.broadcastMessages(messages); - log.info("接收到 {} 条航班消息并广播给 {} 个WebSocket客户端", - messages.size(), adxpWebSocketHandler.getSessionCount()); - } - } catch (Exception e) { - log.error("处理ADXP消息失败", e); - } - - // 短暂休眠避免过于频繁的轮询 - Thread.sleep(100); - } catch (InterruptedException e) { - log.info("消息监听线程被中断"); - Thread.currentThread().interrupt(); - break; - } catch (Exception e) { - log.error("消息监听过程中发生错误", e); - - // 出错后短暂休眠再重试 - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - } - - log.info("消息监听服务已停止监听"); - } - /** * 获取服务统计信息 */ @@ -116,11 +48,9 @@ public class MessageListenerService { return String.format("MessageListenerService Stats: " + "isRunning=%s, " + "adxpConnected=%s, " + - "adxpActiveSessions=%d, " + "webSocketClients=%d", isRunning.get(), adxpSdkService.isConnected(), - adxpSdkService.getActiveSessionCount(), adxpWebSocketHandler.getSessionCount()); } } \ No newline at end of file diff --git a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java index 871455e7..dc02107d 100644 --- a/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java +++ b/adxp-adapter/src/main/java/com/qaup/adxp/adapter/websocket/AdxpWebSocketHandler.java @@ -1,32 +1,30 @@ package com.qaup.adxp.adapter.websocket; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.qaup.adxp.adapter.dto.FlightMessage; -import com.qaup.adxp.adapter.service.AdxpSdkService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; public class AdxpWebSocketHandler extends TextWebSocketHandler { private static final Logger log = LoggerFactory.getLogger(AdxpWebSocketHandler.class); private final List sessions = new CopyOnWriteArrayList<>(); - private final AdxpSdkService adxpSdkService; private final ObjectMapper objectMapper; - private final AtomicBoolean isRunning = new AtomicBoolean(false); - public AdxpWebSocketHandler(AdxpSdkService adxpSdkService) { - this.adxpSdkService = adxpSdkService; + public AdxpWebSocketHandler() { this.objectMapper = new ObjectMapper(); + // 配置ObjectMapper以更好地处理日期和其他序列化问题 + this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); } @Override @@ -64,41 +62,47 @@ public class AdxpWebSocketHandler extends TextWebSocketHandler { * 向所有连接的客户端广播消息 */ public void broadcastMessages(List messages) { - if (messages == null || messages.isEmpty()) { + if (messages == null || messages.isEmpty() || sessions.isEmpty()) { return; } try { + // 序列化消息列表为JSON String jsonMessage = objectMapper.writeValueAsString(messages); TextMessage textMessage = new TextMessage(jsonMessage); - int successCount = 0; - int failedCount = 0; + // 清理已关闭的会话 + cleanupClosedSessions(); + int clientCount = sessions.size(); + + // 向所有活跃客户端广播消息 for (WebSocketSession session : sessions) { if (session.isOpen()) { try { session.sendMessage(textMessage); - successCount++; } catch (Exception e) { log.error("发送消息失败: sessionId={}", session.getId(), e); - // 如果发送失败,移除会话 sessions.remove(session); - failedCount++; } - } else { - sessions.remove(session); - failedCount++; } } - log.debug("广播消息完成: 消息数量={}, 连接数={}, 成功发送={}, 失败={}", - messages.size(), sessions.size(), successCount, failedCount); + if (log.isDebugEnabled()) { + log.debug("已向 {} 个客户端广播 {} 条消息", clientCount, messages.size()); + } } catch (Exception e) { log.error("序列化或广播消息失败", e); } } + /** + * 清理已关闭的WebSocket会话 + */ + private void cleanupClosedSessions() { + sessions.removeIf(session -> !session.isOpen()); + } + /** * 获取当前连接数 */ @@ -129,10 +133,10 @@ public class AdxpWebSocketHandler extends TextWebSocketHandler { */ public List getConnectionDetails() { return sessions.stream() - .map(session -> String.format("ID: %s, Remote Address: %s, Open: %s", + .filter(WebSocketSession::isOpen) + .map(session -> String.format("ID: %s, Remote Address: %s", session.getId(), - session.getRemoteAddress(), - session.isOpen())) - .collect(java.util.stream.Collectors.toList()); + session.getRemoteAddress())) + .collect(Collectors.toList()); } } \ No newline at end of file