Compare commits

...

2 Commits

Author SHA1 Message Date
799d748cba Merge remote-tracking branch 'origin/main' 2025-03-31 14:32:19 +08:00
807b9f77db 1.重新优化了数据采集的逻辑(原逻辑存在数据不一致的问题,优化后的方案使用生产者消费者方式)
2.优化了MovingObject的数据结构
3.Websocket配置成功
2025-03-31 14:31:45 +08:00
44 changed files with 133995 additions and 248 deletions

View File

@ -0,0 +1,533 @@
# 碰撞避免系统数据处理模块设计文档
## 1. 模块概述
数据处理模块DataProcessing是碰撞避免系统的核心组件之一负责对从数据采集模块获取的原始数据进行处理、转换和分析。该模块包含坐标转换、速度计算、数据预处理等功能为后续的碰撞检测和避险决策提供高质量的数据支持。
### 1.1 功能职责
- 将全球地理坐标WGS84转换为机场局部坐标系
- 对移动物体的速度和加速度进行计算和分析
- 执行数据质量检测,识别和标记异常数据
- 进行数据平滑和滤波处理
- 维护各类移动物体的实时状态
- 执行碰撞风险评估和预警(计划实现)
### 1.2 模块结构
数据处理模块采用分层架构设计,主要包含以下组件:
```
dataProcessing/
├── config/ # 配置类,如坐标系统配置等
├── model/ # 数据模型,定义数据结构(如碰撞风险等级等)
├── service/ # 服务层,实现核心业务逻辑
│ ├── DataProcessor.java # 数据处理主服务
│ ├── CoordinateSystemService.java # 坐标转换服务
│ ├── SpeedCalculationService.java # 速度计算服务
│ ├── AirportCoordinateSystem.java # 机场坐标系统
│ └── CollisionDetectionService.java # 碰撞检测服务(待实现)
└── util/ # 工具类,提供辅助功能
```
## 2. 核心组件设计
### 2.1 DataProcessor数据处理器
DataProcessor是数据处理模块的核心组件负责协调各种处理任务管理数据流转并确保数据的一致性和质量。
#### 2.1.1 主要职责
- 接收来自数据采集模块的数据更新
- 调度不同类型数据的处理流程
- 维护处理过程中的数据一致性
- 将处理后的数据提供给其他模块使用
#### 2.1.2 processLoop方法设计
`processLoop`方法是DataProcessor的核心循环持续处理数据流。其主要业务逻辑如下
```java
private void processLoop() {
while (!Thread.currentThread().isInterrupted()) {
// 获取数据更新
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.takeUpdate();
// 判断更新是否为空
if (delta == null || delta.isEmpty()) {
// 无数据更新,等待一段时间
Thread.sleep(500);
continue;
}
// 处理各类型的数据更新
delta.forEach((objectType, ids) -> {
// 获取该类型的数据快照
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(objectType);
List<MovingObject> dataList = new CopyOnWriteArrayList<>(snapshot.values());
// 根据不同类型分别处理
switch (objectType) {
case AIRCRAFT -> processAircraftData(dataList);
case SPECIAL_VEHICLE -> processVehicleData(dataList);
case UNMANNED_VEHICLE -> processLocationData(dataList);
default -> log.warn("未支持的数据类型: {}", objectType);
}
});
}
}
```
processLoop方法的工作流程
1. **循环监听更新**:持续检查是否有新的数据更新
2. **获取变更数据**从数据仓库中获取已更新的对象ID集合
3. **获取数据快照**基于更新的ID获取完整的数据对象
4. **类型分发处理**:根据不同移动物体类型,调用相应的处理方法
5. **异常处理**:捕获并记录处理过程中的异常,确保主循环不会因异常而中断
#### 2.1.3 数据类型处理方法
对于不同类型的移动物体DataProcessor提供了专门的处理方法
- **processAircraftData**:处理航空器数据
- **processVehicleData**:处理特种车辆数据
- **processLocationData**:处理无人车数据
这些方法执行类似的处理流程,主要包括:
1. 坐标转换:调用 `convertToLocalCoordinate` 将地理坐标转换为局部坐标
2. 速度预处理:调用 `speedCalculationService.preprocessData` 进行速度相关计算和质量检查
### 2.2 CoordinateSystemService坐标系统服务
负责执行坐标转换将GPS或WGS84坐标系下的经纬度转换为以机场为中心的局部坐标系。
#### 2.2.1 主要功能
- 根据机场中心点创建局部坐标系
- 执行地理坐标到局部坐标的转换
- 计算局部坐标系下的距离和方向
#### 2.2.2 核心方法
```java
public double[] convertToLocalCoordinate(double longitude, double latitude) throws Exception {
return airportCoordinateSystem.convertToLocal(longitude, latitude);
}
```
此方法将WGS84坐标系的经纬度转换为以机场中心为原点的UTM局部坐标系下的坐标返回一个包含x东向和y北向坐标的数组。
### 2.3 SpeedCalculationService速度计算服务
负责计算和处理移动物体的速度、加速度等动力学参数,同时进行数据质量检查。
#### 2.3.1 主要功能
- 计算移动物体的速度分量
- 执行异常值检测
- 应用数据质量标记
- 提供数据平滑和滤波
- 根据历史数据计算加速度
#### 2.3.2 核心方法
```java
public void preprocessData(List<MovingObject> dataList) {
// 遍历数据列表
for (MovingObject obj : dataList) {
// 初始化历史状态队列
if (obj.getStateHistory().isEmpty()) {
// 创建初始状态
// ...
continue;
}
// 获取最近的历史状态
MovementState lastState = obj.getStateHistory().getFirst();
// 异常值检测(位置跳变、时间异常、速度异常)
// ...
// 标记数据质量
// ...
}
}
```
preprocessData方法对数据进行预处理主要包括
1. 历史状态初始化
2. 异常值检测:检查位置跳变、时间异常和速度异常
3. 数据质量标记将异常数据标记为SUSPICIOUS
4. 将处理后的状态添加到历史状态队列
## 3. 碰撞检测功能设计
### 3.1 功能需求
碰撞检测功能是碰撞避免系统的核心,需要实现以下功能:
1. 实时检测不同移动物体之间的潜在碰撞风险
2. 根据物体的当前位置、速度和轨迹预测未来可能的碰撞
3. 计算碰撞风险等级,并发出预警
4. 为避险决策提供准确的碰撞位置和时间预测
### 3.2 CollisionDetectionService设计
#### 3.2.1 类结构
```java
@Service
public class CollisionDetectionService {
// 碰撞风险阈值(米)
private static final double SEVERE_RISK_THRESHOLD = 50.0; // 严重风险
private static final double HIGH_RISK_THRESHOLD = 100.0; // 高风险
private static final double MEDIUM_RISK_THRESHOLD = 200.0; // 中等风险
private static final double LOW_RISK_THRESHOLD = 500.0; // 低风险
// 时间预测范围(秒)
private static final int PREDICTION_HORIZON = 30;
// 依赖注入
private final MovingObjectRepository repository;
private final WebSocketService webSocketService;
// 碰撞检测方法
public void detectCollisions() {
// 实现碰撞检测逻辑
}
// 两物体间碰撞风险计算
private CollisionRisk calculateCollisionRisk(MovingObject obj1, MovingObject obj2) {
// 计算当前距离
// 预测轨迹
// 计算最近接近点
// 评估风险等级
// 返回碰撞风险对象
}
// 轨迹预测方法
private List<PredictedPosition> predictTrajectory(MovingObject obj, int seconds) {
// 基于当前位置、速度和加速度预测未来轨迹
}
}
```
#### 3.2.2 数据模型设计
```java
// 碰撞风险模型
public class CollisionRisk {
private String id; // 风险ID
private String object1Id; // 第一个物体ID
private String object2Id; // 第二个物体ID
private MovingObjectType type1; // 第一个物体类型
private MovingObjectType type2; // 第二个物体类型
private double currentDistance; // 当前距离
private double minimumDistance; // 预测最小距离
private long timeToMinimumDistance; // 达到最小距离的时间(毫秒)
private RiskLevel riskLevel; // 风险等级
private PredictedPosition collisionPoint; // 潜在碰撞点
private long createdTime; // 创建时间
// getters and setters...
}
// 风险等级枚举
public enum RiskLevel {
SEVERE, // 严重风险(紧急)
HIGH, // 高风险
MEDIUM, // 中等风险
LOW, // 低风险
NONE // 无风险
}
// 预测位置
public class PredictedPosition {
private double x; // 局部坐标系x坐标
private double y; // 局部坐标系y坐标
private double z; // 局部坐标系z坐标高度
private long timestamp; // 预测时间戳
// getters and setters...
}
```
### 3.3 碰撞检测算法
#### 3.3.1 基本碰撞检测算法
```java
public void detectCollisions() {
// 获取所有活跃的移动物体
List<MovingObject> allObjects = getAllActiveObjects();
// 生成物体对组合
List<Pair<MovingObject, MovingObject>> objectPairs = generateObjectPairs(allObjects);
// 并行处理所有物体对
List<CollisionRisk> risks = objectPairs.parallelStream()
.map(pair -> calculateCollisionRisk(pair.getLeft(), pair.getRight()))
.filter(risk -> risk.getRiskLevel() != RiskLevel.NONE)
.collect(Collectors.toList());
// 处理检测到的风险
processDetectedRisks(risks);
}
private CollisionRisk calculateCollisionRisk(MovingObject obj1, MovingObject obj2) {
// 计算当前距离
double currentDistance = calculateDistance(obj1, obj2);
// 如果当前距离已经过远,可以直接排除
if (currentDistance > LOW_RISK_THRESHOLD &&
obj1.getVelocity().getSpeed() + obj2.getVelocity().getSpeed() < 50) {
return new CollisionRisk(obj1, obj2, currentDistance, RiskLevel.NONE);
}
// 预测未来30秒的轨迹
List<PredictedPosition> trajectory1 = predictTrajectory(obj1, PREDICTION_HORIZON);
List<PredictedPosition> trajectory2 = predictTrajectory(obj2, PREDICTION_HORIZON);
// 计算轨迹上所有点的距离,找出最小距离点
Pair<Double, Long> minDistanceAndTime = findMinimumDistance(trajectory1, trajectory2);
double minDistance = minDistanceAndTime.getLeft();
long timeToMinDistance = minDistanceAndTime.getRight();
// 确定风险等级
RiskLevel riskLevel = determineRiskLevel(minDistance);
// 创建并返回碰撞风险对象
return new CollisionRisk(obj1, obj2, currentDistance, minDistance,
timeToMinDistance, riskLevel,
findCollisionPoint(trajectory1, trajectory2, timeToMinDistance));
}
```
#### 3.3.2 轨迹预测算法
```java
private List<PredictedPosition> predictTrajectory(MovingObject obj, int seconds) {
List<PredictedPosition> trajectory = new ArrayList<>();
double deltaT = 1.0; // 时间步长(秒)
// 获取当前位置、速度
double x = obj.getVelocity().getX();
double y = obj.getVelocity().getY();
double z = obj.getCurrentPosition().getAltitude();
double vx = obj.getVelocity().getVx();
double vy = obj.getVelocity().getVy();
double vz = obj.getVelocity().getVz();
// 估计加速度(如果有历史数据)
double ax = 0, ay = 0, az = 0;
if (obj.getStateHistory().size() >= 2) {
// 计算加速度...
}
// 当前时间戳
long currentTime = obj.getTimestamp();
// 预测轨迹点
for (int i = 0; i <= seconds; i++) {
// 使用运动学公式预测位置
double predictedX = x + vx * i * deltaT + 0.5 * ax * Math.pow(i * deltaT, 2);
double predictedY = y + vy * i * deltaT + 0.5 * ay * Math.pow(i * deltaT, 2);
double predictedZ = z + vz * i * deltaT + 0.5 * az * Math.pow(i * deltaT, 2);
// 预测时间戳
long predictedTime = currentTime + (long)(i * deltaT * 1000);
// 添加到轨迹
trajectory.add(new PredictedPosition(predictedX, predictedY, predictedZ, predictedTime));
}
return trajectory;
}
```
### 3.4 集成到现有系统
要将碰撞检测功能集成到现有系统中需要修改DataProcessor类以调用碰撞检测服务
```java
@Slf4j
@Component
public class DataProcessor {
@Autowired
private MovingObjectRepository movingObjectRepository;
@Autowired
private CoordinateSystemService coordinateSystemService;
@Autowired
private SpeedCalculationService speedCalculationService;
@Autowired
private CollisionDetectionService collisionDetectionService; // 新增
@Resource
private Executor processingExecutor;
// 修改processLoop方法
private void processLoop() {
// ... 现有代码 ...
delta.forEach((objectType, ids) -> {
// ... 现有代码 ...
// 数据处理完成后执行碰撞检测
collisionDetectionService.detectCollisions();
});
}
}
```
## 4. 性能优化策略
由于碰撞检测需要大量计算,可采用以下优化策略:
### 4.1 空间分区优化
使用空间分区Spatial Partitioning技术将整个坐标空间分成网格或四叉树只检测同一区域或相邻区域的物体
```java
// 创建空间网格
private Map<GridCell, List<MovingObject>> createSpatialGrid(List<MovingObject> objects) {
Map<GridCell, List<MovingObject>> grid = new HashMap<>();
for (MovingObject obj : objects) {
GridCell cell = calculateGridCell(obj);
grid.computeIfAbsent(cell, k -> new ArrayList<>()).add(obj);
}
return grid;
}
// 获取需要比较的物体对
private List<Pair<MovingObject, MovingObject>> getPairsToCheck(Map<GridCell, List<MovingObject>> grid) {
List<Pair<MovingObject, MovingObject>> pairs = new ArrayList<>();
// 遍历每个网格及其相邻网格
for (Map.Entry<GridCell, List<MovingObject>> entry : grid.entrySet()) {
GridCell cell = entry.getKey();
List<MovingObject> objectsInCell = entry.getValue();
// 同一网格内的物体对比
for (int i = 0; i < objectsInCell.size(); i++) {
for (int j = i + 1; j < objectsInCell.size(); j++) {
pairs.add(Pair.of(objectsInCell.get(i), objectsInCell.get(j)));
}
}
// 与相邻网格的物体对比
for (GridCell neighbor : getNeighborCells(cell)) {
List<MovingObject> objectsInNeighbor = grid.get(neighbor);
if (objectsInNeighbor != null) {
for (MovingObject obj1 : objectsInCell) {
for (MovingObject obj2 : objectsInNeighbor) {
pairs.add(Pair.of(obj1, obj2));
}
}
}
}
}
return pairs;
}
```
### 4.2 多级碰撞检测
实现多级碰撞检测,从粗略到精细:
1. **粗略检测**使用包围盒Bounding Box或包围球进行快速排除
2. **中间检测**:对可能碰撞的物体对进行简化轨迹预测
3. **精细检测**:只对高风险物体对进行精确轨迹预测和分析
```java
private CollisionRisk calculateCollisionRisk(MovingObject obj1, MovingObject obj2) {
// 第一级:粗略检测
if (!couldPossiblyCollide(obj1, obj2)) {
return new CollisionRisk(obj1, obj2, Double.MAX_VALUE, RiskLevel.NONE);
}
// 第二级:中间检测
Pair<Double, Long> roughEstimate = estimateMinimumDistance(obj1, obj2);
if (roughEstimate.getLeft() > HIGH_RISK_THRESHOLD) {
return new CollisionRisk(obj1, obj2, roughEstimate.getLeft(), RiskLevel.LOW);
}
// 第三级:精细检测
return performDetailedCollisionAnalysis(obj1, obj2);
}
```
### 4.3 并行计算
利用多线程和并行计算技术提高处理速度:
1. 使用Java的Stream API的并行处理能力
2. 将碰撞检测任务拆分为多个子任务并行执行
3. 利用线程池管理并优化线程资源
## 5. 日志记录和监控
在碰撞检测过程中,实现全面的日志记录和监控机制:
```java
// 日志记录
private void logCollisionRisk(CollisionRisk risk) {
if (risk.getRiskLevel() == RiskLevel.SEVERE || risk.getRiskLevel() == RiskLevel.HIGH) {
log.warn("检测到高风险碰撞可能! 对象: {} 和 {}, 风险等级: {}, 最小距离: {}米, 预计时间: {}秒后",
risk.getObject1Id(), risk.getObject2Id(), risk.getRiskLevel(),
risk.getMinimumDistance(), risk.getTimeToMinimumDistance() / 1000);
} else {
log.info("检测到碰撞风险. 对象: {} 和 {}, 风险等级: {}, 最小距离: {}米",
risk.getObject1Id(), risk.getObject2Id(), risk.getRiskLevel(),
risk.getMinimumDistance());
}
}
// 性能监控
private void monitorPerformance(long startTime, int objectCount, int pairsChecked, List<CollisionRisk> risks) {
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
log.debug("碰撞检测完成. 处理时间: {}ms, 物体数: {}, 检查对数: {}, 发现风险: {}",
duration, objectCount, pairsChecked, risks.size());
// 记录性能指标到监控系统
// ...
}
```
## 6. 后续优化和扩展
### 6.1 机器学习增强
引入机器学习模型增强碰撞预测能力:
1. 使用历史数据训练移动物体的轨迹预测模型
2. 引入情境感知能力,识别特定场景下的常见模式
3. 通过强化学习优化避险策略
### 6.2 多传感器数据融合
整合多种数据源提高预测准确性:
1. 结合气象数据考虑环境因素对运动的影响
2. 整合地面雷达、ADS-B和其他传感器数据
3. 考虑地形和建筑物等静态障碍物
### 6.3 通信和响应机制
完善碰撞风险的通知和响应机制:
1. 实现分级预警通知WebSocket、移动应用推送等
2. 为不同类型的碰撞风险定制响应策略
3. 提供碰撞避免的建议路径和操作
## 7. 总结
本文档详细描述了碰撞避免系统中数据处理模块的当前设计和processLoop方法的业务逻辑并提出了碰撞检测功能的设计方案。通过实现高效的碰撞检测算法、多级优化策略和完善的监控机制系统能够有效检测和预警潜在的碰撞风险为机场安全运行提供保障。
随着系统的持续优化和功能扩展,碰撞避免系统将能够应对更复杂的场景和更高的安全需求,实现从被动监测到主动避险的演进。

