把 adxp客户端和适配器之间改成websocket通信

This commit is contained in:
Tian jianyong 2025-10-20 18:14:57 +08:00
parent 85253e94a5
commit a07f686aec
19 changed files with 1674 additions and 25 deletions

139
README_WEBSOCKET.md Normal file
View File

@ -0,0 +1,139 @@
# ADXP WebSocket 实时消息传输系统
## 系统架构
本系统实现了基于WebSocket的实时消息传输机制用于替代原有的轮询方式提高数据传输的实时性和系统效率。
### 组件构成
1. **adxp-adapter服务**
- 作为ADXP SDK与QAUP系统的桥梁
- 提供WebSocket服务端点
- 持续监听数据中台消息并通过WebSocket广播
2. **QAUP系统**
- 作为WebSocket客户端连接adxp-adapter
- 实时接收航班通知消息
- 处理消息并触发后续业务逻辑
## 部署和启动
### 1. 启动adxp-adapter服务
```bash
cd /Users/tianjianyong/apps/Company/QAUP-Management/adxp-adapter
./start.sh
```
或者手动启动:
```bash
cd /Users/tianjianyong/apps/Company/QAUP-Management/adxp-adapter
mvn spring-boot:run
```
### 2. 启动QAUP系统
```bash
cd /Users/tianjianyong/apps/Company/QAUP-Management/qaup-admin
mvn spring-boot:run
```
## WebSocket端点
- **服务端点**: `ws://localhost:8086/ws/flight-notifications`
- **健康检查**: `http://localhost:8086/actuator/health`
## 配置说明
### adxp-adapter配置 (application.yml)
```yaml
adxp:
host: localhost # 数据中台地址
port: 7001 # 数据中台端口
server:
port: 8086 # 适配器服务端口
```
### QAUP系统配置 (application-dev.yml)
```yaml
data:
collector:
adxp-adapter:
host: localhost # adxp-adapter地址
port: 8086 # adxp-adapter端口
username: dianxin # 登录用户名
password: dianxin@123 # 登录密码
websocket:
enabled: true # 启用WebSocket连接
```
## 测试工具
### 1. 系统集成测试
```bash
cd /Users/tianjianyong/apps/Company/QAUP-Management/tools
./test_adxp_websocket_system.py
```
### 2. WebSocket连接测试
```bash
cd /Users/tianjianyong/apps/Company/QAUP-Management/tools
./test_adxp_websocket_integration.py
```
### 3. 简单HTML测试页面
```bash
cd /Users/tianjianyong/apps/Company/QAUP-Management/tools
open test_adxp_websocket.html
```
## 日志监控
### adxp-adapter日志
- WebSocket连接建立/断开
- 消息广播统计
- 错误信息记录
### QAUP系统日志
- WebSocket客户端连接状态
- 消息接收和处理
- 错误和异常处理
## 性能优势
1. **实时性提升**: 消息从数据中台到QAUP系统的延迟从250ms降低到几乎实时
2. **资源消耗降低**: 避免频繁的HTTP请求减少网络和CPU开销
3. **系统稳定性增强**: 避免HTTP超时和轮询堆积问题
4. **扩展性改善**: 支持多客户端订阅同一数据流
## 故障排除
### 1. 连接失败
检查以下配置:
- adxp-adapter服务是否正常运行
- 端口配置是否正确
- 网络连接是否正常
### 2. 消息接收异常
检查以下方面:
- 数据中台连接是否正常
- ADXP SDK是否正常工作
- 消息解析逻辑是否正确
### 3. 性能问题
监控以下指标:
- WebSocket连接数
- 消息处理速率
- 内存和CPU使用情况

View File

@ -40,6 +40,12 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Spring Boot Actuator (health check) -->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -0,0 +1,30 @@
package com.qaup.adxp.adapter.config;
import com.qaup.adxp.adapter.service.AdxpSdkService;
import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private AdxpSdkService adxpSdkService;
@Bean
public AdxpWebSocketHandler adxpWebSocketHandler() {
return new AdxpWebSocketHandler(adxpSdkService);
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册WebSocket处理器
registry.addHandler(adxpWebSocketHandler(), "/ws/flight-notifications")
.setAllowedOrigins("*");
}
}

View File

@ -2,6 +2,7 @@ package com.qaup.adxp.adapter.controller;
import com.qaup.adxp.adapter.dto.*;
import com.qaup.adxp.adapter.service.AdxpSdkService;
import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -20,6 +21,9 @@ public class AdxpController {
@Autowired
private AdxpSdkService adxpSdkService;
@Autowired(required = false)
private AdxpWebSocketHandler adxpWebSocketHandler;
/**
* 登录接口
@ -79,6 +83,9 @@ public class AdxpController {
Map<String, Object> health = new HashMap<String, Object>();
health.put("status", "UP");
health.put("activeSessions", adxpSdkService.getActiveSessionCount());
if (adxpWebSocketHandler != null) {
health.put("websocketConnections", adxpWebSocketHandler.getSessionCount());
}
return ResponseEntity.ok(health);
}
}

View File

@ -30,7 +30,11 @@ public class AdxpSdkService {
// Session 管理: sessionId -> SessionInfo
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
private static class SessionInfo {
public Map<String, SessionInfo> getSessions() {
return sessions;
}
public static class SessionInfo {
ADXPClient client;
String username;
String password;
@ -40,6 +44,18 @@ public class AdxpSdkService {
this.username = username;
this.password = password;
}
public ADXPClient getClient() {
return client;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
}
/**

View File

@ -0,0 +1,119 @@
package com.qaup.adxp.adapter.service;
import com.qaup.adxp.adapter.websocket.AdxpWebSocketHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Service
public class MessageListenerService {
private static final Logger log = LoggerFactory.getLogger(MessageListenerService.class);
@Autowired
private AdxpSdkService adxpSdkService;
@Autowired
private AdxpWebSocketHandler adxpWebSocketHandler;
private ExecutorService executorService;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
@PostConstruct
public void start() {
if (isRunning.compareAndSet(false, true)) {
executorService = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "ADXP-Message-Listener");
t.setDaemon(true);
return t;
});
executorService.submit(this::listenForMessages);
log.info("消息监听服务已启动");
}
}
@PreDestroy
public void stop() {
if (isRunning.compareAndSet(true, false)) {
if (executorService != null) {
executorService.shutdownNow();
}
log.info("消息监听服务已停止");
}
}
private void listenForMessages() {
log.info("开始监听数据中台消息");
// 获取所有活跃的会话ID
while (isRunning.get()) {
try {
// 获取当前会话数量
int sessionCount = adxpSdkService.getSessions().size();
if (sessionCount == 0) {
log.debug("当前没有活跃的会话,等待连接...");
Thread.sleep(1000);
continue;
}
// 遍历所有会话并接收消息
adxpSdkService.getSessions().forEach((sessionId, sessionInfo) -> {
try {
// 调用SDK接收消息这可能会阻塞直到有消息到达
java.util.List<com.qaup.adxp.adapter.dto.FlightMessage> messages =
adxpSdkService.receiveMessages(sessionId);
// 如果有消息广播给所有WebSocket客户端
if (messages != null && !messages.isEmpty()) {
adxpWebSocketHandler.broadcastMessages(messages);
log.info("接收到 {} 条航班消息并广播给 {} 个WebSocket客户端",
messages.size(), adxpWebSocketHandler.getSessionCount());
}
} catch (Exception e) {
log.error("处理会话消息失败: sessionId={}", sessionId, e);
}
});
// 短暂休眠避免过于频繁的轮询
Thread.sleep(100);
} catch (InterruptedException e) {
log.info("消息监听线程被中断");
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("消息监听过程中发生错误", e);
// 出错后短暂休眠再重试
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
log.info("消息监听服务已停止监听");
}
/**
* 获取服务统计信息
*/
public String getStats() {
return String.format("MessageListenerService Stats: " +
"isRunning=%s, " +
"sessions=%d, " +
"webSocketClients=%d",
isRunning.get(),
adxpSdkService.getSessions().size(),
adxpWebSocketHandler.getSessionCount());
}
}

View File

@ -0,0 +1,138 @@
package com.qaup.adxp.adapter.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qaup.adxp.adapter.dto.FlightMessage;
import com.qaup.adxp.adapter.service.AdxpSdkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
public class AdxpWebSocketHandler extends TextWebSocketHandler {
private static final Logger log = LoggerFactory.getLogger(AdxpWebSocketHandler.class);
private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
private final AdxpSdkService adxpSdkService;
private final ObjectMapper objectMapper;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
public AdxpWebSocketHandler(AdxpSdkService adxpSdkService) {
this.adxpSdkService = adxpSdkService;
this.objectMapper = new ObjectMapper();
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
log.info("WebSocket连接已建立: sessionId={}, 当前连接数={}", session.getId(), sessions.size());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 处理客户端发送的消息如果需要
log.debug("收到客户端消息: sessionId={}, message={}", session.getId(), message.getPayload());
// 可以处理一些控制命令比如心跳
if ("ping".equals(message.getPayload())) {
session.sendMessage(new TextMessage("pong"));
log.debug("发送心跳响应到客户端: sessionId={}", session.getId());
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
log.info("WebSocket连接已关闭: sessionId={}, reason={}, 当前连接数={}",
session.getId(), status.getReason(), sessions.size());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("WebSocket传输错误: sessionId={}", session.getId(), exception);
sessions.remove(session);
}
/**
* 向所有连接的客户端广播消息
*/
public void broadcastMessages(List<FlightMessage> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
try {
String jsonMessage = objectMapper.writeValueAsString(messages);
TextMessage textMessage = new TextMessage(jsonMessage);
int successCount = 0;
int failedCount = 0;
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(textMessage);
successCount++;
} catch (Exception e) {
log.error("发送消息失败: sessionId={}", session.getId(), e);
// 如果发送失败移除会话
sessions.remove(session);
failedCount++;
}
} else {
sessions.remove(session);
failedCount++;
}
}
log.debug("广播消息完成: 消息数量={}, 连接数={}, 成功发送={}, 失败={}",
messages.size(), sessions.size(), successCount, failedCount);
} catch (Exception e) {
log.error("序列化或广播消息失败", e);
}
}
/**
* 获取当前连接数
*/
public int getSessionCount() {
return sessions.size();
}
/**
* 向指定会话发送消息
*/
public void sendMessage(WebSocketSession session, List<FlightMessage> messages) {
if (messages == null || messages.isEmpty() || session == null || !session.isOpen()) {
return;
}
try {
String jsonMessage = objectMapper.writeValueAsString(messages);
TextMessage textMessage = new TextMessage(jsonMessage);
session.sendMessage(textMessage);
log.debug("向会话 {} 发送 {} 条消息", session.getId(), messages.size());
} catch (Exception e) {
log.error("向会话 {} 发送消息失败", session.getId(), e);
}
}
/**
* 获取连接详细信息
*/
public List<String> getConnectionDetails() {
return sessions.stream()
.map(session -> String.format("ID: %s, Remote Address: %s, Open: %s",
session.getId(),
session.getRemoteAddress(),
session.isOpen()))
.collect(java.util.stream.Collectors.toList());
}
}

