系统存在以下关键设计问题导致测试超时:

无限阻塞线程设计:
数据处理器使用 BlockingQueue.take() 方法无限期阻塞线程
缺少超时机制导致线程永远不会退出
生命周期管理不当:
使用 @PostConstruct 启动线程但没有相应的 @PreDestroy 清理机制
缺少应用程序关闭钩子,导致资源无法正确释放
线程池配置不合理:
线程池配置过大(核心10线程,最大100线程)
没有设置优雅关闭参数,导致应用关闭时线程不会终止
测试隔离不充分:
测试运行时仍尝试连接外部资源(数据库、Kafka、数据采集API)
测试配置未完全禁用不必要的服务
解决方案
我们实施了以下改进措施:
优化线程池配置:
减小线程池大小(核心5线程,最大10线程)
添加 setWaitForTasksToCompleteOnShutdown(true) 和 setAwaitTerminationSeconds(5) 配置
优化队列容量,减少内存占用
添加优雅关闭机制:
在主类添加 JVM 关闭钩子,确保资源正确释放
为服务组件添加 @PreDestroy 方法,实现自定义关闭逻辑
引入状态标志(AtomicBoolean running)控制后台线程循环
防止无限阻塞:
修改 MovingObjectRepository,添加非阻塞的 pollUpdate() 方法替代 takeUpdate()
在数据处理循环中添加超时检查,避免无限等待
优化异常处理,防止线程崩溃或 CPU 使用率飙升
完善测试环境配置:
在测试配置中完全禁用数据采集和处理服务
配置 data.collector.disabled=true 和 data.processor.enabled=false
使用 @ActiveProfiles("test") 确保测试使用正确的配置文件
添加错误处理和日志:
包装所有关键操作在 try-catch 块中,防止错误传播
添加详细日志,便于诊断问题
实现错误恢复机制,确保系统稳定性
This commit is contained in:
Tian jianyong 2025-04-30 12:16:06 +08:00
parent 574dfc8b40
commit 3a551f69b9
7 changed files with 610 additions and 184 deletions

View File

