QDAirPortBackendTest/server.py
shan 8ff3cbb096 Add vehicle details API and update coordinates
- Add /api/vehicle_details endpoint to query vehicle list
- Update coordinates to Qingdao area (36.367216, 120.082999)
- Set longitude to 0 in vehicle status response

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-23 09:57:59 +08:00

407 lines
12 KiB
Python

import asyncio
import os
import random
import time
from aiohttp import web
def now_ms() -> float:
return time.time() * 1000
def now_s() -> float:
return time.time()
def build_vehicle_status(vehicle_id: str) -> dict:
timestamp = now_ms()
return {
"code": 200,
"message": "success",
"data": {
"vehicleInfo": {"vehicleId": vehicle_id},
"operationalStatus": {
"powerStatus": "ON",
"systemHealth": "HEALTHY",
"operationalMode": "AUTONOMOUS",
"emergencyStatus": "NORMAL",
"lastHeartbeat": timestamp,
},
"controlStatus": {
"controlMode": "MANUAL",
"controlAuthority": "SYSTEM",
"remoteControlActive": True,
},
"motionStatus": {
"position": {
"latitude": 36.367216,
"longitude": 0,
},
"velocity": {
"speed": 0,
"direction": -1.0080692e-10,
},
},
"safetyStatus": {
"collisionAvoidanceActive": True,
"emergencyBrakingReady": True,
"pathPlanningStatus": "ACTIVE",
"obstacleDetectionStatus": "ACTIVE",
"minimumRiskManeuverTriggered": True,
},
"sensorStatus": {
"gps": {
"status": "ACTIVE",
"accuracy": 0.5,
"lastUpdate": timestamp,
}
},
"batteryStatus": {
"mainBattery": {
"chargeLevel": "0.0",
"voltage": 0,
"current": 0,
"temperature": 0,
"chargingStatus": "DISCHARGING",
}
},
"communicationStatus": {
"v2xStatus": "CONNECTED",
"cellularSignalStrength": 0,
"wifiStatus": "CONNECTED",
"cloudConnectivity": "ONLINE",
},
"missionContext": {
"currentMission": {
"missionId": None,
"missionType": "",
"startTime": 0,
"estimatedEndTime": 0,
"progress": 0,
"totalMileage": 0,
},
"waypoints": [],
},
},
"timestamp": timestamp,
}
def build_vehicle_order_info(vehicle_id: str) -> dict:
return {
"data": {
"messageName": "VehicleOrderInfo",
"businessKey": str(int(now_ms())),
"tenantId": "UpHang",
"processDefKey": "UpHang",
"vehicleId": vehicle_id,
"jobStatus": 1,
"abortBy": None,
"orderStatus": "",
"cancelReason": None,
"jobStage": "HANG_UP",
"jobStageTag": "WORKING",
"jobStageVehicle": 0,
"origin": "FMS",
"jobType": "UpHang",
"qctpCranId": "",
"craneID": None,
"passingLocation": "",
"isNeedToTs": -1,
"isNeedReshuffGate": 0,
"movementType": "ProcessTask",
"vesselVisitID": None,
"loadable": False,
"executeTogether": False,
"doubleYardSpecialStack": None,
"uniqueOrderIDOrigin": "",
"firstJobId": "",
"destination": {
"logicalLocation": {
"area": "YARD",
"block": "BU01",
"lane": "01",
"stack": "001",
"tier": "",
},
"portLocation": {
"fmsX": None,
"fmsY": None,
"fmsZ": None,
"orientation": None,
},
},
"points": {},
"jobContent": {
"logicalLocation": {
"area": "YARD",
"block": "BU01",
"lane": "01",
"stack": "001",
}
},
"workflowNodes": [
{
"node": "START",
"time": now_s(),
"origin": "AUTO",
"state": True,
},
{
"node": "END",
"time": now_s(),
"origin": "AUTO",
"state": False,
},
],
"isSecondBox": False,
"boxBelongsJob": None,
"trainId": "",
"mainId": 1134,
"createOn": "2026-01-06T02:32:56.685437+00:00",
"updateOn": "2026-01-06T02:32:56.685437+00:00",
"plannedContainerDestinationList": [],
"jobExtra": {"hangType": "large", "hangNum": "4", "uploadNum": "0"},
"jobAdd": [
{
"destPointType": "gateDestination",
"destination": {"area": "GateCheck", "block": "GC01", "lane": "01", "stack": "001"},
},
{
"destPointType": "upHangDestination",
"destination": {"area": "UpHang", "block": "UH01", "lane": "01", "stack": "001"},
},
{
"destPointType": "recvConDestination",
"destination": {"area": "RecvCon", "block": "RC01", "lane": "01", "stack": "001"},
},
{
"destPointType": "conveyorDestination",
"destination": {"area": "Conveyor", "block": "CV01", "lane": "01", "stack": "001"},
},
{
"destPointType": "downHangDestination",
"destination": {"area": "DownHang", "block": "DH01", "lane": "01", "stack": "001"},
},
],
}
}
def build_vehicle_details(vehicle_id: str) -> dict:
return {
"messageName": "VehicleDetails",
"vehicleId": vehicle_id,
"vin": None,
"manufacturer": None,
"vehicleType": "QTRUCK",
"createAt": "2025-08-05T07:54:09.721537+00:00",
"updateAt": "2025-11-06T07:40:30.789366+00:00",
"color": None,
"engineType": None,
}
def build_vehicle_login_status(vehicle_id: str, login: bool) -> dict:
return {
"messageName": "VehicleLoginStatus",
"vehicleId": vehicle_id,
"loginStatus": "login" if login else "logout",
}
def build_vehicle_chassis_info(vehicle_id: str) -> dict:
return {
"messageName": "VehicleChassisInfo",
"vehicleId": vehicle_id,
"reportAt": now_s(),
"sys_info": {
"i_at_id": vehicle_id,
"i_driving_mode": "1",
"overweight_speed_limit": 0,
"state_info": {
"d_speed_kmph": random.choice([0, 5, 10]),
"i_vehicle_miles_traveled": "0",
"d_battery_available": f"{random.uniform(10, 100):.1f}",
"i_gear_position": 1,
"d_battery_soh": "0.0",
},
},
}
def build_vehicle_suspend_report(vehicle_id: str) -> dict:
return {
"messageName": "VehicleSuspendReport",
"vehicleId": vehicle_id,
"suspendStatus": 0,
"origin": "device",
}
def build_vehicle_tailer_num(vehicle_id: str) -> dict:
return {
"messageName": "VehicleTailerNum",
"vehicleId": vehicle_id,
"tailerNum": "0",
}
def build_vehicle_position_info(vehicle_id: str) -> dict:
return {
"messageName": "VehiclePositionInfo",
"vehicleId": vehicle_id,
"updateAt": "20260106T024227963Z",
"update_at": "20260106T024227963Z",
"reportAt": now_s(),
"report_at": now_s(),
"action": 0,
"z": 0,
"x": 36.367216 + random.uniform(-0.001, 0.001),
"y": 120.082999 + random.uniform(-0.001, 0.001),
"direction": 1,
"v": 0,
"theta": -1.5224342,
"trailer": {
"x": 36.367216,
"y": 120.082999,
"theta": 100000,
},
}
def build_navi_short_path_report(vehicle_id: str) -> dict:
return {
"vehicleId": vehicle_id,
"vehicleMissionId": "04078d1e-eaa8-11f0-8723-fa30eca52200",
"destinationType": 0,
"shortNavi": False,
"path": [
{"x": 36.367216, "y": 120.082999, "theta": -1.5224548170138026},
{"x": 36.368216, "y": 120.083999, "theta": -1.5224956618374788},
{"x": 36.369216, "y": 120.084999, "theta": -1.5240533769087043},
{"x": 36.370216, "y": 120.085999, "theta": -1.5256081246300013},
{"x": 36.371216, "y": 120.086999, "theta": -1.5239545312967973},
],
"messageName": "NaviShortPathReport",
}
def build_fms_message(vehicle_id: str) -> dict:
return {
"vehicleID": vehicle_id,
"code": "4119010",
"type": 1,
"level": 4,
"isActive": 1,
"longTermDisplayMessage": False,
"description": "里程计信息超时",
"ts": "2026-01-06T03:36:09.637195+00:00",
"messageName": "GetFmsMessage",
}
async def handle_vehicle_status(request: web.Request) -> web.Response:
vehicle_id = request.match_info.get("vehicleId") or request.query.get("vehicleId") or "AET01"
return web.json_response(build_vehicle_status(vehicle_id))
async def handle_vehicle_details(request: web.Request) -> web.Response:
return web.json_response({
"code": 200,
"message": "success",
"data": [
"AET02",
"AET01"
],
"timestamp": now_ms()
})
async def ws_at_manager(request: web.Request) -> web.WebSocketResponse:
vehicle_id = request.query.get("vehicleId") or "AET01"
ws = web.WebSocketResponse()
await ws.prepare(request)
messages = [
build_vehicle_order_info(vehicle_id),
build_vehicle_details(vehicle_id),
build_vehicle_login_status(vehicle_id, True),
build_vehicle_chassis_info(vehicle_id),
build_vehicle_suspend_report(vehicle_id),
build_vehicle_tailer_num(vehicle_id),
build_fms_message(vehicle_id),
]
async def send_loop() -> None:
index = 0
while not ws.closed:
await ws.send_json(messages[index % len(messages)])
index += 1
await asyncio.sleep(1)
task = asyncio.create_task(send_loop())
async for msg in ws:
if msg.type == web.WSMsgType.ERROR:
break
task.cancel()
return ws
async def ws_at_manager_bsm(request: web.Request) -> web.WebSocketResponse:
vehicle_id = request.query.get("vehicleId") or "AET01"
ws = web.WebSocketResponse()
await ws.prepare(request)
async def send_loop() -> None:
while not ws.closed:
await ws.send_json(build_vehicle_position_info(vehicle_id))
await asyncio.sleep(1)
task = asyncio.create_task(send_loop())
async for msg in ws:
if msg.type == web.WSMsgType.ERROR:
break
task.cancel()
return ws
async def ws_at_manager_path(request: web.Request) -> web.WebSocketResponse:
vehicle_id = request.query.get("vehicleId") or "AET01"
ws = web.WebSocketResponse()
await ws.prepare(request)
async def send_loop() -> None:
while not ws.closed:
await ws.send_json(build_navi_short_path_report(vehicle_id))
await asyncio.sleep(2)
task = asyncio.create_task(send_loop())
async for msg in ws:
if msg.type == web.WSMsgType.ERROR:
break
task.cancel()
return ws
def create_app() -> web.Application:
app = web.Application()
app.router.add_post("/api/vehicle_manager/v1/vehicles/{vehicleId}/status", handle_vehicle_status)
app.router.add_get("/api/vehicle_details", handle_vehicle_details)
app.router.add_get("/ws/at_manager", ws_at_manager)
app.router.add_get("/ws/at_manager_bsm", ws_at_manager_bsm)
app.router.add_get("/ws/at_manager_path", ws_at_manager_path)
return app
if __name__ == "__main__":
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8020"))
web.run_app(create_app(), host=host, port=port)