diff --git a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java index c6d4d04..6ecf681 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java +++ b/qaup-collision/src/main/java/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.java @@ -503,6 +503,23 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { return normalized.isEmpty() ? null : normalized.toUpperCase(); } + private static String normalizeRedisString(Object v) { + if (v == null) { + return null; + } + String s = String.valueOf(v).trim(); + if (s.isEmpty()) { + return null; + } + if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) { + s = s.substring(1, s.length() - 1).trim(); + } + if (s.length() >= 2 && s.startsWith("'") && s.endsWith("'")) { + s = s.substring(1, s.length() - 1).trim(); + } + return s.isEmpty() ? null : s; + } + private static String buildBizRedisKey(String bizKey) { String normalized = normalizeBizKey(bizKey); if (normalized == null) { @@ -596,6 +613,30 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { redisCache.setCacheMapValue(flightKey, "activeBizKeyTs", String.valueOf(nowMs)); } + private String readRunwayFromKeys(String flightKey, String bizKey, String field) { + String value = null; + if (bizKey != null) { + value = normalizeRedisString(redisCache.getCacheMapValue(bizKey, field)); + } + if ((value == null || value.isBlank()) && flightKey != null) { + value = normalizeRedisString(redisCache.getCacheMapValue(flightKey, field)); + } + return value; + } + + private String[] resolveRunwayPair(String incomingIn, String incomingOut, String existingIn, String existingOut) { + String finalIn = incomingIn != null && !incomingIn.isBlank() ? incomingIn : existingIn; + String finalOut = incomingOut != null && !incomingOut.isBlank() ? incomingOut : existingOut; + + if ((finalIn == null || finalIn.isBlank()) && finalOut != null && !finalOut.isBlank()) { + finalIn = finalOut; + } + if ((finalOut == null || finalOut.isBlank()) && finalIn != null && !finalIn.isBlank()) { + finalOut = finalIn; + } + return new String[]{finalIn, finalOut}; + } + private LocalDateTime parseBestFlightDateTimeForDfi(org.w3c.dom.Element root, String[] bizParts) { if (bizParts != null && bizParts.length >= 3) { LocalDateTime fromBiz = parseDateTimeLoose(bizParts[2]); @@ -824,12 +865,21 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { setCommonFlightMeta(flightKey, normalizedFlightNo, arr); setCommonFlightMeta(bizRedisKey, normalizedFlightNo, arr); updateActiveBizKey(flightKey, normalizedBizKey, nowMs); - if (RunwayNum != null && !RunwayNum.isBlank()) { - // DFIE 的 RunwayNum 用于初始化跑道参数:in/out 同步赋值,再由 RUNWAY 事件按 BizKey 精确覆盖 - setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunway", RunwayNum); - setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunway", RunwayNum); + String incomingRunway = normalizeRedisString(RunwayNum); + String existingInRunway = readRunwayFromKeys(flightKey, bizRedisKey, "inRunway"); + String existingOutRunway = readRunwayFromKeys(flightKey, bizRedisKey, "outRunway"); + String[] resolvedRunways = resolveRunwayPair(incomingRunway, incomingRunway, existingInRunway, existingOutRunway); + String inRunwayToWrite = resolvedRunways[0]; + String outRunwayToWrite = resolvedRunways[1]; + if (inRunwayToWrite != null && !inRunwayToWrite.isBlank() + && outRunwayToWrite != null && !outRunwayToWrite.isBlank()) { + setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunway", inRunwayToWrite); + setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunway", outRunwayToWrite); setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunwayTs", String.valueOf(nowMs)); setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunwayTs", String.valueOf(nowMs)); + } else { + log.debug("DFIE runway skipped: both in/out are blank, flightNo={}, bizKey={}, runwayNum={}", + normalizedFlightNo, normalizedBizKey, RunwayNum); } LocalDateTime bizDateTime = parseBestFlightDateTimeForDfi(root, arr); applyDfiExpiry(flightKey, bizRedisKey, bizDateTime, nowMs, normalizedFlightNo, normalizedBizKey); @@ -873,13 +923,22 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler { setCommonFlightMeta(flightKey, normalizedFlightNo, arr); setCommonFlightMeta(bizRedisKey, normalizedFlightNo, arr); updateActiveBizKey(flightKey, normalizedBizKey, nowMs); - if (inRunway != null && !inRunway.isBlank()){ - setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunway", inRunway); + String incomingInRunway = normalizeRedisString(inRunway); + String incomingOutRunway = normalizeRedisString(outRunway); + String existingInRunway = readRunwayFromKeys(flightKey, bizRedisKey, "inRunway"); + String existingOutRunway = readRunwayFromKeys(flightKey, bizRedisKey, "outRunway"); + String[] resolvedRunways = resolveRunwayPair(incomingInRunway, incomingOutRunway, existingInRunway, existingOutRunway); + String inRunwayToWrite = resolvedRunways[0]; + String outRunwayToWrite = resolvedRunways[1]; + if (inRunwayToWrite != null && !inRunwayToWrite.isBlank() + && outRunwayToWrite != null && !outRunwayToWrite.isBlank()) { + setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunway", inRunwayToWrite); + setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunway", outRunwayToWrite); setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunwayTs", String.valueOf(nowMs)); - } - if (outRunway != null && !outRunway.isBlank()){ - setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunway", outRunway); setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunwayTs", String.valueOf(nowMs)); + } else { + log.debug("RUNWAY skipped: both in/out are blank, flightNo={}, bizKey={}, inRunway={}, outRunway={}", + normalizedFlightNo, normalizedBizKey, inRunway, outRunway); } log.info("成功将航班RUNWAY数据存储到Redis: flightNumber={}, bizKey={}, inRunway={}, outRunway={}", normalizedFlightNo, bizKey, inRunway, outRunway); diff --git a/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java b/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java index 44c3531..d5db4f0 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java +++ b/qaup-collision/src/main/java/com/qaup/collision/dataprocessing/service/DataProcessingService.java @@ -1049,7 +1049,7 @@ public class DataProcessingService { */ private final Map pendingRouteQueries = new ConcurrentHashMap<>(); - private static final int ROUTE_RETRY_MAX_ATTEMPTS = 2; + private static final int ROUTE_RETRY_MAX_ATTEMPTS = 20; private static final long ROUTE_RETRY_MIN_DELAY_MS = 1_000L; // 1s private static final long ROUTE_RETRY_MAX_DELAY_MS = 5_000L; // 5s diff --git a/tmpjarcheck/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.class b/tmpjarcheck/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.class new file mode 100644 index 0000000..6100ce6 Binary files /dev/null and b/tmpjarcheck/com/qaup/collision/datacollector/websocket/AdxpFlightServiceWebSocketClient.class differ