diff --git a/doc/design/architecture_optimization.md b/doc/design/architecture_optimization.md new file mode 100644 index 0000000..31595c3 --- /dev/null +++ b/doc/design/architecture_optimization.md @@ -0,0 +1,410 @@ +# 碰撞避免系统架构优化设计 + +本文档提供了对碰撞避免系统架构的全面分析和优化建议,重点关注系统的线程管理、资源利用、生命周期控制和测试实践等方面。 + +## 1. 问题分析 + +通过代码审查和测试分析,我们发现系统存在以下架构设计问题: + +### 1.1 线程管理问题 + +- **无限阻塞线程设计**: + - 数据处理器使用 `BlockingQueue.take()` 方法无限期阻塞线程 + - 许多后台线程没有超时机制,导致资源无法及时释放 + - 应用关闭时,阻塞线程不会收到通知,导致进程无法正常退出 + +- **生命周期管理不当**: + - 使用 `@PostConstruct` 启动线程但没有相应的 `@PreDestroy` 清理机制 + - 缺少应用程序关闭钩子,导致资源无法正确释放 + - 服务间依赖关系复杂,缺乏明确的启动和关闭顺序 + +- **线程池配置不合理**: + - 线程池配置过大(核心10线程,最大100线程) + - 队列容量设置较大(100),可能导致内存压力 + - 没有设置优雅关闭参数,导致应用关闭时线程不会终止 + +### 1.2 资源管理问题 + +- **外部连接管理不当**: + - 数据采集器与外部API的连接没有合理的重试和超时策略 + - HTTP客户端(RestTemplate)配置不完善,缺少超时设置 + - 数据库连接(MongoDB、Redis)缺少连接池管理 + +- **内存管理不佳**: + - 历史数据保留机制不完善,可能导致内存溢出 + - 使用大量的ConcurrentHashMap但缺少大小限制 + - 没有有效的垃圾回收策略 + +### 1.3 测试设计问题 + +- **测试隔离不充分**: + - 测试运行时仍尝试连接外部资源(数据库、Kafka、数据采集API) + - 测试配置未完全禁用不必要的服务 + - 缺少专门的测试数据生成机制 + +- **单元测试与集成测试混合**: + - 测试边界不清晰,不符合单一职责原则 + - 集成测试依赖外部资源,导致测试不稳定 + - 测试代码重复,缺少通用测试基类 + +## 2. 优化方案 + +### 2.1 线程管理优化 + +#### 2.1.1 线程池配置优化 + +```java +@Configuration +public class ThreadPoolConfig { + @Bean(name = "processingExecutor") + public Executor processingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 减小核心线程数和最大线程数 + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + // 减小队列容量,避免过多任务堆积 + executor.setQueueCapacity(25); + executor.setThreadNamePrefix("data-process-"); + // 添加优雅关闭配置 + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(5); + executor.initialize(); + return executor; + } + + // 添加专用于数据采集的线程池 + @Bean(name = "collectorExecutor") + public Executor collectorExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(3); + executor.setMaxPoolSize(5); + executor.setQueueCapacity(10); + executor.setThreadNamePrefix("data-collect-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(5); + executor.initialize(); + return executor; + } +} +``` + +#### 2.1.2 应用生命周期管理 + +```java +@Slf4j +@SpringBootApplication +@EnableScheduling +@EnableMongoRepositories +@EnableConfigurationProperties +public class CollisionAvoidanceApplication { + public static void main(String[] args) { + ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args); + + // 添加关闭钩子,确保资源正确释放 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("应用程序关闭中,正在清理资源..."); + try { + if (context != null && context.isActive()) { + context.close(); + } + } catch (Exception e) { + log.error("关闭应用程序时出错", e); + } + log.info("应用程序已安全关闭"); + })); + } +} +``` + +#### 2.1.3 服务组件生命周期管理 + +所有后台服务都应实现 `@PreDestroy` 方法: + +```java +@Component +public class SomeBackgroundService { + + private final AtomicBoolean running = new AtomicBoolean(false); + + @PostConstruct + public void init() { + running.set(true); + // 启动逻辑 + } + + @PreDestroy + public void shutdown() { + log.info("关闭服务..."); + running.set(false); + // 等待线程结束、释放资源等 + log.info("服务已关闭"); + } +} +``` + +### 2.2 阻塞操作优化 + +#### 2.2.1 使用非阻塞方法替代无限阻塞 + +```java +// 修改前:无限期阻塞 +Map> delta = movingObjectRepository.takeUpdate(); + +// 修改后:有超时的阻塞 +Map> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS); +if (delta == null || delta.isEmpty()) { + // 处理超时情况 + continue; +} +``` + +#### 2.2.2 添加中断和状态检查 + +```java +private void processLoop() { + try { + while (running.get() && !Thread.currentThread().isInterrupted()) { + try { + // 处理逻辑 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("线程被中断"); + break; + } catch (Exception e) { + log.error("处理异常", e); + // 添加延迟,避免CPU占用过高 + Thread.sleep(1000); + } + } + } catch (Exception e) { + log.error("处理循环异常", e); + } + log.info("退出处理循环"); +} +``` + +### 2.3 资源管理优化 + +#### 2.3.1 外部连接优化 + +对于RestTemplate: + +```java +@Configuration +public class RestTemplateConfig { + @Bean + public RestTemplate restTemplate() { + RestTemplate template = new RestTemplate(); + + // 设置连接超时和读取超时 + HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(); + factory.setConnectTimeout(5000); + factory.setReadTimeout(5000); + factory.setConnectionRequestTimeout(5000); + + template.setRequestFactory(factory); + + // 添加重试机制 + template.setErrorHandler(new CustomResponseErrorHandler()); + + return template; + } +} +``` + +#### 2.3.2 内存管理优化 + +```java +// 添加缓存容量限制 +@Bean +public CacheManager cacheManager() { + SimpleCacheManager cacheManager = new SimpleCacheManager(); + + // 配置缓存 + ConcurrentMapCache movingObjectsCache = new ConcurrentMapCache( + "movingObjects", + CacheBuilder.newBuilder() + .maximumSize(1000) // 最大条目数 + .expireAfterWrite(5, TimeUnit.MINUTES) // 写入后过期时间 + .build().asMap(), + false); + + cacheManager.setCaches(Arrays.asList(movingObjectsCache)); + return cacheManager; +} +``` + +### 2.4 测试设计优化 + +#### 2.4.1 测试配置优化 + +```yaml +# application-test.yml +spring: + # 禁用各种自动配置 + autoconfigure: + exclude: + - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration + - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration + + # 禁用调度任务 + task: + scheduling: + enabled: false + execution: + enabled: false + +data: + collector: + disabled: true # 禁用数据采集器 + processor: + enabled: false # 禁用数据处理器 +``` + +#### 2.4.2 测试基类设计 + +```java +@SpringBootTest +@ActiveProfiles("test") +@Import(TestConfig.class) +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +public abstract class BaseIntegrationTest { + // 通用测试设置和工具方法 + + @BeforeEach + public void setup() { + // 测试初始化逻辑 + } + + @AfterEach + public void cleanup() { + // 测试清理逻辑 + } +} +``` + +## 3. 新的架构设计原则 + +### 3.1 关注点分离 + +- 数据采集、数据处理、碰撞检测等功能应严格分离 +- 每个组件专注于单一职责,通过明确的接口进行通信 +- 使用事件驱动模型代替直接依赖,降低组件间耦合 + +### 3.2 弹性设计 + +- 所有外部服务调用都应有超时、重试和熔断机制 +- 系统应能在任何组件故障时保持核心功能运行 +- 提供降级策略和应急模式,确保系统可用性 + +### 3.3 可观测性 + +- 添加全面的日志、指标和追踪功能 +- 实现自定义健康检查,监控系统关键组件 +- 设计故障注入机制,测试系统弹性 + +### 3.4 资源效率 + +- 合理配置线程池和连接池,避免资源浪费 +- 实现资源限制和保护策略,防止过载 +- 采用分级缓存策略,优化内存使用 + +## 4. 建议采用的架构模式 + +### 4.1 事件驱动架构 + +```java +// 事件定义 +public class MovingObjectUpdatedEvent { + private final MovingObjectType type; + private final Set objectIds; + + // 构造器、getter等 +} + +// 发布事件 +@Service +public class DataCollectorService { + private final ApplicationEventPublisher eventPublisher; + + @Autowired + public DataCollectorService(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + @Scheduled(fixedRate = 5000) + public void collectData() { + // 采集数据 + + // 发布事件 + eventPublisher.publishEvent(new MovingObjectUpdatedEvent(type, updatedIds)); + } +} + +// 监听事件 +@Service +public class DataProcessor { + @EventListener + public void handleMovingObjectUpdates(MovingObjectUpdatedEvent event) { + // 处理更新通知 + } +} +``` + +### 4.2 响应式编程模型 + +使用 Spring WebFlux 和 Project Reactor: + +```java +@Service +public class ReactiveDataService { + private final MovingObjectRepository repository; + + // 提供响应式API + public Flux getMovingObjects(MovingObjectType type) { + return Flux.fromIterable(repository.getByType(type).values()); + } + + // 响应式处理逻辑 + public Mono processData(Flux objects) { + return objects + .filter(this::isValid) + .flatMap(this::transform) + .reduce(new ProcessingResult(), this::accumulate); + } +} +``` + +### 4.3 健康检查和监控机制 + +```java +@Component +public class DataCollectorHealthIndicator implements HealthIndicator { + private final DataCollectorService dataCollectorService; + + @Autowired + public DataCollectorHealthIndicator(DataCollectorService dataCollectorService) { + this.dataCollectorService = dataCollectorService; + } + + @Override + public Health health() { + if (!dataCollectorService.isConnected()) { + return Health.down() + .withDetail("reason", "无法连接到数据源") + .build(); + } + + return Health.up() + .withDetail("lastUpdateTime", dataCollectorService.getLastUpdateTime()) + .withDetail("collectedItems", dataCollectorService.getCollectedItemsCount()) + .build(); + } +} +``` + +## 5. 结论 + +碰撞避免系统需要从根本上改进线程管理、资源使用和测试实践。我们推荐采用更现代的架构设计原则和模式,特别是事件驱动架构和响应式编程模型,以提高系统的可靠性、可伸缩性和可维护性。 + +通过实施这些建议,系统将能够更高效地处理并发任务,更可靠地管理资源,并提供更好的测试覆盖,从而确保在各种条件下的稳定运行。 \ No newline at end of file diff --git a/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java b/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java index 53eb588..399a17c 100644 --- a/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java +++ b/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java @@ -1,11 +1,14 @@ package com.dongni.collisionavoidance; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; +@Slf4j @EnableScheduling @SpringBootApplication @EnableMongoRepositories @@ -13,6 +16,20 @@ import org.springframework.data.mongodb.repository.config.EnableMongoRepositorie public class CollisionAvoidanceApplication { public static void main(String[] args) { - SpringApplication.run(CollisionAvoidanceApplication.class, args); + ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args); + + // 添加关闭钩子,确保所有资源能够正确释放 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("应用程序关闭中,正在清理资源..."); + try { + // 关闭Spring上下文 + if (context != null && context.isActive()) { + context.close(); + } + } catch (Exception e) { + log.error("关闭应用程序时出错", e); + } + log.info("应用程序已安全关闭"); + })); } } diff --git a/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java b/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java index 3e5f262..975f4fa 100644 --- a/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java +++ b/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java @@ -8,6 +8,7 @@ import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; @Component public class MovingObjectRepository { @@ -69,8 +70,11 @@ public class MovingObjectRepository { public Map> takeUpdate() throws InterruptedException { return updateQueue.take(); } - - // 添加在 MovingObjectRepository 类中 + + // 非阻塞获取更新通知,带超时 + public Map> pollUpdate(long timeout, TimeUnit unit) throws InterruptedException { + return updateQueue.poll(timeout, unit); + } // 批量更新整个类型集合 public void updateTypeAll(MovingObjectType type, Map newObjects) { diff --git a/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java b/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java index 2f51d68..ba2c7f6 100644 --- a/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java +++ b/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java @@ -12,10 +12,12 @@ public class ThreadPoolConfig { @Bean(name = "processingExecutor") public Executor processingExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(10); - executor.setMaxPoolSize(100); - executor.setQueueCapacity(100); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(25); executor.setThreadNamePrefix("data-process-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(5); executor.initialize(); return executor; } diff --git a/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java b/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java index 6fa855f..3aea147 100644 --- a/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java +++ b/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java @@ -6,6 +6,7 @@ import com.dongni.collisionavoidance.common.model.dto.SpecialVehicleDTO; import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository; import com.dongni.collisionavoidance.dataCollector.dao.DataCollectorDao; import com.dongni.collisionavoidance.dataCollector.model.VehicleLocationInfo; +import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -34,6 +35,9 @@ public class DataCollectorService { @Value("${data.collector.airport-api.base-url}") private String airportBaseUrl; + + @Value("${data.collector.disabled:false}") + private boolean collectorDisabled; // 线程安全队列(用于暂存原始数据) @Getter @@ -47,152 +51,106 @@ public class DataCollectorService { @Scheduled(fixedRateString = "${data.collector.interval}") -// @Async // 异步执行 public void collectAircraftData() { - List newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl); - - for (Aircraft newAircraft : newAircrafts) { - String flightNo = newAircraft.getFlightNo(); - Map snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT); - Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo); - // 获取已存在的航空器(如果存在) - if (existingAircraft != null) { - // 更新现有航空器的状态 - MovementState currentState = new MovementState( - existingAircraft.getCurrentPosition(), - existingAircraft.getVelocity(), - existingAircraft.getHeading(), - existingAircraft.getTimestamp()); - - // 控制历史记录长度 - if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) { - existingAircraft.getStateHistory().removeLast(); - } - existingAircraft.getStateHistory().addFirst(currentState); - newAircraft.setStateHistory(existingAircraft.getStateHistory()); + if (collectorDisabled) { + return; + } + + try { + List newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl); + if (newAircrafts.isEmpty()) { + return; } - // 将List转为Map(flightNo -> Aircraft) - Map aircraftBatch = newAircrafts.stream() - .collect(Collectors.toMap( - Aircraft::getFlightNo, - aircraft -> aircraft, - (existing, replacement) -> existing, - ConcurrentHashMap::new - )); - movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch); + + for (Aircraft newAircraft : newAircrafts) { + String flightNo = newAircraft.getFlightNo(); + Map snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT); + Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo); + // 获取已存在的航空器(如果存在) + if (existingAircraft != null) { + // 更新现有航空器的状态 + MovementState currentState = new MovementState( + existingAircraft.getCurrentPosition(), + existingAircraft.getVelocity(), + existingAircraft.getHeading(), + existingAircraft.getTimestamp()); + + // 控制历史记录长度 + if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) { + existingAircraft.getStateHistory().removeLast(); + } + existingAircraft.getStateHistory().addFirst(currentState); + newAircraft.setStateHistory(existingAircraft.getStateHistory()); + } + // 将List转为Map(flightNo -> Aircraft) + Map aircraftBatch = newAircrafts.stream() + .collect(Collectors.toMap( + Aircraft::getFlightNo, + aircraft -> aircraft, + (existing, replacement) -> existing, + ConcurrentHashMap::new + )); + movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch); + } + } catch (Exception e) { + log.error("采集航空器数据异常", e); } } @Scheduled(fixedRateString = "${data.collector.interval}") @Async // 异步执行 public void collectVehicleData() { - List vehicles = dataCollectorDao.collectVehicleData(airportVehicleEndpoint, airportBaseUrl); - for (SpecialVehicle newVehicle : vehicles) { - String vehicleNo = newVehicle.getVehicleNo(); - // 获取已存在的航空器(如果存在) - Map snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE); - SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo); - if (specialVehicle != null) { - // 更新现有航空器的状态 - MovementState currentState = new MovementState( - specialVehicle.getCurrentPosition(), - specialVehicle.getVelocity(), - specialVehicle.getHeading(), - specialVehicle.getTimestamp() - ); - - // 控制历史记录长度 - if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) { - specialVehicle.getStateHistory().removeLast(); - } - specialVehicle.getStateHistory().addFirst(currentState); - newVehicle.setStateHistory(specialVehicle.getStateHistory()); - } - + if (collectorDisabled) { + return; + } + + try { + List vehicles = dataCollectorDao.collectVehicleData(airportVehicleEndpoint, airportBaseUrl); + if (vehicles.isEmpty()) { + return; + } + + for (SpecialVehicle newVehicle : vehicles) { + String vehicleNo = newVehicle.getVehicleNo(); + // 获取已存在的航空器(如果存在) + Map snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE); + SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo); + if (specialVehicle != null) { + // 更新现有航空器的状态 + MovementState currentState = new MovementState( + specialVehicle.getCurrentPosition(), + specialVehicle.getVelocity(), + specialVehicle.getHeading(), + specialVehicle.getTimestamp() + ); + + // 控制历史记录长度 + if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) { + specialVehicle.getStateHistory().removeLast(); + } + specialVehicle.getStateHistory().addFirst(currentState); + newVehicle.setStateHistory(specialVehicle.getStateHistory()); + } + } + // 将List转为Map(vehicleNo -> SpecialVehicle) + Map vehicleBatch = vehicles.stream() + .collect(Collectors.toMap( + SpecialVehicle::getVehicleNo, + vehicle -> vehicle, + (existing, replacement) -> existing, + ConcurrentHashMap::new + )); + movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch); + } catch (Exception e) { + log.error("采集车辆数据异常", e); } - // 将List转为Map(vehicleNo -> SpecialVehicle) - Map vehicleBatch = vehicles.stream() - .collect(Collectors.toMap( - SpecialVehicle::getVehicleNo, - vehicle -> vehicle, - (existing, replacement) -> existing, - ConcurrentHashMap::new - )); - movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch); } - - - -// @Scheduled(fixedRateString = "${data.collector.interval}") -// @Async // 异步执行 -// public void collectVehicleLocationData() { -// List unmannedVehicles = dataCollectorDao.getVehicleLocationInfo(); -// for (UnmannedVehicle newVehicle : unmannedVehicles) { -// String vehicleNo = newVehicle.getVehicleId(); -// // 获取已存在的航空器(如果存在) -// UnmannedVehicle unmannedVehicle = unmannedVehicleMap.get(vehicleNo); -// if (unmannedVehicle != null) { -// // 更新现有航空器的状态 -// MovementState currentState = new MovementState( -// newVehicle.getCurrentPosition(), -// newVehicle.getVelocity(), -// newVehicle.getHeading(), -// newVehicle.getTimestamp() -// ); -// -// // 使用已存在航空器的历史队列 -// unmannedVehicle.setCurrentPosition(newVehicle.getCurrentPosition()); -// unmannedVehicle.setVelocity(newVehicle.getVelocity()); -// unmannedVehicle.setHeading(newVehicle.getHeading()); -// unmannedVehicle.setTimestamp(newVehicle.getTimestamp()); -// unmannedVehicle.getStateHistory().addFirst(currentState); -// -// // 控制历史记录长度 -// if (unmannedVehicle.getStateHistory().size() > unmannedVehicle.MAX_HISTORY) { -// unmannedVehicle.getStateHistory().removeLast(); -// } -// } else { -// // 新的航空器,初始化历史记录 -// MovementState initialState = new MovementState( -// newVehicle.getCurrentPosition(), -// newVehicle.getVelocity(), -// newVehicle.getHeading(), -// newVehicle.getTimestamp() -// ); -// newVehicle.getStateHistory().addFirst(initialState); -// unmannedVehicleMap.put(vehicleNo, newVehicle); -// } -// } - - // 更新数据Map(用于其他服务访问) -// storeData(MovingObjectType.UNMANNED_VEHICLE.toString(), new ArrayList<>(unmannedVehicleMap.values())); -// } -// -// -// // 在DataCollectorService类中添加: -// @Getter -// private final LinkedBlockingQueue>> processingQueue = new LinkedBlockingQueue<>(); -// -// // 修改storeData方法,推送增量数据 -// private void storeData(String type, List data) { -// List converted = new CopyOnWriteArrayList<>(data); -// dataMap.put(type, converted); -// -// // 创建增量数据包(仅包含当前类型) -// Map> delta = new ConcurrentHashMap<>(); -// delta.put(type, converted); -// processingQueue.offer(delta); -// } -// -// private void validateAndUpdateTimestamp(MovementState state, LinkedList history) { -// if (!history.isEmpty()) { -// MovementState lastState = history.getFirst(); -// if (state.getTimestamp() <= lastState.getTimestamp()) { -// log.warn("检测到时间戳乱序: current={}, last={}", -// state.getTimestamp(), lastState.getTimestamp()); -// // 使用递增时间戳 -// state.setTimestamp(lastState.getTimestamp() + 1); -// } -// } - + + @PreDestroy + public void shutdown() { + log.info("正在关闭数据采集服务..."); + // 清理资源 + dataMap.clear(); + log.info("数据采集服务已关闭"); + } } diff --git a/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java b/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java index 381808d..93f04da 100644 --- a/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java +++ b/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java @@ -4,9 +4,11 @@ import com.dongni.collisionavoidance.common.model.MovingObject; import com.dongni.collisionavoidance.common.model.MovingObjectType; import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository; import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; @@ -14,6 +16,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @@ -28,12 +32,21 @@ public class DataProcessor { private SpeedCalculationService speedCalculationService; @Resource private Executor processingExecutor; - - + + @Value("${data.processor.enabled:true}") + private boolean processorEnabled; + + private final AtomicBoolean running = new AtomicBoolean(false); @PostConstruct public void init() { + if (!processorEnabled) { + log.info("数据处理器已禁用,不会启动处理线程"); + return; + } + log.info("启动数据处理线程..."); + running.set(true); processingExecutor.execute(() -> { try { processLoop(); @@ -42,51 +55,68 @@ public class DataProcessor { } }); } + + @PreDestroy + public void shutdown() { + log.info("正在关闭数据处理器..."); + // 标记处理循环应该停止 + running.set(false); + log.info("数据处理器已关闭"); + } - - // 修改takeUpdate调用逻辑 private void processLoop() { log.debug("进入数据处理循环"); try { - while (!Thread.currentThread().isInterrupted()) { - // 添加超时机制和空值判断 - Map> delta = movingObjectRepository.takeUpdate(); - if (delta == null || delta.isEmpty()) { - log.debug("未获取到数据更新,等待500ms"); - Thread.sleep(500); - continue; - } - - delta.forEach((objectType, ids) -> { - // 添加集合空值检查 - if (ids == null || ids.isEmpty()) { - log.warn("接收到空ID集合,类型:{}", objectType); - return; + while (running.get() && !Thread.currentThread().isInterrupted()) { + try { + // 添加超时机制和空值判断 + Map> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS); + if (delta == null || delta.isEmpty()) { + log.debug("未获取到数据更新,等待下一轮"); + continue; } - // 获取该类型全量数据快照 - Map snapshot = movingObjectRepository.getTypeMapDirect(objectType); - // 转换为待处理的数据列表 - List dataList = new CopyOnWriteArrayList<>(snapshot.values()); - - log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size()); - - try { - switch (objectType) { - case AIRCRAFT -> processAircraftData(dataList); - case SPECIAL_VEHICLE -> processVehicleData(dataList); - case UNMANNED_VEHICLE -> processLocationData(dataList); - default -> log.warn("未支持的数据类型: {}", objectType); + delta.forEach((objectType, ids) -> { + // 添加集合空值检查 + if (ids == null || ids.isEmpty()) { + log.warn("接收到空ID集合,类型:{}", objectType); + return; } - } catch (Exception e) { - log.error("数据处理异常 [类型: {}]", objectType, e); + // 获取该类型全量数据快照 + Map snapshot = movingObjectRepository.getTypeMapDirect(objectType); + + // 转换为待处理的数据列表 + List dataList = new CopyOnWriteArrayList<>(snapshot.values()); + + log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size()); + + try { + switch (objectType) { + case AIRCRAFT -> processAircraftData(dataList); + case SPECIAL_VEHICLE -> processVehicleData(dataList); + case UNMANNED_VEHICLE -> processLocationData(dataList); + default -> log.warn("未支持的数据类型: {}", objectType); + } + } catch (Exception e) { + log.error("数据处理异常 [类型: {}]", objectType, e); + } + }); + log.debug("本次处理完成,共处理{}种类型更新", delta.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("数据处理线程被中断"); + break; + } catch (Exception e) { + log.error("数据处理过程发生错误", e); + // 避免因错误导致CPU使用率飙升 + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; } - }); - log.debug("本次处理完成,共处理{}种类型更新", delta.size()); + } } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("数据处理线程被中断"); } catch (Exception e) { log.error("数据处理循环发生未预期错误", e); } diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index 4358de6..cffe937 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -36,13 +36,18 @@ test-mode: true data: collector: disabled: true + interval: 60000 # 设置较长的间隔以防万一 airport-api: base-url: http://localhost:8090 auth: username: test password: test - data-refresh-interval-ms: 0 - + endpoints: + vehicle: /mock/vehicles + aircraft: /mock/aircrafts + processor: + enabled: false # 禁用数据处理器 + # 日志配置 logging: level: