CollisionAvoidance/tools/mock_server.py

1162 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request
import time
import math
import random
import logging
import os
import uuid
# 创建 logs 目录(如果不存在)
if not os.path.exists('logs'):
os.makedirs('logs')
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/mock_server.log'),
logging.StreamHandler()
]
)
app = Flask(__name__)
# 地球半径(米)
EARTH_RADIUS = 6378137.0
COMMAND_PRIORITIES = {
"ALERT": 5,
"WARNING": 3,
"PARKING": 2,
"RESUME": 1
}
def meters_to_degrees(meters, latitude):
"""
将米转换为度,考虑纬度的影响
"""
# 纬度方向1度 = 111,319.9米
meters_per_deg_lat = 111319.9
# 经度方向1度 = 111,319.9 * cos(latitude)米
meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(latitude))
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
# 距离配置(米)
DIST_300M = 300
DIST_200M = 200
DIST_150M = 150
DIST_100M = 100
DIST_50M = 50
# 时间配置(秒)
WAIT_TIME_AFTER_RETURN = 10.0 # 返回起点后的等待时间
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
TRAFFIC_LIGHT_SWITCH_INTERVAL = 10.0 # 红绿灯切换间隔
# 两个路口的位置
T2_INTERSECTION = {"longitude": 120.08502054, "latitude": 36.35448347}
T6_INTERSECTION = {"longitude": 120.08649105, "latitude": 36.35074527}
# 坐标点
# 无人车1起点
POINT_T1 = {"longitude": 120.0868853, "latitude": 36.35496367}
# 冲突点 1
POINT_T2 = {"longitude": 120.08502054, "latitude": 36.35448347}
# 特勤车终点
POINT_T3 = {"longitude": 120.08341044, "latitude": 36.35406879}
# 无人车1终点, 无人车 2 起点,特勤车起点
POINT_T4 = {"longitude": 120.08558121, "latitude": 36.35305878}
POINT_T5 = {"longitude": 120.08400957, "latitude": 36.35265197}
# 冲突点 2
POINT_T6 = {"longitude": 120.08649105, "latitude": 36.35074527}
# 航空器 1终点
POINT_T7 = {"longitude": 120.08562915, "latitude": 36.35052372}
# 无人车 2终点
POINT_T8 = {"longitude": 120.08676664, "latitude": 36.35004529}
POINT_T9 = {"longitude": 120.08520616, "latitude": 36.34964473}
POINT_T10 = {"longitude": 120.08710569, "latitude": 36.34917893}
# 航空器 1终点
POINT_T11 = {"longitude": 120.0873865, "latitude": 36.3509885}
# QN002 新起点(距离 T6 路口 150 米)
POINT_T12 = {"longitude": 120.08603613, "latitude": 36.35190217} # T4 和 T6 之间,距 T6 150米
# AC001 新起点(距离 T6 路口 200 米)
POINT_T13 = {"longitude": 120.08509148, "latitude": 36.35041247} # T6向南 200 米
# 飞机和车辆尺寸(半径 米)
AIRCRAFT_SIZE_M = 30.0
VEHICLE_SIZE_M = 10.0
# 计算初始方向向量
initial_target_lat = POINT_T11["latitude"] - POINT_T7["latitude"]
initial_target_lon = POINT_T11["longitude"] - POINT_T7["longitude"]
# 归一化方向向量
initial_dist = math.sqrt(initial_target_lat * initial_target_lat + initial_target_lon * initial_target_lon)
if initial_dist > 0:
initial_target_lat /= initial_dist
initial_target_lon /= initial_dist
# 修改航空器数据的初始位置和方向
aircraft_data = [
{
"flightNo": "AC001", # 航空器
"longitude": POINT_T7["longitude"],
"latitude": POINT_T7["latitude"],
"time": int(time.time() * 1000),
"altitude": 0.0,
"trackNumber": 1001, # 改为数字格式
"speed": 36.0, # 内部使用的速度字段,不会在 API 返回中显示
"direction": { # 内部使用的方向字段,不会在 API 返回中显示
"lat": initial_target_lat,
"lon": initial_target_lon
}
}
]
# 添加默认速度常量
DEFAULT_VEHICLE_SPEED = 18.0 # km/h默认 36km/h
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
# 计算 QN002 的初始方向向量
qn002_target_lat = POINT_T8["latitude"] - POINT_T4["latitude"]
qn002_target_lon = POINT_T8["longitude"] - POINT_T4["longitude"]
# 归一化方向向量
qn002_dist = math.sqrt(qn002_target_lat * qn002_target_lat + qn002_target_lon * qn002_target_lon)
if qn002_dist > 0:
qn002_target_lat /= qn002_dist
qn002_target_lon /= qn002_dist
# 修改车辆数据中 QN002 的初始位置和方向
vehicle_data = [
{
"vehicleNo": "QN001", # 无人车1
"longitude": POINT_T1["longitude"],
"latitude": POINT_T1["latitude"],
"time": int(time.time() * 1000),
"direction": 180.0, # 向南方向为180度
"speed": DEFAULT_VEHICLE_SPEED
},
{
"vehicleNo": "QN002", # 无人车2
"longitude": POINT_T12["longitude"],
"latitude": POINT_T12["latitude"],
"time": int(time.time() * 1000),
"direction": 90.0, # 向东方向为90度
"speed": DEFAULT_VEHICLE_SPEED
},
{
"vehicleNo": "TQ001", # 特勤车
"longitude": POINT_T4["longitude"],
"latitude": POINT_T4["latitude"],
"time": int(time.time() * 1000),
"direction": 270.0, # 向西方向为270度
"speed": DEFAULT_VEHICLE_SPEED
}
]
# 添加车辆状态类
class VehicleState:
def __init__(self, vehicle_no):
self.vehicle_no = vehicle_no
self.is_running = True # 运行状态
self.current_command = None # 当前执行的指令
self.command_priority = 0 # 当前指令优先级
self.target_speed = DEFAULT_VEHICLE_SPEED # 目标速度
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
self.command_reason = None # 指令原因
self.last_command_time = time.time() # 最后一次指令时间
self.traffic_light_state = None # 当前红绿灯状态
self.target_lat = None # 目标纬度
self.target_lon = None # 目标经度
self.is_traffic_light_stop = False # 是否因红灯停车
def can_be_overridden_by(self, command_type):
"""判断当前指令是否可以被新指令覆盖"""
# 红绿灯信号不参与指令优先级判断
if command_type in ["RED", "GREEN", "YELLOW"]:
return True
new_priority = COMMAND_PRIORITIES.get(command_type, 0)
current_priority = COMMAND_PRIORITIES.get(self.current_command, 0)
# 相同类型的指令可以覆盖(因为需要持续发送)
if command_type == self.current_command:
return True
# ALERT 指令可以被 RESUME 解除
if self.current_command == "ALERT":
return command_type == "RESUME"
# WARNING 指令可以被 RESUME 或相同及更高优先级指令覆盖
if self.current_command == "WARNING":
return command_type == "RESUME" or new_priority >= current_priority
# 其他情况下,相同或更高优先级可以覆盖
return new_priority >= current_priority
def update_command(self, command_type, target_lat=None, target_lon=None):
"""更新指令状态"""
# 更新目标位置
if target_lat is not None and target_lon is not None:
self.target_lat = target_lat
self.target_lon = target_lon
# 如果是红绿灯状态,只更新状态,不影响其他指令
if command_type in ["RED", "GREEN", "YELLOW"]:
old_state = self.traffic_light_state
self.traffic_light_state = command_type
# 更新红灯停车状态
if command_type == "RED":
self.is_traffic_light_stop = True
elif command_type in ["GREEN", "YELLOW"]:
self.is_traffic_light_stop = False
print(f"车辆 {self.vehicle_no} 收到红绿灯信号: {command_type} (原状态: {old_state})")
else:
# 其他指令正常更新
self.current_command = command_type
self.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
# RESUME 指令不清除红绿灯状态,红绿灯状态由信号灯独立控制
if command_type == "RESUME":
# 清除其他状态
self.brake_mode = None
self.command_reason = None
# 更新运行状态
self.is_running = self.can_move()
print(f"更新车辆 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}, is_running={self.is_running}")
def can_move(self):
"""检查车辆是否可以移动"""
# 如果有告警指令,不能移动
if self.current_command == "ALERT":
return False
# 如果有预警指令,不能移动
if self.current_command == "WARNING":
return False
# 如果有安全停靠指令,不能移动
if self.current_command == "PARKING":
return False
# 如果是红灯停车状态,不能移动
if self.is_traffic_light_stop:
return False
# 其他情况可以移动
return True
def log_state(self):
"""记录当前车辆状态"""
print(f"""
车辆状态:
- 车辆编号: {self.vehicle_no}
- 运行状态: {'运行中' if self.is_running else '已停止'}
- 当前指令: {self.current_command}
- 红绿灯状态: {self.traffic_light_state}
- 红灯停车: {'' if self.is_traffic_light_stop else ''}
- 指令优先级: {self.command_priority}
- 目标速度: {self.target_speed}
- 制动模式: {self.brake_mode}
- 指令原因: {self.command_reason}
- 目标位置: ({self.target_lat}, {self.target_lon})
""")
# 添加车辆状态管理
vehicle_states = {}
for vehicle in vehicle_data:
vehicle_states[vehicle["vehicleNo"]] = VehicleState(vehicle["vehicleNo"])
def calculate_distance_to_target(vehicle, target_lat, target_lon):
"""计算车辆到目标位的距离(米)"""
dlat = target_lat - vehicle["latitude"]
dlon = target_lon - vehicle["longitude"]
# 使用 Haversine 公式计算距离
R = 6371000 # 地球半径(米)
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(math.radians(vehicle["latitude"])) * math.cos(math.radians(target_lat)) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
@app.route('/api/VehicleCommandInfo', methods=['POST'])
def handle_vehicle_command():
try:
data = request.json
vehicle_id = data.get("vehicleID")
command_type = data.get("commandType", "").upper()
reason = data.get("commandReason", "").upper()
signal_state = data.get("signalState", "").upper()
target_lat = data.get("latitude", None)
target_lon = data.get("longitude", None)
print(f"收到车辆控制指令: vehicle_id={vehicle_id}, type={command_type}, reason={reason}, signal_state={signal_state}")
print(f"完整请求数据: {data}")
# 检查 SIGNAL 类型命令必须包含 signalState
if command_type == "SIGNAL" and not signal_state:
print(f"SIGNAL 类型命令缺少 signalState")
return jsonify({
"code": 400,
"msg": "SIGNAL command must include signalState",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 400
# 检查车辆是否存在
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
print(f"未找到车辆: {vehicle_id}")
return jsonify({
"code": 404,
"msg": f"Vehicle {vehicle_id} not found",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 404
# 打印当前车辆状态
print(f"当前车辆状态: vehicle={vehicle_id}")
vehicle_state.log_state()
# 特勤车只响应红绿灯信号
if vehicle_id.startswith("TQ"):
if command_type != "SIGNAL":
print(f"特勤车辆忽略非红绿灯指令: {vehicle_id}")
return jsonify({
"code": 200,
"msg": "Special vehicle only responds to traffic light signals",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
# 更新红绿灯状态和指令状态
vehicle_state.update_command(signal_state, target_lat, target_lon)
print(f"特勤车 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"code": 200,
"msg": "Traffic light state updated",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
# 检查指令优先级并添加详细日志
check_command = signal_state if command_type == "SIGNAL" else command_type
can_override = vehicle_state.can_be_overridden_by(check_command)
print(f"指令优先级检查: vehicle={vehicle_id}, current_command={vehicle_state.current_command}, "
f"new_command={check_command}, can_override={can_override}")
if not can_override:
print(f"指令优先级过低: vehicle={vehicle_id}, current_priority={vehicle_state.command_priority}, "
f"command={check_command}")
return jsonify({
"code": 400,
"msg": "Command priority too low",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
# 处理不同类型的指令
if command_type == "SIGNAL":
# 更新红绿灯状态和指令状态
vehicle_state.update_command(signal_state, target_lat, target_lon)
print(f"车辆 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"code": 200,
"msg": "Traffic light state updated",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
elif command_type in ["ALERT", "WARNING"]:
# 查找当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 执行告警指令直接停车
print(f"执行{'紧急' if command_type == 'ALERT' else '正常'}制动: vehicle={vehicle_id}")
vehicle_state.current_command = command_type
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
vehicle_state.is_running = False
vehicle_state.target_speed = 0
vehicle_state.brake_mode = "emergency" if command_type == "ALERT" else "normal"
vehicle_state.target_lat = target_lat
vehicle_state.target_lon = target_lon
# 立即更新车辆速度
if current_vehicle:
current_vehicle["speed"] = 0
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
elif command_type == "RESUME":
print(f"执行恢复指令: vehicle_id={vehicle_id}")
# 检查当前车辆
current_vehicle = None
for v in vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 清除限制性指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
print(f"清除限制性指令: vehicle={vehicle_id}")
vehicle_state.current_command = None
vehicle_state.command_priority = 0
vehicle_state.is_running = True
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
vehicle_state.brake_mode = None
vehicle_state.target_lat = None
vehicle_state.target_lon = None
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if vehicle_state.is_running and current_vehicle:
print(f"车辆 {vehicle_id} 恢复运行")
current_vehicle["speed"] = DEFAULT_VEHICLE_SPEED
# 记录状态变化但不更新指令
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
print(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
except Exception as e:
print(f"Error handling vehicle command: {str(e)}")
return jsonify({
"code": 500,
"msg": str(e),
"transId": data.get("transId", ""),
"timestamp": int(time.time() * 1000)
}), 500
def calculate_distance_to_intersection(vehicle, intersection):
"""计算车辆到路口的距离(米)"""
vehicle_lat = vehicle["latitude"]
vehicle_lon = vehicle["longitude"]
intersection_lat = intersection["latitude"]
intersection_lon = intersection["longitude"]
# 使用经纬度计算距离
lat_diff = (vehicle_lat - intersection_lat) * 111319.9
lon_diff = (vehicle_lon - intersection_lon) * (111319.9 * math.cos(math.radians(vehicle_lat)))
return math.sqrt(lat_diff * lat_diff + lon_diff * lon_diff)
def calculate_path_direction(start_point, end_point):
"""计算两点之间的方向角度(相对于正北方向,顺时针)"""
dlat = end_point["latitude"] - start_point["latitude"]
dlon = end_point["longitude"] - start_point["longitude"]
angle_rad = math.atan2(dlon, dlat) # 使用 atan2 计算角度
angle_deg = math.degrees(angle_rad)
# 转换为顺时针方向正北为0度
return (90 - angle_deg) % 360
def get_front_traffic_light(vehicle, distance_to_west, distance_to_east):
"""获取车辆前方的红绿灯状态"""
# 根据车辆的位置和方向判断前方路口
if vehicle["vehicleNo"] == "QN001":
# 确保 phase 属性存在
if "phase" not in vehicle:
vehicle["phase"] = 0
# QN001 的路线T1 -> T2 -> T4
if vehicle["phase"] == 0: # T1 -> T2 阶段
# 计算 T1 到 T2 的实际方向
actual_direction = calculate_path_direction(POINT_T1, POINT_T2)
# 允许 15 度的误差范围
direction_diff = abs(vehicle["direction"] - actual_direction)
direction_diff = min(direction_diff, 360 - direction_diff)
# 在西路口以北时,向 T2 移动需要判断西路口红绿灯
if direction_diff <= 15 and vehicle["latitude"] > T2_INTERSECTION["latitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
elif vehicle["vehicleNo"] == "QN002":
# 计算 T12 到 T8 的实际方向
actual_direction = calculate_path_direction(POINT_T12, POINT_T8)
# 允许 15 度的误差范围
direction_diff = abs(vehicle["direction"] - actual_direction)
direction_diff = min(direction_diff, 360 - direction_diff)
# 在东路口以西时,向 T8 移动需要判断东路红绿灯
if direction_diff <= 15 and vehicle["longitude"] < T6_INTERSECTION["longitude"]:
return traffic_light_data[1], distance_to_east # 东路口红绿灯
elif vehicle["vehicleNo"] == "TQ001":
# 确保 phase 属性存在
if "phase" not in vehicle:
vehicle["phase"] = 0
# TQ001 的路线T4 -> T2 -> T3
if vehicle["phase"] == 0: # T4 -> T2 阶段
# 计算 T4 到 T2 的实际方向
actual_direction = calculate_path_direction(POINT_T4, POINT_T2)
# 允许 15 度的误差范围
direction_diff = abs(vehicle["direction"] - actual_direction)
direction_diff = min(direction_diff, 360 - direction_diff)
# 在西路口以东时,向 T2 移动需要判断西路口红绿灯
if direction_diff <= 15 and vehicle["longitude"] > T2_INTERSECTION["longitude"]:
return traffic_light_data[0], distance_to_west # 西路口红绿灯
# 其他情况,表示车辆已经过路口或不需要判断红绿灯
return None, float('inf')
def update_position_with_vector(obj, target_point, start_point, speed, elapsed_time, return_to_start=False):
"""
基于向量的位置更新逻辑
Args:
obj: 需要更新位置的对象(航空器或车辆)
target_point: 目标点坐标 {"latitude": float, "longitude": float}
start_point: 起点坐标 {"latitude": float, "longitude": float}
speed: 移动速度km/h
elapsed_time: 经过的时间(秒)
return_to_start: 是否在到达目标点时返回起点
"""
# 检查是否在等待状态
if "wait_until" not in obj:
obj["wait_until"] = 0
current_time = time.time()
if current_time < obj["wait_until"]:
return False # 还在等待中,不更新位置
# 计算这一步要移动的距离(米)并转换为经纬度
speed_mps = speed * 1000 / 3600
distance = speed_mps * elapsed_time
# 将移动距离转换为经纬度变化量
move_dlat, move_dlon = meters_to_degrees(distance, obj["latitude"])
# 将15米的判断距离转换为经纬度
check_dlat, check_dlon = meters_to_degrees(15, obj["latitude"])
# 计算当前位置到终点的向量
vector_lat = target_point["latitude"] - obj["latitude"]
vector_lon = target_point["longitude"] - obj["longitude"]
# 计算向量长度(经纬度空间)
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
# 检查是否到达终点
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
if return_to_start:
# 返回起点并设置等待时间
obj["latitude"] = start_point["latitude"]
obj["longitude"] = start_point["longitude"]
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN # 使用常量
print(f"对象 {obj.get('flightNo', obj.get('vehicleNo'))} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
else:
# 否则就停在目标点
obj["latitude"] = target_point["latitude"]
obj["longitude"] = target_point["longitude"]
return True # 表示已到达终点
else:
# 归一化方向向量
unit_lat = vector_lat / vector_length
unit_lon = vector_lon / vector_length
# 计算这一步的位置变化
dlat = move_dlat * unit_lat
dlon = move_dlon * unit_lon
# 更新位置
obj["latitude"] += dlat
obj["longitude"] += dlon
return False # 表示正在移动中
def update_aircraft_position(aircraft, elapsed_time):
"""更新航空器位置"""
reached_target = update_position_with_vector(
aircraft,
POINT_T11, # 目标点
POINT_T13, # 起点
aircraft["speed"],
elapsed_time,
return_to_start=True # 航空器需要返回起点
)
if reached_target:
print(f"航空器 {aircraft['flightNo']} 到达终点,返回起点 T7")
def update_vehicle_position(vehicle, elapsed_time):
"""更新车辆位置"""
# 获取车辆状态
vehicle_state = vehicle_states.get(vehicle["vehicleNo"])
if not vehicle_state:
return
# 确保所有车辆都有 phase 属性
if "phase" not in vehicle:
vehicle["phase"] = 0
print(f"初始化车辆 {vehicle['vehicleNo']} 的 phase 属性为 0")
# 获取前方红绿灯状态和距离
traffic_light, distance = get_front_traffic_light(vehicle,
calculate_distance_to_intersection(vehicle, T2_INTERSECTION),
calculate_distance_to_intersection(vehicle, T6_INTERSECTION))
# 特勤车只响应红绿灯
if vehicle["vehicleNo"].startswith("TQ"):
# 红灯且距离停止线小于等于 50 米时停车
if traffic_light and traffic_light["state"] == 0 and distance <= DIST_50M:
vehicle["speed"] = 0
print(f"特勤车 {vehicle['vehicleNo']} 遇红灯在停止线前停车,距路口 {distance:.1f}")
return
# 其他情况正常行驶
vehicle["speed"] = DEFAULT_VEHICLE_SPEED
print(f"特勤车 {vehicle['vehicleNo']} 正常行驶,速度={vehicle['speed']}km/h")
else:
# 普通车辆的移动逻辑
# 红灯且距离停止线小于等于 50 米时必须停车
if traffic_light and traffic_light["state"] == 0 and distance <= DIST_50M:
vehicle["speed"] = 0
vehicle_state.current_command = "RED" # 设置当前指令为红灯
vehicle_state.is_running = False # 设置为停止状态
print(f"车辆 {vehicle['vehicleNo']} 遇红灯在停止线前停车,距路口 {distance:.1f}")
return
elif vehicle_state.current_command in ["ALERT", "WARNING"]:
# 其他停车指令
vehicle["speed"] = 0
print(f"车辆 {vehicle['vehicleNo']} 不能移动: command={vehicle_state.current_command}")
return
# 可以移动,设置正常速度
vehicle["speed"] = DEFAULT_VEHICLE_SPEED
print(f"车辆 {vehicle['vehicleNo']} 可以移动,速度={vehicle['speed']}km/h")
# 更新位置
if vehicle["vehicleNo"].startswith("QN"):
# 无人车的路径更新逻辑
if vehicle["vehicleNo"] == "QN001":
if vehicle["phase"] == 0: # T1 -> T2
reached_target = update_position_with_vector(
vehicle,
POINT_T2, # 目标点(西路口)
POINT_T1, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
vehicle["phase"] = 1 # 切换到下一阶段
print(f"无人车 {vehicle['vehicleNo']} 到达 T2开始前往 T4")
elif vehicle["phase"] == 1: # T2 -> T4
reached_target = update_position_with_vector(
vehicle,
POINT_T4, # 目标点
POINT_T2, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
# 到达 T4 后直接跳回起点 T1
vehicle["latitude"] = POINT_T1["latitude"]
vehicle["longitude"] = POINT_T1["longitude"]
vehicle["phase"] = 0 # 重置到初始阶段
print(f"无人车 {vehicle['vehicleNo']} 到达 T4直接返回起点 T1")
else: # QN002
reached_target = update_position_with_vector(
vehicle,
POINT_T8, # 目标点
POINT_T12, # 使用新的起点
vehicle["speed"],
elapsed_time,
return_to_start=True # QN002 需要返回起点
)
if reached_target:
print(f"无人车 {vehicle['vehicleNo']} 到达终点,返回起点 T12")
elif vehicle["vehicleNo"].startswith("TQ"):
# 特勤车的路径更新逻辑
if vehicle["phase"] == 0: # T4 -> T2
reached_target = update_position_with_vector(
vehicle,
POINT_T2, # 目标点(西路口)
POINT_T4, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
vehicle["phase"] = 1 # 切换到下一阶段
print(f"特勤车 {vehicle['vehicleNo']} 到达 T2开始前往 T3")
elif vehicle["phase"] == 1: # T2 -> T3
reached_target = update_position_with_vector(
vehicle,
POINT_T3, # 目标点(特勤车终点)
POINT_T2, # 起点
vehicle["speed"],
elapsed_time,
return_to_start=False # 不返回起点,继续到下一个点
)
if reached_target:
# 到达 T3 后直接跳回起点 T4
vehicle["latitude"] = POINT_T4["latitude"]
vehicle["longitude"] = POINT_T4["longitude"]
vehicle["phase"] = 0 # 重置到初始阶段
print(f"特勤车 {vehicle['vehicleNo']} 到达 T3直接返回起点 T4")
# 新时间戳
vehicle["time"] = int(time.time() * 1000)
# 定义路口信息
INTERSECTIONS = {
"TL001": {
"id": "INT001",
"name": "西路口",
"latitude": T2_INTERSECTION["latitude"],
"longitude": T2_INTERSECTION["longitude"]
},
"TL002": {
"id": "INT002",
"name": "东路口",
"latitude": T6_INTERSECTION["latitude"],
"longitude": T6_INTERSECTION["longitude"]
}
}
def generate_traffic_light_data():
"""生成红绿灯数据"""
traffic_light_data = []
# 生成两个固定路口的红绿灯数据
for tl_id, intersection in INTERSECTIONS.items():
signal = {
"id": tl_id,
"state": 1, # 0: RED, 1: GREEN测试时保持绿灯
"timestamp": int(time.time() * 1000),
"intersection": intersection["id"],
"position": {
"latitude": intersection["latitude"],
"longitude": intersection["longitude"]
}
}
traffic_light_data.append(signal)
return traffic_light_data
# 红绿灯数据
traffic_light_data = generate_traffic_light_data()
# 分别记录航空器和车辆的上次更新时间
last_aircraft_update_time = time.time()
last_vehicle_update_time = time.time()
last_light_switch_time = time.time()
def check_auth():
auth_header = request.headers.get('Authorization')
if not auth_header or auth_header != AUTH_TOKEN:
return False
return True
@app.route('/openApi/getCurrentFlightPositions', methods=['GET', 'OPTIONS'])
def get_flight_positions():
"""获取当前航空器位置信息"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
global last_aircraft_update_time
current_time = time.time()
elapsed_time = current_time - last_aircraft_update_time
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for aircraft in aircraft_data:
update_aircraft_position(aircraft, UPDATE_INTERVAL)
aircraft["time"] = int(current_time * 1000)
last_aircraft_update_time = current_time
# 创建符合 API 格式的响应数据
response_data = []
for aircraft in aircraft_data:
api_aircraft = {
"flightNo": aircraft["flightNo"],
"longitude": aircraft["longitude"],
"latitude": aircraft["latitude"],
"time": aircraft["time"],
"altitude": aircraft["altitude"],
"trackNumber": aircraft["trackNumber"]
}
response_data.append(api_aircraft)
return jsonify({
"status": 200,
"msg": "当前航空器实时位置数据",
"data": response_data
})
def switch_traffic_light_state():
"""统一处理红绿灯状态切换"""
global last_light_switch_time
current_time = time.time()
# 西路口红绿灯每15秒切换一次
elapsed_since_switch = current_time - last_light_switch_time
if elapsed_since_switch >= TRAFFIC_LIGHT_SWITCH_INTERVAL: # 使用常量
# 获取当前状态
current_state = traffic_light_data[0]["state"]
# 根据当前状态决定下一个状态
if current_state == 0: # 当前是红灯
# 切换到黄灯(表示红绿灯故障)
traffic_light_data[0]["state"] = 2 # 2 表示黄灯
print(f"西路口红绿灯状态切换为: 黄灯(红绿灯故障)")
elif current_state == 2: # 当前是黄灯
# 从黄灯切换到绿灯
traffic_light_data[0]["state"] = 1 # 1 表示绿灯
print(f"西路口红绿灯状态切换为: 绿灯")
else: # 当前是绿灯
# 从绿灯切换到红灯
traffic_light_data[0]["state"] = 0 # 0 表示红灯
print(f"西路口红绿灯状态切换为: 红灯")
last_light_switch_time = current_time
# 更新东路口红绿灯(根据航空器位置)
if aircraft_data:
aircraft = aircraft_data[0]
lat_diff = abs(aircraft["latitude"] - T6_INTERSECTION["latitude"]) * 111319.9
old_state = traffic_light_data[1]["state"]
traffic_light_data[1]["state"] = 1 if lat_diff > DIST_50M else 1
if old_state != traffic_light_data[1]["state"]:
print(f"东路口红绿灯状态切换为: {'绿灯' if traffic_light_data[1]['state'] == 1 else '红灯'}")
@app.route('/openApi/getCurrentVehiclePositions', methods=['GET', 'OPTIONS'])
def get_vehicle_positions():
"""获取当前车辆位置信息"""
if request.method == 'OPTIONS':
return '', 204
if not check_auth():
return jsonify({
"status": 401,
"msg": "认证失败",
"data": None
}), 401
global last_vehicle_update_time
current_time = time.time()
elapsed_time = current_time - last_vehicle_update_time
try:
# 统一处理红绿灯状态切换
# switch_traffic_light_state() # 采用模拟的红绿灯脚本,注释掉此处的调用
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_vehicle_update_time = current_time
return jsonify({
"status": 200,
"msg": "当前车辆实时位置数据",
"data": vehicle_data
})
except Exception as e:
print(f"Error in get_vehicle_positions: {str(e)}")
return jsonify({
"status": 500,
"msg": str(e),
"data": None
}), 500
@app.route('/openApi/getTrafficLightSignals', methods=['GET', 'OPTIONS'])
def get_traffic_light_signals():
"""获取红绿灯信号状态 (旧的轮询接口)"""
if request.method == 'OPTIONS':
return '', 204
# --- 注释掉原有逻辑 (2025-05-06 by AI Assistant) ---
# 原因: 为了测试新的 TrafficLightHttpServer 推送机制,暂时禁用此轮询接口返回数据。
# 模拟和发送红绿灯状态的功能现在由 tools/mock_traffic_light_client.py 负责。
# 如需恢复轮询测试,请取消下面的注释并注释掉新的 return 语句。
# # 更新红绿灯状态
# # switch_traffic_light_state() # 采用模拟的红绿灯脚本,注释掉此处的调用
#
# # 更新时间戳
# for signal in traffic_light_data:
# signal[\"timestamp\"] = int(time.time() * 1000)
#
# return jsonify({
# \"status\": 200,
# \"msg\": \"获取红绿灯信号成功\",
# \"data\": traffic_light_data
# })
# --- 注释结束 ---
# 新增: 返回空数据,以禁用轮询数据源
return jsonify({
"status": 200,
"msg": "Traffic light polling endpoint disabled to test push mechanism.",
"data": []
})
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Max-Age'] = '3600'
return response
def check_vehicle_states():
"""定期检查所有车辆状态"""
for vehicle_no, state in vehicle_states.items():
state.log_state()
# 认证相关配置
AUTH_USERNAME = "dianxin"
AUTH_PASSWORD = "dianxin@123"
AUTH_TOKEN = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
@app.route('/login', methods=['POST', 'OPTIONS'])
def login():
if request.method == 'OPTIONS':
return '', 204
# 从 URL query parameters 中获取用户名和密码
username = request.args.get('username')
password = request.args.get('password')
print(f"收到登录请求: username={username}, password={password}")
print(f"请求 URL: {request.url}")
print(f"请求方法: {request.method}")
print(f"请求参数: {request.args}")
if not username or not password:
return jsonify({
"status": 400,
"msg": "缺少用户名或密码",
"data": None
}), 400
if username == AUTH_USERNAME and password == AUTH_PASSWORD:
return jsonify({
"status": 200,
"msg": "登入成功",
"data": AUTH_TOKEN
})
else:
return jsonify({
"status": 401,
"msg": "用户名或密码错误",
"data": None
}), 401
@app.route('/openApi/getVehicleStatus', methods=['GET', 'OPTIONS'])
def get_vehicle_status():
"""获取无人车状态信息"""
if request.method == 'OPTIONS':
return '', 204
vehicle_id = request.args.get('vehicleId')
if not vehicle_id:
return jsonify({
"status": 400,
"msg": "缺少 vehicleId 参数",
"data": None
}), 400
# 查找对应的车辆
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
return jsonify({
"status": 404,
"msg": f"未找到车辆 {vehicle_id}",
"data": None
}), 404
# 返回车辆状态
return jsonify({
"status": 200,
"msg": "获取车辆状态成功",
"data": {
"vehicleId": vehicle_id,
"status": "NORMAL" if vehicle_state.is_running else "STOPPED",
"command": vehicle_state.current_command,
"commandPriority": vehicle_state.command_priority,
"trafficLightState": vehicle_state.traffic_light_state,
"timestamp": int(time.time() * 1000)
}
})
# 无人车位置数据生成函数
def generate_unmanned_vehicle_location_data():
"""生成无人车位置数据符合官方API格式"""
unmanned_vehicles = []
# 从现有vehicle_data中筛选无人车QN开头的车辆
for vehicle in vehicle_data:
if vehicle["vehicleNo"].startswith("QN"):
location_info = {
"transId": str(uuid.uuid4()),
"timestamp": int(time.time() * 1000),
"vehicleID": vehicle["vehicleNo"],
"latitude": vehicle["latitude"],
"longitude": vehicle["longitude"],
"speed": vehicle["speed"] / 3.6, # 转换为m/s
"direction": math.radians(vehicle["direction"]) # 转换为弧度
}
unmanned_vehicles.append(location_info)
return unmanned_vehicles
# 无人车状态数据生成函数
def generate_unmanned_vehicle_state_data(vehicle_id=None, is_single=True):
"""生成无人车状态数据符合官方API格式"""
vehicle_states_data = []
if is_single and vehicle_id:
# 单个车辆状态查询
vehicle_state = vehicle_states.get(vehicle_id)
if vehicle_state:
state_info = {
"transId": str(uuid.uuid4()),
"timestamp": int(time.time() * 1000),
"vehicleID": vehicle_id,
"loginState": True,
"faultInfo": [],
"activeSafety": not vehicle_state.is_running,
"RC": False,
"Command": 1 if vehicle_state.current_command == "ALERT" else 0,
"airportInfo": [],
"vehicleMode": 2, # 自动模式
"gearState": 2, # D档
"chassisReady": vehicle_state.is_running,
"collisionStatus": False,
"clearance": 1 if vehicle_state.is_running else 0,
"turnSignalStstus": 0,
"pointCloud": []
}
vehicle_states_data.append(state_info)
else:
# 所有车辆状态查询
for vehicle in vehicle_data:
if vehicle["vehicleNo"].startswith("QN"):
vehicle_state = vehicle_states.get(vehicle["vehicleNo"])
state_info = {
"transId": str(uuid.uuid4()),
"timestamp": int(time.time() * 1000),
"vehicleID": vehicle["vehicleNo"],
"loginState": True,
"faultInfo": [],
"activeSafety": not vehicle_state.is_running if vehicle_state else False,
"RC": False,
"Command": 1 if vehicle_state and vehicle_state.current_command == "ALERT" else 0,
"airportInfo": [],
"vehicleMode": 2,
"gearState": 2,
"chassisReady": vehicle_state.is_running if vehicle_state else True,
"collisionStatus": False,
"clearance": 1 if vehicle_state and vehicle_state.is_running else 0,
"turnSignalStstus": 0,
"pointCloud": []
}
vehicle_states_data.append(state_info)
return vehicle_states_data
# 无人车API接口
@app.route('/api/VehicleLocationInfo', methods=['GET', 'OPTIONS'])
def get_unmanned_vehicle_location():
"""无人车位置上报接口"""
if request.method == 'OPTIONS':
return '', 204
try:
# 更新车辆位置
current_time = time.time()
global last_vehicle_update_time
elapsed_time = current_time - last_vehicle_update_time
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_vehicle_update_time = current_time
# 生成无人车位置数据
location_data = generate_unmanned_vehicle_location_data()
print(f"返回无人车位置数据,数量: {len(location_data)}")
return jsonify(location_data)
except Exception as e:
print(f"Error in get_unmanned_vehicle_location: {str(e)}")
return jsonify([]), 500
@app.route('/api/VehicleStateInfo', methods=['POST', 'OPTIONS'])
def get_unmanned_vehicle_state():
"""无人车状态查询接口"""
if request.method == 'OPTIONS':
return '', 204
try:
data = request.json
vehicle_id = data.get("vehicleID") if data else None
is_single = data.get("isSingle", True) if data else False
print(f"收到无人车状态查询请求: vehicle_id={vehicle_id}, is_single={is_single}")
# 生成无人车状态数据
state_data = generate_unmanned_vehicle_state_data(vehicle_id, is_single)
print(f"返回无人车状态数据,数量: {len(state_data)}")
return jsonify(state_data)
except Exception as e:
print(f"Error in get_unmanned_vehicle_state: {str(e)}")
return jsonify([]), 500
if __name__ == '__main__':
app.run(host='localhost', port=8090, debug=True)