@ -0,0 +1,410 @@
# 碰撞避免系统架构优化设计
本文档提供了对碰撞避免系统架构的全面分析和优化建议,重点关注系统的线程管理、资源利用、生命周期控制和测试实践等方面。
## 1. 问题分析
通过代码审查和测试分析,我们发现系统存在以下架构设计问题:
### 1.1 线程管理问题
- **无限阻塞线程设计**
- 数据处理器使用 `BlockingQueue.take()` 方法无限期阻塞线程
- 许多后台线程没有超时机制,导致资源无法及时释放
- 应用关闭时,阻塞线程不会收到通知,导致进程无法正常退出
- **生命周期管理不当**
- 使用 `@PostConstruct` 启动线程但没有相应的 `@PreDestroy` 清理机制
- 缺少应用程序关闭钩子,导致资源无法正确释放
- 服务间依赖关系复杂,缺乏明确的启动和关闭顺序
- **线程池配置不合理**
- 线程池配置过大核心10线程最大100线程
- 队列容量设置较大100可能导致内存压力
- 没有设置优雅关闭参数,导致应用关闭时线程不会终止
### 1.2 资源管理问题
- **外部连接管理不当**
- 数据采集器与外部API的连接没有合理的重试和超时策略
- HTTP客户端RestTemplate配置不完善缺少超时设置
- 数据库连接MongoDB、Redis缺少连接池管理
- **内存管理不佳**
- 历史数据保留机制不完善,可能导致内存溢出
- 使用大量的ConcurrentHashMap但缺少大小限制
- 没有有效的垃圾回收策略
### 1.3 测试设计问题
- **测试隔离不充分**
- 测试运行时仍尝试连接外部资源数据库、Kafka、数据采集API
- 测试配置未完全禁用不必要的服务
- 缺少专门的测试数据生成机制
- **单元测试与集成测试混合**
- 测试边界不清晰,不符合单一职责原则
- 集成测试依赖外部资源,导致测试不稳定
- 测试代码重复,缺少通用测试基类
## 2. 优化方案
### 2.1 线程管理优化
#### 2.1.1 线程池配置优化
```java
@Configuration
public class ThreadPoolConfig {
@Bean(name = "processingExecutor")
public Executor processingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 减小核心线程数和最大线程数
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
// 减小队列容量,避免过多任务堆积
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("data-process-");
// 添加优雅关闭配置
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(5);
executor.initialize();
return executor;
}
// 添加专用于数据采集的线程池
@Bean(name = "collectorExecutor")
public Executor collectorExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("data-collect-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(5);
executor.initialize();
return executor;
}
}
```
#### 2.1.2 应用生命周期管理
```java
@Slf4j
@SpringBootApplication
@EnableScheduling
@EnableMongoRepositories
@EnableConfigurationProperties
public class CollisionAvoidanceApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args);
// 添加关闭钩子,确保资源正确释放
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("应用程序关闭中,正在清理资源...");
try {
if (context != null && context.isActive()) {
context.close();
}
} catch (Exception e) {
log.error("关闭应用程序时出错", e);
}
log.info("应用程序已安全关闭");
}));
}
}
```
#### 2.1.3 服务组件生命周期管理
所有后台服务都应实现 `@PreDestroy` 方法:
```java
@Component
public class SomeBackgroundService {
private final AtomicBoolean running = new AtomicBoolean(false);
@PostConstruct
public void init() {
running.set(true);
// 启动逻辑
}
@PreDestroy
public void shutdown() {
log.info("关闭服务...");
running.set(false);
// 等待线程结束、释放资源等
log.info("服务已关闭");
}
}
```
### 2.2 阻塞操作优化
#### 2.2.1 使用非阻塞方法替代无限阻塞
```java
// 修改前:无限期阻塞
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.takeUpdate();
// 修改后:有超时的阻塞
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS);
if (delta == null || delta.isEmpty()) {
// 处理超时情况
continue;
}
```
#### 2.2.2 添加中断和状态检查
```java
private void processLoop() {
try {
while (running.get() && !Thread.currentThread().isInterrupted()) {
try {
// 处理逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("线程被中断");
break;
} catch (Exception e) {
log.error("处理异常", e);
// 添加延迟避免CPU占用过高
Thread.sleep(1000);
}
}
} catch (Exception e) {
log.error("处理循环异常", e);
}
log.info("退出处理循环");
}
```
### 2.3 资源管理优化
#### 2.3.1 外部连接优化
对于RestTemplate
```java
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate() {
RestTemplate template = new RestTemplate();
// 设置连接超时和读取超时
HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
factory.setConnectTimeout(5000);
factory.setReadTimeout(5000);
factory.setConnectionRequestTimeout(5000);
template.setRequestFactory(factory);
// 添加重试机制
template.setErrorHandler(new CustomResponseErrorHandler());
return template;
}
}
```
#### 2.3.2 内存管理优化
```java
// 添加缓存容量限制
@Bean
public CacheManager cacheManager() {
SimpleCacheManager cacheManager = new SimpleCacheManager();
// 配置缓存
ConcurrentMapCache movingObjectsCache = new ConcurrentMapCache(
"movingObjects",
CacheBuilder.newBuilder()
.maximumSize(1000) // 最大条目数
.expireAfterWrite(5, TimeUnit.MINUTES) // 写入后过期时间
.build().asMap(),
false);
cacheManager.setCaches(Arrays.asList(movingObjectsCache));
return cacheManager;
}
```
### 2.4 测试设计优化
#### 2.4.1 测试配置优化
```yaml
# application-test.yml
spring:
# 禁用各种自动配置
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
- org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration
# 禁用调度任务
task:
scheduling:
enabled: false
execution:
enabled: false
data:
collector:
disabled: true # 禁用数据采集器
processor:
enabled: false # 禁用数据处理器
```
#### 2.4.2 测试基类设计
```java
@SpringBootTest
@ActiveProfiles("test")
@Import(TestConfig.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public abstract class BaseIntegrationTest {
// 通用测试设置和工具方法
@BeforeEach
public void setup() {
// 测试初始化逻辑
}
@AfterEach
public void cleanup() {
// 测试清理逻辑
}
}
```
## 3. 新的架构设计原则
### 3.1 关注点分离
- 数据采集、数据处理、碰撞检测等功能应严格分离
- 每个组件专注于单一职责,通过明确的接口进行通信
- 使用事件驱动模型代替直接依赖,降低组件间耦合
### 3.2 弹性设计
- 所有外部服务调用都应有超时、重试和熔断机制
- 系统应能在任何组件故障时保持核心功能运行
- 提供降级策略和应急模式,确保系统可用性
### 3.3 可观测性
- 添加全面的日志、指标和追踪功能
- 实现自定义健康检查,监控系统关键组件
- 设计故障注入机制,测试系统弹性
### 3.4 资源效率
- 合理配置线程池和连接池,避免资源浪费
- 实现资源限制和保护策略,防止过载
- 采用分级缓存策略,优化内存使用
## 4. 建议采用的架构模式
### 4.1 事件驱动架构
```java
// 事件定义
public class MovingObjectUpdatedEvent {
private final MovingObjectType type;
private final Set<String> objectIds;
// 构造器、getter等
}
// 发布事件
@Service
public class DataCollectorService {
private final ApplicationEventPublisher eventPublisher;
@Autowired
public DataCollectorService(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@Scheduled(fixedRate = 5000)
public void collectData() {
// 采集数据
// 发布事件
eventPublisher.publishEvent(new MovingObjectUpdatedEvent(type, updatedIds));
}
}
// 监听事件
@Service
public class DataProcessor {
@EventListener
public void handleMovingObjectUpdates(MovingObjectUpdatedEvent event) {
// 处理更新通知
}
}
```
### 4.2 响应式编程模型
使用 Spring WebFlux 和 Project Reactor
```java
@Service
public class ReactiveDataService {
private final MovingObjectRepository repository;
// 提供响应式API
public Flux<MovingObject> getMovingObjects(MovingObjectType type) {
return Flux.fromIterable(repository.getByType(type).values());
}
// 响应式处理逻辑
public Mono<ProcessingResult> processData(Flux<MovingObject> objects) {
return objects
.filter(this::isValid)
.flatMap(this::transform)
.reduce(new ProcessingResult(), this::accumulate);
}
}
```
### 4.3 健康检查和监控机制
```java
@Component
public class DataCollectorHealthIndicator implements HealthIndicator {
private final DataCollectorService dataCollectorService;
@Autowired
public DataCollectorHealthIndicator(DataCollectorService dataCollectorService) {
this.dataCollectorService = dataCollectorService;
}
@Override
public Health health() {
if (!dataCollectorService.isConnected()) {
return Health.down()
.withDetail("reason", "无法连接到数据源")
.build();
}
return Health.up()
.withDetail("lastUpdateTime", dataCollectorService.getLastUpdateTime())
.withDetail("collectedItems", dataCollectorService.getCollectedItemsCount())
.build();
}
}
```
## 5. 结论
碰撞避免系统需要从根本上改进线程管理、资源使用和测试实践。我们推荐采用更现代的架构设计原则和模式,特别是事件驱动架构和响应式编程模型,以提高系统的可靠性、可伸缩性和可维护性。
通过实施这些建议,系统将能够更高效地处理并发任务,更可靠地管理资源,并提供更好的测试覆盖,从而确保在各种条件下的稳定运行。