305
doc/configuration_guide.md Normal file
View File

@ -0,0 +1,305 @@
# 碰撞避免系统配置文件说明文档
本文档详细描述了碰撞避免系统中的各种配置文件,按照它们在项目中的位置进行组织说明。不同位置的配置文件负责系统不同方面的功能配置。
## 1. 配置文件结构概览
系统的配置文件按照以下目录结构组织:
```
com.dongni.collisionavoidance/
├── common/config/ # 通用配置
├── config/ # 应用级全局配置
├── dataCollector/config/ # 数据采集模块配置
├── dataProcessing/config/ # 数据处理模块配置
└── webSocket/config/ # WebSocket通信模块配置
```
## 2. 通用配置 (common/config)
通用配置位于`com.dongni.collisionavoidance.common.config`包下,提供了系统中通用的基础设施配置。
### 2.1 SchedulerConfig.java
**功能**: 定时任务线程池配置
**说明**:
- 创建自定义线程池,避免定时任务单线程阻塞的情况
- 启用Spring的异步支持@EnableAsync
- 配置ThreadPoolTaskScheduler以执行定时任务
**关键配置**:
```java
@Configuration
@EnableAsync // 启用异步支持
public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 设置线程池大小,根据需求调整
scheduler.setPoolSize(3);
// 设置线程名称前缀
scheduler.setThreadNamePrefix("ScheduledTask-");
return scheduler;
}
}
```
**用途**:
- 用于管理系统中的各种定时任务,如定期数据采集、数据清理等
- 通过线程池提高系统定时任务的并发处理能力
- 防止单个定时任务阻塞导致其他任务延迟执行
## 3. 应用级全局配置 (config)
应用级全局配置位于`com.dongni.collisionavoidance.config`包下,提供了影响整个应用的核心配置。
### 3.1 RedisConfig.java
**功能**: Redis缓存配置
**说明**:
- 配置RedisTemplate用于与Redis交互
- 配置序列化器处理Java对象与Redis数据的转换
- 特别针对VehicleLocationInfo类进行了优化
**关键配置**:
```java
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, VehicleLocationInfo> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, VehicleLocationInfo> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer<VehicleLocationInfo> serializer =
new Jackson2JsonRedisSerializer<>(VehicleLocationInfo.class);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
serializer.setObjectMapper(mapper);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
}
```
**用途**:
- 提供高效的车辆位置信息缓存机制
- 支持实时数据快速读写
- 通过JavaTimeModule支持Java 8日期时间类型序列化
### 3.2 ThreadPoolConfig.java
**功能**: 数据处理线程池配置
**说明**:
- 创建用于数据处理的线程池执行器
- 配置核心线程数、最大线程数和队列容量
- 为线程设置有意义的名称前缀
**关键配置**:
```java
@Configuration
public class ThreadPoolConfig {
@Bean(name = "processingExecutor")
public Executor processingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("data-process-");
executor.initialize();
return executor;
}
}
```
**用途**:
- 用于处理大量并发的数据处理任务
- 避免数据处理任务阻塞主线程
- 优化系统资源利用,提高处理效率
## 4. 数据采集模块配置 (dataCollector/config)
数据采集模块配置位于`com.dongni.collisionavoidance.dataCollector.config`包下,专注于数据采集相关的配置。
### 4.1 RestTemplateConfig.java
**功能**: HTTP客户端配置
**说明**:
- 配置RestTemplate用于外部API调用
- 自定义ObjectMapper忽略未知属性以增强兼容性
- 将自定义ObjectMapper应用到RestTemplate的消息转换器
**关键配置**:
```java
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ObjectMapper objectMapper) {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getMessageConverters().forEach(converter -> {
if (converter instanceof MappingJackson2HttpMessageConverter) {
((MappingJackson2HttpMessageConverter) converter).setObjectMapper(objectMapper);
}
});
return restTemplate;
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper;
}
}
```
**用途**:
- 用于从外部系统或API获取航空器、车辆等移动物体数据
- 通过配置的ObjectMapper实现宽松的JSON解析提高与外部系统的兼容性
- 支持数据采集模块的HTTP通信需求
## 5. 数据处理模块配置 (dataProcessing/config)
数据处理模块配置位于`com.dongni.collisionavoidance.dataProcessing.config`包下,专注于数据处理和分析相关配置。
### 5.1 CoordinateSystemProperties.java
**功能**: 坐标系统配置
**说明**:
- 从application.yml配置文件中读取机场中心点坐标
- 使用@ConfigurationProperties将配置值绑定到Java属性
- 提供getter/setter方法访问配置值
**关键配置**:
```java
@Component
@ConfigurationProperties(prefix = "coordinate-system.airport")
public class CoordinateSystemProperties {
private double centerLongitude;
private double centerLatitude;
// getter和setter方法
}
```
**用途**:
- 为坐标转换和距离计算提供基准点
- 在碰撞风险评估中作为参考坐标
- 支持局部坐标系与地理坐标系之间的转换
## 6. WebSocket通信模块配置 (webSocket/config)
WebSocket通信模块配置位于`com.dongni.collisionavoidance.webSocket.config`包下,负责实时通信相关配置。
### 6.1 JacksonConfig.java
**功能**: JSON序列化配置
**说明**:
- 配置Jackson2ObjectMapperBuilder
- 设置序列化选项如缩进输出、忽略null值
- 禁用将日期写为时间戳的功能
**关键配置**:
```java
@Configuration
public class JacksonConfig {
@Bean
public Jackson2ObjectMapperBuilder objectMapperBuilder() {
return new Jackson2ObjectMapperBuilder()
.indentOutput(true)
.serializationInclusion(JsonInclude.Include.NON_NULL)
.featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
}
```
**用途**:
- 为WebSocket通信提供一致的JSON序列化行为
- 优化JSON输出格式提高可读性
- 通过忽略null值减少传输数据量
### 6.2 WebSocketConfig.java
**功能**: WebSocket通信配置
**说明**:
- 启用WebSocket消息代理
- 注册STOMP端点并配置跨域访问
- 配置消息代理前缀和应用目标前缀
- 添加JSON消息转换器
**关键配置**:
```java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册STOMP端点客户端通过此URL连接WebSocket
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*") // 允许跨域
.withSockJS(); // 启用SockJS支持
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 启用内存消息代理,客户端订阅地址前缀为/topic
registry.enableSimpleBroker("/topic");
// 客户端发送消息的地址前缀为/app
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
messageConverters.add(new MappingJackson2MessageConverter());
return false;
}
}
```
**用途**:
- 为前端客户端提供实时数据推送功能
- 支持移动物体位置的实时更新
- 实现碰撞警告的即时通知机制
- 通过STOMP子协议规范化WebSocket通信
## 7. 配置之间的关系
系统中不同位置的配置文件相互协作,共同支持碰撞避免系统的运行:
1. **线程池配置** (SchedulerConfig, ThreadPoolConfig)
- 提供异步处理能力,支持定时采集和高并发数据处理
2. **数据存储配置** (RedisConfig)
- 为实时数据提供高效缓存机制
3. **通信配置** (RestTemplateConfig, WebSocketConfig)
- 支持与外部系统数据交换和向客户端推送实时信息
4. **数据处理配置** (CoordinateSystemProperties)
- 提供坐标转换和碰撞计算的基础参数
5. **序列化配置** (JacksonConfig)
- 确保系统中JSON数据的一致性处理

442
doc/datacollector_design.md Normal file
View File

