"""Mock vehicle-manager server built on aiohttp.

Serves canned vehicle payloads over two HTTP endpoints and three WebSocket
endpoints. Payloads are mostly static; a few fields carry the current time or
a small random jitter.
"""

import asyncio
import itertools
import logging
import os
import random
import time
from typing import Callable

from aiohttp import web

logger = logging.getLogger(__name__)

# Vehicle id used when a request does not specify one.
DEFAULT_VEHICLE_ID = "AET01"

# (destPointType, area, block) for every entry of the order's "jobAdd" list.
_JOB_ADD_DESTINATIONS = (
    ("gateDestination", "GateCheck", "GC01"),
    ("upHangDestination", "UpHang", "UH01"),
    ("recvConDestination", "RecvCon", "RC01"),
    ("conveyorDestination", "Conveyor", "CV01"),
    ("downHangDestination", "DownHang", "DH01"),
)


def now_ms() -> float:
    """Return the current Unix time in milliseconds (as a float)."""
    return time.time() * 1000


def now_s() -> float:
    """Return the current Unix time in seconds (as a float)."""
    return time.time()


def build_vehicle_status(vehicle_id: str) -> dict:
    """Build the canned status response body for ``vehicle_id``.

    A single ``now_ms()`` reading is reused for ``lastHeartbeat``, the GPS
    ``lastUpdate`` and the top-level ``timestamp``.
    """
    timestamp = now_ms()
    return {
        "code": 200,
        "message": "success",
        "data": {
            "vehicleInfo": {"vehicleId": vehicle_id},
            "operationalStatus": {
                "powerStatus": "ON",
                "systemHealth": "HEALTHY",
                "operationalMode": "AUTONOMOUS",
                "emergencyStatus": "NORMAL",
                "lastHeartbeat": timestamp,
            },
            "controlStatus": {
                "controlMode": "MANUAL",
                "controlAuthority": "SYSTEM",
                "remoteControlActive": True,
            },
            "motionStatus": {
                "position": {
                    "latitude": 36.367216,
                    "longitude": 0,
                },
                "velocity": {
                    "speed": 0,
                    "direction": -1.0080692e-10,
                },
            },
            "safetyStatus": {
                "collisionAvoidanceActive": True,
                "emergencyBrakingReady": True,
                "pathPlanningStatus": "ACTIVE",
                "obstacleDetectionStatus": "ACTIVE",
                "minimumRiskManeuverTriggered": True,
            },
            "sensorStatus": {
                "gps": {
                    "status": "ACTIVE",
                    "accuracy": 0.5,
                    "lastUpdate": timestamp,
                }
            },
            "batteryStatus": {
                "mainBattery": {
                    "chargeLevel": "0.0",
                    "voltage": 0,
                    "current": 0,
                    "temperature": 0,
                    "chargingStatus": "DISCHARGING",
                }
            },
            "communicationStatus": {
                "v2xStatus": "CONNECTED",
                "cellularSignalStrength": 0,
                "wifiStatus": "CONNECTED",
                "cloudConnectivity": "ONLINE",
            },
            "missionContext": {
                "currentMission": {
                    "missionId": None,
                    "missionType": "",
                    "startTime": 0,
                    "estimatedEndTime": 0,
                    "progress": 0,
                    "totalMileage": 0,
                },
                "waypoints": [],
            },
        },
        "timestamp": timestamp,
    }


def build_vehicle_order_info(vehicle_id: str) -> dict:
    """Build a canned ``VehicleOrderInfo`` message for ``vehicle_id``.

    ``businessKey`` is the current time in whole milliseconds, rendered as a
    string; the two workflow nodes are stamped with the current time in
    seconds. Everything else is fixed sample data.
    """
    job_add = [
        {
            "destPointType": dest_point_type,
            "destination": {
                "area": area,
                "block": block,
                "lane": "01",
                "stack": "001",
            },
        }
        for dest_point_type, area, block in _JOB_ADD_DESTINATIONS
    ]
    return {
        "data": {
            "messageName": "VehicleOrderInfo",
            "businessKey": str(int(now_ms())),
            "tenantId": "UpHang",
            "processDefKey": "UpHang",
            "vehicleId": vehicle_id,
            "jobStatus": 1,
            "abortBy": None,
            "orderStatus": "",
            "cancelReason": None,
            "jobStage": "HANG_UP",
            "jobStageTag": "WORKING",
            "jobStageVehicle": 0,
            "origin": "FMS",
            "jobType": "UpHang",
            "qctpCranId": "",
            "craneID": None,
            "passingLocation": "",
            "isNeedToTs": -1,
            "isNeedReshuffGate": 0,
            "movementType": "ProcessTask",
            "vesselVisitID": None,
            "loadable": False,
            "executeTogether": False,
            "doubleYardSpecialStack": None,
            "uniqueOrderIDOrigin": "",
            "firstJobId": "",
            "destination": {
                "logicalLocation": {
                    "area": "YARD",
                    "block": "BU01",
                    "lane": "01",
                    "stack": "001",
                    "tier": "",
                },
                "portLocation": {
                    "fmsX": None,
                    "fmsY": None,
                    "fmsZ": None,
                    "orientation": None,
                },
            },
            "points": {},
            "jobContent": {
                "logicalLocation": {
                    "area": "YARD",
                    "block": "BU01",
                    "lane": "01",
                    "stack": "001",
                }
            },
            "workflowNodes": [
                {
                    "node": "START",
                    "time": now_s(),
                    "origin": "AUTO",
                    "state": True,
                },
                {
                    "node": "END",
                    "time": now_s(),
                    "origin": "AUTO",
                    "state": False,
                },
            ],
            "isSecondBox": False,
            "boxBelongsJob": None,
            "trainId": "",
            "mainId": 1134,
            "createOn": "2026-01-06T02:32:56.685437+00:00",
            "updateOn": "2026-01-06T02:32:56.685437+00:00",
            "plannedContainerDestinationList": [],
            "jobExtra": {"hangType": "large", "hangNum": "4", "uploadNum": "0"},
            "jobAdd": job_add,
        }
    }