View File

@ -1,11 +1,14 @@
package com.dongni.collisionavoidance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
@Slf4j
@EnableScheduling
@SpringBootApplication
@EnableMongoRepositories
@ -13,6 +16,20 @@ import org.springframework.data.mongodb.repository.config.EnableMongoRepositorie
public class CollisionAvoidanceApplication {
public static void main(String[] args) {
SpringApplication.run(CollisionAvoidanceApplication.class, args);
ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args);
// 添加关闭钩子确保所有资源能够正确释放
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("应用程序关闭中,正在清理资源...");
try {
// 关闭Spring上下文
if (context != null && context.isActive()) {
context.close();
}
} catch (Exception e) {
log.error("关闭应用程序时出错", e);
}
log.info("应用程序已安全关闭");
}));
}
}

View File

@ -8,6 +8,7 @@ import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Component
public class MovingObjectRepository {
@ -69,8 +70,11 @@ public class MovingObjectRepository {
public Map<MovingObjectType, Set<String>> takeUpdate() throws InterruptedException {
return updateQueue.take();
}
// 添加在 MovingObjectRepository 类中
// 非阻塞获取更新通知带超时
public Map<MovingObjectType, Set<String>> pollUpdate(long timeout, TimeUnit unit) throws InterruptedException {
return updateQueue.poll(timeout, unit);
}
// 批量更新整个类型集合
public void updateTypeAll(MovingObjectType type, Map<String, MovingObject> newObjects) {

View File

@ -12,10 +12,12 @@ public class ThreadPoolConfig {
@Bean(name = "processingExecutor")
public Executor processingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("data-process-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(5);
executor.initialize();
return executor;
}

View File

@ -6,6 +6,7 @@ import com.dongni.collisionavoidance.common.model.dto.SpecialVehicleDTO;
import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository;
import com.dongni.collisionavoidance.dataCollector.dao.DataCollectorDao;
import com.dongni.collisionavoidance.dataCollector.model.VehicleLocationInfo;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -34,6 +35,9 @@ public class DataCollectorService {
@Value("${data.collector.airport-api.base-url}")
private String airportBaseUrl;
@Value("${data.collector.disabled:false}")
private boolean collectorDisabled;
// 线程安全队列用于暂存原始数据
@Getter
@ -47,152 +51,106 @@ public class DataCollectorService {
@Scheduled(fixedRateString = "${data.collector.interval}")
// @Async // 异步执行
public void collectAircraftData() {
List<Aircraft> newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl);
for (Aircraft newAircraft : newAircrafts) {
String flightNo = newAircraft.getFlightNo();
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT);
Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo);
// 获取已存在的航空器如果存在
if (existingAircraft != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
existingAircraft.getCurrentPosition(),
existingAircraft.getVelocity(),
existingAircraft.getHeading(),
existingAircraft.getTimestamp());
// 控制历史记录长度
if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) {
existingAircraft.getStateHistory().removeLast();
}
existingAircraft.getStateHistory().addFirst(currentState);
newAircraft.setStateHistory(existingAircraft.getStateHistory());
if (collectorDisabled) {
return;
}
try {
List<Aircraft> newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl);
if (newAircrafts.isEmpty()) {
return;
}
// 将List转为MapflightNo -> Aircraft
Map<String, MovingObject> aircraftBatch = newAircrafts.stream()
.collect(Collectors.toMap(
Aircraft::getFlightNo,
aircraft -> aircraft,
(existing, replacement) -> existing,
ConcurrentHashMap::new
));
movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch);
for (Aircraft newAircraft : newAircrafts) {
String flightNo = newAircraft.getFlightNo();
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT);
Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo);
// 获取已存在的航空器如果存在
if (existingAircraft != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
existingAircraft.getCurrentPosition(),
existingAircraft.getVelocity(),
existingAircraft.getHeading(),
existingAircraft.getTimestamp());
// 控制历史记录长度
if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) {
existingAircraft.getStateHistory().removeLast();
}
existingAircraft.getStateHistory().addFirst(currentState);
newAircraft.setStateHistory(existingAircraft.getStateHistory());
}
// 将List转为MapflightNo -> Aircraft
Map<String, MovingObject> aircraftBatch = newAircrafts.stream()
.collect(Collectors.toMap(
Aircraft::getFlightNo,
aircraft -> aircraft,
(existing, replacement) -> existing,
ConcurrentHashMap::new
));
movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch);
}
} catch (Exception e) {
log.error("采集航空器数据异常", e);
}
}
@Scheduled(fixedRateString = "${data.collector.interval}")
@Async // 异步执行
public void collectVehicleData() {
List<SpecialVehicle> vehicles = dataCollectorDao.collectVehicleData(airportVehicleEndpoint, airportBaseUrl);
for (SpecialVehicle newVehicle : vehicles) {
String vehicleNo = newVehicle.getVehicleNo();
// 获取已存在的航空器如果存在
Map<String, MovingObject> snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE);
SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo);
if (specialVehicle != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
specialVehicle.getCurrentPosition(),
specialVehicle.getVelocity(),
specialVehicle.getHeading(),
specialVehicle.getTimestamp()
);
// 控制历史记录长度
if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) {
specialVehicle.getStateHistory().removeLast();
}
specialVehicle.getStateHistory().addFirst(currentState);
newVehicle.setStateHistory(specialVehicle.getStateHistory());
}
if (collectorDisabled) {
return;
}
try {
List<SpecialVehicle> vehicles = dataCollectorDao.collectVehicleData(airportVehicleEndpoint, airportBaseUrl);
if (vehicles.isEmpty()) {
return;
}
for (SpecialVehicle newVehicle : vehicles) {
String vehicleNo = newVehicle.getVehicleNo();
// 获取已存在的航空器如果存在
Map<String, MovingObject> snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE);
SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo);
if (specialVehicle != null) {
// 更新现有航空器的状态
MovementState currentState = new MovementState(
specialVehicle.getCurrentPosition(),
specialVehicle.getVelocity(),
specialVehicle.getHeading(),
specialVehicle.getTimestamp()
);
// 控制历史记录长度
if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) {
specialVehicle.getStateHistory().removeLast();
}
specialVehicle.getStateHistory().addFirst(currentState);
newVehicle.setStateHistory(specialVehicle.getStateHistory());
}
}
// 将List转为MapvehicleNo -> SpecialVehicle
Map<String, MovingObject> vehicleBatch = vehicles.stream()
.collect(Collectors.toMap(
SpecialVehicle::getVehicleNo,
vehicle -> vehicle,
(existing, replacement) -> existing,
ConcurrentHashMap::new
));
movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch);
} catch (Exception e) {
log.error("采集车辆数据异常", e);
}
// 将List转为MapvehicleNo -> SpecialVehicle
Map<String, MovingObject> vehicleBatch = vehicles.stream()
.collect(Collectors.toMap(
SpecialVehicle::getVehicleNo,
vehicle -> vehicle,
(existing, replacement) -> existing,
ConcurrentHashMap::new
));
movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch);
}
// @Scheduled(fixedRateString = "${data.collector.interval}")
// @Async // 异步执行
// public void collectVehicleLocationData() {
// List<UnmannedVehicle> unmannedVehicles = dataCollectorDao.getVehicleLocationInfo();
// for (UnmannedVehicle newVehicle : unmannedVehicles) {
// String vehicleNo = newVehicle.getVehicleId();
// // 获取已存在的航空器如果存在
// UnmannedVehicle unmannedVehicle = unmannedVehicleMap.get(vehicleNo);
// if (unmannedVehicle != null) {
// // 更新现有航空器的状态
// MovementState currentState = new MovementState(
// newVehicle.getCurrentPosition(),
// newVehicle.getVelocity(),
// newVehicle.getHeading(),
// newVehicle.getTimestamp()
// );
//
// // 使用已存在航空器的历史队列
// unmannedVehicle.setCurrentPosition(newVehicle.getCurrentPosition());
// unmannedVehicle.setVelocity(newVehicle.getVelocity());
// unmannedVehicle.setHeading(newVehicle.getHeading());
// unmannedVehicle.setTimestamp(newVehicle.getTimestamp());
// unmannedVehicle.getStateHistory().addFirst(currentState);
//
// // 控制历史记录长度
// if (unmannedVehicle.getStateHistory().size() > unmannedVehicle.MAX_HISTORY) {
// unmannedVehicle.getStateHistory().removeLast();
// }
// } else {
// // 新的航空器初始化历史记录
// MovementState initialState = new MovementState(
// newVehicle.getCurrentPosition(),
// newVehicle.getVelocity(),
// newVehicle.getHeading(),
// newVehicle.getTimestamp()
// );
// newVehicle.getStateHistory().addFirst(initialState);
// unmannedVehicleMap.put(vehicleNo, newVehicle);
// }
// }
// 更新数据Map用于其他服务访问
// storeData(MovingObjectType.UNMANNED_VEHICLE.toString(), new ArrayList<>(unmannedVehicleMap.values()));
// }
//
//
// // 在DataCollectorService类中添加
// @Getter
// private final LinkedBlockingQueue<Map<String, List<Object>>> processingQueue = new LinkedBlockingQueue<>();
//
// // 修改storeData方法推送增量数据
// private <T> void storeData(String type, List<T> data) {
// List<Object> converted = new CopyOnWriteArrayList<>(data);
// dataMap.put(type, converted);
//
// // 创建增量数据包仅包含当前类型
// Map<String, List<Object>> delta = new ConcurrentHashMap<>();
// delta.put(type, converted);
// processingQueue.offer(delta);
// }
//
// private void validateAndUpdateTimestamp(MovementState state, LinkedList<MovementState> history) {
// if (!history.isEmpty()) {
// MovementState lastState = history.getFirst();
// if (state.getTimestamp() <= lastState.getTimestamp()) {
// log.warn("检测到时间戳乱序: current={}, last={}",
// state.getTimestamp(), lastState.getTimestamp());
// // 使用递增时间戳
// state.setTimestamp(lastState.getTimestamp() + 1);
// }
// }
@PreDestroy
public void shutdown() {
log.info("正在关闭数据采集服务...");
// 清理资源
dataMap.clear();
log.info("数据采集服务已关闭");
}
}

