QDAirPortBackend0122/tools/mock_airport.py
2026-01-22 13:19:47 +08:00

2539 lines
96 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request
import time
import math
import logging
import os
import threading
import atexit
import random
from typing import Any, Literal, final, TypedDict
# 导入统一的日志配置
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from logging_config import setup_logger
# 设置当前模块的日志记录器
logger = setup_logger(
name='mock_airport',
log_file='../logs/mock_airport.log',
max_bytes=10*1024*1024, # 10MB
backup_count=5
)
app = Flask(__name__)
# 为位置与移动对象定义类型,避免使用 Any
class Point(TypedDict):
latitude: float
longitude: float
# 必填字段
class MovingObjectRequired(TypedDict):
latitude: float
longitude: float
time: int
speed: float
moving_to_end: bool
start_point: Point
end_point: Point
# 可选字段total=False
class MovingObject(MovingObjectRequired, total=False):
flightNo: str
vehicleNo: str
wait_until: float
direction: float
# 地球半径(米)
EARTH_RADIUS = 6378137.0
COMMAND_PRIORITIES = {
"ALERT": 5,
"WARNING": 3,
"PARKING": 2,
"RESUME": 1
}
def meters_to_degrees(meters: float, latitude: float) -> tuple[float, float]:
"""
将米转换为度,考虑纬度的影响
"""
# 纬度方向1度 = 111,319.9米
meters_per_deg_lat: float = 111319.9
# 经度方向1度 = 111,319.9 * cos(latitude)米
meters_per_deg_lon: float = meters_per_deg_lat * math.cos(math.radians(latitude))
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
# 距离配置(米)
DIST_300M = 300
DIST_200M = 200
DIST_150M = 150
DIST_100M = 100
DIST_50M = 50
# 时间配置(秒)
WAIT_TIME_AFTER_RETURN = 0.1 # 返回起点后的等待时间进一步优化减少到0.1秒)
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
TRAFFIC_LIGHT_SWITCH_INTERVAL = 10.0 # 红绿灯切换间隔
# 根据 route.md 定义的坐标点
# 飞机 CA1234 路径
CA_START = {"longitude": 120.086263, "latitude": 36.370484} # 飞机起点
CA_END = {"longitude": 120.080996, "latitude": 36.369105} # 飞机终点
# 飞机 MU5123 路径
MU_START = {"longitude": 120.088076, "latitude": 36.374179} # 飞机起点
MU_END = {"longitude": 120.077971, "latitude": 36.371503} # 飞机终点
# 特勤车 鲁B123 路径
SPECIAL_VEHICLE_START = {"longitude": 120.080801, "latitude": 36.366626} # 特勤车起点
SPECIAL_VEHICLE_END = {"longitude": 120.083899, "latitude": 36.367403} # 特勤车终点
# 普通车 鲁B234 路径
NORMAL_VEHICLE_START = {"longitude": 120.087259, "latitude": 36.368299} # 普通车起点
NORMAL_VEHICLE_END = {"longitude": 120.083899, "latitude": 36.367403} # 普通车终点
# 飞机和车辆尺寸(半径 米)
AIRCRAFT_SIZE_M = 30.0
VEHICLE_SIZE_M = 10.0
# 航班数据配置 - 用于进出港查询接口(简化版)
# 移除时间周期控制,仅保留基础信息供航班通知使用
flight_data = {
"CA1234": { # 出港航班
"flightNo": "CA1234",
"type": "OUT",
"runway": "17",
"contactCross": "A2",
"seat": "201"
},
"MU5123": { # 进港航班 - 使用真实路由
"flightNo": "MU5123",
"type": "IN",
"runway": "35",
"contactCross": "F1", # F1滑行道
"seat": "138" # 138机位
}
}
# 航空器路由参数配置
aircraft_route_params = {
"CA1234": {
"arrival": {
"inRunway": "17",
"outRunway": "35",
"contactCross": "A2",
"seat": "201"
},
"departure": {
"inRunway": "17",
"outRunway": "35",
"startSeat": "201"
}
},
"MU5123": {
"arrival": {
"inRunway": "35",
"outRunway": "34",
"contactCross": "F1",
"seat": "138"
},
"departure": {
"inRunway": "35",
"outRunway": "34",
"startSeat": "138"
}
}
}
@app.route('/aircraftRouteParamsController/getRouteParams', methods=['GET', 'OPTIONS'])
def get_aircraft_route_params():
"""获取航空器路由查询参数"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 获取航班号和路由类型参数
flight_no = request.args.get('flightNo')
route_type = request.args.get('routeType', '').upper() # IN 或 OUT
if not flight_no:
return jsonify({
"status": 400,
"msg": "缺少flightNo参数",
"data": None
}), 400
if route_type not in ['IN', 'OUT']:
return jsonify({
"status": 400,
"msg": "routeType参数必须是IN或OUT",
"data": None
}), 400
# 查找航班路由参数
aircraft_params = aircraft_route_params.get(flight_no)
if not aircraft_params:
return jsonify({
"status": 404,
"msg": f"未找到航班 {flight_no} 的路由参数",
"data": None
}), 404
# 根据路由类型返回对应参数
if route_type == 'IN':
route_params = aircraft_params.get('arrival')
if not route_params:
return jsonify({
"status": 404,
"msg": f"未找到航班 {flight_no} 的进港路由参数",
"data": None
}), 404
logger.info(f"进港路由参数查询: flightNo={flight_no}, params={route_params}")
return jsonify({
"status": 200,
"msg": "进港路由参数查询成功",
"data": {
"flightNo": flight_no,
"routeType": "IN",
"inRunway": route_params["inRunway"],
"outRunway": route_params["outRunway"],
"contactCross": route_params["contactCross"],
"seat": route_params["seat"],
"timestamp": int(time.time() * 1000)
}
})
else: # OUT
route_params = aircraft_params.get('departure')
if not route_params:
return jsonify({
"status": 404,
"msg": f"未找到航班 {flight_no} 的出港路由参数",
"data": None
}), 404
logger.info(f"出港路由参数查询: flightNo={flight_no}, params={route_params}")
return jsonify({
"status": 200,
"msg": "出港路由参数查询成功",
"data": {
"flightNo": flight_no,
"routeType": "OUT",
"inRunway": route_params["inRunway"],
"outRunway": route_params["outRunway"],
"startSeat": route_params["startSeat"],
"timestamp": int(time.time() * 1000)
}
})
# 航空器路由数据 - 每架飞机使用不同的路由
aircraft_routes = {
# CA1234的路由 - 简化的出港路由17号跑道到201机位区域
"CA1234": {
"departure": {
"type": "OUT",
"status": "COMPLETE",
"codes": "201,A2,17",
"geometry": None,
"coordinateSystem": "WGS84",
"geoPath": {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[120.08508640012495, 36.36182498963186],
[120.08090199425916, 36.362577136200784],
[120.08077110864791, 36.36212595046586],
[120.08004241896676, 36.36188154498729],
[120.07859116299593, 36.365372400901556],
[120.07757535108954, 36.36533179518197],
[120.07649732717964, 36.36814042245338],
[120.07441715579117, 36.36757367925402]
]
},
"properties": {
"code": "A2"
}
}
]
}
}
},
# MU5123的路由 - 真实完整进港路由35号跑道经F1滑行道到138机位
"MU5123": {
"arrival": {
"type": "IN",
"status": "COMPLETE",
"codes": "F1,L4,138",
"geometry": None,
"coordinateSystem": "CGCS2000",
"geoPath": {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050742275893088E7, 4026164.644604296],
[4.050742342874898E7, 4026162.545793306]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050743615407222E7, 4026122.672208275],
[4.050743684026714E7, 4026120.146600441],
[4.050743730372977E7, 4026117.570797326],
[4.050743754093282E7, 4026114.964402468],
[4.050743757419489E7, 4026113.602043673],
[4.050743755007106E7, 4026112.347252104],
[4.050743733107493E7, 4026109.739264329],
[4.050743688561112E7, 4026107.160287504]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050717462298063E7, 4026091.904402129],
[4.050716820216861E7, 4026089.855066455]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050722536188381E7, 4026108.097315812],
[4.050720821283463E7, 4026102.624334418]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050727144214725E7, 4026112.527790001],
[4.050726278505515E7, 4026114.415332655]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050731882638656E7, 4026102.196402456],
[4.050727312768086E7, 4026112.160285922]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050738651815705E7, 4026087.437277401],
[4.050734647450486E7, 4026096.168165339]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050714461981621E7, 4026082.328947974],
[4.05071119278174E7, 4026071.895744022]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050734647450486E7, 4026096.168165339],
[4.050733913391775E7, 4026097.768664928]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050689454491971E7, 4026002.519737061],
[4.050693265139649E7, 4026014.681113256],
[4.050697075787329E7, 4026026.842489458]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050741162298967E7, 4026083.825606086],
[4.050741416963529E7, 4026084.285112275],
[4.050741669524226E7, 4026084.971307588],
[4.050741915143272E7, 4026085.875012957],
[4.050742151951354E7, 4026086.989350639],
[4.050742378146222E7, 4026088.305839852],
[4.050742592006397E7, 4026089.814461317],
[4.050742791904272E7, 4026091.503733515],
[4.050742976318505E7, 4026093.360800063],
[4.050743143845592E7, 4026095.371527565],
[4.050743293210549E7, 4026097.52061317],
[4.050743423276621E7, 4026099.791701039],
[4.050743533053925E7, 4026102.167506821],
[4.05074362170699E7, 4026104.629949201],
[4.050743683431807E7, 4026106.966150228],
[4.050743688561112E7, 4026107.160287504]
]
},
"properties": {
"code": ""
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050697552118288E7, 4026028.362661481],
[4.050697075787329E7, 4026026.842489458]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050704221346159E7, 4026049.646966941],
[4.050703137036284E7, 4026046.18647901]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050708746004742E7, 4026064.087051627],
[4.050704840096232E7, 4026051.621473066]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05071119278174E7, 4026071.895744022],
[4.050710556055213E7, 4026069.863682419]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050741939071107E7, 4026175.198599438],
[4.05074216811156E7, 4026168.021835575],
[4.050742275893088E7, 4026164.644604296]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050753774654577E7, 4026246.448261945],
[4.050749515081406E7, 4026236.848849251],
[4.050744870329395E7, 4026226.381394062]
]
},
"properties": {
"code": "138"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050753774654577E7, 4026246.448261945],
[4.0507613391983E7, 4026263.495786141],
[4.05076192451935E7, 4026264.814870958],
[4.050762119626365E7, 4026265.254565894]
]
},
"properties": {
"code": "138"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050742342874898E7, 4026162.545793306],
[4.050743615407222E7, 4026122.672208275]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050716820216861E7, 4026089.855066455],
[4.050714461981621E7, 4026082.328947974]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050720821283463E7, 4026102.624334418],
[4.050717462298063E7, 4026091.904402129]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050726278505515E7, 4026114.415332655],
[4.050725934642086E7, 4026115.009285077],
[4.050725586910526E7, 4026115.301280484],
[4.050725237957282E7, 4026115.289096617],
[4.050724890438099E7, 4026114.9728262],
[4.050724546997807E7, 4026114.354876244],
[4.050724210250195E7, 4026113.43994972],
[4.050722536188381E7, 4026108.097315812]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050727312768086E7, 4026112.160285922],
[4.050727144214725E7, 4026112.527790001]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050733913391775E7, 4026097.768664928],
[4.050731882638656E7, 4026102.196402456]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05074011833275E7, 4026084.239767811],
[4.050738651815705E7, 4026087.437277401]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05074011833275E7, 4026084.239767811],
[4.050740376230329E7, 4026083.794303922],
[4.050740637029002E7, 4026083.5753078],
[4.050740898743934E7, 4026083.584446135],
[4.050741162298967E7, 4026083.825606086]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050744870329395E7, 4026226.381394062],
[4.050744533581797E7, 4026225.466467002],
[4.050744206089737E7, 4026224.261526533],
[4.050743890345625E7, 4026222.775742978],
[4.050743588752466E7, 4026221.020424048],
[4.050743303605565E7, 4026219.00892878],
[4.050743037075064E7, 4026216.756565868],
[4.050742791189419E7, 4026214.280477153],
[4.050742567819968E7, 4026211.599507165],
[4.050742368666689E7, 4026208.734059705],
[4.050742195245258E7, 4026205.705942559],
[4.050742048875517E7, 4026202.538201526],
[4.050741930671428E7, 4026199.254945029],
[4.050741841532595E7, 4026195.88116063],
[4.050741782137419E7, 4026192.442524868],
[4.050741752937933E7, 4026188.965207836],
[4.050741754156362E7, 4026185.475674017],
[4.050741785783435E7, 4026182.000480871],
[4.050741847578449E7, 4026178.566076715],
[4.050741939071107E7, 4026175.198599438]
]
},
"properties": {
"code": ""
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050684675534101E7, 4025987.268076611],
[4.050685643844293E7, 4025990.358360866],
[4.050689454491971E7, 4026002.519737061]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050703137036284E7, 4026046.18647901],
[4.05070295640735E7, 4026045.610016264],
[4.050701981996215E7, 4026042.500261312],
[4.050697623399237E7, 4026028.590148907],
[4.050697552118288E7, 4026028.362661481]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050704840096232E7, 4026051.621473066],
[4.050704221346159E7, 4026049.646966941]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050710556055213E7, 4026069.863682419],
[4.050708746004742E7, 4026064.087051627]
]
},
"properties": {
"code": "F1"
}
}
]
}
}
}
}
# 飞机数据将在路径跟随器创建后初始化
aircraft_data = []
# 车辆数据,根据 route.md 配置
DEFAULT_VEHICLE_SPEED = 30.0 # km/h默认速度
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
vehicle_data = [
{
"vehicleNo": "鲁B123", # 特勤车根据route.md更新
"longitude": SPECIAL_VEHICLE_START["longitude"],
"latitude": SPECIAL_VEHICLE_START["latitude"],
"time": int(time.time() * 1000),
"direction": 0.0, # 方向角
"speed": 30.0, # 根据route.md设置为30km/h
"start_point": SPECIAL_VEHICLE_START, # 起点
"end_point": SPECIAL_VEHICLE_END, # 终点
"moving_to_end": True # 当前是否向终点移动
},
{
"vehicleNo": "鲁B234", # 普通车根据route.md更新
"longitude": NORMAL_VEHICLE_START["longitude"],
"latitude": NORMAL_VEHICLE_START["latitude"],
"time": int(time.time() * 1000),
"direction": 180.0, # 根据route.md设置为180度
"speed": 30.0, # 根据route.md设置为30km/h
"start_point": NORMAL_VEHICLE_START, # 起点
"end_point": NORMAL_VEHICLE_END, # 终点
"moving_to_end": True # 当前是否向终点移动
}
]
# 车辆分类
airport_vehicle_data = [v for v in vehicle_data if v["vehicleNo"] in ["鲁B123", "鲁B234"]] # 特勤车和普通车
# 添加车辆状态类
@final
class VehicleState:
vehicle_no: str
is_running: bool
current_command: str | None
command_priority: int
target_speed: float
brake_mode: Literal['emergency', 'normal'] | None
command_reason: str | None
last_command_time: float
traffic_light_state: str | None
target_lat: float | None
target_lon: float | None
is_traffic_light_stop: bool
def __init__(self, vehicle_no: str) -> None:
self.vehicle_no = vehicle_no
self.is_running = True # 运行状态
self.current_command = None # 当前执行的指令
self.command_priority = 0 # 当前指令优先级
self.target_speed = float(DEFAULT_VEHICLE_SPEED) # 目标速度
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
self.command_reason = None # 指令原因
self.last_command_time = float(time.time()) # 最后一次指令时间
self.traffic_light_state = None # 当前红绿灯状态
self.target_lat = None # 目标纬度
self.target_lon = None # 目标经度
self.is_traffic_light_stop = False # 是否因红灯停车
self.remoteControlActive = False # 远程控制是否激活
def can_be_overridden_by(self, command_type: str) -> bool:
"""判断当前指令是否可以被新指令覆盖"""
# 红绿灯信号不参与指令优先级判断
if command_type in ["RED", "GREEN", "YELLOW"]:
return True
# 类型安全获取优先级,避免 Unknown | None 作为 dict key
new_priority = COMMAND_PRIORITIES.get(str(command_type), 0)
current_key = self.current_command if isinstance(self.current_command, str) else None
current_priority = COMMAND_PRIORITIES.get(current_key or "", 0)
# 相同类型的指令可以覆盖(因为需要持续发送)
if isinstance(self.current_command, str) and command_type == self.current_command:
return True
# ALERT 指令可以被 RESUME 解除
if self.current_command == "ALERT":
return command_type == "RESUME"
# WARNING 指令可以被 RESUME 或相同及更高优先级指令覆盖
if self.current_command == "WARNING":
return command_type == "RESUME" or new_priority >= current_priority
# 其他情况下,相同或更高优先级可以覆盖
return new_priority >= current_priority
def update_command(self, command_type: str, target_lat: float | None = None, target_lon: float | None = None) -> None:
"""更新指令状态"""
# 更新目标位置
if target_lat is not None and target_lon is not None:
self.target_lat = target_lat
self.target_lon = target_lon
# 如果是红绿灯状态,只更新状态,不影响其他指令
if command_type in ["RED", "GREEN", "YELLOW"]:
old_state = self.traffic_light_state
self.traffic_light_state = command_type
# 更新红灯停车状态
if command_type == "RED":
self.is_traffic_light_stop = True
elif command_type in ["GREEN", "YELLOW"]:
self.is_traffic_light_stop = False
print(f"车辆 {self.vehicle_no} 收到红绿灯信号: {command_type} (原状态: {old_state})")
else:
# 其他指令正常更新
self.current_command = command_type
self.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
# RESUME 指令不清除红绿灯状态,红绿灯状态由信号灯独立控制
if command_type == "RESUME":
# 清除其他状态
self.brake_mode = None
self.command_reason = None
# 更新运行状态
self.is_running = self.can_move()
print(f"更新车辆 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}, is_running={self.is_running}")
def can_move(self) -> bool:
"""检查车辆是否可以移动"""
# 如果有告警指令,不能移动
if self.current_command == "ALERT":
return False
# 如果有预警指令,不能移动
if self.current_command == "WARNING":
return False
# 如果有安全停靠指令,不能移动
if self.current_command == "PARKING":
return False
# 如果是红灯停车状态,不能移动
if self.is_traffic_light_stop:
return False
# 其他情况可以移动
return True
def log_state(self) -> None:
"""记录当前车辆状态"""
print(f"""
车辆状态:
- 车辆编号: {self.vehicle_no}
- 运行状态: {'运行中' if self.is_running else '已停止'}
- 当前指令: {self.current_command}
- 红绿灯状态: {self.traffic_light_state}
- 红灯停车: {'' if self.is_traffic_light_stop else ''}
- 指令优先级: {self.command_priority}
- 目标速度: {self.target_speed}
- 制动模式: {self.brake_mode}
- 指令原因: {self.command_reason}
- 目标位置: ({self.target_lat}, {self.target_lon})
""")
# 添加车辆状态管理
vehicle_states = {}
for vehicle in airport_vehicle_data:
vehicle_states[str(vehicle["vehicleNo"])] = VehicleState(str(vehicle["vehicleNo"]))
# 坐标转换系统 - 使用pyproj进行精确转换
class AirportCoordinateSystem:
"""机场坐标转换系统使用pyproj进行精确的CGCS2000横轴墨卡托投影转换"""
def __init__(self):
# 导入pyproj库
try:
from pyproj import CRS, Transformer
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='pyproj')
# 定义CGCS2000坐标系横轴墨卡托投影120°E中央经线
cgcs2000_proj4 = '+proj=tmerc +ellps=GRS80 +lon_0=120 +x_0=40500000 +y_0=0 +k_0=1.0 +units=m +no_defs'
self.cgcs2000_crs = CRS.from_proj4(cgcs2000_proj4)
self.wgs84_crs = CRS.from_epsg(4326) # WGS84
# 创建转换器
self.transformer_to_wgs84 = Transformer.from_crs(self.cgcs2000_crs, self.wgs84_crs, always_xy=True)
self.transformer_to_cgcs2000 = Transformer.from_crs(self.wgs84_crs, self.cgcs2000_crs, always_xy=True)
# 机场中心坐标基于CGCS2000投影坐标
self.utm_origin_x = 40507423 # 机场中心的CGCS2000 X坐标
self.utm_origin_y = 4026164 # 机场中心的CGCS2000 Y坐标
# 计算机场中心的WGS84坐标
center_lon, center_lat = self.transformer_to_wgs84.transform(self.utm_origin_x, self.utm_origin_y)
self.center_lon = center_lon
self.center_lat = center_lat
print(f"机场中心CGCS2000坐标: X={self.utm_origin_x}, Y={self.utm_origin_y}")
print(f"机场中心WGS84坐标: lon={self.center_lon:.10f}, lat={self.center_lat:.10f}")
except ImportError:
print("警告: pyproj库未安装将回退到简化转换算法")
self.use_pyproj = False
# 回退到原来的参数
self.center_lon = 120.0834104
self.center_lat = 36.35406879
self.utm_origin_x = 40507423
self.utm_origin_y = 4026164
self.meters_per_degree_lon = 89932
self.meters_per_degree_lat = 111320
except Exception as e:
print(f"pyproj初始化失败: {str(e)},将回退到简化转换算法")
self.use_pyproj = False
else:
self.use_pyproj = True
def cgcs2000_to_wgs84(self, x: float, y: float) -> tuple[float, float]:
"""
将CGCS2000投影坐标转换为WGS84地理坐标
使用pyproj进行精确转换
Args:
x: CGCS2000 X坐标东向
y: CGCS2000 Y坐标北向
Returns:
tuple: (纬度, 经度) WGS84坐标
"""
try:
if self.use_pyproj:
# 使用pyproj进行精确转换
longitude, latitude = self.transformer_to_wgs84.transform(float(x), float(y))
return latitude, longitude
else:
# 回退到简化算法
return self._fallback_cgcs2000_to_wgs84(x, y)
except Exception as e:
print(f"坐标转换错误: {str(e)}, x={x}, y={y}")
return self.center_lat, self.center_lon
def wgs84_to_cgcs2000(self, latitude: float, longitude: float) -> tuple[float, float]:
"""
将WGS84地理坐标转换为CGCS2000投影坐标
使用pyproj进行精确转换
Args:
latitude: WGS84纬度
longitude: WGS84经度
Returns:
tuple: (CGCS2000 X坐标, CGCS2000 Y坐标)
"""
try:
if self.use_pyproj:
# 使用pyproj进行精确转换
x, y = self.transformer_to_cgcs2000.transform(float(longitude), float(latitude))
return x, y
else:
# 回退到简化算法
return self._fallback_wgs84_to_cgcs2000(latitude, longitude)
except Exception as e:
print(f"WGS84到CGCS2000坐标转换错误: {str(e)}, lat={latitude}, lon={longitude}")
return self.utm_origin_x, self.utm_origin_y
def _fallback_cgcs2000_to_wgs84(self, x: float, y: float) -> tuple[float, float]:
"""回退的简化转换算法"""
offset_x = float(x) - self.utm_origin_x
offset_y = float(y) - self.utm_origin_y
lon_offset = offset_x / self.meters_per_degree_lon
lat_offset = offset_y / self.meters_per_degree_lat
longitude = self.center_lon + lon_offset
latitude = self.center_lat + lat_offset
return latitude, longitude
def _fallback_wgs84_to_cgcs2000(self, latitude: float, longitude: float) -> tuple[float, float]:
"""回退的简化转换算法"""
lon_offset = float(longitude) - self.center_lon
lat_offset = float(latitude) - self.center_lat
offset_x = lon_offset * self.meters_per_degree_lon
offset_y = lat_offset * self.meters_per_degree_lat
x = self.utm_origin_x + offset_x
y = self.utm_origin_y + offset_y
return x, y
def batch_convert_coordinates(self, coordinates: list[list[float]]) -> list[tuple[float, float]]:
"""批量转换坐标列表"""
converted = []
for coord in coordinates:
if len(coord) >= 2:
lat, lon = self.cgcs2000_to_wgs84(coord[0], coord[1])
converted.append((lat, lon))
return converted
# 全局坐标转换系统实例
coordinate_system = AirportCoordinateSystem()
def convert_route_to_cgcs2000(route_data: dict) -> dict:
"""
将路由数据转换为CGCS2000坐标系格式
Args:
route_data: 原始路由数据
Returns:
dict: 转换后的路由数据统一使用CGCS2000坐标系
"""
import copy
# 深度复制路由数据,避免修改原始数据
converted_route = copy.deepcopy(route_data)
try:
# 检查当前坐标系类型
current_coordinate_system = route_data.get("coordinateSystem", "WGS84")
logger.info(f"转换路由数据坐标系: 当前={current_coordinate_system} -> 目标=CGCS2000")
# 如果已经是CGCS2000坐标系直接返回
if current_coordinate_system == "CGCS2000":
logger.info("路由数据已经是CGCS2000坐标系无需转换")
return converted_route
# 如果是WGS84坐标系需要转换为CGCS2000
if current_coordinate_system == "WGS84":
geo_path = converted_route.get("geoPath", {})
features = geo_path.get("features", [])
logger.info(f"开始转换WGS84坐标到CGCS2000共有 {len(features)} 个feature")
for feature in features:
geometry = feature.get("geometry", {})
if geometry.get("type") == "LineString":
coordinates = geometry.get("coordinates", [])
# 转换坐标点
converted_coordinates = []
for coord in coordinates:
if len(coord) >= 2:
# WGS84坐标格式[longitude, latitude]
longitude, latitude = float(coord[0]), float(coord[1])
# 转换为CGCS2000坐标X, Y
x, y = coordinate_system.wgs84_to_cgcs2000(latitude, longitude)
converted_coordinates.append([x, y])
logger.debug(f"WGS84坐标转换: lon={longitude}, lat={latitude} -> CGCS2000: x={x}, y={y}")
# 更新坐标数据
geometry["coordinates"] = converted_coordinates
# 更新坐标系标识
converted_route["coordinateSystem"] = "CGCS2000"
logger.info(f"WGS84到CGCS2000坐标转换完成共转换 {len(features)} 个feature")
return converted_route
except Exception as e:
logger.error(f"路由数据坐标转换失败: {str(e)}")
# 转换失败时返回原始数据但确保坐标系标识为CGCS2000
converted_route["coordinateSystem"] = "CGCS2000"
return converted_route
def merge_discontinuous_route_for_flight(route_data: dict, flight_no: str) -> dict:
"""
使用Shapely自动拼接多个路径段为连续路径并基于codes字段验证方向
"""
try:
from shapely.geometry import LineString, Point
from shapely.ops import linemerge
geo_path = route_data.get("geoPath", {})
features = geo_path.get("features", [])
if len(features) <= 1:
return route_data
# 解析codes字段获取正确的路径顺序
codes_str = route_data.get("codes", "")
if codes_str:
code_order = [code.strip() for code in codes_str.split(",")]
logger.info(f"航班 {flight_no} codes顺序: {code_order}")
else:
logger.warning(f"航班 {flight_no} 缺少codes字段无法验证方向")
code_order = []
# 收集所有LineString路径段并记录每段的code标识
lines = []
feature_codes = []
for feature in features:
geometry = feature.get("geometry", {})
if geometry.get("type") == "LineString":
coordinates = geometry.get("coordinates", [])
if len(coordinates) >= 2:
lines.append(LineString(coordinates))
code = feature.get("properties", {}).get("code", "")
feature_codes.append(code)
if not lines:
return route_data
# Shapely自动拼接
merged_geometry = linemerge(lines)
# 如果成功合并为单条路径
if isinstance(merged_geometry, LineString):
continuous_coords = list(merged_geometry.coords)
logger.info(f"✅ 航班 {flight_no} 合并为连续路径,包含 {len(continuous_coords)} 个坐标点")
# 基于codes字段验证和修正路径方向
if len(code_order) >= 2:
continuous_coords = _verify_and_correct_path_direction(
continuous_coords, features, code_order, flight_no
)
merged_route_data = route_data.copy()
merged_route_data["geoPath"]["features"] = [{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": continuous_coords
},
"properties": {"code": "MERGED_PATH"}
}]
return merged_route_data
else:
# 无法合并就报错,不能保持原样掩盖问题
error_msg = f"❌ 航班 {flight_no} 路径无法合并为连续路径,存在不连续段"
logger.error(error_msg)
raise RuntimeError(error_msg)
except ImportError:
logger.warning(f"Shapely库未安装跳过路径合并")
return route_data
except Exception as e:
logger.error(f"航班 {flight_no} 路径合并失败: {str(e)}")
return route_data
def _verify_and_correct_path_direction(continuous_coords: list, original_features: list, code_order: list, flight_no: str) -> list:
"""
简化版本检查合并后路径端点所在的路径段code值来判断方向
"""
try:
if len(code_order) < 2:
return continuous_coords
# 找到端点最接近的路径段
def find_point_code(coord):
closest_code = None
min_distance = float('inf')
for feature in original_features:
geometry = feature.get("geometry", {})
code = feature.get("properties", {}).get("code", "")
if geometry.get("type") == "LineString" and code:
coordinates = geometry.get("coordinates", [])
# 计算到这个路径段的最小距离
for seg_coord in coordinates:
# 简单的欧几里得距离
distance = ((coord[0] - seg_coord[0]) ** 2 + (coord[1] - seg_coord[1]) ** 2) ** 0.5
if distance < min_distance:
min_distance = distance
closest_code = code
return closest_code
start_code = find_point_code(continuous_coords[0])
end_code = find_point_code(continuous_coords[-1])
logger.info(f"航班 {flight_no} 端点检查: 起点在{start_code}, 终点在{end_code}, 期望顺序{code_order}")
# 检查方向是否正确 - 只使用实际存在的路径段codes进行判断
expected_start = code_order[0] # F1
# 对于MU5123进港应该从F1开始但终点138不是路径段code所以检查起点即可
logger.info(f"航班 {flight_no} 方向检查: 期望从{expected_start}开始, 实际起点{start_code}, 终点{end_code}")
# 通用路径方向检查逻辑
if start_code == expected_start:
logger.info(f"✅ 航班 {flight_no} 路径方向正确: 从{expected_start}开始")
else:
# 方向相反,需要反转
continuous_coords.reverse()
logger.info(f"✅ 航班 {flight_no} 路径方向已修正: 从{expected_start}开始 (原起点: {start_code})")
return continuous_coords
except Exception as e:
logger.error(f"航班 {flight_no} 路径方向验证失败: {str(e)}")
return continuous_coords
def validate_route_data_integrity(route_data: dict, flight_no: str) -> dict:
"""
验证路由数据的完整性和格式正确性
Args:
route_data: 包含geoPath的路由数据
flight_no: 航班号,用于日志
Returns:
dict: 原始路由数据(不做任何修改)
"""
try:
geo_path = route_data.get("geoPath", {})
features = geo_path.get("features", [])
if len(features) == 0:
logger.warning(f"⚠️ 航班 {flight_no} 路径数据为空,无路径段")
return route_data
logger.info(f"✅ 航班 {flight_no} 路径数据完整:包含 {len(features)} 个路径段")
# 验证每个路径段的数据完整性
valid_segments = 0
total_coordinates = 0
for i, feature in enumerate(features):
geometry = feature.get("geometry", {})
properties = feature.get("properties", {})
if geometry.get("type") == "LineString":
coordinates = geometry.get("coordinates", [])
code = properties.get("code", "")
if len(coordinates) >= 2:
valid_segments += 1
total_coordinates += len(coordinates)
logger.debug(f" 路径段 {i+1}: 代码={code}, 坐标点数={len(coordinates)}")
else:
logger.warning(f" 路径段 {i+1}: 代码={code}, 坐标点数不足({len(coordinates)}) ⚠️")
else:
logger.warning(f" 路径段 {i+1}: 非LineString类型 ({geometry.get('type')}) ⚠️")
logger.info(f"✅ 航班 {flight_no} 数据验证完成:{valid_segments}/{len(features)} 有效路径段,共 {total_coordinates} 个坐标点")
# 验证坐标系信息
coordinate_system = route_data.get("coordinateSystem", "未知")
logger.info(f"✅ 航班 {flight_no} 坐标系:{coordinate_system}")
return route_data
except Exception as e:
logger.error(f"航班 {flight_no} 路径数据完整性验证异常: {str(e)}")
return route_data
def parse_route_path(route_data: dict) -> list[tuple[float, float]]:
"""
解析GeoJSON路由数据转换为WGS84坐标点序列
Args:
route_data: 包含geoPath的路由数据
Returns:
list: WGS84坐标点列表 [(lat, lon), ...]
"""
path_points = []
try:
# 检查坐标系类型
coordinate_system_type = route_data.get("coordinateSystem", "CGCS2000")
logger.info(f"路由数据坐标系: {coordinate_system_type}")
geo_path = route_data.get("geoPath", {})
features = geo_path.get("features", [])
logger.info(f"解析路径,共有 {len(features)} 个feature")
for feature in features:
geometry = feature.get("geometry", {})
if geometry.get("type") == "LineString":
coordinates = geometry.get("coordinates", [])
# 根据坐标系类型处理坐标点
for coord in coordinates:
if len(coord) >= 2:
x, y = float(coord[0]), float(coord[1])
if coordinate_system_type == "WGS84":
# 如果已经是WGS84坐标直接使用 (经度, 纬度)
lat, lon = y, x
logger.debug(f"WGS84坐标直接使用: lon={lon}, lat={lat}")
else:
# 如果是CGCS2000坐标需要转换
lat, lon = coordinate_system.cgcs2000_to_wgs84(x, y)
logger.debug(f"CGCS2000坐标转换: {x},{y} -> lat={lat}, lon={lon}")
path_points.append((lat, lon))
logger.info(f"路径解析完成,坐标系={coordinate_system_type},共生成 {len(path_points)} 个坐标点")
# 暂时禁用过滤,保留所有路径点用于测试
logger.info(f"保留所有 {len(path_points)} 个路径点")
return path_points
except Exception as e:
logger.error(f"路径解析失败: {str(e)}")
return []
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""使用Haversine公式计算两点间距离"""
lat1_rad = math.radians(lat1)
lon1_rad = math.radians(lon1)
lat2_rad = math.radians(lat2)
lon2_rad = math.radians(lon2)
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(lat1_rad) * math.cos(lat2_rad) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return EARTH_RADIUS * c
# 航空器路径跟随系统
class AircraftRouteFollower:
"""航空器路径跟随管理类"""
def __init__(self, flight_no: str):
self.flight_no = flight_no
self.current_route_type = None # 'arrival' or 'departure'
self.route_points = [] # 当前路径的坐标点序列
self.current_point_index = 0 # 当前目标路径点索引
self.route_progress = 0.0 # 路径完成进度 (0-1)
self.is_at_gate = False # 是否在机位停留
self.gate_position = None # 机位坐标
self._status_start_time = time.time() # 状态开始时间,用于简化状态管理
# 新增状态管理属性
self.flight_status = 'idle' # 飞机状态:'idle', 'in_route', 'at_gate', 'waiting'
self.wait_until = 0.0 # 等待结束时间戳
self.waiting_duration = 5.0 # 路径完成后的等待时间(秒)
# 解析航空器路由路径
self._parse_routes()
def _parse_routes(self):
"""解析进出港路径"""
try:
# 根据航班号获取对应的路由数据
flight_routes = aircraft_routes.get(self.flight_no)
if not flight_routes:
logger.warning(f"未找到航班 {self.flight_no} 的路由数据使用MU5123的路由作为默认")
flight_routes = aircraft_routes.get("MU5123", {})
# 解析进港路径
arrival_route = flight_routes.get("arrival", {})
# 验证MU5123进港路径的数据完整性然后合并为连续路径
if self.flight_no == "MU5123" and arrival_route:
logger.info(f"验证 {self.flight_no} 进港路径数据完整性")
arrival_route = validate_route_data_integrity(arrival_route, self.flight_no)
# 将31个路径段合并为连续路径让飞机能够完整运行
logger.info(f"{self.flight_no} 的分散路径段合并为连续路径")
arrival_route = merge_discontinuous_route_for_flight(arrival_route, self.flight_no)
self.arrival_points = parse_route_path(arrival_route)
logger.info(f"航空器 {self.flight_no} 进港路径: {len(self.arrival_points)} 个点")
# 解析出港路径
departure_route = flight_routes.get("departure", {})
self.departure_points = parse_route_path(departure_route)
logger.info(f"航空器 {self.flight_no} 出港路径: {len(self.departure_points)} 个点")
# 设置机位坐标138号机位的大概位置
if self.arrival_points:
self.gate_position = self.arrival_points[-1] # 进港路径的终点作为机位
elif self.departure_points:
self.gate_position = self.departure_points[0] # 出港路径的起点作为机位
else:
# 默认机位坐标
self.gate_position = (36.354068, 120.083410)
except Exception as e:
logger.error(f"路径解析失败: {str(e)}")
self.arrival_points = []
self.departure_points = []
self.gate_position = (36.354068, 120.083410)
def start_route(self, route_type: str):
"""开始跟随指定类型的路径"""
if route_type == "arrival" and self.arrival_points:
self.current_route_type = "arrival"
self.route_points = self.arrival_points.copy()
self.current_point_index = 0
self.route_progress = 0.0
self.is_at_gate = False
self.flight_status = 'in_route' # 设置为路径运行状态
self.wait_until = 0.0 # 清除等待时间
logger.info(f"航空器 {self.flight_no} 开始进港路径跟随")
elif route_type == "departure" and self.departure_points:
self.current_route_type = "departure"
self.route_points = self.departure_points.copy()
self.current_point_index = 0
self.route_progress = 0.0
self.is_at_gate = False
self.flight_status = 'in_route' # 设置为路径运行状态
self.wait_until = 0.0 # 清除等待时间
logger.info(f"航空器 {self.flight_no} 开始出港路径跟随")
else:
logger.warning(f"航空器 {self.flight_no} 无效的路径类型: {route_type}")
def update_position_on_route(self, aircraft: dict[str, Any], speed_kmh: float, elapsed_time: float) -> bool:
"""
沿路径更新航空器位置
Args:
aircraft: 航空器数据字典
speed_kmh: 速度 (km/h)
elapsed_time: 经过时间 (秒)
Returns:
bool: 是否到达路径终点
"""
current_time = time.time()
# 检查是否在等待期
if current_time < self.wait_until:
self.flight_status = 'waiting'
return False # 还在等待中,不更新位置
if not self.route_points or self.current_point_index >= len(self.route_points):
return True # 路径已完成
# 设置状态为路径运行中
self.flight_status = 'in_route'
# 获取当前位置
current_lat = to_float_safe(aircraft.get("latitude", 0))
current_lon = to_float_safe(aircraft.get("longitude", 0))
if current_lat is None or current_lon is None:
return False
# 获取目标路径点
target_lat, target_lon = self.route_points[self.current_point_index]
# 计算到目标点的距离
distance_to_target = haversine_distance(current_lat, current_lon, target_lat, target_lon)
# 计算移动距离
speed_mps = speed_kmh * 1000.0 / 3600.0 # 转换为米/秒
move_distance = speed_mps * elapsed_time
# 优化路径跟随:如果移动距离大于到目标点的距离,直接跳到下一个合适的点
remaining_distance = move_distance
while remaining_distance > 0 and self.current_point_index < len(self.route_points):
# 如果已经接近目标点或剩余移动距离大于目标距离,移动到下一个路径点
if distance_to_target <= remaining_distance:
# 移动到当前目标点
remaining_distance -= distance_to_target
self.current_point_index += 1
self.route_progress = min(1.0, self.current_point_index / max(1, len(self.route_points) - 1))
if self.current_point_index >= len(self.route_points):
# 路径完成,设置到真正的路径终点
final_lat, final_lon = self.route_points[-1] # 路径的最后一个点
aircraft["latitude"] = final_lat
aircraft["longitude"] = final_lon
self.is_at_gate = (self.current_route_type == "arrival")
self.flight_status = 'at_gate' if self.is_at_gate else 'completed'
# 设置5秒等待时间
self.wait_until = current_time + self.waiting_duration
logger.info(f"航空器 {self.flight_no} 完成 {self.current_route_type} 路径,等待{self.waiting_duration}秒后重新开始")
return True
# 更新到下一个目标点
current_lat, current_lon = target_lat, target_lon # 当前位置更新为刚到达的点
if self.current_point_index < len(self.route_points):
target_lat, target_lon = self.route_points[self.current_point_index]
distance_to_target = haversine_distance(current_lat, current_lon, target_lat, target_lon)
else:
# 已经超出路径范围,跳出循环
break
else:
# 不能完全到达目标点,按比例移动
break
# 计算最终位置
if remaining_distance > 0 and distance_to_target > 0:
# 按比例移动到目标点
move_ratio = remaining_distance / distance_to_target
# 线性插值计算新位置
new_lat = current_lat + (target_lat - current_lat) * move_ratio
new_lon = current_lon + (target_lon - current_lon) * move_ratio
# 更新航空器位置
aircraft["latitude"] = new_lat
aircraft["longitude"] = new_lon
else:
# 已到达当前目标点,更新位置
aircraft["latitude"] = current_lat
aircraft["longitude"] = current_lon
return False
def set_at_gate(self, aircraft: dict[str, Any]):
"""设置航空器在机位停留"""
if self.gate_position:
aircraft["latitude"] = self.gate_position[0]
aircraft["longitude"] = self.gate_position[1]
self.is_at_gate = True
# 多飞机路径跟随管理器
class AircraftRouteManager:
"""多架飞机路径跟随管理器"""
def __init__(self):
self.followers = {} # {flight_no: AircraftRouteFollower}
self.flight_configs = {} # 飞机配置信息
def add_aircraft(self, flight_no: str, config: dict):
"""添加飞机到路径跟随系统
Args:
flight_no: 航班号
config: 飞机配置 {
'primary_behavior': 'arrival'|'departure'|'cycle',
'speed': 30.0,
'cycle_timing': {...} # 仅用于cycle模式
}
"""
self.flight_configs[flight_no] = config
self.followers[flight_no] = AircraftRouteFollower(flight_no)
logger.info(f"已添加飞机 {flight_no} 到路径跟随系统,行为模式: {config.get('primary_behavior', 'cycle')}")
def update_aircraft_position(self, aircraft: dict[str, Any], elapsed_time: float):
"""统一的航空器位置更新入口"""
flight_no = aircraft.get("flightNo")
if flight_no not in self.followers:
logger.warning(f"飞机 {flight_no} 未注册到路径跟随系统")
return
follower = self.followers[flight_no]
config = self.flight_configs[flight_no]
behavior = config.get("primary_behavior", "cycle")
speed = config.get("speed", 30.0)
if behavior == "arrival":
# 纯进港模式
self._update_arrival_only_aircraft(aircraft, follower, speed, elapsed_time)
elif behavior == "departure":
# 纯出港模式
self._update_departure_only_aircraft(aircraft, follower, speed, elapsed_time)
# 更新时间戳
aircraft["time"] = int(time.time() * 1000)
def _update_arrival_only_aircraft(self, aircraft: dict[str, Any], follower: AircraftRouteFollower,
speed: float, elapsed_time: float):
"""更新纯进港模式飞机位置"""
current_time = time.time()
# 检查是否需要开始新路径(等待期结束后或初始状态)
if follower.flight_status in ['idle', 'completed', 'at_gate', 'waiting'] and current_time >= follower.wait_until:
follower.start_route("arrival")
# 设置到进港路径起点
if follower.route_points:
aircraft["latitude"] = follower.route_points[0][0]
aircraft["longitude"] = follower.route_points[0][1]
logger.info(f"航空器 {aircraft.get('flightNo')} 等待结束,重新开始进港路径")
route_completed = follower.update_position_on_route(aircraft, speed, elapsed_time)
if route_completed and follower.flight_status != 'waiting':
# 路径完成已设置5秒等待时间无需立即重新开始
logger.info(f"航空器 {aircraft.get('flightNo')} 进港完成,将等待{follower.waiting_duration}秒后重新开始")
def _update_departure_only_aircraft(self, aircraft: dict[str, Any], follower: AircraftRouteFollower,
speed: float, elapsed_time: float):
"""更新纯出港模式飞机位置"""
current_time = time.time()
# 检查是否需要开始新路径(等待期结束后或初始状态)
if follower.flight_status in ['idle', 'completed', 'at_gate', 'waiting'] and current_time >= follower.wait_until:
follower.start_route("departure")
# 设置到出港路径起点
if follower.route_points:
aircraft["latitude"] = follower.route_points[0][0]
aircraft["longitude"] = follower.route_points[0][1]
logger.info(f"航空器 {aircraft.get('flightNo')} 等待结束,重新开始出港路径")
route_completed = follower.update_position_on_route(aircraft, speed, elapsed_time)
if route_completed and follower.flight_status != 'waiting':
# 路径完成已设置5秒等待时间无需立即重新开始
logger.info(f"航空器 {aircraft.get('flightNo')} 出港完成,将等待{follower.waiting_duration}秒后重新开始")
# 创建全局路径跟随管理器
route_manager = AircraftRouteManager()
# 配置并添加飞机到路径跟随系统
# CA1234纯出港模式模拟出港飞机
route_manager.add_aircraft("CA1234", {
'primary_behavior': 'departure',
'speed': 50.0
})
# MU5123纯进港模式使用真实进港路由
route_manager.add_aircraft("MU5123", {
'primary_behavior': 'arrival',
'speed': 50.0 # 设置速度为50 km/h约45秒完成一个周期
})
# 初始化飞机数据
def initialize_aircraft_data():
"""初始化航空器数据,设置合适的起始位置"""
# CA1234 - 出港模式,从机位开始(出港路径起点)
ca1234_follower = route_manager.followers["CA1234"]
ca1234_start_pos = ca1234_follower.departure_points[0] if ca1234_follower.departure_points else (36.354068, 120.083410)
# MU5123 - 进港模式,从跑道开始(进港路径起点)
mu5123_follower = route_manager.followers["MU5123"]
mu5123_start_pos = mu5123_follower.arrival_points[0] if mu5123_follower.arrival_points else (36.374179, 120.088076)
return [
{
"flightNo": "CA1234", # 出港模式
"longitude": ca1234_start_pos[1],
"latitude": ca1234_start_pos[0],
"time": int(time.time() * 1000),
"altitude": 0.0,
"trackNumber": 1001,
"speed": 50.0,
"use_route_following": True
},
{
"flightNo": "MU5123", # 进港模式
"longitude": mu5123_start_pos[1],
"latitude": mu5123_start_pos[0],
"time": int(time.time() * 1000),
"altitude": 0.0,
"trackNumber": 1002,
"speed": 50.0,
"use_route_following": True
}
]
# 初始化飞机数据
aircraft_data.extend(initialize_aircraft_data())
# 额外测试飞机数据(用于前端性能测试)
additional_aircraft_data = []
def initialize_additional_aircraft():
"""初始化 60 架测试飞机"""
global additional_aircraft_data
# 机场中心坐标(基于 CGCS2000 投影坐标)
airport_center_lat = 36.35406879
airport_center_lon = 120.0834104
# 机场覆盖范围(大约 2km x 2km 区域)
lat_range = 0.02 # 大约 2.2km
lon_range = 0.02 # 大约 1.8km
# 创建 60 架测试飞机
for i in range(1, 61):
# 随机分布在机场范围内
lat_offset = (random.random() - 0.5) * lat_range
lon_offset = (random.random() - 0.5) * lon_range
aircraft_lat = airport_center_lat + lat_offset
aircraft_lon = airport_center_lon + lon_offset
# 前 55 架静止,后 5 架移动
is_moving = i > 55
speed = random.uniform(20.0, 50.0) if is_moving else 0.0
# 为移动的飞机设置简单的往复移动路径
if is_moving:
# 在当前位置周围创建一个小范围的移动路径
move_radius = 0.001 # 大约 100 米范围
start_lat = aircraft_lat - move_radius
start_lon = aircraft_lon - move_radius
end_lat = aircraft_lat + move_radius
end_lon = aircraft_lon + move_radius
moving_to_end = True
else:
# 静止飞机使用当前位置作为起点和终点
start_lat = aircraft_lat
start_lon = aircraft_lon
end_lat = aircraft_lat
end_lon = aircraft_lon
moving_to_end = False
aircraft = {
"flightNo": f"PT{i:03d}", # 测试飞机航班号PT001-PT060
"longitude": aircraft_lon,
"latitude": aircraft_lat,
"time": int(time.time() * 1000),
"altitude": 0.0,
"trackNumber": 2000 + i, # 跟踪号2001-2060
"speed": speed,
"use_route_following": False, # 不使用复杂路径跟随
"start_point": {"latitude": start_lat, "longitude": start_lon},
"end_point": {"latitude": end_lat, "longitude": end_lon},
"moving_to_end": moving_to_end
}
additional_aircraft_data.append(aircraft)
# 统计信息
stationary_count = sum(1 for a in additional_aircraft_data if a["speed"] == 0)
moving_count = len(additional_aircraft_data) - stationary_count
print(f"✅ 已添加测试飞机:")
print(f" - 总飞机数: {len(additional_aircraft_data)}")
print(f" - 静止飞机: {stationary_count}")
print(f" - 移动飞机: {moving_count}")
print(f" - 移动速度: 20-50 km/h")
# 添加到主飞机数据列表
aircraft_data.extend(additional_aircraft_data)
# 初始化测试飞机
initialize_additional_aircraft()
from collections.abc import Mapping
def calculate_distance_to_target(vehicle: Mapping[str, float], target_lat: float, target_lon: float) -> float:
"""计算车辆到目标位的距离(米)"""
# 转换为弧度
lat1_rad = math.radians(vehicle["latitude"])
lon1_rad = math.radians(vehicle["longitude"])
lat2_rad = math.radians(target_lat)
lon2_rad = math.radians(target_lon)
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# 使用 Haversine 公式计算距离
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(lat1_rad) * math.cos(lat2_rad) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return EARTH_RADIUS * c
# 删除了复杂的红绿灯和路口计算函数,因为已简化为往复运动逻辑
def update_position_with_vector(
obj: dict[str, Any],
target_point: "Point",
start_point: "Point",
speed: float,
elapsed_time: float,
return_to_start: bool = False
) -> bool:
"""
基于向量的位置更新逻辑
Args:
obj: 需要更新位置的对象(航空器或车辆)
target_point: 目标点坐标 {"latitude": float, "longitude": float}
start_point: 起点坐标 {"latitude": float, "longitude": float}
speed: 移动速度km/h
elapsed_time: 经过的时间(秒)
return_to_start: 是否在到达目标点时返回起点
"""
# 检查是否在等待状态
if "wait_until" not in obj:
obj["wait_until"] = 0.0
current_time = time.time()
# 基于类型安全读取 wait_until避免 float 与 object 比较
_wu = to_float_safe(obj.get("wait_until"))
wait_until = 0.0 if _wu is None else float(_wu)
if current_time < wait_until:
return False # 还在等待中,不更新位置
# 计算这一步要移动的距离(米)并转换为经纬度
speed_mps = float(speed) * 1000.0 / 3600.0
distance = speed_mps * float(elapsed_time)
# 安全读取并转换当前位置与目标点/起点经纬度为 float
cur_lat = to_float_safe(obj.get("latitude"))
cur_lon = to_float_safe(obj.get("longitude"))
tgt_lat = to_float_safe(target_point.get("latitude"))
tgt_lon = to_float_safe(target_point.get("longitude"))
if cur_lat is None or cur_lon is None or tgt_lat is None or tgt_lon is None:
# 无法解析经纬度,跳过更新
return False
# 将移动距离转换为经纬度变化量
move_dlat, move_dlon = meters_to_degrees(distance, cur_lat)
# 将5米的判断距离转换为经纬度优化从15米减少到5米减少颤抖
check_dlat, check_dlon = meters_to_degrees(5.0, cur_lat)
# 计算当前位置到终点的向量(经纬度空间)
vector_lat = float(tgt_lat) - float(cur_lat)
vector_lon = float(tgt_lon) - float(cur_lon)
# 计算向量长度(经纬度空间)
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
# 检查是否到达终点
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
# 精确设置到目标点位置,避免位置偏差
obj["latitude"] = float(tgt_lat)
obj["longitude"] = float(tgt_lon)
if return_to_start:
# 返回起点并设置等待时间
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
else:
# 设置短暂等待时间,避免立即切换方向导致的颤抖
obj["wait_until"] = current_time + 0.1 # 0.1秒的短暂等待
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 到达目标点等待0.1秒后切换方向")
return True # 表示已到达终点
else:
# 归一化方向向量
unit_lat = vector_lat / vector_length
unit_lon = vector_lon / vector_length
# 计算这一步的位置变化
dlat = move_dlat * unit_lat
dlon = move_dlon * unit_lon
# 更新位置(写回为 float
obj["latitude"] = float(cur_lat + dlat)
obj["longitude"] = float(cur_lon + dlon)
return False # 表示正在移动中
def update_object_position(obj: dict[str, Any], elapsed_time: float) -> None:
"""统一的位置更新函数 - 用于飞机和车辆的往复运动"""
# 获取对象类型和标识
obj_type = "航空器" if "flightNo" in obj else "车辆"
obj_id = obj.get("flightNo", obj.get("vehicleNo"))
# 根据移动方向确定目标点
if obj["moving_to_end"]:
target_raw = obj["end_point"]
start_raw = obj["start_point"]
else:
target_raw = obj["start_point"]
start_raw = obj["end_point"]
# 构造类型安全的 Point
t_lat = to_float_safe(target_raw.get("latitude")) if isinstance(target_raw, dict) else None
t_lon = to_float_safe(target_raw.get("longitude")) if isinstance(target_raw, dict) else None
s_lat = to_float_safe(start_raw.get("latitude")) if isinstance(start_raw, dict) else None
s_lon = to_float_safe(start_raw.get("longitude")) if isinstance(start_raw, dict) else None
if t_lat is None or t_lon is None or s_lat is None or s_lon is None:
# 无法解析坐标,直接更新时间戳后返回
obj["time"] = int(time.time() * 1000)
return
target_point: Point = {"latitude": float(t_lat), "longitude": float(t_lon)}
start_point: Point = {"latitude": float(s_lat), "longitude": float(s_lon)}
# 读取速度为 float
spd = to_float_safe(obj.get("speed"))
if spd is None:
spd = 0.0
# 更新位置
reached_target = update_position_with_vector(
obj,
target_point,
start_point,
float(spd),
float(elapsed_time),
return_to_start=False
)
if reached_target:
# 切换移动方向
obj["moving_to_end"] = not obj["moving_to_end"]
target_name = "终点" if not obj["moving_to_end"] else "起点"
print(f"{obj_type} {obj_id} 到达{'终点' if obj['moving_to_end'] else '起点'},开始前往{target_name}")
# 注意等待时间已经在update_position_with_vector中设置了不需要重复设置
# 更新时间戳
obj["time"] = int(time.time() * 1000)
def update_aircraft_position(aircraft: dict[str, Any], elapsed_time: float) -> None:
"""更新航空器位置 - 统一使用路径跟随管理器"""
flight_no = aircraft.get("flightNo")
use_route_following = aircraft.get("use_route_following", False)
if use_route_following:
# 使用新的路径跟随管理器
route_manager.update_aircraft_position(aircraft, elapsed_time)
else:
# 保留旧的简单移动逻辑作为兜底
update_object_position(aircraft, elapsed_time)
def update_vehicle_position(vehicle: dict[str, Any], elapsed_time: float) -> None:
"""更新车辆位置 - 使用统一的位置更新函数"""
update_object_position(vehicle, elapsed_time)
# 简化的路口信息保留API兼容性
INTERSECTIONS = {
"TL001": {
"id": "INT001",
"name": "路口1",
"latitude": 36.369696,
"longitude": 120.083084
},
"TL002": {
"id": "INT002",
"name": "路口2",
"latitude": 36.365617,
"longitude": 120.084637
}
}
# T6路口定义
T6_INTERSECTION = {
"id": "T6",
"name": "T6路口",
"latitude": 36.372000,
"longitude": 120.085000
}
def generate_traffic_light_data() -> list[dict[str, Any]]:
"""生成红绿灯数据"""
traffic_light_data = []
# 生成两个固定路口的红绿灯数据
for tl_id, intersection in INTERSECTIONS.items():
signal = {
"id": tl_id,
"state": 1, # 0: RED, 1: GREEN测试时保持绿灯
"timestamp": int(time.time() * 1000),
"intersection": intersection["id"],
"position": {
"latitude": intersection["latitude"],
"longitude": intersection["longitude"]
}
}
traffic_light_data.append(signal)
return traffic_light_data
# 红绿灯数据
traffic_light_data = generate_traffic_light_data()
# 分别记录航空器、机场车辆的上次更新时间
last_aircraft_update_time = time.time()
last_vehicle_update_time = time.time() # 机场车辆更新时间
last_light_switch_time = time.time()
# 后台更新管理器
class BackgroundUpdateManager:
"""独立的后台更新管理器,主动更新航空器状态和位置"""
def __init__(self):
self.running = False
self.update_thread = None
self.data_lock = threading.Lock() # 保护共享数据的锁
def start(self):
"""启动后台更新线程"""
if not self.running:
self.running = True
self.update_thread = threading.Thread(target=self._update_loop, daemon=True)
self.update_thread.start()
logger.info("后台更新管理器已启动")
def stop(self):
"""停止后台更新线程"""
if self.running:
self.running = False
if self.update_thread:
self.update_thread.join(timeout=2.0)
logger.info("后台更新管理器已停止")
def _update_loop(self):
"""主更新循环"""
while self.running:
try:
current_time = time.time()
with self.data_lock:
# 更新所有航空器位置
for aircraft in aircraft_data:
update_aircraft_position(aircraft, UPDATE_INTERVAL)
aircraft["time"] = int(current_time * 1000)
# 更新所有车辆位置
for vehicle in airport_vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
# 休眠到下一个更新周期
time.sleep(UPDATE_INTERVAL)
except Exception as e:
logger.error(f"后台更新出错: {str(e)}")
time.sleep(1.0) # 出错时短暂休眠后继续
# 全局后台更新管理器实例
background_manager = BackgroundUpdateManager()
def check_auth() -> bool:
auth_header = request.headers.get('Authorization')
if not auth_header:
logger.warning("认证失败: 缺少Authorization头")
return False
# 直接比较完整的Authorization头
result = auth_header == AUTH_TOKEN
if not result:
logger.warning(f"认证失败: token不匹配")
logger.debug(f"收到的token: {auth_header}")
logger.debug(f"期望的token: {AUTH_TOKEN}")
return result
def to_float_safe(x: object) -> float | None:
"""
安全将值转换为 float:
- 允许 int/float
- 允许非布尔的数字字符串
- 允许 dict优先取 'value''longitude''latitude' 等常见键
- 无法解析则返回 None
"""
# 排除布尔值bool 是 int 的子类,但不应当被视为数值)
if isinstance(x, bool):
return None
if isinstance(x, (int, float)):
return float(x)
if isinstance(x, str):
try:
return float(x.strip())
except Exception:
return None
if isinstance(x, dict):
for key in ("value", "longitude", "latitude"):
if key in x:
return to_float_safe(x[key])
return None
return None
@app.route('/openApi/getCurrentFlightPositions', methods=['GET', 'OPTIONS'])
def get_flight_positions():
"""获取当前航空器位置信息"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 使用线程锁保护数据访问
with background_manager.data_lock:
# 创建符合 API 格式的响应数据
response_data = []
for aircraft in aircraft_data:
lon_val = to_float_safe(aircraft.get("longitude"))
lat_val = to_float_safe(aircraft.get("latitude"))
api_aircraft = {
"flightNo": aircraft.get("flightNo"),
"longitude": round(lon_val, 6) if lon_val is not None else aircraft.get("longitude"),
"latitude": round(lat_val, 6) if lat_val is not None else aircraft.get("latitude"),
"time": aircraft.get("time")
}
response_data.append(api_aircraft)
return jsonify({
"status": 200,
"msg": "当前航空器实时位置数据",
"data": response_data
})
def switch_traffic_light_state() -> None:
"""统一处理红绿灯状态切换"""
global last_light_switch_time
current_time = time.time()
# 西路口红绿灯每15秒切换一次
elapsed_since_switch = current_time - last_light_switch_time
if elapsed_since_switch >= TRAFFIC_LIGHT_SWITCH_INTERVAL: # 使用常量
# 获取当前状态
current_state = traffic_light_data[0]["state"]
# 根据当前状态决定下一个状态
if current_state == 0: # 当前是红灯
# 切换到黄灯(表示红绿灯故障)
traffic_light_data[0]["state"] = 2 # 2 表示黄灯
print(f"西路口红绿灯状态切换为: 黄灯(红绿灯故障)")
elif current_state == 2: # 当前是黄灯
# 从黄灯切换到绿灯
traffic_light_data[0]["state"] = 1 # 1 表示绿灯
print(f"西路口红绿灯状态切换为: 绿灯")
else: # 当前是绿灯
# 从绿灯切换到红灯
traffic_light_data[0]["state"] = 0 # 0 表示红灯
print(f"西路口红绿灯状态切换为: 红灯")
last_light_switch_time = current_time
# 更新东路口红绿灯(根据航空器位置)
if aircraft_data:
aircraft = aircraft_data[0]
# 安全获取并转换纬度为浮点数,避免类型错误
aircraft_lat = to_float_safe(aircraft.get("latitude"))
t6_lat = to_float_safe(T6_INTERSECTION.get("latitude"))
if aircraft_lat is not None and t6_lat is not None:
lat_diff = abs(aircraft_lat - t6_lat) * 111319.9 # 转米
else:
# 无法解析时默认认为距离较大,避免误判为红灯
lat_diff = float('inf')
old_state = traffic_light_data[1]["state"]
# 大于50米为绿灯(1),否则红灯(0)
traffic_light_data[1]["state"] = 1 if lat_diff > DIST_50M else 0
if old_state != traffic_light_data[1]["state"]:
print(f"东路口红绿灯状态切换为: {'绿灯' if traffic_light_data[1]['state'] == 1 else '红灯'}")
@app.route('/openApi/getCurrentVehiclePositions', methods=['GET', 'OPTIONS'])
def get_vehicle_positions():
"""获取当前车辆位置信息(仅机场车辆)"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
try:
# 使用线程锁保护数据访问
with background_manager.data_lock:
response_data = []
for v in airport_vehicle_data:
lon_val = to_float_safe(v.get("longitude"))
lat_val = to_float_safe(v.get("latitude"))
v_out = {
"vehicleNo": v.get("vehicleNo"),
"longitude": round(lon_val, 6) if lon_val is not None else v.get("longitude"),
"latitude": round(lat_val, 6) if lat_val is not None else v.get("latitude"),
"time": v.get("time")
}
response_data.append(v_out)
return jsonify({
"status": 200,
"msg": "当前车辆实时位置数据",
"data": response_data
})
except Exception as e:
logger.error(f"Error in get_vehicle_positions: {str(e)}")
return jsonify({
"status": 500,
"msg": str(e),
"data": None
}), 500
@app.route('/openApi/getTrafficLightSignals', methods=['GET', 'OPTIONS'])
def get_traffic_light_signals():
"""获取红绿灯信号状态 (旧的轮询接口)"""
if request.method == 'OPTIONS':
return '', 204
# 新增: 返回空数据,以禁用轮询数据源
return jsonify({
"status": 200,
"msg": "Traffic light polling endpoint disabled to test push mechanism.",
"data": []
})
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Max-Age'] = '3600'
return response
def check_vehicle_states():
"""定期检查所有车辆状态"""
for vehicle_no, state in vehicle_states.items():
state.log_state()
# 认证相关配置
AUTH_USERNAME = "dianxin"
AUTH_PASSWORD = "dianxin@123"
AUTH_TOKEN = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
@app.route('/login', methods=['POST', 'OPTIONS'])
def login():
if request.method == 'OPTIONS':
return '', 204
# 支持多种方式获取用户名和密码
username = None
password = None
# 优先从 URL query parameters 中获取符合API文档示例
username = request.args.get('username')
password = request.args.get('password')
# 如果query参数中没有尝试从form data中获取
if not username or not password:
username = request.form.get('username') or username
password = request.form.get('password') or password
# 如果form data中也没有尝试从JSON body中获取
if not username or not password:
try:
json_data = request.get_json(silent=True)
if json_data:
username = json_data.get('username') or username
password = json_data.get('password') or password
except:
pass
logger.info(f"收到登录请求: username={username}, password={password}")
logger.debug(f"请求 URL: {request.url}")
logger.debug(f"请求方法: {request.method}")
logger.debug(f"请求参数: {request.args}")
logger.debug(f"请求表单: {request.form}")
logger.debug(f"请求JSON: {request.get_json(silent=True)}")
if not username or not password:
return jsonify({
"status": 400,
"msg": "缺少用户名或密码",
"data": None
}), 400
if username == AUTH_USERNAME and password == AUTH_PASSWORD:
return jsonify({
"status": 200,
"msg": "登入成功",
"data": AUTH_TOKEN
})
else:
return jsonify({
"status": 401,
"msg": "用户名或密码错误",
"data": None
}), 401
# 新增API端点
@app.route('/userInfoController/refreshToken', methods=['GET', 'OPTIONS'])
def refresh_token():
"""Token刷新接口"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 返回新的token实际上还是同一个
return jsonify({
"status": 200,
"msg": "Token刷新成功",
"data": {
"token": "Bearer dianxin-token-2024",
"expiresIn": 86400 # 24小时过期
}
})
@app.route('/runwayPathPlanningController/findArrTaxiwayByRunwayAndContactCrossAndSeat', methods=['GET', 'OPTIONS'])
def get_arrival_taxiway_route():
"""获取进港滑行路线"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 获取请求参数
in_runway = request.args.get('inRunway', '35')
out_runway = request.args.get('outRunway', '34')
contact_cross = request.args.get('contactCross', 'F1')
seat = request.args.get('seat', '138')
logger.info(f"进港路线查询: inRunway={in_runway}, outRunway={out_runway}, contactCross={contact_cross}, seat={seat}")
# 根据参数匹配对应的航班路由
matching_flight = None
for flight_no, params in aircraft_route_params.items():
arrival_params = params.get("arrival", {})
if (arrival_params.get("inRunway") == in_runway and
arrival_params.get("contactCross") == contact_cross and
arrival_params.get("seat") == seat):
matching_flight = flight_no
break
# 如果找到匹配的航班返回对应路由否则使用MU5123作为默认
if matching_flight and matching_flight in aircraft_routes:
route_data = aircraft_routes[matching_flight]["arrival"]
logger.info(f"匹配航班 {matching_flight} 的进港路由")
else:
route_data = aircraft_routes["MU5123"]["arrival"]
logger.info(f"未找到匹配航班使用MU5123的进港路由作为默认")
# 将路由数据转换为CGCS2000坐标系格式
converted_route_data = convert_route_to_cgcs2000(route_data)
return jsonify({
"status": 200,
"msg": "进港滑行路线查询成功",
"data": converted_route_data
})
@app.route('/runwayPathPlanningController/findDepTaxiwayByRunwayAndContactCrossAndSeat', methods=['GET', 'OPTIONS'])
def get_departure_taxiway_route():
"""获取出港滑行路线"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 获取请求参数
in_runway = request.args.get('inRunway', '35')
out_runway = request.args.get('outRunway', '34')
start_seat = request.args.get('startSeat', '138')
logger.info(f"出港路线查询: inRunway={in_runway}, outRunway={out_runway}, startSeat={start_seat}")
# 根据参数匹配对应的航班路由
matching_flight = None
for flight_no, params in aircraft_route_params.items():
departure_params = params.get("departure", {})
if (departure_params.get("inRunway") == in_runway and
departure_params.get("startSeat") == start_seat):
matching_flight = flight_no
break
# 如果找到匹配的航班返回对应路由否则使用CA1234作为默认
if matching_flight and matching_flight in aircraft_routes:
route_data = aircraft_routes[matching_flight]["departure"]
logger.info(f"匹配航班 {matching_flight} 的出港路由")
else:
route_data = aircraft_routes["CA1234"]["departure"]
logger.info(f"未找到匹配航班使用CA1234的出港路由作为默认")
# 将路由数据转换为CGCS2000坐标系格式
converted_route_data = convert_route_to_cgcs2000(route_data)
return jsonify({
"status": 200,
"msg": "出港滑行路线查询成功",
"data": converted_route_data
})
def get_active_flights() -> list[dict[str, Any]]:
"""获取当前活跃的进出港航班(基于路径状态)"""
active_flights = []
current_time = int(time.time() * 1000)
# 遍历所有路径跟随管理器中的飞机
for flight_no, follower in route_manager.followers.items():
# 检查飞机是否在活跃状态(正在路径中或在机位)
if follower.flight_status in ['in_route', 'at_gate']:
# 获取航班基础信息
if flight_no in flight_data:
flight_info = flight_data[flight_no]
# 根据路径类型确定航班通知类型
flight_type = flight_info.get("type")
event_type = "落地" if flight_type == "IN" else "滑出"
# 构造航班通知数据
flight_notification = {
"flightNo": flight_no,
"type": flight_type,
"runway": flight_info.get("runway"),
"contactCross": flight_info.get("contactCross"),
"seat": flight_info.get("seat"),
"time": current_time
}
active_flights.append(flight_notification)
logger.info(f"航班 {flight_no} {event_type}通知中 (路径状态: {follower.flight_status}, 进度: {follower.route_progress:.2f})")
else:
logger.warning(f"航班 {flight_no} 在路径跟随器中但不在flight_data中")
return active_flights
@app.route('/openApi/getInboundAndOutboundFlightsNotification', methods=['GET', 'OPTIONS'])
def get_flights():
"""进出港航班查询接口Mock"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
try:
# 获取当前活跃航班
active_flights = get_active_flights()
logger.info(f"进出港航班查询: 返回 {len(active_flights)} 个活跃航班")
return jsonify({
"status": 200,
"msg": "进出港航班查询成功",
"data": active_flights
})
except Exception as e:
logger.error(f"进出港航班查询失败: {str(e)}")
return jsonify({
"status": 500,
"msg": f"查询失败: {str(e)}",
"data": []
}), 500
@app.route('/aircraftStatusController/getAircraftStatus', methods=['GET', 'OPTIONS'])
def get_aircraft_status():
"""获取航空器状态"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 返回简化的航空器状态信息
return jsonify({
"status": 200,
"msg": "航空器状态查询成功",
"data": {
"message": "已简化为两架飞机CA1234(出港)和MU5123(进港)"
}
})
if __name__ == '__main__':
# 启动后台更新管理器
background_manager.start()
# 注册应用关闭时的清理函数
def cleanup_on_exit():
background_manager.stop()
atexit.register(cleanup_on_exit)
try:
app.run(host='localhost', port=8090, debug=True)
finally:
# 确保后台管理器正确停止
background_manager.stop()