View File

@ -2,32 +2,50 @@
# ADXP Adapter 启动脚本
echo "🚀 启动 ADXP SDK 适配器服务..."
echo ""
# 项目路径
PROJECT_DIR="/Users/tianjianyong/apps/Company/QAUP-Management/adxp-adapter"
JAR_FILE="$PROJECT_DIR/target/adxp-adapter.jar"
# 自动查找 Java 8
if [ -z "$JAVA_8_HOME" ]; then
JAVA_8_HOME=$(/usr/libexec/java_home -v 1.8 2>/dev/null)
fi
if [ -z "$JAVA_8_HOME" ]; then
echo "❌ 错误: 未找到 Java 8"
echo ""
echo "请先安装 Java 8:"
echo " brew install --cask temurin8"
echo ""
echo "或者手动设置 JAVA_8_HOME:"
echo " export JAVA_8_HOME=/path/to/jdk8"
# 检查项目目录是否存在
if [ ! -d "$PROJECT_DIR" ]; then
echo "❌ 项目目录不存在: $PROJECT_DIR"
exit 1
fi
echo "Java 版本: $($JAVA_8_HOME/bin/java -version 2>&1 | head -1)"
echo ""
echo "配置:"
echo " - 适配器端口: 8086"
echo " - ADXP 数据中台地址: ${ADXP_HOST:-localhost}:${ADXP_PORT:-8086}"
echo ""
# 进入项目目录
cd "$PROJECT_DIR"
$JAVA_8_HOME/bin/java -jar target/adxp-adapter.jar \
--adxp.host=${ADXP_HOST:-localhost} \
--adxp.port=${ADXP_PORT:-8086}
# 检查JAR文件是否存在如果不存在则编译
if [ ! -f "$JAR_FILE" ]; then
echo "🔨 正在编译adxp-adapter项目..."
mvn clean package
if [ $? -ne 0 ]; then
echo "❌ 编译失败"
exit 1
fi
fi
# 检查JAR文件是否存在
if [ ! -f "$JAR_FILE" ]; then
echo "❌ JAR文件不存在: $JAR_FILE"
exit 1
fi
echo "🚀 正在启动ADXP Adapter服务..."
echo "📄 JAR文件: $JAR_FILE"
# 启动服务
java -jar "$JAR_FILE" &
# 等待几秒钟让服务启动
sleep 5
# 检查服务是否启动成功
if pgrep -f "adxp-adapter" > /dev/null; then
echo "✅ ADXP Adapter服务启动成功"
echo "🌐 WebSocket端点: ws://localhost:8086/ws/flight-notifications"
echo "📊 健康检查: http://localhost:8086/actuator/health"
else
echo "❌ ADXP Adapter服务启动失败"
exit 1
fi

View File

@ -105,6 +105,9 @@ data:
password: ${ADXP_PASSWORD:dianxin@123}
# 重连延迟(毫秒)
reconnect-delay-millis: 3000
# WebSocket连接启用
websocket:
enabled: true
# 无人车厂商数据源配置 - 开发环境
vehicle-api:

View File

@ -33,9 +33,22 @@ public class FlightSdkProperties {
* 重连延迟毫秒
*/
private long reconnectDelayMillis = 3000L;
/**
* WebSocket配置
*/
private WebSocketConfig websocket = new WebSocketConfig();
public boolean isConfigurationReady() {
return host != null && port != null && port > 0
&& username != null && password != null;
}
@Data
public static class WebSocketConfig {
/**
* 是否启用WebSocket连接
*/
private boolean enabled = true;
}
}

View File

@ -0,0 +1,23 @@
package com.qaup.collision.datacollector.config;
import com.qaup.collision.datacollector.websocket.AdxpFlightServiceWebSocketClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
@Configuration
public class WebSocketConfig {
@Bean
public WebSocketClient webSocketClient() {
return new StandardWebSocketClient();
}
@Bean
public AdxpFlightServiceWebSocketClient adxpFlightServiceWebSocketClient(
WebSocketClient webSocketClient,
FlightSdkProperties flightSdkProperties) {
return new AdxpFlightServiceWebSocketClient(webSocketClient, flightSdkProperties);
}
}

View File