View File

@ -4,9 +4,11 @@ import com.dongni.collisionavoidance.common.model.MovingObject;
import com.dongni.collisionavoidance.common.model.MovingObjectType;
import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@ -14,6 +16,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@ -28,12 +32,21 @@ public class DataProcessor {
private SpeedCalculationService speedCalculationService;
@Resource
private Executor processingExecutor;
@Value("${data.processor.enabled:true}")
private boolean processorEnabled;
private final AtomicBoolean running = new AtomicBoolean(false);
@PostConstruct
public void init() {
if (!processorEnabled) {
log.info("数据处理器已禁用,不会启动处理线程");
return;
}
log.info("启动数据处理线程...");
running.set(true);
processingExecutor.execute(() -> {
try {
processLoop();
@ -42,51 +55,68 @@ public class DataProcessor {
}
});
}
@PreDestroy
public void shutdown() {
log.info("正在关闭数据处理器...");
// 标记处理循环应该停止
running.set(false);
log.info("数据处理器已关闭");
}
// 修改takeUpdate调用逻辑
private void processLoop() {
log.debug("进入数据处理循环");
try {
while (!Thread.currentThread().isInterrupted()) {
// 添加超时机制和空值判断
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.takeUpdate();
if (delta == null || delta.isEmpty()) {
log.debug("未获取到数据更新等待500ms");
Thread.sleep(500);
continue;
}
delta.forEach((objectType, ids) -> {
// 添加集合空值检查
if (ids == null || ids.isEmpty()) {
log.warn("接收到空ID集合类型{}", objectType);
return;
while (running.get() && !Thread.currentThread().isInterrupted()) {
try {
// 添加超时机制和空值判断
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS);
if (delta == null || delta.isEmpty()) {
log.debug("未获取到数据更新,等待下一轮");
continue;
}
// 获取该类型全量数据快照
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(objectType);
// 转换为待处理的数据列表
List<MovingObject> dataList = new CopyOnWriteArrayList<>(snapshot.values());
log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size());
try {
switch (objectType) {
case AIRCRAFT -> processAircraftData(dataList);
case SPECIAL_VEHICLE -> processVehicleData(dataList);
case UNMANNED_VEHICLE -> processLocationData(dataList);
default -> log.warn("未支持的数据类型: {}", objectType);
delta.forEach((objectType, ids) -> {
// 添加集合空值检查
if (ids == null || ids.isEmpty()) {
log.warn("接收到空ID集合类型{}", objectType);
return;
}
} catch (Exception e) {
log.error("数据处理异常 [类型: {}]", objectType, e);
// 获取该类型全量数据快照
Map<String, MovingObject> snapshot = movingObjectRepository.getTypeMapDirect(objectType);
// 转换为待处理的数据列表
List<MovingObject> dataList = new CopyOnWriteArrayList<>(snapshot.values());
log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size());
try {
switch (objectType) {
case AIRCRAFT -> processAircraftData(dataList);
case SPECIAL_VEHICLE -> processVehicleData(dataList);
case UNMANNED_VEHICLE -> processLocationData(dataList);
default -> log.warn("未支持的数据类型: {}", objectType);
}
} catch (Exception e) {
log.error("数据处理异常 [类型: {}]", objectType, e);
}
});
log.debug("本次处理完成,共处理{}种类型更新", delta.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("数据处理线程被中断");
break;
} catch (Exception e) {
log.error("数据处理过程发生错误", e);
// 避免因错误导致CPU使用率飙升
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
});
log.debug("本次处理完成,共处理{}种类型更新", delta.size());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("数据处理线程被中断");
} catch (Exception e) {
log.error("数据处理循环发生未预期错误", e);
}

View File

@ -36,13 +36,18 @@ test-mode: true
data:
collector:
disabled: true
interval: 60000 # 设置较长的间隔以防万一
airport-api:
base-url: http://localhost:8090
auth:
username: test
password: test
data-refresh-interval-ms: 0
endpoints:
vehicle: /mock/vehicles
aircraft: /mock/aircrafts
processor:
enabled: false # 禁用数据处理器
# 日志配置
logging:
level: