From c66b29067768da27f73ff222e8a762e8cb7156ec Mon Sep 17 00:00:00 2001
From: shan <1653261938@qq.com>
Date: Tue, 27 Jan 2026 17:01:22 +0800
Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E7=BA=A2=E7=BB=BF=E7=81=AF?=
=?UTF-8?q?=E6=95=B0=E6=8D=AE=E9=87=87=E9=9B=86=E6=96=B9=E5=BC=8F=E4=BB=8E?=
=?UTF-8?q?TCP=E6=94=B9=E4=B8=BAMQTT=E5=B9=B6=E4=BC=98=E5=8C=96WebSocket?=
=?UTF-8?q?=E8=BD=A6=E8=BE=86=E6=8E=A7=E5=88=B6=E6=8C=87=E4=BB=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 禁用TCP红绿灯服务器,改用MQTT订阅获取信号状态
- 新增MqttConfig配置类和TrafficLightMqttSubscriber订阅器
- 重构VehicleCommandInfoWebSocketHandler,根据红绿灯信号实时发送车辆控制指令
- 添加Eclipse Paho MQTT客户端依赖
- 新增接口文档和MQTT核心代码总结文档
- 添加WebSocket测试页面
---
VehicleCommandInfo接口文档.md | 54 +++++
mqtt项目核心代码总结.md | 128 ++++++++++
.../src/main/resources/application-prod.yml | 2 +-
qaup-admin/src/main/resources/application.yml | 20 +-
qaup-collision/pom.xml | 7 +
.../com/qaup/collision/config/MqttConfig.java | 49 ++++
.../server/TrafficLightMqttSubscriber.java | 220 ++++++++++++++++++
.../VehicleCommandInfoWebSocketHandler.java | 98 +++++---
test_websocket.html | 163 +++++++++++++
9 files changed, 701 insertions(+), 40 deletions(-)
create mode 100644 VehicleCommandInfo接口文档.md
create mode 100644 mqtt项目核心代码总结.md
create mode 100644 qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java
create mode 100644 qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java
create mode 100644 test_websocket.html
diff --git a/VehicleCommandInfo接口文档.md b/VehicleCommandInfo接口文档.md
new file mode 100644
index 0000000..1b97df9
--- /dev/null
+++ b/VehicleCommandInfo接口文档.md
@@ -0,0 +1,54 @@
+# 车辆控制指令 WebSocket 接口文档
+
+## 接口概述
+
+- **接口名称**: VehicleCommandInfo
+- **协议**: WebSocket
+- **连接地址**: `ws://ip:8080/VehicleCommandInfo`
+- **说明**: 客户端连接后,服务端会根据红绿灯信号状态自动向所有连接的客户端推送车辆控制指令
+
+## 消息格式
+
+### 服务端 → 客户端(推送车辆控制指令)
+
+**JSON 格式**:
+
+```json
+{
+ "messageUniqueId": "550e8400-e29b-41d4-a716-446655440000",
+ "timestamp": 1737979200000,
+ "vehicleID": "AET01",
+ "commandType": "SIGNAL",
+ "commandReason": "TRAFFIC_LIGHT",
+ "signalState": "GREEN",
+ "intersectionId": "002",
+ "latitude": 343.23,
+ "longitude": 343.23,
+ "relativeSpeed": 3,
+ "relativeMotionX": 2002.12,
+ "relativeMotionY": 100.12,
+ "minDistance": 10.5
+}
+```
+
+### 字段说明
+
+| 字段名 | 类型 | 说明 | 示例值 |
+|--------|------|------|--------|
+| `messageUniqueId` | String | 消息唯一标识符 | "550e8400-e29b-41d4-a716-446655440000" |
+| `timestamp` | Long | 时间戳(毫秒) | 1737979200000 |
+| `vehicleID` | String | 车辆ID | "AET01" |
+| `commandType` | String | 指令类型 | "SIGNAL" |
+| `commandReason` | String | 指令原因 | "TRAFFIC_LIGHT" |
+| `signalState` | String | 信号状态 | "GREEN" 或 "RED" |
+| `intersectionId` | String | 交叉口ID | "002" |
+| `latitude` | Double | 纬度 | 343.23 |
+| `longitude` | Double | 经度 | 343.23 |
+| `relativeSpeed` | Integer | 相对速度 | 3 |
+| `relativeMotionX` | Double | 相对运动X方向 | 2002.12 |
+| `relativeMotionY` | Double | 相对运动Y方向 | 100.12 |
+| `minDistance` | Double | 最小距离 | 10.5 |
+
+
+
+
diff --git a/mqtt项目核心代码总结.md b/mqtt项目核心代码总结.md
new file mode 100644
index 0000000..01bd98a
--- /dev/null
+++ b/mqtt项目核心代码总结.md
@@ -0,0 +1,128 @@
+# MQTT 订阅者项目 - 核心代码总结
+
+## 项目概述
+Spring Boot MQTT 订阅者最小可行性验证项目,用于连接 MQTT Broker 并订阅指定 Topic 接收消息数据。
+
+## 技术栈
+- Eclipse Paho MQTT 1.2.5
+- dotenv-java 3.2.0
+
+
+## 2. application.yaml
+
+```yaml
+mqtt:
+ broker: ${MQTT_BROKER:tcp://10.0.0.202:8082}
+ client-id: ${MQTT_CLIENT_ID:springboot-mqtt-client}
+ username: ${MQTT_USERNAME:mqtt}
+ password: ${MQTT_PASSWORD:XAsOsLxaPs1}
+ topic: ${MQTT_TOPIC:test/topic}
+ timeout: ${MQTT_TIMEOUT:30}
+ keepalive: ${MQTT_KEEPALIVE:60}
+```
+
+## 4. MqttConfig.java
+
+```java
+package com.example.demo.config;
+
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MqttConfig {
+
+ @Value("${mqtt.username:}")
+ private String username;
+
+ @Value("${mqtt.password:}")
+ private String password;
+
+ @Value("${mqtt.timeout:30}")
+ private int timeout;
+
+ @Value("${mqtt.keepalive:60}")
+ private int keepalive;
+
+ @Bean
+ public MqttConnectOptions mqttConnectOptions() {
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setConnectionTimeout(timeout);
+ options.setKeepAliveInterval(keepalive);
+
+ if (username != null && !username.isEmpty()) {
+ options.setUserName(username);
+ options.setPassword(password.toCharArray());
+ }
+
+ return options;
+ }
+}
+```
+
+---
+
+## 5. MqttSubscriber.java
+
+```java
+package com.example.demo.subscriber;
+
+import jakarta.annotation.PostConstruct;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+@Component
+public class MqttSubscriber {
+
+ @Value("${mqtt.broker}")
+ private String broker;
+
+ @Value("${mqtt.client-id}")
+ private String clientId;
+
+ @Value("${mqtt.topic}")
+ private String topic;
+
+ private final MqttConnectOptions options;
+
+ public MqttSubscriber(MqttConnectOptions options) {
+ this.options = options;
+ }
+
+ @PostConstruct
+ public void init() throws MqttException {
+ MqttClient client = new MqttClient(broker, clientId);
+
+ client.setCallback(new MqttCallback() {
+ @Override
+ public void connectionLost(Throwable cause) {
+ System.out.println("❌ MQTT 连接断开:" + cause.getMessage());
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+ System.out.println("📥 收到消息");
+ System.out.println("Topic: " + topic);
+ System.out.println("Payload: " + payload);
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ }
+ });
+
+ client.connect(options);
+ client.subscribe(topic, 0);
+
+ System.out.println("✅ MQTT 已连接,订阅 Topic:" + topic);
+ }
+}
+```
+
diff --git a/qaup-admin/src/main/resources/application-prod.yml b/qaup-admin/src/main/resources/application-prod.yml
index 75bb76d..2dc5140 100644
--- a/qaup-admin/src/main/resources/application-prod.yml
+++ b/qaup-admin/src/main/resources/application-prod.yml
@@ -109,5 +109,5 @@ data:
traffic:
light:
tcp:
- enabled: ${TRAFFIC_LIGHT_TCP_ENABLED:true}
+ enabled: ${TRAFFIC_LIGHT_TCP_ENABLED:false}
port: ${TRAFFIC_LIGHT_TCP_PORT:8082}
diff --git a/qaup-admin/src/main/resources/application.yml b/qaup-admin/src/main/resources/application.yml
index 38da7b7..b3e3456 100644
--- a/qaup-admin/src/main/resources/application.yml
+++ b/qaup-admin/src/main/resources/application.yml
@@ -220,7 +220,7 @@ traffic:
light:
tcp:
# 是否启用TCP服务器
- enabled: true
+ enabled: false
# TCP监听端口
port: 8082
# 最大连接数
@@ -230,6 +230,24 @@ traffic:
# 心跳超时时间(分钟)
heartbeat-timeout-minutes: 5
+ mqtt:
+ # 是否启用MQTT订阅
+ enabled: true
+ # MQTT Broker地址
+ broker: ${MQTT_BROKER:tcp://10.64.58.228:8082}
+ # 客户端ID
+ client-id: ${MQTT_CLIENT_ID:qaup-traffic-light-mqtt}
+ # 用户名
+ username: ${MQTT_USERNAME:mqtt}
+ # 密码
+ password: ${MQTT_PASSWORD:XAsOsLxaPs1}
+ # 订阅主题
+ topic: ${MQTT_TOPIC:cusc/v2/SF053/QingDrsu001/data}
+ # 连接超时时间(秒)
+ timeout: ${MQTT_TIMEOUT:30}
+ # 心跳间隔(秒)
+ keepalive: ${MQTT_KEEPALIVE:60}
+
intersection:
# 默认路口ID(当信号中没有指定时使用)
default-id: "DEFAULT_INTERSECTION"
diff --git a/qaup-collision/pom.xml b/qaup-collision/pom.xml
index 4a09f87..f43361b 100644
--- a/qaup-collision/pom.xml
+++ b/qaup-collision/pom.xml
@@ -150,6 +150,13 @@
test
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.5
+
+
diff --git a/qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java b/qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java
new file mode 100644
index 0000000..7e55abb
--- /dev/null
+++ b/qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java
@@ -0,0 +1,49 @@
+package com.qaup.collision.config;
+
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * MQTT 配置类
+ *
+ * 用于配置 MQTT 连接选项,支持红绿灯数据通过 MQTT 订阅
+ */
+@Configuration
+public class MqttConfig {
+
+ @Value("${traffic.light.mqtt.username:}")
+ private String username;
+
+ @Value("${traffic.light.mqtt.password:}")
+ private String password;
+
+ @Value("${traffic.light.mqtt.timeout:30}")
+ private int timeout;
+
+ @Value("${traffic.light.mqtt.keepalive:60}")
+ private int keepalive;
+
+ /**
+ * MQTT 连接选项配置
+ *
+ * @return MQTT 连接选项
+ */
+ @Bean
+ public MqttConnectOptions mqttConnectOptions() {
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setConnectionTimeout(timeout);
+ options.setKeepAliveInterval(keepalive);
+ options.setAutomaticReconnect(true);
+
+ // 如果配置了用户名和密码,则设置认证信息
+ if (username != null && !username.isEmpty()) {
+ options.setUserName(username);
+ options.setPassword(password.toCharArray());
+ }
+
+ return options;
+ }
+}
\ No newline at end of file
diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java
new file mode 100644
index 0000000..b843603
--- /dev/null
+++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java
@@ -0,0 +1,220 @@
+package com.qaup.collision.datacollector.server;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.qaup.collision.websocket.handler.CollisionWebSocketHandler;
+import com.qaup.collision.websocket.handler.VehicleCommandInfoWebSocketHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * 红绿灯 MQTT 订阅者
+ *
+ * 通过 MQTT 协议订阅红绿灯设备的消息
+ * 与 TCP 方式并存,提供另一种数据接收方式
+ */
+@Slf4j
+@Component
+public class TrafficLightMqttSubscriber {
+
+ @Value("${traffic.light.mqtt.enabled:false}")
+ private boolean mqttEnabled;
+
+ @Value("${traffic.light.mqtt.broker}")
+ private String broker;
+
+ @Value("${traffic.light.mqtt.client-id}")
+ private String clientId;
+
+ @Value("${traffic.light.mqtt.topic}")
+ private String topic;
+
+ private final MqttConnectOptions mqttConnectOptions;
+ private final CollisionWebSocketHandler collisionWebSocketHandler;
+ private final VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler;
+ private final ObjectMapper objectMapper;
+
+ private MqttClient mqttClient;
+ private final AtomicBoolean connected = new AtomicBoolean(false);
+
+ // 统计信息
+ private final AtomicLong totalMessages = new AtomicLong(0);
+ private final AtomicLong errorCount = new AtomicLong(0);
+
+ /**
+ * 构造函数
+ *
+ * @param mqttConnectOptions MQTT 连接选项
+ * @param collisionWebSocketHandler WebSocket 处理器,用于向前端广播消息
+ * @param vehicleCommandInfoWebSocketHandler 车辆控制指令 WebSocket 处理器
+ * @param objectMapper JSON 解析器
+ */
+ @Autowired
+ public TrafficLightMqttSubscriber(MqttConnectOptions mqttConnectOptions,
+ CollisionWebSocketHandler collisionWebSocketHandler,
+ VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler,
+ ObjectMapper objectMapper) {
+ this.mqttConnectOptions = mqttConnectOptions;
+ this.collisionWebSocketHandler = collisionWebSocketHandler;
+ this.vehicleCommandInfoWebSocketHandler = vehicleCommandInfoWebSocketHandler;
+ this.objectMapper = objectMapper;
+ }
+
+ /**
+ * 初始化 MQTT 订阅者
+ */
+ @PostConstruct
+ public void init() {
+ if (mqttEnabled) {
+ connectAndSubscribe();
+ } else {
+ log.info("🚦 MQTT 红绿灯订阅已禁用");
+ }
+ }
+
+ /**
+ * 连接 MQTT Broker 并订阅主题
+ */
+ public void connectAndSubscribe() {
+ try {
+ log.info("🚦 正在连接 MQTT Broker: {}", broker);
+
+ // 创建 MQTT 客户端
+ mqttClient = new MqttClient(broker, clientId);
+
+ // 设置回调
+ mqttClient.setCallback(new MqttCallback() {
+ @Override
+ public void connectionLost(Throwable cause) {
+ connected.set(false);
+ log.error("❌ MQTT 连接断开: {}", cause.getMessage());
+ errorCount.incrementAndGet();
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+ totalMessages.incrementAndGet();
+
+ log.info("🚦 [MQTT] 收到红绿灯消息");
+ log.info("🚦 [MQTT] Topic: {}", topic);
+ log.info("🚦 [MQTT] Payload: {}", payload);
+
+ // 通过 WebSocket 广播消息给前端
+ try {
+ String websocketMessage = String.format(
+ "{\"type\":\"trafficLight\",\"source\":\"mqtt\",\"topic\":\"%s\",\"payload\":%s,\"timestamp\":%d}",
+ topic, payload, System.currentTimeMillis()
+ );
+ collisionWebSocketHandler.broadcastMessage(websocketMessage);
+ log.debug("🚦 [MQTT] 消息已通过 WebSocket 广播到前端");
+ } catch (Exception e) {
+ log.error("🚦 [MQTT] WebSocket 广播失败: {}", e.getMessage(), e);
+ }
+
+ // 解析 phaseColor 并发送车辆控制指令
+ try {
+ JsonNode jsonNode = objectMapper.readTree(payload);
+
+ // 从 serviceData.phases 数组中获取 phaseColor
+ JsonNode serviceDataNode = jsonNode.get("serviceData");
+ if (serviceDataNode != null) {
+ JsonNode phasesNode = serviceDataNode.get("phases");
+ if (phasesNode != null && phasesNode.isArray() && phasesNode.size() > 0) {
+ // 获取第一个相位的 phaseColor
+ JsonNode firstPhase = phasesNode.get(0);
+ JsonNode phaseColorNode = firstPhase.get("phaseColor");
+
+ if (phaseColorNode != null && phaseColorNode.isInt()) {
+ int phaseColor = phaseColorNode.asInt();
+ log.info("🚦 [MQTT] 解析到 phaseColor: {}", phaseColor);
+
+ // phaseColor 为 1 时发送 GREEN,为 3 时发送 RED
+ if (phaseColor == 1 || phaseColor == 3) {
+ vehicleCommandInfoWebSocketHandler.sendSignalState(phaseColor);
+ log.info("🚦 [MQTT] 已发送车辆控制指令: phaseColor={}", phaseColor);
+ } else {
+ log.warn("🚦 [MQTT] 未知的 phaseColor 值: {},仅支持 1 (GREEN) 或 3 (RED)", phaseColor);
+ }
+ } else {
+ log.warn("🚦 [MQTT] phases[0] 中未找到 phaseColor 字段或格式不正确");
+ }
+ } else {
+ log.warn("🚦 [MQTT] serviceData 中未找到 phases 数组或数组为空");
+ }
+ } else {
+ log.warn("🚦 [MQTT] 消息中未找到 serviceData 字段");
+ }
+ } catch (Exception e) {
+ log.error("🚦 [MQTT] 解析 phaseColor 或发送控制指令失败: {}", e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // QOS 1/2 消息发送完成回调(发布者使用,订阅者不需要)
+ }
+ });
+
+ // 连接到 Broker
+ mqttClient.connect(mqttConnectOptions);
+ connected.set(true);
+
+ // 订阅主题
+ mqttClient.subscribe(topic, 0); // QOS 0 - 最多一次
+
+ log.info("✅ MQTT 红绿灯订阅已连接,订阅 Topic: {}", topic);
+
+ } catch (MqttException e) {
+ log.error("❌ MQTT 连接失败: {}", e.getMessage(), e);
+ connected.set(false);
+ errorCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * 断开 MQTT 连接
+ */
+ @PreDestroy
+ public void disconnect() {
+ if (mqttClient != null && connected.get()) {
+ try {
+ log.info("🚦 正在断开 MQTT 连接...");
+ mqttClient.disconnect();
+ mqttClient.close();
+ connected.set(false);
+ log.info("✅ MQTT 连接已断开");
+ } catch (MqttException e) {
+ log.error("❌ MQTT 断开连接失败: {}", e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * 获取连接状态
+ *
+ * @return 是否已连接
+ */
+ public boolean isConnected() {
+ return connected.get();
+ }
+
+ /**
+ * 获取统计信息
+ *
+ * @return 统计信息字符串
+ */
+ public String getStatistics() {
+ return String.format("MQTT订阅状态 - 连接:%s, 总消息:%d, 错误:%d",
+ connected.get() ? "是" : "否", totalMessages.get(), errorCount.get());
+ }
+}
\ No newline at end of file
diff --git a/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java b/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java
index bfc427c..b524755 100644
--- a/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java
+++ b/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java
@@ -12,56 +12,27 @@ import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
/**
- * WebSocket 车辆控制指令测试端点
+ * WebSocket 车辆控制指令端点
*
* 客户端可通过 ws://:8080/VehicleCommandInfo 连接,
- * 服务端会周期性向所有连接会话发送固定的车辆控制指令 JSON。
+ * 服务端根据红绿灯信号状态向所有连接会话发送车辆控制指令。
*/
@Component
public class VehicleCommandInfoWebSocketHandler implements WebSocketHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(VehicleCommandInfoWebSocketHandler.class);
- private static final String COMMAND_JSON = "{" +
- "\"messageUniqueId\": \"68f79d1a-e27f-11ed-b28c-2cf05d9c2649\"," +
- "\"timestamp\": 1736175610," +
- "\"vehicleID\": \"A001\"," +
- "\"commandType\": \"SIGNAL\"," +
- "\"commandReason\": \"TRAFFIC_LIGHT\"," +
- "\"signalState\":\"RED\"," +
- "\"intersectionId\":\"002\"," +
- "\"latitude\": 343.23," +
- "\"longitude\": 343.23," +
- "\"relativeSpeed\": 3," +
- "\"relativeMotionX\": 2002.12," +
- "\"relativeMotionY\":100.12," +
- "\"minDistance\":10.5" +
- "}";
-
private final Map sessions = new ConcurrentHashMap<>();
- private final Map> sessionTasks = new ConcurrentHashMap<>();
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) {
String sessionId = session.getId();
sessions.put(sessionId, session);
LOGGER.info("VehicleCommandInfo WebSocket 连接建立, sessionId={}", sessionId);
-
- ScheduledFuture> task = scheduler.scheduleAtFixedRate(
- () -> sendCommandJson(session),
- 0,
- 1,
- TimeUnit.SECONDS
- );
- sessionTasks.put(sessionId, task);
}
@Override
@@ -88,14 +59,59 @@ public class VehicleCommandInfoWebSocketHandler implements WebSocketHandler {
return false;
}
- private void sendCommandJson(WebSocketSession session) {
+ /**
+ * 发送信号状态给所有连接的客户端
+ *
+ * @param phaseColor 信号颜色值:1=green, 3=red
+ */
+ public void sendSignalState(int phaseColor) {
+ String signalState = (phaseColor == 1) ? "GREEN" : "RED";
+
+ String commandJson = String.format(
+ "{" +
+ "\"messageUniqueId\": \"%s\"," +
+ "\"timestamp\": %d," +
+ "\"vehicleID\": \"AET01\"," +
+ "\"commandType\": \"SIGNAL\"," +
+ "\"commandReason\": \"TRAFFIC_LIGHT\"," +
+ "\"signalState\":\"%s\"," +
+ "\"intersectionId\":\"002\"," +
+ "\"latitude\": 343.23," +
+ "\"longitude\": 343.23," +
+ "\"relativeSpeed\": 3," +
+ "\"relativeMotionX\": 2002.12," +
+ "\"relativeMotionY\":100.12," +
+ "\"minDistance\":10.5" +
+ "}",
+ UUID.randomUUID().toString(),
+ System.currentTimeMillis(),
+ signalState
+ );
+
+ LOGGER.info("🚦 [VehicleCommandInfo] 发送车辆控制指令: phaseColor={}, signalState={}", phaseColor, signalState);
+ LOGGER.info("🚦 [VehicleCommandInfo] 当前连接数: {}", sessions.size());
+
+ if (sessions.isEmpty()) {
+ LOGGER.warn("🚦 [VehicleCommandInfo] 没有客户端连接,无法发送消息");
+ return;
+ }
+
+ // 向所有连接的客户端发送
+ sessions.values().forEach(session -> sendMessage(session, commandJson));
+ LOGGER.info("🚦 [VehicleCommandInfo] 已向 {} 个客户端发送消息", sessions.size());
+ }
+
+ /**
+ * 发送消息给指定会话
+ */
+ private void sendMessage(WebSocketSession session, String message) {
if (session == null || !session.isOpen()) {
return;
}
try {
synchronized (session) {
if (session.isOpen()) {
- session.sendMessage(new TextMessage(COMMAND_JSON));
+ session.sendMessage(new TextMessage(message));
}
}
} catch (IOException e) {
@@ -107,12 +123,11 @@ public class VehicleCommandInfoWebSocketHandler implements WebSocketHandler {
}
}
+ /**
+ * 清理会话
+ */
private void cleanupSession(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
- ScheduledFuture> task = sessionTasks.remove(sessionId);
- if (task != null) {
- task.cancel(true);
- }
sessions.remove(sessionId);
try {
if (session.isOpen()) {
@@ -122,4 +137,11 @@ public class VehicleCommandInfoWebSocketHandler implements WebSocketHandler {
LOGGER.debug("关闭 sessionId={} 时出现异常: {}", sessionId, e.getMessage());
}
}
+
+ /**
+ * 获取当前连接数
+ */
+ public int getConnectionCount() {
+ return sessions.size();
+ }
}
diff --git a/test_websocket.html b/test_websocket.html
new file mode 100644
index 0000000..cbee6e3
--- /dev/null
+++ b/test_websocket.html
@@ -0,0 +1,163 @@
+
+
+
+
+ WebSocket 测试
+
+
+
+ WebSocket 连接测试
+
+
+
测试 1: /collision 端点
+
+
+
状态: 未连接
+
+
+
+
+
测试 2: /VehicleCommandInfo 端点
+
+
+
状态: 未连接
+
+
+
+
+
+
\ No newline at end of file