2539 lines
96 KiB
Python
2539 lines
96 KiB
Python
from flask import Flask, jsonify, request
|
||
import time
|
||
import math
|
||
import logging
|
||
import os
|
||
import threading
|
||
import atexit
|
||
import random
|
||
from typing import Any, Literal, final, TypedDict
|
||
|
||
# 导入统一的日志配置
|
||
import sys
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
from logging_config import setup_logger
|
||
|
||
# 设置当前模块的日志记录器
|
||
logger = setup_logger(
|
||
name='mock_airport',
|
||
log_file='../logs/mock_airport.log',
|
||
max_bytes=10*1024*1024, # 10MB
|
||
backup_count=5
|
||
)
|
||
|
||
app = Flask(__name__)
|
||
|
||
# 为位置与移动对象定义类型,避免使用 Any
|
||
class Point(TypedDict):
|
||
latitude: float
|
||
longitude: float
|
||
|
||
# 必填字段
|
||
class MovingObjectRequired(TypedDict):
|
||
latitude: float
|
||
longitude: float
|
||
time: int
|
||
speed: float
|
||
moving_to_end: bool
|
||
start_point: Point
|
||
end_point: Point
|
||
|
||
# 可选字段(total=False)
|
||
class MovingObject(MovingObjectRequired, total=False):
|
||
flightNo: str
|
||
vehicleNo: str
|
||
wait_until: float
|
||
direction: float
|
||
|
||
|
||
|
||
# 地球半径(米)
|
||
EARTH_RADIUS = 6378137.0
|
||
|
||
COMMAND_PRIORITIES = {
|
||
"ALERT": 5,
|
||
"WARNING": 3,
|
||
"PARKING": 2,
|
||
"RESUME": 1
|
||
}
|
||
|
||
def meters_to_degrees(meters: float, latitude: float) -> tuple[float, float]:
|
||
"""
|
||
将米转换为度,考虑纬度的影响
|
||
"""
|
||
# 纬度方向:1度 = 111,319.9米
|
||
meters_per_deg_lat: float = 111319.9
|
||
# 经度方向:1度 = 111,319.9 * cos(latitude)米
|
||
meters_per_deg_lon: float = meters_per_deg_lat * math.cos(math.radians(latitude))
|
||
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
|
||
|
||
# 距离配置(米)
|
||
DIST_300M = 300
|
||
DIST_200M = 200
|
||
DIST_150M = 150
|
||
DIST_100M = 100
|
||
DIST_50M = 50
|
||
|
||
# 时间配置(秒)
|
||
WAIT_TIME_AFTER_RETURN = 0.1 # 返回起点后的等待时间(进一步优化:减少到0.1秒)
|
||
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
|
||
TRAFFIC_LIGHT_SWITCH_INTERVAL = 10.0 # 红绿灯切换间隔
|
||
|
||
# 根据 route.md 定义的坐标点
|
||
|
||
# 飞机 CA1234 路径
|
||
CA_START = {"longitude": 120.086263, "latitude": 36.370484} # 飞机起点
|
||
CA_END = {"longitude": 120.080996, "latitude": 36.369105} # 飞机终点
|
||
|
||
# 飞机 MU5123 路径
|
||
MU_START = {"longitude": 120.088076, "latitude": 36.374179} # 飞机起点
|
||
MU_END = {"longitude": 120.077971, "latitude": 36.371503} # 飞机终点
|
||
|
||
# 特勤车 鲁B123 路径
|
||
SPECIAL_VEHICLE_START = {"longitude": 120.080801, "latitude": 36.366626} # 特勤车起点
|
||
SPECIAL_VEHICLE_END = {"longitude": 120.083899, "latitude": 36.367403} # 特勤车终点
|
||
|
||
# 普通车 鲁B234 路径
|
||
NORMAL_VEHICLE_START = {"longitude": 120.087259, "latitude": 36.368299} # 普通车起点
|
||
NORMAL_VEHICLE_END = {"longitude": 120.083899, "latitude": 36.367403} # 普通车终点
|
||
|
||
|
||
# 飞机和车辆尺寸(半径 米)
|
||
AIRCRAFT_SIZE_M = 30.0
|
||
VEHICLE_SIZE_M = 10.0
|
||
|
||
# 航班数据配置 - 用于进出港查询接口(简化版)
|
||
# 移除时间周期控制,仅保留基础信息供航班通知使用
|
||
flight_data = {
|
||
"CA1234": { # 出港航班
|
||
"flightNo": "CA1234",
|
||
"type": "OUT",
|
||
"runway": "17",
|
||
"contactCross": "A2",
|
||
"seat": "201"
|
||
},
|
||
"MU5123": { # 进港航班 - 使用真实路由
|
||
"flightNo": "MU5123",
|
||
"type": "IN",
|
||
"runway": "35",
|
||
"contactCross": "F1", # F1滑行道
|
||
"seat": "138" # 138机位
|
||
}
|
||
}
|
||
|
||
# 航空器路由参数配置
|
||
aircraft_route_params = {
|
||
"CA1234": {
|
||
"arrival": {
|
||
"inRunway": "17",
|
||
"outRunway": "35",
|
||
"contactCross": "A2",
|
||
"seat": "201"
|
||
},
|
||
"departure": {
|
||
"inRunway": "17",
|
||
"outRunway": "35",
|
||
"startSeat": "201"
|
||
}
|
||
},
|
||
"MU5123": {
|
||
"arrival": {
|
||
"inRunway": "35",
|
||
"outRunway": "34",
|
||
"contactCross": "F1",
|
||
"seat": "138"
|
||
},
|
||
"departure": {
|
||
"inRunway": "35",
|
||
"outRunway": "34",
|
||
"startSeat": "138"
|
||
}
|
||
}
|
||
}
|
||
|
||
@app.route('/aircraftRouteParamsController/getRouteParams', methods=['GET', 'OPTIONS'])
|
||
def get_aircraft_route_params():
|
||
"""获取航空器路由查询参数"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
# 获取航班号和路由类型参数
|
||
flight_no = request.args.get('flightNo')
|
||
route_type = request.args.get('routeType', '').upper() # IN 或 OUT
|
||
|
||
if not flight_no:
|
||
return jsonify({
|
||
"status": 400,
|
||
"msg": "缺少flightNo参数",
|
||
"data": None
|
||
}), 400
|
||
|
||
if route_type not in ['IN', 'OUT']:
|
||
return jsonify({
|
||
"status": 400,
|
||
"msg": "routeType参数必须是IN或OUT",
|
||
"data": None
|
||
}), 400
|
||
|
||
# 查找航班路由参数
|
||
aircraft_params = aircraft_route_params.get(flight_no)
|
||
if not aircraft_params:
|
||
return jsonify({
|
||
"status": 404,
|
||
"msg": f"未找到航班 {flight_no} 的路由参数",
|
||
"data": None
|
||
}), 404
|
||
|
||
# 根据路由类型返回对应参数
|
||
if route_type == 'IN':
|
||
route_params = aircraft_params.get('arrival')
|
||
if not route_params:
|
||
return jsonify({
|
||
"status": 404,
|
||
"msg": f"未找到航班 {flight_no} 的进港路由参数",
|
||
"data": None
|
||
}), 404
|
||
|
||
logger.info(f"进港路由参数查询: flightNo={flight_no}, params={route_params}")
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "进港路由参数查询成功",
|
||
"data": {
|
||
"flightNo": flight_no,
|
||
"routeType": "IN",
|
||
"inRunway": route_params["inRunway"],
|
||
"outRunway": route_params["outRunway"],
|
||
"contactCross": route_params["contactCross"],
|
||
"seat": route_params["seat"],
|
||
"timestamp": int(time.time() * 1000)
|
||
}
|
||
})
|
||
|
||
else: # OUT
|
||
route_params = aircraft_params.get('departure')
|
||
if not route_params:
|
||
return jsonify({
|
||
"status": 404,
|
||
"msg": f"未找到航班 {flight_no} 的出港路由参数",
|
||
"data": None
|
||
}), 404
|
||
|
||
logger.info(f"出港路由参数查询: flightNo={flight_no}, params={route_params}")
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "出港路由参数查询成功",
|
||
"data": {
|
||
"flightNo": flight_no,
|
||
"routeType": "OUT",
|
||
"inRunway": route_params["inRunway"],
|
||
"outRunway": route_params["outRunway"],
|
||
"startSeat": route_params["startSeat"],
|
||
"timestamp": int(time.time() * 1000)
|
||
}
|
||
})
|
||
|
||
|
||
# 航空器路由数据 - 每架飞机使用不同的路由
|
||
aircraft_routes = {
|
||
|
||
# CA1234的路由 - 简化的出港路由(17号跑道到201机位区域)
|
||
"CA1234": {
|
||
"departure": {
|
||
"type": "OUT",
|
||
"status": "COMPLETE",
|
||
"codes": "201,A2,17",
|
||
"geometry": None,
|
||
"coordinateSystem": "WGS84",
|
||
"geoPath": {
|
||
"type": "FeatureCollection",
|
||
"features": [
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[120.08508640012495, 36.36182498963186],
|
||
[120.08090199425916, 36.362577136200784],
|
||
[120.08077110864791, 36.36212595046586],
|
||
[120.08004241896676, 36.36188154498729],
|
||
[120.07859116299593, 36.365372400901556],
|
||
[120.07757535108954, 36.36533179518197],
|
||
[120.07649732717964, 36.36814042245338],
|
||
[120.07441715579117, 36.36757367925402]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "A2"
|
||
}
|
||
}
|
||
]
|
||
}
|
||
}
|
||
},
|
||
|
||
# MU5123的路由 - 真实完整进港路由(35号跑道经F1滑行道到138机位)
|
||
"MU5123": {
|
||
"arrival": {
|
||
"type": "IN",
|
||
"status": "COMPLETE",
|
||
"codes": "F1,L4,138",
|
||
"geometry": None,
|
||
"coordinateSystem": "CGCS2000",
|
||
"geoPath": {
|
||
"type": "FeatureCollection",
|
||
"features": [
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050742275893088E7, 4026164.644604296],
|
||
[4.050742342874898E7, 4026162.545793306]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "L4"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050743615407222E7, 4026122.672208275],
|
||
[4.050743684026714E7, 4026120.146600441],
|
||
[4.050743730372977E7, 4026117.570797326],
|
||
[4.050743754093282E7, 4026114.964402468],
|
||
[4.050743757419489E7, 4026113.602043673],
|
||
[4.050743755007106E7, 4026112.347252104],
|
||
[4.050743733107493E7, 4026109.739264329],
|
||
[4.050743688561112E7, 4026107.160287504]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "L4"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050717462298063E7, 4026091.904402129],
|
||
[4.050716820216861E7, 4026089.855066455]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050722536188381E7, 4026108.097315812],
|
||
[4.050720821283463E7, 4026102.624334418]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050727144214725E7, 4026112.527790001],
|
||
[4.050726278505515E7, 4026114.415332655]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050731882638656E7, 4026102.196402456],
|
||
[4.050727312768086E7, 4026112.160285922]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050738651815705E7, 4026087.437277401],
|
||
[4.050734647450486E7, 4026096.168165339]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050714461981621E7, 4026082.328947974],
|
||
[4.05071119278174E7, 4026071.895744022]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050734647450486E7, 4026096.168165339],
|
||
[4.050733913391775E7, 4026097.768664928]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050689454491971E7, 4026002.519737061],
|
||
[4.050693265139649E7, 4026014.681113256],
|
||
[4.050697075787329E7, 4026026.842489458]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050741162298967E7, 4026083.825606086],
|
||
[4.050741416963529E7, 4026084.285112275],
|
||
[4.050741669524226E7, 4026084.971307588],
|
||
[4.050741915143272E7, 4026085.875012957],
|
||
[4.050742151951354E7, 4026086.989350639],
|
||
[4.050742378146222E7, 4026088.305839852],
|
||
[4.050742592006397E7, 4026089.814461317],
|
||
[4.050742791904272E7, 4026091.503733515],
|
||
[4.050742976318505E7, 4026093.360800063],
|
||
[4.050743143845592E7, 4026095.371527565],
|
||
[4.050743293210549E7, 4026097.52061317],
|
||
[4.050743423276621E7, 4026099.791701039],
|
||
[4.050743533053925E7, 4026102.167506821],
|
||
[4.05074362170699E7, 4026104.629949201],
|
||
[4.050743683431807E7, 4026106.966150228],
|
||
[4.050743688561112E7, 4026107.160287504]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": ""
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050697552118288E7, 4026028.362661481],
|
||
[4.050697075787329E7, 4026026.842489458]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050704221346159E7, 4026049.646966941],
|
||
[4.050703137036284E7, 4026046.18647901]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050708746004742E7, 4026064.087051627],
|
||
[4.050704840096232E7, 4026051.621473066]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.05071119278174E7, 4026071.895744022],
|
||
[4.050710556055213E7, 4026069.863682419]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050741939071107E7, 4026175.198599438],
|
||
[4.05074216811156E7, 4026168.021835575],
|
||
[4.050742275893088E7, 4026164.644604296]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "L4"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050753774654577E7, 4026246.448261945],
|
||
[4.050749515081406E7, 4026236.848849251],
|
||
[4.050744870329395E7, 4026226.381394062]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "138"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050753774654577E7, 4026246.448261945],
|
||
[4.0507613391983E7, 4026263.495786141],
|
||
[4.05076192451935E7, 4026264.814870958],
|
||
[4.050762119626365E7, 4026265.254565894]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "138"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050742342874898E7, 4026162.545793306],
|
||
[4.050743615407222E7, 4026122.672208275]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "L4"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050716820216861E7, 4026089.855066455],
|
||
[4.050714461981621E7, 4026082.328947974]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050720821283463E7, 4026102.624334418],
|
||
[4.050717462298063E7, 4026091.904402129]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050726278505515E7, 4026114.415332655],
|
||
[4.050725934642086E7, 4026115.009285077],
|
||
[4.050725586910526E7, 4026115.301280484],
|
||
[4.050725237957282E7, 4026115.289096617],
|
||
[4.050724890438099E7, 4026114.9728262],
|
||
[4.050724546997807E7, 4026114.354876244],
|
||
[4.050724210250195E7, 4026113.43994972],
|
||
[4.050722536188381E7, 4026108.097315812]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050727312768086E7, 4026112.160285922],
|
||
[4.050727144214725E7, 4026112.527790001]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050733913391775E7, 4026097.768664928],
|
||
[4.050731882638656E7, 4026102.196402456]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.05074011833275E7, 4026084.239767811],
|
||
[4.050738651815705E7, 4026087.437277401]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.05074011833275E7, 4026084.239767811],
|
||
[4.050740376230329E7, 4026083.794303922],
|
||
[4.050740637029002E7, 4026083.5753078],
|
||
[4.050740898743934E7, 4026083.584446135],
|
||
[4.050741162298967E7, 4026083.825606086]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050744870329395E7, 4026226.381394062],
|
||
[4.050744533581797E7, 4026225.466467002],
|
||
[4.050744206089737E7, 4026224.261526533],
|
||
[4.050743890345625E7, 4026222.775742978],
|
||
[4.050743588752466E7, 4026221.020424048],
|
||
[4.050743303605565E7, 4026219.00892878],
|
||
[4.050743037075064E7, 4026216.756565868],
|
||
[4.050742791189419E7, 4026214.280477153],
|
||
[4.050742567819968E7, 4026211.599507165],
|
||
[4.050742368666689E7, 4026208.734059705],
|
||
[4.050742195245258E7, 4026205.705942559],
|
||
[4.050742048875517E7, 4026202.538201526],
|
||
[4.050741930671428E7, 4026199.254945029],
|
||
[4.050741841532595E7, 4026195.88116063],
|
||
[4.050741782137419E7, 4026192.442524868],
|
||
[4.050741752937933E7, 4026188.965207836],
|
||
[4.050741754156362E7, 4026185.475674017],
|
||
[4.050741785783435E7, 4026182.000480871],
|
||
[4.050741847578449E7, 4026178.566076715],
|
||
[4.050741939071107E7, 4026175.198599438]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": ""
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050684675534101E7, 4025987.268076611],
|
||
[4.050685643844293E7, 4025990.358360866],
|
||
[4.050689454491971E7, 4026002.519737061]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050703137036284E7, 4026046.18647901],
|
||
[4.05070295640735E7, 4026045.610016264],
|
||
[4.050701981996215E7, 4026042.500261312],
|
||
[4.050697623399237E7, 4026028.590148907],
|
||
[4.050697552118288E7, 4026028.362661481]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050704840096232E7, 4026051.621473066],
|
||
[4.050704221346159E7, 4026049.646966941]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
},
|
||
{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [
|
||
[4.050710556055213E7, 4026069.863682419],
|
||
[4.050708746004742E7, 4026064.087051627]
|
||
]
|
||
},
|
||
"properties": {
|
||
"code": "F1"
|
||
}
|
||
}
|
||
]
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
# 飞机数据将在路径跟随器创建后初始化
|
||
aircraft_data = []
|
||
|
||
# 车辆数据,根据 route.md 配置
|
||
DEFAULT_VEHICLE_SPEED = 30.0 # km/h,默认速度
|
||
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
|
||
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
|
||
|
||
vehicle_data = [
|
||
{
|
||
"vehicleNo": "鲁B123", # 特勤车,根据route.md更新
|
||
"longitude": SPECIAL_VEHICLE_START["longitude"],
|
||
"latitude": SPECIAL_VEHICLE_START["latitude"],
|
||
"time": int(time.time() * 1000),
|
||
"direction": 0.0, # 方向角
|
||
"speed": 30.0, # 根据route.md设置为30km/h
|
||
"start_point": SPECIAL_VEHICLE_START, # 起点
|
||
"end_point": SPECIAL_VEHICLE_END, # 终点
|
||
"moving_to_end": True # 当前是否向终点移动
|
||
},
|
||
{
|
||
"vehicleNo": "鲁B234", # 普通车,根据route.md更新
|
||
"longitude": NORMAL_VEHICLE_START["longitude"],
|
||
"latitude": NORMAL_VEHICLE_START["latitude"],
|
||
"time": int(time.time() * 1000),
|
||
"direction": 180.0, # 根据route.md设置为180度
|
||
"speed": 30.0, # 根据route.md设置为30km/h
|
||
"start_point": NORMAL_VEHICLE_START, # 起点
|
||
"end_point": NORMAL_VEHICLE_END, # 终点
|
||
"moving_to_end": True # 当前是否向终点移动
|
||
}
|
||
]
|
||
|
||
# 车辆分类
|
||
airport_vehicle_data = [v for v in vehicle_data if v["vehicleNo"] in ["鲁B123", "鲁B234"]] # 特勤车和普通车
|
||
|
||
# 添加车辆状态类
|
||
@final
|
||
class VehicleState:
|
||
vehicle_no: str
|
||
is_running: bool
|
||
current_command: str | None
|
||
command_priority: int
|
||
target_speed: float
|
||
brake_mode: Literal['emergency', 'normal'] | None
|
||
command_reason: str | None
|
||
last_command_time: float
|
||
traffic_light_state: str | None
|
||
target_lat: float | None
|
||
target_lon: float | None
|
||
is_traffic_light_stop: bool
|
||
|
||
def __init__(self, vehicle_no: str) -> None:
|
||
self.vehicle_no = vehicle_no
|
||
self.is_running = True # 运行状态
|
||
self.current_command = None # 当前执行的指令
|
||
self.command_priority = 0 # 当前指令优先级
|
||
self.target_speed = float(DEFAULT_VEHICLE_SPEED) # 目标速度
|
||
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
|
||
self.command_reason = None # 指令原因
|
||
self.last_command_time = float(time.time()) # 最后一次指令时间
|
||
self.traffic_light_state = None # 当前红绿灯状态
|
||
self.target_lat = None # 目标纬度
|
||
self.target_lon = None # 目标经度
|
||
self.is_traffic_light_stop = False # 是否因红灯停车
|
||
self.remoteControlActive = False # 远程控制是否激活
|
||
|
||
def can_be_overridden_by(self, command_type: str) -> bool:
|
||
"""判断当前指令是否可以被新指令覆盖"""
|
||
# 红绿灯信号不参与指令优先级判断
|
||
if command_type in ["RED", "GREEN", "YELLOW"]:
|
||
return True
|
||
|
||
# 类型安全获取优先级,避免 Unknown | None 作为 dict key
|
||
new_priority = COMMAND_PRIORITIES.get(str(command_type), 0)
|
||
current_key = self.current_command if isinstance(self.current_command, str) else None
|
||
current_priority = COMMAND_PRIORITIES.get(current_key or "", 0)
|
||
|
||
# 相同类型的指令可以覆盖(因为需要持续发送)
|
||
if isinstance(self.current_command, str) and command_type == self.current_command:
|
||
return True
|
||
|
||
# ALERT 指令可以被 RESUME 解除
|
||
if self.current_command == "ALERT":
|
||
return command_type == "RESUME"
|
||
|
||
# WARNING 指令可以被 RESUME 或相同及更高优先级指令覆盖
|
||
if self.current_command == "WARNING":
|
||
return command_type == "RESUME" or new_priority >= current_priority
|
||
|
||
# 其他情况下,相同或更高优先级可以覆盖
|
||
return new_priority >= current_priority
|
||
|
||
def update_command(self, command_type: str, target_lat: float | None = None, target_lon: float | None = None) -> None:
|
||
"""更新指令状态"""
|
||
# 更新目标位置
|
||
if target_lat is not None and target_lon is not None:
|
||
self.target_lat = target_lat
|
||
self.target_lon = target_lon
|
||
|
||
# 如果是红绿灯状态,只更新状态,不影响其他指令
|
||
if command_type in ["RED", "GREEN", "YELLOW"]:
|
||
old_state = self.traffic_light_state
|
||
self.traffic_light_state = command_type
|
||
|
||
# 更新红灯停车状态
|
||
if command_type == "RED":
|
||
self.is_traffic_light_stop = True
|
||
elif command_type in ["GREEN", "YELLOW"]:
|
||
self.is_traffic_light_stop = False
|
||
|
||
print(f"车辆 {self.vehicle_no} 收到红绿灯信号: {command_type} (原状态: {old_state})")
|
||
else:
|
||
# 其他指令正常更新
|
||
self.current_command = command_type
|
||
self.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
|
||
|
||
# RESUME 指令不清除红绿灯状态,红绿灯状态由信号灯独立控制
|
||
if command_type == "RESUME":
|
||
# 清除其他状态
|
||
self.brake_mode = None
|
||
self.command_reason = None
|
||
|
||
# 更新运行状态
|
||
self.is_running = self.can_move()
|
||
print(f"更新车辆 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}, is_running={self.is_running}")
|
||
|
||
def can_move(self) -> bool:
|
||
"""检查车辆是否可以移动"""
|
||
# 如果有告警指令,不能移动
|
||
if self.current_command == "ALERT":
|
||
return False
|
||
|
||
# 如果有预警指令,不能移动
|
||
if self.current_command == "WARNING":
|
||
return False
|
||
|
||
# 如果有安全停靠指令,不能移动
|
||
if self.current_command == "PARKING":
|
||
return False
|
||
|
||
# 如果是红灯停车状态,不能移动
|
||
if self.is_traffic_light_stop:
|
||
return False
|
||
|
||
# 其他情况可以移动
|
||
return True
|
||
|
||
def log_state(self) -> None:
|
||
"""记录当前车辆状态"""
|
||
print(f"""
|
||
车辆状态:
|
||
- 车辆编号: {self.vehicle_no}
|
||
- 运行状态: {'运行中' if self.is_running else '已停止'}
|
||
- 当前指令: {self.current_command}
|
||
- 红绿灯状态: {self.traffic_light_state}
|
||
- 红灯停车: {'是' if self.is_traffic_light_stop else '否'}
|
||
- 指令优先级: {self.command_priority}
|
||
- 目标速度: {self.target_speed}
|
||
- 制动模式: {self.brake_mode}
|
||
- 指令原因: {self.command_reason}
|
||
- 目标位置: ({self.target_lat}, {self.target_lon})
|
||
""")
|
||
|
||
# 添加车辆状态管理
|
||
vehicle_states = {}
|
||
for vehicle in airport_vehicle_data:
|
||
vehicle_states[str(vehicle["vehicleNo"])] = VehicleState(str(vehicle["vehicleNo"]))
|
||
|
||
# 坐标转换系统 - 使用pyproj进行精确转换
|
||
class AirportCoordinateSystem:
|
||
"""机场坐标转换系统,使用pyproj进行精确的CGCS2000横轴墨卡托投影转换"""
|
||
|
||
def __init__(self):
|
||
# 导入pyproj库
|
||
try:
|
||
from pyproj import CRS, Transformer
|
||
import warnings
|
||
warnings.filterwarnings('ignore', category=UserWarning, module='pyproj')
|
||
|
||
# 定义CGCS2000坐标系(横轴墨卡托投影,120°E中央经线)
|
||
cgcs2000_proj4 = '+proj=tmerc +ellps=GRS80 +lon_0=120 +x_0=40500000 +y_0=0 +k_0=1.0 +units=m +no_defs'
|
||
self.cgcs2000_crs = CRS.from_proj4(cgcs2000_proj4)
|
||
self.wgs84_crs = CRS.from_epsg(4326) # WGS84
|
||
|
||
# 创建转换器
|
||
self.transformer_to_wgs84 = Transformer.from_crs(self.cgcs2000_crs, self.wgs84_crs, always_xy=True)
|
||
self.transformer_to_cgcs2000 = Transformer.from_crs(self.wgs84_crs, self.cgcs2000_crs, always_xy=True)
|
||
|
||
# 机场中心坐标(基于CGCS2000投影坐标)
|
||
self.utm_origin_x = 40507423 # 机场中心的CGCS2000 X坐标
|
||
self.utm_origin_y = 4026164 # 机场中心的CGCS2000 Y坐标
|
||
|
||
# 计算机场中心的WGS84坐标
|
||
center_lon, center_lat = self.transformer_to_wgs84.transform(self.utm_origin_x, self.utm_origin_y)
|
||
self.center_lon = center_lon
|
||
self.center_lat = center_lat
|
||
|
||
print(f"机场中心CGCS2000坐标: X={self.utm_origin_x}, Y={self.utm_origin_y}")
|
||
print(f"机场中心WGS84坐标: lon={self.center_lon:.10f}, lat={self.center_lat:.10f}")
|
||
|
||
except ImportError:
|
||
print("警告: pyproj库未安装,将回退到简化转换算法")
|
||
self.use_pyproj = False
|
||
# 回退到原来的参数
|
||
self.center_lon = 120.08782536
|
||
self.center_lat = 36.36236547
|
||
self.utm_origin_x = 40507423
|
||
self.utm_origin_y = 4026164
|
||
self.meters_per_degree_lon = 89932
|
||
self.meters_per_degree_lat = 111320
|
||
except Exception as e:
|
||
print(f"pyproj初始化失败: {str(e)},将回退到简化转换算法")
|
||
self.use_pyproj = False
|
||
else:
|
||
self.use_pyproj = True
|
||
|
||
def cgcs2000_to_wgs84(self, x: float, y: float) -> tuple[float, float]:
|
||
"""
|
||
将CGCS2000投影坐标转换为WGS84地理坐标
|
||
使用pyproj进行精确转换
|
||
|
||
Args:
|
||
x: CGCS2000 X坐标(东向)
|
||
y: CGCS2000 Y坐标(北向)
|
||
|
||
Returns:
|
||
tuple: (纬度, 经度) WGS84坐标
|
||
"""
|
||
try:
|
||
if self.use_pyproj:
|
||
# 使用pyproj进行精确转换
|
||
longitude, latitude = self.transformer_to_wgs84.transform(float(x), float(y))
|
||
return latitude, longitude
|
||
else:
|
||
# 回退到简化算法
|
||
return self._fallback_cgcs2000_to_wgs84(x, y)
|
||
except Exception as e:
|
||
print(f"坐标转换错误: {str(e)}, x={x}, y={y}")
|
||
return self.center_lat, self.center_lon
|
||
|
||
def wgs84_to_cgcs2000(self, latitude: float, longitude: float) -> tuple[float, float]:
|
||
"""
|
||
将WGS84地理坐标转换为CGCS2000投影坐标
|
||
使用pyproj进行精确转换
|
||
|
||
Args:
|
||
latitude: WGS84纬度
|
||
longitude: WGS84经度
|
||
|
||
Returns:
|
||
tuple: (CGCS2000 X坐标, CGCS2000 Y坐标)
|
||
"""
|
||
try:
|
||
if self.use_pyproj:
|
||
# 使用pyproj进行精确转换
|
||
x, y = self.transformer_to_cgcs2000.transform(float(longitude), float(latitude))
|
||
return x, y
|
||
else:
|
||
# 回退到简化算法
|
||
return self._fallback_wgs84_to_cgcs2000(latitude, longitude)
|
||
except Exception as e:
|
||
print(f"WGS84到CGCS2000坐标转换错误: {str(e)}, lat={latitude}, lon={longitude}")
|
||
return self.utm_origin_x, self.utm_origin_y
|
||
|
||
def _fallback_cgcs2000_to_wgs84(self, x: float, y: float) -> tuple[float, float]:
|
||
"""回退的简化转换算法"""
|
||
offset_x = float(x) - self.utm_origin_x
|
||
offset_y = float(y) - self.utm_origin_y
|
||
lon_offset = offset_x / self.meters_per_degree_lon
|
||
lat_offset = offset_y / self.meters_per_degree_lat
|
||
longitude = self.center_lon + lon_offset
|
||
latitude = self.center_lat + lat_offset
|
||
return latitude, longitude
|
||
|
||
def _fallback_wgs84_to_cgcs2000(self, latitude: float, longitude: float) -> tuple[float, float]:
|
||
"""回退的简化转换算法"""
|
||
lon_offset = float(longitude) - self.center_lon
|
||
lat_offset = float(latitude) - self.center_lat
|
||
offset_x = lon_offset * self.meters_per_degree_lon
|
||
offset_y = lat_offset * self.meters_per_degree_lat
|
||
x = self.utm_origin_x + offset_x
|
||
y = self.utm_origin_y + offset_y
|
||
return x, y
|
||
|
||
def batch_convert_coordinates(self, coordinates: list[list[float]]) -> list[tuple[float, float]]:
|
||
"""批量转换坐标列表"""
|
||
converted = []
|
||
for coord in coordinates:
|
||
if len(coord) >= 2:
|
||
lat, lon = self.cgcs2000_to_wgs84(coord[0], coord[1])
|
||
converted.append((lat, lon))
|
||
return converted
|
||
|
||
# 全局坐标转换系统实例
|
||
coordinate_system = AirportCoordinateSystem()
|
||
|
||
def convert_route_to_cgcs2000(route_data: dict) -> dict:
|
||
"""
|
||
将路由数据转换为CGCS2000坐标系格式
|
||
|
||
Args:
|
||
route_data: 原始路由数据
|
||
|
||
Returns:
|
||
dict: 转换后的路由数据(统一使用CGCS2000坐标系)
|
||
"""
|
||
import copy
|
||
|
||
# 深度复制路由数据,避免修改原始数据
|
||
converted_route = copy.deepcopy(route_data)
|
||
|
||
try:
|
||
# 检查当前坐标系类型
|
||
current_coordinate_system = route_data.get("coordinateSystem", "WGS84")
|
||
logger.info(f"转换路由数据坐标系: 当前={current_coordinate_system} -> 目标=CGCS2000")
|
||
|
||
# 如果已经是CGCS2000坐标系,直接返回
|
||
if current_coordinate_system == "CGCS2000":
|
||
logger.info("路由数据已经是CGCS2000坐标系,无需转换")
|
||
return converted_route
|
||
|
||
# 如果是WGS84坐标系,需要转换为CGCS2000
|
||
if current_coordinate_system == "WGS84":
|
||
geo_path = converted_route.get("geoPath", {})
|
||
features = geo_path.get("features", [])
|
||
|
||
logger.info(f"开始转换WGS84坐标到CGCS2000,共有 {len(features)} 个feature")
|
||
|
||
for feature in features:
|
||
geometry = feature.get("geometry", {})
|
||
if geometry.get("type") == "LineString":
|
||
coordinates = geometry.get("coordinates", [])
|
||
|
||
# 转换坐标点
|
||
converted_coordinates = []
|
||
for coord in coordinates:
|
||
if len(coord) >= 2:
|
||
# WGS84坐标格式:[longitude, latitude]
|
||
longitude, latitude = float(coord[0]), float(coord[1])
|
||
|
||
# 转换为CGCS2000坐标(X, Y)
|
||
x, y = coordinate_system.wgs84_to_cgcs2000(latitude, longitude)
|
||
converted_coordinates.append([x, y])
|
||
|
||
logger.debug(f"WGS84坐标转换: lon={longitude}, lat={latitude} -> CGCS2000: x={x}, y={y}")
|
||
|
||
# 更新坐标数据
|
||
geometry["coordinates"] = converted_coordinates
|
||
|
||
# 更新坐标系标识
|
||
converted_route["coordinateSystem"] = "CGCS2000"
|
||
|
||
logger.info(f"WGS84到CGCS2000坐标转换完成,共转换 {len(features)} 个feature")
|
||
|
||
return converted_route
|
||
|
||
except Exception as e:
|
||
logger.error(f"路由数据坐标转换失败: {str(e)}")
|
||
# 转换失败时返回原始数据,但确保坐标系标识为CGCS2000
|
||
converted_route["coordinateSystem"] = "CGCS2000"
|
||
return converted_route
|
||
|
||
def merge_discontinuous_route_for_flight(route_data: dict, flight_no: str) -> dict:
|
||
"""
|
||
使用Shapely自动拼接多个路径段为连续路径,并基于codes字段验证方向
|
||
"""
|
||
try:
|
||
from shapely.geometry import LineString, Point
|
||
from shapely.ops import linemerge
|
||
|
||
geo_path = route_data.get("geoPath", {})
|
||
features = geo_path.get("features", [])
|
||
|
||
if len(features) <= 1:
|
||
return route_data
|
||
|
||
# 解析codes字段获取正确的路径顺序
|
||
codes_str = route_data.get("codes", "")
|
||
if codes_str:
|
||
code_order = [code.strip() for code in codes_str.split(",")]
|
||
logger.info(f"航班 {flight_no} codes顺序: {code_order}")
|
||
else:
|
||
logger.warning(f"航班 {flight_no} 缺少codes字段,无法验证方向")
|
||
code_order = []
|
||
|
||
# 收集所有LineString路径段,并记录每段的code标识
|
||
lines = []
|
||
feature_codes = []
|
||
for feature in features:
|
||
geometry = feature.get("geometry", {})
|
||
if geometry.get("type") == "LineString":
|
||
coordinates = geometry.get("coordinates", [])
|
||
if len(coordinates) >= 2:
|
||
lines.append(LineString(coordinates))
|
||
code = feature.get("properties", {}).get("code", "")
|
||
feature_codes.append(code)
|
||
|
||
if not lines:
|
||
return route_data
|
||
|
||
# Shapely自动拼接
|
||
merged_geometry = linemerge(lines)
|
||
|
||
# 如果成功合并为单条路径
|
||
if isinstance(merged_geometry, LineString):
|
||
continuous_coords = list(merged_geometry.coords)
|
||
logger.info(f"✅ 航班 {flight_no} 合并为连续路径,包含 {len(continuous_coords)} 个坐标点")
|
||
|
||
# 基于codes字段验证和修正路径方向
|
||
if len(code_order) >= 2:
|
||
continuous_coords = _verify_and_correct_path_direction(
|
||
continuous_coords, features, code_order, flight_no
|
||
)
|
||
|
||
merged_route_data = route_data.copy()
|
||
merged_route_data["geoPath"]["features"] = [{
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": continuous_coords
|
||
},
|
||
"properties": {"code": "MERGED_PATH"}
|
||
}]
|
||
return merged_route_data
|
||
else:
|
||
# 无法合并就报错,不能保持原样掩盖问题
|
||
error_msg = f"❌ 航班 {flight_no} 路径无法合并为连续路径,存在不连续段"
|
||
logger.error(error_msg)
|
||
raise RuntimeError(error_msg)
|
||
|
||
except ImportError:
|
||
logger.warning(f"Shapely库未安装,跳过路径合并")
|
||
return route_data
|
||
except Exception as e:
|
||
logger.error(f"航班 {flight_no} 路径合并失败: {str(e)}")
|
||
return route_data
|
||
|
||
|
||
def _verify_and_correct_path_direction(continuous_coords: list, original_features: list, code_order: list, flight_no: str) -> list:
|
||
"""
|
||
简化版本:检查合并后路径端点所在的路径段code值来判断方向
|
||
"""
|
||
try:
|
||
if len(code_order) < 2:
|
||
return continuous_coords
|
||
|
||
# 找到端点最接近的路径段
|
||
def find_point_code(coord):
|
||
closest_code = None
|
||
min_distance = float('inf')
|
||
|
||
for feature in original_features:
|
||
geometry = feature.get("geometry", {})
|
||
code = feature.get("properties", {}).get("code", "")
|
||
if geometry.get("type") == "LineString" and code:
|
||
coordinates = geometry.get("coordinates", [])
|
||
# 计算到这个路径段的最小距离
|
||
for seg_coord in coordinates:
|
||
# 简单的欧几里得距离
|
||
distance = ((coord[0] - seg_coord[0]) ** 2 + (coord[1] - seg_coord[1]) ** 2) ** 0.5
|
||
if distance < min_distance:
|
||
min_distance = distance
|
||
closest_code = code
|
||
|
||
return closest_code
|
||
|
||
start_code = find_point_code(continuous_coords[0])
|
||
end_code = find_point_code(continuous_coords[-1])
|
||
|
||
logger.info(f"航班 {flight_no} 端点检查: 起点在{start_code}, 终点在{end_code}, 期望顺序{code_order}")
|
||
|
||
# 检查方向是否正确 - 只使用实际存在的路径段codes进行判断
|
||
expected_start = code_order[0] # F1
|
||
# 对于MU5123进港,应该从F1开始,但终点138不是路径段code,所以检查起点即可
|
||
|
||
logger.info(f"航班 {flight_no} 方向检查: 期望从{expected_start}开始, 实际起点{start_code}, 终点{end_code}")
|
||
|
||
# 通用路径方向检查逻辑
|
||
if start_code == expected_start:
|
||
logger.info(f"✅ 航班 {flight_no} 路径方向正确: 从{expected_start}开始")
|
||
else:
|
||
# 方向相反,需要反转
|
||
continuous_coords.reverse()
|
||
logger.info(f"✅ 航班 {flight_no} 路径方向已修正: 从{expected_start}开始 (原起点: {start_code})")
|
||
|
||
return continuous_coords
|
||
|
||
except Exception as e:
|
||
logger.error(f"航班 {flight_no} 路径方向验证失败: {str(e)}")
|
||
return continuous_coords
|
||
|
||
def validate_route_data_integrity(route_data: dict, flight_no: str) -> dict:
|
||
"""
|
||
验证路由数据的完整性和格式正确性
|
||
|
||
Args:
|
||
route_data: 包含geoPath的路由数据
|
||
flight_no: 航班号,用于日志
|
||
|
||
Returns:
|
||
dict: 原始路由数据(不做任何修改)
|
||
"""
|
||
try:
|
||
geo_path = route_data.get("geoPath", {})
|
||
features = geo_path.get("features", [])
|
||
|
||
if len(features) == 0:
|
||
logger.warning(f"⚠️ 航班 {flight_no} 路径数据为空,无路径段")
|
||
return route_data
|
||
|
||
logger.info(f"✅ 航班 {flight_no} 路径数据完整:包含 {len(features)} 个路径段")
|
||
|
||
# 验证每个路径段的数据完整性
|
||
valid_segments = 0
|
||
total_coordinates = 0
|
||
|
||
for i, feature in enumerate(features):
|
||
geometry = feature.get("geometry", {})
|
||
properties = feature.get("properties", {})
|
||
|
||
if geometry.get("type") == "LineString":
|
||
coordinates = geometry.get("coordinates", [])
|
||
code = properties.get("code", "")
|
||
|
||
if len(coordinates) >= 2:
|
||
valid_segments += 1
|
||
total_coordinates += len(coordinates)
|
||
logger.debug(f" 路径段 {i+1}: 代码={code}, 坐标点数={len(coordinates)} ✅")
|
||
else:
|
||
logger.warning(f" 路径段 {i+1}: 代码={code}, 坐标点数不足({len(coordinates)}) ⚠️")
|
||
else:
|
||
logger.warning(f" 路径段 {i+1}: 非LineString类型 ({geometry.get('type')}) ⚠️")
|
||
|
||
logger.info(f"✅ 航班 {flight_no} 数据验证完成:{valid_segments}/{len(features)} 有效路径段,共 {total_coordinates} 个坐标点")
|
||
|
||
# 验证坐标系信息
|
||
coordinate_system = route_data.get("coordinateSystem", "未知")
|
||
logger.info(f"✅ 航班 {flight_no} 坐标系:{coordinate_system}")
|
||
|
||
return route_data
|
||
|
||
except Exception as e:
|
||
logger.error(f"航班 {flight_no} 路径数据完整性验证异常: {str(e)}")
|
||
return route_data
|
||
|
||
def parse_route_path(route_data: dict) -> list[tuple[float, float]]:
|
||
"""
|
||
解析GeoJSON路由数据,转换为WGS84坐标点序列
|
||
|
||
Args:
|
||
route_data: 包含geoPath的路由数据
|
||
|
||
Returns:
|
||
list: WGS84坐标点列表 [(lat, lon), ...]
|
||
"""
|
||
path_points = []
|
||
|
||
try:
|
||
# 检查坐标系类型
|
||
coordinate_system_type = route_data.get("coordinateSystem", "CGCS2000")
|
||
logger.info(f"路由数据坐标系: {coordinate_system_type}")
|
||
|
||
geo_path = route_data.get("geoPath", {})
|
||
features = geo_path.get("features", [])
|
||
|
||
logger.info(f"解析路径,共有 {len(features)} 个feature")
|
||
|
||
for feature in features:
|
||
geometry = feature.get("geometry", {})
|
||
if geometry.get("type") == "LineString":
|
||
coordinates = geometry.get("coordinates", [])
|
||
|
||
# 根据坐标系类型处理坐标点
|
||
for coord in coordinates:
|
||
if len(coord) >= 2:
|
||
x, y = float(coord[0]), float(coord[1])
|
||
|
||
if coordinate_system_type == "WGS84":
|
||
# 如果已经是WGS84坐标,直接使用 (经度, 纬度)
|
||
lat, lon = y, x
|
||
logger.debug(f"WGS84坐标直接使用: lon={lon}, lat={lat}")
|
||
else:
|
||
# 如果是CGCS2000坐标,需要转换
|
||
lat, lon = coordinate_system.cgcs2000_to_wgs84(x, y)
|
||
logger.debug(f"CGCS2000坐标转换: {x},{y} -> lat={lat}, lon={lon}")
|
||
|
||
path_points.append((lat, lon))
|
||
|
||
logger.info(f"路径解析完成,坐标系={coordinate_system_type},共生成 {len(path_points)} 个坐标点")
|
||
|
||
# 暂时禁用过滤,保留所有路径点用于测试
|
||
logger.info(f"保留所有 {len(path_points)} 个路径点")
|
||
|
||
return path_points
|
||
|
||
except Exception as e:
|
||
logger.error(f"路径解析失败: {str(e)}")
|
||
return []
|
||
|
||
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
||
"""使用Haversine公式计算两点间距离(米)"""
|
||
lat1_rad = math.radians(lat1)
|
||
lon1_rad = math.radians(lon1)
|
||
lat2_rad = math.radians(lat2)
|
||
lon2_rad = math.radians(lon2)
|
||
|
||
dlat = lat2_rad - lat1_rad
|
||
dlon = lon2_rad - lon1_rad
|
||
|
||
a = math.sin(dlat/2) * math.sin(dlat/2) + \
|
||
math.cos(lat1_rad) * math.cos(lat2_rad) * \
|
||
math.sin(dlon/2) * math.sin(dlon/2)
|
||
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||
return EARTH_RADIUS * c
|
||
|
||
# 航空器路径跟随系统
|
||
class AircraftRouteFollower:
|
||
"""航空器路径跟随管理类"""
|
||
|
||
def __init__(self, flight_no: str):
|
||
self.flight_no = flight_no
|
||
self.current_route_type = None # 'arrival' or 'departure'
|
||
self.route_points = [] # 当前路径的坐标点序列
|
||
self.current_point_index = 0 # 当前目标路径点索引
|
||
self.route_progress = 0.0 # 路径完成进度 (0-1)
|
||
self.is_at_gate = False # 是否在机位停留
|
||
self.gate_position = None # 机位坐标
|
||
self._status_start_time = time.time() # 状态开始时间,用于简化状态管理
|
||
|
||
# 新增状态管理属性
|
||
self.flight_status = 'idle' # 飞机状态:'idle', 'in_route', 'at_gate', 'waiting'
|
||
self.wait_until = 0.0 # 等待结束时间戳
|
||
self.waiting_duration = 5.0 # 路径完成后的等待时间(秒)
|
||
|
||
# 解析航空器路由路径
|
||
self._parse_routes()
|
||
|
||
def _parse_routes(self):
|
||
"""解析进出港路径"""
|
||
try:
|
||
# 根据航班号获取对应的路由数据
|
||
flight_routes = aircraft_routes.get(self.flight_no)
|
||
if not flight_routes:
|
||
logger.warning(f"未找到航班 {self.flight_no} 的路由数据,使用MU5123的路由作为默认")
|
||
flight_routes = aircraft_routes.get("MU5123", {})
|
||
|
||
# 解析进港路径
|
||
arrival_route = flight_routes.get("arrival", {})
|
||
|
||
# 验证MU5123进港路径的数据完整性,然后合并为连续路径
|
||
if self.flight_no == "MU5123" and arrival_route:
|
||
logger.info(f"验证 {self.flight_no} 进港路径数据完整性")
|
||
arrival_route = validate_route_data_integrity(arrival_route, self.flight_no)
|
||
|
||
# 将31个路径段合并为连续路径,让飞机能够完整运行
|
||
logger.info(f"将 {self.flight_no} 的分散路径段合并为连续路径")
|
||
arrival_route = merge_discontinuous_route_for_flight(arrival_route, self.flight_no)
|
||
|
||
self.arrival_points = parse_route_path(arrival_route)
|
||
logger.info(f"航空器 {self.flight_no} 进港路径: {len(self.arrival_points)} 个点")
|
||
|
||
# 解析出港路径
|
||
departure_route = flight_routes.get("departure", {})
|
||
self.departure_points = parse_route_path(departure_route)
|
||
logger.info(f"航空器 {self.flight_no} 出港路径: {len(self.departure_points)} 个点")
|
||
|
||
# 设置机位坐标(138号机位的大概位置)
|
||
if self.arrival_points:
|
||
self.gate_position = self.arrival_points[-1] # 进港路径的终点作为机位
|
||
elif self.departure_points:
|
||
self.gate_position = self.departure_points[0] # 出港路径的起点作为机位
|
||
else:
|
||
# 默认机位坐标
|
||
self.gate_position = (36.354068, 120.083410)
|
||
|
||
except Exception as e:
|
||
logger.error(f"路径解析失败: {str(e)}")
|
||
self.arrival_points = []
|
||
self.departure_points = []
|
||
self.gate_position = (36.354068, 120.083410)
|
||
|
||
def start_route(self, route_type: str):
|
||
"""开始跟随指定类型的路径"""
|
||
if route_type == "arrival" and self.arrival_points:
|
||
self.current_route_type = "arrival"
|
||
self.route_points = self.arrival_points.copy()
|
||
self.current_point_index = 0
|
||
self.route_progress = 0.0
|
||
self.is_at_gate = False
|
||
self.flight_status = 'in_route' # 设置为路径运行状态
|
||
self.wait_until = 0.0 # 清除等待时间
|
||
logger.info(f"航空器 {self.flight_no} 开始进港路径跟随")
|
||
|
||
elif route_type == "departure" and self.departure_points:
|
||
self.current_route_type = "departure"
|
||
self.route_points = self.departure_points.copy()
|
||
self.current_point_index = 0
|
||
self.route_progress = 0.0
|
||
self.is_at_gate = False
|
||
self.flight_status = 'in_route' # 设置为路径运行状态
|
||
self.wait_until = 0.0 # 清除等待时间
|
||
logger.info(f"航空器 {self.flight_no} 开始出港路径跟随")
|
||
|
||
else:
|
||
logger.warning(f"航空器 {self.flight_no} 无效的路径类型: {route_type}")
|
||
|
||
def update_position_on_route(self, aircraft: dict[str, Any], speed_kmh: float, elapsed_time: float) -> bool:
|
||
"""
|
||
沿路径更新航空器位置
|
||
|
||
Args:
|
||
aircraft: 航空器数据字典
|
||
speed_kmh: 速度 (km/h)
|
||
elapsed_time: 经过时间 (秒)
|
||
|
||
Returns:
|
||
bool: 是否到达路径终点
|
||
"""
|
||
current_time = time.time()
|
||
|
||
# 检查是否在等待期
|
||
if current_time < self.wait_until:
|
||
self.flight_status = 'waiting'
|
||
return False # 还在等待中,不更新位置
|
||
|
||
if not self.route_points or self.current_point_index >= len(self.route_points):
|
||
return True # 路径已完成
|
||
|
||
# 设置状态为路径运行中
|
||
self.flight_status = 'in_route'
|
||
|
||
# 获取当前位置
|
||
current_lat = to_float_safe(aircraft.get("latitude", 0))
|
||
current_lon = to_float_safe(aircraft.get("longitude", 0))
|
||
if current_lat is None or current_lon is None:
|
||
return False
|
||
|
||
# 获取目标路径点
|
||
target_lat, target_lon = self.route_points[self.current_point_index]
|
||
|
||
# 计算到目标点的距离
|
||
distance_to_target = haversine_distance(current_lat, current_lon, target_lat, target_lon)
|
||
|
||
# 计算移动距离
|
||
speed_mps = speed_kmh * 1000.0 / 3600.0 # 转换为米/秒
|
||
move_distance = speed_mps * elapsed_time
|
||
|
||
# 优化路径跟随:如果移动距离大于到目标点的距离,直接跳到下一个合适的点
|
||
remaining_distance = move_distance
|
||
|
||
while remaining_distance > 0 and self.current_point_index < len(self.route_points):
|
||
# 如果已经接近目标点或剩余移动距离大于目标距离,移动到下一个路径点
|
||
if distance_to_target <= remaining_distance:
|
||
# 移动到当前目标点
|
||
remaining_distance -= distance_to_target
|
||
self.current_point_index += 1
|
||
self.route_progress = min(1.0, self.current_point_index / max(1, len(self.route_points) - 1))
|
||
|
||
if self.current_point_index >= len(self.route_points):
|
||
# 路径完成,设置到真正的路径终点
|
||
final_lat, final_lon = self.route_points[-1] # 路径的最后一个点
|
||
aircraft["latitude"] = final_lat
|
||
aircraft["longitude"] = final_lon
|
||
self.is_at_gate = (self.current_route_type == "arrival")
|
||
self.flight_status = 'at_gate' if self.is_at_gate else 'completed'
|
||
# 设置5秒等待时间
|
||
self.wait_until = current_time + self.waiting_duration
|
||
logger.info(f"航空器 {self.flight_no} 完成 {self.current_route_type} 路径,等待{self.waiting_duration}秒后重新开始")
|
||
return True
|
||
|
||
# 更新到下一个目标点
|
||
current_lat, current_lon = target_lat, target_lon # 当前位置更新为刚到达的点
|
||
if self.current_point_index < len(self.route_points):
|
||
target_lat, target_lon = self.route_points[self.current_point_index]
|
||
distance_to_target = haversine_distance(current_lat, current_lon, target_lat, target_lon)
|
||
else:
|
||
# 已经超出路径范围,跳出循环
|
||
break
|
||
else:
|
||
# 不能完全到达目标点,按比例移动
|
||
break
|
||
|
||
# 计算最终位置
|
||
if remaining_distance > 0 and distance_to_target > 0:
|
||
# 按比例移动到目标点
|
||
move_ratio = remaining_distance / distance_to_target
|
||
|
||
# 线性插值计算新位置
|
||
new_lat = current_lat + (target_lat - current_lat) * move_ratio
|
||
new_lon = current_lon + (target_lon - current_lon) * move_ratio
|
||
|
||
# 更新航空器位置
|
||
aircraft["latitude"] = new_lat
|
||
aircraft["longitude"] = new_lon
|
||
else:
|
||
# 已到达当前目标点,更新位置
|
||
aircraft["latitude"] = current_lat
|
||
aircraft["longitude"] = current_lon
|
||
|
||
return False
|
||
|
||
def set_at_gate(self, aircraft: dict[str, Any]):
|
||
"""设置航空器在机位停留"""
|
||
if self.gate_position:
|
||
aircraft["latitude"] = self.gate_position[0]
|
||
aircraft["longitude"] = self.gate_position[1]
|
||
self.is_at_gate = True
|
||
|
||
# 多飞机路径跟随管理器
|
||
class AircraftRouteManager:
|
||
"""多架飞机路径跟随管理器"""
|
||
|
||
def __init__(self):
|
||
self.followers = {} # {flight_no: AircraftRouteFollower}
|
||
self.flight_configs = {} # 飞机配置信息
|
||
|
||
def add_aircraft(self, flight_no: str, config: dict):
|
||
"""添加飞机到路径跟随系统
|
||
|
||
Args:
|
||
flight_no: 航班号
|
||
config: 飞机配置 {
|
||
'primary_behavior': 'arrival'|'departure'|'cycle',
|
||
'speed': 30.0,
|
||
'cycle_timing': {...} # 仅用于cycle模式
|
||
}
|
||
"""
|
||
self.flight_configs[flight_no] = config
|
||
self.followers[flight_no] = AircraftRouteFollower(flight_no)
|
||
logger.info(f"已添加飞机 {flight_no} 到路径跟随系统,行为模式: {config.get('primary_behavior', 'cycle')}")
|
||
|
||
def update_aircraft_position(self, aircraft: dict[str, Any], elapsed_time: float):
|
||
"""统一的航空器位置更新入口"""
|
||
flight_no = aircraft.get("flightNo")
|
||
|
||
if flight_no not in self.followers:
|
||
logger.warning(f"飞机 {flight_no} 未注册到路径跟随系统")
|
||
return
|
||
|
||
follower = self.followers[flight_no]
|
||
config = self.flight_configs[flight_no]
|
||
behavior = config.get("primary_behavior", "cycle")
|
||
speed = config.get("speed", 30.0)
|
||
|
||
if behavior == "arrival":
|
||
# 纯进港模式
|
||
self._update_arrival_only_aircraft(aircraft, follower, speed, elapsed_time)
|
||
elif behavior == "departure":
|
||
# 纯出港模式
|
||
self._update_departure_only_aircraft(aircraft, follower, speed, elapsed_time)
|
||
|
||
# 更新时间戳
|
||
aircraft["time"] = int(time.time() * 1000)
|
||
|
||
|
||
def _update_arrival_only_aircraft(self, aircraft: dict[str, Any], follower: AircraftRouteFollower,
|
||
speed: float, elapsed_time: float):
|
||
"""更新纯进港模式飞机位置"""
|
||
current_time = time.time()
|
||
|
||
# 检查是否需要开始新路径(等待期结束后或初始状态)
|
||
if follower.flight_status in ['idle', 'completed', 'at_gate', 'waiting'] and current_time >= follower.wait_until:
|
||
follower.start_route("arrival")
|
||
# 设置到进港路径起点
|
||
if follower.route_points:
|
||
aircraft["latitude"] = follower.route_points[0][0]
|
||
aircraft["longitude"] = follower.route_points[0][1]
|
||
logger.info(f"航空器 {aircraft.get('flightNo')} 等待结束,重新开始进港路径")
|
||
|
||
route_completed = follower.update_position_on_route(aircraft, speed, elapsed_time)
|
||
if route_completed and follower.flight_status != 'waiting':
|
||
# 路径完成,已设置5秒等待时间,无需立即重新开始
|
||
logger.info(f"航空器 {aircraft.get('flightNo')} 进港完成,将等待{follower.waiting_duration}秒后重新开始")
|
||
|
||
def _update_departure_only_aircraft(self, aircraft: dict[str, Any], follower: AircraftRouteFollower,
|
||
speed: float, elapsed_time: float):
|
||
"""更新纯出港模式飞机位置"""
|
||
current_time = time.time()
|
||
|
||
# 检查是否需要开始新路径(等待期结束后或初始状态)
|
||
if follower.flight_status in ['idle', 'completed', 'at_gate', 'waiting'] and current_time >= follower.wait_until:
|
||
follower.start_route("departure")
|
||
# 设置到出港路径起点
|
||
if follower.route_points:
|
||
aircraft["latitude"] = follower.route_points[0][0]
|
||
aircraft["longitude"] = follower.route_points[0][1]
|
||
logger.info(f"航空器 {aircraft.get('flightNo')} 等待结束,重新开始出港路径")
|
||
|
||
route_completed = follower.update_position_on_route(aircraft, speed, elapsed_time)
|
||
if route_completed and follower.flight_status != 'waiting':
|
||
# 路径完成,已设置5秒等待时间,无需立即重新开始
|
||
logger.info(f"航空器 {aircraft.get('flightNo')} 出港完成,将等待{follower.waiting_duration}秒后重新开始")
|
||
|
||
|
||
# 创建全局路径跟随管理器
|
||
route_manager = AircraftRouteManager()
|
||
|
||
# 配置并添加飞机到路径跟随系统
|
||
# CA1234:纯出港模式,模拟出港飞机
|
||
route_manager.add_aircraft("CA1234", {
|
||
'primary_behavior': 'departure',
|
||
'speed': 50.0
|
||
})
|
||
|
||
# MU5123:纯进港模式,使用真实进港路由
|
||
route_manager.add_aircraft("MU5123", {
|
||
'primary_behavior': 'arrival',
|
||
'speed': 50.0 # 设置速度为50 km/h,约45秒完成一个周期
|
||
})
|
||
|
||
# 初始化飞机数据
|
||
def initialize_aircraft_data():
|
||
"""初始化航空器数据,设置合适的起始位置"""
|
||
|
||
# CA1234 - 出港模式,从机位开始(出港路径起点)
|
||
ca1234_follower = route_manager.followers["CA1234"]
|
||
ca1234_start_pos = ca1234_follower.departure_points[0] if ca1234_follower.departure_points else (36.354068, 120.083410)
|
||
|
||
# MU5123 - 进港模式,从跑道开始(进港路径起点)
|
||
mu5123_follower = route_manager.followers["MU5123"]
|
||
mu5123_start_pos = mu5123_follower.arrival_points[0] if mu5123_follower.arrival_points else (36.374179, 120.088076)
|
||
|
||
return [
|
||
{
|
||
"flightNo": "CA1234", # 出港模式
|
||
"longitude": ca1234_start_pos[1],
|
||
"latitude": ca1234_start_pos[0],
|
||
"time": int(time.time() * 1000),
|
||
"altitude": 0.0,
|
||
"trackNumber": 1001,
|
||
"speed": 50.0,
|
||
"use_route_following": True
|
||
},
|
||
{
|
||
"flightNo": "MU5123", # 进港模式
|
||
"longitude": mu5123_start_pos[1],
|
||
"latitude": mu5123_start_pos[0],
|
||
"time": int(time.time() * 1000),
|
||
"altitude": 0.0,
|
||
"trackNumber": 1002,
|
||
"speed": 50.0,
|
||
"use_route_following": True
|
||
}
|
||
]
|
||
|
||
# 初始化飞机数据
|
||
aircraft_data.extend(initialize_aircraft_data())
|
||
|
||
# 额外测试飞机数据(用于前端性能测试)
|
||
additional_aircraft_data = []
|
||
|
||
def initialize_additional_aircraft():
|
||
"""初始化 60 架测试飞机"""
|
||
global additional_aircraft_data
|
||
|
||
# 机场中心坐标(基于 CGCS2000 投影坐标)
|
||
airport_center_lat = 36.36236547
|
||
airport_center_lon = 120.08782536
|
||
|
||
# 机场覆盖范围(大约 2km x 2km 区域)
|
||
lat_range = 0.02 # 大约 2.2km
|
||
lon_range = 0.02 # 大约 1.8km
|
||
|
||
# 创建 60 架测试飞机
|
||
for i in range(1, 61):
|
||
# 随机分布在机场范围内
|
||
lat_offset = (random.random() - 0.5) * lat_range
|
||
lon_offset = (random.random() - 0.5) * lon_range
|
||
|
||
aircraft_lat = airport_center_lat + lat_offset
|
||
aircraft_lon = airport_center_lon + lon_offset
|
||
|
||
# 前 55 架静止,后 5 架移动
|
||
is_moving = i > 55
|
||
speed = random.uniform(20.0, 50.0) if is_moving else 0.0
|
||
|
||
# 为移动的飞机设置简单的往复移动路径
|
||
if is_moving:
|
||
# 在当前位置周围创建一个小范围的移动路径
|
||
move_radius = 0.001 # 大约 100 米范围
|
||
start_lat = aircraft_lat - move_radius
|
||
start_lon = aircraft_lon - move_radius
|
||
end_lat = aircraft_lat + move_radius
|
||
end_lon = aircraft_lon + move_radius
|
||
moving_to_end = True
|
||
else:
|
||
# 静止飞机使用当前位置作为起点和终点
|
||
start_lat = aircraft_lat
|
||
start_lon = aircraft_lon
|
||
end_lat = aircraft_lat
|
||
end_lon = aircraft_lon
|
||
moving_to_end = False
|
||
|
||
aircraft = {
|
||
"flightNo": f"PT{i:03d}", # 测试飞机航班号:PT001-PT060
|
||
"longitude": aircraft_lon,
|
||
"latitude": aircraft_lat,
|
||
"time": int(time.time() * 1000),
|
||
"altitude": 0.0,
|
||
"trackNumber": 2000 + i, # 跟踪号:2001-2060
|
||
"speed": speed,
|
||
"use_route_following": False, # 不使用复杂路径跟随
|
||
"start_point": {"latitude": start_lat, "longitude": start_lon},
|
||
"end_point": {"latitude": end_lat, "longitude": end_lon},
|
||
"moving_to_end": moving_to_end
|
||
}
|
||
|
||
additional_aircraft_data.append(aircraft)
|
||
|
||
# 统计信息
|
||
stationary_count = sum(1 for a in additional_aircraft_data if a["speed"] == 0)
|
||
moving_count = len(additional_aircraft_data) - stationary_count
|
||
|
||
print(f"✅ 已添加测试飞机:")
|
||
print(f" - 总飞机数: {len(additional_aircraft_data)}")
|
||
print(f" - 静止飞机: {stationary_count} 架")
|
||
print(f" - 移动飞机: {moving_count} 架")
|
||
print(f" - 移动速度: 20-50 km/h")
|
||
|
||
# 添加到主飞机数据列表
|
||
aircraft_data.extend(additional_aircraft_data)
|
||
|
||
# 初始化测试飞机
|
||
initialize_additional_aircraft()
|
||
|
||
from collections.abc import Mapping
|
||
|
||
def calculate_distance_to_target(vehicle: Mapping[str, float], target_lat: float, target_lon: float) -> float:
|
||
"""计算车辆到目标位的距离(米)"""
|
||
# 转换为弧度
|
||
lat1_rad = math.radians(vehicle["latitude"])
|
||
lon1_rad = math.radians(vehicle["longitude"])
|
||
lat2_rad = math.radians(target_lat)
|
||
lon2_rad = math.radians(target_lon)
|
||
|
||
dlat = lat2_rad - lat1_rad
|
||
dlon = lon2_rad - lon1_rad
|
||
|
||
# 使用 Haversine 公式计算距离
|
||
a = math.sin(dlat/2) * math.sin(dlat/2) + \
|
||
math.cos(lat1_rad) * math.cos(lat2_rad) * \
|
||
math.sin(dlon/2) * math.sin(dlon/2)
|
||
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||
return EARTH_RADIUS * c
|
||
|
||
|
||
|
||
# 删除了复杂的红绿灯和路口计算函数,因为已简化为往复运动逻辑
|
||
|
||
def update_position_with_vector(
|
||
obj: dict[str, Any],
|
||
target_point: "Point",
|
||
start_point: "Point",
|
||
speed: float,
|
||
elapsed_time: float,
|
||
return_to_start: bool = False
|
||
) -> bool:
|
||
"""
|
||
基于向量的位置更新逻辑
|
||
|
||
Args:
|
||
obj: 需要更新位置的对象(航空器或车辆)
|
||
target_point: 目标点坐标 {"latitude": float, "longitude": float}
|
||
start_point: 起点坐标 {"latitude": float, "longitude": float}
|
||
speed: 移动速度(km/h)
|
||
elapsed_time: 经过的时间(秒)
|
||
return_to_start: 是否在到达目标点时返回起点
|
||
"""
|
||
# 检查是否在等待状态
|
||
if "wait_until" not in obj:
|
||
obj["wait_until"] = 0.0
|
||
|
||
current_time = time.time()
|
||
# 基于类型安全读取 wait_until,避免 float 与 object 比较
|
||
_wu = to_float_safe(obj.get("wait_until"))
|
||
wait_until = 0.0 if _wu is None else float(_wu)
|
||
if current_time < wait_until:
|
||
return False # 还在等待中,不更新位置
|
||
|
||
# 计算这一步要移动的距离(米)并转换为经纬度
|
||
speed_mps = float(speed) * 1000.0 / 3600.0
|
||
distance = speed_mps * float(elapsed_time)
|
||
|
||
# 安全读取并转换当前位置与目标点/起点经纬度为 float
|
||
cur_lat = to_float_safe(obj.get("latitude"))
|
||
cur_lon = to_float_safe(obj.get("longitude"))
|
||
tgt_lat = to_float_safe(target_point.get("latitude"))
|
||
tgt_lon = to_float_safe(target_point.get("longitude"))
|
||
if cur_lat is None or cur_lon is None or tgt_lat is None or tgt_lon is None:
|
||
# 无法解析经纬度,跳过更新
|
||
return False
|
||
|
||
# 将移动距离转换为经纬度变化量
|
||
move_dlat, move_dlon = meters_to_degrees(distance, cur_lat)
|
||
|
||
# 将5米的判断距离转换为经纬度(优化:从15米减少到5米,减少颤抖)
|
||
check_dlat, check_dlon = meters_to_degrees(5.0, cur_lat)
|
||
|
||
# 计算当前位置到终点的向量(经纬度空间)
|
||
vector_lat = float(tgt_lat) - float(cur_lat)
|
||
vector_lon = float(tgt_lon) - float(cur_lon)
|
||
|
||
# 计算向量长度(经纬度空间)
|
||
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
|
||
|
||
# 检查是否到达终点
|
||
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
|
||
# 精确设置到目标点位置,避免位置偏差
|
||
obj["latitude"] = float(tgt_lat)
|
||
obj["longitude"] = float(tgt_lon)
|
||
|
||
if return_to_start:
|
||
# 返回起点并设置等待时间
|
||
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN
|
||
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
|
||
else:
|
||
# 设置短暂等待时间,避免立即切换方向导致的颤抖
|
||
obj["wait_until"] = current_time + 0.1 # 0.1秒的短暂等待
|
||
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 到达目标点,等待0.1秒后切换方向")
|
||
return True # 表示已到达终点
|
||
else:
|
||
# 归一化方向向量
|
||
unit_lat = vector_lat / vector_length
|
||
unit_lon = vector_lon / vector_length
|
||
|
||
# 计算这一步的位置变化
|
||
dlat = move_dlat * unit_lat
|
||
dlon = move_dlon * unit_lon
|
||
|
||
# 更新位置(写回为 float)
|
||
obj["latitude"] = float(cur_lat + dlat)
|
||
obj["longitude"] = float(cur_lon + dlon)
|
||
return False # 表示正在移动中
|
||
|
||
def update_object_position(obj: dict[str, Any], elapsed_time: float) -> None:
|
||
"""统一的位置更新函数 - 用于飞机和车辆的往复运动"""
|
||
# 获取对象类型和标识
|
||
obj_type = "航空器" if "flightNo" in obj else "车辆"
|
||
obj_id = obj.get("flightNo", obj.get("vehicleNo"))
|
||
|
||
# 根据移动方向确定目标点
|
||
if obj["moving_to_end"]:
|
||
target_raw = obj["end_point"]
|
||
start_raw = obj["start_point"]
|
||
else:
|
||
target_raw = obj["start_point"]
|
||
start_raw = obj["end_point"]
|
||
|
||
# 构造类型安全的 Point
|
||
t_lat = to_float_safe(target_raw.get("latitude")) if isinstance(target_raw, dict) else None
|
||
t_lon = to_float_safe(target_raw.get("longitude")) if isinstance(target_raw, dict) else None
|
||
s_lat = to_float_safe(start_raw.get("latitude")) if isinstance(start_raw, dict) else None
|
||
s_lon = to_float_safe(start_raw.get("longitude")) if isinstance(start_raw, dict) else None
|
||
if t_lat is None or t_lon is None or s_lat is None or s_lon is None:
|
||
# 无法解析坐标,直接更新时间戳后返回
|
||
obj["time"] = int(time.time() * 1000)
|
||
return
|
||
|
||
target_point: Point = {"latitude": float(t_lat), "longitude": float(t_lon)}
|
||
start_point: Point = {"latitude": float(s_lat), "longitude": float(s_lon)}
|
||
|
||
# 读取速度为 float
|
||
spd = to_float_safe(obj.get("speed"))
|
||
if spd is None:
|
||
spd = 0.0
|
||
|
||
# 更新位置
|
||
reached_target = update_position_with_vector(
|
||
obj,
|
||
target_point,
|
||
start_point,
|
||
float(spd),
|
||
float(elapsed_time),
|
||
return_to_start=False
|
||
)
|
||
|
||
if reached_target:
|
||
# 切换移动方向
|
||
obj["moving_to_end"] = not obj["moving_to_end"]
|
||
target_name = "终点" if not obj["moving_to_end"] else "起点"
|
||
print(f"{obj_type} {obj_id} 到达{'终点' if obj['moving_to_end'] else '起点'},开始前往{target_name}")
|
||
# 注意:等待时间已经在update_position_with_vector中设置了,不需要重复设置
|
||
|
||
# 更新时间戳
|
||
obj["time"] = int(time.time() * 1000)
|
||
|
||
def update_aircraft_position(aircraft: dict[str, Any], elapsed_time: float) -> None:
|
||
"""更新航空器位置 - 统一使用路径跟随管理器"""
|
||
flight_no = aircraft.get("flightNo")
|
||
use_route_following = aircraft.get("use_route_following", False)
|
||
|
||
if use_route_following:
|
||
# 使用新的路径跟随管理器
|
||
route_manager.update_aircraft_position(aircraft, elapsed_time)
|
||
else:
|
||
# 保留旧的简单移动逻辑作为兜底
|
||
update_object_position(aircraft, elapsed_time)
|
||
|
||
def update_vehicle_position(vehicle: dict[str, Any], elapsed_time: float) -> None:
|
||
"""更新车辆位置 - 使用统一的位置更新函数"""
|
||
update_object_position(vehicle, elapsed_time)
|
||
|
||
# 简化的路口信息(保留API兼容性)
|
||
INTERSECTIONS = {
|
||
"TL001": {
|
||
"id": "INT001",
|
||
"name": "路口1",
|
||
"latitude": 36.369696,
|
||
"longitude": 120.083084
|
||
},
|
||
"TL002": {
|
||
"id": "INT002",
|
||
"name": "路口2",
|
||
"latitude": 36.365617,
|
||
"longitude": 120.084637
|
||
}
|
||
}
|
||
|
||
# T6路口定义
|
||
T6_INTERSECTION = {
|
||
"id": "T6",
|
||
"name": "T6路口",
|
||
"latitude": 36.372000,
|
||
"longitude": 120.085000
|
||
}
|
||
|
||
def generate_traffic_light_data() -> list[dict[str, Any]]:
|
||
"""生成红绿灯数据"""
|
||
traffic_light_data = []
|
||
|
||
# 生成两个固定路口的红绿灯数据
|
||
for tl_id, intersection in INTERSECTIONS.items():
|
||
signal = {
|
||
"id": tl_id,
|
||
"state": 1, # 0: RED, 1: GREEN,测试时保持绿灯
|
||
"timestamp": int(time.time() * 1000),
|
||
"intersection": intersection["id"],
|
||
"position": {
|
||
"latitude": intersection["latitude"],
|
||
"longitude": intersection["longitude"]
|
||
}
|
||
}
|
||
traffic_light_data.append(signal)
|
||
|
||
return traffic_light_data
|
||
|
||
# 红绿灯数据
|
||
traffic_light_data = generate_traffic_light_data()
|
||
|
||
# 分别记录航空器、机场车辆的上次更新时间
|
||
last_aircraft_update_time = time.time()
|
||
last_vehicle_update_time = time.time() # 机场车辆更新时间
|
||
last_light_switch_time = time.time()
|
||
|
||
# 后台更新管理器
|
||
class BackgroundUpdateManager:
|
||
"""独立的后台更新管理器,主动更新航空器状态和位置"""
|
||
|
||
def __init__(self):
|
||
self.running = False
|
||
self.update_thread = None
|
||
self.data_lock = threading.Lock() # 保护共享数据的锁
|
||
|
||
def start(self):
|
||
"""启动后台更新线程"""
|
||
if not self.running:
|
||
self.running = True
|
||
self.update_thread = threading.Thread(target=self._update_loop, daemon=True)
|
||
self.update_thread.start()
|
||
logger.info("后台更新管理器已启动")
|
||
|
||
def stop(self):
|
||
"""停止后台更新线程"""
|
||
if self.running:
|
||
self.running = False
|
||
if self.update_thread:
|
||
self.update_thread.join(timeout=2.0)
|
||
logger.info("后台更新管理器已停止")
|
||
|
||
def _update_loop(self):
|
||
"""主更新循环"""
|
||
while self.running:
|
||
try:
|
||
current_time = time.time()
|
||
|
||
with self.data_lock:
|
||
# 更新所有航空器位置
|
||
for aircraft in aircraft_data:
|
||
update_aircraft_position(aircraft, UPDATE_INTERVAL)
|
||
aircraft["time"] = int(current_time * 1000)
|
||
|
||
# 更新所有车辆位置
|
||
for vehicle in airport_vehicle_data:
|
||
update_vehicle_position(vehicle, UPDATE_INTERVAL)
|
||
vehicle["time"] = int(current_time * 1000)
|
||
|
||
# 休眠到下一个更新周期
|
||
time.sleep(UPDATE_INTERVAL)
|
||
|
||
except Exception as e:
|
||
logger.error(f"后台更新出错: {str(e)}")
|
||
time.sleep(1.0) # 出错时短暂休眠后继续
|
||
|
||
# 全局后台更新管理器实例
|
||
background_manager = BackgroundUpdateManager()
|
||
|
||
def check_auth() -> bool:
|
||
auth_header = request.headers.get('Authorization')
|
||
|
||
if not auth_header:
|
||
logger.warning("认证失败: 缺少Authorization头")
|
||
return False
|
||
|
||
# 直接比较完整的Authorization头
|
||
result = auth_header == AUTH_TOKEN
|
||
if not result:
|
||
logger.warning(f"认证失败: token不匹配")
|
||
logger.debug(f"收到的token: {auth_header}")
|
||
logger.debug(f"期望的token: {AUTH_TOKEN}")
|
||
return result
|
||
|
||
def to_float_safe(x: object) -> float | None:
|
||
"""
|
||
安全将值转换为 float:
|
||
- 允许 int/float
|
||
- 允许非布尔的数字字符串
|
||
- 允许 dict,优先取 'value'、'longitude'、'latitude' 等常见键
|
||
- 无法解析则返回 None
|
||
"""
|
||
# 排除布尔值(bool 是 int 的子类,但不应当被视为数值)
|
||
if isinstance(x, bool):
|
||
return None
|
||
if isinstance(x, (int, float)):
|
||
return float(x)
|
||
if isinstance(x, str):
|
||
try:
|
||
return float(x.strip())
|
||
except Exception:
|
||
return None
|
||
if isinstance(x, dict):
|
||
for key in ("value", "longitude", "latitude"):
|
||
if key in x:
|
||
return to_float_safe(x[key])
|
||
return None
|
||
return None
|
||
|
||
@app.route('/openApi/getCurrentFlightPositions', methods=['GET', 'OPTIONS'])
|
||
def get_flight_positions():
|
||
"""获取当前航空器位置信息"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
# 使用线程锁保护数据访问
|
||
with background_manager.data_lock:
|
||
# 创建符合 API 格式的响应数据
|
||
response_data = []
|
||
for aircraft in aircraft_data:
|
||
lon_val = to_float_safe(aircraft.get("longitude"))
|
||
lat_val = to_float_safe(aircraft.get("latitude"))
|
||
api_aircraft = {
|
||
"flightNo": aircraft.get("flightNo"),
|
||
"longitude": round(lon_val, 6) if lon_val is not None else aircraft.get("longitude"),
|
||
"latitude": round(lat_val, 6) if lat_val is not None else aircraft.get("latitude"),
|
||
"time": aircraft.get("time")
|
||
}
|
||
response_data.append(api_aircraft)
|
||
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "当前航空器实时位置数据",
|
||
"data": response_data
|
||
})
|
||
|
||
def switch_traffic_light_state() -> None:
|
||
"""统一处理红绿灯状态切换"""
|
||
global last_light_switch_time
|
||
current_time = time.time()
|
||
|
||
# 西路口红绿灯每15秒切换一次
|
||
elapsed_since_switch = current_time - last_light_switch_time
|
||
if elapsed_since_switch >= TRAFFIC_LIGHT_SWITCH_INTERVAL: # 使用常量
|
||
# 获取当前状态
|
||
current_state = traffic_light_data[0]["state"]
|
||
|
||
# 根据当前状态决定下一个状态
|
||
if current_state == 0: # 当前是红灯
|
||
# 切换到黄灯(表示红绿灯故障)
|
||
traffic_light_data[0]["state"] = 2 # 2 表示黄灯
|
||
print(f"西路口红绿灯状态切换为: 黄灯(红绿灯故障)")
|
||
elif current_state == 2: # 当前是黄灯
|
||
# 从黄灯切换到绿灯
|
||
traffic_light_data[0]["state"] = 1 # 1 表示绿灯
|
||
print(f"西路口红绿灯状态切换为: 绿灯")
|
||
else: # 当前是绿灯
|
||
# 从绿灯切换到红灯
|
||
traffic_light_data[0]["state"] = 0 # 0 表示红灯
|
||
print(f"西路口红绿灯状态切换为: 红灯")
|
||
|
||
last_light_switch_time = current_time
|
||
|
||
# 更新东路口红绿灯(根据航空器位置)
|
||
if aircraft_data:
|
||
aircraft = aircraft_data[0]
|
||
# 安全获取并转换纬度为浮点数,避免类型错误
|
||
aircraft_lat = to_float_safe(aircraft.get("latitude"))
|
||
t6_lat = to_float_safe(T6_INTERSECTION.get("latitude"))
|
||
if aircraft_lat is not None and t6_lat is not None:
|
||
lat_diff = abs(aircraft_lat - t6_lat) * 111319.9 # 转米
|
||
else:
|
||
# 无法解析时默认认为距离较大,避免误判为红灯
|
||
lat_diff = float('inf')
|
||
|
||
old_state = traffic_light_data[1]["state"]
|
||
# 大于50米为绿灯(1),否则红灯(0)
|
||
traffic_light_data[1]["state"] = 1 if lat_diff > DIST_50M else 0
|
||
|
||
if old_state != traffic_light_data[1]["state"]:
|
||
print(f"东路口红绿灯状态切换为: {'绿灯' if traffic_light_data[1]['state'] == 1 else '红灯'}")
|
||
|
||
@app.route('/openApi/getCurrentVehiclePositions', methods=['GET', 'OPTIONS'])
|
||
def get_vehicle_positions():
|
||
"""获取当前车辆位置信息(仅机场车辆)"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
try:
|
||
# 使用线程锁保护数据访问
|
||
with background_manager.data_lock:
|
||
response_data = []
|
||
for v in airport_vehicle_data:
|
||
lon_val = to_float_safe(v.get("longitude"))
|
||
lat_val = to_float_safe(v.get("latitude"))
|
||
v_out = {
|
||
"vehicleNo": v.get("vehicleNo"),
|
||
"longitude": round(lon_val, 6) if lon_val is not None else v.get("longitude"),
|
||
"latitude": round(lat_val, 6) if lat_val is not None else v.get("latitude"),
|
||
"time": v.get("time")
|
||
}
|
||
response_data.append(v_out)
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "当前车辆实时位置数据",
|
||
"data": response_data
|
||
})
|
||
except Exception as e:
|
||
logger.error(f"Error in get_vehicle_positions: {str(e)}")
|
||
return jsonify({
|
||
"status": 500,
|
||
"msg": str(e),
|
||
"data": None
|
||
}), 500
|
||
|
||
@app.route('/openApi/getTrafficLightSignals', methods=['GET', 'OPTIONS'])
|
||
def get_traffic_light_signals():
|
||
"""获取红绿灯信号状态 (旧的轮询接口)"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
# 新增: 返回空数据,以禁用轮询数据源
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "Traffic light polling endpoint disabled to test push mechanism.",
|
||
"data": []
|
||
})
|
||
|
||
@app.after_request
|
||
def add_cors_headers(response):
|
||
response.headers['Access-Control-Allow-Origin'] = '*'
|
||
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
|
||
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
|
||
response.headers['Access-Control-Max-Age'] = '3600'
|
||
return response
|
||
|
||
def check_vehicle_states():
|
||
"""定期检查所有车辆状态"""
|
||
for vehicle_no, state in vehicle_states.items():
|
||
state.log_state()
|
||
|
||
# 认证相关配置
|
||
AUTH_USERNAME = "dianxin"
|
||
AUTH_PASSWORD = "dianxin@123"
|
||
AUTH_TOKEN = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
|
||
|
||
@app.route('/login', methods=['POST', 'OPTIONS'])
|
||
def login():
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
# 支持多种方式获取用户名和密码
|
||
username = None
|
||
password = None
|
||
|
||
# 优先从 URL query parameters 中获取(符合API文档示例)
|
||
username = request.args.get('username')
|
||
password = request.args.get('password')
|
||
|
||
# 如果query参数中没有,尝试从form data中获取
|
||
if not username or not password:
|
||
username = request.form.get('username') or username
|
||
password = request.form.get('password') or password
|
||
|
||
# 如果form data中也没有,尝试从JSON body中获取
|
||
if not username or not password:
|
||
try:
|
||
json_data = request.get_json(silent=True)
|
||
if json_data:
|
||
username = json_data.get('username') or username
|
||
password = json_data.get('password') or password
|
||
except:
|
||
pass
|
||
|
||
logger.info(f"收到登录请求: username={username}, password={password}")
|
||
logger.debug(f"请求 URL: {request.url}")
|
||
logger.debug(f"请求方法: {request.method}")
|
||
logger.debug(f"请求参数: {request.args}")
|
||
logger.debug(f"请求表单: {request.form}")
|
||
logger.debug(f"请求JSON: {request.get_json(silent=True)}")
|
||
|
||
if not username or not password:
|
||
return jsonify({
|
||
"status": 400,
|
||
"msg": "缺少用户名或密码",
|
||
"data": None
|
||
}), 400
|
||
|
||
if username == AUTH_USERNAME and password == AUTH_PASSWORD:
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "登入成功",
|
||
"data": AUTH_TOKEN
|
||
})
|
||
else:
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "用户名或密码错误",
|
||
"data": None
|
||
}), 401
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
# 新增API端点
|
||
|
||
@app.route('/userInfoController/refreshToken', methods=['GET', 'OPTIONS'])
|
||
def refresh_token():
|
||
"""Token刷新接口"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
# 返回新的token(实际上还是同一个)
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "Token刷新成功",
|
||
"data": {
|
||
"token": "Bearer dianxin-token-2024",
|
||
"expiresIn": 86400 # 24小时过期
|
||
}
|
||
})
|
||
|
||
@app.route('/runwayPathPlanningController/findArrTaxiwayByRunwayAndContactCrossAndSeat', methods=['GET', 'OPTIONS'])
|
||
def get_arrival_taxiway_route():
|
||
"""获取进港滑行路线"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
# 获取请求参数
|
||
in_runway = request.args.get('inRunway', '35')
|
||
out_runway = request.args.get('outRunway', '34')
|
||
contact_cross = request.args.get('contactCross', 'F1')
|
||
seat = request.args.get('seat', '138')
|
||
|
||
logger.info(f"进港路线查询: inRunway={in_runway}, outRunway={out_runway}, contactCross={contact_cross}, seat={seat}")
|
||
|
||
# 根据参数匹配对应的航班路由
|
||
matching_flight = None
|
||
for flight_no, params in aircraft_route_params.items():
|
||
arrival_params = params.get("arrival", {})
|
||
if (arrival_params.get("inRunway") == in_runway and
|
||
arrival_params.get("contactCross") == contact_cross and
|
||
arrival_params.get("seat") == seat):
|
||
matching_flight = flight_no
|
||
break
|
||
|
||
# 如果找到匹配的航班,返回对应路由,否则使用MU5123作为默认
|
||
if matching_flight and matching_flight in aircraft_routes:
|
||
route_data = aircraft_routes[matching_flight]["arrival"]
|
||
logger.info(f"匹配航班 {matching_flight} 的进港路由")
|
||
else:
|
||
route_data = aircraft_routes["MU5123"]["arrival"]
|
||
logger.info(f"未找到匹配航班,使用MU5123的进港路由作为默认")
|
||
|
||
# 将路由数据转换为CGCS2000坐标系格式
|
||
converted_route_data = convert_route_to_cgcs2000(route_data)
|
||
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "进港滑行路线查询成功",
|
||
"data": converted_route_data
|
||
})
|
||
|
||
@app.route('/runwayPathPlanningController/findDepTaxiwayByRunwayAndContactCrossAndSeat', methods=['GET', 'OPTIONS'])
|
||
def get_departure_taxiway_route():
|
||
"""获取出港滑行路线"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
# 获取请求参数
|
||
in_runway = request.args.get('inRunway', '35')
|
||
out_runway = request.args.get('outRunway', '34')
|
||
start_seat = request.args.get('startSeat', '138')
|
||
|
||
logger.info(f"出港路线查询: inRunway={in_runway}, outRunway={out_runway}, startSeat={start_seat}")
|
||
|
||
# 根据参数匹配对应的航班路由
|
||
matching_flight = None
|
||
for flight_no, params in aircraft_route_params.items():
|
||
departure_params = params.get("departure", {})
|
||
if (departure_params.get("inRunway") == in_runway and
|
||
departure_params.get("startSeat") == start_seat):
|
||
matching_flight = flight_no
|
||
break
|
||
|
||
# 如果找到匹配的航班,返回对应路由,否则使用CA1234作为默认
|
||
if matching_flight and matching_flight in aircraft_routes:
|
||
route_data = aircraft_routes[matching_flight]["departure"]
|
||
logger.info(f"匹配航班 {matching_flight} 的出港路由")
|
||
else:
|
||
route_data = aircraft_routes["CA1234"]["departure"]
|
||
logger.info(f"未找到匹配航班,使用CA1234的出港路由作为默认")
|
||
|
||
# 将路由数据转换为CGCS2000坐标系格式
|
||
converted_route_data = convert_route_to_cgcs2000(route_data)
|
||
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "出港滑行路线查询成功",
|
||
"data": converted_route_data
|
||
})
|
||
|
||
def get_active_flights() -> list[dict[str, Any]]:
|
||
"""获取当前活跃的进出港航班(基于路径状态)"""
|
||
active_flights = []
|
||
current_time = int(time.time() * 1000)
|
||
|
||
# 遍历所有路径跟随管理器中的飞机
|
||
for flight_no, follower in route_manager.followers.items():
|
||
# 检查飞机是否在活跃状态(正在路径中或在机位)
|
||
if follower.flight_status in ['in_route', 'at_gate']:
|
||
# 获取航班基础信息
|
||
if flight_no in flight_data:
|
||
flight_info = flight_data[flight_no]
|
||
|
||
# 根据路径类型确定航班通知类型
|
||
flight_type = flight_info.get("type")
|
||
event_type = "落地" if flight_type == "IN" else "滑出"
|
||
|
||
# 构造航班通知数据
|
||
flight_notification = {
|
||
"flightNo": flight_no,
|
||
"type": flight_type,
|
||
"runway": flight_info.get("runway"),
|
||
"contactCross": flight_info.get("contactCross"),
|
||
"seat": flight_info.get("seat"),
|
||
"time": current_time
|
||
}
|
||
active_flights.append(flight_notification)
|
||
|
||
logger.info(f"航班 {flight_no} {event_type}通知中 (路径状态: {follower.flight_status}, 进度: {follower.route_progress:.2f})")
|
||
else:
|
||
logger.warning(f"航班 {flight_no} 在路径跟随器中但不在flight_data中")
|
||
|
||
return active_flights
|
||
|
||
@app.route('/openApi/getInboundAndOutboundFlightsNotification', methods=['GET', 'OPTIONS'])
|
||
def get_flights():
|
||
"""进出港航班查询接口(Mock)"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
try:
|
||
# 获取当前活跃航班
|
||
active_flights = get_active_flights()
|
||
|
||
logger.info(f"进出港航班查询: 返回 {len(active_flights)} 个活跃航班")
|
||
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "进出港航班查询成功",
|
||
"data": active_flights
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.error(f"进出港航班查询失败: {str(e)}")
|
||
return jsonify({
|
||
"status": 500,
|
||
"msg": f"查询失败: {str(e)}",
|
||
"data": []
|
||
}), 500
|
||
|
||
@app.route('/aircraftStatusController/getAircraftStatus', methods=['GET', 'OPTIONS'])
|
||
def get_aircraft_status():
|
||
"""获取航空器状态"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
if not check_auth():
|
||
return jsonify({
|
||
"status": 401,
|
||
"msg": "认证失败",
|
||
"data": None
|
||
}), 401
|
||
|
||
# 返回简化的航空器状态信息
|
||
return jsonify({
|
||
"status": 200,
|
||
"msg": "航空器状态查询成功",
|
||
"data": {
|
||
"message": "已简化为两架飞机:CA1234(出港)和MU5123(进港)"
|
||
}
|
||
})
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 启动后台更新管理器
|
||
background_manager.start()
|
||
|
||
# 注册应用关闭时的清理函数
|
||
def cleanup_on_exit():
|
||
background_manager.stop()
|
||
atexit.register(cleanup_on_exit)
|
||
|
||
try:
|
||
app.run(host='localhost', port=8090, debug=True)
|
||
finally:
|
||
# 确保后台管理器正确停止
|
||
background_manager.stop() |