CollisionAvoidance/tools/mock_server.py

825 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request
import time
import math
import random
import logging
app = Flask(__name__)
# 地球半径(米)
EARTH_RADIUS = 6378137.0
COMMAND_PRIORITIES = {
"ALERT": 5,
"SIGNAL": 4,
"WARNING": 3,
"GREEN": 2,
"RESUME": 1
}
def meters_to_degrees(meters, latitude):
"""
将米转换为度,考虑纬度的影响
"""
# 纬度方向1度 = 111,319.9米
meters_per_deg_lat = 111319.9
# 经度方向1度 = 111,319.9 * cos(latitude)米
meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(latitude))
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
# 距离配置(米)
DIST_300M = 300
DIST_200M = 200
DIST_150M = 150
DIST_100M = 100
DIST_50M = 50
# 时间配置(秒)
WAIT_TIME_AFTER_RETURN = 10.0 # 返回起点后的等待时间
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
TRAFFIC_LIGHT_SWITCH_INTERVAL = 10.0 # 红绿灯切换间隔
# 两个路口的位置
T2_INTERSECTION = {"longitude": 120.08502054, "latitude": 36.35448347}
T6_INTERSECTION = {"longitude": 120.08649105, "latitude": 36.35074527}
# 坐标点
# 无人车1起点
POINT_T1 = {"longitude": 120.0868853, "latitude": 36.35496367}
# 冲突点 1
POINT_T2 = {"longitude": 120.08502054, "latitude": 36.35448347}
# 特勤车终点
POINT_T3 = {"longitude": 120.08341044, "latitude": 36.35406879}
# 无人车1终点, 无人车 2 起点,特勤车起点
POINT_T4 = {"longitude": 120.08558121, "latitude": 36.35305878}
POINT_T5 = {"longitude": 120.08400957, "latitude": 36.35265197}
# 冲突点 2
POINT_T6 = {"longitude": 120.08649105, "latitude": 36.35074527}
# 航空器 1终点
POINT_T7 = {"longitude": 120.08562915, "latitude": 36.35052372}
# 无人车 2终点
POINT_T8 = {"longitude": 120.08676664, "latitude": 36.35004529}
POINT_T9 = {"longitude": 120.08520616, "latitude": 36.34964473}
POINT_T10 = {"longitude": 120.08710569, "latitude": 36.34917893}
# 航空器 1终点
POINT_T11 = {"longitude": 120.0873865, "latitude": 36.3509885}
# QN002 新起点(距离 T6 路口 150 米)
POINT_T12 = {"longitude": 120.08603613, "latitude": 36.35190217} # T4 和 T6 之间,距 T6 150米
# 飞机和车辆尺寸(半径 米)
AIRCRAFT_SIZE_M = 30.0
VEHICLE_SIZE_M = 10.0
# 计算初始方向向量
initial_target_lat = POINT_T11["latitude"] - POINT_T7["latitude"]
initial_target_lon = POINT_T11["longitude"] - POINT_T7["longitude"]
# 归一化方向向量
initial_dist = math.sqrt(initial_target_lat * initial_target_lat + initial_target_lon * initial_target_lon)
if initial_dist > 0:
initial_target_lat /= initial_dist
initial_target_lon /= initial_dist
# 修改航空器数据的初始位置和方向
aircraft_data = [
{
"flightNo": "AC001", # 航空器
"latitude": POINT_T7["latitude"],
"longitude": POINT_T7["longitude"],
"time": int(time.time() * 1000),
"direction": { # 改用方向向量替代单一方向值
"lat": initial_target_lat,
"lon": initial_target_lon
},
"speed": 36.0 # 滑行速度 ,默认 50km/h
}
]
# 添加默认速度常量
DEFAULT_VEHICLE_SPEED = 18.0 # km/h默认 36km/h
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
# 计算 QN002 的初始方向向量
qn002_target_lat = POINT_T8["latitude"] - POINT_T4["latitude"]
qn002_target_lon = POINT_T8["longitude"] - POINT_T4["longitude"]
# 归一化方向向量
qn002_dist = math.sqrt(qn002_target_lat * qn002_target_lat + qn002_target_lon * qn002_target_lon)
if qn002_dist > 0:
qn002_target_lat /= qn002_dist
qn002_target_lon /= qn002_dist
# 修改车辆数据中 QN002 的初始位置和方向
vehicle_data = [
{
"vehicleNo": "QN001", # 无人车1
"latitude": POINT_T1["latitude"],
"longitude": POINT_T1["longitude"],
"time": int(time.time() * 1000),
"direction": -1, # -1表示向南
"speed": DEFAULT_VEHICLE_SPEED,
"phase": 0
},
{
"vehicleNo": "QN002", # 无人车2
"latitude": POINT_T12["latitude"], # 使用新的起点
"longitude": POINT_T12["longitude"],
"time": int(time.time() * 1000),
"direction": { # 改用方向向量
"lat": qn002_target_lat,
"lon": qn002_target_lon
},
"speed": DEFAULT_VEHICLE_SPEED,
"phase": 0
},
{
"vehicleNo": "TQ001", # 特勤车
"latitude": POINT_T4["latitude"],
"longitude": POINT_T4["longitude"],
"time": int(time.time() * 1000),
"direction": -1, # -1表示向西
"speed": DEFAULT_VEHICLE_SPEED, # 使用默认速度
"phase": 0 # 初始化 phase
}
]
# 添加车辆状态类
class VehicleState:
def __init__(self, vehicle_no):
self.vehicle_no = vehicle_no
self.is_running = True # 运行状态
self.current_command = None # 当前执行的指令
self.command_priority = 0 # 当前指令优先级
self.target_speed = DEFAULT_VEHICLE_SPEED # 目标速度
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
self.command_reason = None # 指令原因
self.last_command_time = time.time() # 最后一次指令时间
self.traffic_light_state = None # 当前红绿灯状态
self.target_lat = None # 目标纬度
self.target_lon = None # 目标经度
def can_be_overridden_by(self, command_type):
"""判断当前指令是否可以被新指令覆盖"""
priority_map = {
"ALERT": 5, # 告警指令,最高优先级
"RED": 4, # 红灯指令,次高优先级
"WARNING": 3, # 预警指令,中等优先级
"GREEN": 2, # 绿灯状态,低优先级
"RESUME": 1 # 恢复指令,最低优先级
}
new_priority = priority_map.get(command_type, 0)
current_priority = priority_map.get(self.current_command, 0)
# ALERT 指令可以被 RESUME 解除
if self.current_command == "ALERT":
return command_type == "RESUME"
# RED 指令可以被 GREEN 或更高优先级指令覆盖
if self.current_command == "RED":
return command_type == "GREEN" or new_priority > current_priority
# WARNING 指令可以被 RESUME 解除
if self.current_command == "WARNING":
return command_type == "RESUME" or new_priority > current_priority
# GREEN 不阻止任何指令
if self.current_command == "GREEN":
return True
# 其他情况按优先级判断
return new_priority >= current_priority
def update_command(self, command_type, target_lat=None, target_lon=None):
"""更新指令状态"""
priority_map = {
"ALERT": 5, # 告警指令,最高优先级
"RED": 4, # 红灯指令,次高优先级
"WARNING": 3, # 预警指令,中等优先级
"GREEN": 2, # 绿灯状态,低优先级
"RESUME": 1 # 恢复指令,最低优先级
}
# 更新目标位置
if target_lat is not None and target_lon is not None:
self.target_lat = target_lat
self.target_lon = target_lon
# 如果是红绿灯状态,更新状态和指令
if command_type in ["RED", "GREEN"]:
self.traffic_light_state = command_type
if command_type == "RED":
self.current_command = command_type
self.command_priority = priority_map[command_type]
self.is_running = False
print(f"车辆 {self.vehicle_no} 收到红灯指令,停止运行")
else: # GREEN
# 清除 RED 指令
if self.current_command == "RED":
print(f"车辆 {self.vehicle_no} 收到绿灯指令,清除红灯指令")
# 设置为绿灯指令
self.current_command = command_type
self.command_priority = priority_map[command_type]
# 如果没有其他阻塞性指令,允许移动
if self.current_command not in ["ALERT", "WARNING"]:
self.is_running = True
print(f"车辆 {self.vehicle_no} 收到绿灯指令,恢复运行")
else:
# 其他指令正常更新
self.current_command = command_type
self.command_priority = priority_map.get(command_type, 0)
print(f"更新车辆 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}")
def can_move(self):
"""检查车辆是否可以移动"""
# 如果有告警指令,不能移动
if self.current_command == "ALERT":
return False
# 如果有预警指令,不能移动
if self.current_command == "WARNING":
return False
# 如果是红灯且当前指令是 RED不能移动
if self.traffic_light_state == "RED" and self.current_command == "RED":
return False
# 其他情况可以移动
return True
def log_state(self):
"""记录当前车辆状态"""
print(f"""
车辆状态:
- 车辆编号: {self.vehicle_no}
- 运行状态: {'运行中' if self.is_running else '已停止'}
- 当前指令: {self.current_command}
- 红绿灯状态: {self.traffic_light_state}
- 指令优先级: {self.command_priority}
- 目标速度: {self.target_speed}
- 制动模式: {self.brake_mode}
- 指令原因: {self.command_reason}
- 目标位置: ({self.target_lat}, {self.target_lon})
""")
# 添加车辆状态管理
vehicle_states = {}
for vehicle in vehicle_data:
vehicle_states[vehicle["vehicleNo"]] = VehicleState(vehicle["vehicleNo"])
def calculate_distance_to_target(vehicle, target_lat, target_lon):
"""计算车辆到目标位的距离(米)"""
dlat = target_lat - vehicle["latitude"]
dlon = target_lon - vehicle["longitude"]
# 使用 Haversine 公式计算距离
R = 6371000 # 地球半径(米)
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(math.radians(vehicle["latitude"])) * math.cos(math.radians(target_lat)) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
@app.route('/api/vehicle/command', methods=['POST'])
def handle_vehicle_command():
try:
data = request.json
vehicle_id = data.get("vehicleId")
command_type = data.get("type", "").upper()
reason = data.get("reason", "").upper()
signal_state = data.get("signalState", "").upper()
target_lat = data.get("latitude", None)
target_lon = data.get("longitude", None)
print(f"收到车辆控制指令: vehicle_id={vehicle_id}, type={command_type}, reason={reason}, signal_state={signal_state}")
print(f"完整请求数据: {data}")
# 将 SIGNAL 指令转换为 RED 或 GREEN
if command_type == "SIGNAL":
command_type = "RED" if signal_state == "RED" else "GREEN"
print(f"转换 SIGNAL 指令为: {command_type}")
# 检查车辆是否存在
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
print(f"未找到车辆: {vehicle_id}")
return jsonify({
"status": "error",
"message": f"Vehicle {vehicle_id} not found"
}), 404
# 打印当前车辆状态
print(f"当前车辆状态: vehicle={vehicle_id}")
vehicle_state.log_state()
# 特勤车只响应红绿灯信号
if vehicle_id.startswith("TQ"):
if command_type not in ["RED", "GREEN"]:
print(f"特勤车辆忽略非红绿灯指令: {vehicle_id}")
return jsonify({
"status": "ok",
"message": "Special vehicle only responds to traffic light signals"
})
# 更新红绿灯状态和指令状态
vehicle_state.update_command(command_type, target_lat, target_lon)
print(f"特勤车 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"status": "ok",
"message": "Traffic light state updated"
})
# 检查指令优先级并添加详细日志
can_override = vehicle_state.can_be_overridden_by(command_type)
print(f"指令优先级检查: vehicle={vehicle_id}, current_command={vehicle_state.current_command}, "
f"new_command={command_type}, can_override={can_override}")
if not can_override:
print(f"指令优先级过低: vehicle={vehicle_id}, current_priority={vehicle_state.command_priority}, "
f"command={command_type}")
return jsonify({
"status": "warning",
"message": "Command priority too low"
})
# 处理不同类型的指令
if command_type in ["ALERT", "WARNING"]:
# 查找当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 执行告警指令直接停车
print(f"执行{'紧急' if command_type == 'ALERT' else '正常'}制动: vehicle={vehicle_id}")
vehicle_state.current_command = command_type
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
vehicle_state.is_running = False
vehicle_state.target_speed = 0
vehicle_state.brake_mode = "emergency" if command_type == "ALERT" else "normal"
vehicle_state.target_lat = target_lat
vehicle_state.target_lon = target_lon
# 立即更新车辆速度
current_vehicle["speed"] = 0
elif command_type in ["RED", "GREEN"]:
print(f"执行红绿灯指令: vehicle_id={vehicle_id}, state={command_type}")
# 红绿灯状态的更新由 update_command 处理
pass
elif command_type == "RESUME":
print(f"执行恢复指令: vehicle_id={vehicle_id}")
# 检查当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 清除限制性指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
print(f"清除限制性指令: vehicle={vehicle_id}")
vehicle_state.current_command = None
vehicle_state.command_priority = 0
vehicle_state.is_running = True
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
vehicle_state.brake_mode = None
vehicle_state.target_lat = None
vehicle_state.target_lon = None
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if vehicle_state.is_running:
print(f"车辆 {vehicle_id} 恢复运行")
current_vehicle["speed"] = DEFAULT_VEHICLE_SPEED
# 记录状态变化但不更新指令
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"status": "ok",
"message": "Command executed successfully"
})
# 更新车辆状态
vehicle_state.update_command(command_type, target_lat, target_lon)
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if not vehicle_state.is_running:
vehicle_state.target_speed = 0
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"status": "ok",
"message": "Command executed successfully"
})
except Exception as e:
print(f"Error handling vehicle command: {str(e)}")
return jsonify({
"status": "error",
"message": str(e)
}), 500
def calculate_distance_to_intersection(vehicle, intersection):
"""计算车辆到路口的距离(米)"""
vehicle_lat = vehicle["latitude"]
vehicle_lon = vehicle["longitude"]
intersection_lat = intersection["latitude"]
intersection_lon = intersection["longitude"]
# 使用经纬度计算距离
lat_diff = (vehicle_lat - intersection_lat) * 111319.9
lon_diff = (vehicle_lon - intersection_lon) * (111319.9 * math.cos(math.radians(vehicle_lat)))
return math.sqrt(lat_diff * lat_diff + lon_diff * lon_diff)
def get_nearest_intersection(vehicle):
"""获取车辆最近的路口及其红绿灯状态"""
west_dist = calculate_distance_to_intersection(vehicle, T2_INTERSECTION)
east_dist = calculate_distance_to_intersection(vehicle, T6_INTERSECTION)
if west_dist <= east_dist:
return T2_INTERSECTION, traffic_light_data[0], west_dist # TL001
else:
return T6_INTERSECTION, traffic_light_data[1], east_dist # TL002
def get_front_traffic_light(vehicle, distance_to_west, distance_to_east):
"""获取车辆前方的红绿灯状态"""
# 根据车辆的位置和方向判断前方路口
if vehicle["vehicleNo"] == "QN001":
# 确保 phase 属性存在
if "phase" not in vehicle:
vehicle["phase"] = 0
# QN001 的路线:西路口北侧 -> 东 -> 北
if vehicle["phase"] == 0: # 南北移动
# 在西路口以北时,向南移动需要判断西路口红绿灯
if vehicle["direction"] == -1 and vehicle["latitude"] > T2_INTERSECTION["latitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
elif vehicle["vehicleNo"] == "QN002":
# 在东路口以西时,向东移动需要判断东路红绿灯
if vehicle["direction"] == 1 and vehicle["longitude"] < T6_INTERSECTION["longitude"]:
return traffic_light_data[1], distance_to_east # 东路口红绿灯
# 其他情况,表示车辆已经过路口或不需要判断红绿灯
return None, float('inf')
def update_position_with_vector(obj, target_point, start_point, speed, elapsed_time, return_to_start=False):
"""
基于向量的位置更新逻辑
Args:
obj: 需要更新位置的对象(航空器或车辆)
target_point: 目标点坐标 {"latitude": float, "longitude": float}
start_point: 起点坐标 {"latitude": float, "longitude": float}
speed: 移动速度km/h
elapsed_time: 经过的时间(秒)
return_to_start: 是否在到达目标点时返回起点
"""
# 检查是否在等待状态
if "wait_until" not in obj:
obj["wait_until"] = 0
current_time = time.time()
if current_time < obj["wait_until"]:
return False # 还在等待中,不更新位置
# 计算这一步要移动的距离(米)并转换为经纬度
speed_mps = speed * 1000 / 3600
distance = speed_mps * elapsed_time
# 将移动距离转换为经纬度变化量
move_dlat, move_dlon = meters_to_degrees(distance, obj["latitude"])
# 将15米的判断距离转换为经纬度
check_dlat, check_dlon = meters_to_degrees(15, obj["latitude"])
# 计算当前位置到终点的向量
vector_lat = target_point["latitude"] - obj["latitude"]
vector_lon = target_point["longitude"] - obj["longitude"]
# 计算向量长度(经纬度空间)
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
# 检查是否到达终点
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
if return_to_start:
# 返回起点并设置等待时间
obj["latitude"] = start_point["latitude"]
obj["longitude"] = start_point["longitude"]
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN # 使用常量
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
else:
# 否则就停在目标点
obj["latitude"] = target_point["latitude"]
obj["longitude"] = target_point["longitude"]
return True # 表示已到达终点
else:
# 归一化方向向量
unit_lat = vector_lat / vector_length
unit_lon = vector_lon / vector_length
# 计算这一步的位置变化
dlat = move_dlat * unit_lat
dlon = move_dlon * unit_lon
# 更新位置
obj["latitude"] += dlat
obj["longitude"] += dlon
return False # 表示正在移动中
def update_aircraft_position(aircraft, elapsed_time):
"""更新航空器位置"""
reached_target = update_position_with_vector(
aircraft,
POINT_T11, # 目标点
POINT_T7, # 起点
aircraft["speed"],
elapsed_time,
return_to_start=True # 航空器需要返回起点
)
if reached_target:
print(f"航空器 {aircraft['flightNo']} 到达终点,返回起点 T7")
def update_vehicle_position(vehicle, elapsed_time):
"""更新车辆位置"""
# 获取车辆状态
vehicle_state = vehicle_states.get(vehicle["vehicleNo"])
if not vehicle_state:
return
# 确保所有车辆都有 phase 属性
if "phase" not in vehicle:
vehicle["phase"] = 0
print(f"初始化车辆 {vehicle['vehicleNo']} 的 phase 属性为 0")
# 获取前方红绿灯状态和距离
traffic_light, distance = get_front_traffic_light(vehicle,
calculate_distance_to_intersection(vehicle, T2_INTERSECTION),
calculate_distance_to_intersection(vehicle, T6_INTERSECTION))
# 特勤车只响应红绿灯
if vehicle["vehicleNo"].startswith("TQ"):
# 红灯且距离停止线小于等于 50 米时停车
if traffic_light and traffic_light["state"] == 0 and distance <= DIST_50M:
vehicle["speed"] = 0
print(f"特勤车 {vehicle['vehicleNo']} 遇红灯在停止线前停车,距路口 {distance:.1f}")
return
# 其他情况正常行驶
vehicle["speed"] = DEFAULT_VEHICLE_SPEED
print(f"特勤车 {vehicle['vehicleNo']} 正常行驶,速度={vehicle['speed']}km/h")
else:
# 普通车辆的移动逻辑
# 先判断是否在红灯影响范围内50米
if traffic_light and traffic_light["state"] == 0 and distance <= DIST_50M:
# 在红灯影响范围内,检查是否需要停车
if not vehicle_state.can_move():
vehicle["speed"] = 0
print(f"车辆 {vehicle['vehicleNo']} 遇红灯在停止线前停车,距路口 {distance:.1f}")
return
else:
# 不在红灯影响范围内,只检查其他指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
vehicle["speed"] = 0
print(f"车辆 {vehicle['vehicleNo']} 不能移动: command={vehicle_state.current_command}")
return
# 可以移动,设置正常速度
vehicle["speed"] = DEFAULT_VEHICLE_SPEED
print(f"车辆 {vehicle['vehicleNo']} 可以移动,速度={vehicle['speed']}km/h")
# 更新位置
if vehicle["vehicleNo"].startswith("QN"):
# 无人车的路径更新逻辑
if vehicle["vehicleNo"] == "QN001":
if vehicle["phase"] == 0: # T1 -> T2
reached_target = update_position_with_vector(
vehicle,
POINT_T2, # 目标点(西路口)
POINT_T1, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
vehicle["phase"] = 1 # 切换到下一阶段
print(f"无人车 {vehicle['vehicleNo']} 到达 T2开始前往 T4")
elif vehicle["phase"] == 1: # T2 -> T4
reached_target = update_position_with_vector(
vehicle,
POINT_T4, # 目标点
POINT_T2, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
# 到达 T4 后直接跳回起点 T1
vehicle["latitude"] = POINT_T1["latitude"]
vehicle["longitude"] = POINT_T1["longitude"]
vehicle["phase"] = 0 # 重置到初始阶段
print(f"无人车 {vehicle['vehicleNo']} 到达 T4直接返回起点 T1")
else: # QN002
reached_target = update_position_with_vector(
vehicle,
POINT_T8, # 目标点
POINT_T12, # 使用新的起点
vehicle["speed"],
elapsed_time,
return_to_start=True # QN002 需要返回起点
)
if reached_target:
print(f"无人车 {vehicle['vehicleNo']} 到达终点,返回起点 T12")
elif vehicle["vehicleNo"].startswith("TQ"):
# 特勤车的路径更新逻辑
if vehicle["phase"] == 0: # T4 -> T2
reached_target = update_position_with_vector(
vehicle,
POINT_T2, # 目标点(西路口)
POINT_T4, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
vehicle["phase"] = 1 # 切换到下一阶段
print(f"特勤车 {vehicle['vehicleNo']} 到达 T2开始前往 T3")
elif vehicle["phase"] == 1: # T2 -> T3
reached_target = update_position_with_vector(
vehicle,
POINT_T3, # 目标点(特勤车终点)
POINT_T2, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
# 到达 T3 后直接跳回起点 T4
vehicle["latitude"] = POINT_T4["latitude"]
vehicle["longitude"] = POINT_T4["longitude"]
vehicle["phase"] = 0 # 重置到初始阶段
print(f"特勤车 {vehicle['vehicleNo']} 到达 T3直接返回起点 T4")
# 新时间戳
vehicle["time"] = int(time.time() * 1000)
# 定义路口信息
INTERSECTIONS = {
"TL001": {
"id": "INT001",
"name": "西路口",
"latitude": T2_INTERSECTION["latitude"],
"longitude": T2_INTERSECTION["longitude"]
},
"TL002": {
"id": "INT002",
"name": "东路口",
"latitude": T6_INTERSECTION["latitude"],
"longitude": T6_INTERSECTION["longitude"]
}
}
def generate_traffic_light_data():
"""生成红绿灯数据"""
traffic_light_data = []
# 生成两个固定路口的红绿灯数据
for tl_id, intersection in INTERSECTIONS.items():
signal = {
"id": tl_id,
"state": 1, # 0: RED, 1: GREEN测试时保持绿灯
"timestamp": int(time.time() * 1000),
"intersection": intersection["id"],
"position": {
"latitude": intersection["latitude"],
"longitude": intersection["longitude"]
}
}
traffic_light_data.append(signal)
return traffic_light_data
# 红绿灯数据
traffic_light_data = generate_traffic_light_data()
# 分别记录航空器和车辆的上次更新时间
last_aircraft_update_time = time.time()
last_vehicle_update_time = time.time()
last_light_switch_time = time.time()
@app.route('/api/getCurrentFlightPositions')
def get_flight_positions():
global last_aircraft_update_time
current_time = time.time()
elapsed_time = current_time - last_aircraft_update_time
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for aircraft in aircraft_data:
update_aircraft_position(aircraft, UPDATE_INTERVAL)
aircraft["time"] = int(current_time * 1000)
last_aircraft_update_time = current_time
return jsonify(aircraft_data)
def switch_traffic_light_state():
"""统一处理红绿灯状态切换"""
global last_light_switch_time
current_time = time.time()
# 西路口红绿灯每15秒切换一次
elapsed_since_switch = current_time - last_light_switch_time
if elapsed_since_switch >= TRAFFIC_LIGHT_SWITCH_INTERVAL: # 使用常量
traffic_light_data[0]["state"] = 1 if traffic_light_data[0]["state"] == 0 else 0
last_light_switch_time = current_time
print(f"西路口红绿灯状态切换为: {'绿灯' if traffic_light_data[0]['state'] == 1 else '红灯'}")
# 更新东路口红绿灯(根据航空器位置)
if aircraft_data:
aircraft = aircraft_data[0]
lat_diff = abs(aircraft["latitude"] - T6_INTERSECTION["latitude"]) * 111319.9
old_state = traffic_light_data[1]["state"]
traffic_light_data[1]["state"] = 1 if lat_diff > DIST_50M else 1
if old_state != traffic_light_data[1]["state"]:
print(f"东路口红绿灯状态切换为: {'绿灯' if traffic_light_data[1]['state'] == 1 else '红灯'}")
@app.route('/api/getCurrentVehiclePositions')
def get_vehicle_positions():
global last_vehicle_update_time
current_time = time.time()
elapsed_time = current_time - last_vehicle_update_time
try:
# 统一处理红绿灯状态切换
switch_traffic_light_state()
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_vehicle_update_time = current_time
return jsonify(vehicle_data)
except Exception as e:
print(f"Error in get_vehicle_positions: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/api/getTrafficLightSignals')
def get_traffic_light_signals():
current_time = time.time()
# 更新时间戳
for light in traffic_light_data:
light["timestamp"] = int(current_time * 1000)
# 统一处理红绿灯状态切换
switch_traffic_light_state()
return jsonify(traffic_light_data)
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
return response
def check_vehicle_states():
"""定期检查所有车辆状态"""
for vehicle_no, state in vehicle_states.items():
state.log_state()
if __name__ == '__main__':
app.run(host='localhost', port=8081, debug=True)