diff --git a/qaup-collision/src/main/java/com/qaup/collision/common/config/SchedulerConfig.java b/qaup-collision/src/main/java/com/qaup/collision/common/config/SchedulerConfig.java index 5eaed4f..3c5d708 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/common/config/SchedulerConfig.java +++ b/qaup-collision/src/main/java/com/qaup/collision/common/config/SchedulerConfig.java @@ -15,10 +15,12 @@ public class SchedulerConfig { @Bean public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - // 设置线程池大小,根据需求调整 - scheduler.setPoolSize(3); + // 定时任务较多(采集/处理/推送等),避免线程不足导致任务堆积 + scheduler.setPoolSize(Math.max(4, Runtime.getRuntime().availableProcessors())); // 设置线程名称前缀 scheduler.setThreadNamePrefix("ScheduledTask-"); + // 取消任务时及时从队列移除,降低堆积风险 + scheduler.setRemoveOnCancelPolicy(true); return scheduler; } } diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java index 3235621..76945bb 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/service/DataCollectorService.java @@ -115,6 +115,17 @@ public class DataCollectorService { // 用于缓存所有活跃的MovingObject的最新状态 private final Map activeMovingObjectsCache = new ConcurrentHashMap<>(); + // 记录每个对象最近一次更新的时间戳,用于淘汰不活跃对象(防止内存长期增长) + private final Map activeMovingObjectsLastUpdatedAtMs = new ConcurrentHashMap<>(); + + private void upsertActiveMovingObject(MovingObject movingObject) { + if (movingObject == null || movingObject.getObjectId() == null) { + return; + } + activeMovingObjectsCache.put(movingObject.getObjectId(), movingObject); + activeMovingObjectsLastUpdatedAtMs.put(movingObject.getObjectId(), System.currentTimeMillis()); + } + // 无人车ID列表(来自上游 HTTP:GET /api/vehicle_details) private final Set unmannedVehicleIds = ConcurrentHashMap.newKeySet(); @@ -369,7 +380,7 @@ public class DataCollectorService { vehicle.setCurrentSpeed(null); vehicle.setCurrentHeading(null); - activeMovingObjectsCache.put(vehicleId, vehicle); + upsertActiveMovingObject(vehicle); } vehicleManagerCacheService.updateVehiclePosition(vehicleId, dataNode); @@ -435,7 +446,7 @@ public class DataCollectorService { .altitude(aircraft.getAltitude()) .build(); - activeMovingObjectsCache.put(movingObject.getObjectId(), movingObject); + upsertActiveMovingObject(movingObject); log.debug("处理航空器数据并更新缓存: (航班号: {}, 位置: {}, {}, 速度: {})", aircraft.getObjectId(), @@ -521,7 +532,7 @@ public class DataCollectorService { .build(); // 将最新数据更新到缓存(不发送WebSocket消息,统一在周期性检测中发送) - activeMovingObjectsCache.put(movingObject.getObjectId(), movingObject); + upsertActiveMovingObject(movingObject); log.debug("处理机场车辆数据并更新缓存: (车牌号: {}, 位置: {}, {}, 速度: {})", vehicle.getObjectId(), @@ -664,7 +675,7 @@ public class DataCollectorService { UnmannedVehicle enhancedUnmannedVehicle = vehicleBuilder.build(); // 将最新数据更新到缓存 - activeMovingObjectsCache.put(enhancedUnmannedVehicle.getObjectId(), enhancedUnmannedVehicle); + upsertActiveMovingObject(enhancedUnmannedVehicle); // 更新无人车HTTP状态缓存(供HTTP接口查询) unmannedVehicleHttpStatusCache.put(vehicleId, UniversalVehicleStatusCacheEntry.builder() @@ -839,7 +850,7 @@ public class DataCollectorService { } // 更新缓存 - activeMovingObjectsCache.put(vehicleId, unmannedVehicle); + upsertActiveMovingObject(unmannedVehicle); log.debug("更新无人车任务上下文: vehicleId={}, 任务ID={}, 里程={}米, 路径点数量={}", vehicleId, @@ -1103,22 +1114,59 @@ public class DataCollectorService { int removedFlightNotifications = 0; // 清理 activeMovingObjectsCache - // 注意: MovingObject 没有时间戳字段,需要通过外部跟踪或限制缓存大小 - // 这里先保持简单策略:如果缓存超过一定数量,可以考虑清理 int maxCacheSize = 1000; // 最大缓存1000个对象 - if (activeMovingObjectsCache.size() > maxCacheSize) { - log.warn("activeMovingObjectsCache 超过最大限制 {}, 当前: {}", - maxCacheSize, activeMovingObjectsCache.size()); + long cutoff = now - inactiveThreshold; + + int sizeBeforeMoving = activeMovingObjectsCache.size(); + // 按最后更新时间淘汰(同时清理两张表,避免长期增长) + activeMovingObjectsLastUpdatedAtMs.entrySet().removeIf(entry -> { + Long last = entry.getValue(); + if (last == null || last < cutoff) { + activeMovingObjectsCache.remove(entry.getKey()); + return true; + } + return false; + }); + removedMovingObjects = sizeBeforeMoving - activeMovingObjectsCache.size(); + + // 兜底:如果缓存仍超过上限,按最老的lastUpdated继续淘汰(丢旧保实时) + int currentSize = activeMovingObjectsCache.size(); + if (currentSize > maxCacheSize) { + int needRemove = currentSize - maxCacheSize; + activeMovingObjectsLastUpdatedAtMs.entrySet().stream() + .sorted(java.util.Map.Entry.comparingByValue()) + .limit(needRemove) + .map(java.util.Map.Entry::getKey) + .forEach(key -> { + activeMovingObjectsCache.remove(key); + activeMovingObjectsLastUpdatedAtMs.remove(key); + }); + log.warn("activeMovingObjectsCache 超过最大限制 {},已丢弃最旧 {} 条以保持实时性(当前: {})", + maxCacheSize, needRemove, activeMovingObjectsCache.size()); } // 清理 flightNotificationCache - 使用 eventTime 字段判断 -// int sizeBefore = flightNotificationCache.size(); -// flightNotificationCache.entrySet().removeIf(entry -> { -// FlightNotification notification = entry.getValue(); -// long eventTime = (notification.getEventTime() != null) ? notification.getEventTime() : 0; -// return (now - eventTime) > inactiveThreshold; -// }); -// removedFlightNotifications = sizeBefore - flightNotificationCache.size(); + int sizeBeforeFlight = flightNotificationCache.size(); + LocalDateTime receivedCutoff = LocalDateTime.now().minusMinutes(5); + flightNotificationCache.entrySet().removeIf(entry -> { + FlightNotification notification = entry.getValue(); + if (notification == null) { + return true; + } + + // 优先按接收时间淘汰(更可靠) + if (notification.getReceivedTime() != null && notification.getReceivedTime().isBefore(receivedCutoff)) { + return true; + } + + Long eventTime = notification.getEventTime(); + if (eventTime != null && eventTime > 0 && (now - eventTime) > inactiveThreshold) { + return true; + } + + return false; + }); + removedFlightNotifications = sizeBeforeFlight - flightNotificationCache.size(); if (removedMovingObjects > 0 || removedFlightNotifications > 0) { log.info("清理不活跃缓存对象: {} 个移动对象, {} 个航班通知; 剩余: {} 个移动对象, {} 个航班通知", @@ -1137,6 +1185,7 @@ public class DataCollectorService { log.info("DataCollectorService 正在关闭..."); // 清理缓存 activeMovingObjectsCache.clear(); + activeMovingObjectsLastUpdatedAtMs.clear(); flightNotificationCache.clear(); log.info("缓存已清理"); } diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java index 7c144a4..a524fc7 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java @@ -27,6 +27,9 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { + + // 单条消息最大字符数(防止分片拼接导致内存无限增长) + private static final int MAX_MESSAGE_CHARS = 2_000_000; private final WebSocketClient webSocketClient; private final FlightSdkProperties properties; @@ -179,8 +182,23 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { return; } - //每来一片就拼 - partialMessageBuffer.append(textMessage.getPayload()); + // 每来一片就拼(加上上限,避免异常消息打爆内存) + String chunk = textMessage.getPayload(); + if (partialMessageBuffer.length() + chunk.length() > MAX_MESSAGE_CHARS) { + log.warn("ADXP航班通知 WebSocket 收到超大分片消息,已丢弃并重置缓冲: sessionId={}, currentChars={}, incomingChars={}", + session.getId(), partialMessageBuffer.length(), chunk.length()); + partialMessageBuffer.setLength(0); + try { + if (session.isOpen()) { + session.close(CloseStatus.POLICY_VIOLATION); + } + } catch (Exception ignore) { + // ignore + } + return; + } + + partialMessageBuffer.append(chunk); //不是最后一片,直接等 if (!textMessage.isLast()) { @@ -219,7 +237,6 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { if (!notifications.isEmpty()) { log.info("🛬 接收到 {} 条航班通知", notifications.size()); - log.error("🛬 接收到 {} 条航班通知", notifications.size()); // 通知所有监听器 for (Consumer> listener : messageListeners) { @@ -284,7 +301,7 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { for (FlightMessage message : flightMessages) { try { - log.error("获取消息类型: serviceCode={}", message.getServiceCode()); + log.debug("获取消息类型: serviceCode={}", message.getServiceCode()); //log.error("获取消息内容: content={}", message.getContent()); FlightNotificationDTO dto = parseXmlMessage(message.getServiceCode(), message.getContent()); if (dto != null) { @@ -295,7 +312,7 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { errorCount.incrementAndGet(); } } - log.error("notifications={}", notifications); + log.debug("notifications={}", notifications); return notifications; } catch (Exception e) { log.error("解析消息失败", e); diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/VehicleManagerWebSocketClient.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/VehicleManagerWebSocketClient.java index bb497ab..2f090ed 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/VehicleManagerWebSocketClient.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/VehicleManagerWebSocketClient.java @@ -27,6 +27,9 @@ import java.util.function.Consumer; @Slf4j public class VehicleManagerWebSocketClient implements WebSocketHandler { + // 单条消息最大字符数(防止分片拼接导致内存无限增长) + private static final int MAX_MESSAGE_CHARS = 2_000_000; + private final WebSocketClient webSocketClient; private final VehicleManagerProperties properties; private final ObjectMapper objectMapper; @@ -119,8 +122,25 @@ public class VehicleManagerWebSocketClient implements WebSocketHandler { return; } - StringBuilder buffer = partialBuffers.computeIfAbsent(session.getId(), key -> new StringBuilder(1024)); - buffer.append(textMessage.getPayload()); + String chunk = textMessage.getPayload(); + String sessionId = session.getId(); + StringBuilder buffer = partialBuffers.computeIfAbsent(sessionId, key -> new StringBuilder(1024)); + + if (buffer.length() + chunk.length() > MAX_MESSAGE_CHARS) { + log.warn("车辆管理 WebSocket 收到超大分片消息,已丢弃并重置缓冲: sessionId={}, currentChars={}, incomingChars={}", + sessionId, buffer.length(), chunk.length()); + partialBuffers.remove(sessionId); + try { + if (session.isOpen()) { + session.close(CloseStatus.POLICY_VIOLATION); + } + } catch (Exception ignore) { + // ignore + } + return; + } + + buffer.append(chunk); if (!textMessage.isLast()) { return; @@ -142,13 +162,37 @@ public class VehicleManagerWebSocketClient implements WebSocketHandler { @Override public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) { log.error("车辆管理 WebSocket 传输错误: sessionId={}", session.getId(), exception); + + String sessionId = session.getId(); + partialBuffers.remove(sessionId); + + String key = sessionKeyById.remove(sessionId); + if (key != null) { + sessions.remove(key); + scheduleReconnect(key, resolvePathByKey(key)); + } + + try { + if (session.isOpen()) { + session.close(CloseStatus.SERVER_ERROR); + } + } catch (Exception ignore) { + // ignore + } } @Override public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) { log.warn("车辆管理 WebSocket 连接关闭: sessionId={}, status={}", session.getId(), status); - String key = sessionKeyById.remove(session.getId()); - sessions.values().remove(session); + String sessionId = session.getId(); + partialBuffers.remove(sessionId); + + String key = sessionKeyById.remove(sessionId); + if (key != null) { + sessions.remove(key); + } else { + sessions.values().remove(session); + } if (key != null) { scheduleReconnect(key, resolvePathByKey(key)); } diff --git a/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java b/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java index 7e5e701..22558ea 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java +++ b/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import com.qaup.collision.datacollector.dao.DataCollectorDao; @@ -94,6 +95,12 @@ public class DataProcessingService { // 从DataCollectorService获取缓存的引用 private Map activeMovingObjectsCache; + /** + * 防止@Scheduled任务重入导致并发执行、消息风暴与数据库压力。 + * 固定频率调度在任务执行时间超过间隔时可能出现并发重入。 + */ + private final AtomicBoolean periodicProcessingInProgress = new AtomicBoolean(false); + /** * 设置活跃对象缓存的引用(由DataCollectorService调用) */ @@ -121,6 +128,14 @@ public class DataProcessingService { return; } + // 丢弃旧轮次:如果上一轮尚未完成,直接跳过本轮以保证实时性 + if (!periodicProcessingInProgress.compareAndSet(false, true)) { + log.warn("周期性数据处理上一轮尚未完成,跳过本轮以避免重入(保持实时性)"); + return; + } + + try { + if (activeMovingObjectsCache == null || activeMovingObjectsCache.isEmpty()) { log.debug("活跃对象缓存为空,跳过数据处理"); return; @@ -153,6 +168,10 @@ public class DataProcessingService { saveUnmannedVehicleDataPeriodically(currentActiveObjects); log.info("周期性数据处理完成"); + + } finally { + periodicProcessingInProgress.set(false); + } } /** diff --git a/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java b/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java index c10b22f..9cfc4a7 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java +++ b/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java @@ -26,11 +26,16 @@ import com.qaup.collision.websocket.handler.CollisionWebSocketHandler; import org.springframework.stereotype.Component; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Async; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; /** @@ -45,6 +50,15 @@ public class WebSocketMessageBroadcaster { private final CollisionWebSocketHandler collisionWebSocketHandler; // 注入实例 private final ObjectMapper objectMapper; // JSON序列化器 + /** + * 高频位置更新做“覆盖式合并”,只保留每个objectId的最新一条,避免消息堆积。 + * flush线程如果跑不过来,下一轮会直接跳过(丢旧保实时)。 + */ + private final Map latestPositionUpdates = new ConcurrentHashMap<>(); + private final AtomicBoolean positionFlushInProgress = new AtomicBoolean(false); + + private record CoalescedPositionUpdate(PositionUpdatePayload payload, long timestamp) {} + public WebSocketMessageBroadcaster(MessageCacheService messageCacheService, CollisionWebSocketHandler collisionWebSocketHandler, @Qualifier("websocketObjectMapper") ObjectMapper objectMapper) { this.messageCacheService = messageCacheService; this.collisionWebSocketHandler = collisionWebSocketHandler; @@ -57,14 +71,64 @@ public class WebSocketMessageBroadcaster { */ @EventListener public void handlePositionUpdate(PositionUpdateEvent event) { - UniversalMessage message = UniversalMessage.builder() - .type(MessageTypeConstants.POSITION_UPDATE) - .timestamp(event.getTimestamp()) - .messageId(generateMessageId()) - .payload((PositionUpdatePayload) event.getPayload()) - .build(); - - broadcastMessageInternal(message); + try { + if (event == null || event.getPayload() == null) { + return; + } + + if (!(event.getPayload() instanceof PositionUpdatePayload payload)) { + return; + } + + if (payload.getObjectId() == null || payload.getObjectId().isBlank()) { + return; + } + + // 覆盖式合并:同一对象只保留最新的 + latestPositionUpdates.put(payload.getObjectId(), new CoalescedPositionUpdate(payload, event.getTimestamp())); + } catch (Exception e) { + log.error("合并位置更新事件失败", e); + } + } + + /** + * 定期刷新合并后的高频位置消息。 + * 丢旧保实时:如果上一轮flush未完成,本轮直接跳过。 + */ + @Scheduled(fixedDelayString = "${websocket.position.flush-interval-ms:250}") + public void flushCoalescedPositionUpdates() { + if (!positionFlushInProgress.compareAndSet(false, true)) { + return; + } + + try { + if (latestPositionUpdates.isEmpty()) { + return; + } + + // 没有客户端时直接清空,避免无意义积压 + if (collisionWebSocketHandler.getConnectionCount() <= 0) { + latestPositionUpdates.clear(); + return; + } + + List snapshot = new ArrayList<>(latestPositionUpdates.values()); + latestPositionUpdates.clear(); + + for (CoalescedPositionUpdate item : snapshot) { + UniversalMessage message = UniversalMessage.builder() + .type(MessageTypeConstants.POSITION_UPDATE) + .timestamp(item.timestamp()) + .messageId(generateMessageId()) + .payload(item.payload()) + .build(); + broadcastMessageInternal(message); + } + } catch (Exception e) { + log.error("刷新合并位置更新失败", e); + } finally { + positionFlushInProgress.set(false); + } } /** diff --git a/前端开发_无人车VehicleManager接口.md b/前端开发_无人车VehicleManager接口.md deleted file mode 100644 index 60e4e5d..0000000 --- a/前端开发_无人车VehicleManager接口.md +++ /dev/null @@ -1,222 +0,0 @@ -# 前端开发文档:无人车 VehicleManager HTTP 接口 - -本文档描述本次新增的前端 HTTP 接口(基于“接口文档1/2”无人车数据整合)。 - -## 1. 基本信息 - -- **服务地址**:`http://localhost:8080` -- **接口前缀**:`/api/vehicle-manager` -- **鉴权**:当前已在后端放开 `permitAll`(无需 token 即可访问该前缀)。后续如恢复鉴权,前端需要按系统登录流程携带 `Authorization`。 -- **返回格式**:RuoYi 风格 `AjaxResult` - - `code`: 200 成功 - - `msg`: 提示信息 - - `data`: 业务数据 - - `timestamp`: 后端附加的时间戳(毫秒) - -## 2. 接口列表 - -### 2.1 车辆统计汇总 - -**GET** `/api/vehicle-manager/vehicles/summary` - -#### Query -- `staleMillis`(可选,默认 `30000`):缓存过期阈值(毫秒)。超过该阈值会被视为离线/不可用。 - -#### Response.data -```json -{ - "totalCount": 10, - "onlineCount": 8, - "offlineCount": 2, - "emergencyCount": 1, - "faultCount": 1, - "unknownCount": 0, - "staleMillis": 30000, - "timestamp": 1730000000000 -} -``` - -#### 说明 -- online 判定:`VehicleLoginStatus.loginStatus == "login"` 且未过期 -- emergency 判定:`VehicleSuspendReport.suspendStatus != 0` 且未过期 -- fault(故障)临时规则: - - `GetFmsMessage.isActive == 1`(字段存在时)或 - - `GetFmsMessage.level >= 4` - ---- - -### 2.2 车辆详情快照(缓存聚合) - -**GET** `/api/vehicle-manager/vehicles/{vehicleId}` - -#### Path -- `vehicleId`:车号(例如 `AET01`) - -#### Response.data -```json -{ - "vehicleId": "AET01", - "details": { "messageName": "VehicleDetails", "vehicleId": "AET01", "vehicleType": "QTRUCK" }, - "loginStatus": { "messageName": "VehicleLoginStatus", "vehicleId": "AET01", "loginStatus": "login" }, - "position": { "messageName": "VehiclePositionInfo", "vehicleId": "AET01", "x": 6.12, "y": 101.70 }, - "chassis": { "messageName": "VehicleChassisInfo", "vehicleId": "AET01", "sys_info": { "state_info": { "d_speed_kmph": 0, "d_battery_available": "0.0" } } }, - "suspend": { "messageName": "VehicleSuspendReport", "vehicleId": "AET01", "suspendStatus": 0 }, - "tailer": { "messageName": "VehicleTailerNum", "vehicleId": "AET01", "tailerNum": "0" }, - "order": { "messageName": "VehicleOrderInfo", "vehicleId": "AET01", "businessKey": "1767..." }, - "path": { "messageName": "NaviShortPathReport", "vehicleId": "AET01", "path": [ {"x": 1, "y": 2, "theta": 0} ] }, - "fmsMessage": { "messageName": "GetFmsMessage", "vehicleID": "AET01", "level": 4, "description": "..." }, - "lastSeenAt": 1730000000000 -} -``` - -#### 说明 -- 该接口只读后端内存缓存,不会额外打上游 HTTP。 -- 字段可能为 `null`(例如 WS 尚未推送/缓存未命中)。 - ---- - -### 2.3 电池信息(通过接口文档2上游 HTTP 获取后转发) - -**GET** `/api/vehicle-manager/vehicles/{vehicleId}/battery` - -#### 行为 -- 后端会调用上游(接口文档2):`POST /api/vehicle_manager/v1/vehicles/{vehicleId}/status` -- 并把其中的 `batteryStatus` 转发给前端 - -#### Response.data -```json -{ - "vehicleId": "AET01", - "batteryStatus": { - "mainBattery": { - "chargeLevel": 0.0, - "voltage": 0.0, - "current": 0.0, - "temperature": 0.0, - "chargingStatus": "DISCHARGING" - } - }, - "source": "vehicle_manager_http", - "upstreamTimestamp": 1767838334301.7153 -} -``` - -#### 说明 -- 上游不可达或无数据时,后端会返回默认结构(voltage/current/temperature=0,chargingStatus=DISCHARGING)。 - ---- - -### 2.4 车辆路径 - -**GET** `/api/vehicle-manager/vehicles/{vehicleId}/path` - -#### Response.data -```json -{ - "vehicleId": "AET01", - "rawPath": [ - { "x": 6.12613, "y": 101.70456, "theta": -1.52245 } - ], - "waypoints": [ - { "waypointId": "1", "longitude": 6.12613, "latitude": 101.70456, "status": "PENDING" } - ], - "source": "ws_cache" -} -``` - -#### 说明 -- `source=ws_cache`:来自文档1 `NaviShortPathReport.path` -- `source=http_fallback`:WS 缓存没有路径时,后端会走聚合服务兜底(从 `missionContext.waypoints` 返回) - ---- - -### 2.5 充电桩(占位接口) - -**GET** `/api/vehicle-manager/charging-stations` - -#### Response.data -```json -[] -``` - ---- - -## 3. 任务 CRUD(内存,不落库) - -> 注意:这是“前端任务管理/任务库”的 CRUD,数据仅存在于后端内存中,服务重启会清空。 - -### 3.1 列表 - -**GET** `/api/vehicle-manager/tasks?pageNum=1&pageSize=10` - -#### Response.data -```json -{ - "total": 1, - "pageNum": 1, - "pageSize": 10, - "rows": [ - { - "id": 1, - "name": "测试任务1", - "type": "CARGO_TRANSPORT", - "priority": 3, - "payload": { "note": "from frontend" }, - "createdAt": 1730000000000, - "updatedAt": 1730000000000 - } - ] -} -``` - -### 3.2 创建 - -**POST** `/api/vehicle-manager/tasks` - -Body: -```json -{ - "name": "测试任务1", - "type": "CARGO_TRANSPORT", - "priority": 3, - "payload": { - "note": "from frontend" - } -} -``` - -### 3.3 详情 - -**GET** `/api/vehicle-manager/tasks/{id}` - -### 3.4 更新 - -**PUT** `/api/vehicle-manager/tasks/{id}` - -Body(同创建结构): -```json -{ - "name": "测试任务1-已更新", - "type": "CARGO_TRANSPORT", - "priority": 5, - "payload": { "note": "updated" } -} -``` - -### 3.5 删除(支持批量) - -**DELETE** `/api/vehicle-manager/tasks/{ids}` - -示例: -- 删除单个:`/api/vehicle-manager/tasks/1` -- 删除多个:`/api/vehicle-manager/tasks/1,2,3` - ---- - -## 4. 前端接入建议 - -1) 页面初始化先调:`/vehicles/summary` 拿到统计。 -2) 点击某辆车再调:`/vehicles/{vehicleId}` 拿快照。 -3) 电池面板单独调:`/vehicles/{vehicleId}/battery`(该接口会请求上游,避免高频轮询)。 -4) 路径面板调:`/vehicles/{vehicleId}/path`。 -5) 任务管理页面直接使用 `/tasks` CRUD。 diff --git a/命令.md b/命令.md new file mode 100644 index 0000000..9a7f1e9 --- /dev/null +++ b/命令.md @@ -0,0 +1,9 @@ +### 命令 +``` +mvn -pl qaup-admin -am package -DskipTests +``` + +``` +#测试本地 +java "-Dfile.encoding=GBK" -jar "qaup-admin\target\qaup-admin.jar" --spring.profiles.active=dev,druid +``` \ No newline at end of file diff --git a/实施方案_通用无人车状态API替换.md b/实施方案_通用无人车状态API替换.md deleted file mode 100644 index 5875903..0000000 --- a/实施方案_通用无人车状态API替换.md +++ /dev/null @@ -1,129 +0,0 @@ -# 通用无人车状态 API 数据源替换实施说明 - -## 目标 -将现有“通用无人车状态 API(universal_autonomous_vehicle_api)”的数据来源替换为: -- **接口文档1**(WebSocket 推送,/ws/at_manager、/ws/at_manager_bsm、/ws/at_manager_path) -- **接口文档2**(HTTP 获取,/api/vehicle_manager/v1/vehicles/{vehicleId}/status) - -同时保持前端使用的 **现有 WebSocket 推送通道与消息结构不变**(复用 `vehicle_status_update`)。 - -## 现状要点(便于对照) -1. `DataCollectorService` 调用 `DataCollectorDao.getUniversalVehicleStatus()` 拉取状态数据。 -2. `DataProcessingService.processUniversalVehicleStatusUpdates()` 发布 `VehicleStatusUpdateEvent`,由 `WebSocketMessageBroadcaster` 推送给前端。 -3. `UniversalVehicleApiController` 提供 `/api/v1/vehicles/{vehicleId}/status` 给前端请求车辆详情。 - -## 总体改造思路 -1. **通用无人车状态 API 仍然存在**,但其数据源改为**文档1的 WS 缓存 + 文档2的 HTTP 兜底**。 -2. **复用原有 WS 推送逻辑**,保持前端接口与消息不变。 -3. **所有文档1 WS 数据入缓存**,采集阶段不做计算、不开 WebSocket 推送(遵循采集/处理分离原则)。 - ---- - -## 详细实施步骤 - -### 1. 新增文档1 WebSocket 客户端 -**位置建议**:`qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket` - -新增 `VehicleManagerWebSocketClient`,功能: -- 连接 3 个 WS: - - `/ws/at_manager` - - `/ws/at_manager_bsm` - - `/ws/at_manager_path` -- 解析消息 JSON,按 `messageName` 分流: - - `VehicleDetails` - - `VehicleLoginStatus` - - `VehicleChassisInfo` - - `VehicleOrderInfo` - - `VehicleSuspendReport` - - `VehicleTailerNum` - - `VehiclePositionInfo` - - `NaviShortPathReport` - - `GetFmsMessage` -- 支持重连与连接状态统计(参考 `AdxpFlightServiceWebSocketClient`)。 - -### 2. 增加缓存结构(DataCollectorService) -在 `DataCollectorService` 内新增缓存(ConcurrentHashMap): -- `vehicleDetailsCache` -- `vehicleLoginStatusCache` -- `vehicleChassisCache` -- `vehicleOrderCache` -- `vehicleSuspendCache` -- `vehicleTailerCache` -- `vehiclePositionCache` -- `vehiclePathCache` -- `vehicleFmsMessageCache` -- `vehicleSnapshotCache`(聚合后的最新快照) - -并在 `init()` 中注册 WS 客户端的消息监听回调,将消息写入缓存。 - -### 3. 替换通用状态采集逻辑 -**保留方法入口,替换内部数据源**: - -- `DataCollectorDao.getUniversalVehicleStatus()` 改为: - 1. 从文档1缓存组装 `UniversalVehicleStatusDTO` - 2. 缺失字段时调用文档2 HTTP `/api/vehicle_manager/v1/vehicles/{vehicleId}/status` - 3. 返回完整 DTO - -- `DataCollectorService.collectUniversalVehicleStatus()` 改为: - - 不再请求旧 universal API - - 仅通过 `getUniversalVehicleStatus()` 读取新数据源并缓存 - -### 4. 调整 DataProcessingService 推送逻辑 -- `processUniversalVehicleStatusUpdates()` 继续发布 `VehicleStatusUpdateEvent` -- 但其数据来源改为**新缓存/新聚合结果** -- 复用原有 `vehicle_status_update` WebSocket 输出 - -### 5. 调整 UniversalVehicleApiController -- `/api/v1/vehicles/{vehicleId}/status` 返回逻辑不改结构 -- 调用的新 `getUniversalVehicleStatus()`(已替换数据源) - ---- - -## 字段映射参考(文档1 → 通用状态字段) - -| 文档1字段 | 通用状态字段 | 说明 | -| --- | --- | --- | -| VehiclePositionInfo.x/y | motionStatus.position.longitude/latitude | x→经度, y→纬度 | -| VehiclePositionInfo.theta | motionStatus.velocity.direction | 缺失则默认 0 | -| VehicleChassisInfo.sys_info.state_info.d_speed_kmph | motionStatus.velocity.speed | km/h | -| VehicleChassisInfo.sys_info.state_info.d_battery_available | batteryStatus.mainBattery.chargeLevel | 转 Double | -| VehicleLoginStatus.loginStatus | operationalStatus.powerStatus | login→ON, logout→OFF | -| VehicleSuspendReport.suspendStatus | operationalStatus.emergencyStatus | 0→NORMAL, 1/2→EMERGENCY | -| VehicleOrderInfo.businessKey/jobType/jobStage | missionContext.currentMission.* | missionId/jobType/进度 | -| NaviShortPathReport.path | missionContext.waypoints | x/y→经纬度 | - -未映射字段保留在缓存中,后续可扩展。 - ---- - -## 配置项新增(application.yml) -建议新增: -```yaml -data: - collector: - vehicle-manager: - host: 10.3.8.22 - port: 8020 - ws: - at-manager: /ws/at_manager - at-manager-bsm: /ws/at_manager_bsm - at-manager-path: /ws/at_manager_path - http: - status: /api/vehicle_manager/v1/vehicles/{vehicleId}/status -``` - ---- - -## 验证流程(实施完成后) -1. 启动服务后确认 WS 客户端连接成功 -2. 有文档1数据时,`vehicle_status_update` 能持续推送给前端 -3. 前端请求 `/api/v1/vehicles/{vehicleId}/status` 返回正确 -4. 断开 WS 后,HTTP 兜底仍可返回数据 - ---- - -## 注意事项 -1. 采集阶段**不要做计算或 WebSocket 推送**(遵循采集/处理分离原则) -2. 缓存需要带时间戳,避免过期数据污染 -3. WS 断线要重连,避免数据空档 -4. 旧 universal API 数据源必须完全停止对前端输出