更新航班通知数据处理逻辑,恢复WebSocket推送并优化缓存管理。新增日志记录以支持并发处理,确保通知不被重复推送。更新相关文档以反映最新功能。

This commit is contained in:
sladro 2026-02-06 10:34:22 +08:00
parent 022bc183ab
commit dee9bc4420
2 changed files with 38 additions and 3 deletions

View File

@ -155,8 +155,9 @@ public class DataProcessingService {
// 第三步处理通用车辆状态数据并发送WebSocket更新 // 第三步处理通用车辆状态数据并发送WebSocket更新
processUniversalVehicleStatusUpdates(); processUniversalVehicleStatusUpdates();
// 第四步处理航班通知数据并发送WebSocket更新, 暂时关闭改为用websocket接收 // 第四步处理航班通知数据并发送WebSocket更新
//processFlightNotificationUpdates(); // 注意采集侧只缓存发送侧在处理完成后会清理缓存避免同一通知被每秒重复推送刷屏
processFlightNotificationUpdates();
// 第五步执行路径冲突检测 // 第五步执行路径冲突检测
pathConflictDetectionService.detectPathConflicts(currentActiveObjects); pathConflictDetectionService.detectPathConflicts(currentActiveObjects);
@ -522,8 +523,12 @@ public class DataProcessingService {
log.info("开始处理航班通知数据,缓存数量: {}", flightNotificationCache.size()); log.info("开始处理航班通知数据,缓存数量: {}", flightNotificationCache.size());
// 这里直接迭代共享的 ConcurrentHashMap
// - 只在成功发布事件后做条件删除 remove(key, value)避免并发覆盖时误删新通知
// - 失败/无效则保留允许下一轮重试
for (Map.Entry<String, FlightNotification> entry : flightNotificationCache.entrySet()) { for (Map.Entry<String, FlightNotification> entry : flightNotificationCache.entrySet()) {
try { try {
final String cacheKey = entry.getKey();
FlightNotification notification = entry.getValue(); FlightNotification notification = entry.getValue();
// 创建并发布WebSocket事件 // 创建并发布WebSocket事件
@ -538,6 +543,13 @@ public class DataProcessingService {
// 触发基于航班通知的路由查询和更新处理 // 触发基于航班通知的路由查询和更新处理
triggerRouteQueryByFlightNotification(notification); triggerRouteQueryByFlightNotification(notification);
// 发布成功后清理缓存避免每秒重复推送同一条通知直到5分钟后自然淘汰
// 使用 remove(key, value) 防止并发更新导致误删新通知
boolean removed = flightNotificationCache.remove(cacheKey, notification);
if (!removed) {
log.debug("航班通知缓存条目未删除(可能已被并发更新/移除): cacheKey={}", cacheKey);
}
} else { } else {
log.warn("⚠️ 航班进出港通知WebSocket事件创建失败或无效: {}", notification.getFlightNo()); log.warn("⚠️ 航班进出港通知WebSocket事件创建失败或无效: {}", notification.getFlightNo());
} }

View File

@ -40,6 +40,8 @@ docker logs qaup-app | grep -n "flight-notifications" | tail -n 200
### 确定adapter没问题 ### 确定adapter没问题
tail -n 20000 /home/project_20250804/qaup/adxp-adapter/logs/adapter-logs.log | grep -E "事件的类型是|ADXP_NAOMS_O_CDM_AXOT" | tail -n 200 tail -n 20000 /home/project_20250804/qaup/adxp-adapter/logs/adapter-logs.log | grep -E "事件的类型是|ADXP_NAOMS_O_CDM_AXOT" | tail -n 200
docker logs qaup-app | grep -E "通过WebSocket接收到|AXOT|BizKey|ActualPushback" | tail -n 200 docker logs qaup-app | grep -E "通过WebSocket接收到|AXOT|BizKey|ActualPushback" | tail -n 200
docker exec -it qaup-app sh -lc ' docker exec -it qaup-app sh -lc '
@ -49,4 +51,25 @@ while true; do
echo "$(date +%T) no-axot"; echo "$(date +%T) no-axot";
sleep 1; sleep 1;
done done
' '
docker exec -it qaup-app sh -lc 'grep -nE "已连接到ADXP适配器WebSocket服务|正在连接到ADXP适配器WebSocket服务|数据中台航班 SDK 配置不完整|接收到 [0-9]+ 条航班通知|通过WebSocket接收到|新增航班通知缓存|合并航班通知缓存|数据无效|未知的服务代码|解析消息失败" /logs/sys-info.log /logs/sys-error.log 2>/dev/null | tail -n 200'
tail -n 200000 /home/project_20250804/qaup/adxp-adapter/logs/adapter-logs.log \
| grep -E "事件的类型是ADXP_NAOMS_O_(CDM_AXOT|DYN_ARR|CDM_RUNWAY|DYN_DFIE|DYN_DFDE|DYN_TISFLIGHT|DYN_CRAFTSEAT)" \
| tail -n 200
tail -n 200000 /home/project_20250804/qaup/adxp-adapter/logs/adapter-logs.log \
| grep -E "ERROR|异常|断开|close|重连|reconnect|失败|fail|timeout" \
| tail -n 200
curl -s http://10.64.58.228:8086/api/adxp/status
curl -s http://10.64.58.228:8086/api/adxp/health
curl -s -X POST http://10.64.58.228:8086/api/adxp/reconnect
docker exec -it qaup-app sh -lc 'grep -nE "连接到ADXP适配器WebSocket服务|已连接到ADXP适配器WebSocket服务|数据中台航班 SDK 配置不完整|接收到 .* 条航班通知|通过WebSocket接收到 .* 条航班进出港通知|数据中台航班 SDK 未启用|未获取到航班进出港通知数据|航班进出港通知数据无效" /logs/sys-info.log /logs/sys-error.log 2>/dev/null | tail -n 200'
docker exec -it qaup-app sh -lc 'grep -nE "接收到 [0-9]+ 条航班通知|通过WebSocket接收到|新增航班通知缓存|合并航班通知缓存|发布航班进出港通知WebSocket事件" /logs/sys-info.log /logs/sys-error.log 2>/dev/null | tail -n 200'