957 lines
37 KiB
Python
957 lines
37 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
无人车厂商平台模拟服务
|
||
Unmanned Vehicle Manufacturer Platform Mock Server
|
||
|
||
提供无人车位置上报、状态查询、控制指令等API接口
|
||
模拟真实的无人车厂商平台与机场管理系统的交互
|
||
"""
|
||
|
||
from flask import Flask, jsonify, request
|
||
import time
|
||
import math
|
||
import logging
|
||
import os
|
||
from typing import Any, Literal, final, TypedDict
|
||
|
||
# 导入统一的日志配置
|
||
import sys
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
from logging_config import setup_logger
|
||
|
||
# 设置当前模块的日志记录器
|
||
logger = setup_logger(
|
||
name='mock_unmanned_vehicle',
|
||
log_file='../logs/mock_unmanned_vehicle.log',
|
||
max_bytes=10*1024*1024, # 10MB
|
||
backup_count=5
|
||
)
|
||
|
||
app = Flask(__name__)
|
||
|
||
# 为位置与移动对象定义类型,避免使用 Any
|
||
class Point(TypedDict):
|
||
latitude: float
|
||
longitude: float
|
||
|
||
# 必填字段
|
||
class MovingObjectRequired(TypedDict):
|
||
latitude: float
|
||
longitude: float
|
||
time: int
|
||
speed: float
|
||
moving_to_end: bool
|
||
start_point: Point
|
||
end_point: Point
|
||
|
||
# 可选字段(total=False)
|
||
class MovingObject(MovingObjectRequired, total=False):
|
||
vehicleNo: str
|
||
wait_until: float
|
||
direction: float
|
||
|
||
# 地球半径(米)
|
||
EARTH_RADIUS = 6378137.0
|
||
|
||
# 指令优先级配置
|
||
COMMAND_PRIORITIES = {
|
||
"ALERT": 5,
|
||
"WARNING": 3,
|
||
"PARKING": 2,
|
||
"RESUME": 1
|
||
}
|
||
|
||
# 距离配置(米)
|
||
DIST_300M = 300
|
||
DIST_200M = 200
|
||
DIST_150M = 150
|
||
DIST_100M = 100
|
||
DIST_50M = 50
|
||
|
||
# 时间配置(秒)
|
||
WAIT_TIME_AFTER_RETURN = 0.1 # 返回起点后的等待时间
|
||
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
|
||
|
||
# 无人车路径配置
|
||
# 无人车A 鲁B567 路径
|
||
UNMANNED_A_START = {"longitude": 120.083084, "latitude": 36.369696} # 无人车A起点
|
||
UNMANNED_A_END = {"longitude": 120.084637, "latitude": 36.365617} # 无人车A终点
|
||
|
||
# 无人车B 鲁B579 路径
|
||
UNMANNED_B_START = {"longitude": 120.086965, "latitude": 36.368599} # 无人车B起点
|
||
UNMANNED_B_END = {"longitude": 120.086263, "latitude": 36.370484} # 无人车B终点
|
||
|
||
# 车辆配置
|
||
DEFAULT_VEHICLE_SPEED = 30.0 # km/h,默认速度
|
||
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
|
||
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
|
||
|
||
# 认证相关配置
|
||
AUTH_USERNAME = "dianxin"
|
||
AUTH_PASSWORD = "dianxin@123"
|
||
AUTH_TOKEN = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
|
||
|
||
def meters_to_degrees(meters: float, latitude: float) -> tuple[float, float]:
|
||
"""
|
||
将米转换为度,考虑纬度的影响
|
||
"""
|
||
# 纬度方向:1度 = 111,319.9米
|
||
meters_per_deg_lat: float = 111319.9
|
||
# 经度方向:1度 = 111,319.9 * cos(latitude)米
|
||
meters_per_deg_lon: float = meters_per_deg_lat * math.cos(math.radians(latitude))
|
||
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
|
||
|
||
def to_float_safe(x: object) -> float | None:
|
||
"""
|
||
安全将值转换为 float:
|
||
- 允许 int/float
|
||
- 允许非布尔的数字字符串
|
||
- 允许 dict,优先取 'value'、'longitude'、'latitude' 等常见键
|
||
- 无法解析则返回 None
|
||
"""
|
||
# 排除布尔值(bool 是 int 的子类,但不应当被视为数值)
|
||
if isinstance(x, bool):
|
||
return None
|
||
if isinstance(x, (int, float)):
|
||
return float(x)
|
||
if isinstance(x, str):
|
||
try:
|
||
return float(x.strip())
|
||
except Exception:
|
||
return None
|
||
if isinstance(x, dict):
|
||
for key in ("value", "longitude", "latitude"):
|
||
if key in x:
|
||
return to_float_safe(x[key])
|
||
return None
|
||
return None
|
||
|
||
def calculate_distance_to_target(vehicle: dict[str, float], target_lat: float, target_lon: float) -> float:
|
||
"""计算车辆到目标位的距离(米)"""
|
||
# 转换为弧度
|
||
lat1_rad = math.radians(vehicle["latitude"])
|
||
lon1_rad = math.radians(vehicle["longitude"])
|
||
lat2_rad = math.radians(target_lat)
|
||
lon2_rad = math.radians(target_lon)
|
||
|
||
dlat = lat2_rad - lat1_rad
|
||
dlon = lon2_rad - lon1_rad
|
||
|
||
# 使用 Haversine 公式计算距离
|
||
a = math.sin(dlat/2) * math.sin(dlat/2) + \
|
||
math.cos(lat1_rad) * math.cos(lat2_rad) * \
|
||
math.sin(dlon/2) * math.sin(dlon/2)
|
||
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||
return EARTH_RADIUS * c
|
||
|
||
def update_position_with_vector(
|
||
obj: dict[str, Any],
|
||
target_point: "Point",
|
||
start_point: "Point",
|
||
speed: float,
|
||
elapsed_time: float,
|
||
return_to_start: bool = False
|
||
) -> bool:
|
||
"""
|
||
基于向量的位置更新逻辑
|
||
|
||
Args:
|
||
obj: 需要更新位置的对象(车辆)
|
||
target_point: 目标点坐标 {"latitude": float, "longitude": float}
|
||
start_point: 起点坐标 {"latitude": float, "longitude": float}
|
||
speed: 移动速度(km/h)
|
||
elapsed_time: 经过的时间(秒)
|
||
return_to_start: 是否在到达目标点时返回起点
|
||
"""
|
||
# 检查是否在等待状态
|
||
if "wait_until" not in obj:
|
||
obj["wait_until"] = 0.0
|
||
|
||
current_time = time.time()
|
||
# 基于类型安全读取 wait_until,避免 float 与 object 比较
|
||
_wu = to_float_safe(obj.get("wait_until"))
|
||
wait_until = 0.0 if _wu is None else float(_wu)
|
||
if current_time < wait_until:
|
||
return False # 还在等待中,不更新位置
|
||
|
||
# 计算这一步要移动的距离(米)并转换为经纬度
|
||
speed_mps = float(speed) * 1000.0 / 3600.0
|
||
distance = speed_mps * float(elapsed_time)
|
||
|
||
# 累计里程到总里程中
|
||
if "total_mileage" not in obj:
|
||
obj["total_mileage"] = 0.0
|
||
obj["total_mileage"] = to_float_safe(obj.get("total_mileage", 0.0)) or 0.0
|
||
obj["total_mileage"] += distance
|
||
|
||
# 安全读取并转换当前位置与目标点/起点经纬度为 float
|
||
cur_lat = to_float_safe(obj.get("latitude"))
|
||
cur_lon = to_float_safe(obj.get("longitude"))
|
||
tgt_lat = to_float_safe(target_point.get("latitude"))
|
||
tgt_lon = to_float_safe(target_point.get("longitude"))
|
||
if cur_lat is None or cur_lon is None or tgt_lat is None or tgt_lon is None:
|
||
# 无法解析经纬度,跳过更新
|
||
return False
|
||
|
||
# 将移动距离转换为经纬度变化量
|
||
move_dlat, move_dlon = meters_to_degrees(distance, cur_lat)
|
||
|
||
# 将5米的判断距离转换为经纬度(优化:从15米减少到5米,减少颤抖)
|
||
check_dlat, check_dlon = meters_to_degrees(5.0, cur_lat)
|
||
|
||
# 计算当前位置到终点的向量(经纬度空间)
|
||
vector_lat = float(tgt_lat) - float(cur_lat)
|
||
vector_lon = float(tgt_lon) - float(cur_lon)
|
||
|
||
# 计算向量长度(经纬度空间)
|
||
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
|
||
|
||
# 检查是否到达终点
|
||
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
|
||
# 精确设置到目标点位置,避免位置偏差
|
||
obj["latitude"] = float(tgt_lat)
|
||
obj["longitude"] = float(tgt_lon)
|
||
|
||
if return_to_start:
|
||
# 返回起点并设置等待时间
|
||
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN
|
||
print(f"无人车 {obj.get('vehicleNo')} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
|
||
else:
|
||
# 设置短暂等待时间,避免立即切换方向导致的颤抖
|
||
obj["wait_until"] = current_time + 0.1 # 0.1秒的短暂等待
|
||
print(f"无人车 {obj.get('vehicleNo')} 到达目标点,等待0.1秒后切换方向")
|
||
return True # 表示已到达终点
|
||
else:
|
||
# 归一化方向向量
|
||
unit_lat = vector_lat / vector_length
|
||
unit_lon = vector_lon / vector_length
|
||
|
||
# 计算这一步的位置变化
|
||
dlat = move_dlat * unit_lat
|
||
dlon = move_dlon * unit_lon
|
||
|
||
# 更新位置(写回为 float)
|
||
obj["latitude"] = float(cur_lat + dlat)
|
||
obj["longitude"] = float(cur_lon + dlon)
|
||
return False # 表示正在移动中
|
||
|
||
def update_object_position(obj: dict[str, Any], elapsed_time: float) -> None:
|
||
"""统一的位置更新函数 - 用于无人车的往复运动"""
|
||
# 获取对象类型和标识
|
||
obj_id = obj.get("vehicleNo")
|
||
|
||
# 根据移动方向确定目标点
|
||
if obj["moving_to_end"]:
|
||
target_raw = obj["end_point"]
|
||
start_raw = obj["start_point"]
|
||
else:
|
||
target_raw = obj["start_point"]
|
||
start_raw = obj["end_point"]
|
||
|
||
# 构造类型安全的 Point
|
||
t_lat = to_float_safe(target_raw.get("latitude")) if isinstance(target_raw, dict) else None
|
||
t_lon = to_float_safe(target_raw.get("longitude")) if isinstance(target_raw, dict) else None
|
||
s_lat = to_float_safe(start_raw.get("latitude")) if isinstance(start_raw, dict) else None
|
||
s_lon = to_float_safe(start_raw.get("longitude")) if isinstance(start_raw, dict) else None
|
||
if t_lat is None or t_lon is None or s_lat is None or s_lon is None:
|
||
# 无法解析坐标,直接更新时间戳后返回
|
||
obj["time"] = int(time.time() * 1000)
|
||
return
|
||
|
||
target_point: Point = {"latitude": float(t_lat), "longitude": float(t_lon)}
|
||
start_point: Point = {"latitude": float(s_lat), "longitude": float(s_lon)}
|
||
|
||
# 读取速度为 float
|
||
spd = to_float_safe(obj.get("speed"))
|
||
if spd is None:
|
||
spd = 0.0
|
||
|
||
# 更新位置
|
||
reached_target = update_position_with_vector(
|
||
obj,
|
||
target_point,
|
||
start_point,
|
||
float(spd),
|
||
float(elapsed_time),
|
||
return_to_start=False
|
||
)
|
||
|
||
if reached_target:
|
||
# 切换移动方向
|
||
obj["moving_to_end"] = not obj["moving_to_end"]
|
||
target_name = "终点" if not obj["moving_to_end"] else "起点"
|
||
print(f"无人车 {obj_id} 到达{'终点' if obj['moving_to_end'] else '起点'},开始前往{target_name}")
|
||
|
||
# 更新时间戳
|
||
obj["time"] = int(time.time() * 1000)
|
||
|
||
def update_vehicle_position(vehicle: dict[str, Any], elapsed_time: float) -> None:
|
||
"""更新无人车位置 - 使用统一的位置更新函数"""
|
||
update_object_position(vehicle, elapsed_time)
|
||
|
||
# 无人车数据配置
|
||
current_timestamp = time.time()
|
||
unmanned_vehicle_data = [
|
||
{
|
||
"vehicleNo": "鲁B567", # 无人车A
|
||
"longitude": UNMANNED_A_START["longitude"],
|
||
"latitude": UNMANNED_A_START["latitude"],
|
||
"time": int(current_timestamp * 1000),
|
||
"direction": 90.0, # 90度
|
||
"speed": 25.0, # 25km/h
|
||
"start_point": UNMANNED_A_START, # 起点
|
||
"end_point": UNMANNED_A_END, # 终点
|
||
"moving_to_end": True, # 当前是否向终点移动
|
||
# 任务和里程相关字段
|
||
"total_mileage": 0.0, # 总里程(米)
|
||
"mission_start_time": int(current_timestamp * 1000), # 任务开始时间
|
||
"mission_id": "MISSION_A_001", # 任务ID
|
||
"mission_type": "PATROL_TRANSPORT" # 任务类型:巡逻运输
|
||
},
|
||
{
|
||
"vehicleNo": "鲁B579", # 无人车B
|
||
"longitude": UNMANNED_B_START["longitude"],
|
||
"latitude": UNMANNED_B_START["latitude"],
|
||
"time": int(current_timestamp * 1000),
|
||
"direction": 270.0, # 270度
|
||
"speed": 25.0, # 25km/h
|
||
"start_point": UNMANNED_B_START, # 起点
|
||
"end_point": UNMANNED_B_END, # 终点
|
||
"moving_to_end": True, # 当前是否向终点移动
|
||
# 任务和里程相关字段
|
||
"total_mileage": 0.0, # 总里程(米)
|
||
"mission_start_time": int(current_timestamp * 1000), # 任务开始时间
|
||
"mission_id": "MISSION_B_001", # 任务ID
|
||
"mission_type": "CARGO_TRANSPORT" # 任务类型:货物运输
|
||
}
|
||
]
|
||
|
||
# 添加车辆状态类
|
||
@final
|
||
class VehicleState:
|
||
vehicle_no: str
|
||
is_running: bool
|
||
current_command: str | None
|
||
command_priority: int
|
||
target_speed: float
|
||
brake_mode: Literal['emergency', 'normal'] | None
|
||
command_reason: str | None
|
||
last_command_time: float
|
||
traffic_light_state: str | None
|
||
target_lat: float | None
|
||
target_lon: float | None
|
||
is_traffic_light_stop: bool
|
||
|
||
def __init__(self, vehicle_no: str) -> None:
|
||
self.vehicle_no = vehicle_no
|
||
self.is_running = True # 运行状态
|
||
self.current_command = None # 当前执行的指令
|
||
self.command_priority = 0 # 当前指令优先级
|
||
self.target_speed = float(DEFAULT_VEHICLE_SPEED) # 目标速度
|
||
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
|
||
self.command_reason = None # 指令原因
|
||
self.last_command_time = float(time.time()) # 最后一次指令时间
|
||
self.traffic_light_state = None # 当前红绿灯状态
|
||
self.target_lat = None # 目标纬度
|
||
self.target_lon = None # 目标经度
|
||
self.is_traffic_light_stop = False # 是否因红灯停车
|
||
self.remoteControlActive = False # 远程控制是否激活
|
||
|
||
def can_be_overridden_by(self, command_type: str) -> bool:
|
||
"""判断当前指令是否可以被新指令覆盖"""
|
||
# 红绿灯信号不参与指令优先级判断
|
||
if command_type in ["RED", "GREEN", "YELLOW"]:
|
||
return True
|
||
|
||
# 类型安全获取优先级,避免 Unknown | None 作为 dict key
|
||
new_priority = COMMAND_PRIORITIES.get(str(command_type), 0)
|
||
current_key = self.current_command if isinstance(self.current_command, str) else None
|
||
current_priority = COMMAND_PRIORITIES.get(current_key or "", 0)
|
||
|
||
# 相同类型的指令可以覆盖(因为需要持续发送)
|
||
if isinstance(self.current_command, str) and command_type == self.current_command:
|
||
return True
|
||
|
||
# ALERT 指令可以被 RESUME 解除
|
||
if self.current_command == "ALERT":
|
||
return command_type == "RESUME"
|
||
|
||
# WARNING 指令可以被 RESUME 或相同及更高优先级指令覆盖
|
||
if self.current_command == "WARNING":
|
||
return command_type == "RESUME" or new_priority >= current_priority
|
||
|
||
# 其他情况下,相同或更高优先级可以覆盖
|
||
return new_priority >= current_priority
|
||
|
||
def update_command(self, command_type: str, target_lat: float | None = None, target_lon: float | None = None) -> None:
|
||
"""更新指令状态"""
|
||
# 更新目标位置
|
||
if target_lat is not None and target_lon is not None:
|
||
self.target_lat = target_lat
|
||
self.target_lon = target_lon
|
||
|
||
# 如果是红绿灯状态,只更新状态,不影响其他指令
|
||
if command_type in ["RED", "GREEN", "YELLOW"]:
|
||
old_state = self.traffic_light_state
|
||
self.traffic_light_state = command_type
|
||
|
||
# 更新红灯停车状态
|
||
if command_type == "RED":
|
||
self.is_traffic_light_stop = True
|
||
elif command_type in ["GREEN", "YELLOW"]:
|
||
self.is_traffic_light_stop = False
|
||
|
||
print(f"无人车 {self.vehicle_no} 收到红绿灯信号: {command_type} (原状态: {old_state})")
|
||
else:
|
||
# 其他指令正常更新
|
||
self.current_command = command_type
|
||
self.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
|
||
|
||
# RESUME 指令不清除红绿灯状态,红绿灯状态由信号灯独立控制
|
||
if command_type == "RESUME":
|
||
# 清除其他状态
|
||
self.brake_mode = None
|
||
self.command_reason = None
|
||
|
||
# 更新运行状态
|
||
self.is_running = self.can_move()
|
||
print(f"更新无人车 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}, is_running={self.is_running}")
|
||
|
||
def can_move(self) -> bool:
|
||
"""检查车辆是否可以移动"""
|
||
# 如果有告警指令,不能移动
|
||
if self.current_command == "ALERT":
|
||
return False
|
||
|
||
# 如果有预警指令,不能移动
|
||
if self.current_command == "WARNING":
|
||
return False
|
||
|
||
# 如果有安全停靠指令,不能移动
|
||
if self.current_command == "PARKING":
|
||
return False
|
||
|
||
# 如果是红灯停车状态,不能移动
|
||
if self.is_traffic_light_stop:
|
||
return False
|
||
|
||
# 其他情况可以移动
|
||
return True
|
||
|
||
def log_state(self) -> None:
|
||
"""记录当前车辆状态"""
|
||
print(f"""
|
||
无人车状态:
|
||
- 车辆编号: {self.vehicle_no}
|
||
- 运行状态: {'运行中' if self.is_running else '已停止'}
|
||
- 当前指令: {self.current_command}
|
||
- 红绿灯状态: {self.traffic_light_state}
|
||
- 红灯停车: {'是' if self.is_traffic_light_stop else '否'}
|
||
- 指令优先级: {self.command_priority}
|
||
- 目标速度: {self.target_speed}
|
||
- 制动模式: {self.brake_mode}
|
||
- 指令原因: {self.command_reason}
|
||
- 目标位置: ({self.target_lat}, {self.target_lon})
|
||
""")
|
||
|
||
# 车辆状态管理
|
||
vehicle_states = {}
|
||
for vehicle in unmanned_vehicle_data:
|
||
vehicle_states[str(vehicle["vehicleNo"])] = VehicleState(str(vehicle["vehicleNo"]))
|
||
|
||
# 记录上次更新时间
|
||
last_unmanned_vehicle_update_time = time.time()
|
||
|
||
def check_auth() -> bool:
|
||
"""检查认证"""
|
||
auth_header = request.headers.get('Authorization')
|
||
|
||
if not auth_header:
|
||
print("认证失败: 缺少Authorization头")
|
||
return False
|
||
|
||
# 直接比较完整的Authorization头
|
||
result = auth_header == AUTH_TOKEN
|
||
if not result:
|
||
print(f"认证失败: token不匹配")
|
||
print(f"收到的token: {auth_header}")
|
||
print(f"期望的token: {AUTH_TOKEN}")
|
||
return result
|
||
|
||
class RequestVehicleCommandPayload(TypedDict, total=False):
|
||
vehicleID: str
|
||
commandType: str
|
||
commandReason: str
|
||
signalState: str
|
||
latitude: float
|
||
longitude: float
|
||
transId: str
|
||
|
||
|
||
# 生成符合API规范的完整车辆状态数据
|
||
def generate_comprehensive_vehicle_status(vehicle_id: str) -> dict[str, Any]:
|
||
"""生成符合universal_autonomous_vehicle_api规范的完整车辆状态数据"""
|
||
current_time = time.time()
|
||
timestamp_ms = int(current_time * 1000)
|
||
|
||
# 获取车辆基础数据
|
||
vehicle_data_item = None
|
||
for v in unmanned_vehicle_data:
|
||
if v["vehicleNo"] == vehicle_id:
|
||
vehicle_data_item = v
|
||
break
|
||
|
||
if not vehicle_data_item:
|
||
return {}
|
||
|
||
# 获取车辆状态
|
||
vehicle_state = vehicle_states.get(vehicle_id)
|
||
|
||
# 基础位置和运动数据
|
||
lat_val = to_float_safe(vehicle_data_item.get("latitude"))
|
||
lon_val = to_float_safe(vehicle_data_item.get("longitude"))
|
||
speed_val = to_float_safe(vehicle_data_item.get("speed", 0))
|
||
direction_val = to_float_safe(vehicle_data_item.get("direction", 0))
|
||
|
||
# 生成完整状态数据
|
||
status_data = {
|
||
"vehicleInfo": {
|
||
"vehicleId": vehicle_id
|
||
},
|
||
"operationalStatus": {
|
||
"powerStatus": "ON",
|
||
"systemHealth": "HEALTHY" if vehicle_state and vehicle_state.is_running else "DEGRADED",
|
||
"operationalMode": "AUTONOMOUS",
|
||
"emergencyStatus": "CRITICAL" if vehicle_state and vehicle_state.current_command == "ALERT"
|
||
else "WARNING" if vehicle_state and vehicle_state.current_command == "WARNING"
|
||
else "NORMAL",
|
||
"lastHeartbeat": timestamp_ms
|
||
},
|
||
"controlStatus": {
|
||
"controlMode": "AUTONOMOUS",
|
||
"controlAuthority": "SYSTEM",
|
||
"remoteControlActive": vehicle_state.remoteControlActive if vehicle_state else False
|
||
},
|
||
"motionStatus": {
|
||
"position": {
|
||
"latitude": round(lat_val, 6) if lat_val is not None else 0.0,
|
||
"longitude": round(lon_val, 6) if lon_val is not None else 0.0
|
||
},
|
||
"velocity": {
|
||
"speed": round(speed_val * 1000 / 3600, 2) if speed_val is not None else 0.0, # 转换为 m/s
|
||
"direction": round(math.radians(direction_val), 4) if direction_val is not None else 0.0 # 转换为弧度
|
||
}
|
||
},
|
||
"safetyStatus": {
|
||
"collisionAvoidanceActive": vehicle_state.is_running if vehicle_state else True,
|
||
"emergencyBrakingReady": True,
|
||
"pathPlanningStatus": "ACTIVE" if vehicle_state and vehicle_state.is_running else "INACTIVE",
|
||
"obstacleDetectionStatus": "ACTIVE" if vehicle_state and vehicle_state.is_running else "INACTIVE",
|
||
"minimumRiskManeuverTriggered": vehicle_state.current_command == "ALERT" if vehicle_state else False
|
||
},
|
||
"sensorStatus": {
|
||
"gps": {
|
||
"status": "ACTIVE",
|
||
"accuracy": 0.5, # 0.5米精度
|
||
"lastUpdate": timestamp_ms
|
||
}
|
||
},
|
||
"batteryStatus": {
|
||
"mainBattery": {
|
||
"chargeLevel": 85.5, # 85.5%
|
||
"voltage": 48.2, # 48.2V
|
||
"current": -15.3, # -15.3A (放电)
|
||
"temperature": 35.2, # 35.2°C
|
||
"chargingStatus": "DISCHARGING"
|
||
}
|
||
},
|
||
"communicationStatus": {
|
||
"v2xStatus": "CONNECTED",
|
||
"cellularSignalStrength": -65, # -65dBm
|
||
"wifiStatus": "CONNECTED",
|
||
"cloudConnectivity": "ONLINE"
|
||
},
|
||
"missionContext": {
|
||
"currentMission": {
|
||
"missionId": vehicle_data_item.get("mission_id", "UNKNOWN"),
|
||
"missionType": vehicle_data_item.get("mission_type", "UNKNOWN"),
|
||
"startTime": vehicle_data_item.get("mission_start_time", timestamp_ms),
|
||
"estimatedEndTime": vehicle_data_item.get("mission_start_time", timestamp_ms) + 3600000, # 1小时后
|
||
"progress": round(calculate_mission_progress(vehicle_data_item), 1),
|
||
"totalMileage": round(to_float_safe(vehicle_data_item.get("total_mileage", 0.0)) or 0.0, 1)
|
||
},
|
||
"waypoints": [
|
||
{
|
||
"waypointId": f"{vehicle_id}_START",
|
||
"latitude": round(to_float_safe(vehicle_data_item.get("start_point", {}).get("latitude", 0.0)) or 0.0, 6),
|
||
"longitude": round(to_float_safe(vehicle_data_item.get("start_point", {}).get("longitude", 0.0)) or 0.0, 6),
|
||
"status": "COMPLETED" if not vehicle_data_item.get("moving_to_end", True) else "PENDING"
|
||
},
|
||
{
|
||
"waypointId": f"{vehicle_id}_END",
|
||
"latitude": round(to_float_safe(vehicle_data_item.get("end_point", {}).get("latitude", 0.0)) or 0.0, 6),
|
||
"longitude": round(to_float_safe(vehicle_data_item.get("end_point", {}).get("longitude", 0.0)) or 0.0, 6),
|
||
"status": "COMPLETED" if vehicle_data_item.get("moving_to_end", True) else "PENDING"
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
return status_data
|
||
|
||
def calculate_mission_progress(vehicle_data: dict[str, Any]) -> float:
|
||
"""计算任务进度百分比"""
|
||
try:
|
||
# 获取车辆当前位置
|
||
cur_lat = to_float_safe(vehicle_data.get("latitude"))
|
||
cur_lon = to_float_safe(vehicle_data.get("longitude"))
|
||
|
||
# 获取起点和终点坐标
|
||
start_point = vehicle_data.get("start_point", {})
|
||
end_point = vehicle_data.get("end_point", {})
|
||
|
||
start_lat = to_float_safe(start_point.get("latitude"))
|
||
start_lon = to_float_safe(start_point.get("longitude"))
|
||
end_lat = to_float_safe(end_point.get("latitude"))
|
||
end_lon = to_float_safe(end_point.get("longitude"))
|
||
|
||
if any(v is None for v in [cur_lat, cur_lon, start_lat, start_lon, end_lat, end_lon]):
|
||
return 0.0
|
||
|
||
# 确保所有值都不为None,使用断言来帮助类型检查器理解
|
||
assert cur_lat is not None and cur_lon is not None
|
||
assert start_lat is not None and start_lon is not None
|
||
assert end_lat is not None and end_lon is not None
|
||
|
||
# 现在可以安全地转换为float类型
|
||
start_lat_safe = float(start_lat)
|
||
start_lon_safe = float(start_lon)
|
||
end_lat_safe = float(end_lat)
|
||
end_lon_safe = float(end_lon)
|
||
cur_lat_safe = float(cur_lat)
|
||
cur_lon_safe = float(cur_lon)
|
||
|
||
# 计算起点到终点的总距离
|
||
total_distance = calculate_distance_to_target(
|
||
{"latitude": start_lat_safe, "longitude": start_lon_safe},
|
||
end_lat_safe, end_lon_safe
|
||
)
|
||
|
||
if total_distance == 0:
|
||
return 100.0
|
||
|
||
# 计算当前位置到起点的距离
|
||
distance_from_start = calculate_distance_to_target(
|
||
{"latitude": cur_lat_safe, "longitude": cur_lon_safe},
|
||
start_lat_safe, start_lon_safe
|
||
)
|
||
|
||
# 计算当前位置到终点的距离
|
||
distance_to_end = calculate_distance_to_target(
|
||
{"latitude": cur_lat_safe, "longitude": cur_lon_safe},
|
||
end_lat_safe, end_lon_safe
|
||
)
|
||
|
||
# 根据移动方向计算进度
|
||
if vehicle_data.get("moving_to_end", True):
|
||
# 向终点移动:进度 = (已走距离 / 总距离) * 100
|
||
progress = (distance_from_start / total_distance) * 100.0
|
||
else:
|
||
# 向起点移动:进度 = (1 - 距离起点的距离 / 总距离) * 100 + 100
|
||
progress = 100.0 + ((total_distance - distance_from_start) / total_distance) * 100.0
|
||
|
||
# 确保进度在合理范围内
|
||
return max(0.0, min(200.0, progress))
|
||
|
||
except Exception:
|
||
return 0.0
|
||
|
||
def filter_vehicle_status_fields(status_data: dict[str, Any], fields: str | None) -> dict[str, Any]:
|
||
"""根据fields参数过滤车辆状态字段"""
|
||
if not fields:
|
||
return status_data
|
||
|
||
# 解析fields参数
|
||
requested_fields = [f.strip() for f in fields.split(',')]
|
||
filtered_data = {}
|
||
|
||
# 可用的字段组
|
||
available_fields = {
|
||
"vehicleInfo", "operationalStatus", "controlStatus", "motionStatus",
|
||
"safetyStatus", "sensorStatus", "batteryStatus", "communicationStatus"
|
||
}
|
||
|
||
for field in requested_fields:
|
||
if field in available_fields and field in status_data:
|
||
filtered_data[field] = status_data[field]
|
||
|
||
return filtered_data
|
||
|
||
|
||
# API 端点实现
|
||
|
||
@app.route('/api/VehicleCommandInfo', methods=['POST'])
|
||
def handle_vehicle_command():
|
||
"""车辆控制指令接口"""
|
||
# 提前定义并注解,避免未绑定与类型未知
|
||
data: RequestVehicleCommandPayload = {}
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
vehicle_id = data.get("vehicleID")
|
||
command_type = data.get("commandType", "").upper()
|
||
reason = data.get("commandReason", "").upper()
|
||
signal_state = data.get("signalState", "").upper()
|
||
target_lat = data.get("latitude", None)
|
||
target_lon = data.get("longitude", None)
|
||
|
||
logger.info(f"🎛️ 收到无人车控制指令: vehicle_id={vehicle_id}, type={command_type}, reason={reason}, signal_state={signal_state}")
|
||
logger.info(f"完整请求数据: {data}")
|
||
|
||
# 检查 SIGNAL 类型命令必须包含 signalState
|
||
if command_type == "SIGNAL" and not signal_state:
|
||
logger.warning(f"SIGNAL 类型命令缺少 signalState")
|
||
return jsonify({
|
||
"code": 400,
|
||
"msg": "SIGNAL command must include signalState",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
}), 400
|
||
|
||
# 检查车辆是否存在
|
||
vehicle_state = vehicle_states.get(vehicle_id)
|
||
if not vehicle_state:
|
||
logger.warning(f"未找到无人车: {vehicle_id}")
|
||
return jsonify({
|
||
"code": 404,
|
||
"msg": f"Vehicle {vehicle_id} not found",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
}), 404
|
||
|
||
# 打印当前车辆状态
|
||
logger.info(f"当前无人车状态: vehicle={vehicle_id}")
|
||
vehicle_state.log_state()
|
||
|
||
# 检查指令优先级
|
||
check_command = signal_state if command_type == "SIGNAL" else command_type
|
||
can_override = vehicle_state.can_be_overridden_by(check_command)
|
||
logger.info(f"指令优先级检查: vehicle={vehicle_id}, current_command={str(vehicle_state.current_command)}, new_command={check_command}, can_override={can_override}")
|
||
|
||
if not can_override:
|
||
logger.warning(f"指令优先级过低: vehicle={vehicle_id}, current_priority={vehicle_state.command_priority}, command={check_command}")
|
||
return jsonify({
|
||
"code": 400,
|
||
"msg": "Command priority too low",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
}), 400
|
||
|
||
# 处理不同类型的指令
|
||
if command_type == "SIGNAL":
|
||
# 更新红绿灯状态和指令状态
|
||
vehicle_state.update_command(signal_state, target_lat, target_lon)
|
||
logger.info(f"无人车 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
|
||
return jsonify({
|
||
"code": 200,
|
||
"msg": "Traffic light state updated",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
})
|
||
elif command_type in ["ALERT", "WARNING"]:
|
||
# 查找当前车辆
|
||
current_vehicle = None
|
||
for v in unmanned_vehicle_data:
|
||
if v["vehicleNo"] == vehicle_id:
|
||
current_vehicle = v
|
||
break
|
||
|
||
# 执行告警指令直接停车
|
||
logger.info(f"执行{'紧急' if command_type == 'ALERT' else '正常'}制动: vehicle={vehicle_id}")
|
||
vehicle_state.current_command = command_type
|
||
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
|
||
vehicle_state.is_running = False
|
||
vehicle_state.target_speed = 0
|
||
vehicle_state.brake_mode = "emergency" if command_type == "ALERT" else "normal"
|
||
vehicle_state.target_lat = target_lat
|
||
vehicle_state.target_lon = target_lon
|
||
|
||
return jsonify({
|
||
"code": 200,
|
||
"msg": "Command executed successfully",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
})
|
||
|
||
elif command_type == "RESUME":
|
||
logger.info(f"执行恢复指令: vehicle_id={vehicle_id}")
|
||
# 检查当前车辆
|
||
current_vehicle = None
|
||
for v in unmanned_vehicle_data:
|
||
if v["vehicleNo"] == vehicle_id:
|
||
current_vehicle = v
|
||
break
|
||
|
||
# 清除限制性指令
|
||
if vehicle_state.current_command in ["ALERT", "WARNING"]:
|
||
logger.info(f"清除限制性指令: vehicle={vehicle_id}")
|
||
vehicle_state.current_command = None
|
||
vehicle_state.command_priority = 0
|
||
vehicle_state.is_running = True
|
||
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
|
||
vehicle_state.brake_mode = None
|
||
vehicle_state.target_lat = None
|
||
vehicle_state.target_lon = None
|
||
|
||
# 更新车辆运行状态
|
||
vehicle_state.is_running = vehicle_state.can_move()
|
||
if vehicle_state.is_running and current_vehicle:
|
||
logger.info(f"无人车 {vehicle_id} 恢复运行")
|
||
|
||
# 记录状态变化但不更新指令
|
||
vehicle_state.command_reason = reason
|
||
vehicle_state.last_command_time = time.time()
|
||
|
||
logger.info(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
|
||
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
|
||
f"reason={reason}, priority={vehicle_state.command_priority}, "
|
||
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
|
||
|
||
return jsonify({
|
||
"code": 200,
|
||
"msg": "Command executed successfully",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
})
|
||
|
||
return jsonify({
|
||
"code": 200,
|
||
"msg": "Command executed successfully",
|
||
"transId": data.get("transId"),
|
||
"timestamp": int(time.time() * 1000)
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error handling vehicle command: {str(e)}")
|
||
return jsonify({
|
||
"code": 500,
|
||
"msg": str(e),
|
||
"transId": data.get("transId", ""),
|
||
"timestamp": int(time.time() * 1000)
|
||
}), 500
|
||
|
||
|
||
|
||
@app.route('/api/v1/vehicles/<vehicle_id>/status', methods=['GET', 'OPTIONS'])
|
||
def get_vehicle_status_universal(vehicle_id):
|
||
"""通用无人车状态上报接口 - 符合universal_autonomous_vehicle_api规范"""
|
||
if request.method == 'OPTIONS':
|
||
return '', 204
|
||
|
||
logger.info(f"🔍 收到无人车状态查询请求: vehicleId={vehicle_id}")
|
||
|
||
# 检查vehicleId格式(字母数字 3~20 位)
|
||
if not vehicle_id or len(vehicle_id) < 3 or len(vehicle_id) > 20:
|
||
return jsonify({
|
||
"code": 400,
|
||
"message": "Bad Request",
|
||
"timestamp": int(time.time() * 1000),
|
||
"error": {
|
||
"type": "VALIDATION_ERROR",
|
||
"details": "vehicleId must be 3-20 alphanumeric characters",
|
||
"field": "vehicleId"
|
||
}
|
||
}), 400
|
||
|
||
# 检查车辆是否存在
|
||
vehicle_exists = False
|
||
for v in unmanned_vehicle_data:
|
||
if v["vehicleNo"] == vehicle_id:
|
||
vehicle_exists = True
|
||
break
|
||
|
||
if not vehicle_exists:
|
||
return jsonify({
|
||
"code": 404,
|
||
"message": "Not Found",
|
||
"timestamp": int(time.time() * 1000),
|
||
"error": {
|
||
"type": "RESOURCE_NOT_FOUND",
|
||
"details": f"Vehicle {vehicle_id} not found",
|
||
"field": "vehicleId"
|
||
}
|
||
}), 404
|
||
|
||
try:
|
||
# 更新车辆位置数据
|
||
current_time = time.time()
|
||
global last_unmanned_vehicle_update_time
|
||
elapsed_time = current_time - last_unmanned_vehicle_update_time
|
||
if elapsed_time >= UPDATE_INTERVAL:
|
||
for vehicle in unmanned_vehicle_data:
|
||
update_vehicle_position(vehicle, UPDATE_INTERVAL)
|
||
vehicle["time"] = int(current_time * 1000)
|
||
last_unmanned_vehicle_update_time = current_time
|
||
|
||
# 生成完整的车辆状态数据
|
||
status_data = generate_comprehensive_vehicle_status(vehicle_id)
|
||
if not status_data:
|
||
return jsonify({
|
||
"code": 500,
|
||
"message": "Internal Server Error",
|
||
"timestamp": int(time.time() * 1000),
|
||
"error": {
|
||
"type": "DATA_ERROR",
|
||
"details": "Failed to generate vehicle status data",
|
||
"field": None
|
||
}
|
||
}), 500
|
||
|
||
# 处理fields查询参数
|
||
fields = request.args.get('fields')
|
||
if fields:
|
||
status_data = filter_vehicle_status_fields(status_data, fields)
|
||
|
||
# 返回成功响应
|
||
battery_level = status_data.get('batteryStatus', {}).get('mainBattery', {}).get('chargeLevel', '未知')
|
||
logger.info(f"🚗 返回无人车状态数据: vehicleId={vehicle_id}, 电量={battery_level}%")
|
||
return jsonify({
|
||
"code": 200,
|
||
"message": "success",
|
||
"timestamp": int(time.time() * 1000),
|
||
"data": status_data
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error in get_vehicle_status_universal: {str(e)}")
|
||
return jsonify({
|
||
"code": 500,
|
||
"message": "Internal Server Error",
|
||
"timestamp": int(time.time() * 1000),
|
||
"error": {
|
||
"type": "SERVER_ERROR",
|
||
"details": str(e),
|
||
"field": None
|
||
}
|
||
}), 500
|
||
|
||
|
||
# 设置CORS
|
||
@app.after_request
|
||
def add_cors_headers(response):
|
||
response.headers['Access-Control-Allow-Origin'] = '*'
|
||
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
|
||
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
|
||
response.headers['Access-Control-Max-Age'] = '3600'
|
||
return response
|
||
|
||
if __name__ == '__main__':
|
||
logger.info("🚗 无人车厂商平台模拟服务启动中...")
|
||
logger.info("📍 服务端点:")
|
||
logger.info(" - 车辆控制指令: POST /api/VehicleCommandInfo")
|
||
logger.info(" - 通用状态接口: GET /api/v1/vehicles/<vehicle_id>/status")
|
||
logger.info(" - 车辆状态获取: GET /openApi/getVehicleStatus")
|
||
logger.info(f"🌐 服务地址: http://localhost:8091")
|
||
logger.info("🔐 认证Token: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...")
|
||
|
||
app.run(host='localhost', port=8091, debug=True) |