CollisionAvoidanceSystem/doc/design/architecture_optimization.md
Tian jianyong 3a551f69b9 系统存在以下关键设计问题导致测试超时:
无限阻塞线程设计:
数据处理器使用 BlockingQueue.take() 方法无限期阻塞线程
缺少超时机制导致线程永远不会退出
生命周期管理不当:
使用 @PostConstruct 启动线程但没有相应的 @PreDestroy 清理机制
缺少应用程序关闭钩子,导致资源无法正确释放
线程池配置不合理:
线程池配置过大(核心10线程,最大100线程)
没有设置优雅关闭参数,导致应用关闭时线程不会终止
测试隔离不充分:
测试运行时仍尝试连接外部资源(数据库、Kafka、数据采集API)
测试配置未完全禁用不必要的服务
解决方案
我们实施了以下改进措施:
优化线程池配置:
减小线程池大小(核心5线程,最大10线程)
添加 setWaitForTasksToCompleteOnShutdown(true) 和 setAwaitTerminationSeconds(5) 配置
优化队列容量,减少内存占用
添加优雅关闭机制:
在主类添加 JVM 关闭钩子,确保资源正确释放
为服务组件添加 @PreDestroy 方法,实现自定义关闭逻辑
引入状态标志(AtomicBoolean running)控制后台线程循环
防止无限阻塞:
修改 MovingObjectRepository,添加非阻塞的 pollUpdate() 方法替代 takeUpdate()
在数据处理循环中添加超时检查,避免无限等待
优化异常处理,防止线程崩溃或 CPU 使用率飙升
完善测试环境配置:
在测试配置中完全禁用数据采集和处理服务
配置 data.collector.disabled=true 和 data.processor.enabled=false
使用 @ActiveProfiles("test") 确保测试使用正确的配置文件
添加错误处理和日志:
包装所有关键操作在 try-catch 块中,防止错误传播
添加详细日志,便于诊断问题
实现错误恢复机制,确保系统稳定性
2025-04-30 12:16:06 +08:00

12 KiB
Raw Blame History

碰撞避免系统架构优化设计

本文档提供了对碰撞避免系统架构的全面分析和优化建议,重点关注系统的线程管理、资源利用、生命周期控制和测试实践等方面。

1. 问题分析

通过代码审查和测试分析,我们发现系统存在以下架构设计问题:

1.1 线程管理问题

  • 无限阻塞线程设计

    • 数据处理器使用 BlockingQueue.take() 方法无限期阻塞线程
    • 许多后台线程没有超时机制,导致资源无法及时释放
    • 应用关闭时,阻塞线程不会收到通知,导致进程无法正常退出
  • 生命周期管理不当

    • 使用 @PostConstruct 启动线程但没有相应的 @PreDestroy 清理机制
    • 缺少应用程序关闭钩子,导致资源无法正确释放
    • 服务间依赖关系复杂,缺乏明确的启动和关闭顺序
  • 线程池配置不合理

    • 线程池配置过大核心10线程最大100线程
    • 队列容量设置较大100可能导致内存压力
    • 没有设置优雅关闭参数,导致应用关闭时线程不会终止

1.2 资源管理问题

  • 外部连接管理不当

    • 数据采集器与外部API的连接没有合理的重试和超时策略
    • HTTP客户端RestTemplate配置不完善缺少超时设置
    • 数据库连接MongoDB、Redis缺少连接池管理
  • 内存管理不佳

    • 历史数据保留机制不完善,可能导致内存溢出
    • 使用大量的ConcurrentHashMap但缺少大小限制
    • 没有有效的垃圾回收策略

1.3 测试设计问题

  • 测试隔离不充分

    • 测试运行时仍尝试连接外部资源数据库、Kafka、数据采集API
    • 测试配置未完全禁用不必要的服务
    • 缺少专门的测试数据生成机制
  • 单元测试与集成测试混合

    • 测试边界不清晰,不符合单一职责原则
    • 集成测试依赖外部资源,导致测试不稳定
    • 测试代码重复,缺少通用测试基类

