Refactor DataProcessingService to support bizKey in route queries and reduce retry attempts
- Updated ROUTE_RETRY_MAX_ATTEMPTS from 20 to 2 and ROUTE_RETRY_MAX_DELAY_MS from 30s to 5s. - Modified PendingRouteQuery to include bizKey for better route query management. - Enhanced methods to check and mark routes as retrieved using bizKey. - Adjusted route query methods to handle bizKey, ensuring proper caching and retrieval. - Cleaned up redundant code and improved logging for better traceability. - Added a new section in the documentation for the restart process of the application.
This commit is contained in:
parent
aa13f343ea
commit
ea3f4a3b7b
@ -18,7 +18,7 @@
|
||||
(4)Z31020**&d1231=+33
|
||||
4.测试平台-- 10.100.23.10
|
||||
(1)root
|
||||
Dianxin@222
|
||||
(2)Dianxin@222
|
||||
5.红绿灯测试平台-- 10.98.23.111 windows系统
|
||||
(1)账密
|
||||
①administrator
|
||||
|
||||
@ -18,8 +18,14 @@ import org.springframework.web.socket.client.WebSocketClient;
|
||||
import org.springframework.lang.NonNull;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@ -29,6 +35,12 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
|
||||
// 单条消息最大字符数(防止分片拼接导致内存无限增长)
|
||||
private static final int MAX_MESSAGE_CHARS = 2_000_000;
|
||||
private static final ZoneId BIZ_DATE_ZONE = ZoneId.of("Asia/Shanghai");
|
||||
private static final DateTimeFormatter BIZ_KEY_TS_14 = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
|
||||
private static final DateTimeFormatter BIZ_KEY_TS_12 = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
|
||||
private static final DateTimeFormatter BIZ_KEY_DATE_8 = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final long DEFAULT_DFIE_TTL_SECONDS = TimeUnit.DAYS.toSeconds(2);
|
||||
private static final long MIN_DFIE_TTL_SECONDS = 60L;
|
||||
|
||||
private final WebSocketClient webSocketClient;
|
||||
private final FlightSdkProperties properties;
|
||||
@ -432,6 +444,244 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 统一航班号归一化规则,避免同一航班参数写入不同 Redis Key,导致路由参数难以凑齐。
|
||||
*
|
||||
* 规则(尽量保守,不改变业务含义):
|
||||
* - trim
|
||||
* - 如果包含 '-',取第一段(与 DataProcessingService 的 normalizedFlightNo 规则一致)
|
||||
* - 统一转大写(避免大小写不一致导致 key 分裂)
|
||||
*
|
||||
* @param raw 原始航班号(可能来自 BizKey/FlightNumber/FlNo)
|
||||
* @return 归一化后的航班号;为空返回 null
|
||||
*/
|
||||
private static String normalizeFlightNo(String raw) {
|
||||
if (raw == null) {
|
||||
return null;
|
||||
}
|
||||
String s = raw.trim();
|
||||
if (s.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
int dash = s.indexOf('-');
|
||||
if (dash > 0) {
|
||||
s = s.substring(0, dash).trim();
|
||||
}
|
||||
if (s.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return s.toUpperCase();
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 BizKey 按最多 3 段安全切分,避免空值导致 NPE。
|
||||
*/
|
||||
private static String[] splitBizKey(String bizKey) {
|
||||
if (bizKey == null || bizKey.trim().isEmpty()) {
|
||||
return new String[0];
|
||||
}
|
||||
return bizKey.split("-", 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成航班 Redis Key(flight:<flightNo>),并保证 flightNo 已归一化。
|
||||
*/
|
||||
private static String buildFlightRedisKey(String flightNo) {
|
||||
String normalized = normalizeFlightNo(flightNo);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
return "flight:" + normalized;
|
||||
}
|
||||
|
||||
private static String normalizeBizKey(String bizKey) {
|
||||
if (bizKey == null) {
|
||||
return null;
|
||||
}
|
||||
String normalized = bizKey.trim();
|
||||
return normalized.isEmpty() ? null : normalized.toUpperCase();
|
||||
}
|
||||
|
||||
private static String buildBizRedisKey(String bizKey) {
|
||||
String normalized = normalizeBizKey(bizKey);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
return "flightBiz:" + normalized;
|
||||
}
|
||||
|
||||
private static String firstSegmentFromBizLike(String raw) {
|
||||
String normalized = normalizeBizKey(raw);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
String[] parts = normalized.split("-", 3);
|
||||
return parts.length > 0 ? normalizeFlightNo(parts[0]) : null;
|
||||
}
|
||||
|
||||
private String getTextContentAny(org.w3c.dom.Element parent, String... tagNames) {
|
||||
if (tagNames == null) {
|
||||
return null;
|
||||
}
|
||||
for (String tagName : tagNames) {
|
||||
String value = getTextContent(parent, tagName);
|
||||
if (value != null && !value.trim().isBlank()) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String resolveTisFlightBizKey(String bizKeyRaw, String fuIdRaw, String flightId, String resolvedFlightNo) {
|
||||
String bizNorm = normalizeBizKey(bizKeyRaw);
|
||||
if (bizNorm != null) {
|
||||
return bizNorm;
|
||||
}
|
||||
|
||||
// TISFLIGHT 缺 BizKey 时,先用 FlightId 关联 FuId(FuId 视作可复用业务键)
|
||||
String fuNorm = normalizeBizKey(fuIdRaw);
|
||||
String normalizedFlightId = normalizeFlightNo(flightId);
|
||||
if (fuNorm != null && normalizedFlightId != null) {
|
||||
String fuFlightNo = firstSegmentFromBizLike(fuNorm);
|
||||
if (normalizedFlightId.equalsIgnoreCase(fuFlightNo)) {
|
||||
return fuNorm;
|
||||
}
|
||||
}
|
||||
|
||||
// FlightId-FuId 关联失败时,按航班号回退到 flight:<flightNo>.activeBizKey
|
||||
String flightKey = buildFlightRedisKey(resolvedFlightNo);
|
||||
if (flightKey != null) {
|
||||
Object activeBizRaw = redisCache.getCacheMapValue(flightKey, "activeBizKey");
|
||||
String activeBizKey = normalizeBizKey(activeBizRaw == null ? null : String.valueOf(activeBizRaw));
|
||||
if (activeBizKey != null) {
|
||||
return activeBizKey;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void setCommonFlightMeta(String key, String normalizedFlightNo, String[] bizParts) {
|
||||
if (key == null) {
|
||||
return;
|
||||
}
|
||||
if (normalizedFlightNo != null && !normalizedFlightNo.isBlank()) {
|
||||
redisCache.setCacheMapValue(key, "flightNumber", normalizedFlightNo);
|
||||
}
|
||||
if (bizParts.length >= 2 && bizParts[1] != null && !bizParts[1].isBlank()) {
|
||||
redisCache.setCacheMapValue(key, "type", bizParts[1]);
|
||||
}
|
||||
if (bizParts.length >= 3 && bizParts[2] != null && !bizParts[2].isBlank()) {
|
||||
redisCache.setCacheMapValue(key, "time", bizParts[2]);
|
||||
}
|
||||
}
|
||||
|
||||
private void setValueOnFlightAndBizKeys(String flightKey, String bizKey, String field, String value) {
|
||||
if (value == null) {
|
||||
return;
|
||||
}
|
||||
if (flightKey != null) {
|
||||
redisCache.setCacheMapValue(flightKey, field, value);
|
||||
}
|
||||
if (bizKey != null) {
|
||||
redisCache.setCacheMapValue(bizKey, field, value);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateActiveBizKey(String flightKey, String normalizedBizKey, long nowMs) {
|
||||
if (flightKey == null || normalizedBizKey == null) {
|
||||
return;
|
||||
}
|
||||
redisCache.setCacheMapValue(flightKey, "activeBizKey", normalizedBizKey);
|
||||
redisCache.setCacheMapValue(flightKey, "activeBizKeyTs", String.valueOf(nowMs));
|
||||
}
|
||||
|
||||
private LocalDateTime parseBestFlightDateTimeForDfi(org.w3c.dom.Element root, String[] bizParts) {
|
||||
if (bizParts != null && bizParts.length >= 3) {
|
||||
LocalDateTime fromBiz = parseDateTimeLoose(bizParts[2]);
|
||||
if (fromBiz != null) {
|
||||
return fromBiz;
|
||||
}
|
||||
}
|
||||
String[] candidateTags = new String[]{"FlightDate", "Date", "OpDate", "DataDate", "SchDate"};
|
||||
for (String tag : candidateTags) {
|
||||
LocalDateTime dt = parseDateTimeLoose(getTextContent(root, tag));
|
||||
if (dt != null) {
|
||||
return dt;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private LocalDateTime parseDateTimeLoose(String raw) {
|
||||
if (raw == null) {
|
||||
return null;
|
||||
}
|
||||
String value = raw.trim();
|
||||
if (value.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
if (value.length() == 14) {
|
||||
return LocalDateTime.parse(value, BIZ_KEY_TS_14);
|
||||
}
|
||||
if (value.length() == 12) {
|
||||
return LocalDateTime.parse(value, BIZ_KEY_TS_12);
|
||||
}
|
||||
if (value.length() == 8) {
|
||||
LocalDate d = LocalDate.parse(value, BIZ_KEY_DATE_8);
|
||||
return d.atStartOfDay();
|
||||
}
|
||||
} catch (DateTimeParseException ignored) {
|
||||
// ignore
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private long computeDfiTtlSeconds(LocalDateTime bizDateTime, long nowMs) {
|
||||
if (bizDateTime == null) {
|
||||
return DEFAULT_DFIE_TTL_SECONDS;
|
||||
}
|
||||
// 以 DFIE 日期为基准,保留到“次日后一天的零点”(跨天场景更稳妥)
|
||||
long expireAtMs = bizDateTime.toLocalDate()
|
||||
.plusDays(2)
|
||||
.atStartOfDay(BIZ_DATE_ZONE)
|
||||
.toInstant()
|
||||
.toEpochMilli();
|
||||
long ttlSec = (expireAtMs - nowMs) / 1000L;
|
||||
if (ttlSec < MIN_DFIE_TTL_SECONDS) {
|
||||
return MIN_DFIE_TTL_SECONDS;
|
||||
}
|
||||
return ttlSec;
|
||||
}
|
||||
|
||||
private void applyDfiExpiry(String flightKey,
|
||||
String bizRedisKey,
|
||||
LocalDateTime bizDateTime,
|
||||
long nowMs,
|
||||
String normalizedFlightNo,
|
||||
String normalizedBizKey) {
|
||||
long ttlSeconds = computeDfiTtlSeconds(bizDateTime, nowMs);
|
||||
try {
|
||||
if (flightKey != null) {
|
||||
redisCache.expire(flightKey, ttlSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
if (bizRedisKey != null) {
|
||||
redisCache.expire(bizRedisKey, ttlSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
if (flightKey != null && bizDateTime != null) {
|
||||
redisCache.setCacheMapValue(flightKey, "bizDateRef", bizDateTime.toString());
|
||||
}
|
||||
if (bizRedisKey != null && bizDateTime != null) {
|
||||
redisCache.setCacheMapValue(bizRedisKey, "bizDateRef", bizDateTime.toString());
|
||||
}
|
||||
log.info("DFIE设置Redis过期成功: flightNo={}, bizKey={}, ttlSeconds={}, bizDateRef={}",
|
||||
normalizedFlightNo, normalizedBizKey, ttlSeconds, bizDateTime);
|
||||
} catch (Exception e) {
|
||||
log.error("DFIE设置Redis过期失败: flightNo={}, bizKey={}, err={}",
|
||||
normalizedFlightNo, normalizedBizKey, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加消息监听器
|
||||
@ -512,38 +762,55 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
|
||||
//处理ARR事件
|
||||
private FlightNotificationDTO handleARR(org.w3c.dom.Element root) {
|
||||
String flightNo = getTextContent(root, "BizKey");
|
||||
//分解BizKey
|
||||
String[] arr = flightNo.split("-", 3); // 最多切 3 段
|
||||
String bizKey = getTextContent(root, "BizKey");
|
||||
// 分解 BizKey(安全处理,避免 NPE)
|
||||
String[] arr = splitBizKey(bizKey); // 最多切 3 段
|
||||
String normalizedFlightNo = (arr.length >= 1) ? normalizeFlightNo(arr[0]) : null;
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String flightKey = buildFlightRedisKey(normalizedFlightNo);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
long nowMs = System.currentTimeMillis();
|
||||
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
dto.setFlightNo(flightNo);
|
||||
// 前端/后续处理更需要“航班号”而不是携带后缀的 BizKey
|
||||
dto.setFlightNo(normalizedFlightNo != null ? normalizedFlightNo : bizKey);
|
||||
dto.setType("IN");
|
||||
// 存储到Redis
|
||||
if (arr.length >= 3) {
|
||||
|
||||
// 存储到Redis(只要有 flightNo 就写入,避免因为字段不全而错过 key 初始化)
|
||||
if (flightKey != null || bizRedisKey != null) {
|
||||
try {
|
||||
String key = "flight:" + arr[0]; // 使用航班号作为键
|
||||
redisCache.setCacheMapValue(key, "flightNumber", arr[0]);
|
||||
redisCache.setCacheMapValue(key, "type", arr[1]);
|
||||
redisCache.setCacheMapValue(key, "time", arr[2]);
|
||||
log.info("成功将航班数据存储到Redis: flightNumber={}, type={}, time={}", arr[0], arr[1], arr[2]);
|
||||
setCommonFlightMeta(flightKey, normalizedFlightNo, arr);
|
||||
setCommonFlightMeta(bizRedisKey, normalizedFlightNo, arr);
|
||||
updateActiveBizKey(flightKey, normalizedBizKey, nowMs);
|
||||
log.info("成功将航班ARR数据存储到Redis: flightNumber={}, bizKey={}", normalizedFlightNo, bizKey);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班数据到Redis失败: {}", e.getMessage());
|
||||
log.error("存储航班ARR数据到Redis失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
//假装模拟请求接口
|
||||
dataProcessingService.ARR(arr[0]);
|
||||
|
||||
// 触发路由查询:使用归一化航班号,避免 flightNo 不一致导致拼参失败
|
||||
if (normalizedFlightNo != null) {
|
||||
dataProcessingService.ARR(normalizedFlightNo, normalizedBizKey);
|
||||
} else {
|
||||
log.warn("ARR事件缺少可用航班号,跳过路由触发: bizKey={}", bizKey);
|
||||
}
|
||||
return dto;
|
||||
}
|
||||
|
||||
private FlightNotificationDTO handleDFIE(org.w3c.dom.Element root) {
|
||||
String flightNo = getTextContent(root, "BizKey");
|
||||
String bizKey = getTextContent(root, "BizKey");
|
||||
String RunwayNum = getTextContent(root, "RunwayNum");
|
||||
String InOut = getTextContent(root, "InOut");
|
||||
long nowMs = System.currentTimeMillis();
|
||||
//分解BizKey
|
||||
String[] arr = (flightNo == null ? new String[0] : flightNo.split("-", 3)); // 最多切 3 段
|
||||
String[] arr = splitBizKey(bizKey); // 最多切 3 段
|
||||
String normalizedFlightNo = (arr.length >= 1) ? normalizeFlightNo(arr[0]) : null;
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String flightKey = buildFlightRedisKey(normalizedFlightNo);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
dto.setFlightNo(flightNo);
|
||||
dto.setFlightNo(normalizedFlightNo != null ? normalizedFlightNo : bizKey);
|
||||
if ("A".equals(InOut)){
|
||||
dto.setType("IN");
|
||||
}else {
|
||||
@ -552,22 +819,24 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
dto.setRunway(RunwayNum);
|
||||
|
||||
// 存储到Redis
|
||||
if (arr.length >= 3) {
|
||||
if (flightKey != null || bizRedisKey != null) {
|
||||
try {
|
||||
String key = "flight:" + arr[0]; // 使用航班号作为键
|
||||
redisCache.setCacheMapValue(key, "flightNumber", arr[0]);
|
||||
redisCache.setCacheMapValue(key, "type", arr[1]);
|
||||
redisCache.setCacheMapValue(key, "time", arr[2]);
|
||||
setCommonFlightMeta(flightKey, normalizedFlightNo, arr);
|
||||
setCommonFlightMeta(bizRedisKey, normalizedFlightNo, arr);
|
||||
updateActiveBizKey(flightKey, normalizedBizKey, nowMs);
|
||||
if (RunwayNum != null && !RunwayNum.isBlank()) {
|
||||
if ("A".equals(InOut)){
|
||||
redisCache.setCacheMapValue(key, "inRunway", RunwayNum);
|
||||
}else {
|
||||
redisCache.setCacheMapValue(key, "outRunway", RunwayNum);
|
||||
}
|
||||
// DFIE 的 RunwayNum 用于初始化跑道参数:in/out 同步赋值,再由 RUNWAY 事件按 BizKey 精确覆盖
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunway", RunwayNum);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunway", RunwayNum);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunwayTs", String.valueOf(nowMs));
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunwayTs", String.valueOf(nowMs));
|
||||
}
|
||||
log.info("成功将航班数据存储到Redis: flightNumber={}, type={}, time={}", arr[0], arr[1], arr[2]);
|
||||
LocalDateTime bizDateTime = parseBestFlightDateTimeForDfi(root, arr);
|
||||
applyDfiExpiry(flightKey, bizRedisKey, bizDateTime, nowMs, normalizedFlightNo, normalizedBizKey);
|
||||
log.info("成功将航班DFIE数据存储到Redis: flightNumber={}, bizKey={}, runwayNum={}, inOut={}",
|
||||
normalizedFlightNo, bizKey, RunwayNum, InOut);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班数据到Redis失败: {}", e.getMessage());
|
||||
log.error("存储航班DFIE数据到Redis失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@ -576,15 +845,21 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
|
||||
//处理RUNWAY事件
|
||||
private FlightNotificationDTO handleRUNWAY(org.w3c.dom.Element root) {
|
||||
String flightNo = getTextContent(root, "BizKey");
|
||||
String bizKey = getTextContent(root, "BizKey");
|
||||
String outRunway = getTextContent(root, "RUNWAYDEP");
|
||||
String inRunway = getTextContent(root, "RUNWAYARR");
|
||||
long nowMs = System.currentTimeMillis();
|
||||
log.error("inRunway-{}",inRunway);
|
||||
log.error("outRunway-{}",outRunway);
|
||||
//分解BizKey
|
||||
String[] arr = (flightNo == null ? new String[0] : flightNo.split("-", 3)); // 最多切 3 段
|
||||
String[] arr = splitBizKey(bizKey); // 最多切 3 段
|
||||
String normalizedFlightNo = (arr.length >= 1) ? normalizeFlightNo(arr[0]) : null;
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String flightKey = buildFlightRedisKey(normalizedFlightNo);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
dto.setFlightNo(flightNo);
|
||||
dto.setFlightNo(normalizedFlightNo != null ? normalizedFlightNo : bizKey);
|
||||
if (arr.length >= 2) {
|
||||
if ("A".equals(arr[1])){
|
||||
dto.setType("IN");
|
||||
@ -593,68 +868,166 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
}
|
||||
}
|
||||
// 存储到Redis
|
||||
if (arr.length >= 3) {
|
||||
if (flightKey != null || bizRedisKey != null) {
|
||||
try {
|
||||
String key = "flight:" + arr[0]; // 使用航班号作为键
|
||||
redisCache.setCacheMapValue(key, "flightNumber", arr[0]);
|
||||
redisCache.setCacheMapValue(key, "type", arr[1]);
|
||||
redisCache.setCacheMapValue(key, "time", arr[2]);
|
||||
setCommonFlightMeta(flightKey, normalizedFlightNo, arr);
|
||||
setCommonFlightMeta(bizRedisKey, normalizedFlightNo, arr);
|
||||
updateActiveBizKey(flightKey, normalizedBizKey, nowMs);
|
||||
if (inRunway != null && !inRunway.isBlank()){
|
||||
redisCache.setCacheMapValue(key, "inRunway", inRunway);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunway", inRunway);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "inRunwayTs", String.valueOf(nowMs));
|
||||
}
|
||||
if (outRunway != null && !outRunway.isBlank()){
|
||||
redisCache.setCacheMapValue(key, "outRunway", outRunway);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunway", outRunway);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "outRunwayTs", String.valueOf(nowMs));
|
||||
}
|
||||
log.info("成功将航班数据存储到Redis: flightNumber={}, type={}, time={}", arr[0], arr[1], arr[2]);
|
||||
log.info("成功将航班RUNWAY数据存储到Redis: flightNumber={}, bizKey={}, inRunway={}, outRunway={}",
|
||||
normalizedFlightNo, bizKey, inRunway, outRunway);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班数据到Redis失败: {}", e.getMessage());
|
||||
log.error("存储航班RUNWAY数据到Redis失败: {}", e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.warn("RUNWAY事件缺少可用航班号,跳过Redis写入: bizKey={}", bizKey);
|
||||
}
|
||||
return dto;
|
||||
}
|
||||
|
||||
//处理CRAFTSEAT事件
|
||||
private FlightNotificationDTO handleCRAFTSEAT(org.w3c.dom.Element root) {
|
||||
String flightNo = getTextContent(root, "BizKey");
|
||||
String bizKey = getTextContent(root, "BizKey");
|
||||
String code = getTextContent(root, "Code");
|
||||
long nowMs = System.currentTimeMillis();
|
||||
log.error("seat-{}",code);
|
||||
//分解BizKey
|
||||
String[] arr = (flightNo == null ? new String[0] : flightNo.split("-", 3)); // 最多切 3 段
|
||||
String[] arr = splitBizKey(bizKey); // 最多切 3 段
|
||||
String normalizedFlightNo = (arr.length >= 1) ? normalizeFlightNo(arr[0]) : null;
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String flightKey = buildFlightRedisKey(normalizedFlightNo);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
dto.setFlightNo(flightNo);
|
||||
// 存储到Redis
|
||||
if (arr.length >= 3) {
|
||||
try {
|
||||
String key = "flight:" + arr[0]; // 使用航班号作为键
|
||||
redisCache.setCacheMapValue(key, "flightNumber", arr[0]);
|
||||
redisCache.setCacheMapValue(key, "type", arr[1]);
|
||||
redisCache.setCacheMapValue(key, "time", arr[2]);
|
||||
if (code != null && !code.isBlank()) {
|
||||
redisCache.setCacheMapValue(key, "seat", code);
|
||||
}
|
||||
log.info("成功将航班数据存储到Redis: flightNumber={}, type={}, time={}", arr[0], arr[1], arr[2]);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班数据到Redis失败: {}", e.getMessage());
|
||||
dto.setFlightNo(normalizedFlightNo != null ? normalizedFlightNo : bizKey);
|
||||
String routeType = null;
|
||||
if (arr.length >= 2 && arr[1] != null) {
|
||||
String bizType = arr[1].trim().toUpperCase();
|
||||
if ("A".equals(bizType) || "ARR".equals(bizType) || "IN".equals(bizType)) {
|
||||
routeType = "IN";
|
||||
} else if ("D".equals(bizType) || "DEP".equals(bizType) || "OUT".equals(bizType)) {
|
||||
routeType = "OUT";
|
||||
}
|
||||
}
|
||||
dto.setType(routeType);
|
||||
// 存储到Redis
|
||||
if (flightKey != null || bizRedisKey != null) {
|
||||
try {
|
||||
setCommonFlightMeta(flightKey, normalizedFlightNo, arr);
|
||||
setCommonFlightMeta(bizRedisKey, normalizedFlightNo, arr);
|
||||
updateActiveBizKey(flightKey, normalizedBizKey, nowMs);
|
||||
if (code != null && !code.isBlank()) {
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "seat", code);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "seatTs", String.valueOf(nowMs));
|
||||
// 出港路由参数主写startSeat,避免长期依赖seat兜底
|
||||
if ("OUT".equalsIgnoreCase(routeType)) {
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "startSeat", code);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "startSeatTs", String.valueOf(nowMs));
|
||||
}
|
||||
}
|
||||
log.info("成功将航班CRAFTSEAT数据存储到Redis: flightNumber={}, bizKey={}, seat={}",
|
||||
normalizedFlightNo, bizKey, code);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班CRAFTSEAT数据到Redis失败: {}", e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.warn("CRAFTSEAT事件缺少可用航班号,跳过Redis写入: bizKey={}", bizKey);
|
||||
}
|
||||
return dto;
|
||||
}
|
||||
|
||||
private FlightNotificationDTO handleTISFLIGHT(org.w3c.dom.Element root) {
|
||||
String flightNumber = getTextContent(root, "FlNo");
|
||||
String bizKey = getTextContent(root, "BizKey");
|
||||
String fuId = getTextContentAny(root, "FuId", "FUID", "FuID");
|
||||
String flightIdRaw = getTextContentAny(root, "FlightId", "FlightID", "FlId", "FLID");
|
||||
String flightNumberRaw = getTextContent(root, "FlNo");
|
||||
String flightNumber = normalizeFlightNo(flightNumberRaw);
|
||||
String flightId = normalizeFlightNo(flightIdRaw);
|
||||
String contactCross = getTextContent(root, "ContactCross");
|
||||
String seat = getTextContentAny(root, "Seat", "SEAT");
|
||||
String type = getTextContent(root, "Type");
|
||||
long nowMs = System.currentTimeMillis();
|
||||
|
||||
String routeType = null;
|
||||
if (type != null) {
|
||||
String normalizedType = type.trim().toUpperCase();
|
||||
if ("ARR".equals(normalizedType) || "A".equals(normalizedType) || "IN".equals(normalizedType)) {
|
||||
routeType = "IN";
|
||||
} else if ("DEP".equals(normalizedType) || "D".equals(normalizedType) || "OUT".equals(normalizedType)) {
|
||||
routeType = "OUT";
|
||||
}
|
||||
}
|
||||
if (routeType == null && bizKey != null && !bizKey.isBlank()) {
|
||||
String[] arr = splitBizKey(bizKey);
|
||||
if (arr.length >= 2 && arr[1] != null) {
|
||||
String bizType = arr[1].trim().toUpperCase();
|
||||
if ("A".equals(bizType) || "ARR".equals(bizType) || "IN".equals(bizType)) {
|
||||
routeType = "IN";
|
||||
} else if ("D".equals(bizType) || "DEP".equals(bizType) || "OUT".equals(bizType)) {
|
||||
routeType = "OUT";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String fuIdFlightNo = firstSegmentFromBizLike(fuId);
|
||||
String bizFlightNo = firstSegmentFromBizLike(bizKey);
|
||||
String resolvedFlightNo = flightNumber != null ? flightNumber
|
||||
: (flightId != null ? flightId
|
||||
: (fuIdFlightNo != null ? fuIdFlightNo : bizFlightNo));
|
||||
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
dto.setFlightNo(flightNumber);
|
||||
dto.setType("IN");
|
||||
dto.setFlightNo(resolvedFlightNo != null ? resolvedFlightNo : flightNumberRaw);
|
||||
dto.setType(routeType);
|
||||
if ("IN".equalsIgnoreCase(routeType)) {
|
||||
dto.setContactCross(contactCross);
|
||||
dto.setSeat(seat);
|
||||
}
|
||||
|
||||
String key = "flight:" + flightNumber;
|
||||
// TISFLIGHT 特殊:可能无 BizKey。无 BizKey 时先 FuId 关联,再按航班号回退。
|
||||
String normalizedBizKey = resolveTisFlightBizKey(bizKey, fuId, flightId, resolvedFlightNo);
|
||||
String flightKey = buildFlightRedisKey(resolvedFlightNo);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
boolean canUpdateRouteParams = normalizedBizKey != null;
|
||||
|
||||
try {
|
||||
redisCache.setCacheMapValue(key, "flightNumber", flightNumber);
|
||||
redisCache.setCacheMapValue(key, "contactCross",contactCross);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班数据到Redis失败: {}", e.getMessage());
|
||||
if (flightKey != null || bizRedisKey != null) {
|
||||
try {
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "flightNumber", resolvedFlightNo);
|
||||
updateActiveBizKey(flightKey, normalizedBizKey, nowMs);
|
||||
// contactCross 仅对进港有效,避免进出港参数串用
|
||||
if ("IN".equalsIgnoreCase(routeType)
|
||||
&& canUpdateRouteParams
|
||||
&& contactCross != null && !contactCross.isBlank()) {
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "contactCross", contactCross);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "contactCrossTs", String.valueOf(nowMs));
|
||||
}
|
||||
// TISFLIGHT 也可提供 seat,作为 CRAFTSEAT 的补充来源
|
||||
if (canUpdateRouteParams
|
||||
&& seat != null && !seat.isBlank()) {
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "seat", seat);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "seatTs", String.valueOf(nowMs));
|
||||
if ("OUT".equalsIgnoreCase(routeType)) {
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "startSeat", seat);
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "startSeatTs", String.valueOf(nowMs));
|
||||
}
|
||||
}
|
||||
if (!canUpdateRouteParams && (contactCross != null || seat != null)) {
|
||||
log.warn("TISFLIGHT未建立BizKey关联,跳过contactCross/seat覆盖: flNo={}, flightId={}, fuId={}, bizKey={}, resolvedFlightNo={}",
|
||||
flightNumber, flightId, fuId, bizKey, resolvedFlightNo);
|
||||
}
|
||||
log.info("TISFLIGHT航班号解析: flNo={}, flightId={}, fuId={}, bizKey={}, selectedBizKey={}, resolvedFlightNo={}, hasBizKey={}",
|
||||
flightNumber, flightId, fuId, bizKey, normalizedBizKey, resolvedFlightNo, normalizedBizKey != null);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班TISFLIGHT数据到Redis失败: {}", e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.warn("TISFLIGHT事件缺少可用航班号,跳过Redis写入: flNo={}, bizKey={}, fuId={}", flightNumberRaw, bizKey, fuId);
|
||||
}
|
||||
|
||||
return dto;
|
||||
@ -667,14 +1040,19 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
String actualPushback = getTextContent(root, "ActualPushback");
|
||||
|
||||
// 分解 BizKey(最少保证不会抛异常)
|
||||
String[] arr = (bizKey == null ? new String[0] : bizKey.split("-", 3)); // 最多切 3 段
|
||||
String[] arr = splitBizKey(bizKey); // 最多切 3 段
|
||||
String flightNoForKey = (arr.length >= 1 && arr[0] != null && !arr[0].isBlank())
|
||||
? arr[0]
|
||||
: (flightNumber != null && !flightNumber.isBlank() ? flightNumber : bizKey);
|
||||
String normalizedFlightNoForKey = normalizeFlightNo(flightNoForKey);
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String flightKey = buildFlightRedisKey(normalizedFlightNoForKey);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
// 保持兼容:如果FlightNumber存在优先用它;否则回退到BizKey
|
||||
dto.setFlightNo(flightNumber != null && !flightNumber.isBlank() ? flightNumber : bizKey);
|
||||
dto.setFlightNo(normalizedFlightNoForKey != null ? normalizedFlightNoForKey
|
||||
: (flightNumber != null && !flightNumber.isBlank() ? flightNumber : bizKey));
|
||||
dto.setType("OUT");
|
||||
|
||||
// 解析推出时间(ActualPushback),用于“推出事件时间”展示/调试
|
||||
@ -689,34 +1067,27 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
}
|
||||
|
||||
// 存储到Redis:保留原有字段,同时补充 actualPushback 便于运行环境调试
|
||||
if (flightNoForKey != null && !flightNoForKey.isBlank()) {
|
||||
if (flightKey != null || bizRedisKey != null) {
|
||||
try {
|
||||
String key = "flight:" + flightNoForKey.trim(); // 使用航班号作为键
|
||||
redisCache.setCacheMapValue(key, "flightNumber", flightNoForKey.trim());
|
||||
if (arr.length >= 2) {
|
||||
redisCache.setCacheMapValue(key, "type", arr[1]);
|
||||
}
|
||||
if (arr.length >= 3) {
|
||||
redisCache.setCacheMapValue(key, "time", arr[2]);
|
||||
}
|
||||
setCommonFlightMeta(flightKey, normalizedFlightNoForKey, arr);
|
||||
setCommonFlightMeta(bizRedisKey, normalizedFlightNoForKey, arr);
|
||||
updateActiveBizKey(flightKey, normalizedBizKey, System.currentTimeMillis());
|
||||
if (actualPushback != null && !actualPushback.isBlank()) {
|
||||
redisCache.setCacheMapValue(key, "actualPushback", actualPushback.trim());
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "actualPushback", actualPushback.trim());
|
||||
}
|
||||
if (dto.getTime() != null) {
|
||||
redisCache.setCacheMapValue(key, "pushbackTimeMs", String.valueOf(dto.getTime()));
|
||||
setValueOnFlightAndBizKeys(flightKey, bizRedisKey, "pushbackTimeMs", String.valueOf(dto.getTime()));
|
||||
}
|
||||
log.info("成功将航班推出数据存储到Redis: flightNumber={}, bizKey={}, actualPushback={}",
|
||||
flightNoForKey, bizKey, actualPushback);
|
||||
normalizedFlightNoForKey, bizKey, actualPushback);
|
||||
} catch (Exception e) {
|
||||
log.error("存储航班推出数据到Redis失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 保持原有行为:触发路由查询
|
||||
if (arr.length >= 1 && arr[0] != null && !arr[0].isBlank()) {
|
||||
dataProcessingService.AXOT(arr[0]);
|
||||
} else if (flightNumber != null && !flightNumber.isBlank()) {
|
||||
dataProcessingService.AXOT(flightNumber);
|
||||
if (normalizedFlightNoForKey != null) {
|
||||
dataProcessingService.AXOT(normalizedFlightNoForKey, normalizedBizKey);
|
||||
} else {
|
||||
log.warn("AXOT事件缺少可用航班号,跳过路由触发: bizKey={}, flightNumber={}", bizKey, flightNumber);
|
||||
}
|
||||
@ -724,20 +1095,47 @@ public class AdxpFlightServiceWebSocketClient implements WebSocketHandler {
|
||||
}
|
||||
|
||||
private FlightNotificationDTO handleDFDE(org.w3c.dom.Element root) {
|
||||
String flightNo = getTextContent(root, "BizKey");
|
||||
//分解BizKey
|
||||
String[] arr = flightNo.split("-", 3); // 最多切 3 段
|
||||
FlightNotificationDTO dto = new FlightNotificationDTO();
|
||||
if (arr[1].equals("A")){
|
||||
dto.setType("IN");
|
||||
}else {
|
||||
dto.setType("OUT");
|
||||
String bizKey = getTextContent(root, "BizKey");
|
||||
// 分解 BizKey(安全处理,避免 NPE)
|
||||
String[] arr = splitBizKey(bizKey);
|
||||
String normalizedFlightNo = (arr.length >= 1) ? normalizeFlightNo(arr[0]) : null;
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String key = buildFlightRedisKey(normalizedFlightNo);
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
|
||||
// 原实现会直接删除整个 flight:<flightNo>,会导致“跑道/机位/联络道口”补齐过程被中断,路由参数更难凑齐。
|
||||
// 这里改为:标记 DFDE 已发生,并设置较短 TTL 以便及时回收,但不立刻清空参数。
|
||||
if (key != null || bizRedisKey != null) {
|
||||
try {
|
||||
if (key != null) {
|
||||
redisCache.setCacheMapValue(key, "dfde", "1");
|
||||
redisCache.setCacheMapValue(key, "dfdeTimeMs", String.valueOf(System.currentTimeMillis()));
|
||||
}
|
||||
if (bizRedisKey != null) {
|
||||
redisCache.setCacheMapValue(bizRedisKey, "dfde", "1");
|
||||
redisCache.setCacheMapValue(bizRedisKey, "dfdeTimeMs", String.valueOf(System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
// 设置短 TTL(保守值),避免长期占用;同时避免“立刻删除”导致并发报文拼参失败
|
||||
if (key != null) {
|
||||
redisCache.expire(key, 2, TimeUnit.HOURS);
|
||||
}
|
||||
if (bizRedisKey != null) {
|
||||
redisCache.expire(bizRedisKey, 2, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
log.info("DFDE事件已标记并设置TTL: flightNo={}, bizKey={}, key={}", normalizedFlightNo, bizKey, key);
|
||||
} catch (Exception e) {
|
||||
log.error("处理DFDE事件写Redis失败: bizKey={}, err={}", bizKey, e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.warn("DFDE事件缺少可用航班号,跳过Redis处理: bizKey={}", bizKey);
|
||||
}
|
||||
String key = "flight:" + arr[0]; // 使用航班号作为键
|
||||
redisCache.deleteObject(key);
|
||||
|
||||
// DFDE 不作为航班通知下发(保持原逻辑返回 null)
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -1049,20 +1049,22 @@ public class DataProcessingService {
|
||||
*/
|
||||
private final Map<String, PendingRouteQuery> pendingRouteQueries = new ConcurrentHashMap<>();
|
||||
|
||||
private static final int ROUTE_RETRY_MAX_ATTEMPTS = 20;
|
||||
private static final int ROUTE_RETRY_MAX_ATTEMPTS = 2;
|
||||
private static final long ROUTE_RETRY_MIN_DELAY_MS = 1_000L; // 1s
|
||||
private static final long ROUTE_RETRY_MAX_DELAY_MS = 30_000L; // 30s
|
||||
private static final long ROUTE_RETRY_MAX_DELAY_MS = 5_000L; // 5s
|
||||
|
||||
private static class PendingRouteQuery {
|
||||
private final String flightNo;
|
||||
private final String routeType; // IN / OUT
|
||||
private final String bizKey;
|
||||
private long nextRetryAtMs;
|
||||
private int attempts;
|
||||
private String lastSource;
|
||||
|
||||
private PendingRouteQuery(String flightNo, String routeType, long nextRetryAtMs, int attempts, String lastSource) {
|
||||
private PendingRouteQuery(String flightNo, String routeType, String bizKey, long nextRetryAtMs, int attempts, String lastSource) {
|
||||
this.flightNo = flightNo;
|
||||
this.routeType = routeType;
|
||||
this.bizKey = bizKey;
|
||||
this.nextRetryAtMs = nextRetryAtMs;
|
||||
this.attempts = attempts;
|
||||
this.lastSource = lastSource;
|
||||
@ -1086,7 +1088,7 @@ public class DataProcessingService {
|
||||
String routeType = flightNotification.getType().name();
|
||||
|
||||
// 检查是否已获取过该航班路由(按 flightNo + routeType 去重)
|
||||
if (isRouteAlreadyRetrieved(flightNo, routeType)) {
|
||||
if (isRouteAlreadyRetrieved(flightNo, routeType, null)) {
|
||||
log.info("航班路由已获取过,跳过重复查询: 航班号={}, 类型={}", flightNo, routeType);
|
||||
return;
|
||||
}
|
||||
@ -1094,10 +1096,10 @@ public class DataProcessingService {
|
||||
log.info("航班通知触发路由查询(仅Redis拼参): 航班号={}, 类型={}, eventTime={}",
|
||||
flightNo, routeType, flightNotification.getEventTime());
|
||||
|
||||
boolean success = tryQueryAndPublishRouteFromRedis(flightNo, routeType, "FLIGHT_NOTIFICATION");
|
||||
boolean success = tryQueryAndPublishRouteFromRedis(flightNo, routeType, null, "FLIGHT_NOTIFICATION");
|
||||
if (!success) {
|
||||
// Redis 字段未齐全或路由接口暂不可用:进入重试队列,下一轮再试
|
||||
schedulePendingRouteQuery(flightNo, routeType, "FLIGHT_NOTIFICATION");
|
||||
schedulePendingRouteQuery(flightNo, routeType, null, "FLIGHT_NOTIFICATION");
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
@ -1109,20 +1111,30 @@ public class DataProcessingService {
|
||||
/**
|
||||
* 检查指定航班事件的路由是否已经获取过
|
||||
*/
|
||||
private boolean isRouteAlreadyRetrieved(String flightNo, String routeType) {
|
||||
String cacheKey = flightNo + ":" + routeType;
|
||||
private boolean isRouteAlreadyRetrieved(String flightNo, String routeType, String bizKey) {
|
||||
String cacheKey = buildRouteRetrievalCacheKey(flightNo, routeType, bizKey);
|
||||
return routeRetrievalCache.containsKey(cacheKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记指定航班事件的路由已获取
|
||||
*/
|
||||
private void markRouteAsRetrieved(String flightNo, String routeType) {
|
||||
String cacheKey = flightNo + ":" + routeType;
|
||||
private void markRouteAsRetrieved(String flightNo, String routeType, String bizKey) {
|
||||
String cacheKey = buildRouteRetrievalCacheKey(flightNo, routeType, bizKey);
|
||||
routeRetrievalCache.put(cacheKey, System.currentTimeMillis());
|
||||
log.debug("标记路由已获取: cacheKey={}, 当前缓存数量={}", cacheKey, routeRetrievalCache.size());
|
||||
}
|
||||
|
||||
private String buildRouteRetrievalCacheKey(String flightNo, String routeType, String bizKey) {
|
||||
String normalizedFlightNo = normalizeFlightNo(flightNo);
|
||||
String normalizedRouteType = routeType == null ? "" : routeType.trim().toUpperCase();
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
if (normalizedBizKey == null) {
|
||||
return normalizedFlightNo + ":" + normalizedRouteType;
|
||||
}
|
||||
return normalizedFlightNo + ":" + normalizedRouteType + ":" + normalizedBizKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 Redis 拼装路由参数并查询路由,成功后发布路由更新事件。
|
||||
*
|
||||
@ -1132,23 +1144,33 @@ public class DataProcessingService {
|
||||
*
|
||||
* @return true=成功获取并发布;false=参数未齐全或路由接口失败(可进入重试)
|
||||
*/
|
||||
private boolean tryQueryAndPublishRouteFromRedis(String flightNo, String routeType, String eventSource) {
|
||||
private boolean tryQueryAndPublishRouteFromRedis(String flightNo, String routeType, String bizKey, String eventSource) {
|
||||
try {
|
||||
if (flightNo == null || flightNo.isBlank() || routeType == null || routeType.isBlank()) {
|
||||
return false;
|
||||
}
|
||||
String normalizedFlightNo = flightNo.contains("-") ? flightNo.split("-", 2)[0] : flightNo;
|
||||
String normalizedFlightNo = normalizeFlightNo(flightNo);
|
||||
String normalizedRouteType = routeType.trim().toUpperCase();
|
||||
String key = "flight:" + normalizedFlightNo.trim();
|
||||
if (normalizedFlightNo == null || normalizedFlightNo.isBlank()) {
|
||||
return false;
|
||||
}
|
||||
String flightKey = "flight:" + normalizedFlightNo.trim();
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
if (normalizedBizKey == null) {
|
||||
normalizedBizKey = normalizeRedisString(redisCache.getCacheMapValue(flightKey, "activeBizKey"));
|
||||
normalizedBizKey = normalizeBizKey(normalizedBizKey);
|
||||
}
|
||||
String bizRedisKey = buildBizRedisKey(normalizedBizKey);
|
||||
|
||||
// 从 Redis 获取参数快照
|
||||
// 注意:RedisCache 可能会把字符串序列化为带引号的 JSON 字符串(例如 "35"),甚至返回非 String 类型。
|
||||
// 这里统一做归一化,避免 ClassCastException,并保证 queryParam 不携带多余引号。
|
||||
String inRunway = normalizeRedisString(redisCache.getCacheMapValue(key, "inRunway"));
|
||||
String outRunway = normalizeRedisString(redisCache.getCacheMapValue(key, "outRunway"));
|
||||
String contactCross = normalizeRedisString(redisCache.getCacheMapValue(key, "contactCross"));
|
||||
String seat = normalizeRedisString(redisCache.getCacheMapValue(key, "seat"));
|
||||
String startSeat = normalizeRedisString(redisCache.getCacheMapValue(key, "startSeat"));
|
||||
String inRunway = readRedisParamBySelectedKey(bizRedisKey, flightKey, "inRunway");
|
||||
String outRunway = readRedisParamBySelectedKey(bizRedisKey, flightKey, "outRunway");
|
||||
String contactCross = readRedisParamBySelectedKey(bizRedisKey, flightKey, "contactCross");
|
||||
String seat = readRedisParamBySelectedKey(bizRedisKey, flightKey, "seat");
|
||||
String startSeat = readRedisParamBySelectedKey(bizRedisKey, flightKey, "startSeat");
|
||||
Long seatTs = readRedisParamLongBySelectedKey(bizRedisKey, flightKey, "seatTs");
|
||||
|
||||
// OUT:允许用 seat 兜底 startSeat(机位推出)
|
||||
if ("OUT".equalsIgnoreCase(normalizedRouteType) && (startSeat == null || startSeat.isBlank())) {
|
||||
@ -1157,22 +1179,6 @@ public class DataProcessingService {
|
||||
}
|
||||
}
|
||||
|
||||
// 跑道补齐策略(按你的要求):
|
||||
// - OUT 缺少 inRunway:使用固定值 "34" 补齐
|
||||
// - IN 缺少 outRunway:使用固定值 "34" 补齐
|
||||
// 同时记录是否“补齐”,用于下发给前端联调核对
|
||||
final String PATCH_RUNWAY_VALUE = "34";
|
||||
boolean inRunwayPatched = false;
|
||||
boolean outRunwayPatched = false;
|
||||
if ("OUT".equalsIgnoreCase(normalizedRouteType) && (inRunway == null || inRunway.isBlank())) {
|
||||
inRunway = PATCH_RUNWAY_VALUE;
|
||||
inRunwayPatched = true;
|
||||
}
|
||||
if ("IN".equalsIgnoreCase(normalizedRouteType) && (outRunway == null || outRunway.isBlank())) {
|
||||
outRunway = PATCH_RUNWAY_VALUE;
|
||||
outRunwayPatched = true;
|
||||
}
|
||||
|
||||
// 参数校验(字段不全:返回 false,交给重试机制)
|
||||
if ("IN".equalsIgnoreCase(normalizedRouteType)) {
|
||||
// IN:按“四个参数”拼参:inRunway/outRunway/contactCross/seat
|
||||
@ -1187,9 +1193,8 @@ public class DataProcessingService {
|
||||
return false;
|
||||
}
|
||||
if (contactCross == null || contactCross.isBlank() || seat == null || seat.isBlank()) {
|
||||
// 打印当前已拼装参数,便于定位到底缺了哪些字段以及跑道是否为补齐值
|
||||
log.info("进港路由Redis参数未齐全,等待补齐后重试: flightNo={}, inRunway={} (patched={}), outRunway={} (patched={}), contactCross={}, seat={}",
|
||||
normalizedFlightNo, inRunway, inRunwayPatched, outRunway, outRunwayPatched, contactCross, seat);
|
||||
log.info("进港路由Redis参数未齐全,等待补齐后重试: flightNo={}, inRunway={}, outRunway={}, contactCross={}, seat={}",
|
||||
normalizedFlightNo, inRunway, outRunway, contactCross, seat);
|
||||
return false;
|
||||
}
|
||||
} else if ("OUT".equalsIgnoreCase(normalizedRouteType)) {
|
||||
@ -1205,8 +1210,8 @@ public class DataProcessingService {
|
||||
return false;
|
||||
}
|
||||
if (startSeat == null || startSeat.isBlank()) {
|
||||
log.info("出港路由Redis参数未齐全,等待补齐后重试: flightNo={}, inRunway={} (patched={}), outRunway={} (patched={}), startSeat={}, seatAsFallback={}",
|
||||
normalizedFlightNo, inRunway, inRunwayPatched, outRunway, outRunwayPatched, startSeat, seat);
|
||||
log.info("出港路由Redis参数未齐全,等待补齐后重试: flightNo={}, inRunway={}, outRunway={}, startSeat={}, seatAsFallback={}",
|
||||
normalizedFlightNo, inRunway, outRunway, startSeat, seat);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
@ -1227,8 +1232,6 @@ public class DataProcessingService {
|
||||
routeParams.setSeat(seat);
|
||||
routeParams.setStartSeat(startSeat);
|
||||
routeParams.setTimestamp(System.currentTimeMillis());
|
||||
routeParams.setInRunwayPatched(inRunwayPatched);
|
||||
routeParams.setOutRunwayPatched(outRunwayPatched);
|
||||
|
||||
if ("IN".equalsIgnoreCase(normalizedRouteType)) {
|
||||
routeData = dataCollectorDao.getArrivalRoute(inRunway, outRunway, contactCross, seat);
|
||||
@ -1237,10 +1240,9 @@ public class DataProcessingService {
|
||||
}
|
||||
|
||||
if (routeData == null) {
|
||||
// 路由服务返回空体/无法解析时打印本次请求参数(含补齐标记),便于容器日志直接定位
|
||||
log.info("路由接口返回为空,稍后重试: flightNo={}, routeType={}, source={}, inRunway={} (patched={}), outRunway={} (patched={}), contactCross={}, seat={}, startSeat={}",
|
||||
log.info("路由接口返回为空,稍后重试: flightNo={}, routeType={}, source={}, inRunway={}, outRunway={}, contactCross={}, seat={}, startSeat={}",
|
||||
normalizedFlightNo, normalizedRouteType, eventSource,
|
||||
inRunway, inRunwayPatched, outRunway, outRunwayPatched, contactCross, seat, startSeat);
|
||||
inRunway, outRunway, contactCross, seat, startSeat);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -1257,11 +1259,11 @@ public class DataProcessingService {
|
||||
publishAircraftRouteUpdateEvent(normalizedFlightNo, aircraftRoute, routeParams, eventSource);
|
||||
|
||||
// 标记已获取,避免两种触发方式重复下发
|
||||
markRouteAsRetrieved(normalizedFlightNo, normalizedRouteType);
|
||||
pendingRouteQueries.remove(normalizedFlightNo + ":" + normalizedRouteType);
|
||||
markRouteAsRetrieved(normalizedFlightNo, normalizedRouteType, normalizedBizKey);
|
||||
pendingRouteQueries.remove(buildRouteRetrievalCacheKey(normalizedFlightNo, normalizedRouteType, normalizedBizKey));
|
||||
|
||||
log.info("路由查询与下发成功: flightNo={}, routeType={}, source={}",
|
||||
normalizedFlightNo, normalizedRouteType, eventSource);
|
||||
log.info("路由查询与下发成功: flightNo={}, routeType={}, bizKey={}, source={}",
|
||||
normalizedFlightNo, normalizedRouteType, normalizedBizKey, eventSource);
|
||||
return true;
|
||||
|
||||
} catch (Exception e) {
|
||||
@ -1298,21 +1300,81 @@ public class DataProcessingService {
|
||||
return s.isEmpty() ? null : s;
|
||||
}
|
||||
|
||||
private void schedulePendingRouteQuery(String flightNo, String routeType, String source) {
|
||||
private static String normalizeFlightNo(String raw) {
|
||||
String normalized = normalizeRedisString(raw);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
int dash = normalized.indexOf('-');
|
||||
if (dash > 0) {
|
||||
normalized = normalized.substring(0, dash).trim();
|
||||
}
|
||||
if (normalized.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return normalized.toUpperCase();
|
||||
}
|
||||
|
||||
private static String normalizeBizKey(String raw) {
|
||||
String normalized = normalizeRedisString(raw);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
return normalized.toUpperCase();
|
||||
}
|
||||
|
||||
private static String buildBizRedisKey(String bizKey) {
|
||||
String normalized = normalizeBizKey(bizKey);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
return "flightBiz:" + normalized;
|
||||
}
|
||||
|
||||
private String readRedisParamBySelectedKey(String bizRedisKey, String flightKey, String field) {
|
||||
String sourceKey = bizRedisKey != null ? bizRedisKey : flightKey;
|
||||
if (sourceKey == null) {
|
||||
return null;
|
||||
}
|
||||
return normalizeRedisString(redisCache.getCacheMapValue(sourceKey, field));
|
||||
}
|
||||
|
||||
private Long readRedisParamLongBySelectedKey(String bizRedisKey, String flightKey, String field) {
|
||||
String sourceKey = bizRedisKey != null ? bizRedisKey : flightKey;
|
||||
if (sourceKey == null) {
|
||||
return null;
|
||||
}
|
||||
return parseRedisLong(redisCache.getCacheMapValue(sourceKey, field));
|
||||
}
|
||||
|
||||
private static Long parseRedisLong(Object v) {
|
||||
String s = normalizeRedisString(v);
|
||||
if (s == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return Long.parseLong(s);
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void schedulePendingRouteQuery(String flightNo, String routeType, String bizKey, String source) {
|
||||
try {
|
||||
if (flightNo == null || flightNo.isBlank() || routeType == null || routeType.isBlank()) {
|
||||
return;
|
||||
}
|
||||
String normalizedFlightNo = flightNo.contains("-") ? flightNo.split("-", 2)[0] : flightNo;
|
||||
String normalizedFlightNo = normalizeFlightNo(flightNo);
|
||||
String normalizedRouteType = routeType.trim().toUpperCase();
|
||||
String key = normalizedFlightNo + ":" + normalizedRouteType;
|
||||
String normalizedBizKey = normalizeBizKey(bizKey);
|
||||
String key = buildRouteRetrievalCacheKey(normalizedFlightNo, normalizedRouteType, normalizedBizKey);
|
||||
|
||||
PendingRouteQuery existing = pendingRouteQueries.get(key);
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
if (existing == null) {
|
||||
long next = now + ROUTE_RETRY_MIN_DELAY_MS;
|
||||
pendingRouteQueries.put(key, new PendingRouteQuery(normalizedFlightNo, normalizedRouteType, next, 0, source));
|
||||
pendingRouteQueries.put(key, new PendingRouteQuery(normalizedFlightNo, normalizedRouteType, normalizedBizKey, next, 0, source));
|
||||
log.debug("新增待重试路由查询: key={}, nextRetryAtMs={}", key, next);
|
||||
return;
|
||||
}
|
||||
@ -1342,10 +1404,10 @@ public class DataProcessingService {
|
||||
continue;
|
||||
}
|
||||
|
||||
String key = pending.flightNo + ":" + pending.routeType;
|
||||
String key = buildRouteRetrievalCacheKey(pending.flightNo, pending.routeType, pending.bizKey);
|
||||
|
||||
// 已成功则无需重试
|
||||
if (isRouteAlreadyRetrieved(pending.flightNo, pending.routeType)) {
|
||||
if (isRouteAlreadyRetrieved(pending.flightNo, pending.routeType, pending.bizKey)) {
|
||||
pendingRouteQueries.remove(key);
|
||||
continue;
|
||||
}
|
||||
@ -1358,7 +1420,7 @@ public class DataProcessingService {
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean ok = tryQueryAndPublishRouteFromRedis(pending.flightNo, pending.routeType, "RETRY(" + pending.lastSource + ")");
|
||||
boolean ok = tryQueryAndPublishRouteFromRedis(pending.flightNo, pending.routeType, pending.bizKey, "RETRY(" + pending.lastSource + ")");
|
||||
if (ok) {
|
||||
pendingRouteQueries.remove(key);
|
||||
continue;
|
||||
@ -1527,8 +1589,6 @@ public class DataProcessingService {
|
||||
.routeGeometry(route.getGeometry() != null ? route.getGeometry().toText() : null)
|
||||
.inRunway(routeParams.getInRunway())
|
||||
.outRunway(routeParams.getOutRunway())
|
||||
.inRunwayPatched(routeParams.getInRunwayPatched())
|
||||
.outRunwayPatched(routeParams.getOutRunwayPatched())
|
||||
.timestamp(System.currentTimeMillis())
|
||||
.eventSource(eventSource) // 标识事件来源
|
||||
.build();
|
||||
@ -1548,24 +1608,32 @@ public class DataProcessingService {
|
||||
* 触发查询航班号对应的路由
|
||||
*/
|
||||
public void ARR(String flightNo){
|
||||
ARR(flightNo, null);
|
||||
}
|
||||
|
||||
public void ARR(String flightNo, String bizKey){
|
||||
// 两种触发方式均启用:WS 收到 ARR 立即触发(参数只从 Redis 拼装)
|
||||
if (isRouteAlreadyRetrieved(flightNo, "IN")) {
|
||||
if (isRouteAlreadyRetrieved(flightNo, "IN", bizKey)) {
|
||||
return;
|
||||
}
|
||||
boolean ok = tryQueryAndPublishRouteFromRedis(flightNo, "IN", "ADXP_WS_ARR");
|
||||
boolean ok = tryQueryAndPublishRouteFromRedis(flightNo, "IN", bizKey, "ADXP_WS_ARR");
|
||||
if (!ok) {
|
||||
schedulePendingRouteQuery(flightNo, "IN", "ADXP_WS_ARR");
|
||||
schedulePendingRouteQuery(flightNo, "IN", bizKey, "ADXP_WS_ARR");
|
||||
}
|
||||
}
|
||||
|
||||
public void AXOT(String flightNo){
|
||||
AXOT(flightNo, null);
|
||||
}
|
||||
|
||||
public void AXOT(String flightNo, String bizKey){
|
||||
// 两种触发方式均启用:WS 收到 AXOT 立即触发(参数只从 Redis 拼装)
|
||||
if (isRouteAlreadyRetrieved(flightNo, "OUT")) {
|
||||
if (isRouteAlreadyRetrieved(flightNo, "OUT", bizKey)) {
|
||||
return;
|
||||
}
|
||||
boolean ok = tryQueryAndPublishRouteFromRedis(flightNo, "OUT", "ADXP_WS_AXOT");
|
||||
boolean ok = tryQueryAndPublishRouteFromRedis(flightNo, "OUT", bizKey, "ADXP_WS_AXOT");
|
||||
if (!ok) {
|
||||
schedulePendingRouteQuery(flightNo, "OUT", "ADXP_WS_AXOT");
|
||||
schedulePendingRouteQuery(flightNo, "OUT", bizKey, "ADXP_WS_AXOT");
|
||||
}
|
||||
}
|
||||
|
||||
@ -1593,4 +1661,4 @@ public class DataProcessingService {
|
||||
log.warn("⚠️ 未获取到路由数据: flightNo={}", flightNo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
26
命令.md
26
命令.md
@ -268,3 +268,29 @@ curl -sS -i -H "Authorization: $TOKEN" \
|
||||
|
||||
```
|
||||
|
||||
```
|
||||
#重启流程
|
||||
root@root:~# cd /home/project_20250804/qaup/
|
||||
root@root:/home/project_20250804/qaup# ls
|
||||
adxp-adapter Dockerfile mock.sh qaup-app-bak0120.jar qaup-app.jar_back20251121 qaup-app.jar_back20260124 qaup-app.jar_bak20251021 qaup-app.jar_bak20251211 tools
|
||||
app.env emqx mock_traffic_light_client.py qaup-app.jar qaup-app.jar_back20260122 qaup-app.jar_back202601241 qaup-app.jar_bak20251022 qaup-app.jar_bak20260205 unhumanVechile
|
||||
build.sh logs qaup-admin.jar qaup-app.jar0 qaup-app.jar_back20260123 qaup-app.jar_back202601242 qaup-app.jar_bak20251113 qaup-app.jar_bal20250925 websocket
|
||||
root@root:/home/project_20250804/qaup# rm qaup-app.jar
|
||||
root@root:/home/project_20250804/qaup# mv qaup-admin.jar qaup-app.jar
|
||||
root@root:/home/project_20250804/qaup# ls
|
||||
adxp-adapter Dockerfile mock.sh qaup-app.jar qaup-app.jar_back20260122 qaup-app.jar_back202601241 qaup-app.jar_bak20251022 qaup-app.jar_bak20260205 unhumanVechile
|
||||
app.env emqx mock_traffic_light_client.py qaup-app.jar0 qaup-app.jar_back20260123 qaup-app.jar_back202601242 qaup-app.jar_bak20251113 qaup-app.jar_bal20250925 websocket
|
||||
build.sh logs qaup-app-bak0120.jar qaup-app.jar_back20251121 qaup-app.jar_back20260124 qaup-app.jar_bak20251021 qaup-app.jar_bak20251211 tools
|
||||
root@root:/home/project_20250804/qaup# docker exec -it qaup-app sh -lc 'sha256sum /app.jar'
|
||||
0cd2cc99809e04ca618a0f179bd06fd258876a0d401c9630619da20d8ce3d4c1 /app.jar
|
||||
root@root:/home/project_20250804/qaup# sha256sum qaup-app.jar
|
||||
0856bcf6da8d231b1035e3105d86f690dd0ec61ead921bee6d6ad5595f1cd7e7 qaup-app.jar
|
||||
root@root:/home/project_20250804/qaup# docker exec -it qaup-app rm /app.jar
|
||||
root@root:/home/project_20250804/qaup# docker cp qaup-app.jar qaup-app:/app.jar
|
||||
root@root:/home/project_20250804/qaup# docker restart qaup-app
|
||||
qaup-app
|
||||
root@root:/home/project_20250804/qaup# curl -s -X POST http://10.64.58.228:8086/api/adxp/reconnect
|
||||
{"connected":true,"success":true,"message":"重连成功"}root@root:/home/project_20250804/qaup#
|
||||
root@root:/home/project_20250804/qaup#
|
||||
root@root:/home/project_20250804/qaup#
|
||||
```
|
||||
|
||||
Loading…
Reference in New Issue
Block a user