@ -0,0 +1,442 @@
# 碰撞避免系统数据采集模块设计文档
本文档详细描述了碰撞避免系统中数据采集模块的设计和实现,包括其架构、主要组件、工作流程以及与其他模块的交互方式。
## 1. 模块概述
数据采集模块DataCollector是碰撞避免系统的核心组件之一负责从多种数据源实时采集不同类型移动物体航空器、特勤车辆、无人车等的位置和状态信息并提供给系统的其他模块进行处理和分析。该模块采用定时任务机制定期从外部API获取数据并维护移动物体的实时状态和历史轨迹。
### 1.1 功能职责
- 从多个外部数据源采集移动物体的位置和状态信息
- 对采集的原始数据进行初步处理和转换
- 维护移动物体的历史轨迹记录
- 提供数据清理机制,避免历史数据过度积累
- 与系统其他模块协作,为碰撞检测和避险决策提供数据支持
### 1.2 模块结构
数据采集模块采用分层架构设计,主要包含以下组件:
```
dataCollector/
├── config/ # 配置类如RestTemplate配置等
├── dao/ # 数据访问层,负责与外部数据源交互
├── model/ # 数据模型,定义数据结构
│ └── enums/ # 枚举类型定义
├── repository/ # 数据仓库,负责数据存储和检索
└── service/ # 服务层,实现核心业务逻辑
```
## 2. 核心组件设计
### 2.1 数据模型 (model)
数据模型定义了数据采集模块处理的各类数据结构:
#### 2.1.1 VehicleLocationInfo.java
用于表示车辆位置信息的数据模型:
```java
@Data
public class VehicleLocationInfo {
private String transId; // 消息唯一id
private long timestamp; // 时间戳
private String vehicleId; // 车辆ID
private double longitude; // 经度
private double latitude; // 纬度
private double direction; // 车头航向角
private double speed; // 车速
}
```
#### 2.1.2 VehicleCommand.java
用于发送控制指令到无人车的命令模型:
```java
// 简化表示,实际实现可能包含更多字段
@Data
public class VehicleCommand {
private String commandId; // 命令ID
private String vehicleId; // 目标车辆ID
private String commandType; // 命令类型(如:转向、加速、减速等)
private Map<String, Object> parameters; // 命令参数
private long timestamp; // 命令发送时间戳
}
```
#### 2.1.3 CommandResponse.java
命令执行响应模型,用于接收无人车对命令的执行结果:
```java
@Data
public class CommandResponse {
private String commandId; // 对应的命令ID
private boolean success; // 执行是否成功
private String message; // 执行结果消息
private long timestamp; // 响应时间戳
}
```
### 2.2 数据访问对象 (dao)
数据访问对象负责与外部数据源的通信,获取原始数据:
#### 2.2.1 DataCollectorDao.java
```java
@Slf4j
@Component
public class DataCollectorDao {
// 配置属性
@Value("${data.collector.vehicle-api.base-url}")
private String vehicleBaseUrl;
@Value("${data.collector.vehicle-api.endpoints.vehicle-location}")
private String vehicleLocationEndpoint;
private final RestTemplate restTemplate;
private final AuthService authService;
// 构造函数注入依赖
public DataCollectorDao(RestTemplate restTemplate, AuthService authService) {
this.restTemplate = restTemplate;
this.authService = authService;
}
// 采集航空器数据
public List<Aircraft> collectAircraftData(String endpoint, String baseUrl) {
// 通过HTTP请求获取航空器数据
// 处理响应并返回数据列表
}
// 采集特种车辆数据
public List<SpecialVehicle> collectVehicleData(String endpoint, String baseUrl) {
// 通过HTTP请求获取特种车辆数据
// 处理响应并返回数据列表
}
// 获取无人车位置信息
public List<UnmannedVehicle> getVehicleLocationInfo() {
// 通过HTTP请求获取无人车位置数据
// 处理响应并返回数据列表
}
}
```
DAO层的主要职责
- 构建HTTP请求包括URL、头信息等
- 处理授权认证通过AuthService获取令牌
- 发送请求并接收响应
- 将响应数据转换为系统内部数据模型
- 提供异常处理和日志记录
### 2.3 服务层 (service)
服务层实现核心业务逻辑,包括数据采集调度、处理和存储:
#### 2.3.1 DataCollectorService.java
```java
@Slf4j
@Service
public class DataCollectorService {
// 配置信息
@Value("${data.collector.airport-api.endpoints.vehicle}")
private String airportVehicleEndpoint;
@Value("${data.collector.airport-api.endpoints.aircraft}")
private String airportAircraftEndpoint;
@Value("${data.collector.airport-api.base-url}")
private String airportBaseUrl;
// 内存数据存储
@Getter
ConcurrentHashMap<String, List<Object>> dataMap = new ConcurrentHashMap<>();
@Autowired
private DataCollectorDao dataCollectorDao;
@Autowired
private MovingObjectRepository movingObjectRepository;
// 定时采集航空器数据
@Scheduled(fixedRateString = "${data.collector.interval}")
public void collectAircraftData() {
// 调用DAO获取最新数据
// 处理和更新历史状态
// 存储到仓库中
}
// 定时采集特种车辆数据
@Scheduled(fixedRateString = "${data.collector.interval}")
@Async // 异步执行
public void collectVehicleData() {
// 调用DAO获取最新数据
// 处理和更新历史状态
// 存储到仓库中
}
// 定时采集无人车数据
@Scheduled(fixedRateString = "${data.collector.interval}")
@Async // 异步执行
public void collectVehicleLocationData() {
// 调用DAO获取最新数据
// 处理和更新历史状态
// 存储到仓库中
}
}
```
服务层的主要职责:
- 调度定期数据采集任务
- 处理和转换数据,更新移动物体状态
- 维护移动物体的历史状态记录
- 提供数据缓存和快速访问机制
- 与仓库层交互,进行数据持久化
#### 2.3.2 AuthService.java
负责处理与外部API的认证授权
```java
@Service
public class AuthService {
// 获取访问令牌
public String getToken() {
// 实现获取、缓存和刷新令牌的逻辑
// 可能包括用户名/密码认证或其他方式
}
}
```
#### 2.3.3 DataCleanupService.java
负责定期清理过期数据,防止系统资源占用过多:
```java
@Service
@Slf4j
public class DataCleanupService {
// 定期清理历史数据
@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
public void cleanupOldData() {
// 实现清理逻辑
// 例如删除30天前的数据
}
}
```
### 2.4 配置 (config)
定义模块所需的配置类:
#### 2.4.1 RestTemplateConfig.java
配置HTTP客户端用于与外部API通信
```java
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ObjectMapper objectMapper) {
RestTemplate restTemplate = new RestTemplate();
// 配置消息转换器使用自定义的ObjectMapper
// 设置连接超时、读取超时等参数
return restTemplate;
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
// 配置特性,如忽略未知属性等
return mapper;
}
}
```
## 3. 数据流程和工作机制
### 3.1 数据采集流程
```mermaid
sequenceDiagram
participant 外部API
participant DataCollectorDao
participant DataCollectorService
participant MovingObjectRepository
Note over DataCollectorService: 定时触发(@Scheduled)
DataCollectorService->>DataCollectorDao: 请求最新数据
DataCollectorDao->>外部API: HTTP请求(带认证令牌)
外部API-->>DataCollectorDao: 返回原始数据
DataCollectorDao-->>DataCollectorService: 转换为内部数据模型
Note over DataCollectorService: 处理历史状态
DataCollectorService->>DataCollectorService: 创建MovementState对象
DataCollectorService->>DataCollectorService: 添加到历史队列
DataCollectorService->>DataCollectorService: 控制历史记录长度
DataCollectorService->>MovingObjectRepository: 更新移动物体状态
Note over MovingObjectRepository: 存储最新状态供其他模块使用
```
### 3.2 数据更新机制
数据采集模块采用如下机制维护移动物体的状态:
1. **定时轮询**: 以固定时间间隔(通过`${data.collector.interval}`配置向外部API发送请求
2. **增量更新**: 每次只更新发生变化的数据,减少系统负担
3. **历史记录**: 为每个移动物体维护一个固定长度MAX_HISTORY的历史状态队列
4. **并发处理**: 使用ConcurrentHashMap等线程安全容器存储数据
5. **异步执行**: 通过@Async注解实现采集任务的异步处理
### 3.3 数据清理机制
为防止数据无限增长占用系统资源,模块实现了数据清理机制:
1. **内存数据控制**: 移动物体历史状态队列限制最大长度MAX_HISTORY
2. **定期清理**: DataCleanupService定期默认每天凌晨清理过期数据
3. **按时间阈值**: 默认清理30天前的历史数据
## 4. 与其他模块的交互
### 4.1 数据提供
数据采集模块作为数据提供方,与其他模块的交互如下:
```mermaid
flowchart TD
DC[数据采集模块] --> |提供实时位置数据| DP[数据处理模块]
DC --> |提供历史轨迹数据| DP
DC --> |提供移动物体更新| WS[WebSocket模块]
WS --> |推送位置更新给客户端| Client[客户端]
DP --> |碰撞风险分析| WS
DC --> |查询历史数据| API[控制器API]
```
### 4.2 接口定义
数据采集模块通过以下方式向其他模块提供数据:
1. **直接依赖注入**:
```java
@Service
public class ProcessingService {
@Autowired
private MovingObjectRepository movingObjectRepository;
// 使用仓库获取最新数据进行处理
}
```
2. **事件驱动**:
```java
// 在DataCollectorService中
private final ApplicationEventPublisher eventPublisher;
// 当检测到新数据时发布事件
eventPublisher.publishEvent(new NewDataEvent(data));
// 在其他模块中
@EventListener
public void handleNewData(NewDataEvent event) {
// 处理新数据
}
```
## 5. 配置项
数据采集模块通过application.yml或application.properties文件配置以下参数
```yaml
data:
collector:
interval: 5000 # 数据采集间隔(毫秒)
airport-api:
base-url: "https://api.airport.example.com"
endpoints:
vehicle: "/api/vehicles"
aircraft: "/api/aircrafts"
vehicle-api:
base-url: "https://api.vehicle-vendor.example.com"
endpoints:
vehicle-location: "/api/location"
```
## 6. 扩展性设计
### 6.1 增加新数据源
系统设计支持轻松添加新的数据源:
1. 在DataCollectorDao中添加新的数据采集方法
2. 在DataCollectorService中添加对应的定时任务方法
3. 更新配置文件添加新数据源的URL和端点
4. 根据需要添加新的数据模型类
### 6.2 支持不同协议
当前系统主要通过HTTP REST API获取数据但架构设计支持扩展其他协议
1. 通过创建专用的连接器类,如 MqttConnector、WebSocketConnector等
2. 在配置中指定通信协议和参数
3. 实现相应的数据处理和转换逻辑
## 7. 安全考虑
数据采集模块实现了以下安全措施:
1. **认证授权**通过AuthService管理API访问令牌定期刷新
2. **数据校验**:验证接收数据的完整性和有效性
3. **异常处理**:妥善处理网络错误和数据异常,防止系统崩溃
4. **数据隔离**:使用独立的数据模型,防止外部数据直接影响核心系统
## 8. 性能优化
为确保高性能运行,数据采集模块采用以下策略:
1. **并发处理**:使用线程池和异步任务处理多个数据源
2. **数据缓存**:通过内存缓存减少重复数据处理
3. **批量处理**:一次处理多条记录,减少系统调用开销
4. **增量更新**:只处理和传输变化的数据
5. **连接池**复用HTTP连接减少连接建立开销
## 9. 监控和错误处理
### 9.1 日志记录
系统使用SLF4J进行全面的日志记录
```java
log.info("成功获取航空器数据,数量: {}", dataList.size());
log.error("采集航空器数据失败: {}", endpoint, e);
```
### 9.2 异常处理
采用try-catch块捕获并处理异常确保数据采集失败不会影响整个系统运行
```java
try {
// 数据采集逻辑
} catch (Exception e) {
log.error("数据采集异常: {}", e.getMessage(), e);
return Collections.emptyList(); // 返回空结果而非抛出异常
}
```
## 10. 未来改进
数据采集模块计划的未来改进方向:
1. **自适应采集频率**:根据数据变化频率动态调整采集间隔
2. **数据源健康检查**:定期检测数据源可用性,自动切换备用源
3. **数据质量评估**:引入数据质量评分机制,过滤低质量数据
4. **实时数据流**从轮询机制升级到实时数据流如WebSocket、MQTT
5. **数据压缩**:对历史数据进行智能压缩,减少存储需求

178
doc/design_document.md Normal file
View File

@ -0,0 +1,178 @@
# 碰撞避免系统数据结构设计文档
## 1. 数据结构概述
本文档描述碰撞避免系统中的核心数据结构设计及其关系。系统采用面向对象设计方法,通过继承和组合实现不同移动物体类型的统一管理。
## 2. 核心数据结构设计
### 2.1 类图
```mermaid
classDiagram
class MovingObject {
+GeoPosition currentPosition
+Velocity velocity
+double heading
+long timestamp
+Deque~MovementState~ stateHistory
+int MAX_HISTORY
+double maxSpeed
+MovingObjectType type
}
class Aircraft {
+String flightNo
+Long trackNumber
}
class SpecialVehicle {
+String vehicleNo
}
class UnmannedVehicle {
+String transId
+String vehicleId
}
class GeoPosition {
+double latitude
+double longitude
+double altitude
}
class Velocity {
+double x, y, z
+double vx, vy, vz
+double confidence
+double cachedAcceleration
+getSpeed()
}
class MovementState {
+GeoPosition position
+Velocity velocity
+double heading
+long timestamp
+DataQuality dataQuality
}
class MovingObjectType {
<<enumeration>>
AIRCRAFT
SPECIAL_VEHICLE
UNMANNED_VEHICLE
}
MovingObject <|-- Aircraft
MovingObject <|-- SpecialVehicle
MovingObject <|-- UnmannedVehicle
MovingObject "1" *-- "1" GeoPosition
MovingObject "1" *-- "1" Velocity
MovingObject "1" *-- "0..*" MovementState
MovingObject "1" *-- "1" MovingObjectType
MovementState "1" *-- "1" GeoPosition
MovementState "1" *-- "1" Velocity
```
## 3. 数据结构详解
### 3.1 基础抽象类 - MovingObject
`MovingObject` 是系统中所有移动物体的基类,定义了共有属性:
- **currentPosition**: 当前地理位置,使用 `GeoPosition` 类表示,包含经度、纬度和高度信息
- **velocity**: 局部坐标系位置消息xy坐标、速度等等使用 `Velocity` 类表示,包含三维速度向量
- **heading**: 航向角度,以度为单位
- **timestamp**: 时间戳,表示数据最后更新时间
- **stateHistory**: 历史状态队列,存储 `MovementState` 对象,用于轨迹分析
- **MAX_HISTORY**: 历史记录最大保存数量默认为30
- **maxSpeed**: 最大速度限制
- **type**: 移动物体类型,使用 `MovingObjectType` 枚举
### 3.2 具体移动物体类型
系统实现了三种具体的移动物体类型,它们都继承自 `MovingObject` 抽象类:
#### 3.2.1 航空器 (Aircraft)
航空器特有属性:
- **flightNo**: 航班号,字符串类型,用于唯一标识航班
- **trackNumber**: 航迹号,长整型,用于雷达跟踪标识
构造函数接收位置参数(纬度、经度、高度)和时间戳,初始化基类属性。
#### 3.2.2 特勤车辆 (SpecialVehicle)
特勤车辆特有属性:
- **vehicleNo**: 车牌号,字符串类型,用于唯一标识车辆
构造函数接收位置参数(纬度、经度)、时间戳、速度和方向,并据此计算速度向量。特勤车辆被标记为"不可控"对象,表示系统只能监控而不能控制其行为。
#### 3.2.3 无人车 (UnmannedVehicle)
无人车特有属性:
- **transId**: 消息唯一ID用于消息跟踪
- **vehicleId**: 车辆ID用于唯一标识无人车
构造函数接收位置参数(经度、纬度)、航向和速度,并据此计算速度向量。无人车被标记为"可控"对象,表示系统可以向其发送控制指令。
### 3.3 辅助数据结构
#### 3.3.1 地理位置 - GeoPosition
`GeoPosition` 表示三维空间中的位置:
- **latitude**: 纬度,单位为度
- **longitude**: 经度,单位为度
- **altitude**: 高度,单位为米
#### 3.3.2 速度 - Velocity
`Velocity` 描述局部坐标系的位置消息:
- **x, y, z**: 三维坐标系中的位置
- **vx, vy, vz**: 三个方向上的速度分量,单位为米/秒
- **confidence**: 速度计算置信度取值范围0-1
- **cachedAcceleration**: 加速度计算结果缓存
- **getSpeed()**: 计算速度标量的方法
#### 3.3.3 运动状态 - MovementState
`MovementState` 封装了移动物体在特定时刻的完整状态:
- **position**: 地理位置GeoPosition类型
- **velocity**: 局部坐标系的位置消息Velocity类型
- **heading**: 航向,度数
- **timestamp**: 时间戳
- **dataQuality**: 数据质量枚举,表示数据可靠性
#### 3.3.4 移动物体类型 - MovingObjectType
`MovingObjectType` 是一个枚举类型,定义了系统支持的移动物体类型:
- **AIRCRAFT**: 航空器(飞机)
- **SPECIAL_VEHICLE**: 特勤车辆(不可控)
- **UNMANNED_VEHICLE**: 无人车(可控)
## 4. 数据流转机制
### 4.1 历史状态存储机制
每个 `MovingObject` 对象维护一个 `stateHistory` 队列,用于存储历史状态:
1. 当对象位置或速度更新时,系统创建新的 `MovementState` 对象
2. 新的状态对象被添加到 `stateHistory` 队列
3. 如果队列长度超过 `MAX_HISTORY`默认30最旧的记录会被移除
4. 历史记录用于轨迹分析、异常检测和预测计算
```mermaid
sequenceDiagram
participant 系统
participant MovingObject
participant stateHistory队列
系统->>MovingObject: 更新位置和速度
MovingObject->>MovingObject: 创建MovementState对象
MovingObject->>stateHistory队列: 添加新状态记录
alt 队列长度 > MAX_HISTORY
stateHistory队列->>stateHistory队列: 移除最旧记录
end
```

139
doc/directory_structure.md Normal file
View File

@ -0,0 +1,139 @@
# 碰撞避免系统目录结构说明
本文档描述了碰撞避免系统项目的目录结构及各个目录的功能和用途。
## 1. 根目录结构
```
CollisionAvoidanceSystem/
├── doc/ # 文档目录,包含系统设计和说明文档
├── src/ # 源代码目录
├── target/ # 编译输出目录
├── .idea/ # IntelliJ IDEA项目配置目录
├── .mvn/ # Maven包装器配置目录
├── .git/ # Git版本控制目录
├── pom.xml # Maven项目配置文件
├── mvnw # Maven包装器脚本(Unix/Linux)
├── mvnw.cmd # Maven包装器脚本(Windows)
├── README.md # 项目说明文档
├── VERSION.txt # 版本信息文件
├── change_log.md # 变更日志
├── development_log.md # 开发日志
├── .gitignore # Git忽略文件配置
└── .gitattributes # Git属性配置
```
## 2. 源代码目录结构
源代码目录(`src`)包含了项目的所有Java源代码和资源文件
```
src/
├── main/ # 主要源代码
│ ├── java/ # Java源代码
│ │ └── com/
│ │ └── dongni/
│ │ └── collisionavoidance/ # 应用程序主包
│ └── resources/ # 配置文件和静态资源
└── test/ # 测试源代码
```
## 3. 应用程序主包结构
应用程序主包(`com.dongni.collisionavoidance`)组织如下:
```
com.dongni.collisionavoidance/
├── CollisionAvoidanceApplication.java # 应用程序入口类
├── common/ # 通用组件目录
│ ├── model/ # 数据模型定义
│ └── config/ # 通用配置
├── config/ # 应用程序配置
├── controller/ # 控制器层处理HTTP请求
├── dataCollector/ # 数据采集模块
├── dataProcessing/ # 数据处理模块
└── webSocket/ # WebSocket通信模块
```
## 4. 数据模型目录
数据模型目录(`com.dongni.collisionavoidance.common.model`)包含系统的核心数据结构定义:
```
common/model/
├── Aircraft.java # 航空器模型
├── GeoPosition.java # 地理位置模型
├── MovementState.java # 运动状态模型
├── MovingObject.java # 移动物体抽象基类
├── MovingObjectType.java # 移动物体类型枚举
├── SpecialVehicle.java # 特勤车辆模型
├── UnmannedVehicle.java # 无人车模型
└── Velocity.java # 速度模型
```
## 5. 各模块功能说明
### 5.1 数据模型 (common/model)
数据模型模块定义了系统中使用的所有数据结构:
- **MovingObject**: 所有移动物体的抽象基类,定义共有属性和行为
- **Aircraft**: 代表航空器的具体实现类
- **SpecialVehicle**: 代表特勤车辆的具体实现类
- **UnmannedVehicle**: 代表无人车的具体实现类
- **GeoPosition**: 表示地理位置的数据结构
- **Velocity**: 表示速度和局部坐标系位置信息的数据结构
- **MovementState**: 封装移动物体在特定时刻的完整状态
- **MovingObjectType**: 定义了系统支持的移动物体类型的枚举
### 5.2 数据采集模块 (dataCollector)
负责从各种数据源获取移动物体的实时位置和状态信息:
- 航空器数据采集ADS-B、雷达等
- 特勤车辆数据采集GPS、地面雷达等
- 无人车数据采集(车载传感器等)
### 5.3 数据处理模块 (dataProcessing)
处理和分析采集到的数据:
- 轨迹计算和预测
- 碰撞风险评估
- 数据质量检查和过滤
- 历史数据管理和分析
### 5.4 控制器层 (controller)
提供RESTful API接口用于
- 数据查询和检索
- 系统配置和管理
- 状态报告和监控
### 5.5 WebSocket模块 (webSocket)
提供实时通信功能:
- 推送实时位置更新
- 发送碰撞警告
- 支持客户端实时监控
### 5.6 配置模块 (config)
包含应用程序的配置类:
- 系统参数配置
- 服务注册和发现
- 安全配置
- 数据库连接配置
## 6. 文档目录 (doc)
包含系统相关的设计文档和说明文档:
```
doc/
├── design_document.md # 系统数据结构设计文档
└── directory_structure.md # 目录结构说明文档(本文档)
```

53
doc/坐标转换.txt Normal file
View File

@ -0,0 +1,53 @@
# WGS84到自定义参考点坐标系的转换
## 1. 坐标系说明
### 1.1 源坐标系WGS84-EPSG:4326
- 经度longitude地球表面上一点与本初子午线的角度差东经为正西经为负
- 纬度latitude地球表面上一点与赤道平面的角度差北纬为正南纬为负
- 高度height相对于WGS84椭球面的高度单位为米
### 1.2 目标坐标系(自定义局部坐标系)
- 原点:机场中心点(由配置文件指定的经纬度)
- X轴指向东方单位为米
- Y轴指向北方单位为米
- Z轴指向天顶单位为米
## 2. 转换步骤
### 2.1 初始化
- 设置机场中心点经纬度作为坐标系原点
- 建立局部坐标系ENU - East, North, Up
### 2.2 坐标转换过程
1. 将WGS84经纬度转换为地心地固坐标系ECEF
2. 计算机场中心点的ECEF坐标
3. 计算目标点相对于机场中心点的偏移向量
4. 应用旋转矩阵将ECEF偏移向量转换为局部ENU坐标
## 3. 注意事项
### 3.1 精度考虑
- 当距离机场中心点较远时(>100km需考虑地球曲率影响
- 建议在较小范围内使用半径10km以内以保持较高精度
### 3.2 高度处理
- 默认使用WGS84椭球高度
- 如需使用海拔高度,需额外考虑大地水准面差异
## 4. 代码示例
```java
// 坐标转换服务初始化
CoordinateSystemService service = new CoordinateSystemService(properties);
// WGS84经纬度转换为局部坐标
double[] localCoord = service.convertToLocalCoordinate(longitude, latitude);
// localCoord[0] 为东向坐标X
// localCoord[1] 为北向坐标Y
```
## 5. 参考资料
- WGS84坐标系EPSG:4326规范
- 《大地测量学基础》
- PROJ坐标转换库文档

View File

@ -0,0 +1,142 @@
# 速度计算系统设计文档
## 1. 数据结构分析
### 1.1 当前数据结构
目前系统中已实现的数据结构如下:
#### MovementState运动状态
```java
public class MovementState {
public GeoPosition position; // 位置信息
public Velocity velocity; // 速度信息
public double heading; // 航向(度)
public long timestamp; // 时间戳(毫秒)
}
```
#### MovingObject移动对象基类
```java
public abstract class MovingObject {
public GeoPosition currentPosition; // 当前位置
public Velocity velocity; // 速度
public double heading; // 航向(度)
public long timestamp; // 时间戳(毫秒)
public Deque<MovementState> stateHistory; // 历史状态队列
public int MAX_HISTORY = 10; // 最大历史记录数
public double maxSpeed; // 最大速度
public MovingObjectType type; // 类型枚举
}
```
### 1.2 数据结构评估
当前数据结构基本合理,但需要考虑以下几点优化:
1. **历史状态队列**
- 当前MAX_HISTORY=10可能不足以支持平滑的速度计算
- 建议增加到30-60以支持更长时间窗口的数据分析
2. **速度计算相关字段**
- 需要在Velocity类中添加速度计算的置信度字段
- 考虑添加加速度计算结果的缓存字段
3. **数据质量标记**
- 建议在MovementState中添加数据质量标记字段
- 用于标识原始数据是否可靠
## 2. 速度计算方案
### 2.1 基本思路
1. **数据预处理**
- 对原始位置数据进行异常值检测
- 使用卡尔曼滤波进行数据平滑
2. **速度计算方法**
- 使用滑动窗口计算平均速度
- 根据飞机状态(起飞/降落)调整计算参数
### 2.2 具体实现方案
#### 2.2.1 数据预处理
1. **异常值检测**
```java
// 检测标准:
- 位置跳变:相邻两点间距离超过合理范围
- 时间异常:时间戳倒退或跳变
- 速度异常:计算得到的速度超过最大限制
```
2. **卡尔曼滤波**
- 状态向量:[x, y, vx, vy]
- 观测向量:[x, y]
- 考虑加速度作为系统噪声
#### 2.2.2 速度计算
1. **滑动窗口计算**
```java
// 窗口大小:
- 起飞状态5-10秒
- 降落状态3-5秒
```
2. **加速度约束**
```java
// 合理加速度范围:
- 起飞0.5-2.5 m/s²
- 降落:-2.0-0 m/s²
```
### 2.3 状态判断
1. **起飞状态判断**
- 速度持续增加
- 高度持续增加
- 位置相对跑道起点的距离增加
2. **降落状态判断**
- 速度持续减小
- 高度持续减小
- 位置接近跑道
## 3. 实现步骤
1. 更新数据结构:
- 修改MAX_HISTORY值
- 添加数据质量字段
- 添加速度计算置信度字段
2. 实现数据预处理:
- 异常值检测
- 卡尔曼滤波
3. 实现速度计算:
- 滑动窗口机制
- 状态判断
- 加速度约束
4. 添加数据质量监控:
- 记录异常数据比例
- 监控速度计算置信度
## 4. 注意事项
1. **数据同步**
- 确保位置数据的时间戳准确
- 处理数据延迟和丢失情况
2. **计算效率**
- 优化卡尔曼滤波计算
- 合理设置滑动窗口大小
3. **异常处理**
- 定义清晰的异常处理流程
- 保存异常数据用于分析
4. **参数调优**
- 根据实际运行数据调整参数
- 建立参数自适应机制

46
pom.xml
View File

@ -28,7 +28,29 @@
</scm>
<properties>
<java.version>17</java.version>
<!-- 使用经过验证的稳定版本 -->
<geotools.version>30.0</geotools.version>
</properties>
<repositories>
<repository>
<id>osgeo</id>
<name>OSGeo Release Repository</name>
<url>https://repo.osgeo.org/repository/release/</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
<repository>
<id>osgeo-snapshot</id>
<name>OSGeo Snapshot Repository</name>
<url>https://repo.osgeo.org/repository/snapshot/</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
@ -87,6 +109,30 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- GeoTools坐标转化框架 -->
<!-- 核心三件套 -->
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-epsg-hsql</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -2,12 +2,14 @@ package com.dongni.collisionavoidance;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
@EnableScheduling
@SpringBootApplication
@EnableMongoRepositories
@EnableConfigurationProperties
public class CollisionAvoidanceApplication {
public static void main(String[] args) {

View File

@ -37,6 +37,23 @@ public class Aircraft extends MovingObject{
@JsonProperty("time") long timestamp
) {
this.currentPosition = new GeoPosition(latitude, longitude, altitude);
this.velocity = new Velocity();
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Aircraft{" +
"flightNo='" + flightNo + '\'' +
", trackNumber=" + trackNumber +
", currentPosition=" + currentPosition +
", heading=" + heading +
", velocity=" + velocity +
", timestamp=" + timestamp +
", stateHistory=" + stateHistory +
", MAX_HISTORY=" + MAX_HISTORY +
", maxSpeed=" + maxSpeed +
", type=" + type +
'}';
}
}

View File

@ -15,5 +15,21 @@ public class MovementState {
public double heading;
// 时间戳毫秒
public long timestamp;
// 数据质量标记
public DataQuality dataQuality = DataQuality.UNKNOWN;
public MovementState(GeoPosition position, Velocity velocity, double heading, long timestamp) {
this.position = position;
this.velocity = velocity;
this.heading = heading;
this.timestamp = timestamp;
}
public enum DataQuality {
GOOD, // 数据质量良好
SUSPICIOUS, // 数据可疑
BAD, // 数据质量差
UNKNOWN // 未知质量
}
}

View File

@ -27,7 +27,7 @@ public abstract class MovingObject {
// 历史状态队列
public Deque<MovementState> stateHistory = new ArrayDeque<>();
// 最大历史记录数
public int MAX_HISTORY = 10;
public int MAX_HISTORY = 30; // 最大历史记录数支持更长时间窗口的数据分析
// 最大速度子类初始化
public double maxSpeed;
// 类型枚举

View File

@ -36,8 +36,23 @@ public class SpecialVehicle extends MovingObject{
// 对微小值归零阈值可根据需求调整
vx = Math.abs(vx) < 1e-10 ? 0.0 : vx;
vy = Math.abs(vy) < 1e-10 ? 0.0 : vy;
this.velocity = new Velocity(vx,vy,0);
this.velocity = new Velocity(0,0,0,vx,vy,0,0);
this.heading = heading;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "SpecialVehicle{" +
"vehicleNo='" + vehicleNo + '\'' +
", currentPosition=" + currentPosition +
", velocity=" + velocity +
", heading=" + heading +
", timestamp=" + timestamp +
", stateHistory=" + stateHistory +
", MAX_HISTORY=" + MAX_HISTORY +
", maxSpeed=" + maxSpeed +
", type=" + type +
'}';
}
}

View File

@ -39,7 +39,23 @@ public class UnmannedVehicle extends MovingObject{
// 对微小值归零阈值可根据需求调整
vx = Math.abs(vx) < 1e-10 ? 0.0 : vx;
vy = Math.abs(vy) < 1e-10 ? 0.0 : vy;
this.velocity = new Velocity(vx,vy,0);
this.velocity = new Velocity(0,0,0,vx,vy,0,0);
this.heading = heading;
}
@Override
public String toString() {
return "UnmannedVehicle{" +
"transId='" + transId + '\'' +
", vehicleId='" + vehicleId + '\'' +
", currentPosition=" + currentPosition +
", velocity=" + velocity +
", heading=" + heading +
", timestamp=" + timestamp +
", stateHistory=" + stateHistory +
", MAX_HISTORY=" + MAX_HISTORY +
", maxSpeed=" + maxSpeed +
", type=" + type +
'}';
}
}

View File

@ -4,18 +4,40 @@ import lombok.Data;
@Data
public class Velocity {
// 东向速度/
private final double x;
// 北向速度/
private final double y;
// 垂直速度/
private final double z;
// 东向坐标/
private double x = 0;
// 北向坐标/
private double y = 0;
// 垂直坐标/
private double z = 0;
// 东向速度/
private double vx = 0;
// 北向速度/
private double vy = 0;
// 垂直速度/
private double vz = 0;
// 速度计算置信度0-1
private double confidence = 0.0;
// 加速度计算结果缓存
private double cachedAcceleration = 0.0;
// 计算速度标量
public double getSpeed() {
return Math.sqrt(x * x + y * y + z * z);
return Math.sqrt(vx * vx + vy * vy + vz * vz);
}
public Velocity() {
}
public Velocity(double x, double y, double vy, double vx, double z, double vz, double cachedAcceleration) {
this.x = x;
this.y = y;
this.vy = vy;
this.vx = vx;
this.z = z;
this.vz = vz;
this.cachedAcceleration = cachedAcceleration;
}
}

View File

@ -0,0 +1,93 @@
package com.dongni.collisionavoidance.common.model.repository;
import com.dongni.collisionavoidance.common.model.MovingObject;
import com.dongni.collisionavoidance.common.model.MovingObjectType;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@Component
public class MovingObjectRepository {
// 使用嵌套Map存储不同类型对象 Key: 类型枚举值 Value: 对应类型对象Map
private final ConcurrentHashMap<MovingObjectType, ConcurrentHashMap<String, MovingObject>> storage =
new ConcurrentHashMap<>();
// 线程安全队列存储增量更新通知
private final BlockingQueue<Map<MovingObjectType, Set<String>>> updateQueue =
new LinkedBlockingQueue<>(100);
public MovingObjectRepository() {
// 初始化存储结构
Arrays.stream(MovingObjectType.values()).forEach(type ->
storage.put(type, new ConcurrentHashMap<>()));
}
// 原子更新方法保留单个更新
public void updateObject(MovingObjectType type, String id, MovingObject obj) {
ConcurrentHashMap<String, MovingObject> typeMap = storage.get(type);
if (typeMap != null) {
typeMap.put(id, obj);
// 移除单条更新时的队列推送
}
}
// 新增批量提交方法
public void commitBatchUpdate(MovingObjectType type) {
ConcurrentHashMap<String, MovingObject> typeMap = storage.get(type);
if (typeMap != null && !typeMap.isEmpty()) {
// 发送全量ID集合
Map<MovingObjectType, Set<String>> delta = new HashMap<>();
delta.put(type, ConcurrentHashMap.newKeySet());
delta.get(type).addAll(typeMap.keySet());
updateQueue.offer(delta);
}
}
// 批量更新方法原子操作
public void batchUpdate(MovingObjectType type, Map<String, MovingObject> batchData) {
ConcurrentHashMap<String, MovingObject> typeMap = storage.get(type);
if (typeMap != null) {
// 使用putAll原子操作
typeMap.putAll(batchData);
// 发送批量更新通知
Map<MovingObjectType, Set<String>> delta = new HashMap<>();
delta.put(type, batchData.keySet());
updateQueue.offer(delta);
}
}
// 获取类型快照线程安全
public Map<String, MovingObject> getSnapshot(MovingObjectType type) {
return new ConcurrentHashMap<>(storage.getOrDefault(type, new ConcurrentHashMap<>()));
}
// 阻塞获取更新通知
public Map<MovingObjectType, Set<String>> takeUpdate() throws InterruptedException {
return updateQueue.take();
}
// 添加在 MovingObjectRepository 类中
// 批量更新整个类型集合
public void updateTypeAll(MovingObjectType type, Map<String, MovingObject> newObjects) {
ConcurrentHashMap<String, MovingObject> typeMap = storage.get(type);
if (typeMap != null) {
typeMap.clear();
typeMap.putAll(newObjects);
// 发送全量更新通知
Map<MovingObjectType, Set<String>> delta = new HashMap<>();
delta.put(type, new HashSet<>(newObjects.keySet()));
updateQueue.offer(delta);
}
}
// 获取整个类型集合的引用注意直接操作需自行保证线程安全
public ConcurrentHashMap<String, MovingObject> getTypeMapDirect(MovingObjectType type) {
return storage.get(type);
}
}

View File

@ -0,0 +1,22 @@
package com.dongni.collisionavoidance.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class ThreadPoolConfig {
@Bean(name = "processingExecutor")
public Executor processingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("data-process-");
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,24 @@
package com.dongni.collisionavoidance.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@RestController
@RequestMapping("/api/monitor")
public class DataMonitorController {
// private final DataProcessor dataProcessor;
//
// public DataMonitorController(DataProcessor dataProcessor) {
// this.dataProcessor = dataProcessor;
// }
//
// @GetMapping("/data")
// public Map<String, List<Object>> getMonitorData() {
// return dataProcessor.getCurrentData();
// }
}

View File

@ -5,10 +5,6 @@ import com.dongni.collisionavoidance.common.model.Aircraft;
import com.dongni.collisionavoidance.common.model.SpecialVehicle;
import com.dongni.collisionavoidance.common.model.UnmannedVehicle;
import com.dongni.collisionavoidance.common.model.base.Response;
import com.dongni.collisionavoidance.common.model.dto.AircraftDTO;
import com.dongni.collisionavoidance.common.model.dto.SpecialVehicleDTO;
import com.dongni.collisionavoidance.dataCollector.document.VehicleLocationDocument;
import com.dongni.collisionavoidance.dataCollector.model.VehicleLocationInfo;
import com.dongni.collisionavoidance.dataCollector.service.AuthService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -21,7 +17,6 @@ import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;

View File

@ -1,24 +0,0 @@
package com.dongni.collisionavoidance.dataCollector.document;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import java.time.LocalDateTime;
@Document(collection = "vehicle_locations")
@Data
public class VehicleLocationDocument {
@Id
private String id;
private String vehicleId;
private double longitude;
private double latitude;
private double direction;
private double speed;
private long timestamp;
@Indexed
private LocalDateTime createTime;
}

View File

@ -1,13 +0,0 @@
package com.dongni.collisionavoidance.dataCollector.repository;
import com.dongni.collisionavoidance.dataCollector.document.VehicleLocationDocument;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface VehicleLocationRepository extends MongoRepository<VehicleLocationDocument, String> {
List<VehicleLocationDocument> findByVehicleIdAndTimestampBetween(
String vehicleId, long startTime, long endTime);
}

View File

@ -1,15 +1,7 @@
package com.dongni.collisionavoidance.dataCollector.service;
import com.dongni.collisionavoidance.dataCollector.document.VehicleLocationDocument;
import com.mongodb.client.result.DeleteResult;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
@Service
@Slf4j

View File

@ -3,18 +3,22 @@ package com.dongni.collisionavoidance.dataCollector.service;
import com.dongni.collisionavoidance.common.model.*;
import com.dongni.collisionavoidance.common.model.dto.AircraftDTO;
import com.dongni.collisionavoidance.common.model.dto.SpecialVehicleDTO;
import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository;
import com.dongni.collisionavoidance.dataCollector.dao.DataCollectorDao;
import com.dongni.collisionavoidance.dataCollector.model.VehicleLocationInfo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.stream.Collectors;
@Slf4j
@ -35,65 +39,48 @@ public class DataCollectorService {
@Getter
ConcurrentHashMap<String, List<Object>> dataMap = new ConcurrentHashMap<>();
// 使用ConcurrentHashMap存储所有移动物体的最新状态
// key为物体的唯一标识如航班号value为对应的移动物体
private final ConcurrentHashMap<String, Aircraft> aircraftMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, SpecialVehicle> vehicleMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, UnmannedVehicle> unmannedVehicleMap = new ConcurrentHashMap<>();
@Autowired
private DataCollectorDao dataCollectorDao;
private final DataCollectorDao dataCollectorDao;
@Autowired
private MovingObjectRepository movingObjectRepository;
public DataCollectorService(DataCollectorDao dataCollectorDao) {
this.dataCollectorDao = dataCollectorDao;
}
@Scheduled(fixedRateString = "${data.collector.interval}")
@Async // 异步执行
// @Async // 异步执行
public void collectAircraftData() {
List<Aircraft> newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl);
for (Aircraft newAircraft : newAircrafts) {
String flightNo = newAircraft.getFlightNo();
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT);
Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo);
// 获取已存在的航空器如果存在
Aircraft existingAircraft = aircraftMap.get(flightNo);
if (existingAircraft != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
newAircraft.getCurrentPosition(),
newAircraft.getVelocity(),
newAircraft.getHeading(),
newAircraft.getTimestamp()
);
// 使用已存在航空器的历史队列
existingAircraft.setCurrentPosition(newAircraft.getCurrentPosition());
existingAircraft.setVelocity(newAircraft.getVelocity());
existingAircraft.setHeading(newAircraft.getHeading());
existingAircraft.setTimestamp(newAircraft.getTimestamp());
existingAircraft.getStateHistory().addFirst(currentState);
existingAircraft.getCurrentPosition(),
existingAircraft.getVelocity(),
existingAircraft.getHeading(),
existingAircraft.getTimestamp());
// 控制历史记录长度
if (existingAircraft.getStateHistory().size() > existingAircraft.MAX_HISTORY) {
if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) {
existingAircraft.getStateHistory().removeLast();
}
} else {
// 新的航空器初始化历史记录
MovementState initialState = new MovementState(
newAircraft.getCurrentPosition(),
newAircraft.getVelocity(),
newAircraft.getHeading(),
newAircraft.getTimestamp()
);
newAircraft.getStateHistory().addFirst(initialState);
aircraftMap.put(flightNo, newAircraft);
existingAircraft.getStateHistory().addFirst(currentState);
newAircraft.setStateHistory(existingAircraft.getStateHistory());
}
// 将List转为MapflightNo -> Aircraft
Map<String, MovingObject> aircraftBatch = newAircrafts.stream()
.collect(Collectors.toMap(
Aircraft::getFlightNo,
aircraft -> aircraft,
(existing, replacement) -> existing,
ConcurrentHashMap::new
));
movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch);
}
// 更新数据Map用于其他服务访问
storeData(MovingObjectType.AIRCRAFT.toString(), new ArrayList<>(aircraftMap.values()));
}
@Scheduled(fixedRateString = "${data.collector.interval}")
@ -103,106 +90,109 @@ public class DataCollectorService {
for (SpecialVehicle newVehicle : vehicles) {
String vehicleNo = newVehicle.getVehicleNo();
// 获取已存在的航空器如果存在
SpecialVehicle specialVehicle = vehicleMap.get(vehicleNo);
Map<String, MovingObject> snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE);
SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo);
if (specialVehicle != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
newVehicle.getCurrentPosition(),
newVehicle.getVelocity(),
newVehicle.getHeading(),
newVehicle.getTimestamp()
specialVehicle.getCurrentPosition(),
specialVehicle.getVelocity(),
specialVehicle.getHeading(),
specialVehicle.getTimestamp()
);
// 使用已存在航空器的历史队列
specialVehicle.setCurrentPosition(newVehicle.getCurrentPosition());
specialVehicle.setVelocity(newVehicle.getVelocity());
specialVehicle.setHeading(newVehicle.getHeading());
specialVehicle.setTimestamp(newVehicle.getTimestamp());
specialVehicle.getStateHistory().addFirst(currentState);
// 控制历史记录长度
if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) {
specialVehicle.getStateHistory().removeLast();
}
} else {
// 新的航空器初始化历史记录
MovementState initialState = new MovementState(
newVehicle.getCurrentPosition(),
newVehicle.getVelocity(),
newVehicle.getHeading(),
newVehicle.getTimestamp()
);
newVehicle.getStateHistory().addFirst(initialState);
vehicleMap.put(vehicleNo, newVehicle);
specialVehicle.getStateHistory().addFirst(currentState);
newVehicle.setStateHistory(specialVehicle.getStateHistory());
}
}
// 将List转为MapvehicleNo -> SpecialVehicle
Map<String, MovingObject> vehicleBatch = vehicles.stream()
.collect(Collectors.toMap(
SpecialVehicle::getVehicleNo,
vehicle -> vehicle,
(existing, replacement) -> existing,
ConcurrentHashMap::new
));
movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch);
}
// @Scheduled(fixedRateString = "${data.collector.interval}")
// @Async // 异步执行
// public void collectVehicleLocationData() {
// List<UnmannedVehicle> unmannedVehicles = dataCollectorDao.getVehicleLocationInfo();
// for (UnmannedVehicle newVehicle : unmannedVehicles) {
// String vehicleNo = newVehicle.getVehicleId();
// // 获取已存在的航空器如果存在
// UnmannedVehicle unmannedVehicle = unmannedVehicleMap.get(vehicleNo);
// if (unmannedVehicle != null) {
// // 更新现有航空器的状态
// MovementState currentState = new MovementState(
// newVehicle.getCurrentPosition(),
// newVehicle.getVelocity(),
// newVehicle.getHeading(),
// newVehicle.getTimestamp()
// );
//
// // 使用已存在航空器的历史队列
// unmannedVehicle.setCurrentPosition(newVehicle.getCurrentPosition());
// unmannedVehicle.setVelocity(newVehicle.getVelocity());
// unmannedVehicle.setHeading(newVehicle.getHeading());
// unmannedVehicle.setTimestamp(newVehicle.getTimestamp());
// unmannedVehicle.getStateHistory().addFirst(currentState);
//
// // 控制历史记录长度
// if (unmannedVehicle.getStateHistory().size() > unmannedVehicle.MAX_HISTORY) {
// unmannedVehicle.getStateHistory().removeLast();
// }
// } else {
// // 新的航空器初始化历史记录
// MovementState initialState = new MovementState(
// newVehicle.getCurrentPosition(),
// newVehicle.getVelocity(),
// newVehicle.getHeading(),
// newVehicle.getTimestamp()
// );
// newVehicle.getStateHistory().addFirst(initialState);
// unmannedVehicleMap.put(vehicleNo, newVehicle);
// }
// }
// 更新数据Map用于其他服务访问
storeData(MovingObjectType.SPECIAL_VEHICLE.toString(), new ArrayList<>(vehicleMap.values()));
}
@Scheduled(fixedRateString = "${data.collector.interval}")
@Async // 异步执行
public void collectVehicleLocationData() {
List<UnmannedVehicle> unmannedVehicles = dataCollectorDao.getVehicleLocationInfo();
for (UnmannedVehicle newVehicle : unmannedVehicles) {
String vehicleNo = newVehicle.getVehicleId();
// 获取已存在的航空器如果存在
UnmannedVehicle unmannedVehicle = unmannedVehicleMap.get(vehicleNo);
if (unmannedVehicle != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
newVehicle.getCurrentPosition(),
newVehicle.getVelocity(),
newVehicle.getHeading(),
newVehicle.getTimestamp()
);
// 使用已存在航空器的历史队列
unmannedVehicle.setCurrentPosition(newVehicle.getCurrentPosition());
unmannedVehicle.setVelocity(newVehicle.getVelocity());
unmannedVehicle.setHeading(newVehicle.getHeading());
unmannedVehicle.setTimestamp(newVehicle.getTimestamp());
unmannedVehicle.getStateHistory().addFirst(currentState);
// 控制历史记录长度
if (unmannedVehicle.getStateHistory().size() > unmannedVehicle.MAX_HISTORY) {
unmannedVehicle.getStateHistory().removeLast();
}
} else {
// 新的航空器初始化历史记录
MovementState initialState = new MovementState(
newVehicle.getCurrentPosition(),
newVehicle.getVelocity(),
newVehicle.getHeading(),
newVehicle.getTimestamp()
);
newVehicle.getStateHistory().addFirst(initialState);
unmannedVehicleMap.put(vehicleNo, newVehicle);
}
}
// 更新数据Map用于其他服务访问
storeData(MovingObjectType.UNMANNED_VEHICLE.toString(), new ArrayList<>(unmannedVehicleMap.values()));
}
private <T> void storeData(String type, List<T> data) {
dataMap.put(type, new CopyOnWriteArrayList<>(data));
}
private void validateAndUpdateTimestamp(MovementState state, LinkedList<MovementState> history) {
if (!history.isEmpty()) {
MovementState lastState = history.getFirst();
if (state.getTimestamp() <= lastState.getTimestamp()) {
log.warn("检测到时间戳乱序: current={}, last={}",
state.getTimestamp(), lastState.getTimestamp());
// 使用递增时间戳
state.setTimestamp(lastState.getTimestamp() + 1);
}
}
}
// storeData(MovingObjectType.UNMANNED_VEHICLE.toString(), new ArrayList<>(unmannedVehicleMap.values()));
// }
//
//
// // 在DataCollectorService类中添加
// @Getter
// private final LinkedBlockingQueue<Map<String, List<Object>>> processingQueue = new LinkedBlockingQueue<>();
//
// // 修改storeData方法推送增量数据
// private <T> void storeData(String type, List<T> data) {
// List<Object> converted = new CopyOnWriteArrayList<>(data);
// dataMap.put(type, converted);
//
// // 创建增量数据包仅包含当前类型
// Map<String, List<Object>> delta = new ConcurrentHashMap<>();
// delta.put(type, converted);
// processingQueue.offer(delta);
// }
//
// private void validateAndUpdateTimestamp(MovementState state, LinkedList<MovementState> history) {
// if (!history.isEmpty()) {
// MovementState lastState = history.getFirst();
// if (state.getTimestamp() <= lastState.getTimestamp()) {
// log.warn("检测到时间戳乱序: current={}, last={}",
// state.getTimestamp(), lastState.getTimestamp());
// // 使用递增时间戳
// state.setTimestamp(lastState.getTimestamp() + 1);
// }
// }
}