2. 优化方案

2.1 线程管理优化

2.1.1 线程池配置优化

@Configuration
public class ThreadPoolConfig {
    @Bean(name = "processingExecutor")
    public Executor processingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 减小核心线程数和最大线程数
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        // 减小队列容量,避免过多任务堆积
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("data-process-");
        // 添加优雅关闭配置
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(5);
        executor.initialize();
        return executor;
    }
    
    // 添加专用于数据采集的线程池
    @Bean(name = "collectorExecutor")
    public Executor collectorExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("data-collect-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(5);
        executor.initialize();
        return executor;
    }
}

2.1.2 应用生命周期管理

@Slf4j
@SpringBootApplication
@EnableScheduling
@EnableMongoRepositories
@EnableConfigurationProperties
public class CollisionAvoidanceApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args);
        
        // 添加关闭钩子,确保资源正确释放
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("应用程序关闭中,正在清理资源...");
            try {
                if (context != null && context.isActive()) {
                    context.close();
                }
            } catch (Exception e) {
                log.error("关闭应用程序时出错", e);
            }
            log.info("应用程序已安全关闭");
        }));
    }
}

2.1.3 服务组件生命周期管理

所有后台服务都应实现 @PreDestroy 方法:

@Component
public class SomeBackgroundService {
    
    private final AtomicBoolean running = new AtomicBoolean(false);
    
    @PostConstruct
    public void init() {
        running.set(true);
        // 启动逻辑
    }
    
    @PreDestroy
    public void shutdown() {
        log.info("关闭服务...");
        running.set(false);
        // 等待线程结束、释放资源等
        log.info("服务已关闭");
    }
}

2.2 阻塞操作优化

2.2.1 使用非阻塞方法替代无限阻塞

// 修改前:无限期阻塞
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.takeUpdate();

// 修改后:有超时的阻塞
Map<MovingObjectType, Set<String>> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS);
if (delta == null || delta.isEmpty()) {
    // 处理超时情况
    continue;
}

2.2.2 添加中断和状态检查

private void processLoop() {
    try {
        while (running.get() && !Thread.currentThread().isInterrupted()) {
            try {
                // 处理逻辑
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("线程被中断");
                break;
            } catch (Exception e) {
                log.error("处理异常", e);
                // 添加延迟避免CPU占用过高
                Thread.sleep(1000);
            }
        }
    } catch (Exception e) {
        log.error("处理循环异常", e);
    }
    log.info("退出处理循环");
}

2.3 资源管理优化

2.3.1 外部连接优化

对于RestTemplate

@Configuration
public class RestTemplateConfig {
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate template = new RestTemplate();
        
        // 设置连接超时和读取超时
        HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
        factory.setConnectTimeout(5000);
        factory.setReadTimeout(5000);
        factory.setConnectionRequestTimeout(5000);
        
        template.setRequestFactory(factory);
        
        // 添加重试机制
        template.setErrorHandler(new CustomResponseErrorHandler());
        
        return template;
    }
}

2.3.2 内存管理优化

// 添加缓存容量限制
@Bean
public CacheManager cacheManager() {
    SimpleCacheManager cacheManager = new SimpleCacheManager();
    
    // 配置缓存
    ConcurrentMapCache movingObjectsCache = new ConcurrentMapCache(
        "movingObjects", 
        CacheBuilder.newBuilder()
            .maximumSize(1000)  // 最大条目数
            .expireAfterWrite(5, TimeUnit.MINUTES)  // 写入后过期时间
            .build().asMap(), 
        false);
            
    cacheManager.setCaches(Arrays.asList(movingObjectsCache));
    return cacheManager;
}

2.4 测试设计优化

2.4.1 测试配置优化

# application-test.yml
spring:
  # 禁用各种自动配置
  autoconfigure:
    exclude:
      - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
      - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration
      
  # 禁用调度任务
  task:
    scheduling:
      enabled: false
    execution:
      enabled: false
      
