QDAirPortBackend0122/tools/mock_unmanned_vehicle.py
2026-01-22 13:19:47 +08:00

957 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
无人车厂商平台模拟服务
Unmanned Vehicle Manufacturer Platform Mock Server
提供无人车位置上报、状态查询、控制指令等API接口
模拟真实的无人车厂商平台与机场管理系统的交互
"""
from flask import Flask, jsonify, request
import time
import math
import logging
import os
from typing import Any, Literal, final, TypedDict
# 导入统一的日志配置
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from logging_config import setup_logger
# 设置当前模块的日志记录器
logger = setup_logger(
name='mock_unmanned_vehicle',
log_file='../logs/mock_unmanned_vehicle.log',
max_bytes=10*1024*1024, # 10MB
backup_count=5
)
app = Flask(__name__)
# 为位置与移动对象定义类型,避免使用 Any
class Point(TypedDict):
latitude: float
longitude: float
# 必填字段
class MovingObjectRequired(TypedDict):
latitude: float
longitude: float
time: int
speed: float
moving_to_end: bool
start_point: Point
end_point: Point
# 可选字段total=False
class MovingObject(MovingObjectRequired, total=False):
vehicleNo: str
wait_until: float
direction: float
# 地球半径(米)
EARTH_RADIUS = 6378137.0
# 指令优先级配置
COMMAND_PRIORITIES = {
"ALERT": 5,
"WARNING": 3,
"PARKING": 2,
"RESUME": 1
}
# 距离配置(米)
DIST_300M = 300
DIST_200M = 200
DIST_150M = 150
DIST_100M = 100
DIST_50M = 50
# 时间配置(秒)
WAIT_TIME_AFTER_RETURN = 0.1 # 返回起点后的等待时间
UPDATE_INTERVAL = 1.0 # 位置更新间隔, 默认 1 秒
# 无人车路径配置
# 无人车A 鲁B567 路径
UNMANNED_A_START = {"longitude": 120.083084, "latitude": 36.369696} # 无人车A起点
UNMANNED_A_END = {"longitude": 120.084637, "latitude": 36.365617} # 无人车A终点
# 无人车B 鲁B579 路径
UNMANNED_B_START = {"longitude": 120.086965, "latitude": 36.368599} # 无人车B起点
UNMANNED_B_END = {"longitude": 120.086263, "latitude": 36.370484} # 无人车B终点
# 车辆配置
DEFAULT_VEHICLE_SPEED = 30.0 # km/h默认速度
EMERGENCY_BRAKE_DECELERATION = 0.8 # 紧急制动减速度 (每次更新减速 80%)
NORMAL_BRAKE_DECELERATION = 0.2 # 正常制动减速度 (每次更新减速 20%)
# 认证相关配置
AUTH_USERNAME = "dianxin"
AUTH_PASSWORD = "dianxin@123"
AUTH_TOKEN = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzI3ODMwOTAsInVzZXJuYW1lIjoiYWRtaW4ifQ.y9feEL_9NT8UzED9NNkb0Ln6C-PBoufiSHWobWe5vWY"
def meters_to_degrees(meters: float, latitude: float) -> tuple[float, float]:
"""
将米转换为度,考虑纬度的影响
"""
# 纬度方向1度 = 111,319.9米
meters_per_deg_lat: float = 111319.9
# 经度方向1度 = 111,319.9 * cos(latitude)米
meters_per_deg_lon: float = meters_per_deg_lat * math.cos(math.radians(latitude))
return meters / meters_per_deg_lat, meters / meters_per_deg_lon
def to_float_safe(x: object) -> float | None:
"""
安全将值转换为 float:
- 允许 int/float
- 允许非布尔的数字字符串
- 允许 dict优先取 'value''longitude''latitude' 等常见键
- 无法解析则返回 None
"""
# 排除布尔值bool 是 int 的子类,但不应当被视为数值)
if isinstance(x, bool):
return None
if isinstance(x, (int, float)):
return float(x)
if isinstance(x, str):
try:
return float(x.strip())
except Exception:
return None
if isinstance(x, dict):
for key in ("value", "longitude", "latitude"):
if key in x:
return to_float_safe(x[key])
return None
return None
def calculate_distance_to_target(vehicle: dict[str, float], target_lat: float, target_lon: float) -> float:
"""计算车辆到目标位的距离(米)"""
# 转换为弧度
lat1_rad = math.radians(vehicle["latitude"])
lon1_rad = math.radians(vehicle["longitude"])
lat2_rad = math.radians(target_lat)
lon2_rad = math.radians(target_lon)
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
# 使用 Haversine 公式计算距离
a = math.sin(dlat/2) * math.sin(dlat/2) + \
math.cos(lat1_rad) * math.cos(lat2_rad) * \
math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return EARTH_RADIUS * c
def update_position_with_vector(
obj: dict[str, Any],
target_point: "Point",
start_point: "Point",
speed: float,
elapsed_time: float,
return_to_start: bool = False
) -> bool:
"""
基于向量的位置更新逻辑
Args:
obj: 需要更新位置的对象(车辆)
target_point: 目标点坐标 {"latitude": float, "longitude": float}
start_point: 起点坐标 {"latitude": float, "longitude": float}
speed: 移动速度km/h
elapsed_time: 经过的时间(秒)
return_to_start: 是否在到达目标点时返回起点
"""
# 检查是否在等待状态
if "wait_until" not in obj:
obj["wait_until"] = 0.0
current_time = time.time()
# 基于类型安全读取 wait_until避免 float 与 object 比较
_wu = to_float_safe(obj.get("wait_until"))
wait_until = 0.0 if _wu is None else float(_wu)
if current_time < wait_until:
return False # 还在等待中,不更新位置
# 计算这一步要移动的距离(米)并转换为经纬度
speed_mps = float(speed) * 1000.0 / 3600.0
distance = speed_mps * float(elapsed_time)
# 累计里程到总里程中
if "total_mileage" not in obj:
obj["total_mileage"] = 0.0
obj["total_mileage"] = to_float_safe(obj.get("total_mileage", 0.0)) or 0.0
obj["total_mileage"] += distance
# 安全读取并转换当前位置与目标点/起点经纬度为 float
cur_lat = to_float_safe(obj.get("latitude"))
cur_lon = to_float_safe(obj.get("longitude"))
tgt_lat = to_float_safe(target_point.get("latitude"))
tgt_lon = to_float_safe(target_point.get("longitude"))
if cur_lat is None or cur_lon is None or tgt_lat is None or tgt_lon is None:
# 无法解析经纬度,跳过更新
return False
# 将移动距离转换为经纬度变化量
move_dlat, move_dlon = meters_to_degrees(distance, cur_lat)
# 将5米的判断距离转换为经纬度优化从15米减少到5米减少颤抖
check_dlat, check_dlon = meters_to_degrees(5.0, cur_lat)
# 计算当前位置到终点的向量(经纬度空间)
vector_lat = float(tgt_lat) - float(cur_lat)
vector_lon = float(tgt_lon) - float(cur_lon)
# 计算向量长度(经纬度空间)
vector_length = math.sqrt(vector_lat * vector_lat + vector_lon * vector_lon)
# 检查是否到达终点
if vector_length <= math.sqrt(check_dlat * check_dlat + check_dlon * check_dlon):
# 精确设置到目标点位置,避免位置偏差
obj["latitude"] = float(tgt_lat)
obj["longitude"] = float(tgt_lon)
if return_to_start:
# 返回起点并设置等待时间
obj["wait_until"] = current_time + WAIT_TIME_AFTER_RETURN
print(f"无人车 {obj.get('vehicleNo')} 返回起点,等待{WAIT_TIME_AFTER_RETURN}秒后继续")
else:
# 设置短暂等待时间,避免立即切换方向导致的颤抖
obj["wait_until"] = current_time + 0.1 # 0.1秒的短暂等待
print(f"无人车 {obj.get('vehicleNo')} 到达目标点等待0.1秒后切换方向")
return True # 表示已到达终点
else:
# 归一化方向向量
unit_lat = vector_lat / vector_length
unit_lon = vector_lon / vector_length
# 计算这一步的位置变化
dlat = move_dlat * unit_lat
dlon = move_dlon * unit_lon
# 更新位置(写回为 float
obj["latitude"] = float(cur_lat + dlat)
obj["longitude"] = float(cur_lon + dlon)
return False # 表示正在移动中
def update_object_position(obj: dict[str, Any], elapsed_time: float) -> None:
"""统一的位置更新函数 - 用于无人车的往复运动"""
# 获取对象类型和标识
obj_id = obj.get("vehicleNo")
# 根据移动方向确定目标点
if obj["moving_to_end"]:
target_raw = obj["end_point"]
start_raw = obj["start_point"]
else:
target_raw = obj["start_point"]
start_raw = obj["end_point"]
# 构造类型安全的 Point
t_lat = to_float_safe(target_raw.get("latitude")) if isinstance(target_raw, dict) else None
t_lon = to_float_safe(target_raw.get("longitude")) if isinstance(target_raw, dict) else None
s_lat = to_float_safe(start_raw.get("latitude")) if isinstance(start_raw, dict) else None
s_lon = to_float_safe(start_raw.get("longitude")) if isinstance(start_raw, dict) else None
if t_lat is None or t_lon is None or s_lat is None or s_lon is None:
# 无法解析坐标,直接更新时间戳后返回
obj["time"] = int(time.time() * 1000)
return
target_point: Point = {"latitude": float(t_lat), "longitude": float(t_lon)}
start_point: Point = {"latitude": float(s_lat), "longitude": float(s_lon)}
# 读取速度为 float
spd = to_float_safe(obj.get("speed"))
if spd is None:
spd = 0.0
# 更新位置
reached_target = update_position_with_vector(
obj,
target_point,
start_point,
float(spd),
float(elapsed_time),
return_to_start=False
)
if reached_target:
# 切换移动方向
obj["moving_to_end"] = not obj["moving_to_end"]
target_name = "终点" if not obj["moving_to_end"] else "起点"
print(f"无人车 {obj_id} 到达{'终点' if obj['moving_to_end'] else '起点'},开始前往{target_name}")
# 更新时间戳
obj["time"] = int(time.time() * 1000)
def update_vehicle_position(vehicle: dict[str, Any], elapsed_time: float) -> None:
"""更新无人车位置 - 使用统一的位置更新函数"""
update_object_position(vehicle, elapsed_time)
# 无人车数据配置
current_timestamp = time.time()
unmanned_vehicle_data = [
{
"vehicleNo": "鲁B567", # 无人车A
"longitude": UNMANNED_A_START["longitude"],
"latitude": UNMANNED_A_START["latitude"],
"time": int(current_timestamp * 1000),
"direction": 90.0, # 90度
"speed": 25.0, # 25km/h
"start_point": UNMANNED_A_START, # 起点
"end_point": UNMANNED_A_END, # 终点
"moving_to_end": True, # 当前是否向终点移动
# 任务和里程相关字段
"total_mileage": 0.0, # 总里程(米)
"mission_start_time": int(current_timestamp * 1000), # 任务开始时间
"mission_id": "MISSION_A_001", # 任务ID
"mission_type": "PATROL_TRANSPORT" # 任务类型:巡逻运输
},
{
"vehicleNo": "鲁B579", # 无人车B
"longitude": UNMANNED_B_START["longitude"],
"latitude": UNMANNED_B_START["latitude"],
"time": int(current_timestamp * 1000),
"direction": 270.0, # 270度
"speed": 25.0, # 25km/h
"start_point": UNMANNED_B_START, # 起点
"end_point": UNMANNED_B_END, # 终点
"moving_to_end": True, # 当前是否向终点移动
# 任务和里程相关字段
"total_mileage": 0.0, # 总里程(米)
"mission_start_time": int(current_timestamp * 1000), # 任务开始时间
"mission_id": "MISSION_B_001", # 任务ID
"mission_type": "CARGO_TRANSPORT" # 任务类型:货物运输
}
]
# 添加车辆状态类
@final
class VehicleState:
vehicle_no: str
is_running: bool
current_command: str | None
command_priority: int
target_speed: float
brake_mode: Literal['emergency', 'normal'] | None
command_reason: str | None
last_command_time: float
traffic_light_state: str | None
target_lat: float | None
target_lon: float | None
is_traffic_light_stop: bool
def __init__(self, vehicle_no: str) -> None:
self.vehicle_no = vehicle_no
self.is_running = True # 运行状态
self.current_command = None # 当前执行的指令
self.command_priority = 0 # 当前指令优先级
self.target_speed = float(DEFAULT_VEHICLE_SPEED) # 目标速度
self.brake_mode = None # 制动模式:'emergency' 或 'normal'
self.command_reason = None # 指令原因
self.last_command_time = float(time.time()) # 最后一次指令时间
self.traffic_light_state = None # 当前红绿灯状态
self.target_lat = None # 目标纬度
self.target_lon = None # 目标经度
self.is_traffic_light_stop = False # 是否因红灯停车
self.remoteControlActive = False # 远程控制是否激活
def can_be_overridden_by(self, command_type: str) -> bool:
"""判断当前指令是否可以被新指令覆盖"""
# 红绿灯信号不参与指令优先级判断
if command_type in ["RED", "GREEN", "YELLOW"]:
return True
# 类型安全获取优先级,避免 Unknown | None 作为 dict key
new_priority = COMMAND_PRIORITIES.get(str(command_type), 0)
current_key = self.current_command if isinstance(self.current_command, str) else None
current_priority = COMMAND_PRIORITIES.get(current_key or "", 0)
# 相同类型的指令可以覆盖(因为需要持续发送)
if isinstance(self.current_command, str) and command_type == self.current_command:
return True
# ALERT 指令可以被 RESUME 解除
if self.current_command == "ALERT":
return command_type == "RESUME"
# WARNING 指令可以被 RESUME 或相同及更高优先级指令覆盖
if self.current_command == "WARNING":
return command_type == "RESUME" or new_priority >= current_priority
# 其他情况下,相同或更高优先级可以覆盖
return new_priority >= current_priority
def update_command(self, command_type: str, target_lat: float | None = None, target_lon: float | None = None) -> None:
"""更新指令状态"""
# 更新目标位置
if target_lat is not None and target_lon is not None:
self.target_lat = target_lat
self.target_lon = target_lon
# 如果是红绿灯状态,只更新状态,不影响其他指令
if command_type in ["RED", "GREEN", "YELLOW"]:
old_state = self.traffic_light_state
self.traffic_light_state = command_type
# 更新红灯停车状态
if command_type == "RED":
self.is_traffic_light_stop = True
elif command_type in ["GREEN", "YELLOW"]:
self.is_traffic_light_stop = False
print(f"无人车 {self.vehicle_no} 收到红绿灯信号: {command_type} (原状态: {old_state})")
else:
# 其他指令正常更新
self.current_command = command_type
self.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
# RESUME 指令不清除红绿灯状态,红绿灯状态由信号灯独立控制
if command_type == "RESUME":
# 清除其他状态
self.brake_mode = None
self.command_reason = None
# 更新运行状态
self.is_running = self.can_move()
print(f"更新无人车 {self.vehicle_no} 状态: command={self.current_command}, traffic_light={self.traffic_light_state}, priority={self.command_priority}, is_running={self.is_running}")
def can_move(self) -> bool:
"""检查车辆是否可以移动"""
# 如果有告警指令,不能移动
if self.current_command == "ALERT":
return False
# 如果有预警指令,不能移动
if self.current_command == "WARNING":
return False
# 如果有安全停靠指令,不能移动
if self.current_command == "PARKING":
return False
# 如果是红灯停车状态,不能移动
if self.is_traffic_light_stop:
return False
# 其他情况可以移动
return True
def log_state(self) -> None:
"""记录当前车辆状态"""
print(f"""
无人车状态:
- 车辆编号: {self.vehicle_no}
- 运行状态: {'运行中' if self.is_running else '已停止'}
- 当前指令: {self.current_command}
- 红绿灯状态: {self.traffic_light_state}
- 红灯停车: {'' if self.is_traffic_light_stop else ''}
- 指令优先级: {self.command_priority}
- 目标速度: {self.target_speed}
- 制动模式: {self.brake_mode}
- 指令原因: {self.command_reason}
- 目标位置: ({self.target_lat}, {self.target_lon})
""")
# 车辆状态管理
vehicle_states = {}
for vehicle in unmanned_vehicle_data:
vehicle_states[str(vehicle["vehicleNo"])] = VehicleState(str(vehicle["vehicleNo"]))
# 记录上次更新时间
last_unmanned_vehicle_update_time = time.time()
def check_auth() -> bool:
"""检查认证"""
auth_header = request.headers.get('Authorization')
if not auth_header:
print("认证失败: 缺少Authorization头")
return False
# 直接比较完整的Authorization头
result = auth_header == AUTH_TOKEN
if not result:
print(f"认证失败: token不匹配")
print(f"收到的token: {auth_header}")
print(f"期望的token: {AUTH_TOKEN}")
return result
class RequestVehicleCommandPayload(TypedDict, total=False):
vehicleID: str
commandType: str
commandReason: str
signalState: str
latitude: float
longitude: float
transId: str
# 生成符合API规范的完整车辆状态数据
def generate_comprehensive_vehicle_status(vehicle_id: str) -> dict[str, Any]:
"""生成符合universal_autonomous_vehicle_api规范的完整车辆状态数据"""
current_time = time.time()
timestamp_ms = int(current_time * 1000)
# 获取车辆基础数据
vehicle_data_item = None
for v in unmanned_vehicle_data:
if v["vehicleNo"] == vehicle_id:
vehicle_data_item = v
break
if not vehicle_data_item:
return {}
# 获取车辆状态
vehicle_state = vehicle_states.get(vehicle_id)
# 基础位置和运动数据
lat_val = to_float_safe(vehicle_data_item.get("latitude"))
lon_val = to_float_safe(vehicle_data_item.get("longitude"))
speed_val = to_float_safe(vehicle_data_item.get("speed", 0))
direction_val = to_float_safe(vehicle_data_item.get("direction", 0))
# 生成完整状态数据
status_data = {
"vehicleInfo": {
"vehicleId": vehicle_id
},
"operationalStatus": {
"powerStatus": "ON",
"systemHealth": "HEALTHY" if vehicle_state and vehicle_state.is_running else "DEGRADED",
"operationalMode": "AUTONOMOUS",
"emergencyStatus": "CRITICAL" if vehicle_state and vehicle_state.current_command == "ALERT"
else "WARNING" if vehicle_state and vehicle_state.current_command == "WARNING"
else "NORMAL",
"lastHeartbeat": timestamp_ms
},
"controlStatus": {
"controlMode": "AUTONOMOUS",
"controlAuthority": "SYSTEM",
"remoteControlActive": vehicle_state.remoteControlActive if vehicle_state else False
},
"motionStatus": {
"position": {
"latitude": round(lat_val, 6) if lat_val is not None else 0.0,
"longitude": round(lon_val, 6) if lon_val is not None else 0.0
},
"velocity": {
"speed": round(speed_val * 1000 / 3600, 2) if speed_val is not None else 0.0, # 转换为 m/s
"direction": round(math.radians(direction_val), 4) if direction_val is not None else 0.0 # 转换为弧度
}
},
"safetyStatus": {
"collisionAvoidanceActive": vehicle_state.is_running if vehicle_state else True,
"emergencyBrakingReady": True,
"pathPlanningStatus": "ACTIVE" if vehicle_state and vehicle_state.is_running else "INACTIVE",
"obstacleDetectionStatus": "ACTIVE" if vehicle_state and vehicle_state.is_running else "INACTIVE",
"minimumRiskManeuverTriggered": vehicle_state.current_command == "ALERT" if vehicle_state else False
},
"sensorStatus": {
"gps": {
"status": "ACTIVE",
"accuracy": 0.5, # 0.5米精度
"lastUpdate": timestamp_ms
}
},
"batteryStatus": {
"mainBattery": {
"chargeLevel": 85.5, # 85.5%
"voltage": 48.2, # 48.2V
"current": -15.3, # -15.3A (放电)
"temperature": 35.2, # 35.2°C
"chargingStatus": "DISCHARGING"
}
},
"communicationStatus": {
"v2xStatus": "CONNECTED",
"cellularSignalStrength": -65, # -65dBm
"wifiStatus": "CONNECTED",
"cloudConnectivity": "ONLINE"
},
"missionContext": {
"currentMission": {
"missionId": vehicle_data_item.get("mission_id", "UNKNOWN"),
"missionType": vehicle_data_item.get("mission_type", "UNKNOWN"),
"startTime": vehicle_data_item.get("mission_start_time", timestamp_ms),
"estimatedEndTime": vehicle_data_item.get("mission_start_time", timestamp_ms) + 3600000, # 1小时后
"progress": round(calculate_mission_progress(vehicle_data_item), 1),
"totalMileage": round(to_float_safe(vehicle_data_item.get("total_mileage", 0.0)) or 0.0, 1)
},
"waypoints": [
{
"waypointId": f"{vehicle_id}_START",
"latitude": round(to_float_safe(vehicle_data_item.get("start_point", {}).get("latitude", 0.0)) or 0.0, 6),
"longitude": round(to_float_safe(vehicle_data_item.get("start_point", {}).get("longitude", 0.0)) or 0.0, 6),
"status": "COMPLETED" if not vehicle_data_item.get("moving_to_end", True) else "PENDING"
},
{
"waypointId": f"{vehicle_id}_END",
"latitude": round(to_float_safe(vehicle_data_item.get("end_point", {}).get("latitude", 0.0)) or 0.0, 6),
"longitude": round(to_float_safe(vehicle_data_item.get("end_point", {}).get("longitude", 0.0)) or 0.0, 6),
"status": "COMPLETED" if vehicle_data_item.get("moving_to_end", True) else "PENDING"
}
]
}
}
return status_data
def calculate_mission_progress(vehicle_data: dict[str, Any]) -> float:
"""计算任务进度百分比"""
try:
# 获取车辆当前位置
cur_lat = to_float_safe(vehicle_data.get("latitude"))
cur_lon = to_float_safe(vehicle_data.get("longitude"))
# 获取起点和终点坐标
start_point = vehicle_data.get("start_point", {})
end_point = vehicle_data.get("end_point", {})
start_lat = to_float_safe(start_point.get("latitude"))
start_lon = to_float_safe(start_point.get("longitude"))
end_lat = to_float_safe(end_point.get("latitude"))
end_lon = to_float_safe(end_point.get("longitude"))
if any(v is None for v in [cur_lat, cur_lon, start_lat, start_lon, end_lat, end_lon]):
return 0.0
# 确保所有值都不为None使用断言来帮助类型检查器理解
assert cur_lat is not None and cur_lon is not None
assert start_lat is not None and start_lon is not None
assert end_lat is not None and end_lon is not None
# 现在可以安全地转换为float类型
start_lat_safe = float(start_lat)
start_lon_safe = float(start_lon)
end_lat_safe = float(end_lat)
end_lon_safe = float(end_lon)
cur_lat_safe = float(cur_lat)
cur_lon_safe = float(cur_lon)
# 计算起点到终点的总距离
total_distance = calculate_distance_to_target(
{"latitude": start_lat_safe, "longitude": start_lon_safe},
end_lat_safe, end_lon_safe
)
if total_distance == 0:
return 100.0
# 计算当前位置到起点的距离
distance_from_start = calculate_distance_to_target(
{"latitude": cur_lat_safe, "longitude": cur_lon_safe},
start_lat_safe, start_lon_safe
)
# 计算当前位置到终点的距离
distance_to_end = calculate_distance_to_target(
{"latitude": cur_lat_safe, "longitude": cur_lon_safe},
end_lat_safe, end_lon_safe
)
# 根据移动方向计算进度
if vehicle_data.get("moving_to_end", True):
# 向终点移动:进度 = (已走距离 / 总距离) * 100
progress = (distance_from_start / total_distance) * 100.0
else:
# 向起点移动:进度 = (1 - 距离起点的距离 / 总距离) * 100 + 100
progress = 100.0 + ((total_distance - distance_from_start) / total_distance) * 100.0
# 确保进度在合理范围内
return max(0.0, min(200.0, progress))
except Exception:
return 0.0
def filter_vehicle_status_fields(status_data: dict[str, Any], fields: str | None) -> dict[str, Any]:
"""根据fields参数过滤车辆状态字段"""
if not fields:
return status_data
# 解析fields参数
requested_fields = [f.strip() for f in fields.split(',')]
filtered_data = {}
# 可用的字段组
available_fields = {
"vehicleInfo", "operationalStatus", "controlStatus", "motionStatus",
"safetyStatus", "sensorStatus", "batteryStatus", "communicationStatus"
}
for field in requested_fields:
if field in available_fields and field in status_data:
filtered_data[field] = status_data[field]
return filtered_data
# API 端点实现
@app.route('/api/VehicleCommandInfo', methods=['POST'])
def handle_vehicle_command():
"""车辆控制指令接口"""
# 提前定义并注解,避免未绑定与类型未知
data: RequestVehicleCommandPayload = {}
try:
data = request.get_json(silent=True) or {}
vehicle_id = data.get("vehicleID")
command_type = data.get("commandType", "").upper()
reason = data.get("commandReason", "").upper()
signal_state = data.get("signalState", "").upper()
target_lat = data.get("latitude", None)
target_lon = data.get("longitude", None)
logger.info(f"🎛️ 收到无人车控制指令: vehicle_id={vehicle_id}, type={command_type}, reason={reason}, signal_state={signal_state}")
logger.info(f"完整请求数据: {data}")
# 检查 SIGNAL 类型命令必须包含 signalState
if command_type == "SIGNAL" and not signal_state:
logger.warning(f"SIGNAL 类型命令缺少 signalState")
return jsonify({
"code": 400,
"msg": "SIGNAL command must include signalState",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 400
# 检查车辆是否存在
vehicle_state = vehicle_states.get(vehicle_id)
if not vehicle_state:
logger.warning(f"未找到无人车: {vehicle_id}")
return jsonify({
"code": 404,
"msg": f"Vehicle {vehicle_id} not found",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 404
# 打印当前车辆状态
logger.info(f"当前无人车状态: vehicle={vehicle_id}")
vehicle_state.log_state()
# 检查指令优先级
check_command = signal_state if command_type == "SIGNAL" else command_type
can_override = vehicle_state.can_be_overridden_by(check_command)
logger.info(f"指令优先级检查: vehicle={vehicle_id}, current_command={str(vehicle_state.current_command)}, new_command={check_command}, can_override={can_override}")
if not can_override:
logger.warning(f"指令优先级过低: vehicle={vehicle_id}, current_priority={vehicle_state.command_priority}, command={check_command}")
return jsonify({
"code": 400,
"msg": "Command priority too low",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
}), 400
# 处理不同类型的指令
if command_type == "SIGNAL":
# 更新红绿灯状态和指令状态
vehicle_state.update_command(signal_state, target_lat, target_lon)
logger.info(f"无人车 {vehicle_id} 更新状态: command={command_type}, traffic_light={vehicle_state.traffic_light_state}")
return jsonify({
"code": 200,
"msg": "Traffic light state updated",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
elif command_type in ["ALERT", "WARNING"]:
# 查找当前车辆
current_vehicle = None
for v in unmanned_vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 执行告警指令直接停车
logger.info(f"执行{'紧急' if command_type == 'ALERT' else '正常'}制动: vehicle={vehicle_id}")
vehicle_state.current_command = command_type
vehicle_state.command_priority = COMMAND_PRIORITIES.get(command_type, 0)
vehicle_state.is_running = False
vehicle_state.target_speed = 0
vehicle_state.brake_mode = "emergency" if command_type == "ALERT" else "normal"
vehicle_state.target_lat = target_lat
vehicle_state.target_lon = target_lon
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
elif command_type == "RESUME":
logger.info(f"执行恢复指令: vehicle_id={vehicle_id}")
# 检查当前车辆
current_vehicle = None
for v in unmanned_vehicle_data:
if v["vehicleNo"] == vehicle_id:
current_vehicle = v
break
# 清除限制性指令
if vehicle_state.current_command in ["ALERT", "WARNING"]:
logger.info(f"清除限制性指令: vehicle={vehicle_id}")
vehicle_state.current_command = None
vehicle_state.command_priority = 0
vehicle_state.is_running = True
vehicle_state.target_speed = DEFAULT_VEHICLE_SPEED
vehicle_state.brake_mode = None
vehicle_state.target_lat = None
vehicle_state.target_lon = None
# 更新车辆运行状态
vehicle_state.is_running = vehicle_state.can_move()
if vehicle_state.is_running and current_vehicle:
logger.info(f"无人车 {vehicle_id} 恢复运行")
# 记录状态变化但不更新指令
vehicle_state.command_reason = reason
vehicle_state.last_command_time = time.time()
logger.info(f"Vehicle {vehicle_id} state updated: running={vehicle_state.is_running}, "
f"command={vehicle_state.current_command}, traffic_light={vehicle_state.traffic_light_state}, "
f"reason={reason}, priority={vehicle_state.command_priority}, "
f"target_speed={vehicle_state.target_speed}, brake_mode={vehicle_state.brake_mode}")
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
return jsonify({
"code": 200,
"msg": "Command executed successfully",
"transId": data.get("transId"),
"timestamp": int(time.time() * 1000)
})
except Exception as e:
logger.error(f"Error handling vehicle command: {str(e)}")
return jsonify({
"code": 500,
"msg": str(e),
"transId": data.get("transId", ""),
"timestamp": int(time.time() * 1000)
}), 500
@app.route('/api/v1/vehicles/<vehicle_id>/status', methods=['GET', 'OPTIONS'])
def get_vehicle_status_universal(vehicle_id):
"""通用无人车状态上报接口 - 符合universal_autonomous_vehicle_api规范"""
if request.method == 'OPTIONS':
return '', 204
logger.info(f"🔍 收到无人车状态查询请求: vehicleId={vehicle_id}")
# 检查vehicleId格式字母数字 3~20 位)
if not vehicle_id or len(vehicle_id) < 3 or len(vehicle_id) > 20:
return jsonify({
"code": 400,
"message": "Bad Request",
"timestamp": int(time.time() * 1000),
"error": {
"type": "VALIDATION_ERROR",
"details": "vehicleId must be 3-20 alphanumeric characters",
"field": "vehicleId"
}
}), 400
# 检查车辆是否存在
vehicle_exists = False
for v in unmanned_vehicle_data:
if v["vehicleNo"] == vehicle_id:
vehicle_exists = True
break
if not vehicle_exists:
return jsonify({
"code": 404,
"message": "Not Found",
"timestamp": int(time.time() * 1000),
"error": {
"type": "RESOURCE_NOT_FOUND",
"details": f"Vehicle {vehicle_id} not found",
"field": "vehicleId"
}
}), 404
try:
# 更新车辆位置数据
current_time = time.time()
global last_unmanned_vehicle_update_time
elapsed_time = current_time - last_unmanned_vehicle_update_time
if elapsed_time >= UPDATE_INTERVAL:
for vehicle in unmanned_vehicle_data:
update_vehicle_position(vehicle, UPDATE_INTERVAL)
vehicle["time"] = int(current_time * 1000)
last_unmanned_vehicle_update_time = current_time
# 生成完整的车辆状态数据
status_data = generate_comprehensive_vehicle_status(vehicle_id)
if not status_data:
return jsonify({
"code": 500,
"message": "Internal Server Error",
"timestamp": int(time.time() * 1000),
"error": {
"type": "DATA_ERROR",
"details": "Failed to generate vehicle status data",
"field": None
}
}), 500
# 处理fields查询参数
fields = request.args.get('fields')
if fields:
status_data = filter_vehicle_status_fields(status_data, fields)
# 返回成功响应
battery_level = status_data.get('batteryStatus', {}).get('mainBattery', {}).get('chargeLevel', '未知')
logger.info(f"🚗 返回无人车状态数据: vehicleId={vehicle_id}, 电量={battery_level}%")
return jsonify({
"code": 200,
"message": "success",
"timestamp": int(time.time() * 1000),
"data": status_data
})
except Exception as e:
logger.error(f"Error in get_vehicle_status_universal: {str(e)}")
return jsonify({
"code": 500,
"message": "Internal Server Error",
"timestamp": int(time.time() * 1000),
"error": {
"type": "SERVER_ERROR",
"details": str(e),
"field": None
}
}), 500
# 设置CORS
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Max-Age'] = '3600'
return response
if __name__ == '__main__':
logger.info("🚗 无人车厂商平台模拟服务启动中...")
logger.info("📍 服务端点:")
logger.info(" - 车辆控制指令: POST /api/VehicleCommandInfo")
logger.info(" - 通用状态接口: GET /api/v1/vehicles/<vehicle_id>/status")
logger.info(" - 车辆状态获取: GET /openApi/getVehicleStatus")
logger.info(f"🌐 服务地址: http://localhost:8091")
logger.info("🔐 认证Token: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...")
app.run(host='localhost', port=8091, debug=True)