增加稳定性

This commit is contained in:
sladro 2026-01-31 14:51:30 +08:00
parent dc4934d4d6
commit 4498cbfe8e
9 changed files with 240 additions and 387 deletions

View File

@ -15,10 +15,12 @@ public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 设置线程池大小根据需求调整
scheduler.setPoolSize(3);
// 定时任务较多采集/处理/推送等避免线程不足导致任务堆积
scheduler.setPoolSize(Math.max(4, Runtime.getRuntime().availableProcessors()));
// 设置线程名称前缀
scheduler.setThreadNamePrefix("ScheduledTask-");
// 取消任务时及时从队列移除降低堆积风险
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
}
}

View File

@ -115,6 +115,17 @@ public class DataCollectorService {
// 用于缓存所有活跃的MovingObject的最新状态
private final Map<String, MovingObject> activeMovingObjectsCache = new ConcurrentHashMap<>();
// 记录每个对象最近一次更新的时间戳用于淘汰不活跃对象防止内存长期增长
private final Map<String, Long> activeMovingObjectsLastUpdatedAtMs = new ConcurrentHashMap<>();
private void upsertActiveMovingObject(MovingObject movingObject) {
if (movingObject == null || movingObject.getObjectId() == null) {
return;
}
activeMovingObjectsCache.put(movingObject.getObjectId(), movingObject);
activeMovingObjectsLastUpdatedAtMs.put(movingObject.getObjectId(), System.currentTimeMillis());
}
// 无人车ID列表来自上游 HTTPGET /api/vehicle_details
private final Set<String> unmannedVehicleIds = ConcurrentHashMap.newKeySet();
@ -369,7 +380,7 @@ public class DataCollectorService {
vehicle.setCurrentSpeed(null);
vehicle.setCurrentHeading(null);
activeMovingObjectsCache.put(vehicleId, vehicle);
upsertActiveMovingObject(vehicle);
}
vehicleManagerCacheService.updateVehiclePosition(vehicleId, dataNode);
@ -435,7 +446,7 @@ public class DataCollectorService {
.altitude(aircraft.getAltitude())
.build();
activeMovingObjectsCache.put(movingObject.getObjectId(), movingObject);
upsertActiveMovingObject(movingObject);
log.debug("处理航空器数据并更新缓存: (航班号: {}, 位置: {}, {}, 速度: {})",
aircraft.getObjectId(),
@ -521,7 +532,7 @@ public class DataCollectorService {
.build();
// 将最新数据更新到缓存不发送WebSocket消息统一在周期性检测中发送
activeMovingObjectsCache.put(movingObject.getObjectId(), movingObject);
upsertActiveMovingObject(movingObject);
log.debug("处理机场车辆数据并更新缓存: (车牌号: {}, 位置: {}, {}, 速度: {})",
vehicle.getObjectId(),
@ -664,7 +675,7 @@ public class DataCollectorService {
UnmannedVehicle enhancedUnmannedVehicle = vehicleBuilder.build();
// 将最新数据更新到缓存
activeMovingObjectsCache.put(enhancedUnmannedVehicle.getObjectId(), enhancedUnmannedVehicle);
upsertActiveMovingObject(enhancedUnmannedVehicle);
// 更新无人车HTTP状态缓存供HTTP接口查询
unmannedVehicleHttpStatusCache.put(vehicleId, UniversalVehicleStatusCacheEntry.builder()
@ -839,7 +850,7 @@ public class DataCollectorService {
}
// 更新缓存
activeMovingObjectsCache.put(vehicleId, unmannedVehicle);
upsertActiveMovingObject(unmannedVehicle);
log.debug("更新无人车任务上下文: vehicleId={}, 任务ID={}, 里程={}米, 路径点数量={}",
vehicleId,
@ -1103,22 +1114,59 @@ public class DataCollectorService {
int removedFlightNotifications = 0;
// 清理 activeMovingObjectsCache
// 注意: MovingObject 没有时间戳字段需要通过外部跟踪或限制缓存大小
// 这里先保持简单策略如果缓存超过一定数量可以考虑清理
int maxCacheSize = 1000; // 最大缓存1000个对象
if (activeMovingObjectsCache.size() > maxCacheSize) {
log.warn("activeMovingObjectsCache 超过最大限制 {}, 当前: {}",
maxCacheSize, activeMovingObjectsCache.size());
long cutoff = now - inactiveThreshold;
int sizeBeforeMoving = activeMovingObjectsCache.size();
// 按最后更新时间淘汰同时清理两张表避免长期增长
activeMovingObjectsLastUpdatedAtMs.entrySet().removeIf(entry -> {
Long last = entry.getValue();
if (last == null || last < cutoff) {
activeMovingObjectsCache.remove(entry.getKey());
return true;
}
return false;
});
removedMovingObjects = sizeBeforeMoving - activeMovingObjectsCache.size();
// 兜底如果缓存仍超过上限按最老的lastUpdated继续淘汰丢旧保实时
int currentSize = activeMovingObjectsCache.size();
if (currentSize > maxCacheSize) {
int needRemove = currentSize - maxCacheSize;
activeMovingObjectsLastUpdatedAtMs.entrySet().stream()
.sorted(java.util.Map.Entry.comparingByValue())
.limit(needRemove)
.map(java.util.Map.Entry::getKey)
.forEach(key -> {
activeMovingObjectsCache.remove(key);
activeMovingObjectsLastUpdatedAtMs.remove(key);
});
log.warn("activeMovingObjectsCache 超过最大限制 {},已丢弃最旧 {} 条以保持实时性(当前: {}",
maxCacheSize, needRemove, activeMovingObjectsCache.size());
}
// 清理 flightNotificationCache - 使用 eventTime 字段判断
// int sizeBefore = flightNotificationCache.size();
// flightNotificationCache.entrySet().removeIf(entry -> {
// FlightNotification notification = entry.getValue();
// long eventTime = (notification.getEventTime() != null) ? notification.getEventTime() : 0;
// return (now - eventTime) > inactiveThreshold;
// });
// removedFlightNotifications = sizeBefore - flightNotificationCache.size();
int sizeBeforeFlight = flightNotificationCache.size();
LocalDateTime receivedCutoff = LocalDateTime.now().minusMinutes(5);
flightNotificationCache.entrySet().removeIf(entry -> {
FlightNotification notification = entry.getValue();
if (notification == null) {
return true;
}
// 优先按接收时间淘汰更可靠
if (notification.getReceivedTime() != null && notification.getReceivedTime().isBefore(receivedCutoff)) {
return true;
}
Long eventTime = notification.getEventTime();
if (eventTime != null && eventTime > 0 && (now - eventTime) > inactiveThreshold) {
return true;
}
return false;
});
removedFlightNotifications = sizeBeforeFlight - flightNotificationCache.size();
if (removedMovingObjects > 0 || removedFlightNotifications > 0) {
log.info("清理不活跃缓存对象: {} 个移动对象, {} 个航班通知; 剩余: {} 个移动对象, {} 个航班通知",
@ -1137,6 +1185,7 @@ public class DataCollectorService {
log.info("DataCollectorService 正在关闭...");
// 清理缓存
activeMovingObjectsCache.clear();
activeMovingObjectsLastUpdatedAtMs.clear();
flightNotificationCache.clear();
log.info("缓存已清理");
}

View File

@ -27,6 +27,9 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
// 单条消息最大字符数防止分片拼接导致内存无限增长
private static final int MAX_MESSAGE_CHARS = 2_000_000;
private final WebSocketClient webSocketClient;
private final FlightSdkProperties properties;
@ -179,8 +182,23 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
return;
}
//每来一片就拼
partialMessageBuffer.append(textMessage.getPayload());
// 每来一片就拼加上上限避免异常消息打爆内存
String chunk = textMessage.getPayload();
if (partialMessageBuffer.length() + chunk.length() > MAX_MESSAGE_CHARS) {
log.warn("ADXP航班通知 WebSocket 收到超大分片消息,已丢弃并重置缓冲: sessionId={}, currentChars={}, incomingChars={}",
session.getId(), partialMessageBuffer.length(), chunk.length());
partialMessageBuffer.setLength(0);
try {
if (session.isOpen()) {
session.close(CloseStatus.POLICY_VIOLATION);
}
} catch (Exception ignore) {
// ignore
}
return;
}
partialMessageBuffer.append(chunk);
//不是最后一片直接等
if (!textMessage.isLast()) {
@ -219,7 +237,6 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
if (!notifications.isEmpty()) {
log.info("🛬 接收到 {} 条航班通知", notifications.size());
log.error("🛬 接收到 {} 条航班通知", notifications.size());
// 通知所有监听器
for (Consumer<List<FlightNotificationDTO>> listener : messageListeners) {
@ -284,7 +301,7 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
for (FlightMessage message : flightMessages) {
try {
log.error("获取消息类型: serviceCode={}", message.getServiceCode());
log.debug("获取消息类型: serviceCode={}", message.getServiceCode());
//log.error("获取消息内容: content={}", message.getContent());
FlightNotificationDTO dto = parseXmlMessage(message.getServiceCode(), message.getContent());
if (dto != null) {
@ -295,7 +312,7 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
errorCount.incrementAndGet();
}
}
log.error("notifications={}", notifications);
log.debug("notifications={}", notifications);
return notifications;
} catch (Exception e) {
log.error("解析消息失败", e);

View File

@ -27,6 +27,9 @@ import java.util.function.Consumer;
@Slf4j
public class VehicleManagerWebSocketClient implements WebSocketHandler {
// 单条消息最大字符数防止分片拼接导致内存无限增长
private static final int MAX_MESSAGE_CHARS = 2_000_000;
private final WebSocketClient webSocketClient;
private final VehicleManagerProperties properties;
private final ObjectMapper objectMapper;
@ -119,8 +122,25 @@ public class VehicleManagerWebSocketClient implements WebSocketHandler {
return;
}
StringBuilder buffer = partialBuffers.computeIfAbsent(session.getId(), key -> new StringBuilder(1024));
buffer.append(textMessage.getPayload());
String chunk = textMessage.getPayload();
String sessionId = session.getId();
StringBuilder buffer = partialBuffers.computeIfAbsent(sessionId, key -> new StringBuilder(1024));
if (buffer.length() + chunk.length() > MAX_MESSAGE_CHARS) {
log.warn("车辆管理 WebSocket 收到超大分片消息,已丢弃并重置缓冲: sessionId={}, currentChars={}, incomingChars={}",
sessionId, buffer.length(), chunk.length());
partialBuffers.remove(sessionId);
try {
if (session.isOpen()) {
session.close(CloseStatus.POLICY_VIOLATION);
}
} catch (Exception ignore) {
// ignore
}
return;
}
buffer.append(chunk);
if (!textMessage.isLast()) {
return;
@ -142,13 +162,37 @@ public class VehicleManagerWebSocketClient implements WebSocketHandler {
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) {
log.error("车辆管理 WebSocket 传输错误: sessionId={}", session.getId(), exception);
String sessionId = session.getId();
partialBuffers.remove(sessionId);
String key = sessionKeyById.remove(sessionId);
if (key != null) {
sessions.remove(key);
scheduleReconnect(key, resolvePathByKey(key));
}
try {
if (session.isOpen()) {
session.close(CloseStatus.SERVER_ERROR);
}
} catch (Exception ignore) {
// ignore
}
}
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) {
log.warn("车辆管理 WebSocket 连接关闭: sessionId={}, status={}", session.getId(), status);
String key = sessionKeyById.remove(session.getId());
sessions.values().remove(session);
String sessionId = session.getId();
partialBuffers.remove(sessionId);
String key = sessionKeyById.remove(sessionId);
if (key != null) {
sessions.remove(key);
} else {
sessions.values().remove(session);
}
if (key != null) {
scheduleReconnect(key, resolvePathByKey(key));
}

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.qaup.collision.datacollector.dao.DataCollectorDao;
@ -94,6 +95,12 @@ public class DataProcessingService {
// 从DataCollectorService获取缓存的引用
private Map<String, MovingObject> activeMovingObjectsCache;
/**
* 防止@Scheduled任务重入导致并发执行消息风暴与数据库压力
* 固定频率调度在任务执行时间超过间隔时可能出现并发重入
*/
private final AtomicBoolean periodicProcessingInProgress = new AtomicBoolean(false);
/**
* 设置活跃对象缓存的引用由DataCollectorService调用
*/
@ -121,6 +128,14 @@ public class DataProcessingService {
return;
}
// 丢弃旧轮次如果上一轮尚未完成直接跳过本轮以保证实时性
if (!periodicProcessingInProgress.compareAndSet(false, true)) {
log.warn("周期性数据处理上一轮尚未完成,跳过本轮以避免重入(保持实时性)");
return;
}
try {
if (activeMovingObjectsCache == null || activeMovingObjectsCache.isEmpty()) {
log.debug("活跃对象缓存为空,跳过数据处理");
return;
@ -153,6 +168,10 @@ public class DataProcessingService {
saveUnmannedVehicleDataPeriodically(currentActiveObjects);
log.info("周期性数据处理完成");
} finally {
periodicProcessingInProgress.set(false);
}
}
/**

View File

@ -26,11 +26,16 @@ import com.qaup.collision.websocket.handler.CollisionWebSocketHandler;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.Async;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
/**
@ -45,6 +50,15 @@ public class WebSocketMessageBroadcaster {
private final CollisionWebSocketHandler collisionWebSocketHandler; // 注入实例
private final ObjectMapper objectMapper; // JSON序列化器
/**
* 高频位置更新做覆盖式合并只保留每个objectId的最新一条避免消息堆积
* flush线程如果跑不过来下一轮会直接跳过丢旧保实时
*/
private final Map<String, CoalescedPositionUpdate> latestPositionUpdates = new ConcurrentHashMap<>();
private final AtomicBoolean positionFlushInProgress = new AtomicBoolean(false);
private record CoalescedPositionUpdate(PositionUpdatePayload payload, long timestamp) {}
public WebSocketMessageBroadcaster(MessageCacheService messageCacheService, CollisionWebSocketHandler collisionWebSocketHandler, @Qualifier("websocketObjectMapper") ObjectMapper objectMapper) {
this.messageCacheService = messageCacheService;
this.collisionWebSocketHandler = collisionWebSocketHandler;
@ -57,14 +71,64 @@ public class WebSocketMessageBroadcaster {
*/
@EventListener
public void handlePositionUpdate(PositionUpdateEvent event) {
UniversalMessage<PositionUpdatePayload> message = UniversalMessage.<PositionUpdatePayload>builder()
.type(MessageTypeConstants.POSITION_UPDATE)
.timestamp(event.getTimestamp())
.messageId(generateMessageId())
.payload((PositionUpdatePayload) event.getPayload())
.build();
broadcastMessageInternal(message);
try {
if (event == null || event.getPayload() == null) {
return;
}
if (!(event.getPayload() instanceof PositionUpdatePayload payload)) {
return;
}
if (payload.getObjectId() == null || payload.getObjectId().isBlank()) {
return;
}
// 覆盖式合并同一对象只保留最新的
latestPositionUpdates.put(payload.getObjectId(), new CoalescedPositionUpdate(payload, event.getTimestamp()));
} catch (Exception e) {
log.error("合并位置更新事件失败", e);
}
}
/**
* 定期刷新合并后的高频位置消息
* 丢旧保实时如果上一轮flush未完成本轮直接跳过
*/
@Scheduled(fixedDelayString = "${websocket.position.flush-interval-ms:250}")
public void flushCoalescedPositionUpdates() {
if (!positionFlushInProgress.compareAndSet(false, true)) {
return;
}
try {
if (latestPositionUpdates.isEmpty()) {
return;
}
// 没有客户端时直接清空避免无意义积压
if (collisionWebSocketHandler.getConnectionCount() <= 0) {
latestPositionUpdates.clear();
return;
}
List<CoalescedPositionUpdate> snapshot = new ArrayList<>(latestPositionUpdates.values());
latestPositionUpdates.clear();
for (CoalescedPositionUpdate item : snapshot) {
UniversalMessage<PositionUpdatePayload> message = UniversalMessage.<PositionUpdatePayload>builder()
.type(MessageTypeConstants.POSITION_UPDATE)
.timestamp(item.timestamp())
.messageId(generateMessageId())
.payload(item.payload())
.build();
broadcastMessageInternal(message);
}
} catch (Exception e) {
log.error("刷新合并位置更新失败", e);
} finally {
positionFlushInProgress.set(false);
}
}
/**

View File

@ -1,222 +0,0 @@
# 前端开发文档:无人车 VehicleManager HTTP 接口
本文档描述本次新增的前端 HTTP 接口基于“接口文档1/2”无人车数据整合
## 1. 基本信息
- **服务地址**`http://localhost:8080`
- **接口前缀**`/api/vehicle-manager`
- **鉴权**:当前已在后端放开 `permitAll`(无需 token 即可访问该前缀)。后续如恢复鉴权,前端需要按系统登录流程携带 `Authorization`
- **返回格式**RuoYi 风格 `AjaxResult`
- `code`: 200 成功
- `msg`: 提示信息
- `data`: 业务数据
- `timestamp`: 后端附加的时间戳(毫秒)
## 2. 接口列表
### 2.1 车辆统计汇总
**GET** `/api/vehicle-manager/vehicles/summary`
#### Query
- `staleMillis`(可选,默认 `30000`):缓存过期阈值(毫秒)。超过该阈值会被视为离线/不可用。
#### Response.data
```json
{
"totalCount": 10,
"onlineCount": 8,
"offlineCount": 2,
"emergencyCount": 1,
"faultCount": 1,
"unknownCount": 0,
"staleMillis": 30000,
"timestamp": 1730000000000
}
```
#### 说明
- online 判定:`VehicleLoginStatus.loginStatus == "login"` 且未过期
- emergency 判定:`VehicleSuspendReport.suspendStatus != 0` 且未过期
- fault故障临时规则
- `GetFmsMessage.isActive == 1`(字段存在时)或
- `GetFmsMessage.level >= 4`
---
### 2.2 车辆详情快照(缓存聚合)
**GET** `/api/vehicle-manager/vehicles/{vehicleId}`
#### Path
- `vehicleId`:车号(例如 `AET01`
#### Response.data
```json
{
"vehicleId": "AET01",
"details": { "messageName": "VehicleDetails", "vehicleId": "AET01", "vehicleType": "QTRUCK" },
"loginStatus": { "messageName": "VehicleLoginStatus", "vehicleId": "AET01", "loginStatus": "login" },
"position": { "messageName": "VehiclePositionInfo", "vehicleId": "AET01", "x": 6.12, "y": 101.70 },
"chassis": { "messageName": "VehicleChassisInfo", "vehicleId": "AET01", "sys_info": { "state_info": { "d_speed_kmph": 0, "d_battery_available": "0.0" } } },
"suspend": { "messageName": "VehicleSuspendReport", "vehicleId": "AET01", "suspendStatus": 0 },
"tailer": { "messageName": "VehicleTailerNum", "vehicleId": "AET01", "tailerNum": "0" },
"order": { "messageName": "VehicleOrderInfo", "vehicleId": "AET01", "businessKey": "1767..." },
"path": { "messageName": "NaviShortPathReport", "vehicleId": "AET01", "path": [ {"x": 1, "y": 2, "theta": 0} ] },
"fmsMessage": { "messageName": "GetFmsMessage", "vehicleID": "AET01", "level": 4, "description": "..." },
"lastSeenAt": 1730000000000
}
```
#### 说明
- 该接口只读后端内存缓存,不会额外打上游 HTTP。
- 字段可能为 `null`(例如 WS 尚未推送/缓存未命中)。
---
### 2.3 电池信息通过接口文档2上游 HTTP 获取后转发)
**GET** `/api/vehicle-manager/vehicles/{vehicleId}/battery`
#### 行为
- 后端会调用上游接口文档2`POST /api/vehicle_manager/v1/vehicles/{vehicleId}/status`
- 并把其中的 `batteryStatus` 转发给前端
#### Response.data
```json
{
"vehicleId": "AET01",
"batteryStatus": {
"mainBattery": {
"chargeLevel": 0.0,
"voltage": 0.0,
"current": 0.0,
"temperature": 0.0,
"chargingStatus": "DISCHARGING"
}
},
"source": "vehicle_manager_http",
"upstreamTimestamp": 1767838334301.7153
}
```
#### 说明
- 上游不可达或无数据时后端会返回默认结构voltage/current/temperature=0chargingStatus=DISCHARGING
---
### 2.4 车辆路径
**GET** `/api/vehicle-manager/vehicles/{vehicleId}/path`
#### Response.data
```json
{
"vehicleId": "AET01",
"rawPath": [
{ "x": 6.12613, "y": 101.70456, "theta": -1.52245 }
],
"waypoints": [
{ "waypointId": "1", "longitude": 6.12613, "latitude": 101.70456, "status": "PENDING" }
],
"source": "ws_cache"
}
```
#### 说明
- `source=ws_cache`来自文档1 `NaviShortPathReport.path`
- `source=http_fallback`WS 缓存没有路径时,后端会走聚合服务兜底(从 `missionContext.waypoints` 返回)
---
### 2.5 充电桩(占位接口)
**GET** `/api/vehicle-manager/charging-stations`
#### Response.data
```json
[]
```
---
## 3. 任务 CRUD内存不落库
> 注意:这是“前端任务管理/任务库”的 CRUD数据仅存在于后端内存中服务重启会清空。
### 3.1 列表
**GET** `/api/vehicle-manager/tasks?pageNum=1&pageSize=10`
#### Response.data
```json
{
"total": 1,
"pageNum": 1,
"pageSize": 10,
"rows": [
{
"id": 1,
"name": "测试任务1",
"type": "CARGO_TRANSPORT",
"priority": 3,
"payload": { "note": "from frontend" },
"createdAt": 1730000000000,
"updatedAt": 1730000000000
}
]
}
```
### 3.2 创建
**POST** `/api/vehicle-manager/tasks`
Body:
```json
{
"name": "测试任务1",
"type": "CARGO_TRANSPORT",
"priority": 3,
"payload": {
"note": "from frontend"
}
}
```
### 3.3 详情
**GET** `/api/vehicle-manager/tasks/{id}`
### 3.4 更新
**PUT** `/api/vehicle-manager/tasks/{id}`
Body同创建结构
```json
{
"name": "测试任务1-已更新",
"type": "CARGO_TRANSPORT",
"priority": 5,
"payload": { "note": "updated" }
}
```
### 3.5 删除(支持批量)
**DELETE** `/api/vehicle-manager/tasks/{ids}`
示例:
- 删除单个:`/api/vehicle-manager/tasks/1`
- 删除多个:`/api/vehicle-manager/tasks/1,2,3`
---
## 4. 前端接入建议
1) 页面初始化先调:`/vehicles/summary` 拿到统计。
2) 点击某辆车再调:`/vehicles/{vehicleId}` 拿快照。
3) 电池面板单独调:`/vehicles/{vehicleId}/battery`(该接口会请求上游,避免高频轮询)。
4) 路径面板调:`/vehicles/{vehicleId}/path`。
5) 任务管理页面直接使用 `/tasks` CRUD。

9
命令.md Normal file
View File

@ -0,0 +1,9 @@
### 命令
```
mvn -pl qaup-admin -am package -DskipTests
```
```
#测试本地
java "-Dfile.encoding=GBK" -jar "qaup-admin\target\qaup-admin.jar" --spring.profiles.active=dev,druid
```

View File

@ -1,129 +0,0 @@
# 通用无人车状态 API 数据源替换实施说明
## 目标
将现有“通用无人车状态 APIuniversal_autonomous_vehicle_api”的数据来源替换为
- **接口文档1**WebSocket 推送,/ws/at_manager、/ws/at_manager_bsm、/ws/at_manager_path
- **接口文档2**HTTP 获取,/api/vehicle_manager/v1/vehicles/{vehicleId}/status
同时保持前端使用的 **现有 WebSocket 推送通道与消息结构不变**(复用 `vehicle_status_update`)。
## 现状要点(便于对照)
1. `DataCollectorService` 调用 `DataCollectorDao.getUniversalVehicleStatus()` 拉取状态数据。
2. `DataProcessingService.processUniversalVehicleStatusUpdates()` 发布 `VehicleStatusUpdateEvent`,由 `WebSocketMessageBroadcaster` 推送给前端。
3. `UniversalVehicleApiController` 提供 `/api/v1/vehicles/{vehicleId}/status` 给前端请求车辆详情。
## 总体改造思路
1. **通用无人车状态 API 仍然存在**,但其数据源改为**文档1的 WS 缓存 + 文档2的 HTTP 兜底**。
2. **复用原有 WS 推送逻辑**,保持前端接口与消息不变。
3. **所有文档1 WS 数据入缓存**,采集阶段不做计算、不开 WebSocket 推送(遵循采集/处理分离原则)。
---
## 详细实施步骤
### 1. 新增文档1 WebSocket 客户端
**位置建议**`qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket`
新增 `VehicleManagerWebSocketClient`,功能:
- 连接 3 个 WS
- `/ws/at_manager`
- `/ws/at_manager_bsm`
- `/ws/at_manager_path`
- 解析消息 JSON`messageName` 分流:
- `VehicleDetails`
- `VehicleLoginStatus`
- `VehicleChassisInfo`
- `VehicleOrderInfo`
- `VehicleSuspendReport`
- `VehicleTailerNum`
- `VehiclePositionInfo`
- `NaviShortPathReport`
- `GetFmsMessage`
- 支持重连与连接状态统计(参考 `AdxpFlightServiceWebSocketClient`)。
### 2. 增加缓存结构DataCollectorService
`DataCollectorService` 内新增缓存ConcurrentHashMap
- `vehicleDetailsCache`
- `vehicleLoginStatusCache`
- `vehicleChassisCache`
- `vehicleOrderCache`
- `vehicleSuspendCache`
- `vehicleTailerCache`
- `vehiclePositionCache`
- `vehiclePathCache`
- `vehicleFmsMessageCache`
- `vehicleSnapshotCache`(聚合后的最新快照)
并在 `init()` 中注册 WS 客户端的消息监听回调,将消息写入缓存。
### 3. 替换通用状态采集逻辑
**保留方法入口,替换内部数据源**
- `DataCollectorDao.getUniversalVehicleStatus()` 改为:
1. 从文档1缓存组装 `UniversalVehicleStatusDTO`
2. 缺失字段时调用文档2 HTTP `/api/vehicle_manager/v1/vehicles/{vehicleId}/status`
3. 返回完整 DTO
- `DataCollectorService.collectUniversalVehicleStatus()` 改为:
- 不再请求旧 universal API
- 仅通过 `getUniversalVehicleStatus()` 读取新数据源并缓存
### 4. 调整 DataProcessingService 推送逻辑
- `processUniversalVehicleStatusUpdates()` 继续发布 `VehicleStatusUpdateEvent`
- 但其数据来源改为**新缓存/新聚合结果**
- 复用原有 `vehicle_status_update` WebSocket 输出
### 5. 调整 UniversalVehicleApiController
- `/api/v1/vehicles/{vehicleId}/status` 返回逻辑不改结构
- 调用的新 `getUniversalVehicleStatus()`(已替换数据源)
---
## 字段映射参考文档1 → 通用状态字段)
| 文档1字段 | 通用状态字段 | 说明 |
| --- | --- | --- |
| VehiclePositionInfo.x/y | motionStatus.position.longitude/latitude | x→经度, y→纬度 |
| VehiclePositionInfo.theta | motionStatus.velocity.direction | 缺失则默认 0 |
| VehicleChassisInfo.sys_info.state_info.d_speed_kmph | motionStatus.velocity.speed | km/h |
| VehicleChassisInfo.sys_info.state_info.d_battery_available | batteryStatus.mainBattery.chargeLevel | 转 Double |
| VehicleLoginStatus.loginStatus | operationalStatus.powerStatus | login→ON, logout→OFF |
| VehicleSuspendReport.suspendStatus | operationalStatus.emergencyStatus | 0→NORMAL, 1/2→EMERGENCY |
| VehicleOrderInfo.businessKey/jobType/jobStage | missionContext.currentMission.* | missionId/jobType/进度 |
| NaviShortPathReport.path | missionContext.waypoints | x/y→经纬度 |
未映射字段保留在缓存中,后续可扩展。
---
## 配置项新增application.yml
建议新增:
```yaml
data:
collector:
vehicle-manager:
host: 10.3.8.22
port: 8020
ws:
at-manager: /ws/at_manager
at-manager-bsm: /ws/at_manager_bsm
at-manager-path: /ws/at_manager_path
http:
status: /api/vehicle_manager/v1/vehicles/{vehicleId}/status
```
---
## 验证流程(实施完成后)
1. 启动服务后确认 WS 客户端连接成功
2. 有文档1数据时`vehicle_status_update` 能持续推送给前端
3. 前端请求 `/api/v1/vehicles/{vehicleId}/status` 返回正确
4. 断开 WS 后HTTP 兜底仍可返回数据
---
## 注意事项
1. 采集阶段**不要做计算或 WebSocket 推送**(遵循采集/处理分离原则)
2. 缓存需要带时间戳,避免过期数据污染
3. WS 断线要重连,避免数据空档
4. 旧 universal API 数据源必须完全停止对前端输出