CollisionAvoidance/tools/mock_server.py

169 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import math
import random
import threading
import time

from flask import Flask, jsonify, make_response
from werkzeug.serving import WSGIRequestHandler
# Emit INFO-and-above records with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Airport reference point (coordinates of Qingdao Jiaodong International Airport).
AIRPORT_LAT = 36.361999
AIRPORT_LON = 120.088003
class CollisionScenario:
    """Looping mock scenario in which an aircraft and a vehicle converge.

    The aircraft starts 165 m west of the airport reference point and moves
    east; the vehicle starts 66 m south of it and moves north. At their base
    speeds (55 m/s and 22 m/s) both reach the reference point after about
    3 seconds. Once ``scenario_duration`` seconds have passed the scenario
    restarts from the initial positions.
    """

    # Approximate length of one degree of latitude, in metres.
    METERS_PER_DEGREE = 111000

    def __init__(self):
        """Place both movers at their start positions and start the clock."""
        self.start_time = time.time()

        # Metres per degree of longitude at the airport's latitude.
        self._meters_per_degree_lon = self.METERS_PER_DEGREE * math.cos(
            math.radians(AIRPORT_LAT)
        )

        # The paths cross at the airport reference point.
        self.collision_point = (AIRPORT_LON, AIRPORT_LAT)

        # Aircraft: west -> east, 165 m from the crossing (about 3 s away).
        self.aircraft_start_pos = (
            AIRPORT_LON - (165 / self._meters_per_degree_lon),  # 165 m as longitude
            AIRPORT_LAT,
        )
        # Aircraft base speed in m/s (about 200 km/h).
        self.aircraft_base_speed = 55.0

        # Vehicle: south -> north, 66 m from the crossing (about 3 s away).
        self.vehicle_start_pos = (
            AIRPORT_LON,
            AIRPORT_LAT - (66 / self.METERS_PER_DEGREE),  # 66 m as latitude
        )
        # Vehicle base speed in m/s (about 80 km/h).
        self.vehicle_base_speed = 22.0

        # Scenario length in seconds (kept short so the loop is compact).
        self.scenario_duration = 6

        # Integration state. The server runs threaded, so the
        # read-modify-write of the travelled distances is guarded by a lock.
        self._lock = threading.Lock()
        self._last_update = self.start_time
        self._aircraft_travelled = 0.0
        self._vehicle_travelled = 0.0

        logger.info("Scenario initialized:")
        logger.info(
            "Aircraft start: lon=%.6f, lat=%.6f",
            self.aircraft_start_pos[0], self.aircraft_start_pos[1],
        )
        logger.info(
            "Vehicle start: lon=%.6f, lat=%.6f",
            self.vehicle_start_pos[0], self.vehicle_start_pos[1],
        )
        logger.info(
            "Collision point: lon=%.6f, lat=%.6f",
            self.collision_point[0], self.collision_point[1],
        )

    def add_speed_variation(self, base_speed):
        """Return *base_speed* scaled by a random factor within +/-5 %."""
        variation = random.uniform(-0.05, 0.05)
        return base_speed * (1 + variation)

    def _restart(self, now):
        """Reset the clock and travelled distances so the scenario starts over."""
        self.start_time = now
        self._last_update = now
        self._aircraft_travelled = 0.0
        self._vehicle_travelled = 0.0

    def _distance_to_collision(self, position):
        """Return the planar distance in metres from *position* to the crossing."""
        east = (position[0] - self.collision_point[0]) * self._meters_per_degree_lon
        north = (position[1] - self.collision_point[1]) * self.METERS_PER_DEGREE
        return math.hypot(east, north)

    def get_current_positions(self):
        """Advance the scenario to the current time and return both positions.

        Returns:
            dict: ``{'aircraft': (lon, lat), 'vehicle': (lon, lat)}``, with
            coordinates in degrees.
        """
        with self._lock:
            now = time.time()
            if now - self.start_time > self.scenario_duration:
                self._restart(now)
                logger.info("Scenario reset")

            # Integrate displacement over the interval since the previous
            # call. Multiplying a freshly randomised speed by the *total*
            # elapsed time would turn the +/-5 % speed noise into position
            # jitter that grows with time and can move a target backwards.
            step = max(0.0, now - self._last_update)
            self._last_update = now

            aircraft_speed = self.add_speed_variation(self.aircraft_base_speed)
            vehicle_speed = self.add_speed_variation(self.vehicle_base_speed)
            self._aircraft_travelled += aircraft_speed * step
            self._vehicle_travelled += vehicle_speed * step

            # Convert travelled metres to degrees along each mover's axis.
            aircraft_pos = (
                self.aircraft_start_pos[0]
                + self._aircraft_travelled / self._meters_per_degree_lon,
                self.aircraft_start_pos[1],
            )
            vehicle_pos = (
                self.vehicle_start_pos[0],
                self.vehicle_start_pos[1]
                + self._vehicle_travelled / self.METERS_PER_DEGREE,
            )

        logger.info("Aircraft speed=%.2fm/s, heading=90.00°", aircraft_speed)
        logger.info("Vehicle speed=%.2fm/s, heading=0.00°", vehicle_speed)
        logger.info(
            "Distance to collision point: Aircraft=%.1fm, Vehicle=%.1fm",
            self._distance_to_collision(aircraft_pos),
            self._distance_to_collision(vehicle_pos),
        )

        return {
            'aircraft': aircraft_pos,
            'vehicle': vehicle_pos,
        }
