# CollisionAvoidance/tools/mock_server.py
#
# 134 lines
# 4.3 KiB
# Python

from flask import Flask, jsonify, make_response, request
import math
import time
import random
import logging
from werkzeug.serving import WSGIRequestHandler
# Configure logging: INFO level, with timestamp, logger name and level on each record.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# Custom request handler
class CustomRequestHandler(WSGIRequestHandler):
    """Request handler that traces each connection in the log and then closes it.

    ``protocol_version`` is set to HTTP/1.1, while ``close_connection`` is
    forced to ``True`` once handling finishes.
    """

    protocol_version = "HTTP/1.1"

    def handle(self):
        """Delegate to the base handler, logging start, finish and any failure.

        Exceptions raised by the base handler are logged together with their
        traceback and are not re-raised.
        """
        try:
            logger.info("Starting to handle request")
            super().handle()
            logger.info("Finished handling request")
        except Exception as exc:
            # logger.exception records the traceback; logger.error with only
            # str(exc) would lose where the failure actually happened.
            logger.exception("Error handling request: %s", exc)
        finally:
            # Flag the connection as done regardless of the outcome.
            self.close_connection = True
            logger.info("Connection closed")
# Flask application object; the route in this file is registered on it.
app = Flask(import_name=__name__)

# Airport reference point (coordinates of Qingdao Jiaozhou International Airport).
AIRPORT_LAT: float = 36.361999  # 36°21'43.2" N
AIRPORT_LON: float = 120.088003  # 120°05'16.8" E
def _build_entity(kind, index, *, now, timestamp_ms, radius, max_speed,
                  altitude_range=None):
    """Build the dict describing one simulated aircraft or vehicle.

    Args:
        kind: Value for the ``"type"`` field; also the prefix of ``"id"``.
        index: Zero-based index of the entity; shifts its phase on the circle
            and yields the one-based suffix of ``"id"``.
        now: Clock reading (``time.time()``) shared by the whole snapshot.
        timestamp_ms: Value stored in the ``"timestamp"`` field.
        radius: Amplitude of the sine/cosine offset added to the airport
            latitude and longitude.
        max_speed: Velocity components are drawn uniformly from
            ``[-max_speed, max_speed]``.
        altitude_range: ``(low, high)`` bounds for a uniformly drawn altitude,
            or ``None`` for a fixed altitude of ``0``.

    Returns:
        A dict with the keys ``id``, ``type``, ``position``, ``velocity``,
        ``heading``, ``timestamp`` and ``altitude``.
    """
    phase = now + index
    # Dict values are evaluated top to bottom, so the random draws happen in
    # the order: velocity x, velocity y, heading, altitude.
    return {
        "id": f"{kind}_{index + 1}",
        "type": kind,
        "position": {
            "latitude": AIRPORT_LAT + math.sin(phase) * radius,
            "longitude": AIRPORT_LON + math.cos(phase) * radius,
        },
        "velocity": {
            "x": random.uniform(-max_speed, max_speed),
            "y": random.uniform(-max_speed, max_speed),
        },
        "heading": random.uniform(0, 360),
        "timestamp": timestamp_ms,
        "altitude": (
            random.uniform(*altitude_range) if altitude_range is not None else 0
        ),
    }


def generate_mock_data():
    """Generate one snapshot of simulated traffic around the airport.

    Returns:
        ``{"vehicles": [...]}`` holding 3 ``AIRCRAFT`` entries followed by
        5 ``VEHICLE`` entries. If generation raises, the error is logged and
        ``{"vehicles": []}`` is returned instead.
    """
    try:
        logger.info("Generating mock data")
        # Read the clock once so that every position and the timestamp in
        # this snapshot refer to the same instant.
        now = time.time()
        timestamp_ms = int(now * 1000)

        # Three aircraft.
        vehicles = [
            _build_entity(
                "AIRCRAFT", i,
                now=now, timestamp_ms=timestamp_ms,
                radius=0.001, max_speed=10, altitude_range=(100, 1000),
            )
            for i in range(3)
        ]
        # Five ground vehicles, on a smaller circle and with altitude 0.
        vehicles += [
            _build_entity(
                "VEHICLE", i,
                now=now, timestamp_ms=timestamp_ms,
                radius=0.0005, max_speed=5,
            )
            for i in range(5)
        ]

        logger.info(f"Generated {len(vehicles)} vehicles data")
        return {"vehicles": vehicles}
    except Exception as exc:
        # Best effort: callers still get a well-formed (empty) payload, and
        # the traceback is kept in the log.
        logger.exception("Error generating mock data: %s", exc)
        return {"vehicles": []}
@app.route('/api/vehicles')
def get_vehicles():
    """Handle a request for the current vehicle data.

    Returns:
        A JSON response carrying the result of ``generate_mock_data()`` with
        explicit ``Content-Type``, ``Connection``, ``Cache-Control`` and
        ``Access-Control-Allow-Origin`` headers. If anything raises while the
        response is built, a JSON ``{"error": <message>}`` response with
        status 500 is returned instead.
    """
    try:
        logger.info(f"Received request from {request.remote_addr}")
        logger.info(f"Request headers: {dict(request.headers)}")
        data = generate_mock_data()

        # Build the response object with make_response so headers can be set.
        response = make_response(jsonify(data))

        # Set response headers.
        response.headers['Content-Type'] = 'application/json'
        response.headers['Connection'] = 'close'
        response.headers['Cache-Control'] = 'no-cache'
        response.headers['Access-Control-Allow-Origin'] = '*'

        logger.info(f"Response headers: {dict(response.headers)}")
        # Log the size of the serialized body that is actually sent, not the
        # length of the Python dict's repr.
        logger.info(f"Response data length: {len(response.get_data())}")
        logger.info("Sending response...")
        return response
    except Exception as exc:
        # A single record carrying both the message and the traceback.
        logger.exception("Error handling request: %s", exc)
        return make_response(jsonify({"error": str(exc)}), 500)
    finally:
        logger.info("Request handling completed")
if __name__ == '__main__':
    # Script entry point: serve the mock API on port 8080 on all interfaces,
    # using the custom request handler defined in this file.
    try:
        logger.info("Starting mock server on port 8080...")
        app.run(
            host='0.0.0.0',
            port=8080,
            threaded=True,
            use_reloader=False,
            request_handler=CustomRequestHandler,
        )
    except Exception as exc:
        logger.exception("Failed to start server: %s", exc)
        # Exit with a non-zero status so a calling shell or supervisor can
        # tell that the server did not run successfully.
        raise SystemExit(1) from exc