dongchang/main.py

114 lines
3.4 KiB
Python

import os
import sys
import yaml
import logging
from datetime import datetime
from src.camera_handler import RTSPCamera
from src.person_detector import PersonDetector
from src.distance_estimator import DistanceEstimator
from src.api_server import create_app
# 配置日志
def setup_logging():
"""设置日志配置"""
# 创建logs目录
if not os.path.exists('logs'):
os.makedirs('logs')
# 设置日志格式
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(
f'logs/app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
),
logging.StreamHandler(sys.stdout)
]
)
return logging.getLogger(__name__)
def load_config(config_path='config/config.yaml'):
"""加载配置文件"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except Exception as e:
raise RuntimeError(f"无法加载配置文件: {str(e)}")
def init_camera(config, logger):
"""初始化相机"""
logger.info("正在初始化相机...")
camera_config = config['camera']
camera = RTSPCamera(
rtsp_url=camera_config['rtsp_url'],
fps=camera_config.get('fps', 30)
)
return camera
def init_detector(config, logger):
"""初始化人物检测器"""
logger.info("正在初始化人物检测器...")
detector_config = config['detector']
detector = PersonDetector(
model_path=detector_config['model_path'],
conf_threshold=detector_config.get('conf_threshold', 0.5)
)
detector.detect(None)
return detector
def init_estimator(config, logger):
"""初始化距离估计器"""
logger.info("正在初始化距离估计器...")
estimator_config = config['estimator']
estimator = DistanceEstimator(
focal_length_mm=estimator_config['focal_length_mm'],
sensor_width_mm=estimator_config['sensor_width_mm'],
sensor_height_mm=estimator_config['sensor_height_mm'],
image_width_pixels=estimator_config['image_width_pixels'],
image_height_pixels=estimator_config['image_height_pixels'],
camera_height_mm=estimator_config.get('camera_height_mm', 1700),
camera_tilt_angle=estimator_config.get('camera_tilt_angle', 0)
)
return estimator
def main():
"""主函数"""
# 设置日志
logger = setup_logging()
logger.info("启动服务...")
try:
# 加载配置
config = load_config()
logger.info("配置加载成功")
# 初始化组件
camera = init_camera(config, logger)
detector = init_detector(config, logger)
estimator = init_estimator(config, logger)
# 创建Flask应用
app = create_app(camera, detector, estimator)
# 获取服务配置
server_config = config['server']
host = server_config.get('host', '0.0.0.0')
port = server_config.get('port', 5000)
debug = server_config.get('debug', False)
# 启动服务
logger.info(f"服务启动在 http://{host}:{port}")
app.run(
host=host,
port=port,
debug=debug
)
except Exception as e:
logger.error(f"服务启动失败: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()