diff --git a/tools/mock_server.py b/tools/mock_server.py index 9588ce5..853d8a3 100644 --- a/tools/mock_server.py +++ b/tools/mock_server.py @@ -4,6 +4,7 @@ import math import random import logging import os +import uuid # 创建 logs 目录(如果不存在) if not os.path.exists('logs'): @@ -1029,5 +1030,133 @@ def get_vehicle_status(): } }) +# 无人车位置数据生成函数 +def generate_unmanned_vehicle_location_data(): + """生成无人车位置数据,符合官方API格式""" + unmanned_vehicles = [] + + # 从现有vehicle_data中筛选无人车(QN开头的车辆) + for vehicle in vehicle_data: + if vehicle["vehicleNo"].startswith("QN"): + location_info = { + "transId": str(uuid.uuid4()), + "timestamp": int(time.time() * 1000), + "vehicleID": vehicle["vehicleNo"], + "latitude": vehicle["latitude"], + "longitude": vehicle["longitude"], + "speed": vehicle["speed"] / 3.6, # 转换为m/s + "direction": math.radians(vehicle["direction"]) # 转换为弧度 + } + unmanned_vehicles.append(location_info) + + return unmanned_vehicles + +# 无人车状态数据生成函数 +def generate_unmanned_vehicle_state_data(vehicle_id=None, is_single=True): + """生成无人车状态数据,符合官方API格式""" + vehicle_states_data = [] + + if is_single and vehicle_id: + # 单个车辆状态查询 + vehicle_state = vehicle_states.get(vehicle_id) + if vehicle_state: + state_info = { + "transId": str(uuid.uuid4()), + "timestamp": int(time.time() * 1000), + "vehicleID": vehicle_id, + "loginState": True, + "faultInfo": [], + "activeSafety": not vehicle_state.is_running, + "RC": False, + "Command": 1 if vehicle_state.current_command == "ALERT" else 0, + "airportInfo": [], + "vehicleMode": 2, # 自动模式 + "gearState": 2, # D档 + "chassisReady": vehicle_state.is_running, + "collisionStatus": False, + "clearance": 1 if vehicle_state.is_running else 0, + "turnSignalStstus": 0, + "pointCloud": [] + } + vehicle_states_data.append(state_info) + else: + # 所有车辆状态查询 + for vehicle in vehicle_data: + if vehicle["vehicleNo"].startswith("QN"): + vehicle_state = vehicle_states.get(vehicle["vehicleNo"]) + state_info = { + "transId": str(uuid.uuid4()), + "timestamp": int(time.time() * 1000), + "vehicleID": vehicle["vehicleNo"], + "loginState": True, + "faultInfo": [], + "activeSafety": not vehicle_state.is_running if vehicle_state else False, + "RC": False, + "Command": 1 if vehicle_state and vehicle_state.current_command == "ALERT" else 0, + "airportInfo": [], + "vehicleMode": 2, + "gearState": 2, + "chassisReady": vehicle_state.is_running if vehicle_state else True, + "collisionStatus": False, + "clearance": 1 if vehicle_state and vehicle_state.is_running else 0, + "turnSignalStstus": 0, + "pointCloud": [] + } + vehicle_states_data.append(state_info) + + return vehicle_states_data + +# 无人车API接口 +@app.route('/api/VehicleLocationInfo', methods=['GET', 'OPTIONS']) +def get_unmanned_vehicle_location(): + """无人车位置上报接口""" + if request.method == 'OPTIONS': + return '', 204 + + try: + # 更新车辆位置 + current_time = time.time() + global last_vehicle_update_time + elapsed_time = current_time - last_vehicle_update_time + + if elapsed_time >= UPDATE_INTERVAL: + for vehicle in vehicle_data: + update_vehicle_position(vehicle, UPDATE_INTERVAL) + vehicle["time"] = int(current_time * 1000) + last_vehicle_update_time = current_time + + # 生成无人车位置数据 + location_data = generate_unmanned_vehicle_location_data() + + print(f"返回无人车位置数据,数量: {len(location_data)}") + return jsonify(location_data) + + except Exception as e: + print(f"Error in get_unmanned_vehicle_location: {str(e)}") + return jsonify([]), 500 + +@app.route('/api/VehicleStateInfo', methods=['POST', 'OPTIONS']) +def get_unmanned_vehicle_state(): + """无人车状态查询接口""" + if request.method == 'OPTIONS': + return '', 204 + + try: + data = request.json + vehicle_id = data.get("vehicleID") if data else None + is_single = data.get("isSingle", True) if data else False + + print(f"收到无人车状态查询请求: vehicle_id={vehicle_id}, is_single={is_single}") + + # 生成无人车状态数据 + state_data = generate_unmanned_vehicle_state_data(vehicle_id, is_single) + + print(f"返回无人车状态数据,数量: {len(state_data)}") + return jsonify(state_data) + + except Exception as e: + print(f"Error in get_unmanned_vehicle_state: {str(e)}") + return jsonify([]), 500 + if __name__ == '__main__': - app.run(host='localhost', port=8081, debug=True) \ No newline at end of file + app.run(host='localhost', port=8090, debug=True) \ No newline at end of file