def build_vehicle_details(vehicle_id: str) -> dict:
    """Build a canned ``VehicleDetails`` message for ``vehicle_id``."""
    return {
        "messageName": "VehicleDetails",
        "vehicleId": vehicle_id,
        "vin": None,
        "manufacturer": None,
        "vehicleType": "QTRUCK",
        "createAt": "2025-08-05T07:54:09.721537+00:00",
        "updateAt": "2025-11-06T07:40:30.789366+00:00",
        "color": None,
        "engineType": None,
    }


def build_vehicle_login_status(vehicle_id: str, login: bool) -> dict:
    """Build a ``VehicleLoginStatus`` message.

    ``loginStatus`` is ``"login"`` when ``login`` is truthy, else ``"logout"``.
    """
    return {
        "messageName": "VehicleLoginStatus",
        "vehicleId": vehicle_id,
        "loginStatus": "login" if login else "logout",
    }


def build_vehicle_chassis_info(vehicle_id: str) -> dict:
    """Build a ``VehicleChassisInfo`` message with randomized readings.

    The speed is drawn from ``{0, 5, 10}`` and the available battery from
    ``[10, 100]``, formatted to one decimal place as a string.
    """
    return {
        "messageName": "VehicleChassisInfo",
        "vehicleId": vehicle_id,
        "reportAt": now_s(),
        "sys_info": {
            "i_at_id": vehicle_id,
            "i_driving_mode": "1",
            "overweight_speed_limit": 0,
            "state_info": {
                "d_speed_kmph": random.choice([0, 5, 10]),
                "i_vehicle_miles_traveled": "0",
                "d_battery_available": f"{random.uniform(10, 100):.1f}",
                "i_gear_position": 1,
                "d_battery_soh": "0.0",
            },
        },
    }


def build_vehicle_suspend_report(vehicle_id: str) -> dict:
    """Build a canned ``VehicleSuspendReport`` message for ``vehicle_id``."""
    return {
        "messageName": "VehicleSuspendReport",
        "vehicleId": vehicle_id,
        "suspendStatus": 0,
        "origin": "device",
    }


def build_vehicle_tailer_num(vehicle_id: str) -> dict:
    """Build a canned ``VehicleTailerNum`` message for ``vehicle_id``."""
    return {
        "messageName": "VehicleTailerNum",
        "vehicleId": vehicle_id,
        "tailerNum": "0",
    }


def build_vehicle_position_info(vehicle_id: str) -> dict:
    """Build a ``VehiclePositionInfo`` message with a jittered position.

    ``x`` and ``y`` are the base coordinates plus a uniform offset in
    ``[-0.001, 0.001]``; the trailer coordinates are not jittered.
    """
    # One clock reading for both spellings of the key, so the mirrored
    # fields can never disagree within a single message.
    reported_at = now_s()
    return {
        "messageName": "VehiclePositionInfo",
        "vehicleId": vehicle_id,
        "updateAt": "20260106T024227963Z",
        "update_at": "20260106T024227963Z",
        "reportAt": reported_at,
        "report_at": reported_at,
        "action": 0,
        "z": 0,
        "x": 36.367216 + random.uniform(-0.001, 0.001),
        "y": 120.082999 + random.uniform(-0.001, 0.001),
        "direction": 1,
        "v": 0,
        "theta": -1.5224342,
        "trailer": {
            "x": 36.367216,
            "y": 120.082999,
            "theta": 100000,
        },
    }


def build_navi_short_path_report(vehicle_id: str) -> dict:
    """Build a canned ``NaviShortPathReport`` with a fixed five-point path."""
    return {
        "vehicleId": vehicle_id,
        "vehicleMissionId": "04078d1e-eaa8-11f0-8723-fa30eca52200",
        "destinationType": 0,
        "shortNavi": False,
        "path": [
            {"x": 36.367216, "y": 120.082999, "theta": -1.5224548170138026},
            {"x": 36.368216, "y": 120.083999, "theta": -1.5224956618374788},
            {"x": 36.369216, "y": 120.084999, "theta": -1.5240533769087043},
            {"x": 36.370216, "y": 120.085999, "theta": -1.5256081246300013},
            {"x": 36.371216, "y": 120.086999, "theta": -1.5239545312967973},
        ],
        "messageName": "NaviShortPathReport",
    }


