From 574dfc8b40975e403af8871c4ba51f78ce5e9e03 Mon Sep 17 00:00:00 2001 From: Tian jianyong <11429339@qq.com> Date: Wed, 30 Apr 2025 11:55:14 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E6=A1=86=E6=9E=B6=E5=92=8C=E6=B5=8B=E8=AF=95=E7=94=A8?= =?UTF-8?q?=E4=BE=8B=EF=BC=8C=E8=A7=A3=E5=86=B3=E8=B6=85=E6=97=B6=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 25 +++ .../area/model/AreaType.java | 16 -- .../area/service/AirportAreaService.java | 33 ---- .../areas/model/AreaInfo.java | 25 +++ .../areas/model/AreaType.java | 16 ++ .../areas/service/AirportAreaService.java | 176 ++++++++++++++++++ src/main/resources/application.yml | 3 +- .../AirportAreaServiceIntegrationTest.java | 127 ++++++++++--- .../collisionavoidance/config/TestConfig.java | 151 +++++++++++++++ .../RoadNetworkServiceIntegrationTest.java | 12 +- src/test/resources/application-test.yml | 51 +++++ src/test/resources/config/airport_areas.yaml | 38 ++-- 12 files changed, 581 insertions(+), 92 deletions(-) delete mode 100644 src/main/java/com/dongni/collisionavoidance/area/model/AreaType.java delete mode 100644 src/main/java/com/dongni/collisionavoidance/area/service/AirportAreaService.java create mode 100644 src/main/java/com/dongni/collisionavoidance/areas/model/AreaInfo.java create mode 100644 src/main/java/com/dongni/collisionavoidance/areas/model/AreaType.java create mode 100644 src/main/java/com/dongni/collisionavoidance/areas/service/AirportAreaService.java create mode 100644 src/test/java/com/dongni/collisionavoidance/config/TestConfig.java create mode 100644 src/test/resources/application-test.yml diff --git a/pom.xml b/pom.xml index 1de518e..2b46905 100644 --- a/pom.xml +++ b/pom.xml @@ -166,6 +166,31 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + 60 + + kill + + 2 + + false + 1 + false + + 5 + + + false + org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration + + test + + + diff --git a/src/main/java/com/dongni/collisionavoidance/area/model/AreaType.java b/src/main/java/com/dongni/collisionavoidance/area/model/AreaType.java deleted file mode 100644 index 4aed532..0000000 --- a/src/main/java/com/dongni/collisionavoidance/area/model/AreaType.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.dongni.collisionavoidance.area.model; - -/** - * 机场区域类型枚举 - */ -public enum AreaType { - RUNWAY, // 跑道 - TAXIWAY, // 滑行道 - APRON, // 停机坪 - SERVICE_AREA, // 服务区 - CARGO_AREA, // 货运区 - TERMINAL_AREA, // 航站楼区域 - MAINTENANCE, // 维修区 - RESTRICTED, // 限制区 - PROTECTION // 保护区 -} \ No newline at end of file diff --git a/src/main/java/com/dongni/collisionavoidance/area/service/AirportAreaService.java b/src/main/java/com/dongni/collisionavoidance/area/service/AirportAreaService.java deleted file mode 100644 index 0c9eb1a..0000000 --- a/src/main/java/com/dongni/collisionavoidance/area/service/AirportAreaService.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.dongni.collisionavoidance.area.service; - -import com.dongni.collisionavoidance.config.properties.AirportAreasProperties; -import com.dongni.collisionavoidance.config.properties.AreaProperties; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.Optional; - -@Service -public class AirportAreaService { - private final List areas; - - public AirportAreaService(AirportAreasProperties properties) { - this.areas = properties.getAreas(); - } - - public List getAllAreas() { - return areas; - } - - public Optional getAreaById(String id) { - return areas.stream() - .filter(area -> area.getId().equals(id)) - .findFirst(); - } - - public List getAreasByType(String type) { - return areas.stream() - .filter(area -> area.getType().equals(type)) - .toList(); - } -} \ No newline at end of file diff --git a/src/main/java/com/dongni/collisionavoidance/areas/model/AreaInfo.java b/src/main/java/com/dongni/collisionavoidance/areas/model/AreaInfo.java new file mode 100644 index 0000000..2dd3f69 --- /dev/null +++ b/src/main/java/com/dongni/collisionavoidance/areas/model/AreaInfo.java @@ -0,0 +1,25 @@ +package com.dongni.collisionavoidance.areas.model; + +import lombok.Builder; +import lombok.Value; +import org.locationtech.jts.geom.Polygon; +import java.time.ZonedDateTime; +import java.util.List; + +@Value +@Builder +public class AreaInfo { + String id; // 区域唯一标识 + String name; // 区域名称 + AreaType type; // 区域类型(跑道、机坪等) + Double speedLimitKph; // 限速(公里/小时) + String description; // 区域用途描述 + boolean restricted; // 是否限制进入 + List allowedVehicleTypes; // 允许的车辆类型 + List allowedAircraftTypes; // 允许的航空器类型 + Double maxHeight; // 最大高度限制(米) + Double maxWeight; // 最大重量限制(吨) + Polygon boundary; // JTS 多边形边界 + ZonedDateTime activeTime; // 生效时间(用于临时区域) + ZonedDateTime expiryTime; // 失效时间(用于临时区域) +} \ No newline at end of file diff --git a/src/main/java/com/dongni/collisionavoidance/areas/model/AreaType.java b/src/main/java/com/dongni/collisionavoidance/areas/model/AreaType.java new file mode 100644 index 0000000..70a742b --- /dev/null +++ b/src/main/java/com/dongni/collisionavoidance/areas/model/AreaType.java @@ -0,0 +1,16 @@ +package com.dongni.collisionavoidance.areas.model; + +/** + * 机场区域类型枚举 + */ +public enum AreaType { + RUNWAY, // 跑道 + TAXIWAY, // 滑行道 + APRON, // 停机坪 + SERVICE_AREA, // 服务区 + CARGO_AREA, // 货运区 + TERMINAL_AREA, // 航站楼区域 + MAINTENANCE, // 维修区 + RESTRICTED, // 限制区 + PROTECTION // 保护区 +} \ No newline at end of file diff --git a/src/main/java/com/dongni/collisionavoidance/areas/service/AirportAreaService.java b/src/main/java/com/dongni/collisionavoidance/areas/service/AirportAreaService.java new file mode 100644 index 0000000..cc5f518 --- /dev/null +++ b/src/main/java/com/dongni/collisionavoidance/areas/service/AirportAreaService.java @@ -0,0 +1,176 @@ +package com.dongni.collisionavoidance.areas.service; + +import com.dongni.collisionavoidance.areas.model.AreaInfo; +import com.dongni.collisionavoidance.areas.model.AreaType; +import com.dongni.collisionavoidance.common.model.GeoPosition; +import com.dongni.collisionavoidance.config.properties.AirportAreasProperties; +import com.dongni.collisionavoidance.config.properties.AreaProperties; +import com.dongni.collisionavoidance.config.properties.GeometryProperties; +import lombok.extern.slf4j.Slf4j; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; +import org.locationtech.jts.geom.Polygon; +import org.locationtech.jts.index.strtree.STRtree; +import org.springframework.stereotype.Service; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class AirportAreaService { + private final List areas; + private final STRtree spatialIndex; + private final GeometryFactory geometryFactory; + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ISO_DATE_TIME; + + public AirportAreaService(AirportAreasProperties properties) { + this.geometryFactory = new GeometryFactory(); + this.areas = convertToAreaInfo(properties.getAreas()); + this.spatialIndex = buildSpatialIndex(); + } + + private List convertToAreaInfo(List properties) { + return properties.stream() + .map(this::convertToAreaInfo) + .collect(Collectors.toList()); + } + + private AreaInfo convertToAreaInfo(AreaProperties properties) { + return AreaInfo.builder() + .id(properties.getId()) + .name(properties.getName()) + .type(AreaType.valueOf(properties.getType())) + .speedLimitKph(properties.getSpeedLimit()) + .description(properties.getPurpose()) + .restricted(properties.getRestrictions() != null && !properties.getRestrictions().isEmpty()) + .allowedVehicleTypes(properties.getAllowedVehicleTypes()) + .allowedAircraftTypes(properties.getAllowedAircraftTypes()) + .maxHeight(properties.getMaxHeight()) + .maxWeight(properties.getMaxWeight()) + .boundary(convertToPolygon(properties.getGeometry())) + .activeTime(parseDateTime(properties.getActiveTime())) + .expiryTime(parseDateTime(properties.getExpiryTime())) + .build(); + } + + private Polygon convertToPolygon(GeometryProperties geometry) { + if (geometry == null || geometry.getCoordinates() == null || geometry.getCoordinates().isEmpty()) { + return null; + } + + try { + List coordinates = geometry.getCoordinates().stream() + .map(coord -> { + double lon = coord.get(0); + double lat = coord.get(1); + log.info("转换坐标: [{},{}] -> JTS坐标", lon, lat); + return new Coordinate(lon, lat); + }) + .collect(Collectors.toList()); + + // 确保多边形是闭合的 + if (!coordinates.get(0).equals(coordinates.get(coordinates.size() - 1))) { + log.info("多边形未闭合,添加闭合点"); + coordinates.add(coordinates.get(0)); + } + + Polygon polygon = geometryFactory.createPolygon(coordinates.toArray(new Coordinate[0])); + log.info("创建多边形: {}", polygon); + return polygon; + } catch (Exception e) { + log.error("转换多边形失败: {}", e.getMessage(), e); + return null; + } + } + + private ZonedDateTime parseDateTime(String dateTimeStr) { + if (dateTimeStr == null || dateTimeStr.isEmpty()) { + return null; + } + try { + return ZonedDateTime.parse(dateTimeStr, DATE_TIME_FORMATTER); + } catch (Exception e) { + log.warn("Failed to parse date time: {}", dateTimeStr, e); + return null; + } + } + + private STRtree buildSpatialIndex() { + STRtree index = new STRtree(); + for (AreaInfo area : areas) { + if (area.getBoundary() != null) { + index.insert(area.getBoundary().getEnvelopeInternal(), area); + } + } + index.build(); + return index; + } + + public List getAllAreas() { + return new ArrayList<>(areas); + } + + public Optional getAreaById(String id) { + return areas.stream() + .filter(area -> area.getId().equals(id)) + .findFirst(); + } + + public List getAreasByType(AreaType type) { + return areas.stream() + .filter(area -> area.getType() == type) + .collect(Collectors.toList()); + } + + public List findAreasContainingPoint(GeoPosition position) { + log.info("查询包含点的区域: lat={}, lon={}", position.getLatitude(), position.getLongitude()); + Point point = geometryFactory.createPoint(new Coordinate(position.getLongitude(), position.getLatitude())); + log.info("创建JTS点: {}", point); + @SuppressWarnings("unchecked") + List candidates = spatialIndex.query(point.getEnvelopeInternal()); + log.info("空间索引查询结果数量: {}", candidates.size()); + return candidates.stream() + .filter(area -> { + boolean contains = area.getBoundary() != null && area.getBoundary().contains(point); + log.info("区域 {} ({}): boundary={}, contains={}", + area.getId(), area.getName(), + area.getBoundary() != null ? area.getBoundary().toString() : "null", + contains); + return contains; + }) + .collect(Collectors.toList()); + } + + public Optional findDominantAreaAt(GeoPosition position) { + List containingAreas = findAreasContainingPoint(position); + if (containingAreas.isEmpty()) { + return Optional.empty(); + } + // 按照区域优先级返回最优先的区域 + return containingAreas.stream() + .max((a1, a2) -> a2.getType().ordinal() - a1.getType().ordinal()); + } + + public Double getSpeedLimitKphAt(GeoPosition position) { + return findDominantAreaAt(position) + .map(AreaInfo::getSpeedLimitKph) + .orElse(null); + } + + public boolean isPositionInRestrictedArea(GeoPosition position) { + return findAreasContainingPoint(position).stream() + .anyMatch(AreaInfo::isRestricted); + } + + public boolean isAreaActive(AreaInfo area) { + ZonedDateTime now = ZonedDateTime.now(); + return (area.getActiveTime() == null || !now.isBefore(area.getActiveTime())) && + (area.getExpiryTime() == null || !now.isAfter(area.getExpiryTime())); + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 61af7b3..cb5d190 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,7 +20,6 @@ spring: value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.airport.common.model" - # Redis配置 redis: host: localhost @@ -35,6 +34,8 @@ spring: min-idle: 0 key-serialization: org.springframework.data.redis.serialization.StringRedisSerializer value-serialization: org.springframework.data.redis.serialization.Jackson2JsonRedisSerializer + main: + allow-bean-definition-overriding: true # 数据采集配置 data: diff --git a/src/test/java/com/dongni/collisionavoidance/areas/service/AirportAreaServiceIntegrationTest.java b/src/test/java/com/dongni/collisionavoidance/areas/service/AirportAreaServiceIntegrationTest.java index fa9e581..15b7711 100644 --- a/src/test/java/com/dongni/collisionavoidance/areas/service/AirportAreaServiceIntegrationTest.java +++ b/src/test/java/com/dongni/collisionavoidance/areas/service/AirportAreaServiceIntegrationTest.java @@ -1,16 +1,22 @@ package com.dongni.collisionavoidance.areas.service; +import com.dongni.collisionavoidance.areas.model.AreaInfo; +import com.dongni.collisionavoidance.areas.model.AreaType; +import com.dongni.collisionavoidance.common.model.GeoPosition; import com.dongni.collisionavoidance.config.AirportAreaConfig; -import com.dongni.collisionavoidance.config.properties.AreaProperties; -import com.dongni.collisionavoidance.area.service.AirportAreaService; +import com.dongni.collisionavoidance.config.TestConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Import; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.TestPropertySource; import java.util.List; +import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; @@ -18,9 +24,12 @@ import static org.assertj.core.api.Assertions.assertThat; * 机场区域服务的集成测试类 * 确保区域配置正确加载,并且服务方法按预期工作 */ -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) @ExtendWith(MockitoExtension.class) -@Import(AirportAreaConfig.class) +@Import({AirportAreaConfig.class, TestConfig.class}) +@TestPropertySource(locations = "classpath:config/airport_areas.yaml") +@ActiveProfiles("test") +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) class AirportAreaServiceIntegrationTest { @Autowired @@ -34,12 +43,12 @@ class AirportAreaServiceIntegrationTest { @Test void getAllAreas_shouldReturnAllAreas() { - List areas = airportAreaService.getAllAreas(); + List areas = airportAreaService.getAllAreas(); assertThat(areas) .isNotNull() .isNotEmpty() - .hasSize(2); // 配置文件中有两个区域:跑道区域和停机坪区域 + .hasSize(2); // 配置文件中有两个区域:跑道区域和滑行道区域 System.out.println("获取到 " + areas.size() + " 个区域"); } @@ -47,18 +56,17 @@ class AirportAreaServiceIntegrationTest { @Test void getAreaById_shouldReturnArea_whenIdExists() { String areaId = "1"; // 跑道区域的ID - AreaProperties area = airportAreaService.getAreaById(areaId).orElse(null); + AreaInfo area = airportAreaService.getAreaById(areaId).orElse(null); assertThat(area) .isNotNull() .satisfies(a -> { assertThat(a.getId()).isEqualTo("1"); assertThat(a.getName()).isEqualTo("跑道区域"); - assertThat(a.getType()).isEqualTo("RUNWAY"); - assertThat(a.getSpeedLimit()).isEqualTo(0.0); - assertThat(a.getPurpose()).isEqualTo("用于航空器起降的主要跑道"); - assertThat(a.getRestrictions()) - .containsExactly("禁止停车", "禁止通行"); + assertThat(a.getType()).isEqualTo(AreaType.RUNWAY); + assertThat(a.getSpeedLimitKph()).isEqualTo(0.0); + assertThat(a.getDescription()).isEqualTo("用于航空器起降的主要跑道"); + assertThat(a.isRestricted()).isTrue(); assertThat(a.getAllowedVehicleTypes()) .containsExactly("AIRCRAFT"); assertThat(a.getAllowedAircraftTypes()) @@ -71,39 +79,116 @@ class AirportAreaServiceIntegrationTest { @Test void getAreaById_shouldReturnNull_whenIdDoesNotExist() { String nonExistentId = "non-existent-area-id"; - AreaProperties area = airportAreaService.getAreaById(nonExistentId).orElse(null); + AreaInfo area = airportAreaService.getAreaById(nonExistentId).orElse(null); assertThat(area).isNull(); } @Test void getAreasByType_shouldReturnAreas_whenTypeExists() { - String type = "RUNWAY"; - List areas = airportAreaService.getAreasByType(type); + AreaType type = AreaType.RUNWAY; + List areas = airportAreaService.getAreasByType(type); assertThat(areas) .isNotNull() .isNotEmpty() .hasSize(1) - .allMatch(area -> type.equals(area.getType())); + .allMatch(area -> type == area.getType()); // 验证返回的是跑道区域 - AreaProperties runwayArea = areas.get(0); + AreaInfo runwayArea = areas.get(0); assertThat(runwayArea) .satisfies(area -> { assertThat(area.getName()).isEqualTo("跑道区域"); - assertThat(area.getSpeedLimit()).isEqualTo(0.0); - assertThat(area.getPurpose()).isEqualTo("用于航空器起降的主要跑道"); + assertThat(area.getSpeedLimitKph()).isEqualTo(0.0); + assertThat(area.getDescription()).isEqualTo("用于航空器起降的主要跑道"); }); } @Test void getAreasByType_shouldReturnEmpty_whenTypeDoesNotExist() { - String nonExistentType = "non-existent-type"; - List areas = airportAreaService.getAreasByType(nonExistentType); + AreaType nonExistentType = AreaType.RESTRICTED; + List areas = airportAreaService.getAreasByType(nonExistentType); assertThat(areas) .isNotNull() .isEmpty(); } + + @Test + void findAreasContainingPoint_shouldReturnAllMatchingAreas_whenPointIsIn() { + // 测试点在跑道和滑行道重叠区域 + GeoPosition position = new GeoPosition(39.123500, 116.123600, 0.0); + List areas = airportAreaService.findAreasContainingPoint(position); + + assertThat(areas).isNotEmpty() + .extracting(AreaInfo::getType) + .contains(AreaType.RUNWAY, AreaType.TAXIWAY); + + // 测试点在区域外 + position = new GeoPosition(39.124000, 116.124000, 0.0); + areas = airportAreaService.findAreasContainingPoint(position); + + assertThat(areas).isEmpty(); + } + + @Test + void findDominantAreaAt_shouldReturnHighestPriorityArea_whenPointIsInMultipleAreas() { + // 测试点在跑道和滑行道重叠区域,跑道应该是优先级最高的 + GeoPosition position = new GeoPosition(39.123500, 116.123600, 0.0); + Optional dominantArea = airportAreaService.findDominantAreaAt(position); + + assertThat(dominantArea) + .isPresent() + .map(AreaInfo::getType) + .hasValue(AreaType.RUNWAY); + + // 测试点在区域外 + position = new GeoPosition(39.124000, 116.124000, 0.0); + dominantArea = airportAreaService.findDominantAreaAt(position); + + assertThat(dominantArea).isEmpty(); + } + + @Test + void getSpeedLimitKphAt_shouldReturnSpeedLimit_whenPointIsInArea() { + // 测试点在跑道和滑行道重叠区域,根据优先级应返回跑道的速度限制 + GeoPosition position = new GeoPosition(39.123500, 116.123600, 0.0); + Double speedLimit = airportAreaService.getSpeedLimitKphAt(position); + + assertThat(speedLimit) + .isNotNull() + .isEqualTo(0.0); + + // 测试点在区域外 + position = new GeoPosition(39.124000, 116.124000, 0.0); + speedLimit = airportAreaService.getSpeedLimitKphAt(position); + + assertThat(speedLimit).isNull(); + } + + @Test + void isPositionInRestrictedArea_shouldReturnTrue_whenPointIsInRestrictedArea() { + // 测试点在跑道内 + GeoPosition position = new GeoPosition(39.123600, 116.123600, 0.0); + boolean isRestricted = airportAreaService.isPositionInRestrictedArea(position); + + assertThat(isRestricted).isTrue(); + + // 测试点在滑行道内 + position = new GeoPosition(39.123500, 116.123600, 0.0); + isRestricted = airportAreaService.isPositionInRestrictedArea(position); + + assertThat(isRestricted).isTrue(); + } + + @Test + void isAreaActive_shouldReturnTrue_whenAreaIsActive() { + Optional area = airportAreaService.getAreaById("1"); + + assertThat(area) + .isPresent() + .hasValueSatisfying(a -> + assertThat(airportAreaService.isAreaActive(a)).isTrue()); + } } \ No newline at end of file diff --git a/src/test/java/com/dongni/collisionavoidance/config/TestConfig.java b/src/test/java/com/dongni/collisionavoidance/config/TestConfig.java new file mode 100644 index 0000000..e66da6a --- /dev/null +++ b/src/test/java/com/dongni/collisionavoidance/config/TestConfig.java @@ -0,0 +1,151 @@ +package com.dongni.collisionavoidance.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Profile; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import com.dongni.collisionavoidance.common.model.MovingObject; +import com.dongni.collisionavoidance.common.model.MovingObjectType; +import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository; +import com.dongni.collisionavoidance.dataCollector.service.AuthService; +import com.dongni.collisionavoidance.dataCollector.service.DataCollectorService; +import com.dongni.collisionavoidance.dataProcessing.service.CoordinateSystemService; +import com.dongni.collisionavoidance.dataProcessing.service.DataProcessor; +import com.dongni.collisionavoidance.dataProcessing.service.SpeedCalculationService; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import org.springframework.web.client.RestTemplate; + +/** + * 测试专用配置类,用于禁用后台线程和外部连接 + */ +@Configuration +@Profile("test") +public class TestConfig { + + /** + * 提供一个测试专用的线程池,会立即关闭而不会等待任务执行完成 + */ + @Bean(name = "processingExecutor") + @Primary + public Executor testProcessingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(1); + executor.setQueueCapacity(1); + executor.setWaitForTasksToCompleteOnShutdown(false); + executor.setAwaitTerminationSeconds(1); + executor.setThreadNamePrefix("test-proc-"); + return executor; + } + + /** + * 提供一个用于调度的线程池,会立即关闭而不会等待任务执行完成 + */ + @Bean + @Primary + public ThreadPoolTaskScheduler testTaskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-sched-"); + scheduler.setWaitForTasksToCompleteOnShutdown(false); + scheduler.setAwaitTerminationSeconds(1); + return scheduler; + } + + /** + * 提供一个不执行任何操作的数据处理器 + */ + @Bean + @Primary + public DataProcessor noOpDataProcessor(MovingObjectRepository movingObjectRepository, + CoordinateSystemService coordinateSystemService, + SpeedCalculationService speedCalculationService, + Executor processingExecutor) { + return new TestDataProcessor(movingObjectRepository, coordinateSystemService, + speedCalculationService, processingExecutor); + } + + /** + * 提供一个不进行HTTP请求的测试用认证服务 + */ + @Bean + @Primary + public AuthService testAuthService(RestTemplate restTemplate) { + return new TestAuthService(restTemplate); + } + + /** + * 提供一个不调用外部API的数据采集服务 + */ + @Bean + @Primary + public DataCollectorService testDataCollectorService() { + return new TestDataCollectorService(); + } + + /** + * 测试用数据处理器实现,不会启动实际的处理线程 + */ + static class TestDataProcessor extends DataProcessor { + public TestDataProcessor(MovingObjectRepository movingObjectRepository, + CoordinateSystemService coordinateSystemService, + SpeedCalculationService speedCalculationService, + Executor processingExecutor) { + super(); + // 注入依赖,但不启动后台线程 + } + + @Override + public void init() { + // 不启动处理线程 + } + } + + /** + * 测试用认证服务实现,返回固定的测试令牌 + */ + static class TestAuthService extends AuthService { + public TestAuthService(RestTemplate restTemplate) { + super(restTemplate); + } + + @Override + public String loginAndGetToken() { + return "test-token"; + } + + @Override + public String refreshToken() { + return "test-token"; + } + + @Override + public String getToken() { + return "test-token"; + } + } + + /** + * 测试用数据采集服务,不执行实际的数据采集 + */ + static class TestDataCollectorService extends DataCollectorService { + // 覆盖所有定时任务方法,不执行实际操作 + @Override + public void collectAircraftData() { + // 测试环境下不执行实际数据采集 + } + + @Override + public void collectVehicleData() { + // 测试环境下不执行实际数据采集 + } + } +} \ No newline at end of file diff --git a/src/test/java/com/dongni/collisionavoidance/roads/service/RoadNetworkServiceIntegrationTest.java b/src/test/java/com/dongni/collisionavoidance/roads/service/RoadNetworkServiceIntegrationTest.java index f6ea27e..500a600 100644 --- a/src/test/java/com/dongni/collisionavoidance/roads/service/RoadNetworkServiceIntegrationTest.java +++ b/src/test/java/com/dongni/collisionavoidance/roads/service/RoadNetworkServiceIntegrationTest.java @@ -4,13 +4,16 @@ import com.dongni.collisionavoidance.common.model.GeoPosition; import com.dongni.collisionavoidance.dataCollector.service.DataCollectorService; import com.dongni.collisionavoidance.dataProcessing.service.DataProcessor; import com.dongni.collisionavoidance.roads.model.RoadInfo; +import com.dongni.collisionavoidance.config.TestConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.ActiveProfiles; // Optional: if you need specific test profile +import org.springframework.context.annotation.Import; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; import java.util.List; import java.util.Optional; @@ -26,12 +29,17 @@ import static org.junit.jupiter.api.Assertions.*; */ @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) // Load context without web server @ExtendWith(MockitoExtension.class) -// @ActiveProfiles("test") // Activate a specific test profile if needed (e.g., for application-test.yml) +@ActiveProfiles("test") // 激活测试配置文件 +@Import(TestConfig.class) // 导入测试配置 +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) // 确保测试后销毁上下文 class RoadNetworkServiceIntegrationTest { @Autowired private RoadNetworkService roadNetworkService; + // 这些Mock对象在Spring上下文中不会生效,只会在测试类内部生效 + // 由于我们现在使用TestConfig,这些Mock对象实际上是不必要的 + // 但我们暂时保留它们以避免大量代码变更 @Mock private DataCollectorService dataCollectorService; diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml new file mode 100644 index 0000000..4358de6 --- /dev/null +++ b/src/test/resources/application-test.yml @@ -0,0 +1,51 @@ +spring: + # 禁用MongoDB自动配置 + autoconfigure: + exclude: + - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration + - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration + + # 禁用数据服务 + data: + mongodb: + auto-index-creation: false + redis: + repositories: + enabled: false + + # 禁用Kafka + kafka: + bootstrap-servers: + producer: + bootstrap-servers: + consumer: + bootstrap-servers: + auto-startup: false + + # 禁用调度和异步任务 + task: + scheduling: + enabled: false + execution: + enabled: false + +# 测试模式标记 +test-mode: true + +# 数据采集器配置 +data: + collector: + disabled: true + airport-api: + base-url: http://localhost:8090 + auth: + username: test + password: test + data-refresh-interval-ms: 0 + +# 日志配置 +logging: + level: + root: INFO + com.dongni.collisionavoidance: DEBUG + org.locationtech.jts: INFO \ No newline at end of file diff --git a/src/test/resources/config/airport_areas.yaml b/src/test/resources/config/airport_areas.yaml index 2e24330..b370903 100644 --- a/src/test/resources/config/airport_areas.yaml +++ b/src/test/resources/config/airport_areas.yaml @@ -19,33 +19,33 @@ airport: geometry: type: "Polygon" coordinates: [ - [1.0, 1.0], - [1.0, 2.0], - [2.0, 2.0] + [116.123456, 39.123456], + [116.123789, 39.123456], + [116.123789, 39.123789], + [116.123456, 39.123789], + [116.123456, 39.123456] ] - id: "2" - name: "停机坪区域" - type: "APRON" + name: "滑行道区域" + type: "TAXIWAY" speedLimit: 30 # 单位:km/h - purpose: "用于航空器停放和地面服务" - restrictions: - - "限速30公里/小时" + purpose: "连接跑道和停机坪" + restrictions: [] allowedVehicleTypes: - - "AIRCRAFT" - - "TUG" + - "FOLLOW_ME" + - "TOW_TRUCK" - "FUEL_TRUCK" - - "BAGGAGE_CART" allowedAircraftTypes: - - "A320" - - "B737" - - "A330" - maxHeight: 15.0 # 单位:米 - maxWeight: 200.0 # 单位:吨 + - "AIRCRAFT" + maxHeight: 50.0 # 单位:米 + maxWeight: 500.0 # 单位:吨 geometry: type: "Polygon" coordinates: [ - [2.0, 2.0], - [2.0, 3.0], - [3.0, 3.0] + [116.123456, 39.123456], + [116.123789, 39.123456], + [116.123789, 39.123567], + [116.123456, 39.123567], + [116.123456, 39.123456] ] \ No newline at end of file From 3a551f69b98cef8dc5afbf431373f135dd6ec4ff Mon Sep 17 00:00:00 2001 From: Tian jianyong <11429339@qq.com> Date: Wed, 30 Apr 2025 12:16:06 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E4=BB=A5=E4=B8=8B=E5=85=B3=E9=94=AE=E8=AE=BE=E8=AE=A1=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E5=AF=BC=E8=87=B4=E6=B5=8B=E8=AF=95=E8=B6=85=E6=97=B6?= =?UTF-8?q?=EF=BC=9A=20=E6=97=A0=E9=99=90=E9=98=BB=E5=A1=9E=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E8=AE=BE=E8=AE=A1=EF=BC=9A=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=99=A8=E4=BD=BF=E7=94=A8=20BlockingQueue.t?= =?UTF-8?q?ake()=20=E6=96=B9=E6=B3=95=E6=97=A0=E9=99=90=E6=9C=9F=E9=98=BB?= =?UTF-8?q?=E5=A1=9E=E7=BA=BF=E7=A8=8B=20=E7=BC=BA=E5=B0=91=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E6=9C=BA=E5=88=B6=E5=AF=BC=E8=87=B4=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B0=B8=E8=BF=9C=E4=B8=8D=E4=BC=9A=E9=80=80=E5=87=BA=20?= =?UTF-8?q?=E7=94=9F=E5=91=BD=E5=91=A8=E6=9C=9F=E7=AE=A1=E7=90=86=E4=B8=8D?= =?UTF-8?q?=E5=BD=93=EF=BC=9A=20=E4=BD=BF=E7=94=A8=20@PostConstruct=20?= =?UTF-8?q?=E5=90=AF=E5=8A=A8=E7=BA=BF=E7=A8=8B=E4=BD=86=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E7=9B=B8=E5=BA=94=E7=9A=84=20@PreDestroy=20=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=20=E7=BC=BA=E5=B0=91=E5=BA=94=E7=94=A8?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E5=85=B3=E9=97=AD=E9=92=A9=E5=AD=90=EF=BC=8C?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E8=B5=84=E6=BA=90=E6=97=A0=E6=B3=95=E6=AD=A3?= =?UTF-8?q?=E7=A1=AE=E9=87=8A=E6=94=BE=20=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=8D=E5=90=88=E7=90=86=EF=BC=9A=20?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E9=85=8D=E7=BD=AE=E8=BF=87=E5=A4=A7?= =?UTF-8?q?=EF=BC=88=E6=A0=B8=E5=BF=8310=E7=BA=BF=E7=A8=8B=EF=BC=8C?= =?UTF-8?q?=E6=9C=80=E5=A4=A7100=E7=BA=BF=E7=A8=8B=EF=BC=89=20=E6=B2=A1?= =?UTF-8?q?=E6=9C=89=E8=AE=BE=E7=BD=AE=E4=BC=98=E9=9B=85=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E5=8F=82=E6=95=B0=EF=BC=8C=E5=AF=BC=E8=87=B4=E5=BA=94=E7=94=A8?= =?UTF-8?q?=E5=85=B3=E9=97=AD=E6=97=B6=E7=BA=BF=E7=A8=8B=E4=B8=8D=E4=BC=9A?= =?UTF-8?q?=E7=BB=88=E6=AD=A2=20=E6=B5=8B=E8=AF=95=E9=9A=94=E7=A6=BB?= =?UTF-8?q?=E4=B8=8D=E5=85=85=E5=88=86=EF=BC=9A=20=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E6=97=B6=E4=BB=8D=E5=B0=9D=E8=AF=95=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E5=A4=96=E9=83=A8=E8=B5=84=E6=BA=90=EF=BC=88=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E3=80=81Kafka=E3=80=81=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=87=87=E9=9B=86API=EF=BC=89=20=E6=B5=8B=E8=AF=95=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=9C=AA=E5=AE=8C=E5=85=A8=E7=A6=81=E7=94=A8=E4=B8=8D?= =?UTF-8?q?=E5=BF=85=E8=A6=81=E7=9A=84=E6=9C=8D=E5=8A=A1=20=E8=A7=A3?= =?UTF-8?q?=E5=86=B3=E6=96=B9=E6=A1=88=20=E6=88=91=E4=BB=AC=E5=AE=9E?= =?UTF-8?q?=E6=96=BD=E4=BA=86=E4=BB=A5=E4=B8=8B=E6=94=B9=E8=BF=9B=E6=8E=AA?= =?UTF-8?q?=E6=96=BD=EF=BC=9A=20=E4=BC=98=E5=8C=96=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E9=85=8D=E7=BD=AE=EF=BC=9A=20=E5=87=8F=E5=B0=8F?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E5=A4=A7=E5=B0=8F=EF=BC=88=E6=A0=B8?= =?UTF-8?q?=E5=BF=835=E7=BA=BF=E7=A8=8B=EF=BC=8C=E6=9C=80=E5=A4=A710?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=EF=BC=89=20=E6=B7=BB=E5=8A=A0=20setWaitForTa?= =?UTF-8?q?sksToCompleteOnShutdown(true)=20=E5=92=8C=20setAwaitTermination?= =?UTF-8?q?Seconds(5)=20=E9=85=8D=E7=BD=AE=20=E4=BC=98=E5=8C=96=E9=98=9F?= =?UTF-8?q?=E5=88=97=E5=AE=B9=E9=87=8F=EF=BC=8C=E5=87=8F=E5=B0=91=E5=86=85?= =?UTF-8?q?=E5=AD=98=E5=8D=A0=E7=94=A8=20=E6=B7=BB=E5=8A=A0=E4=BC=98?= =?UTF-8?q?=E9=9B=85=E5=85=B3=E9=97=AD=E6=9C=BA=E5=88=B6=EF=BC=9A=20?= =?UTF-8?q?=E5=9C=A8=E4=B8=BB=E7=B1=BB=E6=B7=BB=E5=8A=A0=20JVM=20=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E9=92=A9=E5=AD=90=EF=BC=8C=E7=A1=AE=E4=BF=9D=E8=B5=84?= =?UTF-8?q?=E6=BA=90=E6=AD=A3=E7=A1=AE=E9=87=8A=E6=94=BE=20=E4=B8=BA?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=BB=84=E4=BB=B6=E6=B7=BB=E5=8A=A0=20@PreDe?= =?UTF-8?q?stroy=20=E6=96=B9=E6=B3=95=EF=BC=8C=E5=AE=9E=E7=8E=B0=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89=E5=85=B3=E9=97=AD=E9=80=BB=E8=BE=91=20?= =?UTF-8?q?=E5=BC=95=E5=85=A5=E7=8A=B6=E6=80=81=E6=A0=87=E5=BF=97=EF=BC=88?= =?UTF-8?q?AtomicBoolean=20running=EF=BC=89=E6=8E=A7=E5=88=B6=E5=90=8E?= =?UTF-8?q?=E5=8F=B0=E7=BA=BF=E7=A8=8B=E5=BE=AA=E7=8E=AF=20=E9=98=B2?= =?UTF-8?q?=E6=AD=A2=E6=97=A0=E9=99=90=E9=98=BB=E5=A1=9E=EF=BC=9A=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=20MovingObjectRepository=EF=BC=8C=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E9=9D=9E=E9=98=BB=E5=A1=9E=E7=9A=84=20pollUpdate()=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E6=9B=BF=E4=BB=A3=20takeUpdate()=20=E5=9C=A8?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=BE=AA=E7=8E=AF=E4=B8=AD?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=B6=85=E6=97=B6=E6=A3=80=E6=9F=A5=EF=BC=8C?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E6=97=A0=E9=99=90=E7=AD=89=E5=BE=85=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86=EF=BC=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E7=BA=BF=E7=A8=8B=E5=B4=A9=E6=BA=83=E6=88=96?= =?UTF-8?q?=20CPU=20=E4=BD=BF=E7=94=A8=E7=8E=87=E9=A3=99=E5=8D=87=20?= =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=B5=8B=E8=AF=95=E7=8E=AF=E5=A2=83=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=EF=BC=9A=20=E5=9C=A8=E6=B5=8B=E8=AF=95=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E4=B8=AD=E5=AE=8C=E5=85=A8=E7=A6=81=E7=94=A8=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=87=87=E9=9B=86=E5=92=8C=E5=A4=84=E7=90=86=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=20=E9=85=8D=E7=BD=AE=20data.collector.disabled=3Dtrue?= =?UTF-8?q?=20=E5=92=8C=20data.processor.enabled=3Dfalse=20=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20@ActiveProfiles("test")=20=E7=A1=AE=E4=BF=9D?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E4=BD=BF=E7=94=A8=E6=AD=A3=E7=A1=AE=E7=9A=84?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86=E5=92=8C=E6=97=A5=E5=BF=97?= =?UTF-8?q?=EF=BC=9A=20=E5=8C=85=E8=A3=85=E6=89=80=E6=9C=89=E5=85=B3?= =?UTF-8?q?=E9=94=AE=E6=93=8D=E4=BD=9C=E5=9C=A8=20try-catch=20=E5=9D=97?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E9=98=B2=E6=AD=A2=E9=94=99=E8=AF=AF=E4=BC=A0?= =?UTF-8?q?=E6=92=AD=20=E6=B7=BB=E5=8A=A0=E8=AF=A6=E7=BB=86=E6=97=A5?= =?UTF-8?q?=E5=BF=97=EF=BC=8C=E4=BE=BF=E4=BA=8E=E8=AF=8A=E6=96=AD=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20=E5=AE=9E=E7=8E=B0=E9=94=99=E8=AF=AF=E6=81=A2?= =?UTF-8?q?=E5=A4=8D=E6=9C=BA=E5=88=B6=EF=BC=8C=E7=A1=AE=E4=BF=9D=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/design/architecture_optimization.md | 410 ++++++++++++++++++ .../CollisionAvoidanceApplication.java | 19 +- .../repository/MovingObjectRepository.java | 8 +- .../config/ThreadPoolConfig.java | 8 +- .../service/DataCollectorService.java | 234 ++++------ .../dataProcessing/service/DataProcessor.java | 106 +++-- src/test/resources/application-test.yml | 9 +- 7 files changed, 610 insertions(+), 184 deletions(-) create mode 100644 doc/design/architecture_optimization.md diff --git a/doc/design/architecture_optimization.md b/doc/design/architecture_optimization.md new file mode 100644 index 0000000..31595c3 --- /dev/null +++ b/doc/design/architecture_optimization.md @@ -0,0 +1,410 @@ +# 碰撞避免系统架构优化设计 + +本文档提供了对碰撞避免系统架构的全面分析和优化建议,重点关注系统的线程管理、资源利用、生命周期控制和测试实践等方面。 + +## 1. 问题分析 + +通过代码审查和测试分析,我们发现系统存在以下架构设计问题: + +### 1.1 线程管理问题 + +- **无限阻塞线程设计**: + - 数据处理器使用 `BlockingQueue.take()` 方法无限期阻塞线程 + - 许多后台线程没有超时机制,导致资源无法及时释放 + - 应用关闭时,阻塞线程不会收到通知,导致进程无法正常退出 + +- **生命周期管理不当**: + - 使用 `@PostConstruct` 启动线程但没有相应的 `@PreDestroy` 清理机制 + - 缺少应用程序关闭钩子,导致资源无法正确释放 + - 服务间依赖关系复杂,缺乏明确的启动和关闭顺序 + +- **线程池配置不合理**: + - 线程池配置过大(核心10线程,最大100线程) + - 队列容量设置较大(100),可能导致内存压力 + - 没有设置优雅关闭参数,导致应用关闭时线程不会终止 + +### 1.2 资源管理问题 + +- **外部连接管理不当**: + - 数据采集器与外部API的连接没有合理的重试和超时策略 + - HTTP客户端(RestTemplate)配置不完善,缺少超时设置 + - 数据库连接(MongoDB、Redis)缺少连接池管理 + +- **内存管理不佳**: + - 历史数据保留机制不完善,可能导致内存溢出 + - 使用大量的ConcurrentHashMap但缺少大小限制 + - 没有有效的垃圾回收策略 + +### 1.3 测试设计问题 + +- **测试隔离不充分**: + - 测试运行时仍尝试连接外部资源(数据库、Kafka、数据采集API) + - 测试配置未完全禁用不必要的服务 + - 缺少专门的测试数据生成机制 + +- **单元测试与集成测试混合**: + - 测试边界不清晰,不符合单一职责原则 + - 集成测试依赖外部资源,导致测试不稳定 + - 测试代码重复,缺少通用测试基类 + +## 2. 优化方案 + +### 2.1 线程管理优化 + +#### 2.1.1 线程池配置优化 + +```java +@Configuration +public class ThreadPoolConfig { + @Bean(name = "processingExecutor") + public Executor processingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 减小核心线程数和最大线程数 + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + // 减小队列容量,避免过多任务堆积 + executor.setQueueCapacity(25); + executor.setThreadNamePrefix("data-process-"); + // 添加优雅关闭配置 + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(5); + executor.initialize(); + return executor; + } + + // 添加专用于数据采集的线程池 + @Bean(name = "collectorExecutor") + public Executor collectorExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(3); + executor.setMaxPoolSize(5); + executor.setQueueCapacity(10); + executor.setThreadNamePrefix("data-collect-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(5); + executor.initialize(); + return executor; + } +} +``` + +#### 2.1.2 应用生命周期管理 + +```java +@Slf4j +@SpringBootApplication +@EnableScheduling +@EnableMongoRepositories +@EnableConfigurationProperties +public class CollisionAvoidanceApplication { + public static void main(String[] args) { + ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args); + + // 添加关闭钩子,确保资源正确释放 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("应用程序关闭中,正在清理资源..."); + try { + if (context != null && context.isActive()) { + context.close(); + } + } catch (Exception e) { + log.error("关闭应用程序时出错", e); + } + log.info("应用程序已安全关闭"); + })); + } +} +``` + +#### 2.1.3 服务组件生命周期管理 + +所有后台服务都应实现 `@PreDestroy` 方法: + +```java +@Component +public class SomeBackgroundService { + + private final AtomicBoolean running = new AtomicBoolean(false); + + @PostConstruct + public void init() { + running.set(true); + // 启动逻辑 + } + + @PreDestroy + public void shutdown() { + log.info("关闭服务..."); + running.set(false); + // 等待线程结束、释放资源等 + log.info("服务已关闭"); + } +} +``` + +### 2.2 阻塞操作优化 + +#### 2.2.1 使用非阻塞方法替代无限阻塞 + +```java +// 修改前:无限期阻塞 +Map> delta = movingObjectRepository.takeUpdate(); + +// 修改后:有超时的阻塞 +Map> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS); +if (delta == null || delta.isEmpty()) { + // 处理超时情况 + continue; +} +``` + +#### 2.2.2 添加中断和状态检查 + +```java +private void processLoop() { + try { + while (running.get() && !Thread.currentThread().isInterrupted()) { + try { + // 处理逻辑 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("线程被中断"); + break; + } catch (Exception e) { + log.error("处理异常", e); + // 添加延迟,避免CPU占用过高 + Thread.sleep(1000); + } + } + } catch (Exception e) { + log.error("处理循环异常", e); + } + log.info("退出处理循环"); +} +``` + +### 2.3 资源管理优化 + +#### 2.3.1 外部连接优化 + +对于RestTemplate: + +```java +@Configuration +public class RestTemplateConfig { + @Bean + public RestTemplate restTemplate() { + RestTemplate template = new RestTemplate(); + + // 设置连接超时和读取超时 + HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(); + factory.setConnectTimeout(5000); + factory.setReadTimeout(5000); + factory.setConnectionRequestTimeout(5000); + + template.setRequestFactory(factory); + + // 添加重试机制 + template.setErrorHandler(new CustomResponseErrorHandler()); + + return template; + } +} +``` + +#### 2.3.2 内存管理优化 + +```java +// 添加缓存容量限制 +@Bean +public CacheManager cacheManager() { + SimpleCacheManager cacheManager = new SimpleCacheManager(); + + // 配置缓存 + ConcurrentMapCache movingObjectsCache = new ConcurrentMapCache( + "movingObjects", + CacheBuilder.newBuilder() + .maximumSize(1000) // 最大条目数 + .expireAfterWrite(5, TimeUnit.MINUTES) // 写入后过期时间 + .build().asMap(), + false); + + cacheManager.setCaches(Arrays.asList(movingObjectsCache)); + return cacheManager; +} +``` + +### 2.4 测试设计优化 + +#### 2.4.1 测试配置优化 + +```yaml +# application-test.yml +spring: + # 禁用各种自动配置 + autoconfigure: + exclude: + - org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration + - org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration + + # 禁用调度任务 + task: + scheduling: + enabled: false + execution: + enabled: false + +data: + collector: + disabled: true # 禁用数据采集器 + processor: + enabled: false # 禁用数据处理器 +``` + +#### 2.4.2 测试基类设计 + +```java +@SpringBootTest +@ActiveProfiles("test") +@Import(TestConfig.class) +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +public abstract class BaseIntegrationTest { + // 通用测试设置和工具方法 + + @BeforeEach + public void setup() { + // 测试初始化逻辑 + } + + @AfterEach + public void cleanup() { + // 测试清理逻辑 + } +} +``` + +## 3. 新的架构设计原则 + +### 3.1 关注点分离 + +- 数据采集、数据处理、碰撞检测等功能应严格分离 +- 每个组件专注于单一职责,通过明确的接口进行通信 +- 使用事件驱动模型代替直接依赖,降低组件间耦合 + +### 3.2 弹性设计 + +- 所有外部服务调用都应有超时、重试和熔断机制 +- 系统应能在任何组件故障时保持核心功能运行 +- 提供降级策略和应急模式,确保系统可用性 + +### 3.3 可观测性 + +- 添加全面的日志、指标和追踪功能 +- 实现自定义健康检查,监控系统关键组件 +- 设计故障注入机制,测试系统弹性 + +### 3.4 资源效率 + +- 合理配置线程池和连接池,避免资源浪费 +- 实现资源限制和保护策略,防止过载 +- 采用分级缓存策略,优化内存使用 + +## 4. 建议采用的架构模式 + +### 4.1 事件驱动架构 + +```java +// 事件定义 +public class MovingObjectUpdatedEvent { + private final MovingObjectType type; + private final Set objectIds; + + // 构造器、getter等 +} + +// 发布事件 +@Service +public class DataCollectorService { + private final ApplicationEventPublisher eventPublisher; + + @Autowired + public DataCollectorService(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + @Scheduled(fixedRate = 5000) + public void collectData() { + // 采集数据 + + // 发布事件 + eventPublisher.publishEvent(new MovingObjectUpdatedEvent(type, updatedIds)); + } +} + +// 监听事件 +@Service +public class DataProcessor { + @EventListener + public void handleMovingObjectUpdates(MovingObjectUpdatedEvent event) { + // 处理更新通知 + } +} +``` + +### 4.2 响应式编程模型 + +使用 Spring WebFlux 和 Project Reactor: + +```java +@Service +public class ReactiveDataService { + private final MovingObjectRepository repository; + + // 提供响应式API + public Flux getMovingObjects(MovingObjectType type) { + return Flux.fromIterable(repository.getByType(type).values()); + } + + // 响应式处理逻辑 + public Mono processData(Flux objects) { + return objects + .filter(this::isValid) + .flatMap(this::transform) + .reduce(new ProcessingResult(), this::accumulate); + } +} +``` + +### 4.3 健康检查和监控机制 + +```java +@Component +public class DataCollectorHealthIndicator implements HealthIndicator { + private final DataCollectorService dataCollectorService; + + @Autowired + public DataCollectorHealthIndicator(DataCollectorService dataCollectorService) { + this.dataCollectorService = dataCollectorService; + } + + @Override + public Health health() { + if (!dataCollectorService.isConnected()) { + return Health.down() + .withDetail("reason", "无法连接到数据源") + .build(); + } + + return Health.up() + .withDetail("lastUpdateTime", dataCollectorService.getLastUpdateTime()) + .withDetail("collectedItems", dataCollectorService.getCollectedItemsCount()) + .build(); + } +} +``` + +## 5. 结论 + +碰撞避免系统需要从根本上改进线程管理、资源使用和测试实践。我们推荐采用更现代的架构设计原则和模式,特别是事件驱动架构和响应式编程模型,以提高系统的可靠性、可伸缩性和可维护性。 + +通过实施这些建议,系统将能够更高效地处理并发任务,更可靠地管理资源,并提供更好的测试覆盖,从而确保在各种条件下的稳定运行。 \ No newline at end of file diff --git a/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java b/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java index 53eb588..399a17c 100644 --- a/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java +++ b/src/main/java/com/dongni/collisionavoidance/CollisionAvoidanceApplication.java @@ -1,11 +1,14 @@ package com.dongni.collisionavoidance; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; +@Slf4j @EnableScheduling @SpringBootApplication @EnableMongoRepositories @@ -13,6 +16,20 @@ import org.springframework.data.mongodb.repository.config.EnableMongoRepositorie public class CollisionAvoidanceApplication { public static void main(String[] args) { - SpringApplication.run(CollisionAvoidanceApplication.class, args); + ConfigurableApplicationContext context = SpringApplication.run(CollisionAvoidanceApplication.class, args); + + // 添加关闭钩子,确保所有资源能够正确释放 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("应用程序关闭中,正在清理资源..."); + try { + // 关闭Spring上下文 + if (context != null && context.isActive()) { + context.close(); + } + } catch (Exception e) { + log.error("关闭应用程序时出错", e); + } + log.info("应用程序已安全关闭"); + })); } } diff --git a/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java b/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java index 3e5f262..975f4fa 100644 --- a/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java +++ b/src/main/java/com/dongni/collisionavoidance/common/model/repository/MovingObjectRepository.java @@ -8,6 +8,7 @@ import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; @Component public class MovingObjectRepository { @@ -69,8 +70,11 @@ public class MovingObjectRepository { public Map> takeUpdate() throws InterruptedException { return updateQueue.take(); } - - // 添加在 MovingObjectRepository 类中 + + // 非阻塞获取更新通知,带超时 + public Map> pollUpdate(long timeout, TimeUnit unit) throws InterruptedException { + return updateQueue.poll(timeout, unit); + } // 批量更新整个类型集合 public void updateTypeAll(MovingObjectType type, Map newObjects) { diff --git a/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java b/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java index 2f51d68..ba2c7f6 100644 --- a/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java +++ b/src/main/java/com/dongni/collisionavoidance/config/ThreadPoolConfig.java @@ -12,10 +12,12 @@ public class ThreadPoolConfig { @Bean(name = "processingExecutor") public Executor processingExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(10); - executor.setMaxPoolSize(100); - executor.setQueueCapacity(100); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(25); executor.setThreadNamePrefix("data-process-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(5); executor.initialize(); return executor; } diff --git a/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java b/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java index 6fa855f..3aea147 100644 --- a/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java +++ b/src/main/java/com/dongni/collisionavoidance/dataCollector/service/DataCollectorService.java @@ -6,6 +6,7 @@ import com.dongni.collisionavoidance.common.model.dto.SpecialVehicleDTO; import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository; import com.dongni.collisionavoidance.dataCollector.dao.DataCollectorDao; import com.dongni.collisionavoidance.dataCollector.model.VehicleLocationInfo; +import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -34,6 +35,9 @@ public class DataCollectorService { @Value("${data.collector.airport-api.base-url}") private String airportBaseUrl; + + @Value("${data.collector.disabled:false}") + private boolean collectorDisabled; // 线程安全队列(用于暂存原始数据) @Getter @@ -47,152 +51,106 @@ public class DataCollectorService { @Scheduled(fixedRateString = "${data.collector.interval}") -// @Async // 异步执行 public void collectAircraftData() { - List newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl); - - for (Aircraft newAircraft : newAircrafts) { - String flightNo = newAircraft.getFlightNo(); - Map snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT); - Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo); - // 获取已存在的航空器(如果存在) - if (existingAircraft != null) { - // 更新现有航空器的状态 - MovementState currentState = new MovementState( - existingAircraft.getCurrentPosition(), - existingAircraft.getVelocity(), - existingAircraft.getHeading(), - existingAircraft.getTimestamp()); - - // 控制历史记录长度 - if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) { - existingAircraft.getStateHistory().removeLast(); - } - existingAircraft.getStateHistory().addFirst(currentState); - newAircraft.setStateHistory(existingAircraft.getStateHistory()); + if (collectorDisabled) { + return; + } + + try { + List newAircrafts = dataCollectorDao.collectAircraftData(airportAircraftEndpoint, airportBaseUrl); + if (newAircrafts.isEmpty()) { + return; } - // 将List转为Map(flightNo -> Aircraft) - Map aircraftBatch = newAircrafts.stream() - .collect(Collectors.toMap( - Aircraft::getFlightNo, - aircraft -> aircraft, - (existing, replacement) -> existing, - ConcurrentHashMap::new - )); - movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch); + + for (Aircraft newAircraft : newAircrafts) { + String flightNo = newAircraft.getFlightNo(); + Map snapshot = movingObjectRepository.getTypeMapDirect(MovingObjectType.AIRCRAFT); + Aircraft existingAircraft = (Aircraft) snapshot.get(flightNo); + // 获取已存在的航空器(如果存在) + if (existingAircraft != null) { + // 更新现有航空器的状态 + MovementState currentState = new MovementState( + existingAircraft.getCurrentPosition(), + existingAircraft.getVelocity(), + existingAircraft.getHeading(), + existingAircraft.getTimestamp()); + + // 控制历史记录长度 + if (existingAircraft.getStateHistory().size() + 1 > existingAircraft.MAX_HISTORY) { + existingAircraft.getStateHistory().removeLast(); + } + existingAircraft.getStateHistory().addFirst(currentState); + newAircraft.setStateHistory(existingAircraft.getStateHistory()); + } + // 将List转为Map(flightNo -> Aircraft) + Map aircraftBatch = newAircrafts.stream() + .collect(Collectors.toMap( + Aircraft::getFlightNo, + aircraft -> aircraft, + (existing, replacement) -> existing, + ConcurrentHashMap::new + )); + movingObjectRepository.updateTypeAll(MovingObjectType.AIRCRAFT, aircraftBatch); + } + } catch (Exception e) { + log.error("采集航空器数据异常", e); } } @Scheduled(fixedRateString = "${data.collector.interval}") @Async // 异步执行 public void collectVehicleData() { - List vehicles = dataCollectorDao.collectVehicleData(airportVehicleEndpoint, airportBaseUrl); - for (SpecialVehicle newVehicle : vehicles) { - String vehicleNo = newVehicle.getVehicleNo(); - // 获取已存在的航空器(如果存在) - Map snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE); - SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo); - if (specialVehicle != null) { - // 更新现有航空器的状态 - MovementState currentState = new MovementState( - specialVehicle.getCurrentPosition(), - specialVehicle.getVelocity(), - specialVehicle.getHeading(), - specialVehicle.getTimestamp() - ); - - // 控制历史记录长度 - if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) { - specialVehicle.getStateHistory().removeLast(); - } - specialVehicle.getStateHistory().addFirst(currentState); - newVehicle.setStateHistory(specialVehicle.getStateHistory()); - } - + if (collectorDisabled) { + return; + } + + try { + List vehicles = dataCollectorDao.collectVehicleData(airportVehicleEndpoint, airportBaseUrl); + if (vehicles.isEmpty()) { + return; + } + + for (SpecialVehicle newVehicle : vehicles) { + String vehicleNo = newVehicle.getVehicleNo(); + // 获取已存在的航空器(如果存在) + Map snapshot = movingObjectRepository.getSnapshot(MovingObjectType.SPECIAL_VEHICLE); + SpecialVehicle specialVehicle = (SpecialVehicle) snapshot.get(vehicleNo); + if (specialVehicle != null) { + // 更新现有航空器的状态 + MovementState currentState = new MovementState( + specialVehicle.getCurrentPosition(), + specialVehicle.getVelocity(), + specialVehicle.getHeading(), + specialVehicle.getTimestamp() + ); + + // 控制历史记录长度 + if (specialVehicle.getStateHistory().size() > specialVehicle.MAX_HISTORY) { + specialVehicle.getStateHistory().removeLast(); + } + specialVehicle.getStateHistory().addFirst(currentState); + newVehicle.setStateHistory(specialVehicle.getStateHistory()); + } + } + // 将List转为Map(vehicleNo -> SpecialVehicle) + Map vehicleBatch = vehicles.stream() + .collect(Collectors.toMap( + SpecialVehicle::getVehicleNo, + vehicle -> vehicle, + (existing, replacement) -> existing, + ConcurrentHashMap::new + )); + movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch); + } catch (Exception e) { + log.error("采集车辆数据异常", e); } - // 将List转为Map(vehicleNo -> SpecialVehicle) - Map vehicleBatch = vehicles.stream() - .collect(Collectors.toMap( - SpecialVehicle::getVehicleNo, - vehicle -> vehicle, - (existing, replacement) -> existing, - ConcurrentHashMap::new - )); - movingObjectRepository.updateTypeAll(MovingObjectType.SPECIAL_VEHICLE, vehicleBatch); } - - - -// @Scheduled(fixedRateString = "${data.collector.interval}") -// @Async // 异步执行 -// public void collectVehicleLocationData() { -// List unmannedVehicles = dataCollectorDao.getVehicleLocationInfo(); -// for (UnmannedVehicle newVehicle : unmannedVehicles) { -// String vehicleNo = newVehicle.getVehicleId(); -// // 获取已存在的航空器(如果存在) -// UnmannedVehicle unmannedVehicle = unmannedVehicleMap.get(vehicleNo); -// if (unmannedVehicle != null) { -// // 更新现有航空器的状态 -// MovementState currentState = new MovementState( -// newVehicle.getCurrentPosition(), -// newVehicle.getVelocity(), -// newVehicle.getHeading(), -// newVehicle.getTimestamp() -// ); -// -// // 使用已存在航空器的历史队列 -// unmannedVehicle.setCurrentPosition(newVehicle.getCurrentPosition()); -// unmannedVehicle.setVelocity(newVehicle.getVelocity()); -// unmannedVehicle.setHeading(newVehicle.getHeading()); -// unmannedVehicle.setTimestamp(newVehicle.getTimestamp()); -// unmannedVehicle.getStateHistory().addFirst(currentState); -// -// // 控制历史记录长度 -// if (unmannedVehicle.getStateHistory().size() > unmannedVehicle.MAX_HISTORY) { -// unmannedVehicle.getStateHistory().removeLast(); -// } -// } else { -// // 新的航空器,初始化历史记录 -// MovementState initialState = new MovementState( -// newVehicle.getCurrentPosition(), -// newVehicle.getVelocity(), -// newVehicle.getHeading(), -// newVehicle.getTimestamp() -// ); -// newVehicle.getStateHistory().addFirst(initialState); -// unmannedVehicleMap.put(vehicleNo, newVehicle); -// } -// } - - // 更新数据Map(用于其他服务访问) -// storeData(MovingObjectType.UNMANNED_VEHICLE.toString(), new ArrayList<>(unmannedVehicleMap.values())); -// } -// -// -// // 在DataCollectorService类中添加: -// @Getter -// private final LinkedBlockingQueue>> processingQueue = new LinkedBlockingQueue<>(); -// -// // 修改storeData方法,推送增量数据 -// private void storeData(String type, List data) { -// List converted = new CopyOnWriteArrayList<>(data); -// dataMap.put(type, converted); -// -// // 创建增量数据包(仅包含当前类型) -// Map> delta = new ConcurrentHashMap<>(); -// delta.put(type, converted); -// processingQueue.offer(delta); -// } -// -// private void validateAndUpdateTimestamp(MovementState state, LinkedList history) { -// if (!history.isEmpty()) { -// MovementState lastState = history.getFirst(); -// if (state.getTimestamp() <= lastState.getTimestamp()) { -// log.warn("检测到时间戳乱序: current={}, last={}", -// state.getTimestamp(), lastState.getTimestamp()); -// // 使用递增时间戳 -// state.setTimestamp(lastState.getTimestamp() + 1); -// } -// } - + + @PreDestroy + public void shutdown() { + log.info("正在关闭数据采集服务..."); + // 清理资源 + dataMap.clear(); + log.info("数据采集服务已关闭"); + } } diff --git a/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java b/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java index 381808d..93f04da 100644 --- a/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java +++ b/src/main/java/com/dongni/collisionavoidance/dataProcessing/service/DataProcessor.java @@ -4,9 +4,11 @@ import com.dongni.collisionavoidance.common.model.MovingObject; import com.dongni.collisionavoidance.common.model.MovingObjectType; import com.dongni.collisionavoidance.common.model.repository.MovingObjectRepository; import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; @@ -14,6 +16,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @@ -28,12 +32,21 @@ public class DataProcessor { private SpeedCalculationService speedCalculationService; @Resource private Executor processingExecutor; - - + + @Value("${data.processor.enabled:true}") + private boolean processorEnabled; + + private final AtomicBoolean running = new AtomicBoolean(false); @PostConstruct public void init() { + if (!processorEnabled) { + log.info("数据处理器已禁用,不会启动处理线程"); + return; + } + log.info("启动数据处理线程..."); + running.set(true); processingExecutor.execute(() -> { try { processLoop(); @@ -42,51 +55,68 @@ public class DataProcessor { } }); } + + @PreDestroy + public void shutdown() { + log.info("正在关闭数据处理器..."); + // 标记处理循环应该停止 + running.set(false); + log.info("数据处理器已关闭"); + } - - // 修改takeUpdate调用逻辑 private void processLoop() { log.debug("进入数据处理循环"); try { - while (!Thread.currentThread().isInterrupted()) { - // 添加超时机制和空值判断 - Map> delta = movingObjectRepository.takeUpdate(); - if (delta == null || delta.isEmpty()) { - log.debug("未获取到数据更新,等待500ms"); - Thread.sleep(500); - continue; - } - - delta.forEach((objectType, ids) -> { - // 添加集合空值检查 - if (ids == null || ids.isEmpty()) { - log.warn("接收到空ID集合,类型:{}", objectType); - return; + while (running.get() && !Thread.currentThread().isInterrupted()) { + try { + // 添加超时机制和空值判断 + Map> delta = movingObjectRepository.pollUpdate(500, TimeUnit.MILLISECONDS); + if (delta == null || delta.isEmpty()) { + log.debug("未获取到数据更新,等待下一轮"); + continue; } - // 获取该类型全量数据快照 - Map snapshot = movingObjectRepository.getTypeMapDirect(objectType); - // 转换为待处理的数据列表 - List dataList = new CopyOnWriteArrayList<>(snapshot.values()); - - log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size()); - - try { - switch (objectType) { - case AIRCRAFT -> processAircraftData(dataList); - case SPECIAL_VEHICLE -> processVehicleData(dataList); - case UNMANNED_VEHICLE -> processLocationData(dataList); - default -> log.warn("未支持的数据类型: {}", objectType); + delta.forEach((objectType, ids) -> { + // 添加集合空值检查 + if (ids == null || ids.isEmpty()) { + log.warn("接收到空ID集合,类型:{}", objectType); + return; } - } catch (Exception e) { - log.error("数据处理异常 [类型: {}]", objectType, e); + // 获取该类型全量数据快照 + Map snapshot = movingObjectRepository.getTypeMapDirect(objectType); + + // 转换为待处理的数据列表 + List dataList = new CopyOnWriteArrayList<>(snapshot.values()); + + log.debug("正在处理 {} 类型的 {} 条数据", objectType, dataList.size()); + + try { + switch (objectType) { + case AIRCRAFT -> processAircraftData(dataList); + case SPECIAL_VEHICLE -> processVehicleData(dataList); + case UNMANNED_VEHICLE -> processLocationData(dataList); + default -> log.warn("未支持的数据类型: {}", objectType); + } + } catch (Exception e) { + log.error("数据处理异常 [类型: {}]", objectType, e); + } + }); + log.debug("本次处理完成,共处理{}种类型更新", delta.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("数据处理线程被中断"); + break; + } catch (Exception e) { + log.error("数据处理过程发生错误", e); + // 避免因错误导致CPU使用率飙升 + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; } - }); - log.debug("本次处理完成,共处理{}种类型更新", delta.size()); + } } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("数据处理线程被中断"); } catch (Exception e) { log.error("数据处理循环发生未预期错误", e); } diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index 4358de6..cffe937 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -36,13 +36,18 @@ test-mode: true data: collector: disabled: true + interval: 60000 # 设置较长的间隔以防万一 airport-api: base-url: http://localhost:8090 auth: username: test password: test - data-refresh-interval-ms: 0 - + endpoints: + vehicle: /mock/vehicles + aircraft: /mock/aircrafts + processor: + enabled: false # 禁用数据处理器 + # 日志配置 logging: level: