CollisionAvoidance/tools/mock_server.py

724 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request
import time
import math
import random
import logging
app = Flask(__name__)
# 地球半径(米)
EARTH_RADIUS = 6378137.0
COMMAND_PRIORITIES = {
"ALERT": 5,
"SIGNAL": 4,
"WARNING": 3,
"GREEN": 2,
"RESUME": 1
}
def meters_to_degrees(meters, latitude):
"""
将米转换为度,考虑纬度的影响
"""
# 纬度方向1度 = 111,319.9米
meters_per_deg_lat = 111319.9
# 经度方向1度 = 111,319.9 * cos(latitude)米
meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(latitude))
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
# 距离配置(米)
DIST_300M = 300
DIST_200M = 200
DIST_150M = 150
DIST_100M = 100
DIST_50M = 50
# 两个路口的位置
WEST_INTERSECTION = {"longitude": 120.086003, "latitude": 36.361999}
EAST_INTERSECTION = {"longitude": 120.090003, "latitude": 36.361999}
# 位置更新间隔(秒)
UPDATE_INTERVAL = 1.0
# 飞机和车辆尺寸(半径 米)
AIRCRAFT_SIZE_M = 30.0
VEHICLE_SIZE_M = 10.0
# 航空器数据
aircraft_data = [
{
"flightNo": "AC001", # 航空器
"latitude": EAST_INTERSECTION["latitude"] - (DIST_150M / 111319.9), # 东路口南150米
"longitude": EAST_INTERSECTION["longitude"],
"time": int(time.time() * 1000),
"direction": 1, # 1表示向北
"speed": 50.0 # 滑行速度 50km/h
}
]
# 添加默认速度常量
DEFAULT_VEHICLE_SPEED = 36.0 # km/h
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
# 车辆数据
vehicle_data = [
{
"vehicleNo": "QN001", # 无人车1西路口
"latitude": WEST_INTERSECTION["latitude"] + (DIST_100M / 111319.9), # 西路口北50米
"longitude": WEST_INTERSECTION["longitude"],
"time": int(time.time() * 1000),
"direction": -1, # -1表示向南
"speed": DEFAULT_VEHICLE_SPEED, # 使用默认速度
"phase": 0 # 0: 南北移动, 1: 东西移动
},
{
"vehicleNo": "QN002", # 无人车2东路口
"latitude": EAST_INTERSECTION["latitude"],
"longitude": EAST_INTERSECTION["longitude"] - (DIST_150M / (111319.9 * math.cos(math.radians(EAST_INTERSECTION["latitude"])))), # 东路口西150米
"time": int(time.time() * 1000),
"direction": 1, # 1表示向东
"speed": DEFAULT_VEHICLE_SPEED, # 使用默认速度
"phase": 0 # 初始化 phase
},
{
"vehicleNo": "TQ001", # 特勤车(西路口)
"latitude": WEST_INTERSECTION["latitude"],
"longitude": WEST_INTERSECTION["longitude"] + (DIST_50M / (111319.9 * math.cos(math.radians(WEST_INTERSECTION["latitude"])))), # 西路口东50米
"time": int(time.time() * 1000),
"direction": -1, # -1表示向西
"speed": DEFAULT_VEHICLE_SPEED, # 使用默认速度
"phase": 0 # 初始化 phase
}
]
# 添加车辆状态类
class VehicleState:
def __init__(self, vehicle_no):
self.vehicle_no = vehicle_no
self.is_running = True # 运行状态
self.current_command = None # 当前执行的指令
self.command_priority = 0 # 当前指令优先级
self.target_speed = DEFAULT_VEHICLE_SPEED # 目标速度
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
self.command_reason = None # 指令原因
self.last_command_time = time.time() # 最后一次指令时间
self.traffic_light_state = None # 当前红绿灯状态
self.target_lat = None # 目标纬度
self.target_lon = None # 目标经度
def can_be_overridden_by(self, command_type):
"""判断当前指令是否可以被新指令覆盖"""
priority_map = {
"ALERT": 5, # 告警指令,最高优先级
"RED": 4, # 红灯指令,次高优先级
"WARNING": 3, # 预警指令,中等优先级
"GREEN": 2, # 绿灯状态,低优先级
"RESUME": 1 # 恢复指令,最低优先级
}
new_priority = priority_map.get(command_type, 0)
current_priority = priority_map.get(self.current_command, 0)
# ALERT 指令可以被 RESUME 解除
if self.current_command == "ALERT":
return command_type == "RESUME"
# RED 指令可以被 GREEN 或更高优先级指令覆盖
if self.current_command == "RED":
return command_type == "GREEN" or new_priority > current_priority
# WARNING 指令可以被 RESUME 解除
if self.current_command == "WARNING":
return command_type == "RESUME" or new_priority > current_priority
# GREEN 不阻止任何指令
if self.current_command == "GREEN":
return True
# 其他情况按优先级判断
return new_priority >= current_priority
def update_command(self, command_type, target_lat=None, target_lon=None):
"""更新指令状态"""
priority_map = {
"ALERT": 5, # 告警指令,最高优先级
"RED": 4, # 红灯指令,次高优先级
"WARNING": 3, # 预警指令,中等优先级
"GREEN": 2, # 绿灯状态,低优先级
"RESUME": 1 # 恢复指令,最低优先级
}
# 更新目标位置
if target_lat is not None and target_lon is not None:
self.target_lat = target_lat
self.target_lon = target_lon
# 如果是红绿灯状态,更新状态和指令
if command_type in ["RED", "GREEN"]:
self.traffic_light_state = command_type
if command_type == "RED":
self.current_command = command_type
self.command_priority = priority_map[command_type]
self.is_running = False
print(f"车辆 {self.vehicle_no} 收到红灯指令,停止运行")
else: # GREEN
# 清除 RED 指令
if self.current_command == "RED":
print(f"车辆 {self.vehicle_no} 收到绿灯指令,清除红灯指令")
# 设置为绿灯指令
self.current_command = command_type
self.command_priority = priority_map[command_type]
# 如果没有其他阻塞性指令,允许移动
if self.current_command not in ["ALERT", "WARNING"]:
self.is_running = True
print(f"车辆 {self.vehicle_no} 收到绿灯指令,恢复运行")
else:
# 其他指令正常更新
self.current_command = command_type
self.command_priority = priority_map.get(command_type, 0)
print(f"更新车辆 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}")
def can_move(self):
"""检查车辆是否可以移动"""
# 如果有告警指令,不能移动
if self.current_command == "ALERT":
return False
# 如果有预警指令,不能移动
if self.current_command == "WARNING":
return False
# 如果是红灯且当前指令是 RED不能移动
if self.traffic_light_state == "RED" and self.current_command == "RED":
return False
# 其他情况可以移动
return True
def log_state(self):
"""记录当前车辆状态"""
print(f"""
车辆状态:
- 车辆编号: {self.vehicle_no}
- 运行状态: {'运行中' if self.is_running else '已停止'}
- 当前指令: {self.current_command}
- 红绿灯状态: {self.traffic_light_state}
- 指令优先级: {self.command_priority}
- 目标速度: {self.target_speed}
- 制动模式: {self.brake_mode}
- 指令原因: {self.command_reason}
- 目标位置: ({self.target_lat}, {self.target_lon})
""")
# 添加车辆状态管理
vehicle_states = {}
for vehicle in vehicle_data:
vehicle_states[vehicle["vehicleNo"]] = VehicleState(vehicle["vehicleNo"])
def calculate_distance_to_target(vehicle, target_lat, target_lon):
"""计算车辆到目标位的距离(米)"""
dlat = target_lat - vehicle["latitude"]
dlon = target_lon - vehicle["longitude"]
# 使用 Haversine 公式计算距离
R = 6371000 # 地球半径(米)
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(math.radians(vehicle["latitude"])) * math.cos(math.radians(target_lat)) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
@app.route('/api/vehicle/command', methods=['POST'])
def handle_vehicle_command():
try:
data = request.json
vehicle_id = data.get("vehicleId")
command_type = data.get("type", "").upper()
reason = data.get("reason", "").upper()
signal_state = data.get("signalState", "").upper()
target_lat = data.get("latitude", None)
target_lon = data.get("longitude", None)
print(f"收到车辆控制指令: vehicle_id={vehicle_id}, type={command_type}, reason={reason}, signal_state={signal_state}")
print(f"完整请求数据: {data}")
# 将 SIGNAL 指令转换为 RED 或 GREEN
if command_type == "SIGNAL":
command_type = "RED" if signal_state == "RED" else "GREEN"
print(f"转换 SIGNAL 指令为: {command_type}")
# 检查车辆是否存在
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
print(f"未找到车辆: {vehicle_id}")
return jsonify({
"status": "error",
"message": f"Vehicle {vehicle_id} not found"
}), 404
# 打印当前车辆状态
print(f"当前车辆状态: vehicle={vehicle_id}")
vehicle_state.log_state()
# 特勤车只响应红绿灯信号
if vehicle_id.startswith("TQ"):
if command_type not in ["RED", "GREEN"]:
print(f"特勤车辆忽略非红绿灯指令: {vehicle_id}")
return jsonify({
"status": "ok",
"message": "Special vehicle only responds to traffic light signals"
})
# 更新红绿灯状态和指令状态
vehicle_state.update_command(command_type, target_lat, target_lon)
print(f"特勤车 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"status": "ok",
"message": "Traffic light state updated"
})
# 检查指令优先级并添加详细日志
can_override = vehicle_state.can_be_overridden_by(command_type)
print(f"指令优先级检查: vehicle={vehicle_id}, current_command={vehicle_state.current_command}, "
f"new_command={command_type}, can_override={can_override}")
if not can_override:
print(f"指令优先级过低: vehicle={vehicle_id}, current_priority={vehicle_state.command_priority}, "
f"command={command_type}")
return jsonify({
"status": "warning",
"message": "Command priority too low"
})
# 处理不同类型的指令
if command_type in ["ALERT", "WARNING"]:
# 查找当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 执行告警指令,直接停车
print(f"执行{'紧急' if command_type == 'ALERT' else '正常'}制动: vehicle={vehicle_id}")
vehicle_state.current_command = command_type
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
vehicle_state.is_running = False
vehicle_state.target_speed = 0
vehicle_state.brake_mode = "emergency" if command_type == "ALERT" else "normal"
vehicle_state.target_lat = target_lat
vehicle_state.target_lon = target_lon
# 立即更新车辆速度
current_vehicle["speed"] = 0
elif command_type in ["RED", "GREEN"]:
print(f"执行红绿灯指令: vehicle_id={vehicle_id}, state={command_type}")
# 红绿灯状态的更新由 update_command 处理
pass
elif command_type == "RESUME":
print(f"执行恢复指令: vehicle_id={vehicle_id}")
# 检查当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 清除限制性指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
print(f"清除限制性指令: vehicle={vehicle_id}")
vehicle_state.current_command = None
vehicle_state.command_priority = 0
vehicle_state.is_running = True
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
vehicle_state.brake_mode = None
vehicle_state.target_lat = None
vehicle_state.target_lon = None
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if vehicle_state.is_running:
print(f"车辆 {vehicle_id} 恢复运行")
current_vehicle["speed"] = DEFAULT_VEHICLE_SPEED
# 记录状态变化但不更新指令
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"status": "ok",
"message": "Command executed successfully"
})
# 更新车辆状态
vehicle_state.update_command(command_type, target_lat, target_lon)
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if not vehicle_state.is_running:
vehicle_state.target_speed = 0
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"status": "ok",
"message": "Command executed successfully"
})
except Exception as e:
print(f"Error handling vehicle command: {str(e)}")
return jsonify({
"status": "error",
"message": str(e)
}), 500
def calculate_distance_to_intersection(vehicle, intersection):
"""计算车辆到路口的距离(米)"""
vehicle_lat = vehicle["latitude"]
vehicle_lon = vehicle["longitude"]
intersection_lat = intersection["latitude"]
intersection_lon = intersection["longitude"]
# 使用经纬度计算距离
lat_diff = (vehicle_lat - intersection_lat) * 111319.9
lon_diff = (vehicle_lon - intersection_lon) * (111319.9 * math.cos(math.radians(vehicle_lat)))
return math.sqrt(lat_diff * lat_diff + lon_diff * lon_diff)
def get_nearest_intersection(vehicle):
"""获取车辆最近的路口及其红绿灯状态"""
west_dist = calculate_distance_to_intersection(vehicle, WEST_INTERSECTION)
east_dist = calculate_distance_to_intersection(vehicle, EAST_INTERSECTION)
if west_dist <= east_dist:
return WEST_INTERSECTION, traffic_light_data[0], west_dist # TL001
else:
return EAST_INTERSECTION, traffic_light_data[1], east_dist # TL002
def get_front_traffic_light(vehicle, distance_to_west, distance_to_east):
"""获取车辆前方的红绿灯状态"""
# 根据车辆的位置和方向判断前方路口
if vehicle["vehicleNo"] == "QN001":
# 确保 phase 属性存在
if "phase" not in vehicle:
vehicle["phase"] = 0
# QN001 的路线:西路口北侧 -> 东 -> 北
if vehicle["phase"] == 0: # 南北移动
# 在西路口以南时,向北移动需要判断西路口红绿灯
if vehicle["direction"] == 1 and vehicle["latitude"] < WEST_INTERSECTION["latitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
# 在西路口以北时,向南移动需要判断西路口红绿灯
elif vehicle["direction"] == -1 and vehicle["latitude"] > WEST_INTERSECTION["latitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
else: # 东西移动
# 在西路口以西时,向东移动需要判断西路口红绿灯
if vehicle["direction"] == 1 and vehicle["longitude"] < WEST_INTERSECTION["longitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
# 在西路口以东时,向西移动需要判断西路口红绿灯
elif vehicle["direction"] == -1 and vehicle["longitude"] > WEST_INTERSECTION["longitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
elif vehicle["vehicleNo"] == "QN002":
# 在东路口以西时,向东移动需要判断东路口红绿灯
if vehicle["direction"] == 1 and vehicle["longitude"] < EAST_INTERSECTION["longitude"]:
return traffic_light_data[1], distance_to_east # 东路口红绿灯
# 在东路口以东时,向西移动需要判断东路口红绿灯
elif vehicle["direction"] == -1 and vehicle["longitude"] > EAST_INTERSECTION["longitude"]:
return traffic_light_data[1], distance_to_east # 东路口红绿灯
# 其他情况,表示车辆已经通过路口或不需要判断红绿灯
return None, float('inf')
def update_vehicle_position(vehicle, elapsed_time):
"""更新车辆位置"""
# 获取车辆状态
vehicle_state = vehicle_states.get(vehicle["vehicleNo"])
if not vehicle_state:
return
# 确保所有车辆都有 phase 属性
if "phase" not in vehicle:
vehicle["phase"] = 0
print(f"初始化车辆 {vehicle['vehicleNo']} 的 phase 属性为 0")
# 获取前方红绿灯状态和距离
traffic_light, distance = get_front_traffic_light(vehicle,
calculate_distance_to_intersection(vehicle, WEST_INTERSECTION),
calculate_distance_to_intersection(vehicle, EAST_INTERSECTION))
# 特勤车只响应红绿灯
if vehicle["vehicleNo"].startswith("TQ"):
# 红灯且距离停止线小于等于 50 米时停车
if traffic_light and traffic_light["state"] == 0 and distance <= DIST_50M:
vehicle["speed"] = 0
print(f"特勤车 {vehicle['vehicleNo']} 遇红灯在停止线前停车,距路口 {distance:.1f}")
return
# 其他情况正常行驶
vehicle["speed"] = DEFAULT_VEHICLE_SPEED
print(f"特勤车 {vehicle['vehicleNo']} 正常行驶,速度={vehicle['speed']}km/h")
else:
# 普通车辆的移动逻辑
# 先判断是否在红灯影响范围内50米
if traffic_light and traffic_light["state"] == 0 and distance <= DIST_50M:
# 在红灯影响范围内,检查是否需要停车
if not vehicle_state.can_move():
vehicle["speed"] = 0
print(f"车辆 {vehicle['vehicleNo']} 遇红灯在停止线前停车,距路口 {distance:.1f}")
return
else:
# 不在红灯影响范围内,只检查其他指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
vehicle["speed"] = 0
print(f"车辆 {vehicle['vehicleNo']} 不能移动: command={vehicle_state.current_command}")
return
# 可以移动,设置正常速度
vehicle["speed"] = DEFAULT_VEHICLE_SPEED
print(f"车辆 {vehicle['vehicleNo']} 可以移动,速度={vehicle['speed']}km/h")
# 计算基础速度(米/秒)
base_speed_mps = vehicle["speed"] * 1000 / 3600
# 计算移动距离
distance = base_speed_mps * elapsed_time
# 计算经纬度变化
dlat, dlon = meters_to_degrees(distance, vehicle["latitude"])
# 更新位置
if vehicle["vehicleNo"].startswith("QN"):
# 无人车的路径更新逻辑
if vehicle["vehicleNo"] == "QN001":
# 无人车1先南北后东西往返
if vehicle["phase"] == 0: # 南北移动
new_lat = vehicle["latitude"] + (dlat * vehicle["direction"])
if vehicle["direction"] == -1 and new_lat <= WEST_INTERSECTION["latitude"]:
# 到达路口,切换到东西移动
vehicle["phase"] = 1
vehicle["direction"] = 1 # 向东
print(f"QN001 到达西路口,切换到东西移动,方向=东")
elif vehicle["direction"] == 1 and new_lat >= WEST_INTERSECTION["latitude"] + (DIST_50M / 111319.9):
# 返回起点
vehicle["phase"] = 0
vehicle["direction"] = -1 # 向南
print(f"QN001 到达北端,切换到南北移动,方向=南")
vehicle["latitude"] = new_lat
print(f"QN001 南北移动: lat={new_lat}, direction={vehicle['direction']}")
else: # 东西移动
new_lon = vehicle["longitude"] + (dlon * vehicle["direction"])
if vehicle["direction"] == 1 and new_lon >= WEST_INTERSECTION["longitude"] + (DIST_100M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
# 到达东端,开始返回
vehicle["direction"] = -1 # 向西
print(f"QN001 到达东端,切换方向=西")
elif vehicle["direction"] == -1 and new_lon <= WEST_INTERSECTION["longitude"]:
# 返回路口,切换到南北移动
vehicle["phase"] = 0
vehicle["direction"] = 1 # 向北
print(f"QN001 返回西路口,切换到南北移动,方向=北")
vehicle["longitude"] = new_lon
print(f"QN001 东西移动: lon={new_lon}, direction={vehicle['direction']}")
else: # QN002
# 无人车2西向往返西路口
new_lon = vehicle["longitude"] + (dlon * vehicle["direction"])
if new_lon >= EAST_INTERSECTION["longitude"] + (DIST_150M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
vehicle["direction"] = -1 # 向西
print(f"QN002 到达东端,切换方向=西")
elif new_lon <= EAST_INTERSECTION["longitude"] - (DIST_150M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
vehicle["direction"] = 1 # 向东
print(f"QN002 到达西端,切换方向=东")
vehicle["longitude"] = new_lon
print(f"QN002 东西移动: lon={new_lon}, direction={vehicle['direction']}")
elif vehicle["vehicleNo"].startswith("TQ"):
# 特勤车的路径更新逻辑
if vehicle["phase"] == 0: # 东西移动
new_lon = vehicle["longitude"] + (dlon * vehicle["direction"])
if vehicle["direction"] == -1 and new_lon <= WEST_INTERSECTION["longitude"]:
vehicle["phase"] = 1 # 切换到南北移动
vehicle["direction"] = -1 # 向南
print(f"TQ001 到达西路口,切换到南北移动,方向=南")
elif vehicle["direction"] == 1 and new_lon >= WEST_INTERSECTION["longitude"] + (DIST_50M / (111319.9 * math.cos(math.radians(vehicle["latitude"])))):
vehicle["direction"] = -1 # 向西
print(f"TQ001 到达东端,切换方向=西")
vehicle["longitude"] = new_lon
print(f"TQ001 东西移动: lon={new_lon}, direction={vehicle['direction']}")
else: # 南北移动
new_lat = vehicle["latitude"] + (dlat * vehicle["direction"])
if new_lat <= WEST_INTERSECTION["latitude"] - (DIST_100M / 111319.9): # 到达南端
vehicle["direction"] = 1 # 向北
print(f"TQ001 到达南端,切换方向=北")
elif new_lat >= WEST_INTERSECTION["latitude"]: # 返回路口
vehicle["phase"] = 0 # 切换回东西移动
vehicle["direction"] = 1 # 向东
vehicle["longitude"] = WEST_INTERSECTION["longitude"] # 重置到起始位置
print(f"TQ001 返回西路口,切换到东西移动,方向=东")
vehicle["latitude"] = new_lat
print(f"TQ001 南北移动: lat={new_lat}, direction={vehicle['direction']}")
# 新时间戳
vehicle["time"] = int(time.time() * 1000)
def update_aircraft_position(aircraft, elapsed_time):
"""更新航空器位置"""
# 计算一次更新的动距离(米)
speed_mps = aircraft["speed"] * 1000 / 3600 # 速度转换为米/秒
distance = speed_mps * elapsed_time
# 计算经纬度变化
dlat, dlon = meters_to_degrees(distance, aircraft["latitude"])
new_lat = aircraft["latitude"] + (dlat * aircraft["direction"])
if new_lat >= EAST_INTERSECTION["latitude"] + (DIST_150M / 111319.9):
aircraft["direction"] = -1 # 向南
elif new_lat <= EAST_INTERSECTION["latitude"] - (DIST_150M / 111319.9):
aircraft["direction"] = 1 # 向北
aircraft["latitude"] = new_lat
# 定义路口信息
INTERSECTIONS = {
"TL001": {
"id": "INT001",
"name": "西路口",
"latitude": WEST_INTERSECTION["latitude"],
"longitude": WEST_INTERSECTION["longitude"]
},
"TL002": {
"id": "INT002",
"name": "东路口",
"latitude": EAST_INTERSECTION["latitude"],
"longitude": EAST_INTERSECTION["longitude"]
}
}
def generate_traffic_light_data():
"""生成红绿灯数据"""
traffic_light_data = []
# 生成两个固定路口的红绿灯数据
for tl_id, intersection in INTERSECTIONS.items():
signal = {
"id": tl_id,
"state": 1, # 0: RED, 1: GREEN测试时保持绿灯
"timestamp": int(time.time() * 1000),
"intersection": intersection["id"],
"position": {
"latitude": intersection["latitude"],
"longitude": intersection["longitude"]
}
}
traffic_light_data.append(signal)
return traffic_light_data
# 红绿灯数据
traffic_light_data = generate_traffic_light_data()
# 分别记录航空器和车辆的上次更新时间
last_aircraft_update_time = time.time()
last_vehicle_update_time = time.time()
last_light_switch_time = time.time()
@app.route('/api/getCurrentFlightPositions')
def get_flight_positions():
global last_aircraft_update_time
current_time = time.time()
elapsed_time = current_time - last_aircraft_update_time
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for aircraft in aircraft_data:
update_aircraft_position(aircraft, UPDATE_INTERVAL)
aircraft["time"] = int(current_time * 1000)
last_aircraft_update_time = current_time
return jsonify(aircraft_data)
def switch_traffic_light_state():
"""统一处理红绿灯状态切换"""
global last_light_switch_time
current_time = time.time()
# 西路口红绿灯每15秒切换一次
elapsed_since_switch = current_time - last_light_switch_time
if elapsed_since_switch >= 15:
traffic_light_data[0]["state"] = 1 if traffic_light_data[0]["state"] == 0 else 0
last_light_switch_time = current_time
print(f"西路口红绿灯状态切换为: {'绿灯' if traffic_light_data[0]['state'] == 1 else '红灯'}")
# 更新东路口红绿灯(根据航空器位置)
if aircraft_data:
aircraft = aircraft_data[0]
lat_diff = abs(aircraft["latitude"] - EAST_INTERSECTION["latitude"]) * 111319.9
old_state = traffic_light_data[1]["state"]
traffic_light_data[1]["state"] = 1 if lat_diff > (AIRCRAFT_SIZE_M + DIST_50M) else 0
if old_state != traffic_light_data[1]["state"]:
print(f"东路口红绿灯状态切换为: {'绿灯' if traffic_light_data[1]['state'] == 1 else '红灯'}")
@app.route('/api/getCurrentVehiclePositions')
def get_vehicle_positions():
global last_vehicle_update_time
current_time = time.time()
elapsed_time = current_time - last_vehicle_update_time
try:
# 统一处理红绿灯状态切换
switch_traffic_light_state()
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_vehicle_update_time = current_time
return jsonify(vehicle_data)
except Exception as e:
print(f"Error in get_vehicle_positions: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/api/getTrafficLightSignals')
def get_traffic_light_signals():
current_time = time.time()
# 更新时间戳
for light in traffic_light_data:
light["timestamp"] = int(current_time * 1000)
# 统一处理红绿灯状态切换
switch_traffic_light_state()
return jsonify(traffic_light_data)
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
return response
def check_vehicle_states():
"""定期检查所有车辆状态"""
for vehicle_no, state in vehicle_states.items():
state.log_state()
if __name__ == '__main__':
app.run(host='localhost', port=8081, debug=True)