data:
  collector:
    disabled: true  # 禁用数据采集器
  processor:
    enabled: false  # 禁用数据处理器

2.4.2 测试基类设计

@SpringBootTest
@ActiveProfiles("test")
@Import(TestConfig.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public abstract class BaseIntegrationTest {
    // 通用测试设置和工具方法
    
    @BeforeEach
    public void setup() {
        // 测试初始化逻辑
    }
    
    @AfterEach
    public void cleanup() {
        // 测试清理逻辑
    }
}

3. 新的架构设计原则

3.1 关注点分离

  • 数据采集、数据处理、碰撞检测等功能应严格分离
  • 每个组件专注于单一职责,通过明确的接口进行通信
  • 使用事件驱动模型代替直接依赖,降低组件间耦合

3.2 弹性设计

  • 所有外部服务调用都应有超时、重试和熔断机制
  • 系统应能在任何组件故障时保持核心功能运行
  • 提供降级策略和应急模式,确保系统可用性

3.3 可观测性

  • 添加全面的日志、指标和追踪功能
  • 实现自定义健康检查,监控系统关键组件
  • 设计故障注入机制,测试系统弹性

3.4 资源效率

  • 合理配置线程池和连接池,避免资源浪费
  • 实现资源限制和保护策略,防止过载
  • 采用分级缓存策略,优化内存使用

4. 建议采用的架构模式

4.1 事件驱动架构

// 事件定义
public class MovingObjectUpdatedEvent {
    private final MovingObjectType type;
    private final Set<String> objectIds;
    
    // 构造器、getter等
}

// 发布事件
@Service
public class DataCollectorService {
    private final ApplicationEventPublisher eventPublisher;
    
    @Autowired
    public DataCollectorService(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    
    @Scheduled(fixedRate = 5000)
    public void collectData() {
        // 采集数据
        
        // 发布事件
        eventPublisher.publishEvent(new MovingObjectUpdatedEvent(type, updatedIds));
    }
}

// 监听事件
@Service
public class DataProcessor {
    @EventListener
    public void handleMovingObjectUpdates(MovingObjectUpdatedEvent event) {
        // 处理更新通知
    }
}

4.2 响应式编程模型

使用 Spring WebFlux 和 Project Reactor

@Service
public class ReactiveDataService {
    private final MovingObjectRepository repository;
    
    // 提供响应式API
    public Flux<MovingObject> getMovingObjects(MovingObjectType type) {
        return Flux.fromIterable(repository.getByType(type).values());
    }
    
    // 响应式处理逻辑
    public Mono<ProcessingResult> processData(Flux<MovingObject> objects) {
        return objects
            .filter(this::isValid)
            .flatMap(this::transform)
            .reduce(new ProcessingResult(), this::accumulate);
    }
}

4.3 健康检查和监控机制

@Component
public class DataCollectorHealthIndicator implements HealthIndicator {
    private final DataCollectorService dataCollectorService;
    
    @Autowired
    public DataCollectorHealthIndicator(DataCollectorService dataCollectorService) {
        this.dataCollectorService = dataCollectorService;
    }
    
    @Override
    public Health health() {
        if (!dataCollectorService.isConnected()) {
            return Health.down()
                .withDetail("reason", "无法连接到数据源")
                .build();
        }
        
        return Health.up()
            .withDetail("lastUpdateTime", dataCollectorService.getLastUpdateTime())
            .withDetail("collectedItems", dataCollectorService.getCollectedItemsCount())
            .build();
    }
}

5. 结论

碰撞避免系统需要从根本上改进线程管理、资源使用和测试实践。我们推荐采用更现代的架构设计原则和模式,特别是事件驱动架构和响应式编程模型,以提高系统的可靠性、可伸缩性和可维护性。

通过实施这些建议,系统将能够更高效地处理并发任务,更可靠地管理资源,并提供更好的测试覆盖,从而确保在各种条件下的稳定运行。