This commit is contained in:
sladro 2026-01-31 14:53:20 +08:00
commit 87f7541028
16 changed files with 1036 additions and 61 deletions

View File

@ -0,0 +1,54 @@
# 车辆控制指令 WebSocket 接口文档
## 接口概述
- **接口名称**: VehicleCommandInfo
- **协议**: WebSocket
- **连接地址**: `ws://ip:8080/VehicleCommandInfo`
- **说明**: 客户端连接后,服务端会根据红绿灯信号状态自动向所有连接的客户端推送车辆控制指令
## 消息格式
### 服务端 → 客户端(推送车辆控制指令)
**JSON 格式**:
```json
{
"messageUniqueId": "550e8400-e29b-41d4-a716-446655440000",
"timestamp": 1737979200000,
"vehicleID": "AET01",
"commandType": "SIGNAL",
"commandReason": "TRAFFIC_LIGHT",
"signalState": "GREEN",
"intersectionId": "002",
"latitude": 343.23,
"longitude": 343.23,
"relativeSpeed": 3,
"relativeMotionX": 2002.12,
"relativeMotionY": 100.12,
"minDistance": 10.5
}
```
### 字段说明
| 字段名 | 类型 | 说明 | 示例值 |
|--------|------|------|--------|
| `messageUniqueId` | String | 消息唯一标识符 | "550e8400-e29b-41d4-a716-446655440000" |
| `timestamp` | Long | 时间戳(毫秒) | 1737979200000 |
| `vehicleID` | String | 车辆ID | "AET01" |
| `commandType` | String | 指令类型 | "SIGNAL" |
| `commandReason` | String | 指令原因 | "TRAFFIC_LIGHT" |
| `signalState` | String | 信号状态 | "GREEN" 或 "RED" |
| `intersectionId` | String | 交叉口ID | "002" |
| `latitude` | Double | 纬度 | 343.23 |
| `longitude` | Double | 经度 | 343.23 |
| `relativeSpeed` | Integer | 相对速度 | 3 |
| `relativeMotionX` | Double | 相对运动X方向 | 2002.12 |
| `relativeMotionY` | Double | 相对运动Y方向 | 100.12 |
| `minDistance` | Double | 最小距离 | 10.5 |

View File

@ -0,0 +1,128 @@
# MQTT 订阅者项目 - 核心代码总结
## 项目概述
Spring Boot MQTT 订阅者最小可行性验证项目,用于连接 MQTT Broker 并订阅指定 Topic 接收消息数据。
## 技术栈
- Eclipse Paho MQTT 1.2.5
- dotenv-java 3.2.0
## 2. application.yaml
```yaml
mqtt:
broker: ${MQTT_BROKER:tcp://10.0.0.202:8082}
client-id: ${MQTT_CLIENT_ID:springboot-mqtt-client}
username: ${MQTT_USERNAME:mqtt}
password: ${MQTT_PASSWORD:XAsOsLxaPs1}
topic: ${MQTT_TOPIC:test/topic}
timeout: ${MQTT_TIMEOUT:30}
keepalive: ${MQTT_KEEPALIVE:60}
```
## 4. MqttConfig.java
```java
package com.example.demo.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Value("${mqtt.username:}")
private String username;
@Value("${mqtt.password:}")
private String password;
@Value("${mqtt.timeout:30}")
private int timeout;
@Value("${mqtt.keepalive:60}")
private int keepalive;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
if (username != null && !username.isEmpty()) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
return options;
}
}
```
---
## 5. MqttSubscriber.java
```java
package com.example.demo.subscriber;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class MqttSubscriber {
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.topic}")
private String topic;
private final MqttConnectOptions options;
public MqttSubscriber(MqttConnectOptions options) {
this.options = options;
}
@PostConstruct
public void init() throws MqttException {
MqttClient client = new MqttClient(broker, clientId);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("❌ MQTT 连接断开:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("📥 收到消息");
System.out.println("Topic: " + topic);
System.out.println("Payload: " + payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
client.connect(options);
client.subscribe(topic, 0);
System.out.println("✅ MQTT 已连接,订阅 Topic" + topic);
}
}
```

View File