# Module-level scenario instance that the route handlers below query.
scenario = CollisionScenario()
@app.route('/api/getCurrentFlightPositions')
def get_flight_positions():
    """Serve the mock aircraft's current position as a one-element JSON array.

    Returns:
        A JSON response holding one record with ``flightNo``, ``longitude``,
        ``latitude``, ``time``, ``altitude`` and ``trackNumber``. If anything
        raises, a 500 response of the form ``{"error": <message>}``.
    """
    try:
        logger.info("Handling flight positions request")
        longitude, latitude = scenario.get_current_positions()['aircraft']
        aircraft = [{
            'flightNo': 'CES2501',
            'longitude': longitude,
            'latitude': latitude,
            'time': time.time(),
            'altitude': 5.0,  # ground taxiing altitude
            'trackNumber': 'TN001',
        }]
        response = make_response(jsonify(aircraft))
        response.headers['Content-Type'] = 'application/json'
        response.headers['Connection'] = 'close'
        return response
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error generating flight data: %s", e)
        return make_response(jsonify({"error": str(e)}), 500)
@app.route('/api/getCurrentVehiclePositions')
def get_vehicle_positions():
    """Serve the mock vehicle's current position as a one-element JSON array.

    Returns:
        A JSON response holding one record with ``vehicleNo``, ``longitude``,
        ``latitude`` and ``time``. If anything raises, a 500 response of the
        form ``{"error": <message>}``.
    """
    try:
        logger.info("Handling vehicle positions request")
        longitude, latitude = scenario.get_current_positions()['vehicle']
        vehicles = [{
            'vehicleNo': 'VEH001',
            'longitude': longitude,
            'latitude': latitude,
            'time': time.time(),
        }]
        response = make_response(jsonify(vehicles))
        response.headers['Content-Type'] = 'application/json'
        response.headers['Connection'] = 'close'
        return response
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error generating vehicle data: %s", e)
        return make_response(jsonify({"error": str(e)}), 500)
# Script entry point: run the Flask server on all interfaces, port 8080.
if __name__ == '__main__':
    try:
        logger.info("Starting mock server on port 8080")
        app.run(
            host='0.0.0.0',
            port=8080,
            debug=False,
            threaded=True,
            use_reloader=False,
        )
    except Exception as e:
        # Log with traceback and exit non-zero so a failed start is not
        # reported to the caller as success.
        logger.exception("Failed to start server: %s", e)
        raise SystemExit(1)