diff --git a/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java b/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java index 9cfc4a7..c9a92f3 100644 --- a/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java +++ b/qaup-collision/src/main/java/com/qaup/collision/websocket/broadcaster/WebSocketMessageBroadcaster.java @@ -1,34 +1,35 @@ package com.qaup.collision.websocket.broadcaster; +import com.fasterxml.jackson.databind.ObjectMapper; import com.qaup.collision.websocket.cache.MessageCacheService; import com.qaup.collision.websocket.event.CollisionWarningEvent; +import com.qaup.collision.websocket.event.GeofenceAlertWebSocketEvent; +import com.qaup.collision.websocket.event.PathConflictAlertWebSocketEvent; import com.qaup.collision.websocket.event.PositionUpdateEvent; import com.qaup.collision.websocket.event.RuleExecutionStatusWebSocketEvent; import com.qaup.collision.websocket.event.RuleStateChangeWebSocketEvent; import com.qaup.collision.websocket.event.RuleViolationWebSocketEvent; -import com.qaup.collision.websocket.event.GeofenceAlertWebSocketEvent; -import com.qaup.collision.websocket.event.PathConflictAlertWebSocketEvent; import com.qaup.collision.websocket.event.VehicleCommandEvent; import com.qaup.collision.websocket.event.VehicleStatusUpdateEvent; +import com.qaup.collision.websocket.handler.CollisionWebSocketHandler; import com.qaup.collision.websocket.message.CollisionWarningPayload; -import com.qaup.collision.websocket.message.MessageTypeConstants; import com.qaup.collision.websocket.message.GeofenceAlertPayload; +import com.qaup.collision.websocket.message.MessageTypeConstants; +import com.qaup.collision.websocket.message.PathConflictAlertMessage; import com.qaup.collision.websocket.message.PositionUpdatePayload; import com.qaup.collision.websocket.message.RuleExecutionStatusPayload; import com.qaup.collision.websocket.message.RuleStateChangePayload; import com.qaup.collision.websocket.message.RuleViolationPayload; +import com.qaup.collision.websocket.message.UniversalMessage; import com.qaup.collision.websocket.message.VehicleCommandPayload; import com.qaup.collision.websocket.message.VehicleStatusUpdatePayload; -import com.qaup.collision.websocket.message.UniversalMessage; -import com.qaup.collision.websocket.message.PathConflictAlertMessage; -import com.qaup.collision.websocket.handler.CollisionWebSocketHandler; - -import org.springframework.stereotype.Component; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Async; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; @@ -36,65 +37,92 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import lombok.extern.slf4j.Slf4j; -/** - * WebSocket统一消息广播器 - * 监听所有WebSocket事件,使用原生WebSocket统一推送到控制台前端 - */ @Slf4j @Component public class WebSocketMessageBroadcaster { - - private final MessageCacheService messageCacheService; - private final CollisionWebSocketHandler collisionWebSocketHandler; // 注入实例 - private final ObjectMapper objectMapper; // JSON序列化器 - /** - * 高频位置更新做“覆盖式合并”,只保留每个objectId的最新一条,避免消息堆积。 - * flush线程如果跑不过来,下一轮会直接跳过(丢旧保实时)。 - */ + private static final int REACQUIRE_AFTER_REJECTS = 3; + private static final double EARTH_RADIUS_METERS = 6371000.0; + + private final MessageCacheService messageCacheService; + private final CollisionWebSocketHandler collisionWebSocketHandler; + private final ObjectMapper objectMapper; + private final Map latestPositionUpdates = new ConcurrentHashMap<>(); + private final Map positionTrackStates = new ConcurrentHashMap<>(); private final AtomicBoolean positionFlushInProgress = new AtomicBoolean(false); + @Value("${websocket.position.filter.ema-alpha:0.45}") + private double positionEmaAlpha; + + @Value("${websocket.position.filter.jitter-meter:2.5}") + private double jitterMeterThreshold; + + @Value("${websocket.position.filter.max-speed-mps-aircraft:150.0}") + private double maxSpeedMpsAircraft; + + @Value("${websocket.position.filter.max-speed-mps-vehicle:35.0}") + private double maxSpeedMpsVehicle; + + @Value("${websocket.position.filter.max-speed-mps-unmanned:15.0}") + private double maxSpeedMpsUnmanned; + + @Value("${websocket.position.filter.max-jump-margin-meter:20.0}") + private double maxJumpMarginMeter; + + @Value("${websocket.position.filter.state-ttl-ms:120000}") + private long positionTrackStateTtlMs; + private record CoalescedPositionUpdate(PositionUpdatePayload payload, long timestamp) {} - public WebSocketMessageBroadcaster(MessageCacheService messageCacheService, CollisionWebSocketHandler collisionWebSocketHandler, @Qualifier("websocketObjectMapper") ObjectMapper objectMapper) { + private static final class PositionTrackState { + private boolean initialized; + private long lastPayloadTimestampMs; + private long lastAcceptedAtMs; + private double lastRawLatitude; + private double lastRawLongitude; + private double lastFilteredLatitude; + private double lastFilteredLongitude; + private int rejectedCount; + } + + public WebSocketMessageBroadcaster( + MessageCacheService messageCacheService, + CollisionWebSocketHandler collisionWebSocketHandler, + @Qualifier("websocketObjectMapper") ObjectMapper objectMapper) { this.messageCacheService = messageCacheService; this.collisionWebSocketHandler = collisionWebSocketHandler; this.objectMapper = objectMapper; } - - /** - * 处理位置更新事件 - * @param event 位置更新事件 - */ + @EventListener public void handlePositionUpdate(PositionUpdateEvent event) { try { if (event == null || event.getPayload() == null) { return; } - if (!(event.getPayload() instanceof PositionUpdatePayload payload)) { return; } - if (payload.getObjectId() == null || payload.getObjectId().isBlank()) { return; } - // 覆盖式合并:同一对象只保留最新的 - latestPositionUpdates.put(payload.getObjectId(), new CoalescedPositionUpdate(payload, event.getTimestamp())); + PositionUpdatePayload sanitizedPayload = sanitizePositionPayload(payload); + if (sanitizedPayload == null) { + return; + } + + latestPositionUpdates.put( + sanitizedPayload.getObjectId(), + new CoalescedPositionUpdate(sanitizedPayload, event.getTimestamp()) + ); } catch (Exception e) { - log.error("合并位置更新事件失败", e); + log.error("Failed to merge position update", e); } } - /** - * 定期刷新合并后的高频位置消息。 - * 丢旧保实时:如果上一轮flush未完成,本轮直接跳过。 - */ @Scheduled(fixedDelayString = "${websocket.position.flush-interval-ms:250}") public void flushCoalescedPositionUpdates() { if (!positionFlushInProgress.compareAndSet(false, true)) { @@ -106,7 +134,6 @@ public class WebSocketMessageBroadcaster { return; } - // 没有客户端时直接清空,避免无意义积压 if (collisionWebSocketHandler.getConnectionCount() <= 0) { latestPositionUpdates.clear(); return; @@ -125,48 +152,43 @@ public class WebSocketMessageBroadcaster { broadcastMessageInternal(message); } } catch (Exception e) { - log.error("刷新合并位置更新失败", e); + log.error("Failed to flush coalesced position updates", e); } finally { positionFlushInProgress.set(false); } } - /** - * 处理车辆状态更新事件 - * 符合universal_autonomous_vehicle_api规范 - */ + @Scheduled(fixedDelayString = "${websocket.position.filter.cleanup-interval-ms:60000}") + public void cleanupPositionTrackStates() { + long now = System.currentTimeMillis(); + positionTrackStates.entrySet().removeIf(entry -> now - entry.getValue().lastAcceptedAtMs > positionTrackStateTtlMs); + } + @EventListener @Async public void handleVehicleStatusUpdate(VehicleStatusUpdateEvent event) { try { if (event.getPayload() == null) { - log.warn("车辆状态更新事件负载为空,跳过处理"); + log.warn("Vehicle status update payload is null"); return; } - + VehicleStatusUpdatePayload payload = event.getPayload(); - UniversalMessage message = UniversalMessage.builder() - .type(MessageTypeConstants.VEHICLE_STATUS_UPDATE) - .payload(payload) - .timestamp(event.getTimestamp()) - .messageId(generateMessageId()) - .build(); - + .type(MessageTypeConstants.VEHICLE_STATUS_UPDATE) + .payload(payload) + .timestamp(event.getTimestamp()) + .messageId(generateMessageId()) + .build(); + broadcastMessage(message); - - log.debug("发送车辆状态更新WebSocket消息: vehicleId={}, 消息ID={}", - payload.getVehicleId(), message.getMessageId()); - + log.debug("Vehicle status message published: vehicleId={}, messageId={}", + payload.getVehicleId(), message.getMessageId()); } catch (Exception e) { - log.error("处理车辆状态更新事件异常", e); + log.error("Failed to handle vehicle status update", e); } } - - /** - * 处理车辆指令事件 - * @param event 车辆指令事件 - */ + @EventListener public void handleVehicleCommand(VehicleCommandEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -175,15 +197,10 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((VehicleCommandPayload) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - - /** - * 处理碰撞预警事件 - * @param event 碰撞预警事件 - */ + @EventListener public void handleCollisionWarning(CollisionWarningEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -192,16 +209,10 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((CollisionWarningPayload) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - // === 规则事件处理方法 === - - /** - * 处理规则违规事件 - * @param event 规则违规事件 - */ + @EventListener public void handleRuleViolation(RuleViolationWebSocketEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -210,14 +221,10 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((RuleViolationPayload) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - /** - * 处理规则执行状态事件 - * @param event 规则执行状态事件 - */ + @EventListener public void handleRuleExecutionStatus(RuleExecutionStatusWebSocketEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -226,14 +233,10 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((RuleExecutionStatusPayload) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - /** - * 处理规则状态变更事件 - * @param event 规则状态变更事件 - */ + @EventListener public void handleRuleStateChange(RuleStateChangeWebSocketEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -242,14 +245,10 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((RuleStateChangePayload) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - /** - * 处理路径冲突告警事件 - * @param event 路径冲突告警事件 - */ + @EventListener public void handlePathConflictAlert(PathConflictAlertWebSocketEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -258,14 +257,10 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((PathConflictAlertMessage) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - /** - * 处理电子围栏告警事件 - * @param event 电子围栏告警事件 - */ + @EventListener public void handleGeofenceAlert(GeofenceAlertWebSocketEvent event) { UniversalMessage message = UniversalMessage.builder() @@ -274,97 +269,193 @@ public class WebSocketMessageBroadcaster { .messageId(generateMessageId()) .payload((GeofenceAlertPayload) event.getPayload()) .build(); - + broadcastMessageInternal(message); } - - /** - * 统一消息广播方法 - 使用原生WebSocket发送JSON消息(公共方法) - * @param message 要广播的消息 - */ + public void broadcastMessage(UniversalMessage message) { broadcastMessageInternal(message); } - - /** - * 统一消息广播方法 - 使用原生WebSocket发送JSON消息(内部方法) - * @param message 要广播的消息 - */ + private void broadcastMessageInternal(UniversalMessage message) { try { - // 使用Jackson ObjectMapper将UniversalMessage序列化为JSON字符串 - // 这样前端可以获得消息类型、时间戳、消息ID等完整信息 String jsonMessage = objectMapper.writeValueAsString(message); this.collisionWebSocketHandler.broadcastMessage(jsonMessage); - // 根据消息类型决定是否缓存(避免高频消息阻塞Redis) - // 高频实时消息(如位置更新)不缓存,追求实时性 - // 低频重要消息(如碰撞预警)缓存,保证不丢失 if (shouldCacheMessage(message.getType())) { messageCacheService.cacheMessage(message); } - } catch (Exception e) { - System.err.println("Failed to broadcast message via native WebSocket: " + e.getMessage()); - e.printStackTrace(); + log.error("Failed to broadcast message", e); } } - /** - * 判断消息是否应该缓存 - * - * @param messageType 消息类型 - * @return true=需要缓存,false=不需要缓存 - */ private boolean shouldCacheMessage(String messageType) { - // 高频实时消息 - 不缓存(追求实时性,避免Redis性能瓶颈) - // 这些消息更新频繁,用户关心的是最新状态,历史数据价值不高 - if (MessageTypeConstants.POSITION_UPDATE.equals(messageType) || - MessageTypeConstants.TRAFFIC_LIGHT_STATUS.equals(messageType) || - MessageTypeConstants.HEARTBEAT.equals(messageType) || - MessageTypeConstants.VEHICLE_STATUS_UPDATE.equals(messageType)) { + if (MessageTypeConstants.POSITION_UPDATE.equals(messageType) + || MessageTypeConstants.TRAFFIC_LIGHT_STATUS.equals(messageType) + || MessageTypeConstants.HEARTBEAT.equals(messageType) + || MessageTypeConstants.VEHICLE_STATUS_UPDATE.equals(messageType)) { return false; } - - // 低频重要消息 - 缓存(保证不丢失) - // 包括:碰撞预警、规则违规、路径冲突、电子围栏、航班通知、车辆指令等 - // 这些事件频率低但重要性高,用户需要完整的历史记录 return true; } - - /** - * 生成唯一消息ID - * @return 消息ID - */ + private String generateMessageId() { return UUID.randomUUID().toString(); } - - /** - * 获取最近消息(用于客户端重连恢复) - * @param messageType 消息类型,为空则获取所有类型 - * @param count 消息数量 - * @return 最近消息列表 - */ + public void sendRecentMessages(String messageType, int count) { try { - List> recentMessages = messageType == null || messageType.trim().isEmpty() + List> recentMessages = messageType == null || messageType.trim().isEmpty() ? messageCacheService.getAllRecentMessages(count) : messageCacheService.getRecentMessages(messageType, count); - - // 逐个发送最近消息 + recentMessages.forEach(this::broadcastMessageInternal); - } catch (Exception e) { - System.err.println("Failed to send recent messages: " + e.getMessage()); + log.error("Failed to send recent messages", e); } } - - /** - * 获取当前WebSocket连接数 - * @return 连接数量 - */ + public int getOnlineCount() { return this.collisionWebSocketHandler.getConnectionCount(); } -} \ No newline at end of file + + private PositionUpdatePayload sanitizePositionPayload(PositionUpdatePayload payload) { + if (payload.getPosition() == null + || payload.getPosition().getLatitude() == null + || payload.getPosition().getLongitude() == null) { + return null; + } + + double latitude = payload.getPosition().getLatitude(); + double longitude = payload.getPosition().getLongitude(); + if (!isValidCoordinate(latitude, longitude)) { + return null; + } + + long payloadTimestamp = payload.getTimestamp() != null ? payload.getTimestamp() : System.currentTimeMillis(); + PositionTrackState state = positionTrackStates.computeIfAbsent(payload.getObjectId(), k -> new PositionTrackState()); + + double filteredLatitude = latitude; + double filteredLongitude = longitude; + + synchronized (state) { + if (!state.initialized) { + initializeTrackState(state, latitude, longitude, payloadTimestamp); + } else { + if (payloadTimestamp <= state.lastPayloadTimestampMs) { + state.rejectedCount++; + return null; + } + + long dtMs = payloadTimestamp - state.lastPayloadTimestampMs; + double dtSec = dtMs / 1000.0; + double rawDistance = calculateDistanceMeters( + state.lastRawLatitude, + state.lastRawLongitude, + latitude, + longitude + ); + + double maxAllowedJump = resolveMaxAllowedJumpMeters(payload.getObjectType(), dtSec); + boolean isOutlierJump = rawDistance > maxAllowedJump; + if (isOutlierJump && state.rejectedCount < REACQUIRE_AFTER_REJECTS - 1) { + state.rejectedCount++; + log.debug("Drop outlier position update: objectId={}, jumpMeter={}, allowedMeter={}", + payload.getObjectId(), rawDistance, maxAllowedJump); + return null; + } + + if (isOutlierJump) { + state.rejectedCount = 0; + filteredLatitude = latitude; + filteredLongitude = longitude; + } else { + state.rejectedCount = 0; + if (rawDistance <= jitterMeterThreshold) { + filteredLatitude = state.lastFilteredLatitude; + filteredLongitude = state.lastFilteredLongitude; + } else { + double alpha = clamp(positionEmaAlpha, 0.05, 1.0); + filteredLatitude = state.lastFilteredLatitude + alpha * (latitude - state.lastFilteredLatitude); + filteredLongitude = state.lastFilteredLongitude + alpha * (longitude - state.lastFilteredLongitude); + } + } + + state.lastRawLatitude = latitude; + state.lastRawLongitude = longitude; + state.lastPayloadTimestampMs = payloadTimestamp; + state.lastFilteredLatitude = filteredLatitude; + state.lastFilteredLongitude = filteredLongitude; + state.lastAcceptedAtMs = System.currentTimeMillis(); + } + } + + PositionUpdatePayload.Position filteredPosition = PositionUpdatePayload.Position.builder() + .latitude(filteredLatitude) + .longitude(filteredLongitude) + .build(); + + return PositionUpdatePayload.builder() + .objectId(payload.getObjectId()) + .objectType(payload.getObjectType()) + .position(filteredPosition) + .heading(payload.getHeading()) + .speed(payload.getSpeed()) + .timestamp(payloadTimestamp) + .build(); + } + + private void initializeTrackState(PositionTrackState state, double latitude, double longitude, long payloadTimestamp) { + state.initialized = true; + state.lastPayloadTimestampMs = payloadTimestamp; + state.lastAcceptedAtMs = System.currentTimeMillis(); + state.lastRawLatitude = latitude; + state.lastRawLongitude = longitude; + state.lastFilteredLatitude = latitude; + state.lastFilteredLongitude = longitude; + state.rejectedCount = 0; + } + + private boolean isValidCoordinate(double latitude, double longitude) { + return latitude >= -90.0 && latitude <= 90.0 && longitude >= -180.0 && longitude <= 180.0; + } + + private double resolveMaxAllowedJumpMeters(String objectType, double dtSec) { + double safeDtSec = Math.max(dtSec, 0.05); + double speedMps = resolveMaxSpeedMps(objectType); + return speedMps * safeDtSec + Math.max(maxJumpMarginMeter, 0.0); + } + + private double resolveMaxSpeedMps(String objectType) { + if (objectType == null) { + return maxSpeedMpsVehicle; + } + return switch (objectType) { + case "AIRCRAFT" -> maxSpeedMpsAircraft; + case "UNMANNED_VEHICLE" -> maxSpeedMpsUnmanned; + case "SPECIAL_VEHICLE", "NORMAL_VEHICLE" -> maxSpeedMpsVehicle; + default -> maxSpeedMpsVehicle; + }; + } + + private double calculateDistanceMeters(double lat1, double lon1, double lat2, double lon2) { + double lat1Rad = Math.toRadians(lat1); + double lat2Rad = Math.toRadians(lat2); + double deltaLat = lat2Rad - lat1Rad; + double deltaLon = Math.toRadians(lon2 - lon1); + + double a = Math.sin(deltaLat / 2.0) * Math.sin(deltaLat / 2.0) + + Math.cos(lat1Rad) * Math.cos(lat2Rad) + * Math.sin(deltaLon / 2.0) * Math.sin(deltaLon / 2.0); + double c = 2.0 * Math.atan2(Math.sqrt(a), Math.sqrt(1.0 - a)); + return EARTH_RADIUS_METERS * c; + } + + private double clamp(double value, double min, double max) { + if (value < min) { + return min; + } + return Math.min(value, max); + } +} diff --git a/命令.md b/命令.md index d8cae42..6c78414 100644 --- a/命令.md +++ b/命令.md @@ -293,3 +293,15 @@ root@root:/home/project_20250804/qaup# curl -s -X POST http://10.64.58.228:8086/ root@root:/home/project_20250804/qaup# root@root:/home/project_20250804/qaup# ``` + +docker exec -i qaup-redis redis-cli HMGET "flightBiz:" inRunway outRunway inRunwayTs outRunwayTs + +for F in MU9692; do + BIZ=$(docker exec -i "qaup-redis" redis-cli HGET "flight:$F" activeBizKey | tr -d '\r' | sed 's/^"//;s/"$//') + FR=$(docker exec -i "qaup-redis" redis-cli HMGET "flight:$F" startSeat seat | tr '\n' ' ') + BR="" + if [ -n "$BIZ" ]; then + BR=$(docker exec -i "qaup-redis" redis-cli HMGET "flightBiz:$BIZ" startSeat seat | tr '\n' ' ') + fi + echo "$F | biz=$BIZ | flight=[$FR] | biz=[$BR]" +done \ No newline at end of file