View File

@ -0,0 +1,32 @@
package com.dongni.collisionavoidance.dataProcessing.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 坐标系统配置属性类
* 用于从application.yml中读取机场中心点坐标
*/
@Component
@ConfigurationProperties(prefix = "coordinate-system.airport")
public class CoordinateSystemProperties {
private double centerLongitude;
private double centerLatitude;
public double getCenterLongitude() {
return centerLongitude;
}
public void setCenterLongitude(double centerLongitude) {
this.centerLongitude = centerLongitude;
}
public double getCenterLatitude() {
return centerLatitude;
}
public void setCenterLatitude(double centerLatitude) {
this.centerLatitude = centerLatitude;
}
}

View File

@ -0,0 +1,50 @@
package com.dongni.collisionavoidance.dataProcessing.service;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.api.referencing.operation.MathTransform;
import org.geotools.geometry.jts.JTS;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.Coordinate;
import org.springframework.stereotype.Component;
public class AirportCoordinateSystem {
private final CoordinateReferenceSystem sourceCRS;
private final CoordinateReferenceSystem targetCRS;
private final MathTransform transform;
// 本地坐标系原点
private final double[] origin;
public AirportCoordinateSystem(double centerLon, double centerLat) throws Exception {
this.sourceCRS = CRS.decode("EPSG:4326",true);
String utmCode = calculateUtmZone(centerLon, centerLat);
this.targetCRS = CRS.decode(utmCode);
// 创建转换器并计算原点
this.transform = CRS.findMathTransform(sourceCRS, targetCRS);
this.origin = transformCoordinate(centerLon, centerLat);
}
// 转换单个坐标返回相对于原点的坐标
public double[] convertToLocal(double lon, double lat) throws Exception {
double[] utm = transformCoordinate(lon, lat);
return new double[]{
utm[0] - origin[0],
utm[1] - origin[1]
};
}
private double[] transformCoordinate(double lon, double lat) throws Exception {
Coordinate source = new Coordinate(lon, lat);
Coordinate target = new Coordinate();
JTS.transform(source, target, transform);
return new double[]{target.x, target.y};
}
public static String calculateUtmZone(double lon, double lat) {
int zone = (int) Math.floor((lon + 180) / 6) + 1;
// 北半球编码
return "EPSG:326" + (zone < 10 ? "0" : "") + zone;
}
}