def build_fms_message(vehicle_id: str) -> dict:
    """Build a canned ``GetFmsMessage`` alarm message for ``vehicle_id``."""
    return {
        "vehicleID": vehicle_id,
        "code": "4119010",
        "type": 1,
        "level": 4,
        "isActive": 1,
        "longTermDisplayMessage": False,
        # The description reads "odometer information timeout".
        "description": "里程计信息超时",
        "ts": "2026-01-06T03:36:09.637195+00:00",
        "messageName": "GetFmsMessage",
    }


async def handle_vehicle_status(request: web.Request) -> web.Response:
    """Return the canned status JSON for the requested vehicle.

    The vehicle id is taken from the URL path, then from the ``vehicleId``
    query parameter, then falls back to ``DEFAULT_VEHICLE_ID``.
    """
    vehicle_id = (
        request.match_info.get("vehicleId")
        or request.query.get("vehicleId")
        or DEFAULT_VEHICLE_ID
    )
    return web.json_response(build_vehicle_status(vehicle_id))


async def handle_vehicle_details(request: web.Request) -> web.Response:
    """Return a fixed list of vehicle ids wrapped in a success envelope."""
    return web.json_response(
        {
            "code": 200,
            "message": "success",
            "data": ["AET02", "AET01"],
            "timestamp": now_ms(),
        }
    )


async def _stream_json(
    request: web.Request,
    next_payload: Callable[[], dict],
    interval: float,
) -> web.WebSocketResponse:
    """Push ``next_payload()`` to the client every ``interval`` seconds.

    A background task does the sending while this coroutine drains incoming
    frames until the socket closes or an ERROR frame arrives. The sender is
    always cancelled and awaited before returning, even when the receive
    loop raises or the handler itself is cancelled.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async def send_loop() -> None:
        try:
            while not ws.closed:
                await ws.send_json(next_payload())
                await asyncio.sleep(interval)
        except ConnectionResetError:
            # The peer went away between the ``closed`` check and the write;
            # that is a normal end of the stream, not an error.
            return

    sender = asyncio.create_task(send_loop())
    try:
        async for msg in ws:
            if msg.type == web.WSMsgType.ERROR:
                break
    finally:
        sender.cancel()
        # gather(..., return_exceptions=True) hands back the sender's own
        # CancelledError as a value, while still letting a cancellation of
        # this handler propagate normally.
        (outcome,) = await asyncio.gather(sender, return_exceptions=True)
        if isinstance(outcome, Exception):
            logger.warning(
                "WebSocket sender for %s failed: %r", request.path, outcome
            )
    return ws


async def ws_at_manager(request: web.Request) -> web.WebSocketResponse:
    """Cycle through a fixed set of vehicle messages, one per second.

    The messages are built once when the client connects and then replayed
    round-robin, so their time/random fields do not change per send.
    """
    vehicle_id = request.query.get("vehicleId") or DEFAULT_VEHICLE_ID
    messages = [
        build_vehicle_order_info(vehicle_id),
        build_vehicle_details(vehicle_id),
        build_vehicle_login_status(vehicle_id, True),
        build_vehicle_chassis_info(vehicle_id),
        build_vehicle_suspend_report(vehicle_id),
        build_vehicle_tailer_num(vehicle_id),
        build_fms_message(vehicle_id),
    ]
    rotation = itertools.cycle(messages)
    return await _stream_json(request, lambda: next(rotation), 1)


async def ws_at_manager_bsm(request: web.Request) -> web.WebSocketResponse:
    """Send a freshly built position message once per second."""
    vehicle_id = request.query.get("vehicleId") or DEFAULT_VEHICLE_ID
    return await _stream_json(
        request, lambda: build_vehicle_position_info(vehicle_id), 1
    )


async def ws_at_manager_path(request: web.Request) -> web.WebSocketResponse:
    """Send the canned short-path report every two seconds."""
    vehicle_id = request.query.get("vehicleId") or DEFAULT_VEHICLE_ID
    return await _stream_json(
        request, lambda: build_navi_short_path_report(vehicle_id), 2
    )


def create_app() -> web.Application:
    """Create the aiohttp application and register all mock routes."""
    app = web.Application()
    app.router.add_post(
        "/api/vehicle_manager/v1/vehicles/{vehicleId}/status",
        handle_vehicle_status,
    )
    app.router.add_get("/api/vehicle_details", handle_vehicle_details)
    app.router.add_get("/ws/at_manager", ws_at_manager)
    app.router.add_get("/ws/at_manager_bsm", ws_at_manager_bsm)
    app.router.add_get("/ws/at_manager_path", ws_at_manager_path)
    return app


def main() -> None:
    """Run the server on ``HOST``/``PORT`` (defaults: 0.0.0.0 and 8020)."""
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "8020"))
    web.run_app(create_app(), host=host, port=port)


if __name__ == "__main__":
    main()