538 lines
22 KiB
Python
538 lines
22 KiB
Python
from flask import Flask, jsonify, request
|
||
import time
|
||
import math
|
||
import random
|
||
|
||
app = Flask(__name__)
|
||
|
||
# 地球半径(米)
|
||
EARTH_RADIUS = 6378137.0
|
||
|
||
def meters_to_degrees(meters, latitude):
|
||
"""
|
||
将米转换为度,考虑纬度的影响
|
||
"""
|
||
# 纬度方向:1度 = 111,319.9米
|
||
meters_per_deg_lat = 111319.9
|
||
# 经度方向:1度 = 111,319.9 * cos(latitude)米
|
||
meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(latitude))
|
||
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
|
||
|
||
# 距离配置(米)
|
||
DIST_150M = 150
|
||
DIST_100M = 100
|
||
DIST_50M = 50
|
||
|
||
# 两个路口的位置
|
||
WEST_INTERSECTION = {"longitude": 120.088003, "latitude": 36.361999}
|
||
EAST_INTERSECTION = {"longitude": 120.089003, "latitude": 36.361999}
|
||
|
||
# 位置更新间隔(秒)
|
||
UPDATE_INTERVAL = 1.0
|
||
|
||
# 航空器数据
|
||
aircraft_data = [
|
||
{
|
||
"flightNo": "AC001", # 航空器
|
||
"latitude": EAST_INTERSECTION["latitude"] - (DIST_150M / 111319.9), # 东路口南150米
|
||
"longitude": EAST_INTERSECTION["longitude"],
|
||
"time": int(time.time() * 1000),
|
||
"direction": 1, # 1表示向北
|
||
"speed": 50.0 # 滑行速度 50km/h
|
||
}
|
||
]
|
||
|
||
# 添加默认速度常量
|
||
DEFAULT_VEHICLE_SPEED = 36.0 # km/h
|
||
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
|
||
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
|
||
|
||
# 车辆数据
|
||
vehicle_data = [
|
||
{
|
||
"vehicleNo": "QN001", # 无人车1(西路口)
|
||
"latitude": WEST_INTERSECTION["latitude"] + (DIST_50M / 111319.9), # 西路口北50米
|
||
"longitude": WEST_INTERSECTION["longitude"],
|
||
"time": int(time.time() * 1000),
|
||
"direction": -1, # -1表示向南
|
||
"speed": DEFAULT_VEHICLE_SPEED, # 使用默认速度
|
||
"phase": 0 # 0: 南北移动, 1: 东西移动
|
||
},
|
||
{
|
||
"vehicleNo": "QN002", # 无人车2(东路口)
|
||
"latitude": EAST_INTERSECTION["latitude"],
|
||
"longitude": EAST_INTERSECTION["longitude"] - (DIST_150M / (111319.9 * math.cos(math.radians(EAST_INTERSECTION["latitude"])))), # 东路口西150米
|
||
"time": int(time.time() * 1000),
|
||
"direction": 1, # 1表示向东
|
||
"speed": DEFAULT_VEHICLE_SPEED # 使用默认速度
|
||
},
|
||
{
|
||
"vehicleNo": "TQ001", # 特勤车(西路口)
|
||
"latitude": WEST_INTERSECTION["latitude"],
|
||
"longitude": WEST_INTERSECTION["longitude"] + (DIST_50M / (111319.9 * math.cos(math.radians(WEST_INTERSECTION["latitude"])))), # 西路口东50米
|
||
"time": int(time.time() * 1000),
|
||
"direction": -1, # -1表示向西
|
||
"speed": DEFAULT_VEHICLE_SPEED # 使用默认速度
|
||
}
|
||
]
|
||
|
||
# 添加车辆状态类
|
||
class VehicleState:
|
||
def __init__(self, vehicle_no):
|
||
self.vehicle_no = vehicle_no
|
||
self.is_running = True # 运行状态
|
||
self.current_command = None # 当前执行的指令
|
||
self.command_priority = 0 # 当前指令优先级
|
||
self.target_speed = DEFAULT_VEHICLE_SPEED # 目标速度
|
||
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
|
||
|
||
def can_be_overridden_by(self, command_type):
|
||
# 指令优先级: SIGNAL(4) > ALERT(3) > WARNING(2) > RESUME(1)
|
||
priority_map = {
|
||
"SIGNAL": 4,
|
||
"ALERT": 3,
|
||
"WARNING": 2,
|
||
"RESUME": 1
|
||
}
|
||
new_priority = priority_map.get(command_type, 0)
|
||
|
||
# 如果当前没有指令,或者新指令优先级更高,或者是相同类型的指令,则允许覆盖
|
||
return (self.current_command is None or
|
||
new_priority >= self.command_priority or
|
||
command_type == self.current_command)
|
||
|
||
def update_command(self, command_type):
|
||
priority_map = {
|
||
"SIGNAL": 4,
|
||
"ALERT": 3,
|
||
"WARNING": 2,
|
||
"RESUME": 1
|
||
}
|
||
self.command_priority = priority_map.get(command_type, 0)
|
||
self.current_command = command_type
|
||
|
||
# 添加车辆状态管理
|
||
vehicle_states = {
|
||
"QN001": VehicleState("QN001"),
|
||
"QN002": VehicleState("QN002"),
|
||
"TQ001": VehicleState("TQ001")
|
||
}
|
||
|
||
def calculate_distance_to_target(vehicle, target_lat, target_lon):
|
||
"""计算车辆到目标位置的距离(米)"""
|
||
dlat = target_lat - vehicle["latitude"]
|
||
dlon = target_lon - vehicle["longitude"]
|
||
|
||
# 使用 Haversine 公式计算距离
|
||
R = 6371000 # 地球半径(米)
|
||
a = math.sin(dlat/2) * math.sin(dlat/2) + \
|
||
math.cos(math.radians(vehicle["latitude"])) * math.cos(math.radians(target_lat)) * \
|
||
math.sin(dlon/2) * math.sin(dlon/2)
|
||
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||
return R * c
|
||
|
||
@app.route('/vehicle/command', methods=['POST'])
|
||
def handle_vehicle_command():
|
||
try:
|
||
data = request.json
|
||
vehicle_id = data.get("vehicleId")
|
||
command_type = data.get("type", "").upper()
|
||
reason = data.get("reason", "").upper()
|
||
|
||
# 检查车辆是否存在
|
||
vehicle_state = vehicle_states.get(vehicle_id)
|
||
if not vehicle_state:
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": f"Vehicle {vehicle_id} not found"
|
||
}), 404
|
||
|
||
# 检查是否为特勤车辆
|
||
if vehicle_id.startswith("TQ"):
|
||
return jsonify({
|
||
"status": "ok",
|
||
"message": "Special vehicle ignores command"
|
||
})
|
||
|
||
# 检查指令优先级
|
||
if not vehicle_state.can_be_overridden_by(command_type):
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": "Command priority too low"
|
||
}), 400
|
||
|
||
# 获取目标位置
|
||
target_lat = data.get("latitude", 0.0)
|
||
target_lon = data.get("longitude", 0.0)
|
||
|
||
# 处理不同类型的指令
|
||
if command_type == "SIGNAL":
|
||
signal_state = data.get("signalState", "").upper()
|
||
if signal_state == "RED":
|
||
# 计算到路口的距离
|
||
distance = calculate_distance_to_target(
|
||
next(v for v in vehicles if v["vehicleNo"] == vehicle_id),
|
||
target_lat, target_lon
|
||
)
|
||
|
||
# 根据距离决定制动方式
|
||
if distance <= 50: # 已经在停止线内
|
||
vehicle_state.is_running = False
|
||
vehicle_state.target_speed = 0
|
||
vehicle_state.brake_mode = "emergency" # 紧急停车
|
||
elif distance <= 100: # 距离停止线100米内
|
||
vehicle_state.is_running = False
|
||
vehicle_state.target_speed = 0
|
||
vehicle_state.brake_mode = "normal" # 正常制动
|
||
elif signal_state == "GREEN":
|
||
vehicle_state.is_running = True
|
||
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
|
||
vehicle_state.brake_mode = None
|
||
|
||
elif command_type == "ALERT":
|
||
vehicle_state.is_running = False
|
||
vehicle_state.target_speed = 0
|
||
vehicle_state.brake_mode = "emergency" # 告警紧急制动
|
||
|
||
elif command_type == "WARNING":
|
||
vehicle_state.is_running = False
|
||
vehicle_state.target_speed = 0
|
||
vehicle_state.brake_mode = "normal" # 预警正常制动
|
||
|
||
elif command_type == "RESUME":
|
||
vehicle_state.is_running = True
|
||
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
|
||
vehicle_state.brake_mode = None
|
||
|
||
# 更新车辆状态
|
||
vehicle_state.command_type = command_type
|
||
vehicle_state.command_reason = reason
|
||
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
|
||
vehicle_state.last_command_time = time.time()
|
||
|
||
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
|
||
f"command={command_type}, reason={reason}, priority={vehicle_state.command_priority}, "
|
||
f"target_lat={target_lat}, target_lon={target_lon}")
|
||
|
||
return jsonify({
|
||
"status": "ok",
|
||
"message": "Command executed successfully"
|
||
})
|
||
|
||
except Exception as e:
|
||
print(f"Error handling vehicle command: {str(e)}")
|
||
return jsonify({
|
||
"status": "error",
|
||
"message": str(e)
|
||
}), 500
|
||
|
||
def calculate_distance_to_intersection(vehicle, intersection):
|
||
"""计算车辆到路口的距离(米)"""
|
||
vehicle_lat = vehicle["latitude"]
|
||
vehicle_lon = vehicle["longitude"]
|
||
intersection_lat = intersection["latitude"]
|
||
intersection_lon = intersection["longitude"]
|
||
|
||
# 使用经纬度计算距离
|
||
lat_diff = (vehicle_lat - intersection_lat) * 111319.9
|
||
lon_diff = (vehicle_lon - intersection_lon) * (111319.9 * math.cos(math.radians(vehicle_lat)))
|
||
return math.sqrt(lat_diff * lat_diff + lon_diff * lon_diff)
|
||
|
||
def get_nearest_intersection(vehicle):
|
||
"""获取车辆最近的路口及其红绿灯状态"""
|
||
west_dist = calculate_distance_to_intersection(vehicle, WEST_INTERSECTION)
|
||
east_dist = calculate_distance_to_intersection(vehicle, EAST_INTERSECTION)
|
||
|
||
if west_dist <= east_dist:
|
||
return WEST_INTERSECTION, traffic_light_data[0], west_dist # TL001
|
||
else:
|
||
return EAST_INTERSECTION, traffic_light_data[1], east_dist # TL002
|
||
|
||
def get_front_traffic_light(vehicle, distance_to_west, distance_to_east):
|
||
"""获取车辆前方的红绿灯状态"""
|
||
# 根据车辆的位置和方向判断前方路口
|
||
if vehicle["vehicleNo"].startswith("QN"):
|
||
if vehicle["vehicleNo"] == "QN001":
|
||
# QN001 的路线:西路口北侧 -> 南 -> 东 -> 北
|
||
if vehicle["phase"] == 0: # 南北移动
|
||
if vehicle["direction"] == -1: # 向南
|
||
return traffic_light_data[0], distance_to_west # 西路口红绿灯
|
||
else: # 向北
|
||
return None, float('inf') # 没有红绿灯
|
||
else: # 东西移动
|
||
if vehicle["direction"] == 1: # 向东
|
||
return None, float('inf') # 没有红绿灯
|
||
else: # 向西
|
||
return traffic_light_data[0], distance_to_west # 西路口红绿灯
|
||
else: # QN002
|
||
# QN002 的路线:东西方向往返
|
||
if vehicle["direction"] == 1: # 向东
|
||
return traffic_light_data[1], distance_to_east # 东路口红绿灯
|
||
else: # 向西
|
||
return traffic_light_data[0], distance_to_west # 西路口红绿灯
|
||
elif vehicle["vehicleNo"].startswith("TQ"):
|
||
# 特勤车:先东西后南北
|
||
if vehicle["phase"] == 0: # 东西移动
|
||
if vehicle["direction"] == 1: # 向东
|
||
return None, float('inf') # 没有<E6B2A1><E69C89><EFBFBD>绿灯
|
||
else: # 向西
|
||
return traffic_light_data[0], distance_to_west # 西路口红绿灯
|
||
else: # 南北移动
|
||
return None, float('inf') # 南北方向没有红绿灯
|
||
|
||
return None, float('inf') # 默认返回无红绿灯
|
||
|
||
def update_vehicle_position(vehicle, elapsed_time):
|
||
"""更新车辆位置"""
|
||
# 计算到两个路口的距离
|
||
distance_to_west = calculate_distance_to_intersection(vehicle, WEST_INTERSECTION)
|
||
distance_to_east = calculate_distance_to_intersection(vehicle, EAST_INTERSECTION)
|
||
|
||
# 获取前方红绿灯状态
|
||
traffic_light, distance = get_front_traffic_light(vehicle, distance_to_west, distance_to_east)
|
||
|
||
# 计算基础速度(米/秒)
|
||
if vehicle["vehicleNo"].startswith("TQ"):
|
||
base_speed_mps = DEFAULT_VEHICLE_SPEED * 1000 / 3600
|
||
else:
|
||
vehicle_state = vehicle_states.get(vehicle["vehicleNo"])
|
||
if not vehicle_state or not vehicle_state.is_running:
|
||
return
|
||
base_speed_mps = vehicle["speed"] * 1000 / 3600
|
||
|
||
# 如果是无人车,还需要考虑制动模式
|
||
if vehicle_state.brake_mode:
|
||
current_speed_mps = base_speed_mps
|
||
target_speed_mps = vehicle_state.target_speed * 1000 / 3600
|
||
|
||
if vehicle_state.brake_mode == "emergency":
|
||
base_speed_mps = current_speed_mps * (1 - EMERGENCY_BRAKE_DECELERATION)
|
||
else: # normal
|
||
base_speed_mps = current_speed_mps * (1 - NORMAL_BRAKE_DECELERATION)
|
||
|
||
base_speed_mps = max(base_speed_mps, target_speed_mps)
|
||
|
||
# 根据红绿灯状态调整速度
|
||
if traffic_light and traffic_light["state"] == 0: # 红灯
|
||
if distance <= 50: # 已经在停止线内
|
||
speed_mps = 0 # 紧急停车
|
||
print(f"车辆 {vehicle['vehicleNo']} 在停止线内遇红灯,紧急停车")
|
||
elif distance <= 100: # 距离停止线100米内
|
||
# 计算减速度,使车辆在停止线前停下
|
||
deceleration = (base_speed_mps * base_speed_mps) / (2 * (distance - 50))
|
||
speed_mps = max(0, base_speed_mps - deceleration * elapsed_time)
|
||
print(f"车辆 {vehicle['vehicleNo']} 距红灯 {distance:.1f}米,减速至 {speed_mps * 3.6:.1f}km/h")
|
||
else:
|
||
speed_mps = base_speed_mps
|
||
else: # 绿灯或无红绿灯
|
||
speed_mps = base_speed_mps
|
||
|
||
# 更新车辆速度
|
||
vehicle["speed"] = speed_mps * 3600 / 1000 # 转回 km/h
|
||
|
||
# 计算移动距离
|
||
distance = speed_mps * elapsed_time
|
||
|
||
# 计算经纬度变化
|
||
dlat, dlon = meters_to_degrees(distance, vehicle["latitude"])
|
||
|
||
if vehicle["vehicleNo"].startswith("QN"):
|
||
# 无人车的路径更新逻辑
|
||
if vehicle["vehicleNo"] == "QN001":
|
||
# 无人车1:先南北后东西往返
|
||
if "phase" not in vehicle:
|
||
vehicle["phase"] = 0
|
||
|
||
if vehicle["phase"] == 0: # 南北移动
|
||
new_lat = vehicle["latitude"] + (dlat * vehicle["direction"])
|
||
if vehicle["direction"] == -1 and new_lat <= WEST_INTERSECTION["latitude"]:
|
||
# 到达路口,切换到东西移动
|
||
vehicle["phase"] = 1
|
||
vehicle["direction"] = 1 # 向东
|
||
elif vehicle["direction"] == 1 and new_lat >= WEST_INTERSECTION["latitude"] + (DIST_50M / 111319.9):
|
||
# 返回起点
|
||
vehicle["phase"] = 0
|
||
vehicle["direction"] = -1 # 向南
|
||
vehicle["latitude"] = new_lat
|
||
|
||
else: # 东西移动
|
||
new_lon = vehicle["longitude"] + (dlon * vehicle["direction"])
|
||
if vehicle["direction"] == 1 and new_lon >= WEST_INTERSECTION["longitude"] + (DIST_100M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
|
||
# 到达东端,开始返回
|
||
vehicle["direction"] = -1 # 向西
|
||
elif vehicle["direction"] == -1 and new_lon <= WEST_INTERSECTION["longitude"]:
|
||
# 返回路口,切换到南北移动
|
||
vehicle["phase"] = 0
|
||
vehicle["direction"] = 1 # 向北
|
||
vehicle["longitude"] = new_lon
|
||
|
||
else: # QN002
|
||
# 无人车2:东西方向往返(东路口)
|
||
new_lon = vehicle["longitude"] + (dlon * vehicle["direction"])
|
||
if new_lon >= EAST_INTERSECTION["longitude"] + (DIST_100M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
|
||
vehicle["direction"] = -1 # 向西
|
||
elif new_lon <= EAST_INTERSECTION["longitude"] - (DIST_150M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
|
||
vehicle["direction"] = 1 # 向东
|
||
vehicle["longitude"] = new_lon
|
||
|
||
elif vehicle["vehicleNo"].startswith("TQ"):
|
||
# 特勤车的路径更新逻辑
|
||
if "phase" not in vehicle:
|
||
vehicle["phase"] = 0
|
||
|
||
if vehicle["phase"] == 0: # 东西移动
|
||
new_lon = vehicle["longitude"] + (dlon * vehicle["direction"])
|
||
if vehicle["direction"] == -1 and new_lon <= WEST_INTERSECTION["longitude"]:
|
||
vehicle["phase"] = 1 # 切换到南北移动
|
||
vehicle["direction"] = -1 # 向南
|
||
elif vehicle["direction"] == 1 and new_lon >= WEST_INTERSECTION["longitude"] + (DIST_50M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
|
||
vehicle["direction"] = -1 # 向西
|
||
vehicle["longitude"] = new_lon
|
||
else: # 南北移动
|
||
new_lat = vehicle["latitude"] + (dlat * vehicle["direction"])
|
||
if new_lat <= WEST_INTERSECTION["latitude"] - (DIST_100M / 111319.9): # 到达南端
|
||
vehicle["direction"] = 1 # 向北
|
||
elif new_lat >= WEST_INTERSECTION["latitude"]: # 返回到路口
|
||
vehicle["phase"] = 0 # 切换回东西移动
|
||
vehicle["direction"] = 1 # 向北
|
||
vehicle["longitude"] = WEST_INTERSECTION["longitude"] # 重置到起始位置
|
||
vehicle["latitude"] = new_lat
|
||
|
||
# 更新时间戳
|
||
vehicle["time"] = int(time.time() * 1000)
|
||
|
||
def update_aircraft_position(aircraft, elapsed_time):
|
||
"""更新航空器位置"""
|
||
# 计算一次更新的移动距离(米)
|
||
speed_mps = aircraft["speed"] * 1000 / 3600 # 速度转换为米/秒
|
||
distance = speed_mps * elapsed_time
|
||
|
||
# 计算经纬度变化
|
||
dlat, dlon = meters_to_degrees(distance, aircraft["latitude"])
|
||
|
||
new_lat = aircraft["latitude"] + (dlat * aircraft["direction"])
|
||
if new_lat >= EAST_INTERSECTION["latitude"] + (DIST_150M / 111319.9):
|
||
aircraft["direction"] = -1 # 向南
|
||
elif new_lat <= EAST_INTERSECTION["latitude"] - (DIST_150M / 111319.9):
|
||
aircraft["direction"] = 1 # 向北
|
||
aircraft["latitude"] = new_lat
|
||
|
||
def update_traffic_light_for_aircraft():
|
||
"""根据航空器位置更新东路口红绿灯状态"""
|
||
if not aircraft_data:
|
||
return
|
||
|
||
aircraft = aircraft_data[0]
|
||
# 计算到东路口的距离(米)
|
||
lat_diff = abs(aircraft["latitude"] - EAST_INTERSECTION["latitude"]) * 111319.9
|
||
|
||
# 更新TL002(东路口)的状态
|
||
for light in traffic_light_data:
|
||
if light["id"] == "TL002":
|
||
if lat_diff <= 50.0: # 距离于50米时为红灯
|
||
light["state"] = 0 # 红灯
|
||
else:
|
||
light["state"] = 1 # 绿灯
|
||
break
|
||
|
||
# 定义路口信息
|
||
INTERSECTIONS = {
|
||
"TL001": {
|
||
"id": "INT001",
|
||
"name": "西路口",
|
||
"latitude": WEST_INTERSECTION["latitude"],
|
||
"longitude": WEST_INTERSECTION["longitude"]
|
||
},
|
||
"TL002": {
|
||
"id": "INT002",
|
||
"name": "东路口",
|
||
"latitude": EAST_INTERSECTION["latitude"],
|
||
"longitude": EAST_INTERSECTION["longitude"]
|
||
}
|
||
}
|
||
|
||
def generate_traffic_light_data():
|
||
"""生成红绿灯数据"""
|
||
traffic_light_data = []
|
||
|
||
# 生成两个固定路口的红绿灯数据
|
||
for tl_id, intersection in INTERSECTIONS.items():
|
||
signal = {
|
||
"id": tl_id,
|
||
"state": random.randint(0, 1), # 0: RED, 1: GREEN
|
||
"timestamp": int(time.time() * 1000),
|
||
"intersectionId": intersection["id"],
|
||
"latitude": intersection["latitude"],
|
||
"longitude": intersection["longitude"]
|
||
}
|
||
traffic_light_data.append(signal)
|
||
|
||
return traffic_light_data
|
||
|
||
# 红绿灯<E7BBBF><E781AF><EFBFBD>据
|
||
traffic_light_data = generate_traffic_light_data()
|
||
|
||
# 分别记录航空器和车辆的上次更新时间
|
||
last_aircraft_update_time = time.time()
|
||
last_vehicle_update_time = time.time()
|
||
last_light_switch_time = time.time()
|
||
|
||
@app.route('/api/getCurrentFlightPositions')
|
||
def get_flight_positions():
|
||
global last_aircraft_update_time
|
||
current_time = time.time()
|
||
elapsed_time = current_time - last_aircraft_update_time
|
||
|
||
# 只在达到更新间隔时更新位置
|
||
if elapsed_time >= UPDATE_INTERVAL:
|
||
for aircraft in aircraft_data:
|
||
update_aircraft_position(aircraft, UPDATE_INTERVAL)
|
||
aircraft["time"] = int(current_time * 1000)
|
||
last_aircraft_update_time = current_time
|
||
|
||
return jsonify(aircraft_data)
|
||
|
||
@app.route('/api/getCurrentVehiclePositions')
|
||
def get_vehicle_positions():
|
||
global last_vehicle_update_time
|
||
current_time = time.time()
|
||
elapsed_time = current_time - last_vehicle_update_time
|
||
|
||
# 只在达到更新间隔时更新位置
|
||
if elapsed_time >= UPDATE_INTERVAL:
|
||
for vehicle in vehicle_data:
|
||
update_vehicle_position(vehicle, UPDATE_INTERVAL)
|
||
vehicle["time"] = int(current_time * 1000)
|
||
last_vehicle_update_time = current_time
|
||
|
||
return jsonify(vehicle_data)
|
||
|
||
@app.route('/api/getTrafficLightSignals')
|
||
def get_traffic_light_signals():
|
||
global last_light_switch_time
|
||
current_time = time.time()
|
||
|
||
# 更新时间戳
|
||
for light in traffic_light_data:
|
||
light["timestamp"] = int(current_time * 1000)
|
||
|
||
# TL001(西路口)状态每15秒切换一次
|
||
elapsed_since_switch = current_time - last_light_switch_time
|
||
if elapsed_since_switch >= 15: # 每15秒切换一次
|
||
traffic_light_data[0]["state"] = 1 if traffic_light_data[0]["state"] == 0 else 0
|
||
last_light_switch_time = current_time
|
||
|
||
# 根据航空器位置更新TL002(东路口)
|
||
update_traffic_light_for_aircraft()
|
||
|
||
return jsonify(traffic_light_data)
|
||
|
||
@app.after_request
|
||
def add_cors_headers(response):
|
||
response.headers['Access-Control-Allow-Origin'] = '*'
|
||
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
|
||
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
|
||
return response
|
||
|
||
if __name__ == '__main__':
|
||
app.run(host='localhost', port=8081, debug=True) |