View File

@ -0,0 +1,30 @@
package com.dongni.collisionavoidance.dataProcessing.service;
import com.dongni.collisionavoidance.dataProcessing.config.CoordinateSystemProperties;
import org.springframework.stereotype.Service;
@Service
public class CoordinateSystemService {
private final AirportCoordinateSystem airportCoordinateSystem;
public CoordinateSystemService(CoordinateSystemProperties properties) throws Exception {
this.airportCoordinateSystem = new AirportCoordinateSystem(
properties.getCenterLongitude(),
properties.getCenterLatitude()
);
System.out.println("坐标转换服务已经初始化成功!");
}
/**
* 将WGS84坐标系下的经纬度转换为以机场中心为原点的局部坐标系坐标
*
* @param longitude WGS84坐标系下的经度
* @param latitude WGS84坐标系下的纬度
* @return 局部坐标系下的坐标数组中第一个元素为x坐标东向第二个元素为y坐标北向
* @throws Exception 如果坐标转换过程中发生错误
*/
public double[] convertToLocalCoordinate(double longitude, double latitude) throws Exception {
return airportCoordinateSystem.convertToLocal(longitude, latitude);
}
}

View File

@ -1,88 +1,135 @@
package com.dongni.collisionavoidance.dataProcessing.service;
import com.dongni.collisionavoidance.common.model.MovingObject;
import com.dongni.collisionavoidance.common.model.MovingObjectType;
import com.dongni.collisionavoidance.dataCollector.service.DataCollectorService;
import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@Slf4j
@Service
@Component
public class DataProcessor {
// 处理线程池
private final ExecutorService executor = Executors.newFixedThreadPool(5);
// private final RedisTemplate<String, Object> redisTemplate;
// private final MongoTemplate mongoTemplate;
private final DataCollectorService dataCollectorService;
public DataProcessor(
// RedisTemplate<String, Object> redisTemplate,
// MongoTemplate mongoTemplate,
DataCollectorService dataCollectorService) {
// this.redisTemplate = redisTemplate;
// this.mongoTemplate = mongoTemplate;
this.dataCollectorService = dataCollectorService;
}
@Autowired
private MovingObjectRepository movingObjectRepository;
@Autowired
private CoordinateSystemService coordinateSystemService;
@Autowired
private SpeedCalculationService speedCalculationService;
@Resource
private Executor processingExecutor;
@PostConstruct
public void init() {
executor.execute(this::processLoop);
log.info("启动数据处理线程...");
processingExecutor.execute(() -> {
try {
processLoop();
} catch (Exception e) {
log.error("数据处理线程异常终止", e);
}
});
}
// 修改takeUpdate调用逻辑
private void processLoop() {
while (true) {
try {
// 获取共享的数据Map
ConcurrentHashMap<String, List<Object>> currentDataMap = dataCollectorService.getDataMap();
// 遍历所有数据类型
for (Map.Entry<String, List<Object>> entry : currentDataMap.entrySet()) {
String dataType = entry.getKey();
List<Object> dataList = entry.getValue();
if (dataList != null && !dataList.isEmpty()) {
// 根据不同的数据类型进行处理
switch (dataType) {
case "AIRCRAFT"-> processAircraftData(dataList);
case "SPECIAL_VEHICLE" -> processVehicleData(dataList);
case "UNMANNED_VEHICLE"-> processLocationData(dataList);
}
}
log.debug("进入数据处理循环");
try {
while (!Thread.currentThread().isInterrupted()) {
// 添加超时机制和空值判断
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.takeUpdate();
if (delta == null || delta.isEmpty()) {
log.debug("未获取到数据更新等待500ms");
Thread.sleep(500);
continue;
}
// 处理完后休眠一段时间
Thread.sleep(1000);
delta.forEach((objectType, ids) -> {
// 添加集合空值检查
if (ids == null || ids.isEmpty()) {
log.warn("接收到空ID集合类型{}", objectType);
return;
}
// 获取该类型全量数据快照
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(objectType);
} catch (Exception e) {
log.error("Error in data processing loop", e);
// 转换为待处理的数据列表
List<MovingObject> dataList = new CopyOnWriteArrayList<>(snapshot.values());
log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size());
try {
switch (objectType) {
case AIRCRAFT -> processAircraftData(dataList);
case SPECIAL_VEHICLE -> processVehicleData(dataList);
case UNMANNED_VEHICLE -> processLocationData(dataList);
default -> log.warn("未支持的数据类型: {}", objectType);
}
} catch (Exception e) {
log.error("数据处理异常 [类型: {}]", objectType, e);
}
});
log.debug("本次处理完成,共处理{}种类型更新", delta.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("数据处理线程被中断");
} catch (Exception e) {
log.error("数据处理循环发生未预期错误", e);
}
log.warn("退出数据处理循环");
}
private void processAircraftData(List<MovingObject> dataList) {
try {
log.info("开始处理航空器数据,共{}条", dataList.size());
convertToLocalCoordinate(dataList);
speedCalculationService.preprocessData(dataList);
} catch (Exception e) {
log.error("航空器数据处理异常", e);
}
}
// 预留的处理方法
private void processAircraftData(List<Object> dataList) {
// TODO: 实现飞机数据处理逻辑
System.out.println("processAircraftData" + dataList);
private void processVehicleData(List<MovingObject> dataList) {
try {
log.info("开始处理特种车辆数据,共{}条", dataList.size());
convertToLocalCoordinate(dataList);
speedCalculationService.preprocessData(dataList);
} catch (Exception e) {
log.error("车辆数据处理异常", e);
}
}
private void processVehicleData(List<Object> dataList) {
// TODO: 实现车辆数据处理逻辑
System.out.println("processAircraftData" + dataList);
private void processLocationData(List<MovingObject> dataList) {
try {
log.info("开始处理无人载具数据,共{}条", dataList.size());
convertToLocalCoordinate(dataList);
speedCalculationService.preprocessData(dataList);
} catch (Exception e) {
log.error("位置数据处理异常", e);
}
}
private void processLocationData(List<Object> dataList) {
// TODO: 实现位置数据处理逻辑
System.out.println("processAircraftData" + dataList);
// 在convertToLocalCoordinate方法中添加调试日志
private void convertToLocalCoordinate(List<MovingObject> rawData) throws Exception {
log.debug("开始坐标转换,数据量:{}", rawData.size());
for (MovingObject item : rawData) {
double[] doubles = coordinateSystemService.convertToLocalCoordinate(item.getCurrentPosition().getLongitude(), item.getCurrentPosition().getLatitude());
item.getVelocity().setX(doubles[0]);
item.getVelocity().setY(doubles[1]);
}
}
}
}

View File

@ -0,0 +1,253 @@
package com.dongni.collisionavoidance.dataProcessing.service;
import com.dongni.collisionavoidance.common.model.GeoPosition;
import com.dongni.collisionavoidance.common.model.MovementState;
import com.dongni.collisionavoidance.common.model.MovingObject;
import com.dongni.collisionavoidance.common.model.Velocity;
import org.springframework.stereotype.Service;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
@Service
public class SpeedCalculationService {
// 滑动窗口大小
private static final int TAKEOFF_WINDOW_SIZE = 10;
private static final int LANDING_WINDOW_SIZE = 5;
// 加速度范围/²
private static final double MIN_TAKEOFF_ACCELERATION = 0.5;
private static final double MAX_TAKEOFF_ACCELERATION = 2.5;
private static final double MIN_LANDING_ACCELERATION = -2.0;
private static final double MAX_LANDING_ACCELERATION = 0.0;
// 异常值检测阈值
private static final double MAX_POSITION_JUMP = 100.0; //
private static final double MAX_TIME_JUMP = 5000.0; // 毫秒
/**
* 数据预处理
*/
private final ReentrantLock lock = new ReentrantLock();
public void preprocessData(List<MovingObject> dataList) {
if (dataList == null || dataList.isEmpty()) {
return;
}
lock.lock();
try {
for (MovingObject obj : dataList) {
// 如果历史状态队列为空使用当前状态初始化
if (obj.getStateHistory().isEmpty()) {
MovementState state = new MovementState();
state.setPosition(obj.getCurrentPosition());
state.setVelocity(obj.getVelocity());
state.setTimestamp(obj.getTimestamp());
// 默认设置数据质量为GOOD
state.setDataQuality(MovementState.DataQuality.GOOD);
obj.getStateHistory().add(state);
continue;
}
// 获取最近的历史状态
MovementState lastState = obj.getStateHistory().getFirst();
// 异常值检测
double distance = calculateDistance(obj.getVelocity(), lastState.getVelocity());
long timeDiff = obj.getTimestamp() - lastState.getTimestamp();
// 检查位置跳变
if (distance > MAX_POSITION_JUMP) {
obj.getStateHistory().getFirst().setDataQuality(MovementState.DataQuality.SUSPICIOUS);
// state.setDataQuality(MovementState.DataQuality.SUSPICIOUS);
}
// 检查时间异常
if (timeDiff <= 0 || timeDiff > MAX_TIME_JUMP) {
obj.getStateHistory().getFirst().setDataQuality(MovementState.DataQuality.SUSPICIOUS);
// state.setDataQuality(MovementState.DataQuality.SUSPICIOUS);
}
// 检查速度异常
double speed = distance / (timeDiff / 1000.0);
if (speed > obj.getMaxSpeed()) {
obj.getStateHistory().getFirst().setDataQuality(MovementState.DataQuality.SUSPICIOUS);
// state.setDataQuality(MovementState.DataQuality.SUSPICIOUS);
}
// 只对质量良好的数据进行卡尔曼滤波
// if (object.getStateHistory().getLast().getDataQuality() == MovementState.DataQuality.GOOD) {
// applyKalmanFilter(object.getStateHistory().getLast(), object.getStateHistory());
// }
// object.getStateHistory().add(state);
}
}finally {
lock.unlock();
}
}
/**
* 异常值检测
*/
private boolean isDataAnomaly(MovingObject object, GeoPosition newPosition, long timestamp) {
if (object.getStateHistory().isEmpty()) {
return false;
}
MovementState lastState = object.getStateHistory().getLast();
// 检查位置跳变
double distance = calculateDistance(lastState.getVelocity(), object.getVelocity());
if (distance > MAX_POSITION_JUMP) {
return true;
}
// 检查时间异常
long timeDiff = timestamp - lastState.getTimestamp();
if (timeDiff <= 0 || timeDiff > MAX_TIME_JUMP) {
return true;
}
// 检查速度异常
double speed = distance / (timeDiff / 1000.0);
return speed > object.getMaxSpeed();
}
/**
* 卡尔曼滤波
*/
private void applyKalmanFilter(MovementState state, Deque<MovementState> history) {
// TODO: 实现卡尔曼滤波算法
// 状态向量[x, y, vx, vy]
// 观测向量[x, y]
}
/**
* 计算速度
*/
public void calculateSpeed(MovingObject object, MovementState newState) {
// Deque<MovementState> history = object.getStateHistory();
// if (history.isEmpty()) {
// return;
// }
//
// // 确定窗口大小
// int windowSize = determineWindowSize(object);
//
// // 计算速度
// Velocity velocity = calculateVelocityComponents(object, object.getStateHistory().getLast());
//
// // 应用加速度约束
// applyAccelerationConstraints(velocity, object);
//
// newState.setVelocity(velocity);
}
/**
* 确定滑动窗口大小
*/
private int determineWindowSize(MovingObject object) {
if (isInTakeoffState(object)) {
return TAKEOFF_WINDOW_SIZE;
} else if (isInLandingState(object)) {
return LANDING_WINDOW_SIZE;
}
return TAKEOFF_WINDOW_SIZE; // 默认使用起飞窗口大小
}
/**
* 判断是否在起飞状态
*/
private boolean isInTakeoffState(MovingObject object) {
// TODO: 实现起飞状态判断逻辑
return false;
}
/**
* 判断是否在降落状态
*/
private boolean isInLandingState(MovingObject object) {
// TODO: 实现降落状态判断逻辑
return false;
}
/**
* 从滑动窗口计算速度
*/
private void calculateVelocityComponents(MovingObject object, MovementState lastState) {
Velocity velocity = object.getVelocity();
long currentTime = object.getTimestamp();
long lastTime = lastState.getTimestamp();
long deltaTime = currentTime - lastTime;
if (deltaTime <= 0) {
object.getStateHistory().getLast().setDataQuality(MovementState.DataQuality.SUSPICIOUS);
return;
}
double deltaX = object.getVelocity().getX() - lastState.getVelocity().getX();
double deltaY = object.getVelocity().getY() - lastState.getVelocity().getY();
double deltaZ = object.getVelocity().getZ() - lastState.getVelocity().getZ();
object.getVelocity().setVx(deltaX / (deltaTime / 1000.0));
object.getVelocity().setVy(deltaY / (deltaTime / 1000.0));
object.getVelocity().setVz(deltaZ / (deltaTime / 1000.0));
// 缓存瞬时加速度
double lastSpeed = Math.sqrt(
Math.pow(lastState.getVelocity().getVx(), 2) +
Math.pow(lastState.getVelocity().getVy(), 2) +
Math.pow(lastState.getVelocity().getVz(), 2)
);
double currentSpeed = Math.sqrt(Math.pow(velocity.getVx(), 2) + Math.pow(velocity.getVy(), 2));
object.getVelocity().setCachedAcceleration((currentSpeed - lastSpeed) / (deltaTime / 1000.0));
}
/**
* 应用加速度约束
*/
private void applyAccelerationConstraints(Velocity velocity, MovingObject object) {
double acceleration = velocity.getCachedAcceleration();
if (isInTakeoffState(object)) {
if (acceleration < MIN_TAKEOFF_ACCELERATION) {
velocity.setConfidence(0.5);
} else if (acceleration > MAX_TAKEOFF_ACCELERATION) {
velocity.setConfidence(0.5);
} else {
velocity.setConfidence(1.0);
}
} else if (isInLandingState(object)) {
if (acceleration < MIN_LANDING_ACCELERATION) {
velocity.setConfidence(0.5);
} else if (acceleration > MAX_LANDING_ACCELERATION) {
velocity.setConfidence(0.5);
} else {
velocity.setConfidence(1.0);
}
}
}
/**
* 计算两个向量间的距离
* @param currentVelocity 当前向量
* @param historicalVelocity 历史向量
* @return 距离
*/
private double calculateDistance(Velocity currentVelocity, Velocity historicalVelocity) {
double deltaX = currentVelocity.getX() - historicalVelocity.getX();
double deltaY = currentVelocity.getY() - historicalVelocity.getY();
double deltaZ = currentVelocity.getZ() - historicalVelocity.getZ();
return Math.sqrt(deltaX * deltaX + deltaY * deltaY + deltaZ * deltaZ);
}
}

View File

@ -0,0 +1,19 @@
package com.dongni.collisionavoidance.webSocket.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
@Configuration
public class JacksonConfig {
@Bean
public Jackson2ObjectMapperBuilder objectMapperBuilder() {
return new Jackson2ObjectMapperBuilder()
.indentOutput(true)
.serializationInclusion(JsonInclude.Include.NON_NULL)
.featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
}

View File

@ -0,0 +1,40 @@
package com.dongni.collisionavoidance.webSocket.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import java.util.List;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册STOMP端点客户端通过此URL连接WebSocket
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*") // 允许跨域
.withSockJS(); // 启用SockJS支持
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 启用内存消息代理客户端订阅地址前缀为/topic
registry.enableSimpleBroker("/topic");
// 客户端发送消息的地址前缀为/app
registry.setApplicationDestinationPrefixes("/app");
}
// 关键配置添加JSON消息转换器
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
messageConverters.add(new MappingJackson2MessageConverter());
return false;
}
}

View File

@ -0,0 +1,27 @@
package com.dongni.collisionavoidance.webSocket.controller;
import com.dongni.collisionavoidance.common.model.MovingObject;
import com.dongni.collisionavoidance.common.model.MovingObjectType;
import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import java.util.concurrent.ConcurrentHashMap;
@Controller
public class GeopositionController {
@Autowired
MovingObjectRepository movingObjectRepository;
@MessageMapping("/getGeoposition")
@SendTo("/topic/geoSition")
public ConcurrentHashMap<String, MovingObject> getGeosition() {
System.out.println("接收到地理坐标请求");
ConcurrentHashMap<String, MovingObject> typeMapDirect = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT);
return typeMapDirect;
}
}

View File

@ -0,0 +1,18 @@
package com.dongni.collisionavoidance.webSocket.controller;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class MessageController {
// 处理客户端发送到/app/send的消息
@MessageMapping("/send")
// 将返回结果广播到/topic/messages
@SendTo("/topic/messages")
public String handleMessage(String message) {
System.out.println("收到消息: " + message);
return "服务器回复: " + message;
}
}

View File

@ -39,7 +39,7 @@ spring:
# 数据采集配置
data:
collector:
interval: 1000
interval: 10000
topics:
position: aircraft-positions
vehicle: vehicle-positions
@ -68,6 +68,13 @@ data:
retention:
redis-expire-seconds: 60
mongodb-days: 30
# 坐标系统配置
coordinate-system:
airport:
# 机场中心点坐标(默认:青岛胶东国际机场中心点)
center-longitude: 120.0834104
center-latitude: 36.35406879
# 数据保留策略配置
# Actuator配置
management:

View File

@ -0,0 +1,105 @@
<!DOCTYPE html>
<html>
<head>
<title>Geoposition 接口测试</title>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.2/dist/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<h2>实时飞行器位置监控</h2>
<div style="margin-bottom:20px;">
<select id="frequency" style="margin-right:10px;">
<option value="1000">1秒</option>
<option value="5000">5秒</option>
<option value="10000">10秒</option>
</select>
<button id="autoRefresh" style="margin-right:10px;">自动刷新:关</button>
<button onclick="manualRefresh()">立即刷新</button>
</div>
<div id="positions"></div>
<script>
let refreshTimer;
const socket = new SockJS('http://localhost:8082/ws');
const stompClient = Stomp.over(socket);
// 从本地存储加载设置
const savedSettings = JSON.parse(localStorage.getItem('positionSettings') || '{}');
document.getElementById('frequency').value = savedSettings.frequency || '5000';
document.getElementById('autoRefresh').textContent = `自动刷新:${savedSettings.autoRefresh === 'true' ? '开' : '关'}`;
// 设置变化监听
document.getElementById('frequency').addEventListener('change', saveSettings);
document.getElementById('autoRefresh').addEventListener('click', toggleAutoRefresh);
stompClient.connect({}, () => {
stompClient.subscribe('/topic/geoSition', (message) => {
const positions = JSON.parse(message.body);
updateDisplay(positions);
});
if (savedSettings.autoRefresh === 'true') {
startAutoRefresh();
}
stompClient.send("/app/getGeoposition", {});
});
function saveSettings() {
const settings = {
frequency: document.getElementById('frequency').value,
autoRefresh: localStorage.getItem('positionSettings') ?
JSON.parse(localStorage.getItem('positionSettings')).autoRefresh : 'false'
};
localStorage.setItem('positionSettings', JSON.stringify(settings));
}
function toggleAutoRefresh() {
const button = document.getElementById('autoRefresh');
const isActive = button.textContent.includes('开');
button.textContent = `自动刷新:${isActive ? '关' : '开'}`;
const settings = JSON.parse(localStorage.getItem('positionSettings') || '{}');
settings.autoRefresh = String(!isActive);
localStorage.setItem('positionSettings', JSON.stringify(settings));
if (!isActive) {
startAutoRefresh();
} else {
clearInterval(refreshTimer);
}
}
function startAutoRefresh() {
clearInterval(refreshTimer);
const interval = parseInt(document.getElementById('frequency').value);
refreshTimer = setInterval(() => {
stompClient.send("/app/getGeoposition", {});
}, interval);
}
function manualRefresh() {
stompClient.send("/app/getGeoposition", {});
if (document.getElementById('autoRefresh').textContent.includes('开')) {
startAutoRefresh();
}
}
function updateDisplay(positionsMap) {
console.log('原始数据:', positionsMap);
if(Object.keys(positionsMap).length > 0) {
const firstItem = positionsMap[Object.keys(positionsMap)[0]];
console.log('示例对象结构:', firstItem);
}
const container = document.getElementById('positions');
container.innerHTML = Object.entries(positionsMap)
.map(([id, obj]) => `
<div style="margin:10px; padding:10px; border:1px solid #ccc">
<h3>飞行器 ${id}</h3>
<p>纬度: ${obj.latitude}</p>
<p>经度: ${obj.longitude}</p>
<p>高度: ${obj.altitude}m</p>
</div>`)
.join('');
}
</script>
</body>
</html>

View File

@ -0,0 +1,90 @@
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数据监控面板</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
.data-card {
margin-bottom: 20px;
}
.status-indicator {
width: 10px;
height: 10px;
border-radius: 50%;
display: inline-block;
margin-right: 5px;
}
.status-ok {
background-color: #28a745;
}
.status-warning {
background-color: #ffc107;
}
.status-error {
background-color: #dc3545;
}
pre {
background-color: #f8f9fa;
padding: 15px;
border-radius: 5px;
}
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">实时数据监控面板</h1>
<div class="row">
<div class="col-md-4">
<div class="card data-card">
<div class="card-header">
<div class="status-indicator" id="aircraft-status"></div>
飞机数据
</div>
<div class="card-body">
<pre id="aircraft-data">等待数据...</pre>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card data-card">
<div class="card-header">
<div class="status-indicator" id="vehicle-status"></div>
特种车辆数据
</div>
<div class="card-body">
<pre id="vehicle-data">等待数据...</pre>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card data-card">
<div class="card-header">
<div class="status-indicator" id="unmanned-status"></div>
无人车数据
</div>
<div class="card-body">
<pre id="unmanned-data">等待数据...</pre>
</div>
</div>
</div>
</div>
<div class="row mt-4">
<div class="col-12">
<div class="card">
<div class="card-header">
数据统计
</div>
<div class="card-body">
<div id="stats"></div>
</div>
</div>
</div>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script src="js/monitor.js"></script>
</body>
</html>

View File

@ -0,0 +1,89 @@
// 数据刷新间隔(毫秒)
const REFRESH_INTERVAL = 1000;
// 数据类型映射
const DATA_TYPES = {
'AIRCRAFT': {
elementId: 'aircraft-data',
statusId: 'aircraft-status'
},
'SPECIAL_VEHICLE': {
elementId: 'vehicle-data',
statusId: 'vehicle-status'
},
'UNMANNED_VEHICLE': {
elementId: 'unmanned-data',
statusId: 'unmanned-status'
}
};
// 更新状态指示器
function updateStatusIndicator(statusId, hasData) {
const indicator = document.getElementById(statusId);
if (hasData) {
indicator.className = 'status-indicator status-ok';
} else {
indicator.className = 'status-indicator status-error';
}
}
// 更新数据统计
function updateStats(data) {
const stats = document.getElementById('stats');
const statsHtml = Object.entries(data).map(([type, list]) => {
return `<p><strong>${type}:</strong> ${list.length} 条数据</p>`;
}).join('');
stats.innerHTML = statsHtml;
}
// 格式化数据显示
function formatData(data) {
return JSON.stringify(data, null, 2);
}
// 更新显示数据
function updateDisplay(data) {
// 更新每种类型的数据
Object.entries(DATA_TYPES).forEach(([type, config]) => {
const element = document.getElementById(config.elementId);
const dataList = data[type] || [];
// 更新数据显示
element.textContent = formatData(dataList);
// 更新状态指示器
updateStatusIndicator(config.statusId, dataList.length > 0);
});
// 更新统计信息
updateStats(data);
}
// 获取数据
async function fetchData() {
try {
const response = await fetch('/api/monitor/data');
if (!response.ok) {
throw new Error('网络响应异常');
}
const data = await response.json();
updateDisplay(data);
} catch (error) {
console.error('获取数据失败:', error);
// 更新所有状态指示器为错误状态
Object.values(DATA_TYPES).forEach(config => {
updateStatusIndicator(config.statusId, false);
});
}
}
// 启动定时刷新
function startMonitoring() {
// 立即执行一次
fetchData();
// 设置定时刷新
setInterval(fetchData, REFRESH_INTERVAL);
}
// 页面加载完成后启动监控
document.addEventListener('DOMContentLoaded', startMonitoring);

View File

@ -45,6 +45,8 @@ class VehicleLocationInfo(BaseModel):
vehicleId: str
longitude: float
latitude: float
x: float
y: float
direction: float
speed: float
@ -152,11 +154,15 @@ def get_or_create_vehicle_state(vehicle_id: str) -> dict:
"""获取或创建车辆状态"""
if vehicle_id not in vehicle_states:
route = random.choice(list(ROUTES.values()))
# 添加UTM坐标初始化
x, y = convert_to_utm(route[0]['longitude'], route[0]['latitude'])
vehicle_states[vehicle_id] = {
'current_pos': route[0],
'route': route,
'route_index': 0,
'speed': random.uniform(5.0, 8.0) # 5-8米/秒
'speed': random.uniform(5.0, 8.0), # 5-8米/秒
'x': x,
'y': y
}
return vehicle_states[vehicle_id]
@ -181,6 +187,11 @@ def update_vehicle_position(vehicle_id: str, state: dict) -> tuple:
state['current_pos'] = next_pos
state['route_index'] = route_index
# 添加UTM坐标转换
x, y = convert_to_utm(next_pos['longitude'], next_pos['latitude'])
state['x'] = x
state['y'] = y
direction = calculate_direction(current_pos, target_pos)
return next_pos, direction

View File

@ -0,0 +1,24 @@
package com.dongni.collisionavoidance.dataProcessing;
import com.dongni.collisionavoidance.dataProcessing.service.AirportCoordinateSystem;
import com.dongni.collisionavoidance.dataProcessing.service.CoordinateSystemService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Arrays;
// 自动计算机场中心点UTM分区
@SpringBootTest
public class Test {
@Autowired
private CoordinateSystemService coordinateSystemService;
@org.junit.jupiter.api.Test
public void test() throws Exception {
double[] doubles = coordinateSystemService.convertToLocalCoordinate(120.0864911
, 36.35074527
);
System.out.println(Arrays.toString(doubles));
}
}

View File

@ -0,0 +1,29 @@
package com.dongni.collisionavoidance.dataProcessing;
import com.dongni.collisionavoidance.dataProcessing.service.AirportCoordinateSystem;
import static com.dongni.collisionavoidance.dataProcessing.service.AirportCoordinateSystem.calculateUtmZone;
// 自动计算机场中心点UTM分区
public class UtmZoneCalculator {
public static void main(String[] args) {
// 示例青岛胶东国际机场中心点
//lat纬度
double centerLon = 120.0834104, centerLat = 36.35406879;
String utmCode = calculateUtmZone(centerLon, centerLat);
// 输出: EPSG:32651
System.out.println("推荐投影坐标系: " + utmCode);
System.out.println("-----------");
try {
AirportCoordinateSystem airportCoordinateSystem = new AirportCoordinateSystem(centerLon,centerLat);
double[] doubles = airportCoordinateSystem.convertToLocal(120.0864911, 36.35074527);
System.out.println(doubles[0] + " " + doubles[1]);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,197 @@
from flask import Flask, jsonify, request
import time
import math
import logging
import os
import threading
from datetime import datetime
# 修复步骤1统一坐标点键名
POINT_T1 = {"longitude": 120.0868853, "latitude": 36.35496367}
POINT_T2 = {"longitude": 120.08502054, "latitude": 36.35448347}
POINT_T4 = {"longitude": 120.08558121, "latitude": 36.35305878}
POINT_T7 = {"longitude": 120.08562915, "latitude": 36.35052372}
POINT_T11 = {"longitude": 120.0873865, "latitude": 36.3509885}
POINT_T12 = {"longitude": 120.08603613, "latitude": 36.35190217}
class ImprovedTrajectory:
def __init__(self, points, speed_kmh):
self.points = points
self.speed = speed_kmh * 1000 / 3600 # m/s
self.current_idx = 0
self.start_time = time.time()
# 修复步骤2修正键名访问
def get_position(self):
elapsed = time.time() - self.start_time
distance = self.speed * elapsed
p1 = self.points[self.current_idx]
p2 = self.points[(self.current_idx + 1) % len(self.points)]
# 使用正确的键名 longitude/latitude
dx = p2['longitude'] - p1['longitude']
dy = p2['latitude'] - p1['latitude']
length = math.hypot(dx, dy) * 111319.9 # 转换为米
ratio = min(distance / length, 1.0)
if ratio >= 1.0:
self.current_idx = (self.current_idx + 1) % len(self.points)
self.start_time = time.time()
return self.get_position()
return {
'longitude': p1['longitude'] + dx * ratio, # 返回正确的键名
'latitude': p1['latitude'] + dy * ratio
}
# 初始化轨迹生成器(使用统一后的坐标点)
qn001_traj = ImprovedTrajectory(
points=[POINT_T1, POINT_T2, POINT_T4],
speed_kmh=18.0
)
ac001_traj = ImprovedTrajectory(
points=[POINT_T7, POINT_T11],
speed_kmh=36.0
)
# 数据更新线程修复步骤3确保使用正确的键名
def update_positions():
while True:
try:
current_ts = global_timestamp.get()
# 更新车辆位置
for v in vehicle_data:
if v["vehicleNo"] == "QN001":
pos = qn001_traj.get_position()
v["longitude"] = pos['longitude'] # 使用正确的键名
v["latitude"] = pos['latitude']
# 更新航空器位置
for a in aircraft_data:
if a["flightNo"] == "AC001":
pos = ac001_traj.get_position()
a["longitude"] = pos['longitude'] # 使用正确的键名
a["latitude"] = pos['latitude']
time.sleep(1.0)
except Exception as e:
logging.error(f"更新异常: {str(e)}")
# 其他代码保持不变...
threading.Thread(target=update_positions, daemon=True).start()
# API端点
@app.route('/login', methods=['POST', 'OPTIONS'])
def login():
if request.method == 'OPTIONS':
return '', 204
username = request.args.get('username')
password = request.args.get('password')
if username == "dianxin" and password == "dianxin@123":
return jsonify({
"status": 200,
"msg": "登入成功",
"data": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
})
else:
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
@app.route('/openApi/getCurrentFlightPositions', methods=['GET', 'OPTIONS'])
def get_flight_positions():
if request.method == 'OPTIONS':
return '', 204
current_ts = global_timestamp.get()
response_data = []
for a in aircraft_data:
response_data.append({
"flightNo": a["flightNo"],
"longitude": a["longitude"],
"latitude": a["latitude"],
"time": current_ts,
"altitude": a["altitude"],
"trackNumber": a["trackNumber"]
})
return jsonify({
"status": 200,
"msg": "当前航空器实时位置数据",
"data": response_data
})
@app.route('/openApi/getCurrentVehiclePositions', methods=['GET', 'OPTIONS'])
def get_vehicle_positions():
if request.method == 'OPTIONS':
return '', 204
current_ts = global_timestamp.get()
for v in vehicle_data:
v["time"] = current_ts
return jsonify({
"status": 200,
"msg": "当前车辆实时位置数据",
"data": vehicle_data
})
@app.route('/api/VehicleCommandInfo', methods=['POST'])
def handle_vehicle_command():
try:
data = request.json
vehicle_id = data.get("vehicleID")
command_type = data.get("commandType", "").upper()
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
return jsonify({
"code": 404,
"msg": "车辆不存在",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 404
if vehicle_state.can_be_overridden_by(command_type):
vehicle_state.update_command(command_type)
return jsonify({
"code": 200,
"msg": "指令执行成功",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
else:
return jsonify({
"code": 400,
"msg": "指令优先级不足",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 400
except Exception as e:
return jsonify({
"code": 500,
"msg": str(e),
"transId": data.get("transId", ""),
"timestamp": int(time.time() * 1000)
}), 500
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Max-Age'] = '3600'
return response
if __name__ == '__main__':
app.run(host='localhost', port=8090, debug=True)

View File

@ -0,0 +1,44 @@
<!DOCTYPE html>
<html>
<head>
<title>WebSocket测试</title>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.0/dist/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<input type="text" id="messageInput" placeholder="输入消息">
<button onclick="sendMessage()">发送</button>
<div id="messages"></div>
<div id="messages2" style="margin-top:20px; border-top:1px solid #ccc; padding-top:10px;"></div>
<script>
const socket = new SockJS('http://localhost:8082/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, (frame) => {
console.log('连接成功: ' + frame);
// 订阅/topic/messages以接收消息
stompClient.subscribe('/topic/geoSition', (response) => {
const objects = JSON.parse(response.body);
let htmlContent = '';
Object.entries(objects).forEach(([key, obj]) => {
htmlContent += `<p>${obj.type} ${key}: 经度 ${obj.longitude} 纬度 ${obj.latitude}</p>`;
});
document.getElementById('messages').innerHTML = htmlContent;
});
// 新增对文字消息的订阅
stompClient.subscribe('/topic/messages', (response) => {
const message = response.body;
document.getElementById('messages2').innerHTML += `<p>${message}</p>`;
});
});
function sendMessage() {
const message = document.getElementById('messageInput').value;
// 发送消息到/app/send
stompClient.send("/app/getGeoposition", {}, JSON.stringify('request'));
}
</script>
</body>
</html>