373 lines
16 KiB
Java
373 lines
16 KiB
Java
package com.dongni.collisionavoidance.websocket.broadcaster;
|
||
|
||
import com.dongni.collisionavoidance.common.model.MovingObjectType;
|
||
import com.dongni.collisionavoidance.geofence.model.enums.GeofenceAlertLevel;
|
||
import com.dongni.collisionavoidance.rule.event.RuleExecutionEvent;
|
||
import com.dongni.collisionavoidance.rule.event.RuleStateChangeEvent;
|
||
import com.dongni.collisionavoidance.rule.event.RuleViolationEvent;
|
||
import com.dongni.collisionavoidance.rule.event.RuleViolationEventOccurred;
|
||
import com.dongni.collisionavoidance.rule.model.entity.SpatialRule;
|
||
import com.dongni.collisionavoidance.rule.model.enums.RuleExecutionResult;
|
||
import com.dongni.collisionavoidance.websocket.event.RuleExecutionStatusWebSocketEvent;
|
||
import com.dongni.collisionavoidance.websocket.event.RuleStateChangeWebSocketEvent;
|
||
import com.dongni.collisionavoidance.websocket.event.RuleViolationWebSocketEvent;
|
||
import com.dongni.collisionavoidance.websocket.message.RuleExecutionStatusPayload;
|
||
import com.dongni.collisionavoidance.websocket.message.RuleStateChangePayload;
|
||
import com.dongni.collisionavoidance.websocket.message.RuleViolationPayload;
|
||
import com.fasterxml.jackson.databind.JsonNode;
|
||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||
import org.locationtech.jts.geom.Point;
|
||
import org.locationtech.jts.io.WKTWriter;
|
||
import org.slf4j.Logger;
|
||
import org.slf4j.LoggerFactory;
|
||
import org.springframework.beans.factory.annotation.Autowired;
|
||
import org.springframework.context.ApplicationEventPublisher;
|
||
import org.springframework.context.event.EventListener;
|
||
import org.springframework.stereotype.Service;
|
||
|
||
import java.time.LocalDateTime;
|
||
import java.util.HashMap;
|
||
import java.util.Map;
|
||
|
||
/**
|
||
* 规则事件WebSocket发布服务
|
||
* 监听内部规则事件,转换为WebSocket事件并发布到前端
|
||
*
|
||
* @author AI Assistant
|
||
* @since 2025-01-17
|
||
*/
|
||
@Service
|
||
public class RuleEventWebSocketPublisher {
|
||
|
||
private static final Logger logger = LoggerFactory.getLogger(RuleEventWebSocketPublisher.class);
|
||
|
||
private final ApplicationEventPublisher eventPublisher;
|
||
private final WKTWriter wktWriter;
|
||
private final ObjectMapper objectMapper;
|
||
|
||
@Autowired
|
||
public RuleEventWebSocketPublisher(ApplicationEventPublisher eventPublisher, ObjectMapper objectMapper) {
|
||
this.eventPublisher = eventPublisher;
|
||
this.objectMapper = objectMapper;
|
||
this.wktWriter = new WKTWriter();
|
||
}
|
||
|
||
/**
|
||
* 监听规则违规事件,转换为WebSocket事件
|
||
*
|
||
* @param event 规则违规事件
|
||
*/
|
||
@EventListener
|
||
public void handleRuleViolationEvent(RuleViolationEventOccurred event) {
|
||
try {
|
||
RuleViolationEvent violationEvent = event.getViolationEvent();
|
||
|
||
// 构建WebSocket载荷
|
||
RuleViolationPayload payload = RuleViolationPayload.builder()
|
||
.eventId(violationEvent.getEventId())
|
||
.vehicleId(violationEvent.getVehicleId())
|
||
.vehicleType(getVehicleTypeString(violationEvent.getVehicleType()))
|
||
.ruleId(violationEvent.getRuleId())
|
||
.ruleName(getRuleName(violationEvent))
|
||
.ruleCategory(getRuleCategory(violationEvent))
|
||
.violationType(violationEvent.getViolationType() != null ? violationEvent.getViolationType().name() : "UNKNOWN")
|
||
.alertLevel(violationEvent.getAlertLevel() != null ? violationEvent.getAlertLevel().name() : "INFO")
|
||
.severityScore(violationEvent.getSeverityScore())
|
||
.location(getLocationWKT(violationEvent.getViolationLocation()))
|
||
.longitude(getLocationLongitude(violationEvent.getViolationLocation()))
|
||
.latitude(getLocationLatitude(violationEvent.getViolationLocation()))
|
||
.violationTime(violationEvent.getViolationTimestamp())
|
||
.description(violationEvent.getViolationDescription())
|
||
.requiresImmediateResponse(violationEvent.requiresImmediateAction())
|
||
.isCritical(isCriticalViolation(violationEvent))
|
||
.areaId(getAreaId(violationEvent))
|
||
.areaName(getAreaName(violationEvent))
|
||
.metadata(violationEvent.getMetadata())
|
||
.recommendedAction(getRecommendedAction(violationEvent))
|
||
.status(violationEvent.getProcessed() ? "PROCESSED" : "PENDING")
|
||
.build();
|
||
|
||
// 发布WebSocket事件
|
||
RuleViolationWebSocketEvent webSocketEvent = RuleViolationWebSocketEvent.create(payload);
|
||
eventPublisher.publishEvent(webSocketEvent);
|
||
|
||
logger.debug("Published rule violation WebSocket event for vehicle {} and rule {}",
|
||
violationEvent.getVehicleId(), violationEvent.getRuleId());
|
||
|
||
} catch (Exception e) {
|
||
logger.error("Failed to publish rule violation WebSocket event", e);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 监听规则执行事件,转换为WebSocket事件
|
||
*
|
||
* @param event 规则执行事件
|
||
*/
|
||
@EventListener
|
||
public void handleRuleExecutionEvent(RuleExecutionEvent event) {
|
||
try {
|
||
// 构建WebSocket载荷
|
||
RuleExecutionStatusPayload payload = RuleExecutionStatusPayload.builder()
|
||
.ruleId(event.getRule().getRuleId())
|
||
.ruleName(event.getRule().getName())
|
||
.vehicleId(event.getVehicleId())
|
||
.vehicleType(getVehicleTypeString(null)) // RuleExecutionEvent中没有vehicleType,先设为null
|
||
.eventType(event.getEventType() != null ? event.getEventType().name() : "UNKNOWN")
|
||
.executionStatus(event.getExecutionStatus() != null ? event.getExecutionStatus().name() : "UNKNOWN")
|
||
.executionResult(event.getExecutionResult() != null ? event.getExecutionResult().name() : "UNKNOWN")
|
||
.startTime(event.getExecutionStartTime())
|
||
.endTime(event.getExecutionEndTime())
|
||
.durationMs(event.getExecutionDurationMs())
|
||
.location(null) // RuleExecutionEvent中没有location信息
|
||
.longitude(null)
|
||
.latitude(null)
|
||
.isSuccess(event.isExecutionSuccessful())
|
||
.isViolation(event.getExecutionResult() != null && event.getExecutionResult().name().contains("VIOLATION"))
|
||
.errorMessage(event.getErrorMessage())
|
||
.executionDetails(event.getExecutionDetails())
|
||
.ruleCategory(event.getRule().getCategory() != null ? event.getRule().getCategory().name() : "UNKNOWN")
|
||
.requiresAction(event.getExecutionResult() != null && event.getExecutionResult() != RuleExecutionResult.PASS)
|
||
.context(event.getExecutionContext())
|
||
.performanceMetrics(createPerformanceMetrics(event))
|
||
.build();
|
||
|
||
// 发布WebSocket事件
|
||
RuleExecutionStatusWebSocketEvent webSocketEvent = RuleExecutionStatusWebSocketEvent.create(payload);
|
||
eventPublisher.publishEvent(webSocketEvent);
|
||
|
||
logger.debug("Published rule execution WebSocket event for vehicle {} and rule {}",
|
||
event.getVehicleId(), event.getRule().getRuleId());
|
||
|
||
} catch (Exception e) {
|
||
logger.error("Failed to publish rule execution WebSocket event", e);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 监听规则状态变更事件,转换为WebSocket事件
|
||
*
|
||
* @param event 规则状态变更事件
|
||
*/
|
||
@EventListener
|
||
public void handleRuleStateChangeEvent(RuleStateChangeEvent event) {
|
||
try {
|
||
SpatialRule rule = event.getRule();
|
||
|
||
// 构建WebSocket载荷
|
||
RuleStateChangePayload payload = RuleStateChangePayload.builder()
|
||
.ruleId(rule.getRuleId())
|
||
.ruleName(rule.getName())
|
||
.ruleCategory(rule.getCategory() != null ? rule.getCategory().name() : "UNKNOWN")
|
||
.changeType(event.getChangeType() != null ? event.getChangeType().name() : "UNKNOWN")
|
||
.fromStatus(event.getPreviousStatus() != null ? event.getPreviousStatus().name() : "UNKNOWN")
|
||
.toStatus(event.getNewStatus() != null ? event.getNewStatus().name() : "UNKNOWN")
|
||
.changeTime(event.getChangeTimestamp())
|
||
.reason(event.getChangeReason())
|
||
.changedBy(event.getChangedBy())
|
||
.affectsRuntime(event.isAffectsRuntime())
|
||
.severityScore(event.getChangeSeverity())
|
||
.requiresNotification(event.requiresNotification())
|
||
.impactScope(determineImpactScope(event))
|
||
.description(event.getEventDescription())
|
||
.configChanges(event.getChangeContext())
|
||
.effectiveTime(LocalDateTime.now())
|
||
.expiryTime(rule.getEffectiveEndTime())
|
||
.context(event.getChangeContext())
|
||
.rollbackInfo(createRollbackInfo(event))
|
||
.notificationLevel(determineNotificationLevel(event))
|
||
.build();
|
||
|
||
// 发布WebSocket事件
|
||
RuleStateChangeWebSocketEvent webSocketEvent = RuleStateChangeWebSocketEvent.create(payload);
|
||
eventPublisher.publishEvent(webSocketEvent);
|
||
|
||
logger.debug("Published rule state change WebSocket event for rule {}", rule.getRuleId());
|
||
|
||
} catch (Exception e) {
|
||
logger.error("Failed to publish rule state change WebSocket event", e);
|
||
}
|
||
}
|
||
|
||
// === 辅助方法 ===
|
||
|
||
/**
|
||
* 获取车辆类型字符串
|
||
*/
|
||
private String getVehicleTypeString(MovingObjectType vehicleType) {
|
||
return vehicleType != null ? vehicleType.name() : "UNKNOWN";
|
||
}
|
||
|
||
/**
|
||
* 获取位置WKT格式字符串
|
||
*/
|
||
private String getLocationWKT(Point location) {
|
||
if (location == null) {
|
||
return null;
|
||
}
|
||
try {
|
||
return wktWriter.write(location);
|
||
} catch (Exception e) {
|
||
logger.warn("Failed to convert location to WKT", e);
|
||
return null;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取位置经度
|
||
*/
|
||
private Double getLocationLongitude(Point location) {
|
||
return location != null ? location.getX() : null;
|
||
}
|
||
|
||
/**
|
||
* 获取位置纬度
|
||
*/
|
||
private Double getLocationLatitude(Point location) {
|
||
return location != null ? location.getY() : null;
|
||
}
|
||
|
||
/**
|
||
* 判断是否为紧急违规
|
||
*/
|
||
private Boolean isCriticalViolation(RuleViolationEvent violationEvent) {
|
||
if (violationEvent.getAlertLevel() == null) {
|
||
return false;
|
||
}
|
||
return violationEvent.getAlertLevel() == GeofenceAlertLevel.CRITICAL ||
|
||
violationEvent.getAlertLevel() == GeofenceAlertLevel.EMERGENCY;
|
||
}
|
||
|
||
/**
|
||
* 获取推荐处理动作
|
||
*/
|
||
private String getRecommendedAction(RuleViolationEvent violationEvent) {
|
||
if (violationEvent.requiresImmediateAction()) {
|
||
return "立即停车并等待指示";
|
||
}
|
||
if (isCriticalViolation(violationEvent)) {
|
||
return "立即离开限制区域";
|
||
}
|
||
return "调整路径避免违规";
|
||
}
|
||
|
||
/**
|
||
* 从规则详情中获取规则名称
|
||
*/
|
||
private String getRuleName(RuleViolationEvent violationEvent) {
|
||
if (violationEvent.getRuleDetails() != null && !violationEvent.getRuleDetails().trim().isEmpty()) {
|
||
try {
|
||
JsonNode ruleDetails = objectMapper.readTree(violationEvent.getRuleDetails());
|
||
if (ruleDetails.has("name")) {
|
||
return ruleDetails.get("name").asText();
|
||
}
|
||
} catch (Exception e) {
|
||
logger.warn("解析规则详情失败: {}", violationEvent.getRuleDetails(), e);
|
||
}
|
||
}
|
||
return "规则-" + violationEvent.getRuleId();
|
||
}
|
||
|
||
/**
|
||
* 从规则详情中获取规则类别
|
||
*/
|
||
private String getRuleCategory(RuleViolationEvent violationEvent) {
|
||
if (violationEvent.getRuleDetails() != null && !violationEvent.getRuleDetails().trim().isEmpty()) {
|
||
try {
|
||
JsonNode ruleDetails = objectMapper.readTree(violationEvent.getRuleDetails());
|
||
if (ruleDetails.has("category")) {
|
||
return ruleDetails.get("category").asText();
|
||
}
|
||
} catch (Exception e) {
|
||
logger.warn("解析规则详情失败: {}", violationEvent.getRuleDetails(), e);
|
||
}
|
||
}
|
||
return "UNKNOWN";
|
||
}
|
||
|
||
/**
|
||
* 从元数据中获取区域ID
|
||
*/
|
||
private String getAreaId(RuleViolationEvent violationEvent) {
|
||
if (violationEvent.getMetadata() != null && !violationEvent.getMetadata().trim().isEmpty()) {
|
||
try {
|
||
JsonNode metadata = objectMapper.readTree(violationEvent.getMetadata());
|
||
if (metadata.has("areaId")) {
|
||
return metadata.get("areaId").asText();
|
||
}
|
||
} catch (Exception e) {
|
||
logger.warn("解析元数据失败: {}", violationEvent.getMetadata(), e);
|
||
}
|
||
}
|
||
return null;
|
||
}
|
||
|
||
/**
|
||
* 从元数据中获取区域名称
|
||
*/
|
||
private String getAreaName(RuleViolationEvent violationEvent) {
|
||
if (violationEvent.getMetadata() != null && !violationEvent.getMetadata().trim().isEmpty()) {
|
||
try {
|
||
JsonNode metadata = objectMapper.readTree(violationEvent.getMetadata());
|
||
if (metadata.has("areaName")) {
|
||
return metadata.get("areaName").asText();
|
||
}
|
||
} catch (Exception e) {
|
||
logger.warn("解析元数据失败: {}", violationEvent.getMetadata(), e);
|
||
}
|
||
}
|
||
return null;
|
||
}
|
||
|
||
/**
|
||
* 创建性能指标信息
|
||
*/
|
||
private Map<String, Object> createPerformanceMetrics(RuleExecutionEvent event) {
|
||
Map<String, Object> metrics = new HashMap<>();
|
||
metrics.put("executionTime", event.getExecutionDurationMs());
|
||
metrics.put("startTime", event.getExecutionStartTime());
|
||
metrics.put("endTime", event.getExecutionEndTime());
|
||
metrics.put("successful", event.isExecutionSuccessful());
|
||
if (event.getErrorMessage() != null) {
|
||
metrics.put("errorMessage", event.getErrorMessage());
|
||
}
|
||
return metrics;
|
||
}
|
||
|
||
/**
|
||
* 确定影响范围
|
||
*/
|
||
private String determineImpactScope(RuleStateChangeEvent event) {
|
||
if (event.isAffectsRuntime()) {
|
||
return "RUNTIME";
|
||
}
|
||
if (event.getChangeType() == RuleStateChangeEvent.StateChangeType.CONFIGURATION_UPDATE) {
|
||
return "CONFIGURATION";
|
||
}
|
||
return "METADATA";
|
||
}
|
||
|
||
/**
|
||
* 创建回滚信息
|
||
*/
|
||
private Map<String, Object> createRollbackInfo(RuleStateChangeEvent event) {
|
||
Map<String, Object> rollbackInfo = new HashMap<>();
|
||
rollbackInfo.put("previousStatus", event.getPreviousStatus() != null ? event.getPreviousStatus().name() : "UNKNOWN");
|
||
rollbackInfo.put("changeTime", event.getChangeTimestamp());
|
||
rollbackInfo.put("changeType", event.getChangeType() != null ? event.getChangeType().name() : "UNKNOWN");
|
||
return rollbackInfo;
|
||
}
|
||
|
||
/**
|
||
* 确定通知级别
|
||
*/
|
||
private String determineNotificationLevel(RuleStateChangeEvent event) {
|
||
if (event.isAffectsRuntime()) {
|
||
return "HIGH";
|
||
}
|
||
if (event.requiresNotification()) {
|
||
return "MEDIUM";
|
||
}
|
||
return "LOW";
|
||
}
|
||
} |