QAUP_Management/tools/mock_server.py
Tian jianyong e53e7c6d6c 增加了 mock_server的航空器状态信息辅助接口,采用真实的航空器路由数据,用 JTS
Topology Suite 解析数据并拼接成单线,同时做了数据验证。支持了中国大地坐标系2000
(CGCS2000)投影坐标数据。
2025-07-24 20:27:15 +08:00

2239 lines
85 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request
import time
import math
import random
import logging
import os
import uuid
# 创建 logs 目录(如果不存在)
if not os.path.exists('logs'):
os.makedirs('logs')
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/mock_server.log'),
logging.StreamHandler()
]
)
app = Flask(__name__)
# 地球半径(米)
EARTH_RADIUS = 6378137.0
COMMAND_PRIORITIES = {
"ALERT": 5,
"WARNING": 3,
"PARKING": 2,
"RESUME": 1
}
def meters_to_degrees(meters, latitude):
"""
将米转换为度,考虑纬度的影响
"""
# 纬度方向1度 = 111,319.9米
meters_per_deg_lat = 111319.9
# 经度方向1度 = 111,319.9 * cos(latitude)米
meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(latitude))
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
# 距离配置(米)
DIST_300M = 300
DIST_200M = 200
DIST_150M = 150
DIST_100M = 100
DIST_50M = 50
# 时间配置(秒)
WAIT_TIME_AFTER_RETURN = 0.1 # 返回起点后的等待时间进一步优化减少到0.1秒)
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
TRAFFIC_LIGHT_SWITCH_INTERVAL = 10.0 # 红绿灯切换间隔
# 根据 route.md 定义的坐标点
# 飞机 CA1234 路径
CA_START = {"longitude": 120.086263, "latitude": 36.370484} # 飞机起点
CA_END = {"longitude": 120.080996, "latitude": 36.369105} # 飞机终点
# 飞机 MU5123 路径
MU_START = {"longitude": 120.088076, "latitude": 36.374179} # 飞机起点
MU_END = {"longitude": 120.077971, "latitude": 36.371503} # 飞机终点
# 特勤车 鲁B123 路径
SPECIAL_VEHICLE_START = {"longitude": 120.080801, "latitude": 36.366626} # 特勤车起点
SPECIAL_VEHICLE_END = {"longitude": 120.083899, "latitude": 36.367403} # 特勤车终点
# 普通车 鲁B234 路径
NORMAL_VEHICLE_START = {"longitude": 120.087259, "latitude": 36.368299} # 普通车起点
NORMAL_VEHICLE_END = {"longitude": 120.083899, "latitude": 36.367403} # 普通车终点
# 无人车A 鲁B567 路径
UNMANNED_A_START = {"longitude": 120.083084, "latitude": 36.369696} # 无人车A起点
UNMANNED_A_END = {"longitude": 120.084637, "latitude": 36.365617} # 无人车A终点
# 无人车B 鲁B579 路径
UNMANNED_B_START = {"longitude": 120.086965, "latitude": 36.368599} # 无人车B起点
UNMANNED_B_END = {"longitude": 120.086263, "latitude": 36.370484} # 无人车B终点
# 飞机和车辆尺寸(半径 米)
AIRCRAFT_SIZE_M = 30.0
VEHICLE_SIZE_M = 10.0
# CA3456 航空器路由参数配置
aircraft_route_params = {
"CA3456": {
"arrival": { # 进港参数
"inRunway": "35",
"outRunway": "34",
"contactCross": "F1",
"seat": "138"
},
"departure": { # 出港参数
"inRunway": "35",
"outRunway": "34",
"startSeat": "138"
}
},
"CA1234": {
"arrival": {
"inRunway": "17",
"outRunway": "35",
"contactCross": "A2",
"seat": "201"
},
"departure": {
"inRunway": "17",
"outRunway": "35",
"startSeat": "201"
}
},
"MU5123": {
"arrival": {
"inRunway": "35",
"outRunway": "17",
"contactCross": "B3",
"seat": "156"
},
"departure": {
"inRunway": "35",
"outRunway": "17",
"startSeat": "156"
}
}
}
@app.route('/aircraftRouteParamsController/getRouteParams', methods=['GET', 'OPTIONS'])
def get_aircraft_route_params():
"""获取航空器路由查询参数"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 获取航班号和路由类型参数
flight_no = request.args.get('flightNo')
route_type = request.args.get('routeType', '').upper() # IN 或 OUT
if not flight_no:
return jsonify({
"status": 400,
"msg": "缺少flightNo参数",
"data": None
}), 400
if route_type not in ['IN', 'OUT']:
return jsonify({
"status": 400,
"msg": "routeType参数必须是IN或OUT",
"data": None
}), 400
# 查找航班路由参数
aircraft_params = aircraft_route_params.get(flight_no)
if not aircraft_params:
return jsonify({
"status": 404,
"msg": f"未找到航班 {flight_no} 的路由参数",
"data": None
}), 404
# 根据路由类型返回对应参数
if route_type == 'IN':
route_params = aircraft_params.get('arrival')
if not route_params:
return jsonify({
"status": 404,
"msg": f"未找到航班 {flight_no} 的进港路由参数",
"data": None
}), 404
logging.info(f"进港路由参数查询: flightNo={flight_no}, params={route_params}")
return jsonify({
"status": 200,
"msg": "进港路由参数查询成功",
"data": {
"flightNo": flight_no,
"routeType": "IN",
"inRunway": route_params["inRunway"],
"outRunway": route_params["outRunway"],
"contactCross": route_params["contactCross"],
"seat": route_params["seat"],
"timestamp": int(time.time() * 1000)
}
})
else: # OUT
route_params = aircraft_params.get('departure')
if not route_params:
return jsonify({
"status": 404,
"msg": f"未找到航班 {flight_no} 的出港路由参数",
"data": None
}), 404
logging.info(f"出港路由参数查询: flightNo={flight_no}, params={route_params}")
return jsonify({
"status": 200,
"msg": "出港路由参数查询成功",
"data": {
"flightNo": flight_no,
"routeType": "OUT",
"inRunway": route_params["inRunway"],
"outRunway": route_params["outRunway"],
"startSeat": route_params["startSeat"],
"timestamp": int(time.time() * 1000)
}
})
ca3456_status = {
"flightNo": "CA3456",
"type": "IN", # IN: 进港, ARRIVED: 停留, OUT: 出港
"inRunway": "35",
"outRunway": "34",
"contactCross": "F1",
"seat": "138",
"timestamp": int(time.time() * 1000),
"status_start_time": time.time(),
"cycle_duration": 77, # 总循环时间: 30+15+30+2=77秒
"arrival_duration": 30, # 进港阶段持续时间
"wait_duration": 15, # 停留时间(15秒方便测试)
"departure_duration": 30 # 出港阶段持续时间
}
# 航空器路由数据 - 使用API文档中的完整示例数据
aircraft_routes = {
"arrival": {
"type": "IN",
"status": "COMPLETE",
"codes": "F1,L4,138",
"geometry": None,
"geoPath": {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050742275893088E7, 4026164.644604296],
[4.050742342874898E7, 4026162.545793306]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050743615407222E7, 4026122.672208275],
[4.050743684026714E7, 4026120.146600441],
[4.050743730372977E7, 4026117.570797326],
[4.050743754093282E7, 4026114.964402468],
[4.050743757419489E7, 4026113.602043673],
[4.050743755007106E7, 4026112.347252104],
[4.050743733107493E7, 4026109.739264329],
[4.050743688561112E7, 4026107.160287504]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050717462298063E7, 4026091.904402129],
[4.050716820216861E7, 4026089.855066455]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050722536188381E7, 4026108.097315812],
[4.050720821283463E7, 4026102.624334418]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050727144214725E7, 4026112.527790001],
[4.050726278505515E7, 4026114.415332655]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050731882638656E7, 4026102.196402456],
[4.050727312768086E7, 4026112.160285922]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050738651815705E7, 4026087.437277401],
[4.050734647450486E7, 4026096.168165339]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050714461981621E7, 4026082.328947974],
[4.05071119278174E7, 4026071.895744022]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050734647450486E7, 4026096.168165339],
[4.050733913391775E7, 4026097.768664928]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050689454491971E7, 4026002.519737061],
[4.050693265139649E7, 4026014.681113256],
[4.050697075787329E7, 4026026.842489458]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050741162298967E7, 4026083.825606086],
[4.050741416963529E7, 4026084.285112275],
[4.050741669524226E7, 4026084.971307588],
[4.050741915143272E7, 4026085.875012957],
[4.050742151951354E7, 4026086.989350639],
[4.050742378146222E7, 4026088.305839852],
[4.050742592006397E7, 4026089.814461317],
[4.050742791904272E7, 4026091.503733515],
[4.050742976318505E7, 4026093.360800063],
[4.050743143845592E7, 4026095.371527565],
[4.050743293210549E7, 4026097.52061317],
[4.050743423276621E7, 4026099.791701039],
[4.050743533053925E7, 4026102.167506821],
[4.05074362170699E7, 4026104.629949201],
[4.050743683431807E7, 4026106.966150228],
[4.050743688561112E7, 4026107.160287504]
]
},
"properties": {
"code": ""
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050697552118288E7, 4026028.362661481],
[4.050697075787329E7, 4026026.842489458]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050704221346159E7, 4026049.646966941],
[4.050703137036284E7, 4026046.18647901]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050708746004742E7, 4026064.087051627],
[4.050704840096232E7, 4026051.621473066]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05071119278174E7, 4026071.895744022],
[4.050710556055213E7, 4026069.863682419]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050741939071107E7, 4026175.198599438],
[4.05074216811156E7, 4026168.021835575],
[4.050742275893088E7, 4026164.644604296]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050753774654577E7, 4026246.448261945],
[4.050749515081406E7, 4026236.848849251],
[4.050744870329395E7, 4026226.381394062]
]
},
"properties": {
"code": "138"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050753774654577E7, 4026246.448261945],
[4.0507613391983E7, 4026263.495786141],
[4.05076192451935E7, 4026264.814870958],
[4.050762119626365E7, 4026265.254565894]
]
},
"properties": {
"code": "138"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050742342874898E7, 4026162.545793306],
[4.050743615407222E7, 4026122.672208275]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050716820216861E7, 4026089.855066455],
[4.050714461981621E7, 4026082.328947974]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050720821283463E7, 4026102.624334418],
[4.050717462298063E7, 4026091.904402129]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050726278505515E7, 4026114.415332655],
[4.050725934642086E7, 4026115.009285077],
[4.050725586910526E7, 4026115.301280484],
[4.050725237957282E7, 4026115.289096617],
[4.050724890438099E7, 4026114.9728262],
[4.050724546997807E7, 4026114.354876244],
[4.050724210250195E7, 4026113.43994972],
[4.050722536188381E7, 4026108.097315812]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050727312768086E7, 4026112.160285922],
[4.050727144214725E7, 4026112.527790001]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050733913391775E7, 4026097.768664928],
[4.050731882638656E7, 4026102.196402456]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05074011833275E7, 4026084.239767811],
[4.050738651815705E7, 4026087.437277401]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05074011833275E7, 4026084.239767811],
[4.050740376230329E7, 4026083.794303922],
[4.050740637029002E7, 4026083.5753078],
[4.050740898743934E7, 4026083.584446135],
[4.050741162298967E7, 4026083.825606086]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050744870329395E7, 4026226.381394062],
[4.050744533581797E7, 4026225.466467002],
[4.050744206089737E7, 4026224.261526533],
[4.050743890345625E7, 4026222.775742978],
[4.050743588752466E7, 4026221.020424048],
[4.050743303605565E7, 4026219.00892878],
[4.050743037075064E7, 4026216.756565868],
[4.050742791189419E7, 4026214.280477153],
[4.050742567819968E7, 4026211.599507165],
[4.050742368666689E7, 4026208.734059705],
[4.050742195245258E7, 4026205.705942559],
[4.050742048875517E7, 4026202.538201526],
[4.050741930671428E7, 4026199.254945029],
[4.050741841532595E7, 4026195.88116063],
[4.050741782137419E7, 4026192.442524868],
[4.050741752937933E7, 4026188.965207836],
[4.050741754156362E7, 4026185.475674017],
[4.050741785783435E7, 4026182.000480871],
[4.050741847578449E7, 4026178.566076715],
[4.050741939071107E7, 4026175.198599438]
]
},
"properties": {
"code": ""
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050684675534101E7, 4025987.268076611],
[4.050685643844293E7, 4025990.358360866],
[4.050689454491971E7, 4026002.519737061]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050703137036284E7, 4026046.18647901],
[4.05070295640735E7, 4026045.610016264],
[4.050701981996215E7, 4026042.500261312],
[4.050697623399237E7, 4026028.590148907],
[4.050697552118288E7, 4026028.362661481]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050704840096232E7, 4026051.621473066],
[4.050704221346159E7, 4026049.646966941]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050710556055213E7, 4026069.863682419],
[4.050708746004742E7, 4026064.087051627]
]
},
"properties": {
"code": "F1"
}
}
]
}
},
"departure": {
"type": "OUT",
"status": "COMPLETE",
"codes": "138,L4,F1",
"geometry": None,
"geoPath": {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050742275893088E7, 4026164.644604296],
[4.050742342874898E7, 4026162.545793306]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050743615407222E7, 4026122.672208275],
[4.050743684026714E7, 4026120.146600441],
[4.050743730372977E7, 4026117.570797326],
[4.050743754093282E7, 4026114.964402468],
[4.050743757419489E7, 4026113.602043673],
[4.050743755007106E7, 4026112.347252104],
[4.050743733107493E7, 4026109.739264329],
[4.050743688561112E7, 4026107.160287504]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050717462298063E7, 4026091.904402129],
[4.050716820216861E7, 4026089.855066455]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050722536188381E7, 4026108.097315812],
[4.050720821283463E7, 4026102.624334418]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050727144214725E7, 4026112.527790001],
[4.050726278505515E7, 4026114.415332655]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050731882638656E7, 4026102.196402456],
[4.050727312768086E7, 4026112.160285922]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050738651815705E7, 4026087.437277401],
[4.050734647450486E7, 4026096.168165339]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050714461981621E7, 4026082.328947974],
[4.05071119278174E7, 4026071.895744022]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050734647450486E7, 4026096.168165339],
[4.050733913391775E7, 4026097.768664928]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050689454491971E7, 4026002.519737061],
[4.050693265139649E7, 4026014.681113256],
[4.050697075787329E7, 4026026.842489458]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050741162298967E7, 4026083.825606086],
[4.050741416963529E7, 4026084.285112275],
[4.050741669524226E7, 4026084.971307588],
[4.050741915143272E7, 4026085.875012957],
[4.050742151951354E7, 4026086.989350639],
[4.050742378146222E7, 4026088.305839852],
[4.050742592006397E7, 4026089.814461317],
[4.050742791904272E7, 4026091.503733515],
[4.050742976318505E7, 4026093.360800063],
[4.050743143845592E7, 4026095.371527565],
[4.050743293210549E7, 4026097.52061317],
[4.050743423276621E7, 4026099.791701039],
[4.050743533053925E7, 4026102.167506821],
[4.05074362170699E7, 4026104.629949201],
[4.050743683431807E7, 4026106.966150228],
[4.050743688561112E7, 4026107.160287504]
]
},
"properties": {
"code": ""
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050697552118288E7, 4026028.362661481],
[4.050697075787329E7, 4026026.842489458]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050704221346159E7, 4026049.646966941],
[4.050703137036284E7, 4026046.18647901]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050708746004742E7, 4026064.087051627],
[4.050704840096232E7, 4026051.621473066]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05071119278174E7, 4026071.895744022],
[4.050710556055213E7, 4026069.863682419]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050741939071107E7, 4026175.198599438],
[4.05074216811156E7, 4026168.021835575],
[4.050742275893088E7, 4026164.644604296]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050753774654577E7, 4026246.448261945],
[4.050749515081406E7, 4026236.848849251],
[4.050744870329395E7, 4026226.381394062]
]
},
"properties": {
"code": "138"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050753774654577E7, 4026246.448261945],
[4.0507613391983E7, 4026263.495786141],
[4.05076192451935E7, 4026264.814870958],
[4.050762119626365E7, 4026265.254565894]
]
},
"properties": {
"code": "138"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050742342874898E7, 4026162.545793306],
[4.050743615407222E7, 4026122.672208275]
]
},
"properties": {
"code": "L4"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050716820216861E7, 4026089.855066455],
[4.050714461981621E7, 4026082.328947974]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050720821283463E7, 4026102.624334418],
[4.050717462298063E7, 4026091.904402129]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050726278505515E7, 4026114.415332655],
[4.050725934642086E7, 4026115.009285077],
[4.050725586910526E7, 4026115.301280484],
[4.050725237957282E7, 4026115.289096617],
[4.050724890438099E7, 4026114.9728262],
[4.050724546997807E7, 4026114.354876244],
[4.050724210250195E7, 4026113.43994972],
[4.050722536188381E7, 4026108.097315812]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050727312768086E7, 4026112.160285922],
[4.050727144214725E7, 4026112.527790001]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050733913391775E7, 4026097.768664928],
[4.050731882638656E7, 4026102.196402456]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05074011833275E7, 4026084.239767811],
[4.050738651815705E7, 4026087.437277401]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.05074011833275E7, 4026084.239767811],
[4.050740376230329E7, 4026083.794303922],
[4.050740637029002E7, 4026083.5753078],
[4.050740898743934E7, 4026083.584446135],
[4.050741162298967E7, 4026083.825606086]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050744870329395E7, 4026226.381394062],
[4.050744533581797E7, 4026225.466467002],
[4.050744206089737E7, 4026224.261526533],
[4.050743890345625E7, 4026222.775742978],
[4.050743588752466E7, 4026221.020424048],
[4.050743303605565E7, 4026219.00892878],
[4.050743037075064E7, 4026216.756565868],
[4.050742791189419E7, 4026214.280477153],
[4.050742567819968E7, 4026211.599507165],
[4.050742368666689E7, 4026208.734059705],
[4.050742195245258E7, 4026205.705942559],
[4.050742048875517E7, 4026202.538201526],
[4.050741930671428E7, 4026199.254945029],
[4.050741841532595E7, 4026195.88116063],
[4.050741782137419E7, 4026192.442524868],
[4.050741752937933E7, 4026188.965207836],
[4.050741754156362E7, 4026185.475674017],
[4.050741785783435E7, 4026182.000480871],
[4.050741847578449E7, 4026178.566076715],
[4.050741939071107E7, 4026175.198599438]
]
},
"properties": {
"code": ""
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050684675534101E7, 4025987.268076611],
[4.050685643844293E7, 4025990.358360866],
[4.050689454491971E7, 4026002.519737061]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050703137036284E7, 4026046.18647901],
[4.05070295640735E7, 4026045.610016264],
[4.050701981996215E7, 4026042.500261312],
[4.050697623399237E7, 4026028.590148907],
[4.050697552118288E7, 4026028.362661481]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050704840096232E7, 4026051.621473066],
[4.050704221346159E7, 4026049.646966941]
]
},
"properties": {
"code": "F1"
}
},
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[4.050710556055213E7, 4026069.863682419],
[4.050708746004742E7, 4026064.087051627]
]
},
"properties": {
"code": "F1"
}
}
]
}
}
}
# 飞机数据 - 根据 route.md 配置
aircraft_data = [
{
"flightNo": "CA1234", # 根据route.md更新航班号
"longitude": CA_START["longitude"],
"latitude": CA_START["latitude"],
"time": int(time.time() * 1000),
"altitude": 0.0,
"trackNumber": 1001,
"speed": 50.0, # 根据route.md设置为50km/h
"start_point": CA_START, # 起点
"end_point": CA_END, # 终点
"moving_to_end": True # 当前是否向终点移动
},
{
"flightNo": "MU5123", # 根据route.md更新航班号
"longitude": MU_START["longitude"],
"latitude": MU_START["latitude"],
"time": int(time.time() * 1000),
"altitude": 0.0,
"trackNumber": 1002,
"speed": 50.0, # 根据route.md设置为50km/h
"start_point": MU_START, # 起点
"end_point": MU_END, # 终点
"moving_to_end": True # 当前是否向终点移动
}
]
# 车辆数据,根据 route.md 配置
DEFAULT_VEHICLE_SPEED = 30.0 # km/h默认速度
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
vehicle_data = [
{
"vehicleNo": "鲁B123", # 特勤车根据route.md更新
"longitude": SPECIAL_VEHICLE_START["longitude"],
"latitude": SPECIAL_VEHICLE_START["latitude"],
"time": int(time.time() * 1000),
"direction": 0.0, # 方向角
"speed": 30.0, # 根据route.md设置为30km/h
"start_point": SPECIAL_VEHICLE_START, # 起点
"end_point": SPECIAL_VEHICLE_END, # 终点
"moving_to_end": True # 当前是否向终点移动
},
{
"vehicleNo": "鲁B234", # 普通车根据route.md更新
"longitude": NORMAL_VEHICLE_START["longitude"],
"latitude": NORMAL_VEHICLE_START["latitude"],
"time": int(time.time() * 1000),
"direction": 180.0, # 根据route.md设置为180度
"speed": 30.0, # 根据route.md设置为30km/h
"start_point": NORMAL_VEHICLE_START, # 起点
"end_point": NORMAL_VEHICLE_END, # 终点
"moving_to_end": True # 当前是否向终点移动
},
{
"vehicleNo": "鲁B567", # 无人车A根据route.md更新
"longitude": UNMANNED_A_START["longitude"],
"latitude": UNMANNED_A_START["latitude"],
"time": int(time.time() * 1000),
"direction": 90.0, # 根据route.md设置为90度
"speed": 25.0, # 根据route.md设置为25km/h
"start_point": UNMANNED_A_START, # 起点
"end_point": UNMANNED_A_END, # 终点
"moving_to_end": True # 当前是否向终点移动
},
{
"vehicleNo": "鲁B579", # 无人车B根据route.md更新
"longitude": UNMANNED_B_START["longitude"],
"latitude": UNMANNED_B_START["latitude"],
"time": int(time.time() * 1000),
"direction": 270.0, # 根据route.md设置为270度
"speed": 25.0, # 根据route.md设置为25km/h
"start_point": UNMANNED_B_START, # 起点
"end_point": UNMANNED_B_END, # 终点
"moving_to_end": True # 当前是否向终点移动
}
]
# 车辆分类
airport_vehicle_data = [v for v in vehicle_data if v["vehicleNo"] in ["鲁B123", "鲁B234"]] # 特勤车和普通车
unmanned_vehicle_data = [v for v in vehicle_data if v["vehicleNo"] in ["鲁B567", "鲁B579"]] # 无人车
# 添加车辆状态类
class VehicleState:
def __init__(self, vehicle_no):
self.vehicle_no = vehicle_no
self.is_running = True # 运行状态
self.current_command = None # 当前执行的指令
self.command_priority = 0 # 当前指令优先级
self.target_speed = DEFAULT_VEHICLE_SPEED # 目标速度
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
self.command_reason = None # 指令原因
self.last_command_time = time.time() # 最后一次指令时间
self.traffic_light_state = None # 当前红绿灯状态
self.target_lat = None # 目标纬度
self.target_lon = None # 目标经度
self.is_traffic_light_stop = False # 是否因红灯停车
def can_be_overridden_by(self, command_type):
"""判断当前指令是否可以被新指令覆盖"""
# 红绿灯信号不参与指令优先级判断
if command_type in ["RED", "GREEN", "YELLOW"]:
return True
new_priority = COMMAND_PRIORITIES.get(command_type, 0)
current_priority = COMMAND_PRIORITIES.get(self.current_command, 0)
# 相同类型的指令可以覆盖(因为需要持续发送)
if command_type == self.current_command:
return True
# ALERT 指令可以被 RESUME 解除
if self.current_command == "ALERT":
return command_type == "RESUME"
# WARNING 指令可以被 RESUME 或相同及更高优先级指令覆盖
if self.current_command == "WARNING":
return command_type == "RESUME" or new_priority >= current_priority
# 其他情况下,相同或更高优先级可以覆盖
return new_priority >= current_priority
def update_command(self, command_type, target_lat=None, target_lon=None):
"""更新指令状态"""
# 更新目标位置
if target_lat is not None and target_lon is not None:
self.target_lat = target_lat
self.target_lon = target_lon
# 如果是红绿灯状态,只更新状态,不影响其他指令
if command_type in ["RED", "GREEN", "YELLOW"]:
old_state = self.traffic_light_state
self.traffic_light_state = command_type
# 更新红灯停车状态
if command_type == "RED":
self.is_traffic_light_stop = True
elif command_type in ["GREEN", "YELLOW"]:
self.is_traffic_light_stop = False
print(f"车辆 {self.vehicle_no} 收到红绿灯信号: {command_type} (原状态: {old_state})")
else:
# 其他指令正常更新
self.current_command = command_type
self.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
# RESUME 指令不清除红绿灯状态,红绿灯状态由信号灯独立控制
if command_type == "RESUME":
# 清除其他状态
self.brake_mode = None
self.command_reason = None
# 更新运行状态
self.is_running = self.can_move()
print(f"更新车辆 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}, is_running={self.is_running}")
def can_move(self):
"""检查车辆是否可以移动"""
# 如果有告警指令,不能移动
if self.current_command == "ALERT":
return False
# 如果有预警指令,不能移动
if self.current_command == "WARNING":
return False
# 如果有安全停靠指令,不能移动
if self.current_command == "PARKING":
return False
# 如果是红灯停车状态,不能移动
if self.is_traffic_light_stop:
return False
# 其他情况可以移动
return True
def log_state(self):
"""记录当前车辆状态"""
print(f"""
车辆状态:
- 车辆编号: {self.vehicle_no}
- 运行状态: {'运行中' if self.is_running else '已停止'}
- 当前指令: {self.current_command}
- 红绿灯状态: {self.traffic_light_state}
- 红灯停车: {'' if self.is_traffic_light_stop else ''}
- 指令优先级: {self.command_priority}
- 目标速度: {self.target_speed}
- 制动模式: {self.brake_mode}
- 指令原因: {self.command_reason}
- 目标位置: ({self.target_lat}, {self.target_lon})
""")
# 添加车辆状态管理
vehicle_states = {}
for vehicle in vehicle_data:
vehicle_states[vehicle["vehicleNo"]] = VehicleState(vehicle["vehicleNo"])
def calculate_distance_to_target(vehicle, target_lat, target_lon):
"""计算车辆到目标位的距离(米)"""
# 转换为弧度
lat1_rad = math.radians(vehicle["latitude"])
lon1_rad = math.radians(vehicle["longitude"])
lat2_rad = math.radians(target_lat)
lon2_rad = math.radians(target_lon)
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# 使用 Haversine 公式计算距离
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(lat1_rad) * math.cos(lat2_rad) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return EARTH_RADIUS * c
@app.route('/api/VehicleCommandInfo', methods=['POST'])
def handle_vehicle_command():
try:
data = request.json
vehicle_id = data.get("vehicleID")
command_type = data.get("commandType", "").upper()
reason = data.get("commandReason", "").upper()
signal_state = data.get("signalState", "").upper()
target_lat = data.get("latitude", None)
target_lon = data.get("longitude", None)
print(f"收到车辆控制指令: vehicle_id={vehicle_id}, type={command_type}, reason={reason}, signal_state={signal_state}")
print(f"完整请求数据: {data}")
# 检查 SIGNAL 类型命令必须包含 signalState
if command_type == "SIGNAL" and not signal_state:
print(f"SIGNAL 类型命令缺少 signalState")
return jsonify({
"code": 400,
"msg": "SIGNAL command must include signalState",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 400
# 检查车辆是否存在
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
print(f"未找到车辆: {vehicle_id}")
return jsonify({
"code": 404,
"msg": f"Vehicle {vehicle_id} not found",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 404
# 打印当前车辆状态
print(f"当前车辆状态: vehicle={vehicle_id}")
vehicle_state.log_state()
# 特勤车鲁B123只响应红绿灯信号
if vehicle_id == "鲁B123":
if command_type != "SIGNAL":
print(f"特勤车辆忽略非红绿灯指令: {vehicle_id}")
return jsonify({
"code": 200,
"msg": "Special vehicle only responds to traffic light signals",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
# 更新红绿灯状态和指令状态
vehicle_state.update_command(signal_state, target_lat, target_lon)
print(f"特勤车 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"code": 200,
"msg": "Traffic light state updated",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
# 检查指令优先级并添加详细日志
check_command = signal_state if command_type == "SIGNAL" else command_type
can_override = vehicle_state.can_be_overridden_by(check_command)
print(f"指令优先级检查: vehicle={vehicle_id}, current_command={vehicle_state.current_command}, "
f"new_command={check_command}, can_override={can_override}")
if not can_override:
print(f"指令优先级过低: vehicle={vehicle_id}, current_priority={vehicle_state.command_priority}, "
f"command={check_command}")
return jsonify({
"code": 400,
"msg": "Command priority too low",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
# 处理不同类型的指令
if command_type == "SIGNAL":
# 更新红绿灯状态和指令状态
vehicle_state.update_command(signal_state, target_lat, target_lon)
print(f"车辆 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"code": 200,
"msg": "Traffic light state updated",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
elif command_type in ["ALERT", "WARNING"]:
# 查找当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 执行告警指令直接停车
print(f"执行{'紧急' if command_type == 'ALERT' else '正常'}制动: vehicle={vehicle_id}")
vehicle_state.current_command = command_type
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
vehicle_state.is_running = False
vehicle_state.target_speed = 0
vehicle_state.brake_mode = "emergency" if command_type == "ALERT" else "normal"
vehicle_state.target_lat = target_lat
vehicle_state.target_lon = target_lon
# 注释掉不再直接设置车辆速度为0让位置更新逻辑统一处理
# if current_vehicle:
# current_vehicle["speed"] = 0
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
elif command_type == "RESUME":
print(f"执行恢复指令: vehicle_id={vehicle_id}")
# 检查当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 清除限制性指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
print(f"清除限制性指令: vehicle={vehicle_id}")
vehicle_state.current_command = None
vehicle_state.command_priority = 0
vehicle_state.is_running = True
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
vehicle_state.brake_mode = None
vehicle_state.target_lat = None
vehicle_state.target_lon = None
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if vehicle_state.is_running and current_vehicle:
print(f"车辆 {vehicle_id} 恢复运行")
# 注释掉:不再直接设置车辆速度,让位置更新逻辑统一处理
# current_vehicle["speed"] = DEFAULT_VEHICLE_SPEED
# 记录状态变化但不更新指令
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
except Exception as e:
print(f"Error handling vehicle command: {str(e)}")
return jsonify({
"code": 500,
"msg": str(e),
"transId": data.get("transId", ""),
"timestamp": int(time.time() * 1000)
}), 500
# 删除了复杂的红绿灯和路口计算函数,因为已简化为往复运动逻辑
def update_position_with_vector(obj, target_point, start_point, speed, elapsed_time, return_to_start=False):
"""
基于向量的位置更新逻辑
Args:
obj: 需要更新位置的对象(航空器或车辆)
target_point: 目标点坐标 {"latitude": float, "longitude": float}
start_point: 起点坐标 {"latitude": float, "longitude": float}
speed: 移动速度km/h
elapsed_time: 经过的时间(秒)
return_to_start: 是否在到达目标点时返回起点
"""
# 检查是否在等待状态
if "wait_until" not in obj:
obj["wait_until"] = 0
current_time = time.time()
if current_time < obj["wait_until"]:
return False # 还在等待中,不更新位置
# 计算这一步要移动的距离(米)并转换为经纬度
speed_mps = speed * 1000 / 3600
distance = speed_mps * elapsed_time
# 将移动距离转换为经纬度变化量
move_dlat, move_dlon = meters_to_degrees(distance, obj["latitude"])
# 将5米的判断距离转换为经纬度优化从15米减少到5米减少颤抖
check_dlat, check_dlon = meters_to_degrees(5, obj["latitude"])
# 计算当前位置到终点的向量
vector_lat = target_point["latitude"] - obj["latitude"]
vector_lon = target_point["longitude"] - obj["longitude"]
# 计算向量长度(经纬度空间)
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
# 检查是否到达终点
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
# 精确设置到目标点位置,避免位置偏差
obj["latitude"] = target_point["latitude"]
obj["longitude"] = target_point["longitude"]
if return_to_start:
# 返回起点并设置等待时间
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
else:
# 设置短暂等待时间,避免立即切换方向导致的颤抖
obj["wait_until"] = current_time + 0.1 # 0.1秒的短暂等待
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 到达目标点等待0.1秒后切换方向")
return True # 表示已到达终点
else:
# 归一化方向向量
unit_lat = vector_lat / vector_length
unit_lon = vector_lon / vector_length
# 计算这一步的位置变化
dlat = move_dlat * unit_lat
dlon = move_dlon * unit_lon
# 更新位置
obj["latitude"] += dlat
obj["longitude"] += dlon
return False # 表示正在移动中
def update_object_position(obj, elapsed_time):
"""统一的位置更新函数 - 用于飞机和车辆的往复运动"""
# 获取对象类型和标识
obj_type = "航空器" if "flightNo" in obj else "车辆"
obj_id = obj.get("flightNo", obj.get("vehicleNo"))
# 根据移动方向确定目标点
if obj["moving_to_end"]:
target_point = obj["end_point"]
else:
target_point = obj["start_point"]
# 更新位置
reached_target = update_position_with_vector(
obj,
target_point,
obj["start_point"] if obj["moving_to_end"] else obj["end_point"],
obj["speed"],
elapsed_time,
return_to_start=False
)
if reached_target:
# 切换移动方向
obj["moving_to_end"] = not obj["moving_to_end"]
target_name = "终点" if not obj["moving_to_end"] else "起点"
print(f"{obj_type} {obj_id} 到达{'终点' if obj['moving_to_end'] else '起点'},开始前往{target_name}")
# 注意等待时间已经在update_position_with_vector中设置了不需要重复设置
# 更新时间戳
obj["time"] = int(time.time() * 1000)
def update_aircraft_position(aircraft, elapsed_time):
"""更新航空器位置 - 使用统一的位置更新函数"""
update_object_position(aircraft, elapsed_time)
def update_vehicle_position(vehicle, elapsed_time):
"""更新车辆位置 - 使用统一的位置更新函数"""
update_object_position(vehicle, elapsed_time)
# 简化的路口信息保留API兼容性
INTERSECTIONS = {
"TL001": {
"id": "INT001",
"name": "路口1",
"latitude": 36.369696,
"longitude": 120.083084
},
"TL002": {
"id": "INT002",
"name": "路口2",
"latitude": 36.365617,
"longitude": 120.084637
}
}
def generate_traffic_light_data():
"""生成红绿灯数据"""
traffic_light_data = []
# 生成两个固定路口的红绿灯数据
for tl_id, intersection in INTERSECTIONS.items():
signal = {
"id": tl_id,
"state": 1, # 0: RED, 1: GREEN测试时保持绿灯
"timestamp": int(time.time() * 1000),
"intersection": intersection["id"],
"position": {
"latitude": intersection["latitude"],
"longitude": intersection["longitude"]
}
}
traffic_light_data.append(signal)
return traffic_light_data
# 红绿灯数据
traffic_light_data = generate_traffic_light_data()
# 分别记录航空器、机场车辆和无人车的上次更新时间
last_aircraft_update_time = time.time()
last_vehicle_update_time = time.time() # 机场车辆更新时间
last_unmanned_vehicle_update_time = time.time() # 无人车更新时间
last_light_switch_time = time.time()
def check_auth():
auth_header = request.headers.get('Authorization')
if not auth_header:
print("认证失败: 缺少Authorization头")
return False
# 检查是否包含Bearer前缀
if not auth_header.startswith('Bearer '):
print("认证失败: 不是Bearer格式")
return False
# 提取token部分
token = auth_header[7:] # 去掉 "Bearer " 前缀
expected_token = AUTH_TOKEN[7:] # 去掉 "Bearer " 前缀
result = token == expected_token
#print(f"认证结果: {result}")
return result
@app.route('/openApi/getCurrentFlightPositions', methods=['GET', 'OPTIONS'])
def get_flight_positions():
"""获取当前航空器位置信息"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
global last_aircraft_update_time
current_time = time.time()
elapsed_time = current_time - last_aircraft_update_time
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for aircraft in aircraft_data:
update_aircraft_position(aircraft, UPDATE_INTERVAL)
aircraft["time"] = int(current_time * 1000)
last_aircraft_update_time = current_time
# 创建符合 API 格式的响应数据
response_data = []
for aircraft in aircraft_data:
api_aircraft = {
"flightNo": aircraft["flightNo"],
"longitude": round(aircraft["longitude"], 6), # 经纬度保留6位小数
"latitude": round(aircraft["latitude"], 6), # 经纬度保留6位小数
"time": aircraft["time"]
}
response_data.append(api_aircraft)
return jsonify({
"status": 200,
"msg": "当前航空器实时位置数据",
"data": response_data
})
def switch_traffic_light_state():
"""统一处理红绿灯状态切换"""
global last_light_switch_time
current_time = time.time()
# 西路口红绿灯每15秒切换一次
elapsed_since_switch = current_time - last_light_switch_time
if elapsed_since_switch >= TRAFFIC_LIGHT_SWITCH_INTERVAL: # 使用常量
# 获取当前状态
current_state = traffic_light_data[0]["state"]
# 根据当前状态决定下一个状态
if current_state == 0: # 当前是红灯
# 切换到黄灯(表示红绿灯故障)
traffic_light_data[0]["state"] = 2 # 2 表示黄灯
print(f"西路口红绿灯状态切换为: 黄灯(红绿灯故障)")
elif current_state == 2: # 当前是黄灯
# 从黄灯切换到绿灯
traffic_light_data[0]["state"] = 1 # 1 表示绿灯
print(f"西路口红绿灯状态切换为: 绿灯")
else: # 当前是绿灯
# 从绿灯切换到红灯
traffic_light_data[0]["state"] = 0 # 0 表示红灯
print(f"西路口红绿灯状态切换为: 红灯")
last_light_switch_time = current_time
# 更新东路口红绿灯(根据航空器位置)
if aircraft_data:
aircraft = aircraft_data[0]
lat_diff = abs(aircraft["latitude"] - T6_INTERSECTION["latitude"]) * 111319.9
old_state = traffic_light_data[1]["state"]
traffic_light_data[1]["state"] = 1 if lat_diff > DIST_50M else 1
if old_state != traffic_light_data[1]["state"]:
print(f"东路口红绿灯状态切换为: {'绿灯' if traffic_light_data[1]['state'] == 1 else '红灯'}")
@app.route('/openApi/getCurrentVehiclePositions', methods=['GET', 'OPTIONS'])
def get_vehicle_positions():
"""获取当前车辆位置信息(仅机场车辆)"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
global last_vehicle_update_time
current_time = time.time()
elapsed_time = current_time - last_vehicle_update_time
try:
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in airport_vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_vehicle_update_time = current_time
response_data = []
for v in airport_vehicle_data:
v_out = {
"vehicleNo": v.get("vehicleNo"),
"longitude": round(v.get("longitude"), 6), # 经纬度保留6位小数
"latitude": round(v.get("latitude"), 6), # 经纬度保留6位小数
"time": v.get("time")
}
response_data.append(v_out)
return jsonify({
"status": 200,
"msg": "当前车辆实时位置数据",
"data": response_data
})
except Exception as e:
print(f"Error in get_vehicle_positions: {str(e)}")
return jsonify({
"status": 500,
"msg": str(e),
"data": None
}), 500
@app.route('/openApi/getTrafficLightSignals', methods=['GET', 'OPTIONS'])
def get_traffic_light_signals():
"""获取红绿灯信号状态 (旧的轮询接口)"""
if request.method == 'OPTIONS':
return '', 204
# 新增: 返回空数据,以禁用轮询数据源
return jsonify({
"status": 200,
"msg": "Traffic light polling endpoint disabled to test push mechanism.",
"data": []
})
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Max-Age'] = '3600'
return response
def check_vehicle_states():
"""定期检查所有车辆状态"""
for vehicle_no, state in vehicle_states.items():
state.log_state()
# 认证相关配置
AUTH_USERNAME = "dianxin"
AUTH_PASSWORD = "dianxin@123"
AUTH_TOKEN = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
@app.route('/login', methods=['POST', 'OPTIONS'])
def login():
if request.method == 'OPTIONS':
return '', 204
# 支持多种方式获取用户名和密码
username = None
password = None
# 优先从 URL query parameters 中获取符合API文档示例
username = request.args.get('username')
password = request.args.get('password')
# 如果query参数中没有尝试从form data中获取
if not username or not password:
username = request.form.get('username') or username
password = request.form.get('password') or password
# 如果form data中也没有尝试从JSON body中获取
if not username or not password:
try:
json_data = request.get_json(silent=True)
if json_data:
username = json_data.get('username') or username
password = json_data.get('password') or password
except:
pass
print(f"收到登录请求: username={username}, password={password}")
print(f"请求 URL: {request.url}")
print(f"请求方法: {request.method}")
print(f"请求参数: {request.args}")
print(f"请求表单: {request.form}")
print(f"请求JSON: {request.get_json(silent=True)}")
if not username or not password:
return jsonify({
"status": 400,
"msg": "缺少用户名或密码",
"data": None
}), 400
if username == AUTH_USERNAME and password == AUTH_PASSWORD:
return jsonify({
"status": 200,
"msg": "登入成功",
"data": AUTH_TOKEN
})
else:
return jsonify({
"status": 401,
"msg": "用户名或密码错误",
"data": None
}), 401
@app.route('/openApi/getVehicleStatus', methods=['GET', 'OPTIONS'])
def get_vehicle_status():
"""获取无人车状态信息"""
if request.method == 'OPTIONS':
return '', 204
vehicle_id = request.args.get('vehicleId')
if not vehicle_id:
return jsonify({
"status": 400,
"msg": "缺少 vehicleId 参数",
"data": None
}), 400
# 查找对应的车辆
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
return jsonify({
"status": 404,
"msg": f"未找到车辆 {vehicle_id}",
"data": None
}), 404
# 返回车辆状态
return jsonify({
"status": 200,
"msg": "获取车辆状态成功",
"data": {
"vehicleId": vehicle_id,
"status": "NORMAL" if vehicle_state.is_running else "STOPPED",
"command": vehicle_state.current_command,
"commandPriority": vehicle_state.command_priority,
"trafficLightState": vehicle_state.traffic_light_state,
"timestamp": int(time.time() * 1000)
}
})
# 无人车位置数据生成函数
def generate_unmanned_vehicle_location_data():
"""生成无人车位置数据符合官方API格式"""
unmanned_vehicles = []
# 从现有vehicle_data中筛选无人车QN开头的车辆
for vehicle in unmanned_vehicle_data:
location_info = {
"transId": str(uuid.uuid4()),
"timestamp": int(time.time() * 1000),
"vehicleID": vehicle["vehicleNo"],
"longitude": round(vehicle.get("longitude"), 6), # 经纬度保留6位小数
"latitude": round(vehicle.get("latitude"), 6) # 经纬度保留6位小数
}
unmanned_vehicles.append(location_info)
return unmanned_vehicles
# 无人车状态数据生成函数
def generate_unmanned_vehicle_state_data(vehicle_id=None, is_single=True):
"""生成无人车状态数据符合官方API格式"""
vehicle_states_data = []
if is_single and vehicle_id:
# 单个车辆状态查询
vehicle_state = vehicle_states.get(vehicle_id)
if vehicle_state:
state_info = {
"transId": str(uuid.uuid4()),
"timestamp": int(time.time() * 1000),
"vehicleID": vehicle_id,
"loginState": True,
"faultInfo": [],
"activeSafety": not vehicle_state.is_running,
"RC": False,
"Command": 1 if vehicle_state.current_command == "ALERT" else 0,
"airportInfo": [],
"vehicleMode": 2, # 自动模式
"gearState": 2, # D档
"chassisReady": vehicle_state.is_running,
"collisionStatus": False,
"clearance": 1 if vehicle_state.is_running else 0,
"turnSignalStstus": 0,
"pointCloud": []
}
vehicle_states_data.append(state_info)
else:
# 所有车辆状态查询
for vehicle in unmanned_vehicle_data:
vehicle_state = vehicle_states.get(vehicle["vehicleNo"])
state_info = {
"transId": str(uuid.uuid4()),
"timestamp": int(time.time() * 1000),
"vehicleID": vehicle["vehicleNo"],
"loginState": True,
"faultInfo": [],
"activeSafety": not vehicle_state.is_running if vehicle_state else False,
"RC": False,
"Command": 1 if vehicle_state and vehicle_state.current_command == "ALERT" else 0,
"airportInfo": [],
"vehicleMode": 2,
"gearState": 2,
"chassisReady": vehicle_state.is_running if vehicle_state else True,
"collisionStatus": False,
"clearance": 1 if vehicle_state and vehicle_state.is_running else 0,
"turnSignalStstus": 0,
"pointCloud": []
}
vehicle_states_data.append(state_info)
return vehicle_states_data
# 无人车API接口
@app.route('/api/VehicleLocationInfo', methods=['GET', 'OPTIONS'])
def get_unmanned_vehicle_location():
"""无人车位置上报接口"""
if request.method == 'OPTIONS':
return '', 204
try:
current_time = time.time()
global last_unmanned_vehicle_update_time
elapsed_time = current_time - last_unmanned_vehicle_update_time
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in unmanned_vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_unmanned_vehicle_update_time = current_time
# 生成无人车位置数据
location_data = generate_unmanned_vehicle_location_data()
return jsonify(location_data)
except Exception as e:
print(f"Error in get_unmanned_vehicle_location: {str(e)}")
return jsonify([]), 500
@app.route('/api/VehicleStateInfo', methods=['POST', 'OPTIONS'])
def get_unmanned_vehicle_state():
"""无人车状态查询接口"""
if request.method == 'OPTIONS':
return '', 204
try:
data = request.json
vehicle_id = data.get("vehicleID") if data else None
is_single = data.get("isSingle", True) if data else False
print(f"收到无人车状态查询请求: vehicle_id={vehicle_id}, is_single={is_single}")
# 生成无人车状态数据
state_data = generate_unmanned_vehicle_state_data(vehicle_id, is_single)
print(f"返回无人车状态数据,数量: {len(state_data)}")
return jsonify(state_data)
except Exception as e:
print(f"Error in get_unmanned_vehicle_state: {str(e)}")
return jsonify([]), 500
def update_ca3456_status():
"""更新CA3456航空器状态 - 进港->停留1分钟->出港循环"""
global ca3456_status
current_time = time.time()
elapsed_time = current_time - ca3456_status["status_start_time"]
# 计算当前在循环中的位置
cycle_time = elapsed_time % ca3456_status["cycle_duration"]
if cycle_time < ca3456_status["arrival_duration"]:
# 进港阶段
ca3456_status["type"] = "IN"
logging.info(f"CA3456 进港阶段: {cycle_time:.1f}s")
elif cycle_time < ca3456_status["arrival_duration"] + ca3456_status["wait_duration"]:
# 停留阶段
ca3456_status["type"] = "ARRIVED"
logging.info(f"CA3456 停留阶段: {cycle_time:.1f}s")
else:
# 出港阶段
ca3456_status["type"] = "OUT"
logging.info(f"CA3456 出港阶段: {cycle_time:.1f}s")
ca3456_status["timestamp"] = int(current_time * 1000)
# 新增API端点
@app.route('/userInfoController/refreshToken', methods=['GET', 'OPTIONS'])
def refresh_token():
"""Token刷新接口"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 返回新的token实际上还是同一个
return jsonify({
"status": 200,
"msg": "Token刷新成功",
"data": {
"token": "Bearer dianxin-token-2024",
"expiresIn": 86400 # 24小时过期
}
})
@app.route('/runwayPathPlanningController/findArrTaxiwayByRunwayAndContactCrossAndSeat', methods=['GET', 'OPTIONS'])
def get_arrival_taxiway_route():
"""获取进港滑行路线"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 获取请求参数
in_runway = request.args.get('inRunway', '35')
out_runway = request.args.get('outRunway', '34')
contact_cross = request.args.get('contactCross', 'F1')
seat = request.args.get('seat', '138')
logging.info(f"进港路线查询: inRunway={in_runway}, outRunway={out_runway}, contactCross={contact_cross}, seat={seat}")
# 返回进港路线数据
return jsonify({
"status": 200,
"msg": "进港滑行路线查询成功",
"data": aircraft_routes["arrival"]
})
@app.route('/runwayPathPlanningController/findDepTaxiwayByRunwayAndContactCrossAndSeat', methods=['GET', 'OPTIONS'])
def get_departure_taxiway_route():
"""获取出港滑行路线"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 获取请求参数
in_runway = request.args.get('inRunway', '35')
out_runway = request.args.get('outRunway', '34')
start_seat = request.args.get('startSeat', '138')
logging.info(f"出港路线查询: inRunway={in_runway}, outRunway={out_runway}, startSeat={start_seat}")
# 返回出港路线数据
return jsonify({
"status": 200,
"msg": "出港滑行路线查询成功",
"data": aircraft_routes["departure"]
})
@app.route('/aircraftStatusController/getAircraftStatus', methods=['GET', 'OPTIONS'])
def get_aircraft_status():
"""获取航空器状态"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
# 更新CA3456状态
update_ca3456_status()
# 返回当前状态
return jsonify({
"status": 200,
"msg": "航空器状态查询成功",
"data": {
"type": ca3456_status["type"],
"flightNo": ca3456_status["flightNo"],
"inRunway": ca3456_status["inRunway"],
"outRunway": ca3456_status["outRunway"],
"contactCross": ca3456_status["contactCross"],
"seat": ca3456_status["seat"],
"timestamp": ca3456_status["timestamp"]
}
})
if __name__ == '__main__':
app.run(host='localhost', port=8090, debug=True)