diff --git a/VehicleCommandInfo接口文档.md b/VehicleCommandInfo接口文档.md new file mode 100644 index 0000000..1b97df9 --- /dev/null +++ b/VehicleCommandInfo接口文档.md @@ -0,0 +1,54 @@ +# 车辆控制指令 WebSocket 接口文档 + +## 接口概述 + +- **接口名称**: VehicleCommandInfo +- **协议**: WebSocket +- **连接地址**: `ws://ip:8080/VehicleCommandInfo` +- **说明**: 客户端连接后,服务端会根据红绿灯信号状态自动向所有连接的客户端推送车辆控制指令 + +## 消息格式 + +### 服务端 → 客户端(推送车辆控制指令) + +**JSON 格式**: + +```json +{ + "messageUniqueId": "550e8400-e29b-41d4-a716-446655440000", + "timestamp": 1737979200000, + "vehicleID": "AET01", + "commandType": "SIGNAL", + "commandReason": "TRAFFIC_LIGHT", + "signalState": "GREEN", + "intersectionId": "002", + "latitude": 343.23, + "longitude": 343.23, + "relativeSpeed": 3, + "relativeMotionX": 2002.12, + "relativeMotionY": 100.12, + "minDistance": 10.5 +} +``` + +### 字段说明 + +| 字段名 | 类型 | 说明 | 示例值 | +|--------|------|------|--------| +| `messageUniqueId` | String | 消息唯一标识符 | "550e8400-e29b-41d4-a716-446655440000" | +| `timestamp` | Long | 时间戳(毫秒) | 1737979200000 | +| `vehicleID` | String | 车辆ID | "AET01" | +| `commandType` | String | 指令类型 | "SIGNAL" | +| `commandReason` | String | 指令原因 | "TRAFFIC_LIGHT" | +| `signalState` | String | 信号状态 | "GREEN" 或 "RED" | +| `intersectionId` | String | 交叉口ID | "002" | +| `latitude` | Double | 纬度 | 343.23 | +| `longitude` | Double | 经度 | 343.23 | +| `relativeSpeed` | Integer | 相对速度 | 3 | +| `relativeMotionX` | Double | 相对运动X方向 | 2002.12 | +| `relativeMotionY` | Double | 相对运动Y方向 | 100.12 | +| `minDistance` | Double | 最小距离 | 10.5 | + + + + diff --git a/mqtt项目核心代码总结.md b/mqtt项目核心代码总结.md new file mode 100644 index 0000000..01bd98a --- /dev/null +++ b/mqtt项目核心代码总结.md @@ -0,0 +1,128 @@ +# MQTT 订阅者项目 - 核心代码总结 + +## 项目概述 +Spring Boot MQTT 订阅者最小可行性验证项目,用于连接 MQTT Broker 并订阅指定 Topic 接收消息数据。 + +## 技术栈 +- Eclipse Paho MQTT 1.2.5 +- dotenv-java 3.2.0 + + +## 2. application.yaml + +```yaml +mqtt: + broker: ${MQTT_BROKER:tcp://10.0.0.202:8082} + client-id: ${MQTT_CLIENT_ID:springboot-mqtt-client} + username: ${MQTT_USERNAME:mqtt} + password: ${MQTT_PASSWORD:XAsOsLxaPs1} + topic: ${MQTT_TOPIC:test/topic} + timeout: ${MQTT_TIMEOUT:30} + keepalive: ${MQTT_KEEPALIVE:60} +``` + +## 4. MqttConfig.java + +```java +package com.example.demo.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MqttConfig { + + @Value("${mqtt.username:}") + private String username; + + @Value("${mqtt.password:}") + private String password; + + @Value("${mqtt.timeout:30}") + private int timeout; + + @Value("${mqtt.keepalive:60}") + private int keepalive; + + @Bean + public MqttConnectOptions mqttConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + + if (username != null && !username.isEmpty()) { + options.setUserName(username); + options.setPassword(password.toCharArray()); + } + + return options; + } +} +``` + +--- + +## 5. MqttSubscriber.java + +```java +package com.example.demo.subscriber; + +import jakarta.annotation.PostConstruct; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +@Component +public class MqttSubscriber { + + @Value("${mqtt.broker}") + private String broker; + + @Value("${mqtt.client-id}") + private String clientId; + + @Value("${mqtt.topic}") + private String topic; + + private final MqttConnectOptions options; + + public MqttSubscriber(MqttConnectOptions options) { + this.options = options; + } + + @PostConstruct + public void init() throws MqttException { + MqttClient client = new MqttClient(broker, clientId); + + client.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + System.out.println("❌ MQTT 连接断开:" + cause.getMessage()); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + System.out.println("📥 收到消息"); + System.out.println("Topic: " + topic); + System.out.println("Payload: " + payload); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + }); + + client.connect(options); + client.subscribe(topic, 0); + + System.out.println("✅ MQTT 已连接,订阅 Topic:" + topic); + } +} +``` + diff --git a/qaup-admin/src/main/resources/application-prod.yml b/qaup-admin/src/main/resources/application-prod.yml index f4e6802..2dc5140 100644 --- a/qaup-admin/src/main/resources/application-prod.yml +++ b/qaup-admin/src/main/resources/application-prod.yml @@ -86,7 +86,7 @@ data: at-manager-bsm: /ws/at_manager_bsm at-manager-path: /ws/at_manager_path http: - base-url: ${VEHICLE_MANAGER_HTTP_BASE_URL:} + base-url: ${VEHICLE_MANAGER_HTTP_BASE_URL:http://localhost:8020} status: /api/vehicle_manager/v1/vehicles/{vehicleId}/status vehicle-details: /api/vehicle_details poll-interval-ms: 1000 @@ -109,5 +109,5 @@ data: traffic: light: tcp: - enabled: ${TRAFFIC_LIGHT_TCP_ENABLED:true} + enabled: ${TRAFFIC_LIGHT_TCP_ENABLED:false} port: ${TRAFFIC_LIGHT_TCP_PORT:8082} diff --git a/qaup-admin/src/main/resources/application.yml b/qaup-admin/src/main/resources/application.yml index 38da7b7..b3e3456 100644 --- a/qaup-admin/src/main/resources/application.yml +++ b/qaup-admin/src/main/resources/application.yml @@ -220,7 +220,7 @@ traffic: light: tcp: # 是否启用TCP服务器 - enabled: true + enabled: false # TCP监听端口 port: 8082 # 最大连接数 @@ -230,6 +230,24 @@ traffic: # 心跳超时时间(分钟) heartbeat-timeout-minutes: 5 + mqtt: + # 是否启用MQTT订阅 + enabled: true + # MQTT Broker地址 + broker: ${MQTT_BROKER:tcp://10.64.58.228:8082} + # 客户端ID + client-id: ${MQTT_CLIENT_ID:qaup-traffic-light-mqtt} + # 用户名 + username: ${MQTT_USERNAME:mqtt} + # 密码 + password: ${MQTT_PASSWORD:XAsOsLxaPs1} + # 订阅主题 + topic: ${MQTT_TOPIC:cusc/v2/SF053/QingDrsu001/data} + # 连接超时时间(秒) + timeout: ${MQTT_TIMEOUT:30} + # 心跳间隔(秒) + keepalive: ${MQTT_KEEPALIVE:60} + intersection: # 默认路口ID(当信号中没有指定时使用) default-id: "DEFAULT_INTERSECTION" diff --git a/qaup-collision/pom.xml b/qaup-collision/pom.xml index 4a09f87..f43361b 100644 --- a/qaup-collision/pom.xml +++ b/qaup-collision/pom.xml @@ -150,6 +150,13 @@ test + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + diff --git a/qaup-collision/src/main/java/com/qaup/collision/common/model/UnmannedVehicle.java b/qaup-collision/src/main/java/com/qaup/collision/common/model/UnmannedVehicle.java index d74ea7e..ab8a231 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/common/model/UnmannedVehicle.java +++ b/qaup-collision/src/main/java/com/qaup/collision/common/model/UnmannedVehicle.java @@ -116,6 +116,7 @@ public class UnmannedVehicle extends MovingObject { @lombok.Builder @lombok.NoArgsConstructor @lombok.AllArgsConstructor + @com.fasterxml.jackson.annotation.JsonIgnoreProperties(ignoreUnknown = true) public static class WaypointInfo { private String waypointId; private Double latitude; diff --git a/qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java b/qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java new file mode 100644 index 0000000..7e55abb --- /dev/null +++ b/qaup-collision/src/main/java/com/qaup/collision/config/MqttConfig.java @@ -0,0 +1,49 @@ +package com.qaup.collision.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * MQTT 配置类 + * + * 用于配置 MQTT 连接选项,支持红绿灯数据通过 MQTT 订阅 + */ +@Configuration +public class MqttConfig { + + @Value("${traffic.light.mqtt.username:}") + private String username; + + @Value("${traffic.light.mqtt.password:}") + private String password; + + @Value("${traffic.light.mqtt.timeout:30}") + private int timeout; + + @Value("${traffic.light.mqtt.keepalive:60}") + private int keepalive; + + /** + * MQTT 连接选项配置 + * + * @return MQTT 连接选项 + */ + @Bean + public MqttConnectOptions mqttConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + options.setAutomaticReconnect(true); + + // 如果配置了用户名和密码,则设置认证信息 + if (username != null && !username.isEmpty()) { + options.setUserName(username); + options.setPassword(password.toCharArray()); + } + + return options; + } +} \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/controller/VehicleTaskController.java b/qaup-collision/src/main/java/com/qaup/collision/controller/VehicleTaskController.java index 6b3dd2c..2772e51 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/controller/VehicleTaskController.java +++ b/qaup-collision/src/main/java/com/qaup/collision/controller/VehicleTaskController.java @@ -51,67 +51,113 @@ public class VehicleTaskController { private String status; } - @GetMapping +@GetMapping public AjaxResult list(@RequestParam(required = false, defaultValue = "1") int pageNum, @RequestParam(required = false, defaultValue = "10") int pageSize, @RequestParam(required = false) String status) { - Map activeMovingObjectsCache = dataCollectorService.getActiveMovingObjectsCache(); + // 从 HTTP 缓存获取车辆状态数据 + Map httpCache = dataCollectorService.getUniversalStatusCache(); List allTasks = new ArrayList<>(); - for (MovingObject movingObject : activeMovingObjectsCache.values()) { - if (!(movingObject instanceof UnmannedVehicle vehicle)) { + System.out.println("[DEBUG] ========== 任务接口调试信息 =========="); + System.out.println("[DEBUG] httpCache size: " + httpCache.size()); + System.out.println("[DEBUG] httpCache keys: " + httpCache.keySet()); + + for (Map.Entry entry : httpCache.entrySet()) { + String cacheKey = entry.getKey(); + DataCollectorService.UniversalVehicleStatusCacheEntry cacheEntry = entry.getValue(); + + System.out.println("[DEBUG] Processing cacheKey: " + cacheKey); + + if (cacheEntry.getStatusData() == null) { + System.out.println("[DEBUG] - statusData is null, skipping"); continue; } - String vehicleId = vehicle.getObjectId(); - String missionId = vehicle.getMissionId(); + // 从 cacheKey 中提取 vehicleId(格式:universal_status_vehicleId) + String vehicleId = cacheKey.replace("universal_status_", ""); + System.out.println("[DEBUG] - vehicleId: " + vehicleId); + + com.qaup.collision.datacollector.model.dto.UniversalVehicleStatusDTO statusData = cacheEntry.getStatusData(); + com.qaup.collision.datacollector.model.dto.MissionContextDTO missionContext = statusData.getMissionContext(); + + System.out.println("[DEBUG] - missionContext: " + (missionContext != null ? missionContext.getClass().getSimpleName() : "null")); + if (missionContext != null) { + System.out.println("[DEBUG] - currentMission: " + (missionContext.getCurrentMission() != null ? missionContext.getCurrentMission().getClass().getSimpleName() : "null")); + } + + if (missionContext == null || missionContext.getCurrentMission() == null) { + System.out.println("[DEBUG] - Skipping (no missionContext)"); + continue; + } + + com.qaup.collision.datacollector.model.dto.MissionContextDTO.CurrentMissionDTO currentMission = missionContext.getCurrentMission(); + String missionId = currentMission.getMissionId(); + + System.out.println("[DEBUG] - missionId: " + missionId); - // 如果没有任务ID,跳过 if (missionId == null || missionId.isBlank()) { + System.out.println("[DEBUG] - Skipping (no missionId)"); continue; } - String missionStatus = vehicle.getMissionStatus() != null ? vehicle.getMissionStatus().name() : "UNKNOWN"; + // 获取车辆类型(VehicleInfoDTO 只有 vehicleId,没有 vehicleType,使用默认值) + String vehicleType = "UNMANNED_VEHICLE"; + + // 获取任务状态(从 operationalStatus 获取 operationalMode) + String missionStatus = "UNKNOWN"; + if (statusData.getOperationalStatus() != null) { + missionStatus = statusData.getOperationalStatus().getOperationalMode(); + } // 状态筛选 if (status != null && !status.isEmpty() && !status.equalsIgnoreCase(missionStatus)) { + System.out.println("[DEBUG] - Skipping (status filter: " + status + " != " + missionStatus + ")"); continue; } - // 获取车辆类型 - String vehicleType = vehicle.getObjectType().name(); + System.out.println("[DEBUG] - Adding task: " + vehicleId + ", MissionId: " + missionId); // 获取路径点 List waypoints = new ArrayList<>(); - if (vehicle.getWaypoints() != null) { - for (UnmannedVehicle.WaypointInfo wp : vehicle.getWaypoints()) { + System.out.println("[DEBUG] - missionContext.getWaypoints(): " + missionContext.getWaypoints()); + if (missionContext.getWaypoints() != null) { + System.out.println("[DEBUG] - waypoints size: " + missionContext.getWaypoints().size()); + for (com.qaup.collision.datacollector.model.dto.MissionContextDTO.WaypointDTO wp : missionContext.getWaypoints()) { + System.out.println("[DEBUG] - waypoint: " + wp.getWaypointId() + ", lat: " + wp.getLatitude() + ", lon: " + wp.getLongitude() + ", status: " + wp.getStatus()); WaypointDTO waypoint = WaypointDTO.builder() .waypointId(wp.getWaypointId()) .latitude(wp.getLatitude()) .longitude(wp.getLongitude()) - .status(wp.getStatus().name()) + .status(wp.getStatus()) .build(); waypoints.add(waypoint); } + } else { + System.out.println("[DEBUG] - waypoints is null"); } + System.out.println("[DEBUG] - final waypoints size: " + waypoints.size()); VehicleTaskDTO task = VehicleTaskDTO.builder() .vehicleId(vehicleId) .vehicleType(vehicleType) .missionId(missionId) - .missionType(vehicle.getMissionType()) + .missionType(currentMission.getMissionType()) .missionStatus(missionStatus) - .missionStartTime(vehicle.getMissionStartTime()) - .estimatedEndTime(vehicle.getEstimatedEndTime()) - .progress(vehicle.getProgress()) - .totalMileage(vehicle.getTotalMileage()) + .missionStartTime(currentMission.getStartTime()) + .estimatedEndTime(currentMission.getEstimatedEndTime()) + .progress(currentMission.getProgress()) + .totalMileage(currentMission.getTotalMileage()) .waypoints(waypoints) - .lastSeenAt(null) // 暂时设为null,后续可以从缓存获取 + .lastSeenAt(cacheEntry.getTimestamp()) .build(); allTasks.add(task); } + System.out.println("[DEBUG] Total tasks found: " + allTasks.size()); + System.out.println("[DEBUG] ========================================="); + // 按任务开始时间倒序排序 allTasks.sort((a, b) -> { if (a.getMissionStartTime() == null && b.getMissionStartTime() == null) return 0; @@ -136,29 +182,41 @@ public class VehicleTaskController { @GetMapping("/{vehicleId}") public AjaxResult get(@PathVariable String vehicleId) { - Map activeMovingObjectsCache = dataCollectorService.getActiveMovingObjectsCache(); - MovingObject movingObject = activeMovingObjectsCache.get(vehicleId); + // 从 HTTP 缓存获取车辆状态数据(使用正确的 cacheKey 格式) + String cacheKey = "universal_status_" + vehicleId; + DataCollectorService.UniversalVehicleStatusCacheEntry entry = dataCollectorService.getUniversalStatusCache().get(cacheKey); - if (!(movingObject instanceof UnmannedVehicle vehicle)) { - return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆不存在或无任务").put("timestamp", System.currentTimeMillis()); + if (entry == null || entry.getStatusData() == null) { + return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆不存在或无数据").put("timestamp", System.currentTimeMillis()); } - String missionId = vehicle.getMissionId(); + com.qaup.collision.datacollector.model.dto.UniversalVehicleStatusDTO statusData = entry.getStatusData(); + com.qaup.collision.datacollector.model.dto.MissionContextDTO missionContext = statusData.getMissionContext(); + + if (missionContext == null || missionContext.getCurrentMission() == null) { + return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆无任务").put("timestamp", System.currentTimeMillis()); + } + + com.qaup.collision.datacollector.model.dto.MissionContextDTO.CurrentMissionDTO currentMission = missionContext.getCurrentMission(); + String missionId = currentMission.getMissionId(); if (missionId == null || missionId.isBlank()) { return AjaxResult.error(HttpStatus.NOT_FOUND, "车辆无任务").put("timestamp", System.currentTimeMillis()); } - String vehicleType = vehicle.getObjectType().name(); + String vehicleType = "UNMANNED_VEHICLE"; + String missionStatus = statusData.getOperationalStatus() != null + ? statusData.getOperationalStatus().getOperationalMode() + : "UNKNOWN"; List waypoints = new ArrayList<>(); - if (vehicle.getWaypoints() != null) { - for (UnmannedVehicle.WaypointInfo wp : vehicle.getWaypoints()) { + if (missionContext.getWaypoints() != null) { + for (com.qaup.collision.datacollector.model.dto.MissionContextDTO.WaypointDTO wp : missionContext.getWaypoints()) { WaypointDTO waypoint = WaypointDTO.builder() .waypointId(wp.getWaypointId()) .latitude(wp.getLatitude()) .longitude(wp.getLongitude()) - .status(wp.getStatus().name()) + .status(wp.getStatus()) .build(); waypoints.add(waypoint); } @@ -168,14 +226,14 @@ public class VehicleTaskController { .vehicleId(vehicleId) .vehicleType(vehicleType) .missionId(missionId) - .missionType(vehicle.getMissionType()) - .missionStatus(vehicle.getMissionStatus() != null ? vehicle.getMissionStatus().name() : "UNKNOWN") - .missionStartTime(vehicle.getMissionStartTime()) - .estimatedEndTime(vehicle.getEstimatedEndTime()) - .progress(vehicle.getProgress()) - .totalMileage(vehicle.getTotalMileage()) + .missionType(currentMission.getMissionType()) + .missionStatus(missionStatus) + .missionStartTime(currentMission.getStartTime()) + .estimatedEndTime(currentMission.getEstimatedEndTime()) + .progress(currentMission.getProgress()) + .totalMileage(currentMission.getTotalMileage()) .waypoints(waypoints) - .lastSeenAt(null) // 暂时设为null + .lastSeenAt(entry.getTimestamp()) .build(); return AjaxResult.success(task).put("timestamp", System.currentTimeMillis()); diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/model/dto/MissionContextDTO.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/model/dto/MissionContextDTO.java index 3b75b33..5ae8e12 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/model/dto/MissionContextDTO.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/model/dto/MissionContextDTO.java @@ -76,6 +76,7 @@ public class MissionContextDTO { @NoArgsConstructor @AllArgsConstructor @Builder + @com.fasterxml.jackson.annotation.JsonIgnoreProperties(ignoreUnknown = true) public static class WaypointDTO { /** * 路径点ID @@ -96,5 +97,31 @@ public class MissionContextDTO { * 状态 (PENDING, COMPLETED, SKIPPED) */ private String status; + + /** + * X坐标 (经度) + */ + @com.fasterxml.jackson.annotation.JsonProperty("x") + private Double x; + + /** + * Y坐标 (纬度) + */ + @com.fasterxml.jackson.annotation.JsonProperty("y") + private Double y; + + /** + * 获取纬度,如果为null则返回x值 + */ + public Double getLatitude() { + return latitude != null ? latitude : x; + } + + /** + * 获取经度,如果为null则返回y值 + */ + public Double getLongitude() { + return longitude != null ? longitude : y; + } } } \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java new file mode 100644 index 0000000..b843603 --- /dev/null +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/server/TrafficLightMqttSubscriber.java @@ -0,0 +1,220 @@ +package com.qaup.collision.datacollector.server; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qaup.collision.websocket.handler.CollisionWebSocketHandler; +import com.qaup.collision.websocket.handler.VehicleCommandInfoWebSocketHandler; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 红绿灯 MQTT 订阅者 + * + * 通过 MQTT 协议订阅红绿灯设备的消息 + * 与 TCP 方式并存,提供另一种数据接收方式 + */ +@Slf4j +@Component +public class TrafficLightMqttSubscriber { + + @Value("${traffic.light.mqtt.enabled:false}") + private boolean mqttEnabled; + + @Value("${traffic.light.mqtt.broker}") + private String broker; + + @Value("${traffic.light.mqtt.client-id}") + private String clientId; + + @Value("${traffic.light.mqtt.topic}") + private String topic; + + private final MqttConnectOptions mqttConnectOptions; + private final CollisionWebSocketHandler collisionWebSocketHandler; + private final VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler; + private final ObjectMapper objectMapper; + + private MqttClient mqttClient; + private final AtomicBoolean connected = new AtomicBoolean(false); + + // 统计信息 + private final AtomicLong totalMessages = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); + + /** + * 构造函数 + * + * @param mqttConnectOptions MQTT 连接选项 + * @param collisionWebSocketHandler WebSocket 处理器,用于向前端广播消息 + * @param vehicleCommandInfoWebSocketHandler 车辆控制指令 WebSocket 处理器 + * @param objectMapper JSON 解析器 + */ + @Autowired + public TrafficLightMqttSubscriber(MqttConnectOptions mqttConnectOptions, + CollisionWebSocketHandler collisionWebSocketHandler, + VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler, + ObjectMapper objectMapper) { + this.mqttConnectOptions = mqttConnectOptions; + this.collisionWebSocketHandler = collisionWebSocketHandler; + this.vehicleCommandInfoWebSocketHandler = vehicleCommandInfoWebSocketHandler; + this.objectMapper = objectMapper; + } + + /** + * 初始化 MQTT 订阅者 + */ + @PostConstruct + public void init() { + if (mqttEnabled) { + connectAndSubscribe(); + } else { + log.info("🚦 MQTT 红绿灯订阅已禁用"); + } + } + + /** + * 连接 MQTT Broker 并订阅主题 + */ + public void connectAndSubscribe() { + try { + log.info("🚦 正在连接 MQTT Broker: {}", broker); + + // 创建 MQTT 客户端 + mqttClient = new MqttClient(broker, clientId); + + // 设置回调 + mqttClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + connected.set(false); + log.error("❌ MQTT 连接断开: {}", cause.getMessage()); + errorCount.incrementAndGet(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + totalMessages.incrementAndGet(); + + log.info("🚦 [MQTT] 收到红绿灯消息"); + log.info("🚦 [MQTT] Topic: {}", topic); + log.info("🚦 [MQTT] Payload: {}", payload); + + // 通过 WebSocket 广播消息给前端 + try { + String websocketMessage = String.format( + "{\"type\":\"trafficLight\",\"source\":\"mqtt\",\"topic\":\"%s\",\"payload\":%s,\"timestamp\":%d}", + topic, payload, System.currentTimeMillis() + ); + collisionWebSocketHandler.broadcastMessage(websocketMessage); + log.debug("🚦 [MQTT] 消息已通过 WebSocket 广播到前端"); + } catch (Exception e) { + log.error("🚦 [MQTT] WebSocket 广播失败: {}", e.getMessage(), e); + } + + // 解析 phaseColor 并发送车辆控制指令 + try { + JsonNode jsonNode = objectMapper.readTree(payload); + + // 从 serviceData.phases 数组中获取 phaseColor + JsonNode serviceDataNode = jsonNode.get("serviceData"); + if (serviceDataNode != null) { + JsonNode phasesNode = serviceDataNode.get("phases"); + if (phasesNode != null && phasesNode.isArray() && phasesNode.size() > 0) { + // 获取第一个相位的 phaseColor + JsonNode firstPhase = phasesNode.get(0); + JsonNode phaseColorNode = firstPhase.get("phaseColor"); + + if (phaseColorNode != null && phaseColorNode.isInt()) { + int phaseColor = phaseColorNode.asInt(); + log.info("🚦 [MQTT] 解析到 phaseColor: {}", phaseColor); + + // phaseColor 为 1 时发送 GREEN,为 3 时发送 RED + if (phaseColor == 1 || phaseColor == 3) { + vehicleCommandInfoWebSocketHandler.sendSignalState(phaseColor); + log.info("🚦 [MQTT] 已发送车辆控制指令: phaseColor={}", phaseColor); + } else { + log.warn("🚦 [MQTT] 未知的 phaseColor 值: {},仅支持 1 (GREEN) 或 3 (RED)", phaseColor); + } + } else { + log.warn("🚦 [MQTT] phases[0] 中未找到 phaseColor 字段或格式不正确"); + } + } else { + log.warn("🚦 [MQTT] serviceData 中未找到 phases 数组或数组为空"); + } + } else { + log.warn("🚦 [MQTT] 消息中未找到 serviceData 字段"); + } + } catch (Exception e) { + log.error("🚦 [MQTT] 解析 phaseColor 或发送控制指令失败: {}", e.getMessage(), e); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // QOS 1/2 消息发送完成回调(发布者使用,订阅者不需要) + } + }); + + // 连接到 Broker + mqttClient.connect(mqttConnectOptions); + connected.set(true); + + // 订阅主题 + mqttClient.subscribe(topic, 0); // QOS 0 - 最多一次 + + log.info("✅ MQTT 红绿灯订阅已连接,订阅 Topic: {}", topic); + + } catch (MqttException e) { + log.error("❌ MQTT 连接失败: {}", e.getMessage(), e); + connected.set(false); + errorCount.incrementAndGet(); + } + } + + /** + * 断开 MQTT 连接 + */ + @PreDestroy + public void disconnect() { + if (mqttClient != null && connected.get()) { + try { + log.info("🚦 正在断开 MQTT 连接..."); + mqttClient.disconnect(); + mqttClient.close(); + connected.set(false); + log.info("✅ MQTT 连接已断开"); + } catch (MqttException e) { + log.error("❌ MQTT 断开连接失败: {}", e.getMessage(), e); + } + } + } + + /** + * 获取连接状态 + * + * @return 是否已连接 + */ + public boolean isConnected() { + return connected.get(); + } + + /** + * 获取统计信息 + * + * @return 统计信息字符串 + */ + public String getStatistics() { + return String.format("MQTT订阅状态 - 连接:%s, 总消息:%d, 错误:%d", + connected.get() ? "是" : "否", totalMessages.get(), errorCount.get()); + } +} \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java index 76945bb..f6e9bbd 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java @@ -643,12 +643,29 @@ public class DataCollectorService { // 提取路径点信息 if (missionContext.getWaypoints() != null) { java.util.List waypoints = missionContext.getWaypoints().stream() - .map(wp -> UnmannedVehicle.WaypointInfo.builder() - .waypointId(wp.getWaypointId()) - .latitude(wp.getLatitude()) - .longitude(wp.getLongitude()) - .status(UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus())) - .build()) + .map(wp -> { + // 安全处理状态字段,避免 null 导致 NPE + UnmannedVehicle.WaypointStatus status = UnmannedVehicle.WaypointStatus.PENDING; + if (wp.getStatus() != null && !wp.getStatus().isEmpty()) { + try { + status = UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus()); + } catch (IllegalArgumentException e) { + // 如果状态值无效,使用默认值 + status = UnmannedVehicle.WaypointStatus.PENDING; + } + } + + // x 对应 latitude(纬度),y 对应 longitude(经度) + Double lat = wp.getX() != null ? wp.getX() : wp.getLatitude(); + Double lon = wp.getY() != null ? wp.getY() : wp.getLongitude(); + + return UnmannedVehicle.WaypointInfo.builder() + .waypointId(wp.getWaypointId()) + .latitude(lat) + .longitude(lon) + .status(status) + .build(); + }) .collect(java.util.stream.Collectors.toList()); vehicleBuilder.waypoints(waypoints); } @@ -809,21 +826,47 @@ public class DataCollectorService { // 如果存在missionContext数据,更新到activeMovingObjectsCache中的无人车对象 if (statusData.getMissionContext() != null) { + log.info("========== 开始更新任务上下文 =========="); + log.info("vehicleId: {}", vehicleId); + log.info("missionContext: {}", statusData.getMissionContext()); + log.info("waypoints: {}", statusData.getMissionContext().getWaypoints()); updateUnmannedVehicleMissionContext(vehicleId, statusData.getMissionContext()); + log.info("========== 任务上下文更新完成 =========="); } log.debug("缓存通用车辆状态数据: vehicleId={}, cacheKey={}, 包含任务上下文: {}", vehicleId, cacheKey, statusData.getMissionContext() != null); } - /** - * 更新无人车的任务上下文信息 - */ - private void updateUnmannedVehicleMissionContext(String vehicleId, MissionContextDTO missionContext) { - MovingObject existingObject = activeMovingObjectsCache.get(vehicleId); + /** - if (existingObject instanceof UnmannedVehicle) { - UnmannedVehicle unmannedVehicle = (UnmannedVehicle) existingObject; + * 更新无人车的任务上下文信息 + + */ + + private void updateUnmannedVehicleMissionContext(String vehicleId, MissionContextDTO missionContext) { + + log.info("========== updateUnmannedVehicleMissionContext 开始 =========="); + + log.info("vehicleId: {}", vehicleId); + + log.info("activeMovingObjectsCache 是否包含 vehicleId: {}", activeMovingObjectsCache.containsKey(vehicleId)); + + + + MovingObject existingObject = activeMovingObjectsCache.get(vehicleId); + + log.info("existingObject: {}", existingObject); + + log.info("existingObject 类型: {}", existingObject != null ? existingObject.getClass().getName() : "null"); + + + + if (existingObject instanceof UnmannedVehicle) { + + log.info("找到 UnmannedVehicle 对象,开始更新任务上下文"); + + UnmannedVehicle unmannedVehicle = (UnmannedVehicle) existingObject; // 更新任务上下文信息 if (missionContext.getCurrentMission() != null) { @@ -838,15 +881,38 @@ public class DataCollectorService { // 更新路径点信息 if (missionContext.getWaypoints() != null) { + log.info("更新路径点信息: vehicleId={}, waypoints数量={}", vehicleId, missionContext.getWaypoints().size()); java.util.List waypoints = missionContext.getWaypoints().stream() - .map(wp -> UnmannedVehicle.WaypointInfo.builder() - .waypointId(wp.getWaypointId()) - .latitude(wp.getLatitude()) - .longitude(wp.getLongitude()) - .status(UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus())) - .build()) + .map(wp -> { + // 安全处理状态字段,避免 null 导致 NPE + UnmannedVehicle.WaypointStatus status = UnmannedVehicle.WaypointStatus.PENDING; + if (wp.getStatus() != null && !wp.getStatus().isEmpty()) { + try { + status = UnmannedVehicle.WaypointStatus.valueOf(wp.getStatus()); + } catch (IllegalArgumentException e) { + // 如果状态值无效,使用默认值 + status = UnmannedVehicle.WaypointStatus.PENDING; + } + } + + // x 对应 latitude(纬度),y 对应 longitude(经度) + Double lat = wp.getX() != null ? wp.getX() : wp.getLatitude(); + Double lon = wp.getY() != null ? wp.getY() : wp.getLongitude(); + + log.info(" 路径点: waypointId={}, x={}, y={}, latitude={}, longitude={}, status={}", + wp.getWaypointId(), wp.getX(), wp.getY(), lat, lon, wp.getStatus()); + return UnmannedVehicle.WaypointInfo.builder() + .waypointId(wp.getWaypointId()) + .latitude(lat) + .longitude(lon) + .status(status) + .build(); + }) .collect(java.util.stream.Collectors.toList()); unmannedVehicle.setWaypoints(waypoints); + log.info("路径点更新完成: vehicleId={}, 设置的路径点数量={}", vehicleId, waypoints.size()); + } else { + log.debug("路径点为空: vehicleId={}", vehicleId); } // 更新缓存 @@ -860,6 +926,7 @@ public class DataCollectorService { } else { log.debug("缓存中未找到无人车对象或类型不匹配,跳过任务上下文更新: vehicleId={}", vehicleId); } + log.info("========== updateUnmannedVehicleMissionContext 完成 =========="); } // 用于存储通用车辆状态数据的缓存 diff --git a/qaup-collision/src/main/java/com/qaup/collision/websocket/config/WebSocketConfig.java b/qaup-collision/src/main/java/com/qaup/collision/websocket/config/WebSocketConfig.java index bbe2eff..ff07a4c 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/websocket/config/WebSocketConfig.java +++ b/qaup-collision/src/main/java/com/qaup/collision/websocket/config/WebSocketConfig.java @@ -7,6 +7,7 @@ import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import com.qaup.collision.websocket.handler.CollisionWebSocketHandler; +import com.qaup.collision.websocket.handler.VehicleCommandInfoWebSocketHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,9 +25,12 @@ public class WebSocketConfig implements WebSocketConfigurer { private static final Logger logger = LoggerFactory.getLogger(WebSocketConfig.class); private final CollisionWebSocketHandler collisionWebSocketHandler; + private final VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler; - public WebSocketConfig(CollisionWebSocketHandler collisionWebSocketHandler) { + public WebSocketConfig(CollisionWebSocketHandler collisionWebSocketHandler, + VehicleCommandInfoWebSocketHandler vehicleCommandInfoWebSocketHandler) { this.collisionWebSocketHandler = collisionWebSocketHandler; + this.vehicleCommandInfoWebSocketHandler = vehicleCommandInfoWebSocketHandler; logger.info("🚀 WebSocket配置类初始化..."); } @@ -37,10 +41,14 @@ public class WebSocketConfig implements WebSocketConfigurer { // 注册冲突检测WebSocket端点 registry.addHandler(collisionWebSocketHandler, "/collision") .setAllowedOrigins("*"); // 允许所有来源,生产环境应该限制具体域名 + + // 注册车辆控制指令测试端点 + registry.addHandler(vehicleCommandInfoWebSocketHandler, "/VehicleCommandInfo") + .setAllowedOrigins("*"); logger.info("✅ WebSocket端点注册完成"); - logger.info("🎯 端点路径: /collision"); + logger.info("🎯 端点路径: /collision, /VehicleCommandInfo"); logger.info("🌐 允许的来源: *"); - logger.info("📡 WebSocket服务可用: ws://localhost:8080/collision"); + logger.info("📡 WebSocket服务可用: ws://localhost:8080/collision, ws://localhost:8080/VehicleCommandInfo"); } } \ No newline at end of file diff --git a/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java b/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java new file mode 100644 index 0000000..b524755 --- /dev/null +++ b/qaup-collision/src/main/java/com/qaup/collision/websocket/handler/VehicleCommandInfoWebSocketHandler.java @@ -0,0 +1,147 @@ +package com.qaup.collision.websocket.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * WebSocket 车辆控制指令端点 + * + * 客户端可通过 ws://:8080/VehicleCommandInfo 连接, + * 服务端根据红绿灯信号状态向所有连接会话发送车辆控制指令。 + */ +@Component +public class VehicleCommandInfoWebSocketHandler implements WebSocketHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(VehicleCommandInfoWebSocketHandler.class); + + private final Map sessions = new ConcurrentHashMap<>(); + + @Override + public void afterConnectionEstablished(@NonNull WebSocketSession session) { + String sessionId = session.getId(); + sessions.put(sessionId, session); + LOGGER.info("VehicleCommandInfo WebSocket 连接建立, sessionId={}", sessionId); + } + + @Override + public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage message) { + // 简单回显客户端消息,主要用于连通性测试 + LOGGER.debug("VehicleCommandInfo 收到客户端消息, sessionId={}, payload={}", + session.getId(), message.getPayload()); + } + + @Override + public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) { + LOGGER.warn("VehicleCommandInfo WebSocket 传输错误, sessionId={}", session.getId(), exception); + cleanupSession(session, CloseStatus.SERVER_ERROR); + } + + @Override + public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) { + LOGGER.info("VehicleCommandInfo WebSocket 连接关闭, sessionId={}, status={}", session.getId(), closeStatus); + cleanupSession(session, closeStatus); + } + + @Override + public boolean supportsPartialMessages() { + return false; + } + + /** + * 发送信号状态给所有连接的客户端 + * + * @param phaseColor 信号颜色值:1=green, 3=red + */ + public void sendSignalState(int phaseColor) { + String signalState = (phaseColor == 1) ? "GREEN" : "RED"; + + String commandJson = String.format( + "{" + + "\"messageUniqueId\": \"%s\"," + + "\"timestamp\": %d," + + "\"vehicleID\": \"AET01\"," + + "\"commandType\": \"SIGNAL\"," + + "\"commandReason\": \"TRAFFIC_LIGHT\"," + + "\"signalState\":\"%s\"," + + "\"intersectionId\":\"002\"," + + "\"latitude\": 343.23," + + "\"longitude\": 343.23," + + "\"relativeSpeed\": 3," + + "\"relativeMotionX\": 2002.12," + + "\"relativeMotionY\":100.12," + + "\"minDistance\":10.5" + + "}", + UUID.randomUUID().toString(), + System.currentTimeMillis(), + signalState + ); + + LOGGER.info("🚦 [VehicleCommandInfo] 发送车辆控制指令: phaseColor={}, signalState={}", phaseColor, signalState); + LOGGER.info("🚦 [VehicleCommandInfo] 当前连接数: {}", sessions.size()); + + if (sessions.isEmpty()) { + LOGGER.warn("🚦 [VehicleCommandInfo] 没有客户端连接,无法发送消息"); + return; + } + + // 向所有连接的客户端发送 + sessions.values().forEach(session -> sendMessage(session, commandJson)); + LOGGER.info("🚦 [VehicleCommandInfo] 已向 {} 个客户端发送消息", sessions.size()); + } + + /** + * 发送消息给指定会话 + */ + private void sendMessage(WebSocketSession session, String message) { + if (session == null || !session.isOpen()) { + return; + } + try { + synchronized (session) { + if (session.isOpen()) { + session.sendMessage(new TextMessage(message)); + } + } + } catch (IOException e) { + LOGGER.warn("向 sessionId={} 发送车辆控制指令 JSON 失败: {}", session.getId(), e.getMessage()); + cleanupSession(session, CloseStatus.SESSION_NOT_RELIABLE); + } catch (Exception e) { + LOGGER.warn("向 sessionId={} 发送车辆控制指令 JSON 时发生异常", session.getId(), e); + cleanupSession(session, CloseStatus.SERVER_ERROR); + } + } + + /** + * 清理会话 + */ + private void cleanupSession(WebSocketSession session, CloseStatus status) { + String sessionId = session.getId(); + sessions.remove(sessionId); + try { + if (session.isOpen()) { + session.close(status); + } + } catch (IOException e) { + LOGGER.debug("关闭 sessionId={} 时出现异常: {}", sessionId, e.getMessage()); + } + } + + /** + * 获取当前连接数 + */ + public int getConnectionCount() { + return sessions.size(); + } +} diff --git a/qaup-framework/src/main/java/com/qaup/framework/config/SecurityConfig.java b/qaup-framework/src/main/java/com/qaup/framework/config/SecurityConfig.java index ac6f029..62794f0 100644 --- a/qaup-framework/src/main/java/com/qaup/framework/config/SecurityConfig.java +++ b/qaup-framework/src/main/java/com/qaup/framework/config/SecurityConfig.java @@ -112,7 +112,7 @@ public class SecurityConfig // 对于登录login 注册register 验证码captchaImage 允许匿名访问 requests.requestMatchers("/login", "/register", "/captchaImage").permitAll() // WebSocket端点,允许匿名访问 - .requestMatchers("/collision", "/collision/**", "/test/websocket/**").permitAll() + .requestMatchers("/collision", "/collision/**", "/VehicleCommandInfo", "/test/websocket/**").permitAll() // 静态资源,可匿名访问 .requestMatchers(HttpMethod.GET, "/", "/*.html", "/**.html", "/**.css", "/**.js", "/profile/**").permitAll() .requestMatchers("/swagger-ui.html", "/v3/api-docs/**", "/swagger-ui/**", "/druid/**").permitAll() diff --git a/test_websocket.html b/test_websocket.html new file mode 100644 index 0000000..cbee6e3 --- /dev/null +++ b/test_websocket.html @@ -0,0 +1,163 @@ + + + + + WebSocket 测试 + + + +

WebSocket 连接测试

+ +
+

测试 1: /collision 端点

+ + +
状态: 未连接
+
+
+ +
+

测试 2: /VehicleCommandInfo 端点

+ + +
状态: 未连接
+
+
+ + + + \ No newline at end of file diff --git a/接口文档3.md b/接口文档3.md new file mode 100644 index 0000000..b2cf6f8 --- /dev/null +++ b/接口文档3.md @@ -0,0 +1,28 @@ +车辆管理系统接口文档 +1. 查询车辆列表 +接口描述 +该接口用于查询项目车辆列表数据。 + +接口详情 +协议名称: HTTP + +接口地址: /api/vehicle_details + +请求方法: GET + +请求参数: 无 +参数名称,参数类型,描述 +code,int,状态码 +msg,str,任务信息 +data,str,车辆列表 +timestamp,float,时间戳 + +{ + "code": 200, + "message": "success", + "data": [ + "AET02", + "AET01" + ], + "timestamp": 0 +} \ No newline at end of file