@ -49,6 +49,10 @@ public class AdxpFlightServiceHttpClient implements org.springframework.beans.fa
this.properties = properties;
this.restTemplate = new RestTemplate();
}
public FlightSdkProperties getProperties() {
return properties;
}
@Override
public void afterPropertiesSet() {

View File

@ -84,6 +84,9 @@ public class DataCollectorService {
@Autowired(required = false)
private AdxpFlightServiceHttpClient adxpFlightServiceClient;
@Autowired(required = false)
private com.qaup.collision.datacollector.websocket.AdxpFlightServiceWebSocketClient adxpFlightServiceWebSocketClient;
private final GeometryFactory geometryFactory = new GeometryFactory(new PrecisionModel(), 4326); // SRID 4326 for WGS84
@ -95,10 +98,72 @@ public class DataCollectorService {
private final Map<String, FlightNotification> flightNotificationCache = new ConcurrentHashMap<>();
// 初始化时将缓存引用传递给数据处理服务
/**
* 初始化时将缓存引用传递给数据处理服务
*/
@PostConstruct
public void init() {
dataProcessingService.setActiveMovingObjectsCache(activeMovingObjectsCache);
log.info("DataCollectorService 初始化完成缓存引用已传递给DataProcessingService");
// 初始化WebSocket客户端监听器
initWebSocketClient();
}
/**
* 初始化WebSocket客户端监听器
*/
private void initWebSocketClient() {
if (adxpFlightServiceWebSocketClient != null &&
adxpFlightServiceClient != null &&
adxpFlightServiceClient.getProperties() != null &&
adxpFlightServiceClient.getProperties().getWebsocket().isEnabled()) {
try {
adxpFlightServiceWebSocketClient.addMessageListener(this::handleFlightNotifications);
// WebSocket客户端会在PostConstruct时自动连接
log.info("已注册WebSocket航班通知客户端");
} catch (Exception e) {
log.error("初始化WebSocket客户端失败", e);
}
} else {
log.info("WebSocket客户端未启用或未配置");
}
}
/**
* 处理通过WebSocket接收到的航班通知
*/
private void handleFlightNotifications(List<FlightNotificationDTO> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
log.info("通过WebSocket接收到 {} 条航班进出港通知", notifications.size());
// 将DTO转换为业务对象并处理
for (FlightNotificationDTO dto : notifications) {
try {
FlightNotification notification = convertToFlightNotification(dto);
if (notification != null && notification.isValid()) {
log.info("🛬 处理航班进出港通知: 航班号={}, 类型={}, 跑道={}, 机位={}, 事件时间={}",
notification.getFlightNo(),
notification.getType(),
notification.getRunway(),
notification.getSeat(),
notification.getEventDateTime());
// 缓存航班通知供DataProcessingService处理
cacheFlightNotification(notification);
} else {
log.warn("⚠️ 航班进出港通知数据无效,跳过处理: {}", dto);
}
} catch (Exception e) {
log.error("❌ 处理航班进出港通知异常: flightNo={}", dto.getFlightNo(), e);
}
}
log.info("✅ WebSocket航班进出港通知数据处理完成处理数量: {}", notifications.size());
}
@ -613,6 +678,14 @@ public class DataCollectorService {
}
try {
// 优先使用WebSocket客户端
if (adxpFlightServiceWebSocketClient != null && adxpFlightServiceWebSocketClient.isConnected()) {
// WebSocket客户端已连接消息通过WebSocket实时接收
log.debug("使用WebSocket客户端接收航班通知");
return;
}
// 如果WebSocket客户端不可用回退到HTTP客户端
if (adxpFlightServiceClient == null || !adxpFlightServiceClient.isEnabled()) {
log.warn("数据中台航班 SDK 未启用,跳过航班通知采集");
return;

View File

@ -0,0 +1,492 @@
package com.qaup.collision.datacollector.websocket;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qaup.collision.datacollector.config.FlightSdkProperties;
import com.qaup.collision.datacollector.dto.FlightNotificationDTO;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketClient;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
private final WebSocketClient webSocketClient;
private final FlightSdkProperties properties;
private final ObjectMapper objectMapper;
private final List<Consumer<List<FlightNotificationDTO>>> messageListeners;
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final AtomicLong messageCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private WebSocketSession session;
private String sessionId;
private Thread reconnectThread;
public AdxpFlightServiceWebSocketClient(WebSocketClient webSocketClient,
FlightSdkProperties properties) {
this.webSocketClient = webSocketClient;
this.properties = properties;
this.objectMapper = new ObjectMapper();
this.messageListeners = new CopyOnWriteArrayList<>();
}
/**
* 启动WebSocket客户端
*/
@PostConstruct
public void start() {
if (isRunning.compareAndSet(false, true)) {
log.info("启动ADXP航班通知WebSocket客户端");
connect();
}
}
/**
* 停止WebSocket客户端
*/
@PreDestroy
public void stop() {
if (isRunning.compareAndSet(true, false)) {
log.info("停止ADXP航班通知WebSocket客户端");
disconnect();
}
}
/**
* 连接到ADXP适配器的WebSocket服务
*/
public void connect() {
if (!properties.isConfigurationReady()) {
log.warn("数据中台航班 SDK 配置不完整WebSocket客户端将无法正常工作");
return;
}
try {
// 首先通过HTTP登录获取sessionId
sessionId = loginAndGetSessionId();
if (sessionId == null) {
log.error("无法获取sessionIdWebSocket连接失败");
scheduleReconnect();
return;
}
// 构建WebSocket URL
String wsUrl = String.format("ws://%s:%d/ws/flight-notifications",
properties.getHost(), properties.getPort());
log.info("正在连接到ADXP适配器WebSocket服务: url={}", wsUrl);
// 连接WebSocket
session = webSocketClient.doHandshake(this, URI.create(wsUrl).toString()).get();
isConnected.set(true);
log.info("✅ 已连接到ADXP适配器WebSocket服务");
} catch (Exception e) {
log.error("❌ 连接ADXP适配器WebSocket服务失败", e);
errorCount.incrementAndGet();
isConnected.set(false);
scheduleReconnect();
}
}
/**
* 通过HTTP登录获取sessionId
*/
private String loginAndGetSessionId() {
try {
String baseUrl = String.format("http://%s:%d/api/adxp",
properties.getHost(), properties.getPort());
// 创建登录请求
java.util.Map<String, Object> loginRequest = new java.util.HashMap<>();
loginRequest.put("username", properties.getUsername());
loginRequest.put("password", properties.getPassword());
// 发送HTTP POST请求
java.net.http.HttpClient httpClient = java.net.http.HttpClient.newHttpClient();
String loginUrl = baseUrl + "/login";
String requestBody = objectMapper.writeValueAsString(loginRequest);
java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder()
.uri(URI.create(loginUrl))
.header("Content-Type", "application/json")
.POST(java.net.http.HttpRequest.BodyPublishers.ofString(requestBody))
.build();
java.net.http.HttpResponse<String> response = httpClient.send(request,
java.net.http.HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
java.util.Map<String, Object> responseBody = objectMapper.readValue(
response.body(), new TypeReference<java.util.Map<String, Object>>() {});
if (Boolean.TRUE.equals(responseBody.get("success"))) {
String sessionId = (String) responseBody.get("sessionId");
log.info("✅ 登录ADXP适配器成功: sessionId={}", sessionId);
return sessionId;
} else {
String message = (String) responseBody.get("message");
log.warn("❌ 登录ADXP适配器失败: {}", message);
}
} else {
log.warn("❌ 登录ADXP适配器HTTP请求失败: status={}", response.statusCode());
}
} catch (Exception e) {
log.error("❌ 登录ADXP适配器时发生异常", e);
errorCount.incrementAndGet();
}
return null;
}
/**
* 断开WebSocket连接
*/
public void disconnect() {
try {
if (session != null && session.isOpen()) {
session.close();
log.info("WebSocket连接已关闭");
}
// 尝试登出
logout();
} catch (Exception e) {
log.error("断开WebSocket连接时发生异常", e);
errorCount.incrementAndGet();
} finally {
isConnected.set(false);
session = null;
}
}
/**
* 登出
*/
private void logout() {
if (sessionId == null) {
return;
}
try {
String baseUrl = String.format("http://%s:%d/api/adxp",
properties.getHost(), properties.getPort());
// 创建登出请求
java.util.Map<String, String> logoutRequest = new java.util.HashMap<>();
logoutRequest.put("sessionId", sessionId);
// 发送HTTP POST请求
java.net.http.HttpClient httpClient = java.net.http.HttpClient.newHttpClient();
String logoutUrl = baseUrl + "/logout";
String requestBody = objectMapper.writeValueAsString(logoutRequest);
java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder()
.uri(URI.create(logoutUrl))
.header("Content-Type", "application/json")
.POST(java.net.http.HttpRequest.BodyPublishers.ofString(requestBody))
.build();
httpClient.send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
log.info("✅ 已登出ADXP适配器服务");
} catch (Exception e) {
log.warn("登出ADXP适配器服务失败", e);
errorCount.incrementAndGet();
}
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("🟢 WebSocket连接已建立: sessionId={}", session.getId());
this.session = session;
isConnected.set(true);
messageCount.set(0); // 重置消息计数
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) {
handleTextMessage(session, (TextMessage) message);
}
}
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
try {
// 解析接收到的消息
String payload = message.getPayload();
log.debug("📥 收到WebSocket消息 ({} bytes)", payload.length());
// 增加消息计数
messageCount.incrementAndGet();
// 将FlightMessage转换为FlightNotificationDTO
List<FlightNotificationDTO> notifications = parseMessages(payload);
if (!notifications.isEmpty()) {
log.info("🛬 接收到 {} 条航班通知", notifications.size());
// 通知所有监听器
for (Consumer<List<FlightNotificationDTO>> listener : messageListeners) {
try {
listener.accept(notifications);
} catch (Exception e) {
log.error("处理消息监听器时发生异常", e);
errorCount.incrementAndGet();
}
}
} else {
log.debug("📭 收到空消息或无法解析的消息");
}
} catch (Exception e) {
log.error("❌ 处理WebSocket消息时发生异常", e);
errorCount.incrementAndGet();
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("❌ WebSocket传输错误", exception);
errorCount.incrementAndGet();
isConnected.set(false);
this.session = null;
// 尝试重连
if (isRunning.get()) {
scheduleReconnect();
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.info("🟡 WebSocket连接已关闭: reason={}, code={}", closeStatus.getReason(), closeStatus.getCode());
isConnected.set(false);
this.session = null;
// 尝试重连除非是正常关闭
if (isRunning.get() && closeStatus.getCode() != 1000) {
scheduleReconnect();
}
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 解析消息
*/
private List<FlightNotificationDTO> parseMessages(String jsonContent) {
try {
// 解析FlightMessage列表
List<FlightMessage> flightMessages =
objectMapper.readValue(jsonContent, new TypeReference<List<FlightMessage>>() {});
List<FlightNotificationDTO> notifications = new java.util.ArrayList<>();
for (FlightMessage message : flightMessages) {
try {
FlightNotificationDTO dto = parseXmlMessage(message.getServiceCode(), message.getContent());
if (dto != null) {
notifications.add(dto);
}
} catch (Exception e) {
log.warn("解析消息失败: serviceCode={}", message.getServiceCode(), e);
errorCount.incrementAndGet();
}
}
return notifications;
} catch (Exception e) {
log.error("解析消息失败", e);
errorCount.incrementAndGet();
return new java.util.ArrayList<>();
}
}
private FlightNotificationDTO parseXmlMessage(String serviceCode, String xmlContent) {
try {
javax.xml.parsers.DocumentBuilderFactory factory = javax.xml.parsers.DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(false);
javax.xml.parsers.DocumentBuilder builder = factory.newDocumentBuilder();
org.w3c.dom.Document doc = builder.parse(new org.xml.sax.InputSource(new java.io.StringReader(xmlContent)));
org.w3c.dom.Element root = doc.getDocumentElement();
switch (serviceCode) {
case "ADXP_NAOMS_O_DYN_ARR":
return parseArrival(root);
case "ADXP_NAOMS_O_CDM_AXOT":
return parsePushback(root);
case "ADXP_NAOMS_O_CDM_RUNWAY":
return parseRunway(root);
case "ADXP_NAOMS_O_DYN_CRAFTSEAT":
return parseGate(root);
default:
log.debug("未知的服务代码: {}", serviceCode);
return null;
}
} catch (Exception e) {
log.warn("解析 XML 失败: serviceCode={}", serviceCode, e);
errorCount.incrementAndGet();
return null;
}
}
private FlightNotificationDTO parseArrival(org.w3c.dom.Element root) {
String flightNo = getTextContent(root, "FlightNumber");
String estimatedArrival = getTextContent(root, "EstimatedArrival");
FlightNotificationDTO dto = new FlightNotificationDTO();
dto.setFlightNo(flightNo);
dto.setType("IN");
if (estimatedArrival != null && !estimatedArrival.isEmpty()) {
try {
// 解析日期时间
java.time.format.DateTimeFormatter formatter = java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
java.time.LocalDateTime arrivalTime = java.time.LocalDateTime.parse(estimatedArrival, formatter);
dto.setTime(arrivalTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli());
} catch (Exception e) {
log.warn("无法解析到达时间: {}", estimatedArrival);
errorCount.incrementAndGet();
}
}
return dto;
}
private FlightNotificationDTO parsePushback(org.w3c.dom.Element root) {
String flightNo = getTextContent(root, "FlightNumber");
String actualPushback = getTextContent(root, "ActualPushback");
FlightNotificationDTO dto = new FlightNotificationDTO();
dto.setFlightNo(flightNo);
dto.setType("OUT");
if (actualPushback != null && !actualPushback.isEmpty()) {
try {
// 解析日期时间
java.time.format.DateTimeFormatter formatter = java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
java.time.LocalDateTime pushbackTime = java.time.LocalDateTime.parse(actualPushback, formatter);
dto.setTime(pushbackTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli());
} catch (Exception e) {
log.warn("无法解析推出时间: {}", actualPushback);
errorCount.incrementAndGet();
}
}
return dto;
}
private FlightNotificationDTO parseRunway(org.w3c.dom.Element root) {
String flightNo = getTextContent(root, "FlightNumber");
String runway = getTextContent(root, "Runway");
FlightNotificationDTO dto = new FlightNotificationDTO();
dto.setFlightNo(flightNo);
dto.setRunway(runway);
return dto;
}
private FlightNotificationDTO parseGate(org.w3c.dom.Element root) {
String flightNo = getTextContent(root, "FlightNumber");
String seat = getTextContent(root, "Gate");
FlightNotificationDTO dto = new FlightNotificationDTO();
dto.setFlightNo(flightNo);
dto.setSeat(seat);
return dto;
}
private String getTextContent(org.w3c.dom.Element parent, String tagName) {
org.w3c.dom.NodeList nodeList = parent.getElementsByTagName(tagName);
if (nodeList.getLength() > 0) {
return nodeList.item(0).getTextContent();
}
return null;
}
/**
* 添加消息监听器
*/
public void addMessageListener(Consumer<List<FlightNotificationDTO>> listener) {
messageListeners.add(listener);
log.info("添加消息监听器,当前监听器数量: {}", messageListeners.size());
}
/**
* 移除消息监听器
*/
public void removeMessageListener(Consumer<List<FlightNotificationDTO>> listener) {
messageListeners.remove(listener);
log.info("移除消息监听器,当前监听器数量: {}", messageListeners.size());
}
/**
* 检查是否已连接
*/
public boolean isConnected() {
return isConnected.get();
}
/**
* 安排重新连接
*/
private void scheduleReconnect() {
if (!isRunning.get()) {
return;
}
log.info("安排{}毫秒后重新连接WebSocket服务", properties.getReconnectDelayMillis());
if (reconnectThread != null && reconnectThread.isAlive()) {
reconnectThread.interrupt();
}
reconnectThread = new Thread(() -> {
try {
Thread.sleep(properties.getReconnectDelayMillis());
if (isRunning.get()) {
connect();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("重连被中断");
}
}, "WebSocket-Reconnect-Thread");
reconnectThread.setDaemon(true);
reconnectThread.start();
}
/**
* 获取客户端统计信息
*/
public String getStats() {
return String.format("WebSocketClient Stats: " +
"connected=%s, " +
"messages=%d, " +
"errors=%d, " +
"running=%s",
isConnected.get(),
messageCount.get(),
errorCount.get(),
isRunning.get());
}
}

View File

@ -0,0 +1,29 @@
package com.qaup.collision.datacollector.websocket;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 用于WebSocket传输的航班消息格式
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FlightMessage {
/**
* 服务代码
*/
private String serviceCode;
/**
* 操作代码
*/
private String actionCode;
/**
* 消息内容XML格式
*/
private String content;
}

View File

@ -0,0 +1,146 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>ADXP WebSocket测试工具</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
}
.control-panel {
background: #f5f5f5;
padding: 15px;
border-radius: 5px;
margin-bottom: 20px;
}
.log-container {
height: 400px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
background: #fff;
font-family: monospace;
font-size: 12px;
}
.log-entry {
margin: 2px 0;
padding: 2px;
}
.log-info { color: #0066cc; }
.log-success { color: #009900; }
.log-error { color: #cc0000; }
.log-warning { color: #ff9900; }
input, button {
padding: 5px;
margin: 5px;
}
</style>
</head>
<body>
<div class="container">
<h1>ADXP WebSocket测试工具</h1>
<div class="control-panel">
<label>WebSocket URL:</label>
<input type="text" id="wsUrl" value="ws://localhost:8086/ws/flight-notifications" size="50">
<button onclick="connect()">连接</button>
<button onclick="disconnect()">断开</button>
<button onclick="clearLog()">清空日志</button>
</div>
<div class="control-panel">
<label>发送消息:</label>
<input type="text" id="messageInput" placeholder="输入要发送的消息">
<button onclick="sendMessage()">发送</button>
</div>
<div>
<h3>日志:</h3>
<div class="log-container" id="logContainer"></div>
</div>
</div>
<script>
let ws = null;
function log(message, type = 'info') {
const container = document.getElementById('logContainer');
const entry = document.createElement('div');
entry.className = `log-entry log-${type}`;
entry.textContent = `[${new Date().toLocaleTimeString()}] ${message}`;
container.appendChild(entry);
container.scrollTop = container.scrollHeight;
}
function connect() {
const url = document.getElementById('wsUrl').value;
if (ws && ws.readyState === WebSocket.OPEN) {
log('WebSocket已经连接', 'warning');
return;
}
try {
ws = new WebSocket(url);
ws.onopen = function(event) {
log(`WebSocket连接成功: ${url}`, 'success');
};
ws.onmessage = function(event) {
log(`收到消息: ${event.data}`, 'info');
try {
const messages = JSON.parse(event.data);
log(`解析到 ${messages.length} 条航班消息`, 'success');
} catch (e) {
log(`非JSON消息: ${event.data}`, 'info');
}
};
ws.onerror = function(error) {
log(`连接错误: ${error.message}`, 'error');
};
ws.onclose = function(event) {
log(`连接关闭: code=${event.code}, reason=${event.reason}`, 'warning');
};
} catch (error) {
log(`连接失败: ${error.message}`, 'error');
}
}
function disconnect() {
if (ws) {
ws.close();
log('主动断开连接', 'info');
}
}
function sendMessage() {
const message = document.getElementById('messageInput').value;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
log(`发送消息: ${message}`, 'info');
} else {
log('WebSocket未连接', 'error');
}
}
function clearLog() {
document.getElementById('logContainer').innerHTML = '';
}
// 页面加载完成
window.onload = function() {
log('ADXP WebSocket测试工具就绪', 'info');
};
</script>
</body>
</html>

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket客户端测试脚本
用于测试与adxp-adapter的WebSocket连接
"""
import websocket
import json
import threading
import time
def on_message(ws, message):
"""处理接收到的消息"""
print(f"📥 收到消息: {message}")
try:
# 尝试解析JSON消息
data = json.loads(message)
if isinstance(data, list):
print(f" 解析到 {len(data)} 条航班消息")
else:
print(f" 消息内容: {data}")
except json.JSONDecodeError:
print(f" 非JSON消息: {message}")
def on_error(ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
def on_close(ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
def on_open(ws):
"""处理连接打开"""
print("✅ WebSocket连接已建立")
# 启动心跳线程
def run(*args):
while True:
time.sleep(30) # 每30秒发送一次心跳
if ws.sock.connected:
ws.send("ping")
print("💓 发送心跳")
else:
break
threading.Thread(target=run, daemon=True).start()
def main():
"""主函数"""
# WebSocket URL
ws_url = "ws://localhost:8086/ws/flight-notifications"
print(f"🚀 正在连接到ADXP适配器WebSocket服务: {ws_url}")
# 启用调试日志
# websocket.enableTrace(True)
# 创建WebSocket连接
ws = websocket.WebSocketApp(ws_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
# 启动连接(阻塞)
ws.run_forever()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket集成测试脚本
用于测试整个ADXP WebSocket消息传输流程
"""
import websocket
import json
import time
import threading
class AdxpWebSocketTestClient:
def __init__(self, url):
self.url = url
self.ws = None
self.connected = False
self.messages_received = 0
self.start_time = None
def on_message(self, ws, message):
"""处理接收到的消息"""
self.messages_received += 1
print(f"📥 [{self.messages_received}] 收到消息: {message[:100]}...")
try:
# 尝试解析JSON消息
data = json.loads(message)
if isinstance(data, list):
print(f" 解析到 {len(data)} 条航班消息")
for i, msg in enumerate(data):
print(f" [{i+1}] 服务代码: {msg.get('serviceCode', 'N/A')}")
else:
print(f" 消息内容: {data}")
except json.JSONDecodeError:
print(f" 非JSON消息: {message}")
# 显示性能统计
if self.start_time:
elapsed = time.time() - self.start_time
if elapsed > 0:
rate = self.messages_received / elapsed
print(f" 📊 接收速率: {rate:.2f} 条消息/秒")
def on_error(self, ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
self.connected = False
def on_open(self, ws):
"""处理连接打开"""
self.connected = True
self.start_time = time.time()
print("✅ WebSocket连接已建立")
print(f"🔗 连接地址: {self.url}")
# 启动心跳线程
def heartbeat():
while self.connected:
time.sleep(30) # 每30秒发送一次心跳
if self.connected and self.ws and self.ws.sock.connected:
self.ws.send("ping")
print("💓 发送心跳")
else:
break
threading.Thread(target=heartbeat, daemon=True).start()
def connect(self):
"""连接到WebSocket服务器"""
print(f"🚀 正在连接到ADXP适配器WebSocket服务: {self.url}")
# 创建WebSocket连接
self.ws = websocket.WebSocketApp(self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# 启动连接(阻塞)
self.ws.run_forever()
def disconnect(self):
"""断开连接"""
if self.ws:
self.ws.close()
print("🚫 主动断开连接")
self.connected = False
def get_stats(self):
"""获取统计信息"""
if self.start_time:
elapsed = time.time() - self.start_time
return {
'messages_received': self.messages_received,
'elapsed_time': elapsed,
'rate': self.messages_received / elapsed if elapsed > 0 else 0
}
return None
def main():
"""主函数"""
# WebSocket URL
ws_url = "ws://localhost:8086/ws/flight-notifications"
# 创建测试客户端
client = AdxpWebSocketTestClient(ws_url)
try:
# 连接并开始监听
client.connect()
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
client.disconnect()
# 显示最终统计
stats = client.get_stats()
if stats:
print(f"\n📊 最终统计:")
print(f" 接收消息总数: {stats['messages_received']}")
print(f" 运行时间: {stats['elapsed_time']:.2f}")
print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒")
except Exception as e:
print(f"❌ 测试过程中发生错误: {e}")
client.disconnect()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,190 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADXP WebSocket系统集成测试脚本
用于测试整个ADXP WebSocket消息传输流程
"""
import websocket
import json
import time
import threading
import requests
class AdxpSystemTest:
def __init__(self):
self.adxp_adapter_url = "http://localhost:8086"
self.qaup_system_url = "http://localhost:8080"
self.ws_url = "ws://localhost:8086/ws/flight-notifications"
self.ws = None
self.connected = False
self.messages_received = 0
self.start_time = None
def test_health_endpoints(self):
"""测试健康检查端点"""
print("🔍 测试健康检查端点...")
try:
# 测试adxp-adapter健康检查
response = requests.get(f"{self.adxp_adapter_url}/actuator/health")
if response.status_code == 200:
print("✅ adxp-adapter健康检查: OK")
print(f" 状态: {response.json()}")
else:
print(f"❌ adxp-adapter健康检查失败: {response.status_code}")
except Exception as e:
print(f"❌ adxp-adapter健康检查异常: {e}")
try:
# 测试QAUP系统健康检查
response = requests.get(f"{self.qaup_system_url}/actuator/health")
if response.status_code == 200:
print("✅ QAUP系统健康检查: OK")
print(f" 状态: {response.json()}")
else:
print(f"❌ QAUP系统健康检查失败: {response.status_code}")
except Exception as e:
print(f"❌ QAUP系统健康检查异常: {e}")
def on_message(self, ws, message):
"""处理接收到的消息"""
self.messages_received += 1
print(f"📥 [{self.messages_received}] 收到WebSocket消息")
try:
# 尝试解析JSON消息
data = json.loads(message)
if isinstance(data, list):
print(f" 解析到 {len(data)} 条航班消息")
for i, msg in enumerate(data):
service_code = msg.get('serviceCode', 'N/A')
print(f" [{i+1}] 服务代码: {service_code}")
if service_code in ['ADXP_NAOMS_O_DYN_ARR', 'ADXP_NAOMS_O_CDM_AXOT']:
print(f" 航班号: {self.extract_flight_number(msg.get('content', ''))}")
else:
print(f" 消息内容: {data}")
except json.JSONDecodeError:
print(f" 非JSON消息: {message}")
# 显示性能统计
if self.start_time:
elapsed = time.time() - self.start_time
if elapsed > 0:
rate = self.messages_received / elapsed
print(f" 📊 接收速率: {rate:.2f} 条消息/秒")
def extract_flight_number(self, xml_content):
"""从XML内容中提取航班号"""
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_content)
flight_number = root.find('.//FlightNumber')
return flight_number.text if flight_number is not None else 'N/A'
except Exception:
return 'N/A'
def on_error(self, ws, error):
"""处理错误"""
print(f"❌ WebSocket错误: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""处理连接关闭"""
print(f"🔒 WebSocket连接已关闭: 状态码={close_status_code}, 消息={close_msg}")
self.connected = False
def on_open(self, ws):
"""处理连接打开"""
self.connected = True
self.start_time = time.time()
print("✅ WebSocket连接已建立")
print(f"🔗 连接地址: {self.ws_url}")
# 启动心跳线程
def heartbeat():
while self.connected:
time.sleep(30) # 每30秒发送一次心跳
if self.connected and self.ws and self.ws.sock.connected:
self.ws.send("ping")
print("💓 发送心跳")
else:
break
threading.Thread(target=heartbeat, daemon=True).start()
def test_websocket_connection(self):
"""测试WebSocket连接"""
print("\n🚀 测试WebSocket连接...")
try:
# 创建WebSocket连接
self.ws = websocket.WebSocketApp(self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# 在单独线程中运行WebSocket
ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
ws_thread.start()
# 等待连接建立
time.sleep(2)
if self.connected:
print("✅ WebSocket连接测试成功")
# 保持连接10秒以接收消息
print("⏳ 保持连接10秒以接收消息...")
time.sleep(10)
self.ws.close()
else:
print("❌ WebSocket连接测试失败")
except Exception as e:
print(f"❌ WebSocket连接测试异常: {e}")
def get_final_stats(self):
"""获取最终统计信息"""
if self.start_time:
elapsed = time.time() - self.start_time
return {
'messages_received': self.messages_received,
'elapsed_time': elapsed,
'rate': self.messages_received / elapsed if elapsed > 0 else 0
}
return None
def run_all_tests(self):
"""运行所有测试"""
print("🧪 开始ADXP WebSocket系统集成测试")
print("=" * 50)
# 测试健康检查端点
self.test_health_endpoints()
# 测试WebSocket连接
self.test_websocket_connection()
# 显示最终统计
stats = self.get_final_stats()
if stats:
print(f"\n📊 最终统计:")
print(f" 接收消息总数: {stats['messages_received']}")
print(f" 运行时间: {stats['elapsed_time']:.2f}")
print(f" 平均接收速率: {stats['rate']:.2f} 条消息/秒")
print("\n✅ 系统集成测试完成")
def main():
"""主函数"""
tester = AdxpSystemTest()
try:
tester.run_all_tests()
except KeyboardInterrupt:
print("\n⚠️ 用户中断")
except Exception as e:
print(f"❌ 测试过程中发生错误: {e}")
if __name__ == "__main__":
main()