@ -86,7 +86,7 @@ data:
at-manager-bsm: /ws/at_manager_bsm
at-manager-path: /ws/at_manager_path
http:
base-url: ${VEHICLE_MANAGER_HTTP_BASE_URL:}
base-url: ${VEHICLE_MANAGER_HTTP_BASE_URL:http://localhost:8020}
status: /api/vehicle_manager/v1/vehicles/{vehicleId}/status
vehicle-details: /api/vehicle_details
poll-interval-ms: 1000
@ -109,5 +109,5 @@ data:
traffic:
light:
tcp:
enabled: ${TRAFFIC_LIGHT_TCP_ENABLED:true}
enabled: ${TRAFFIC_LIGHT_TCP_ENABLED:false}
port: ${TRAFFIC_LIGHT_TCP_PORT:8082}

View File

@ -220,7 +220,7 @@ traffic:
light:
tcp:
# 是否启用TCP服务器
enabled: true
enabled: false
# TCP监听端口
port: 8082
# 最大连接数
@ -230,6 +230,24 @@ traffic:
# 心跳超时时间(分钟)
heartbeat-timeout-minutes: 5
mqtt:
# 是否启用MQTT订阅
enabled: true
# MQTT Broker地址
broker: ${MQTT_BROKER:tcp://10.64.58.228:8082}
# 客户端ID
client-id: ${MQTT_CLIENT_ID:qaup-traffic-light-mqtt}
# 用户名
username: ${MQTT_USERNAME:mqtt}
# 密码
password: ${MQTT_PASSWORD:XAsOsLxaPs1}
# 订阅主题
topic: ${MQTT_TOPIC:cusc/v2/SF053/QingDrsu001/data}
# 连接超时时间(秒)
timeout: ${MQTT_TIMEOUT:30}
# 心跳间隔(秒)
keepalive: ${MQTT_KEEPALIVE:60}
intersection:
# 默认路口ID当信号中没有指定时使用
default-id: "DEFAULT_INTERSECTION"

View File

@ -150,6 +150,13 @@
<scope>test</scope>
</dependency>
<!-- Eclipse Paho MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
<build>

View File

@ -116,6 +116,7 @@ public class UnmannedVehicle extends MovingObject {
@lombok.Builder
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
@com.fasterxml.jackson.annotation.JsonIgnoreProperties(ignoreUnknown = true)
public static class WaypointInfo {
private String waypointId;
private Double latitude;

View File

@ -0,0 +1,49 @@
package com.qaup.collision.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* MQTT 配置类
*
* 用于配置 MQTT 连接选项支持红绿灯数据通过 MQTT 订阅
*/
@Configuration
public class MqttConfig {
@Value("${traffic.light.mqtt.username:}")
private String username;
@Value("${traffic.light.mqtt.password:}")
private String password;
@Value("${traffic.light.mqtt.timeout:30}")
private int timeout;
@Value("${traffic.light.mqtt.keepalive:60}")
private int keepalive;
/**
* MQTT 连接选项配置
*
* @return MQTT 连接选项
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true);
// 如果配置了用户名和密码则设置认证信息
if (username != null && !username.isEmpty()) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
return options;
}
}

View File

@ -51,67 +51,113 @@ public class VehicleTaskController {
private String status;
}
@GetMapping
@GetMapping
public AjaxResult list(@RequestParam(required = false, defaultValue = "1") int pageNum,
@RequestParam(required = false, defaultValue = "10") int pageSize,
@RequestParam(required = false) String status) {
Map<String, MovingObject> activeMovingObjectsCache = dataCollectorService.getActiveMovingObjectsCache();
// HTTP 缓存获取车辆状态数据
Map<String, DataCollectorService.UniversalVehicleStatusCacheEntry> httpCache = dataCollectorService.getUniversalStatusCache();
List<VehicleTaskDTO> allTasks = new ArrayList<>();
for (MovingObject movingObject : activeMovingObjectsCache.values()) {
if (!(movingObject instanceof UnmannedVehicle vehicle)) {
System.out.println("[DEBUG] ========== 任务接口调试信息 ==========");
System.out.println("[DEBUG] httpCache size: " + httpCache.size());
System.out.println("[DEBUG] httpCache keys: " + httpCache.keySet());
for (Map.Entry<String, DataCollectorService.UniversalVehicleStatusCacheEntry> entry : httpCache.entrySet()) {
String cacheKey = entry.getKey();
DataCollectorService.UniversalVehicleStatusCacheEntry cacheEntry = entry.getValue();
System.out.println("[DEBUG] Processing cacheKey: " + cacheKey);
if (cacheEntry.getStatusData() == null) {
System.out.println("[DEBUG] - statusData is null, skipping");
continue;
}
String vehicleId = vehicle.getObjectId();
String missionId = vehicle.getMissionId();
// cacheKey 中提取 vehicleId格式universal_status_vehicleId
String vehicleId = cacheKey.replace("universal_status_", "");
System.out.println("[DEBUG] - vehicleId: " + vehicleId);
com.qaup.collision.datacollector.model.dto.UniversalVehicleStatusDTO statusData = cacheEntry.getStatusData();
com.qaup.collision.datacollector.model.dto.MissionContextDTO missionContext = statusData.getMissionContext();
System.out.println("[DEBUG] - missionContext: " + (missionContext != null ? missionContext.getClass().getSimpleName() : "null"));
if (missionContext != null) {
System.out.println("[DEBUG] - currentMission: " + (missionContext.getCurrentMission() != null ? missionContext.getCurrentMission().getClass().getSimpleName() : "null"));
}
if (missionContext == null || missionContext.getCurrentMission() == null) {
System.out.println("[DEBUG] - Skipping (no missionContext)");
continue;
}
com.qaup.collision.datacollector.model.dto.MissionContextDTO.CurrentMissionDTO currentMission = missionContext.getCurrentMission();
String missionId = currentMission.getMissionId();
System.out.println("[DEBUG] - missionId: " + missionId);
// 如果没有任务ID跳过
if (missionId == null || missionId.isBlank()) {
System.out.println("[DEBUG] - Skipping (no missionId)");
continue;
}
String missionStatus = vehicle.getMissionStatus() != null ? vehicle.getMissionStatus().name() : "UNKNOWN";
// 获取车辆类型VehicleInfoDTO 只有 vehicleId没有 vehicleType使用默认值
String vehicleType = "UNMANNED_VEHICLE";
// 获取任务状态 operationalStatus 获取 operationalMode
String missionStatus = "UNKNOWN";
if (statusData.getOperationalStatus() != null) {
missionStatus = statusData.getOperationalStatus().getOperationalMode();
}
// 状态筛选
if (status != null && !status.isEmpty() && !status.equalsIgnoreCase(missionStatus)) {
System.out.println("[DEBUG] - Skipping (status filter: " + status + " != " + missionStatus + ")");
continue;
}
// 获取车辆类型
String vehicleType = vehicle.getObjectType().name();
System.out.println("[DEBUG] - Adding task: " + vehicleId + ", MissionId: " + missionId);
// 获取路径点
List<WaypointDTO> waypoints = new ArrayList<>();
if (vehicle.getWaypoints() != null) {
for (UnmannedVehicle.WaypointInfo wp : vehicle.getWaypoints()) {
System.out.println("[DEBUG] - missionContext.getWaypoints(): " + missionContext.getWaypoints());
if (missionContext.getWaypoints() != null) {
System.out.println("[DEBUG] - waypoints size: " + missionContext.getWaypoints().size());
for (com.qaup.collision.datacollector.model.dto.MissionContextDTO.WaypointDTO wp : missionContext.getWaypoints()) {
System.out.println("[DEBUG] - waypoint: " + wp.getWaypointId() + ", lat: " + wp.getLatitude() + ", lon: " + wp.getLongitude() + ", status: " + wp.getStatus());
WaypointDTO waypoint = WaypointDTO.builder()
.waypointId(wp.getWaypointId())
.latitude(wp.getLatitude())
.longitude(wp.getLongitude())
.status(wp.getStatus().name())
.status(wp.getStatus())
.build();
waypoints.add(waypoint);
}
} else {
System.out.println("[DEBUG] - waypoints is null");
}
System.out.println("[DEBUG] - final waypoints size: " + waypoints.size());
VehicleTaskDTO task = VehicleTaskDTO.builder()
.vehicleId(vehicleId)
.vehicleType(vehicleType)
.missionId(missionId)
.missionType(vehicle.getMissionType())
.missionType(currentMission.getMissionType())
.missionStatus(missionStatus)
.missionStartTime(vehicle.getMissionStartTime())
.estimatedEndTime(vehicle.getEstimatedEndTime())
.progress(vehicle.getProgress())
.totalMileage(vehicle.getTotalMileage())
.missionStartTime(currentMission.getStartTime())
.estimatedEndTime(currentMission.getEstimatedEndTime())
.progress(currentMission.getProgress())
.totalMileage(currentMission.getTotalMileage())
.waypoints(waypoints)
.lastSeenAt(null) // 暂时设为null后续可以从缓存获取
.lastSeenAt(cacheEntry.getTimestamp())
.build();
allTasks.add(task);
}
System.out.println("[DEBUG] Total tasks found: " + allTasks.size());
System.out.println("[DEBUG] =========================================");
// 按任务开始时间倒序排序
allTasks.sort((a, b) -> {
if (a.getMissionStartTime() == null && b.getMissionStartTime() == null) return 0;
@ -136,29 +182,41 @@ public class VehicleTaskController {
@GetMapping("/{vehicleId}")
public AjaxResult get(@PathVariable String vehicleId) {
Map<String, MovingObject> activeMovingObjectsCache = dataCollectorService.getActiveMovingObjectsCache();
MovingObject movingObject = activeMovingObjectsCache.get(vehicleId);
// HTTP 缓存获取车辆状态数据使用正确的 cacheKey 格式
String cacheKey = "universal_status_" + vehicleId;
DataCollectorService.UniversalVehicleStatusCacheEntry entry = dataCollectorService.getUniversalStatusCache().get(cacheKey);
if (!(movingObject instanceof UnmannedVehicle vehicle)) {
return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆不存在或无任务").put("timestamp", System.currentTimeMillis());
if (entry == null || entry.getStatusData() == null) {
return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆不存在或无数据").put("timestamp", System.currentTimeMillis());
}
String missionId = vehicle.getMissionId();
com.qaup.collision.datacollector.model.dto.UniversalVehicleStatusDTO statusData = entry.getStatusData();
com.qaup.collision.datacollector.model.dto.MissionContextDTO missionContext = statusData.getMissionContext();
if (missionContext == null || missionContext.getCurrentMission() == null) {
return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆无任务").put("timestamp", System.currentTimeMillis());
}
com.qaup.collision.datacollector.model.dto.MissionContextDTO.CurrentMissionDTO currentMission = missionContext.getCurrentMission();
String missionId = currentMission.getMissionId();
if (missionId == null || missionId.isBlank()) {
return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆无任务").put("timestamp", System.currentTimeMillis());
}
String vehicleType = vehicle.getObjectType().name();
String vehicleType = "UNMANNED_VEHICLE";
String missionStatus = statusData.getOperationalStatus() != null
? statusData.getOperationalStatus().getOperationalMode()
: "UNKNOWN";
List<WaypointDTO> waypoints = new ArrayList<>();
if (vehicle.getWaypoints() != null) {
for (UnmannedVehicle.WaypointInfo wp : vehicle.getWaypoints()) {
if (missionContext.getWaypoints() != null) {
for (com.qaup.collision.datacollector.model.dto.MissionContextDTO.WaypointDTO wp : missionContext.getWaypoints()) {
WaypointDTO waypoint = WaypointDTO.builder()
.waypointId(wp.getWaypointId())
.latitude(wp.getLatitude())
.longitude(wp.getLongitude())
.status(wp.getStatus().name())
.status(wp.getStatus())
.build();
waypoints.add(waypoint);
}
@ -168,14 +226,14 @@ public class VehicleTaskController {
.vehicleId(vehicleId)
.vehicleType(vehicleType)
.missionId(missionId)
.missionType(vehicle.getMissionType())
.missionStatus(vehicle.getMissionStatus() != null ? vehicle.getMissionStatus().name() : "UNKNOWN")
.missionStartTime(vehicle.getMissionStartTime())
.estimatedEndTime(vehicle.getEstimatedEndTime())
.progress(vehicle.getProgress())
.totalMileage(vehicle.getTotalMileage())
.missionType(currentMission.getMissionType())
.missionStatus(missionStatus)
.missionStartTime(currentMission.getStartTime())
.estimatedEndTime(currentMission.getEstimatedEndTime())
.progress(currentMission.getProgress())
.totalMileage(currentMission.getTotalMileage())
.waypoints(waypoints)
.lastSeenAt(null) // 暂时设为null
.lastSeenAt(entry.getTimestamp())
.build();
return AjaxResult.success(task).put("timestamp", System.currentTimeMillis());

View File

@ -76,6 +76,7 @@ public class MissionContextDTO {
@NoArgsConstructor
@AllArgsConstructor
@Builder
@com.fasterxml.jackson.annotation.JsonIgnoreProperties(ignoreUnknown = true)
public static class WaypointDTO {
/**
* 路径点ID
@ -96,5 +97,31 @@ public class MissionContextDTO {
* 状态 (PENDING, COMPLETED, SKIPPED)
*/
private String status;
/**
* X坐标 (经度)
*/
@com.fasterxml.jackson.annotation.JsonProperty("x")
private Double x;
/**
* Y坐标 (纬度)
*/
@com.fasterxml.jackson.annotation.JsonProperty("y")
private Double y;
/**
* 获取纬度如果为null则返回x值
*/
public Double getLatitude() {
return latitude != null ? latitude : x;
}
/**
* 获取经度如果为null则返回y值
*/
public Double getLongitude() {
return longitude != null ? longitude : y;
}
}
}

View File

@ -0,0 +1,220 @@
package com.qaup.collision.datacollector.server;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qaup.collision.websocket.handler.CollisionWebSocketHandler;
import com.qaup.collision.websocket.handler.VehicleCommandInfoWebSocketHandler;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* 红绿灯 MQTT 订阅者
*
* 通过 MQTT 协议订阅红绿灯设备的消息
* TCP 方式并存提供另一种数据接收方式
*/
@Slf4j
@Component
public class TrafficLightMqttSubscriber {
@Value("${traffic.light.mqtt.enabled:false}")
private boolean mqttEnabled;
@Value("${traffic.light.mqtt.broker}")
private String broker;
@Value("${traffic.light.mqtt.client-id}")
private String clientId;
@Value("${traffic.light.mqtt.topic}")
private String topic;
private final MqttConnectOptions mqttConnectOptions;
private final CollisionWebSocketHandler collisionWebSocketHandler;
private final VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler;
private final ObjectMapper objectMapper;
private MqttClient mqttClient;
private final AtomicBoolean connected = new AtomicBoolean(false);
// 统计信息
private final AtomicLong totalMessages = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
/**
* 构造函数
*
* @param mqttConnectOptions MQTT 连接选项
* @param collisionWebSocketHandler WebSocket 处理器用于向前端广播消息
* @param vehicleCommandInfoWebSocketHandler 车辆控制指令 WebSocket 处理器
* @param objectMapper JSON 解析器
*/
@Autowired
public TrafficLightMqttSubscriber(MqttConnectOptions mqttConnectOptions,
CollisionWebSocketHandler collisionWebSocketHandler,
VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler,
ObjectMapper objectMapper) {
this.mqttConnectOptions = mqttConnectOptions;
this.collisionWebSocketHandler = collisionWebSocketHandler;
this.vehicleCommandInfoWebSocketHandler = vehicleCommandInfoWebSocketHandler;
this.objectMapper = objectMapper;
}
/**
* 初始化 MQTT 订阅者
*/
@PostConstruct
public void init() {
if (mqttEnabled) {
connectAndSubscribe();
} else {
log.info("🚦 MQTT 红绿灯订阅已禁用");
}
}
/**
* 连接 MQTT Broker 并订阅主题
*/
public void connectAndSubscribe() {
try {
log.info("🚦 正在连接 MQTT Broker: {}", broker);
// 创建 MQTT 客户端
mqttClient = new MqttClient(broker, clientId);
// 设置回调
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
connected.set(false);
log.error("❌ MQTT 连接断开: {}", cause.getMessage());
errorCount.incrementAndGet();
}
@Override
public void messageArrived(String topic, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
totalMessages.incrementAndGet();
log.info("🚦 [MQTT] 收到红绿灯消息");
log.info("🚦 [MQTT] Topic: {}", topic);
log.info("🚦 [MQTT] Payload: {}", payload);
// 通过 WebSocket 广播消息给前端
try {
String websocketMessage = String.format(
"{\"type\":\"trafficLight\",\"source\":\"mqtt\",\"topic\":\"%s\",\"payload\":%s,\"timestamp\":%d}",
topic, payload, System.currentTimeMillis()
);
collisionWebSocketHandler.broadcastMessage(websocketMessage);
log.debug("🚦 [MQTT] 消息已通过 WebSocket 广播到前端");
} catch (Exception e) {
log.error("🚦 [MQTT] WebSocket 广播失败: {}", e.getMessage(), e);
}
// 解析 phaseColor 并发送车辆控制指令
try {
JsonNode jsonNode = objectMapper.readTree(payload);
// serviceData.phases 数组中获取 phaseColor
JsonNode serviceDataNode = jsonNode.get("serviceData");
if (serviceDataNode != null) {
JsonNode phasesNode = serviceDataNode.get("phases");
if (phasesNode != null && phasesNode.isArray() && phasesNode.size() > 0) {
// 获取第一个相位的 phaseColor
JsonNode firstPhase = phasesNode.get(0);
JsonNode phaseColorNode = firstPhase.get("phaseColor");
if (phaseColorNode != null && phaseColorNode.isInt()) {
int phaseColor = phaseColorNode.asInt();
log.info("🚦 [MQTT] 解析到 phaseColor: {}", phaseColor);
// phaseColor 1 时发送 GREEN 3 时发送 RED
if (phaseColor == 1 || phaseColor == 3) {
vehicleCommandInfoWebSocketHandler.sendSignalState(phaseColor);
log.info("🚦 [MQTT] 已发送车辆控制指令: phaseColor={}", phaseColor);
} else {
log.warn("🚦 [MQTT] 未知的 phaseColor 值: {},仅支持 1 (GREEN) 或 3 (RED)", phaseColor);
}
} else {
log.warn("🚦 [MQTT] phases[0] 中未找到 phaseColor 字段或格式不正确");
}
} else {
log.warn("🚦 [MQTT] serviceData 中未找到 phases 数组或数组为空");
}
} else {
log.warn("🚦 [MQTT] 消息中未找到 serviceData 字段");
}
} catch (Exception e) {
log.error("🚦 [MQTT] 解析 phaseColor 或发送控制指令失败: {}", e.getMessage(), e);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// QOS 1/2 消息发送完成回调发布者使用订阅者不需要
}
});
// 连接到 Broker
mqttClient.connect(mqttConnectOptions);
connected.set(true);
// 订阅主题
mqttClient.subscribe(topic, 0); // QOS 0 - 最多一次
log.info("✅ MQTT 红绿灯订阅已连接,订阅 Topic: {}", topic);
} catch (MqttException e) {
log.error("❌ MQTT 连接失败: {}", e.getMessage(), e);
connected.set(false);
errorCount.incrementAndGet();
}
}
/**
* 断开 MQTT 连接
*/
@PreDestroy
public void disconnect() {
if (mqttClient != null && connected.get()) {
try {
log.info("🚦 正在断开 MQTT 连接...");
mqttClient.disconnect();
mqttClient.close();
connected.set(false);
log.info("✅ MQTT 连接已断开");
} catch (MqttException e) {
log.error("❌ MQTT 断开连接失败: {}", e.getMessage(), e);
}
}
}
/**
* 获取连接状态
*
* @return 是否已连接
*/
public boolean isConnected() {
return connected.get();
}
/**
* 获取统计信息
*
* @return 统计信息字符串
*/
public String getStatistics() {
return String.format("MQTT订阅状态 - 连接:%s, 总消息:%d, 错误:%d",
connected.get() ? "" : "", totalMessages.get(), errorCount.get());
}
}

View File

@ -643,12 +643,29 @@ public class DataCollectorService {
// 提取路径点信息
if (missionContext.getWaypoints() != null) {
java.util.List<UnmannedVehicle.WaypointInfo> waypoints = missionContext.getWaypoints().stream()
.map(wp -> UnmannedVehicle.WaypointInfo.builder()
.waypointId(wp.getWaypointId())
.latitude(wp.getLatitude())
.longitude(wp.getLongitude())
.status(UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus()))
.build())
.map(wp -> {
// 安全处理状态字段避免 null 导致 NPE
UnmannedVehicle.WaypointStatus status = UnmannedVehicle.WaypointStatus.PENDING;
if (wp.getStatus() != null && !wp.getStatus().isEmpty()) {
try {
status = UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus());
} catch (IllegalArgumentException e) {
// 如果状态值无效使用默认值
status = UnmannedVehicle.WaypointStatus.PENDING;
}
}
// x 对应 latitude纬度y 对应 longitude经度
Double lat = wp.getX() != null ? wp.getX() : wp.getLatitude();
Double lon = wp.getY() != null ? wp.getY() : wp.getLongitude();
return UnmannedVehicle.WaypointInfo.builder()
.waypointId(wp.getWaypointId())
.latitude(lat)
.longitude(lon)
.status(status)
.build();
})
.collect(java.util.stream.Collectors.toList());
vehicleBuilder.waypoints(waypoints);
}
@ -809,21 +826,47 @@ public class DataCollectorService {
// 如果存在missionContext数据更新到activeMovingObjectsCache中的无人车对象
if (statusData.getMissionContext() != null) {
log.info("========== 开始更新任务上下文 ==========");
log.info("vehicleId: {}", vehicleId);
log.info("missionContext: {}", statusData.getMissionContext());
log.info("waypoints: {}", statusData.getMissionContext().getWaypoints());
updateUnmannedVehicleMissionContext(vehicleId, statusData.getMissionContext());
log.info("========== 任务上下文更新完成 ==========");
}
log.debug("缓存通用车辆状态数据: vehicleId={}, cacheKey={}, 包含任务上下文: {}",
vehicleId, cacheKey, statusData.getMissionContext() != null);
}
/**
* 更新无人车的任务上下文信息
*/
private void updateUnmannedVehicleMissionContext(String vehicleId, MissionContextDTO missionContext) {
MovingObject existingObject = activeMovingObjectsCache.get(vehicleId);
/**
if (existingObject instanceof UnmannedVehicle) {
UnmannedVehicle unmannedVehicle = (UnmannedVehicle) existingObject;
* 更新无人车的任务上下文信息
*/
private void updateUnmannedVehicleMissionContext(String vehicleId, MissionContextDTO missionContext) {
log.info("========== updateUnmannedVehicleMissionContext 开始 ==========");
log.info("vehicleId: {}", vehicleId);
log.info("activeMovingObjectsCache 是否包含 vehicleId: {}", activeMovingObjectsCache.containsKey(vehicleId));
MovingObject existingObject = activeMovingObjectsCache.get(vehicleId);
log.info("existingObject: {}", existingObject);
log.info("existingObject 类型: {}", existingObject != null ? existingObject.getClass().getName() : "null");
if (existingObject instanceof UnmannedVehicle) {
log.info("找到 UnmannedVehicle 对象,开始更新任务上下文");
UnmannedVehicle unmannedVehicle = (UnmannedVehicle) existingObject;
// 更新任务上下文信息
if (missionContext.getCurrentMission() != null) {
@ -838,15 +881,38 @@ public class DataCollectorService {
// 更新路径点信息
if (missionContext.getWaypoints() != null) {
log.info("更新路径点信息: vehicleId={}, waypoints数量={}", vehicleId, missionContext.getWaypoints().size());
java.util.List<UnmannedVehicle.WaypointInfo> waypoints = missionContext.getWaypoints().stream()
.map(wp -> UnmannedVehicle.WaypointInfo.builder()
.waypointId(wp.getWaypointId())
.latitude(wp.getLatitude())
.longitude(wp.getLongitude())
.status(UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus()))
.build())
.map(wp -> {
// 安全处理状态字段避免 null 导致 NPE
UnmannedVehicle.WaypointStatus status = UnmannedVehicle.WaypointStatus.PENDING;
if (wp.getStatus() != null && !wp.getStatus().isEmpty()) {
try {
status = UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus());
} catch (IllegalArgumentException e) {
// 如果状态值无效使用默认值
status = UnmannedVehicle.WaypointStatus.PENDING;
}
}
// x 对应 latitude纬度y 对应 longitude经度
Double lat = wp.getX() != null ? wp.getX() : wp.getLatitude();
Double lon = wp.getY() != null ? wp.getY() : wp.getLongitude();
log.info(" 路径点: waypointId={}, x={}, y={}, latitude={}, longitude={}, status={}",
wp.getWaypointId(), wp.getX(), wp.getY(), lat, lon, wp.getStatus());
return UnmannedVehicle.WaypointInfo.builder()
.waypointId(wp.getWaypointId())
.latitude(lat)
.longitude(lon)
.status(status)
.build();
})
.collect(java.util.stream.Collectors.toList());
unmannedVehicle.setWaypoints(waypoints);
log.info("路径点更新完成: vehicleId={}, 设置的路径点数量={}", vehicleId, waypoints.size());
} else {
log.debug("路径点为空: vehicleId={}", vehicleId);
}
// 更新缓存
@ -860,6 +926,7 @@ public class DataCollectorService {
} else {
log.debug("缓存中未找到无人车对象或类型不匹配,跳过任务上下文更新: vehicleId={}", vehicleId);
}
log.info("========== updateUnmannedVehicleMissionContext 完成 ==========");
}
// 用于存储通用车辆状态数据的缓存

View File

@ -7,6 +7,7 @@ import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import com.qaup.collision.websocket.handler.CollisionWebSocketHandler;
import com.qaup.collision.websocket.handler.VehicleCommandInfoWebSocketHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,9 +25,12 @@ public class WebSocketConfig implements WebSocketConfigurer {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConfig.class);
private final CollisionWebSocketHandler collisionWebSocketHandler;
private final VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler;
public WebSocketConfig(CollisionWebSocketHandler collisionWebSocketHandler) {
public WebSocketConfig(CollisionWebSocketHandler collisionWebSocketHandler,
VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler) {
this.collisionWebSocketHandler = collisionWebSocketHandler;
this.vehicleCommandInfoWebSocketHandler = vehicleCommandInfoWebSocketHandler;
logger.info("🚀 WebSocket配置类初始化...");
}
@ -37,10 +41,14 @@ public class WebSocketConfig implements WebSocketConfigurer {
// 注册冲突检测WebSocket端点
registry.addHandler(collisionWebSocketHandler, "/collision")
.setAllowedOrigins("*"); // 允许所有来源生产环境应该限制具体域名
// 注册车辆控制指令测试端点
registry.addHandler(vehicleCommandInfoWebSocketHandler, "/VehicleCommandInfo")
.setAllowedOrigins("*");
logger.info("✅ WebSocket端点注册完成");
logger.info("🎯 端点路径: /collision");
logger.info("🎯 端点路径: /collision, /VehicleCommandInfo");
logger.info("🌐 允许的来源: *");
logger.info("📡 WebSocket服务可用: ws://localhost:8080/collision");
logger.info("📡 WebSocket服务可用: ws://localhost:8080/collision, ws://localhost:8080/VehicleCommandInfo");
}
}

View File

@ -0,0 +1,147 @@
package com.qaup.collision.websocket.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocket 车辆控制指令端点
*
* 客户端可通过 ws://<host>:8080/VehicleCommandInfo 连接
* 服务端根据红绿灯信号状态向所有连接会话发送车辆控制指令
*/
@Component
public class VehicleCommandInfoWebSocketHandler implements WebSocketHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(VehicleCommandInfoWebSocketHandler.class);
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) {
String sessionId = session.getId();
sessions.put(sessionId, session);
LOGGER.info("VehicleCommandInfo WebSocket 连接建立, sessionId={}", sessionId);
}
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) {
// 简单回显客户端消息主要用于连通性测试
LOGGER.debug("VehicleCommandInfo 收到客户端消息, sessionId={}, payload={}",
session.getId(), message.getPayload());
}
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) {
LOGGER.warn("VehicleCommandInfo WebSocket 传输错误, sessionId={}", session.getId(), exception);
cleanupSession(session, CloseStatus.SERVER_ERROR);
}
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) {
LOGGER.info("VehicleCommandInfo WebSocket 连接关闭, sessionId={}, status={}", session.getId(), closeStatus);
cleanupSession(session, closeStatus);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 发送信号状态给所有连接的客户端
*
* @param phaseColor 信号颜色值1=green, 3=red
*/
public void sendSignalState(int phaseColor) {
String signalState = (phaseColor == 1) ? "GREEN" : "RED";
String commandJson = String.format(
"{" +
"\"messageUniqueId\": \"%s\"," +
"\"timestamp\": %d," +
"\"vehicleID\": \"AET01\"," +
"\"commandType\": \"SIGNAL\"," +
"\"commandReason\": \"TRAFFIC_LIGHT\"," +
"\"signalState\":\"%s\"," +
"\"intersectionId\":\"002\"," +
"\"latitude\": 343.23," +
"\"longitude\": 343.23," +
"\"relativeSpeed\": 3," +
"\"relativeMotionX\": 2002.12," +
"\"relativeMotionY\":100.12," +
"\"minDistance\":10.5" +
"}",
UUID.randomUUID().toString(),
System.currentTimeMillis(),
signalState
);
LOGGER.info("🚦 [VehicleCommandInfo] 发送车辆控制指令: phaseColor={}, signalState={}", phaseColor, signalState);
LOGGER.info("🚦 [VehicleCommandInfo] 当前连接数: {}", sessions.size());
if (sessions.isEmpty()) {
LOGGER.warn("🚦 [VehicleCommandInfo] 没有客户端连接,无法发送消息");
return;
}
// 向所有连接的客户端发送
sessions.values().forEach(session -> sendMessage(session, commandJson));
LOGGER.info("🚦 [VehicleCommandInfo] 已向 {} 个客户端发送消息", sessions.size());
}
/**
* 发送消息给指定会话
*/
private void sendMessage(WebSocketSession session, String message) {
if (session == null || !session.isOpen()) {
return;
}
try {
synchronized (session) {
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
} catch (IOException e) {
LOGGER.warn("向 sessionId={} 发送车辆控制指令 JSON 失败: {}", session.getId(), e.getMessage());
cleanupSession(session, CloseStatus.SESSION_NOT_RELIABLE);
} catch (Exception e) {
LOGGER.warn("向 sessionId={} 发送车辆控制指令 JSON 时发生异常", session.getId(), e);
cleanupSession(session, CloseStatus.SERVER_ERROR);
}
}
/**
* 清理会话
*/
private void cleanupSession(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
sessions.remove(sessionId);
try {
if (session.isOpen()) {
session.close(status);
}
} catch (IOException e) {
LOGGER.debug("关闭 sessionId={} 时出现异常: {}", sessionId, e.getMessage());
}
}
/**
* 获取当前连接数
*/
public int getConnectionCount() {
return sessions.size();
}
}

View File

@ -112,7 +112,7 @@ public class SecurityConfig
// 对于登录login 注册register 验证码captchaImage 允许匿名访问
requests.requestMatchers("/login", "/register", "/captchaImage").permitAll()
// WebSocket端点允许匿名访问
.requestMatchers("/collision", "/collision/**", "/test/websocket/**").permitAll()
.requestMatchers("/collision", "/collision/**", "/VehicleCommandInfo", "/test/websocket/**").permitAll()
// 静态资源可匿名访问
.requestMatchers(HttpMethod.GET, "/", "/*.html", "/**.html", "/**.css", "/**.js", "/profile/**").permitAll()
.requestMatchers("/swagger-ui.html", "/v3/api-docs/**", "/swagger-ui/**", "/druid/**").permitAll()

163
test_websocket.html Normal file
View File

@ -0,0 +1,163 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket 测试</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; }
.test-section { margin: 20px 0; padding: 15px; border: 1px solid #ccc; border-radius: 5px; }
.success { background-color: #d4edda; border-color: #c3e6cb; }
.error { background-color: #f8d7da; border-color: #f5c6cb; }
.log { background-color: #f8f9fa; padding: 10px; margin-top: 10px; border-radius: 3px; font-family: monospace; max-height: 300px; overflow-y: auto; }
button { padding: 8px 16px; margin: 5px; cursor: pointer; }
</style>
</head>
<body>
<h1>WebSocket 连接测试</h1>
<div class="test-section">
<h3>测试 1: /collision 端点</h3>
<button onclick="testCollision()">连接 /collision</button>
<button onclick="closeCollision()">关闭连接</button>
<div id="collisionStatus" style="margin-top: 10px;">状态: 未连接</div>
<div id="collisionLog" class="log"></div>
</div>
<div class="test-section">
<h3>测试 2: /VehicleCommandInfo 端点</h3>
<button onclick="testVehicleCommand()">连接 /VehicleCommandInfo</button>
<button onclick="closeVehicleCommand()">关闭连接</button>
<div id="vehicleStatus" style="margin-top: 10px;">状态: 未连接</div>
<div id="vehicleLog" class="log"></div>
</div>
<script>
let collisionWs = null;
let vehicleWs = null;
function log(elementId, message) {
const logElement = document.getElementById(elementId);
const timestamp = new Date().toLocaleTimeString();
logElement.innerHTML += `[${timestamp}] ${message}<br>`;
logElement.scrollTop = logElement.scrollHeight;
}
function testCollision() {
if (collisionWs) {
log('collisionLog', '已经存在连接,请先关闭');
return;
}
log('collisionLog', '尝试连接 ws://localhost:8080/collision...');
try {
collisionWs = new WebSocket('ws://localhost:8080/collision');
collisionWs.onopen = function(event) {
document.getElementById('collisionStatus').innerHTML = '状态: <span style="color: green">已连接</span>';
document.getElementById('collisionStatus').parentElement.classList.add('success');
log('collisionLog', '✅ 连接成功! readyState: ' + collisionWs.readyState);
};
collisionWs.onmessage = function(event) {
log('collisionLog', '📨 收到消息: ' + event.data);
};
collisionWs.onerror = function(event) {
document.getElementById('collisionStatus').innerHTML = '状态: <span style="color: red">错误</span>';
document.getElementById('collisionStatus').parentElement.classList.add('error');
log('collisionLog', '❌ 连接错误');
log('collisionLog', '错误详情: ' + JSON.stringify({
type: event.type,
target: event.target ? event.target.url : 'N/A',
readyState: event.target ? event.target.readyState : 'N/A'
}));
};
collisionWs.onclose = function(event) {
document.getElementById('collisionStatus').innerHTML = '状态: <span style="color: orange">已关闭</span>';
document.getElementById('collisionStatus').parentElement.classList.remove('success', 'error');
log('collisionLog', '🔌 连接关闭');
log('collisionLog', '关闭代码: ' + event.code + ', 原因: ' + event.reason);
collisionWs = null;
};
} catch (error) {
log('collisionLog', '❌ 异常: ' + error.message);
}
}
function testVehicleCommand() {
if (vehicleWs) {
log('vehicleLog', '已经存在连接,请先关闭');
return;
}
log('vehicleLog', '尝试连接 ws://localhost:8080/VehicleCommandInfo...');
try {
vehicleWs = new WebSocket('ws://localhost:8080/VehicleCommandInfo');
vehicleWs.onopen = function(event) {
document.getElementById('vehicleStatus').innerHTML = '状态: <span style="color: green">已连接</span>';
document.getElementById('vehicleStatus').parentElement.classList.add('success');
log('vehicleLog', '✅ 连接成功! readyState: ' + vehicleWs.readyState);
};
vehicleWs.onmessage = function(event) {
log('vehicleLog', '📨 收到消息: ' + event.data);
};
vehicleWs.onerror = function(event) {
document.getElementById('vehicleStatus').innerHTML = '状态: <span style="color: red">错误</span>';
document.getElementById('vehicleStatus').parentElement.classList.add('error');
log('vehicleLog', '❌ 连接错误');
log('vehicleLog', '错误详情: ' + JSON.stringify({
type: event.type,
target: event.target ? event.target.url : 'N/A',
readyState: event.target ? event.target.readyState : 'N/A'
}, null, 2));
// 尝试获取更多错误信息
if (event.target) {
log('vehicleLog', 'WebSocket 对象属性:');
for (let key in event.target) {
try {
const value = event.target[key];
if (typeof value !== 'function') {
log('vehicleLog', ` ${key}: ${value}`);
}
} catch (e) {
// 忽略无法访问的属性
}
}
}
};
vehicleWs.onclose = function(event) {
document.getElementById('vehicleStatus').innerHTML = '状态: <span style="color: orange">已关闭</span>';
document.getElementById('vehicleStatus').parentElement.classList.remove('success', 'error');
log('vehicleLog', '🔌 连接关闭');
log('vehicleLog', '关闭代码: ' + event.code + ', 原因: ' + event.reason);
vehicleWs = null;
};
} catch (error) {
log('vehicleLog', '❌ 异常: ' + error.message);
}
}
function closeCollision() {
if (collisionWs) {
collisionWs.close();
log('collisionLog', '主动关闭连接');
}
}
function closeVehicleCommand() {
if (vehicleWs) {
vehicleWs.close();
log('vehicleLog', '主动关闭连接');
}
}
</script>
</body>
</html>

28
接口文档3.md Normal file
View File

@ -0,0 +1,28 @@
车辆管理系统接口文档
1. 查询车辆列表
接口描述
该接口用于查询项目车辆列表数据。
接口详情
协议名称: HTTP
接口地址: /api/vehicle_details
请求方法: GET
请求参数: 无
参数名称,参数类型,描述
code,int,状态码
msg,str,任务信息
data,str,车辆列表
timestamp,float,时间戳
{
"code": 200,
"message": "success",
"data": [
"AET02",
"AET01"
],
"timestamp": 0
}