规范 ADXP 适配器的代码

This commit is contained in:
Tian jianyong 2025-11-21 15:57:33 +08:00
parent f104f2f6ac
commit 78fc32db1c
5 changed files with 142 additions and 161 deletions

View File

@ -18,7 +18,7 @@ public class WebSocketConfig implements WebSocketConfigurer {
@Bean
public AdxpWebSocketHandler adxpWebSocketHandler() {
return new AdxpWebSocketHandler(adxpSdkService);
return new AdxpWebSocketHandler();
}
@Override

View File

@ -1,6 +1,7 @@
package com.qaup.adxp.adapter.service;
import com.qaup.adxp.adapter.dto.FlightMessage;
import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler;
import com.taocares.adxp.client.ADXPClient;
import com.taocares.adxp.client.ADXPClientFactory;
import com.taocares.adxp.model.LoginResult;
@ -8,14 +9,15 @@ import com.taocares.adxp.model.MessageResult;
import com.taocares.adxp.model.MessageList;
import com.taocares.adxp.model.MsgType;
import com.taocares.adxp.model.HeadType;
import com.taocares.adxp.AdxpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -37,10 +39,12 @@ public class AdxpSdkService {
@Value("${adxp.password:Dianxin#2025}")
private String defaultPassword;
// 连接状态管理
@Autowired
private AdxpWebSocketHandler adxpWebSocketHandler;
// 单一连接状态管理
private volatile boolean connected = false;
private volatile String currentSessionId = null;
private ADXPClient currentClient = null;
private ADXPClient adxpClient = null;
// 定时重连任务
private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
@ -52,8 +56,6 @@ public class AdxpSdkService {
log.info("ADXP适配器启动主动连接数据中台...");
try {
connectToADXP();
// 启动重连监控
startReconnectMonitor();
} catch (Exception e) {
log.error("ADXP数据中台初始连接失败: {}", e.getMessage());
// 启动定时重连
@ -77,30 +79,30 @@ public class AdxpSdkService {
host, port, defaultUsername);
// 创建客户端
currentClient = ADXPClientFactory.createWSClient(host, port);
adxpClient = ADXPClientFactory.createWSClient(host, port);
// 登录
LoginResult result = currentClient.login(defaultUsername, defaultPassword);
LoginResult result = adxpClient.login(defaultUsername, defaultPassword);
currentSessionId = UUID.randomUUID().toString();
connected = true;
boolean loginSuccess = result != null && Boolean.TRUE.equals(result.isSuccess());
if (result == null || !Boolean.TRUE.equals(result.isSuccess())) {
if (loginSuccess) {
connected = true;
log.info("用户: {} 登录平台成功", defaultUsername);
// 启动消息监听
startMessageListener();
} else {
connected = false;
String errorMsg = result != null ?
String.format("code=%s, message=%s", result.getCode(), result.getMessage()) :
"登录响应为空";
log.warn("登录响应解析失败: {} - 但连接可能已建立", errorMsg);
} else {
log.info("ADXP数据中台连接成功! sessionId={}", currentSessionId);
log.error("用户: {} 登录平台失败: {}", defaultUsername, errorMsg);
throw new RuntimeException("ADXP登录失败: " + errorMsg);
}
// 启动消息监听
startMessageListener();
} catch (Exception e) {
connected = false;
currentClient = null;
currentSessionId = null;
adxpClient = null;
log.error("ADXP数据中台连接失败: {}", e.getMessage());
throw new RuntimeException("ADXP连接失败", e);
}
@ -111,12 +113,11 @@ public class AdxpSdkService {
private void disconnectFromADXP() {
synchronized (lock) {
try {
if (currentClient != null) {
currentClient.logout(defaultUsername, defaultPassword);
if (adxpClient != null) {
adxpClient.logout(defaultUsername, defaultPassword);
}
connected = false;
currentSessionId = null;
currentClient = null;
adxpClient = null;
log.info("已断开ADXP数据中台连接");
} catch (Exception e) {
log.error("断开连接时发生错误", e);
@ -124,12 +125,6 @@ public class AdxpSdkService {
}
}
// 启动重连监控
private void startReconnectMonitor() {
// 这里可以添加连接健康检查
log.info("连接监控已启动");
}
// 定时重连任务
private void scheduleReconnect() {
reconnectScheduler.scheduleAtFixedRate(() -> {
@ -149,16 +144,17 @@ public class AdxpSdkService {
* 检查连接状态 - 供外部使用
*/
public boolean isConnected() {
return connected && currentClient != null;
return connected && adxpClient != null;
}
// 旧的healthCheck方法已移除使用返回Map的新版本
/**
* 获取当前连接信息
*/
public String getConnectionInfo() {
if (connected) {
return String.format("已连接到ADXP数据中台 (host=%s, port=%s, sessionId=%s)",
host, port, currentSessionId);
return String.format("已连接到ADXP数据中台 (host=%s, port=%s)", host, port);
} else {
return "未连接到ADXP数据中台";
}
@ -185,12 +181,12 @@ public class AdxpSdkService {
private void startMessageListener() {
Thread messageListener = new Thread(() -> {
log.info("ADXP消息监听线程已启动");
while (connected && currentClient != null) {
while (connected && adxpClient != null) {
try {
List<FlightMessage> messages = receiveMessagesInternal();
List<FlightMessage> messages = receiveMessages();
if (!messages.isEmpty()) {
log.info("接收到 {} 条消息", messages.size());
// 这里可以将消息发送到WebSocket或消息队列
// 处理接收到的消息
processMessages(messages);
}
Thread.sleep(1000); // 每秒检查一次
@ -214,15 +210,20 @@ public class AdxpSdkService {
}
/**
* 内部消息接收方法
* 消息接收方法
*/
private List<FlightMessage> receiveMessagesInternal() {
if (!connected || currentClient == null) {
private List<FlightMessage> receiveMessages() {
if (!connected || adxpClient == null) {
return Collections.emptyList();
}
try {
MessageResult result = currentClient.receiveMessage();
MessageResult result = adxpClient.receiveMessage();
// 使用统一的错误码处理方法
if (handleMessageResultError(result)) {
return Collections.emptyList();
}
if (result == null || !Boolean.TRUE.equals(result.isSuccess())) {
return Collections.emptyList();
@ -250,33 +251,65 @@ public class AdxpSdkService {
return messages;
} catch (Exception e) {
log.error("接收消息异常", e);
handleConnectionError(e);
return Collections.emptyList();
}
}
private void handleConnectionError(Exception e) {
log.error("ADXP连接错误: {}", e.getMessage());
// 对于所有连接错误直接尝试重新连接
log.info("尝试重新连接ADXP服务器");
reconnect();
}
/**
* 处理接收到的消息
* 处理接收到的消息并广播到WebSocket客户端
*/
private void processMessages(List<FlightMessage> messages) {
// TODO: 实现消息处理逻辑
// 可以发送到消息队列WebSocket推送等
for (FlightMessage message : messages) {
log.debug("处理消息: serviceCode={}", message.getServiceCode());
if (messages != null && !messages.isEmpty()) {
try {
log.info("接收到 {} 条消息,准备广播", messages.size());
// 将消息广播到WebSocket客户端
if (adxpWebSocketHandler != null) {
adxpWebSocketHandler.broadcastMessages(messages);
}
} catch (Exception e) {
log.error("广播消息失败: {}", e.getMessage());
}
}
}
/**
* 处理消息结果错误码
*/
private boolean handleMessageResultError(MessageResult result) {
if (result == null) {
return false;
}
int code = result.getCode();
// 使用正确的常量类引用
if (code == AdxpConstants.RC_CLIENT_NOT_LOGGED_IN ||
code == AdxpConstants.RC_TOKEN_EXPIRED ||
code == AdxpConstants.RC_CLIENT_EXCEPTION ||
code == AdxpConstants.RC_REMOTE_EXCEPTION) {
log.info("遇到错误码 {},需要重新连接", code);
reconnect();
return true;
}
return false;
}
/**
* 获取消息 - 供外部调用
*/
public List<FlightMessage> getMessages() {
return receiveMessagesInternal();
return receiveMessages();
}
/**
* 断开连接 - 主动断开与ADXP数据中台的连接
* 注意在ADXP主动连接架构下适配器启动时自动连接
* 该方法主要用于管理连接状态或强制断开连接
*/
public void disconnectFromADXPServer() {
log.info("主动断开ADXP数据中台连接");
@ -284,20 +317,36 @@ public class AdxpSdkService {
}
/**
* 向后兼容的logout方法 - 调用disconnectFromADXPServer
* @deprecated 请使用 {@link #disconnectFromADXPServer()} 替代
* 重新连接方法
*/
@Deprecated
public void logout() {
log.warn("logout()方法已废弃,请使用 disconnectFromADXPServer() 替代");
disconnectFromADXPServer();
}
/**
* 获取连接状态统计
*/
public int getActiveSessionCount() {
return connected ? 1 : 0;
private void reconnect() {
synchronized (lock) {
try {
log.info("尝试重新连接ADXP数据中台");
// 先断开现有连接
if (adxpClient != null) {
try {
adxpClient.logout(defaultUsername, defaultPassword);
} catch (Exception ignored) {}
}
// 重新创建客户端并登录
adxpClient = ADXPClientFactory.createWSClient(host, port);
LoginResult loginResult = adxpClient.login(defaultUsername, defaultPassword);
if (loginResult != null && Boolean.TRUE.equals(loginResult.isSuccess())) {
connected = true;
log.info("重新连接成功");
} else {
connected = false;
log.error("重新连接失败");
}
} catch (Exception e) {
connected = false;
adxpClient = null;
log.error("重新连接异常: {}", e.getMessage());
}
}
}
/**
@ -309,8 +358,6 @@ public class AdxpSdkService {
health.put("connected", connected);
health.put("host", host);
health.put("port", port);
health.put("sessionId", currentSessionId);
health.put("activeConnections", getActiveSessionCount());
if (connected) {
health.put("message", "ADXP数据中台连接正常");

View File

@ -8,8 +8,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Service
@ -23,92 +21,26 @@ public class MessageListenerService {
@Autowired
private AdxpWebSocketHandler adxpWebSocketHandler;
private ExecutorService executorService;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/**
* 服务启动时的初始化方法
* 注意消息监听逻辑已移至AdxpSdkService中本服务仅负责管理和监控
*/
@PostConstruct
public void start() {
if (isRunning.compareAndSet(false, true)) {
executorService = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "ADXP-Message-Listener");
t.setDaemon(true);
return t;
});
executorService.submit(this::listenForMessages);
log.info("消息监听服务已启动");
log.info("消息监听服务已启动 - 消息监听逻辑已移至AdxpSdkService");
}
}
@PreDestroy
public void stop() {
if (isRunning.compareAndSet(true, false)) {
if (executorService != null) {
executorService.shutdownNow();
}
log.info("消息监听服务已停止");
}
}
private void listenForMessages() {
log.info("开始监听数据中台消息");
// 获取所有活跃的会话ID
while (isRunning.get()) {
try {
// 检查ADXP连接状态
if (!adxpSdkService.isConnected()) {
log.debug("ADXP数据中台连接未建立等待重连...");
Thread.sleep(2000);
continue;
}
// 获取当前会话数量单连接架构下为1或0
int sessionCount = adxpSdkService.getActiveSessionCount();
if (sessionCount == 0) {
log.debug("当前没有活跃的会话,等待连接...");
Thread.sleep(1000);
continue;
}
// 单连接架构下直接接收消息
try {
// 调用SDK接收消息
java.util.List<com.qaup.adxp.adapter.dto.FlightMessage> messages =
adxpSdkService.getMessages();
// 如果有消息广播给所有WebSocket客户端
if (messages != null && !messages.isEmpty()) {
adxpWebSocketHandler.broadcastMessages(messages);
log.info("接收到 {} 条航班消息并广播给 {} 个WebSocket客户端",
messages.size(), adxpWebSocketHandler.getSessionCount());
}
} catch (Exception e) {
log.error("处理ADXP消息失败", e);
}
// 短暂休眠避免过于频繁的轮询
Thread.sleep(100);
} catch (InterruptedException e) {
log.info("消息监听线程被中断");
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("消息监听过程中发生错误", e);
// 出错后短暂休眠再重试
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
log.info("消息监听服务已停止监听");
}
/**
* 获取服务统计信息
*/
@ -116,11 +48,9 @@ public class MessageListenerService {
return String.format("MessageListenerService Stats: " +
"isRunning=%s, " +
"adxpConnected=%s, " +
"adxpActiveSessions=%d, " +
"webSocketClients=%d",
isRunning.get(),
adxpSdkService.isConnected(),
adxpSdkService.getActiveSessionCount(),
adxpWebSocketHandler.getSessionCount());
}
}

View File

@ -1,32 +1,30 @@
package com.qaup.adxp.adapter.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.qaup.adxp.adapter.dto.FlightMessage;
import com.qaup.adxp.adapter.service.AdxpSdkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class AdxpWebSocketHandler extends TextWebSocketHandler {
private static final Logger log = LoggerFactory.getLogger(AdxpWebSocketHandler.class);
private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
private final AdxpSdkService adxpSdkService;
private final ObjectMapper objectMapper;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
public AdxpWebSocketHandler(AdxpSdkService adxpSdkService) {
this.adxpSdkService = adxpSdkService;
public AdxpWebSocketHandler() {
this.objectMapper = new ObjectMapper();
// 配置ObjectMapper以更好地处理日期和其他序列化问题
this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
}
@Override
@ -64,41 +62,47 @@ public class AdxpWebSocketHandler extends TextWebSocketHandler {
* 向所有连接的客户端广播消息
*/
public void broadcastMessages(List<FlightMessage> messages) {
if (messages == null || messages.isEmpty()) {
if (messages == null || messages.isEmpty() || sessions.isEmpty()) {
return;
}
try {
// 序列化消息列表为JSON
String jsonMessage = objectMapper.writeValueAsString(messages);
TextMessage textMessage = new TextMessage(jsonMessage);
int successCount = 0;
int failedCount = 0;
// 清理已关闭的会话
cleanupClosedSessions();
int clientCount = sessions.size();
// 向所有活跃客户端广播消息
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(textMessage);
successCount++;
} catch (Exception e) {
log.error("发送消息失败: sessionId={}", session.getId(), e);
// 如果发送失败移除会话
sessions.remove(session);
failedCount++;
}
} else {
sessions.remove(session);
failedCount++;
}
}
log.debug("广播消息完成: 消息数量={}, 连接数={}, 成功发送={}, 失败={}",
messages.size(), sessions.size(), successCount, failedCount);
if (log.isDebugEnabled()) {
log.debug("已向 {} 个客户端广播 {} 条消息", clientCount, messages.size());
}
} catch (Exception e) {
log.error("序列化或广播消息失败", e);
}
}
/**
* 清理已关闭的WebSocket会话
*/
private void cleanupClosedSessions() {
sessions.removeIf(session -> !session.isOpen());
}
/**
* 获取当前连接数
*/
@ -129,10 +133,10 @@ public class AdxpWebSocketHandler extends TextWebSocketHandler {
*/
public List<String> getConnectionDetails() {
return sessions.stream()
.map(session -> String.format("ID: %s, Remote Address: %s, Open: %s",
.filter(WebSocketSession::isOpen)
.map(session -> String.format("ID: %s, Remote Address: %s",
session.getId(),
session.getRemoteAddress(),
session.isOpen()))
.collect(java.util.stream.Collectors.toList());
session.getRemoteAddress()))
.collect(Collectors.toList());
}
}