修改旧文件名, 添加ffmpeg硬件编解码支持
This commit is contained in:
parent
d15bfae372
commit
d44399dcda
474
rtsp_processor_0.py
Normal file
474
rtsp_processor_0.py
Normal file
@ -0,0 +1,474 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import queue
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
import subprocess
|
||||||
|
import logging
|
||||||
|
from typing import List, Dict, Tuple, Optional
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from collections import deque
|
||||||
|
import json
|
||||||
|
|
||||||
|
try:
|
||||||
|
from ultralytics import YOLO
|
||||||
|
except ImportError:
|
||||||
|
print("请安装ultralytics: pip install ultralytics")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# 配置日志
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DetectionResult:
|
||||||
|
"""检测结果数据类"""
|
||||||
|
boxes: np.ndarray
|
||||||
|
confidences: np.ndarray
|
||||||
|
class_ids: np.ndarray
|
||||||
|
class_names: List[str]
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AlarmConfig:
|
||||||
|
"""告警配置"""
|
||||||
|
target_classes: List[str] # 目标类别
|
||||||
|
confidence_threshold: float = 0.5 # 置信度阈值
|
||||||
|
alarm_duration: int = 10 # 告警录制时长(秒)
|
||||||
|
cooldown_duration: int = 30 # 告警冷却时间(秒)
|
||||||
|
save_path: str = "./alarm_videos" # 保存路径
|
||||||
|
|
||||||
|
class FrameBuffer:
|
||||||
|
"""帧缓冲区,用于告警录制"""
|
||||||
|
def __init__(self, max_duration: int = 60, fps: int = 25):
|
||||||
|
self.max_frames = max_duration * fps
|
||||||
|
self.buffer = deque(maxlen=self.max_frames)
|
||||||
|
self.fps = fps
|
||||||
|
|
||||||
|
def add_frame(self, frame: np.ndarray, timestamp: datetime):
|
||||||
|
self.buffer.append((frame.copy(), timestamp))
|
||||||
|
|
||||||
|
def get_frames_in_range(self, start_time: datetime, duration: int) -> List[Tuple[np.ndarray, datetime]]:
|
||||||
|
"""获取指定时间范围内的帧"""
|
||||||
|
end_time = start_time + timedelta(seconds=duration)
|
||||||
|
frames = []
|
||||||
|
|
||||||
|
for frame, timestamp in self.buffer:
|
||||||
|
if start_time <= timestamp <= end_time:
|
||||||
|
frames.append((frame, timestamp))
|
||||||
|
|
||||||
|
return frames
|
||||||
|
|
||||||
|
class YOLODetector:
|
||||||
|
"""YOLO检测器"""
|
||||||
|
def __init__(self, model_path: str = "yolov8n.pt"):
|
||||||
|
self.model = YOLO(model_path)
|
||||||
|
self.class_names = self.model.names
|
||||||
|
|
||||||
|
def detect(self, frame: np.ndarray, confidence_threshold: float = 0.5) -> DetectionResult:
|
||||||
|
"""执行目标检测"""
|
||||||
|
results = self.model(frame, conf=confidence_threshold, verbose=False)
|
||||||
|
|
||||||
|
if len(results) > 0 and results[0].boxes is not None:
|
||||||
|
boxes = results[0].boxes.xyxy.cpu().numpy()
|
||||||
|
confidences = results[0].boxes.conf.cpu().numpy()
|
||||||
|
class_ids = results[0].boxes.cls.cpu().numpy().astype(int)
|
||||||
|
class_names = [self.class_names[id] for id in class_ids]
|
||||||
|
else:
|
||||||
|
boxes = np.array([])
|
||||||
|
confidences = np.array([])
|
||||||
|
class_ids = np.array([])
|
||||||
|
class_names = []
|
||||||
|
|
||||||
|
return DetectionResult(
|
||||||
|
boxes=boxes,
|
||||||
|
confidences=confidences,
|
||||||
|
class_ids=class_ids,
|
||||||
|
class_names=class_names,
|
||||||
|
timestamp=datetime.now()
|
||||||
|
)
|
||||||
|
|
||||||
|
class AlarmManager:
|
||||||
|
"""告警管理器"""
|
||||||
|
def __init__(self, config: AlarmConfig):
|
||||||
|
self.config = config
|
||||||
|
self.last_alarm_time = None
|
||||||
|
self.is_alarming = False
|
||||||
|
self.alarm_start_time = None
|
||||||
|
|
||||||
|
# 创建保存目录
|
||||||
|
Path(config.save_path).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
def check_alarm_trigger(self, detection_result: DetectionResult) -> bool:
|
||||||
|
"""检查是否触发告警"""
|
||||||
|
current_time = datetime.now()
|
||||||
|
|
||||||
|
# 检查冷却时间
|
||||||
|
if (self.last_alarm_time and
|
||||||
|
current_time - self.last_alarm_time < timedelta(seconds=self.config.cooldown_duration)):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 检查是否检测到目标类别
|
||||||
|
for class_name, confidence in zip(detection_result.class_names, detection_result.confidences):
|
||||||
|
if (class_name in self.config.target_classes and
|
||||||
|
confidence >= self.config.confidence_threshold):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def start_alarm(self) -> bool:
|
||||||
|
"""开始告警"""
|
||||||
|
if not self.is_alarming:
|
||||||
|
self.is_alarming = True
|
||||||
|
self.alarm_start_time = datetime.now()
|
||||||
|
logger.info(f"告警开始: {self.alarm_start_time}")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def should_stop_alarm(self) -> bool:
|
||||||
|
"""检查是否应该停止告警"""
|
||||||
|
if self.is_alarming and self.alarm_start_time:
|
||||||
|
duration = (datetime.now() - self.alarm_start_time).total_seconds()
|
||||||
|
return duration >= self.config.alarm_duration
|
||||||
|
return False
|
||||||
|
|
||||||
|
def stop_alarm(self):
|
||||||
|
"""停止告警"""
|
||||||
|
if self.is_alarming:
|
||||||
|
self.is_alarming = False
|
||||||
|
self.last_alarm_time = datetime.now()
|
||||||
|
logger.info(f"告警结束: {self.last_alarm_time}")
|
||||||
|
|
||||||
|
def save_alarm_video(self, frames: List[Tuple[np.ndarray, datetime]], fps: int = 25):
|
||||||
|
"""保存告警视频"""
|
||||||
|
if not frames:
|
||||||
|
logger.warning("没有帧数据可保存")
|
||||||
|
return None
|
||||||
|
|
||||||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
filename = f"alarm_{timestamp}.mp4"
|
||||||
|
filepath = Path(self.config.save_path) / filename
|
||||||
|
|
||||||
|
# 获取帧尺寸
|
||||||
|
height, width = frames[0][0].shape[:2]
|
||||||
|
|
||||||
|
# 创建视频写入器
|
||||||
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||||||
|
out = cv2.VideoWriter(str(filepath), fourcc, fps, (width, height))
|
||||||
|
|
||||||
|
try:
|
||||||
|
for frame, _ in frames:
|
||||||
|
out.write(frame)
|
||||||
|
|
||||||
|
logger.info(f"告警视频已保存: {filepath}")
|
||||||
|
return str(filepath)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"保存视频失败: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
finally:
|
||||||
|
out.release()
|
||||||
|
|
||||||
|
class StreamServer:
|
||||||
|
"""流媒体服务器接口"""
|
||||||
|
def __init__(self, output_url: str = "rtmp://localhost:1935/live/stream"):
|
||||||
|
self.output_url = output_url
|
||||||
|
self.ffmpeg_process = None
|
||||||
|
|
||||||
|
def start_streaming(self, width: int, height: int, fps: int = 25):
|
||||||
|
"""启动流媒体推送"""
|
||||||
|
ffmpeg_cmd = [
|
||||||
|
'ffmpeg',
|
||||||
|
'-y', # 覆盖输出文件
|
||||||
|
'-f', 'rawvideo',
|
||||||
|
'-vcodec', 'rawvideo',
|
||||||
|
'-pix_fmt', 'bgr24',
|
||||||
|
'-s', f'{width}x{height}',
|
||||||
|
'-r', str(fps),
|
||||||
|
'-i', '-', # 从stdin读取
|
||||||
|
'-c:v', 'libx264',
|
||||||
|
'-pix_fmt', 'yuv420p',
|
||||||
|
'-preset', 'ultrafast',
|
||||||
|
'-f', 'flv',
|
||||||
|
self.output_url
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.ffmpeg_process = subprocess.Popen(
|
||||||
|
ffmpeg_cmd,
|
||||||
|
stdin=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE
|
||||||
|
)
|
||||||
|
logger.info(f"流媒体服务启动: {self.output_url}")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"启动流媒体失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def send_frame(self, frame: np.ndarray):
|
||||||
|
"""发送帧到流媒体服务器"""
|
||||||
|
if self.ffmpeg_process and self.ffmpeg_process.stdin:
|
||||||
|
try:
|
||||||
|
self.ffmpeg_process.stdin.write(frame.tobytes())
|
||||||
|
self.ffmpeg_process.stdin.flush()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"发送帧失败: {e}")
|
||||||
|
|
||||||
|
def stop_streaming(self):
|
||||||
|
"""停止流媒体推送"""
|
||||||
|
if self.ffmpeg_process:
|
||||||
|
self.ffmpeg_process.stdin.close()
|
||||||
|
self.ffmpeg_process.wait()
|
||||||
|
self.ffmpeg_process = None
|
||||||
|
logger.info("流媒体服务已停止")
|
||||||
|
|
||||||
|
class RTSPProcessor:
|
||||||
|
"""RTSP流处理器"""
|
||||||
|
def __init__(self,
|
||||||
|
rtsp_url: str,
|
||||||
|
model_path: str = "yolov8n.pt",
|
||||||
|
detection_interval: int = 5,
|
||||||
|
alarm_config: AlarmConfig = None,
|
||||||
|
output_stream_url: str = "rtmp://localhost:1935/live/stream"):
|
||||||
|
|
||||||
|
self.rtsp_url = rtsp_url
|
||||||
|
self.detection_interval = detection_interval
|
||||||
|
self.frame_count = 0
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
# 初始化组件
|
||||||
|
self.detector = YOLODetector(model_path)
|
||||||
|
self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
|
||||||
|
self.frame_buffer = FrameBuffer()
|
||||||
|
self.stream_server = StreamServer(output_stream_url)
|
||||||
|
|
||||||
|
# 最后的检测结果
|
||||||
|
self.last_detection = None
|
||||||
|
|
||||||
|
# 线程安全队列
|
||||||
|
self.frame_queue = queue.Queue(maxsize=100)
|
||||||
|
|
||||||
|
def draw_detections(self, frame: np.ndarray, detection_result: DetectionResult) -> np.ndarray:
|
||||||
|
"""在帧上绘制检测结果"""
|
||||||
|
if detection_result is None or len(detection_result.boxes) == 0:
|
||||||
|
return frame
|
||||||
|
|
||||||
|
annotated_frame = frame.copy()
|
||||||
|
|
||||||
|
for box, confidence, class_name in zip(
|
||||||
|
detection_result.boxes,
|
||||||
|
detection_result.confidences,
|
||||||
|
detection_result.class_names
|
||||||
|
):
|
||||||
|
x1, y1, x2, y2 = box.astype(int)
|
||||||
|
|
||||||
|
# 绘制边界框
|
||||||
|
color = (0, 255, 0) # 绿色
|
||||||
|
if class_name in self.alarm_manager.config.target_classes:
|
||||||
|
color = (0, 0, 255) # 目标类别用红色
|
||||||
|
|
||||||
|
cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
|
||||||
|
|
||||||
|
# 绘制标签
|
||||||
|
label = f"{class_name}: {confidence:.2f}"
|
||||||
|
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
|
||||||
|
cv2.rectangle(annotated_frame, (x1, y1 - label_size[1] - 10),
|
||||||
|
(x1 + label_size[0], y1), color, -1)
|
||||||
|
cv2.putText(annotated_frame, label, (x1, y1 - 5),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
|
||||||
|
|
||||||
|
# 添加告警状态显示
|
||||||
|
if self.alarm_manager.is_alarming:
|
||||||
|
cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
|
||||||
|
|
||||||
|
# 添加时间戳
|
||||||
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
|
||||||
|
|
||||||
|
return annotated_frame
|
||||||
|
|
||||||
|
def process_frame(self, frame: np.ndarray) -> np.ndarray:
|
||||||
|
"""处理单帧"""
|
||||||
|
current_time = datetime.now()
|
||||||
|
detection_result = None
|
||||||
|
|
||||||
|
# 按间隔进行检测
|
||||||
|
if self.frame_count % self.detection_interval == 0:
|
||||||
|
detection_result = self.detector.detect(frame,
|
||||||
|
self.alarm_manager.config.confidence_threshold)
|
||||||
|
self.last_detection = detection_result
|
||||||
|
|
||||||
|
# 检查告警触发
|
||||||
|
if self.alarm_manager.check_alarm_trigger(detection_result):
|
||||||
|
if self.alarm_manager.start_alarm():
|
||||||
|
# 告警开始时的处理
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 使用最后的检测结果绘制
|
||||||
|
annotated_frame = self.draw_detections(frame, self.last_detection)
|
||||||
|
|
||||||
|
# 添加帧到缓冲区
|
||||||
|
self.frame_buffer.add_frame(annotated_frame, current_time)
|
||||||
|
|
||||||
|
# 检查告警结束
|
||||||
|
if self.alarm_manager.should_stop_alarm():
|
||||||
|
# 获取告警期间的帧
|
||||||
|
alarm_frames = self.frame_buffer.get_frames_in_range(
|
||||||
|
self.alarm_manager.alarm_start_time,
|
||||||
|
self.alarm_manager.config.alarm_duration
|
||||||
|
)
|
||||||
|
|
||||||
|
# 保存告警视频
|
||||||
|
if alarm_frames:
|
||||||
|
threading.Thread(
|
||||||
|
target=self.alarm_manager.save_alarm_video,
|
||||||
|
args=(alarm_frames, 25),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
self.alarm_manager.stop_alarm()
|
||||||
|
|
||||||
|
return annotated_frame
|
||||||
|
|
||||||
|
def capture_frames(self):
|
||||||
|
"""捕获RTSP流帧"""
|
||||||
|
cap = cv2.VideoCapture(self.rtsp_url)
|
||||||
|
|
||||||
|
if not cap.isOpened():
|
||||||
|
logger.error(f"无法打开RTSP流: {self.rtsp_url}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 设置缓冲区大小
|
||||||
|
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||||||
|
|
||||||
|
logger.info(f"开始捕获RTSP流: {self.rtsp_url}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while self.running:
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
logger.warning("读取帧失败,尝试重连...")
|
||||||
|
cap.release()
|
||||||
|
time.sleep(5)
|
||||||
|
cap = cv2.VideoCapture(self.rtsp_url)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not self.frame_queue.full():
|
||||||
|
self.frame_queue.put(frame)
|
||||||
|
else:
|
||||||
|
# 丢弃最旧的帧
|
||||||
|
try:
|
||||||
|
self.frame_queue.get_nowait()
|
||||||
|
self.frame_queue.put(frame)
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
finally:
|
||||||
|
cap.release()
|
||||||
|
logger.info("RTSP捕获已停止")
|
||||||
|
|
||||||
|
def process_frames(self):
|
||||||
|
"""处理帧线程"""
|
||||||
|
first_frame = True
|
||||||
|
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
frame = self.frame_queue.get(timeout=1)
|
||||||
|
|
||||||
|
# 初始化流媒体服务器
|
||||||
|
if first_frame:
|
||||||
|
height, width = frame.shape[:2]
|
||||||
|
if self.stream_server.start_streaming(width, height):
|
||||||
|
first_frame = False
|
||||||
|
else:
|
||||||
|
logger.error("流媒体服务器启动失败")
|
||||||
|
break
|
||||||
|
|
||||||
|
# 处理帧
|
||||||
|
processed_frame = self.process_frame(frame)
|
||||||
|
|
||||||
|
# 发送到流媒体服务器
|
||||||
|
self.stream_server.send_frame(processed_frame)
|
||||||
|
|
||||||
|
self.frame_count += 1
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"处理帧时出错: {e}")
|
||||||
|
|
||||||
|
logger.info("帧处理已停止")
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""启动处理器"""
|
||||||
|
self.running = True
|
||||||
|
|
||||||
|
# 启动捕获线程
|
||||||
|
capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
|
||||||
|
capture_thread.start()
|
||||||
|
|
||||||
|
# 启动处理线程
|
||||||
|
process_thread = threading.Thread(target=self.process_frames, daemon=True)
|
||||||
|
process_thread.start()
|
||||||
|
|
||||||
|
logger.info("RTSP处理器已启动")
|
||||||
|
|
||||||
|
return capture_thread, process_thread
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止处理器"""
|
||||||
|
self.running = False
|
||||||
|
self.stream_server.stop_streaming()
|
||||||
|
logger.info("RTSP处理器已停止")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
# 配置参数
|
||||||
|
RTSP_URL = "rtsp://10.0.0.17:8554/camera_test/2" # 替换为实际RTSP地址
|
||||||
|
MODEL_PATH = "yolov8n.pt" # YOLO模型路径
|
||||||
|
DETECTION_INTERVAL = 5 # 每5帧检测一次
|
||||||
|
OUTPUT_STREAM_URL = "rtmp://localhost:1935/live/processed" # 输出流地址
|
||||||
|
|
||||||
|
# 告警配置
|
||||||
|
alarm_config = AlarmConfig(
|
||||||
|
target_classes=["person"], # 目标类别
|
||||||
|
confidence_threshold=0.6,
|
||||||
|
alarm_duration=15, # 告警录制15秒
|
||||||
|
cooldown_duration=60*5, # 冷却60秒
|
||||||
|
save_path="./alarm_videos"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 创建处理器
|
||||||
|
processor = RTSPProcessor(
|
||||||
|
rtsp_url=RTSP_URL,
|
||||||
|
model_path=MODEL_PATH,
|
||||||
|
detection_interval=DETECTION_INTERVAL,
|
||||||
|
alarm_config=alarm_config,
|
||||||
|
output_stream_url=OUTPUT_STREAM_URL
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 启动处理
|
||||||
|
threads = processor.start()
|
||||||
|
|
||||||
|
# 保持运行
|
||||||
|
logger.info("系统运行中,按Ctrl+C停止...")
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("收到停止信号")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
processor.stop()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
548
rtsp_processor_1.py
Normal file
548
rtsp_processor_1.py
Normal file
@ -0,0 +1,548 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import queue
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
import subprocess
|
||||||
|
import logging
|
||||||
|
from typing import List, Dict, Tuple, Optional
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from collections import deque
|
||||||
|
import json
|
||||||
|
|
||||||
|
try:
|
||||||
|
from ultralytics import YOLO
|
||||||
|
except ImportError:
|
||||||
|
print("请安装ultralytics: pip install ultralytics")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# 配置日志
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DetectionResult:
|
||||||
|
"""检测结果数据类"""
|
||||||
|
boxes: np.ndarray
|
||||||
|
confidences: np.ndarray
|
||||||
|
class_ids: np.ndarray
|
||||||
|
class_names: List[str]
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AlarmConfig:
|
||||||
|
"""告警配置"""
|
||||||
|
target_classes: List[str] # 目标类别
|
||||||
|
confidence_threshold: float = 0.5 # 置信度阈值
|
||||||
|
alarm_duration: int = 10 # 告警录制时长(秒)
|
||||||
|
cooldown_duration: int = 30 # 告警冷却时间(秒)
|
||||||
|
save_path: str = "./alarm_videos" # 保存路径
|
||||||
|
|
||||||
|
class FrameBuffer:
|
||||||
|
"""帧缓冲区,用于告警录制"""
|
||||||
|
def __init__(self, max_duration: int = 60, fps: int = 25):
|
||||||
|
self.max_frames = max_duration * fps
|
||||||
|
self.buffer = deque(maxlen=self.max_frames)
|
||||||
|
self.fps = fps
|
||||||
|
|
||||||
|
def add_frame(self, frame: np.ndarray, timestamp: datetime):
|
||||||
|
self.buffer.append((frame.copy(), timestamp))
|
||||||
|
|
||||||
|
def get_frames_in_range(self, start_time: datetime, duration: int) -> List[Tuple[np.ndarray, datetime]]:
|
||||||
|
"""获取指定时间范围内的帧"""
|
||||||
|
end_time = start_time + timedelta(seconds=duration)
|
||||||
|
frames = []
|
||||||
|
|
||||||
|
for frame, timestamp in self.buffer:
|
||||||
|
if start_time <= timestamp <= end_time:
|
||||||
|
frames.append((frame, timestamp))
|
||||||
|
|
||||||
|
return frames
|
||||||
|
|
||||||
|
class YOLODetector:
|
||||||
|
"""YOLO检测器"""
|
||||||
|
def __init__(self, model_path: str = "yolov8n.pt"):
|
||||||
|
self.model = YOLO(model_path)
|
||||||
|
self.class_names = self.model.names
|
||||||
|
|
||||||
|
def detect(self, frame: np.ndarray, confidence_threshold: float = 0.5) -> DetectionResult:
|
||||||
|
"""执行目标检测"""
|
||||||
|
results = self.model(frame, conf=confidence_threshold, verbose=False)
|
||||||
|
|
||||||
|
if len(results) > 0 and results[0].boxes is not None:
|
||||||
|
boxes = results[0].boxes.xyxy.cpu().numpy()
|
||||||
|
confidences = results[0].boxes.conf.cpu().numpy()
|
||||||
|
class_ids = results[0].boxes.cls.cpu().numpy().astype(int)
|
||||||
|
class_names = [self.class_names[id] for id in class_ids]
|
||||||
|
else:
|
||||||
|
boxes = np.array([])
|
||||||
|
confidences = np.array([])
|
||||||
|
class_ids = np.array([])
|
||||||
|
class_names = []
|
||||||
|
|
||||||
|
return DetectionResult(
|
||||||
|
boxes=boxes,
|
||||||
|
confidences=confidences,
|
||||||
|
class_ids=class_ids,
|
||||||
|
class_names=class_names,
|
||||||
|
timestamp=datetime.now()
|
||||||
|
)
|
||||||
|
|
||||||
|
class AlarmManager:
|
||||||
|
"""告警管理器"""
|
||||||
|
def __init__(self, config: AlarmConfig):
|
||||||
|
self.config = config
|
||||||
|
self.last_alarm_time = None
|
||||||
|
self.is_alarming = False
|
||||||
|
self.alarm_start_time = None
|
||||||
|
|
||||||
|
# 创建保存目录
|
||||||
|
Path(config.save_path).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
def check_alarm_trigger(self, detection_result: DetectionResult) -> bool:
|
||||||
|
"""检查是否触发告警"""
|
||||||
|
current_time = datetime.now()
|
||||||
|
|
||||||
|
# 检查冷却时间
|
||||||
|
if (self.last_alarm_time and
|
||||||
|
current_time - self.last_alarm_time < timedelta(seconds=self.config.cooldown_duration)):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 检查是否检测到目标类别
|
||||||
|
for class_name, confidence in zip(detection_result.class_names, detection_result.confidences):
|
||||||
|
if (class_name in self.config.target_classes and
|
||||||
|
confidence >= self.config.confidence_threshold):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def start_alarm(self) -> bool:
|
||||||
|
"""开始告警"""
|
||||||
|
if not self.is_alarming:
|
||||||
|
self.is_alarming = True
|
||||||
|
self.alarm_start_time = datetime.now()
|
||||||
|
logger.info(f"告警开始: {self.alarm_start_time}")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def should_stop_alarm(self) -> bool:
|
||||||
|
"""检查是否应该停止告警"""
|
||||||
|
if self.is_alarming and self.alarm_start_time:
|
||||||
|
duration = (datetime.now() - self.alarm_start_time).total_seconds()
|
||||||
|
return duration >= self.config.alarm_duration
|
||||||
|
return False
|
||||||
|
|
||||||
|
def stop_alarm(self):
|
||||||
|
"""停止告警"""
|
||||||
|
if self.is_alarming:
|
||||||
|
self.is_alarming = False
|
||||||
|
self.last_alarm_time = datetime.now()
|
||||||
|
logger.info(f"告警结束: {self.last_alarm_time}")
|
||||||
|
|
||||||
|
def save_alarm_video(self, frames: List[Tuple[np.ndarray, datetime]], fps: int = 25):
|
||||||
|
"""保存告警视频"""
|
||||||
|
if not frames:
|
||||||
|
logger.warning("没有帧数据可保存")
|
||||||
|
return None
|
||||||
|
|
||||||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
filename = f"alarm_{timestamp}.mp4"
|
||||||
|
filepath = Path(self.config.save_path) / filename
|
||||||
|
|
||||||
|
# 获取帧尺寸
|
||||||
|
height, width = frames[0][0].shape[:2]
|
||||||
|
|
||||||
|
# 创建视频写入器
|
||||||
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||||||
|
out = cv2.VideoWriter(str(filepath), fourcc, fps, (width, height))
|
||||||
|
|
||||||
|
try:
|
||||||
|
for frame, _ in frames:
|
||||||
|
out.write(frame)
|
||||||
|
|
||||||
|
logger.info(f"告警视频已保存: {filepath}")
|
||||||
|
return str(filepath)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"保存视频失败: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
finally:
|
||||||
|
out.release()
|
||||||
|
|
||||||
|
class StreamServer:
|
||||||
|
"""流媒体服务器接口"""
|
||||||
|
def __init__(self, output_url: str = "rtmp://localhost:1935/live/stream", use_gpu: bool = True):
|
||||||
|
self.output_url = output_url
|
||||||
|
self.ffmpeg_process = None
|
||||||
|
self.use_gpu = use_gpu
|
||||||
|
self.gpu_encoder = None
|
||||||
|
|
||||||
|
def detect_gpu_encoder(self):
|
||||||
|
"""检测可用的GPU编码器"""
|
||||||
|
# 检测NVIDIA GPU编码器优先级
|
||||||
|
gpu_encoders = ['h264_nvenc', 'hevc_nvenc', 'av1_nvenc']
|
||||||
|
|
||||||
|
for encoder in gpu_encoders:
|
||||||
|
try:
|
||||||
|
# 测试编码器是否可用
|
||||||
|
test_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc2=size=320x240:duration=1',
|
||||||
|
'-c:v', encoder, '-f', 'null', '-']
|
||||||
|
result = subprocess.run(test_cmd, capture_output=True, timeout=10)
|
||||||
|
if result.returncode == 0:
|
||||||
|
logger.info(f"检测到可用的GPU编码器: {encoder}")
|
||||||
|
return encoder
|
||||||
|
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.warning("未检测到可用的GPU编码器,将使用CPU编码")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def start_streaming(self, width: int, height: int, fps: int = 25):
|
||||||
|
"""启动流媒体推送"""
|
||||||
|
# 检测GPU编码器
|
||||||
|
if self.use_gpu:
|
||||||
|
self.gpu_encoder = self.detect_gpu_encoder()
|
||||||
|
|
||||||
|
# 构建FFmpeg命令
|
||||||
|
ffmpeg_cmd = [
|
||||||
|
'ffmpeg',
|
||||||
|
'-y', # 覆盖输出文件
|
||||||
|
'-f', 'rawvideo',
|
||||||
|
'-vcodec', 'rawvideo',
|
||||||
|
'-pix_fmt', 'bgr24',
|
||||||
|
'-s', f'{width}x{height}',
|
||||||
|
'-r', str(fps),
|
||||||
|
'-i', '-', # 从stdin读取
|
||||||
|
]
|
||||||
|
|
||||||
|
# GPU硬件加速配置
|
||||||
|
if self.gpu_encoder:
|
||||||
|
if 'nvenc' in self.gpu_encoder:
|
||||||
|
# NVIDIA GPU编码配置
|
||||||
|
ffmpeg_cmd.extend([
|
||||||
|
'-c:v', self.gpu_encoder,
|
||||||
|
'-pix_fmt', 'yuv420p',
|
||||||
|
# NVENC特定参数
|
||||||
|
'-preset', 'p1', # 最快预设 (p1-p7, p1最快)
|
||||||
|
'-tune', 'll', # 低延迟调优
|
||||||
|
'-rc', 'cbr', # 恒定码率控制
|
||||||
|
'-b:v', '2M', # 视频码率
|
||||||
|
'-maxrate', '2M', # 最大码率
|
||||||
|
'-bufsize', '4M', # 缓冲区大小
|
||||||
|
'-g', str(fps * 2), # GOP大小,2秒
|
||||||
|
'-keyint_min', str(fps), # 最小关键帧间隔
|
||||||
|
'-bf', '0', # B帧数量(低延迟设为0)
|
||||||
|
'-refs', '1', # 参考帧数量
|
||||||
|
'-spatial_aq', '1', # 空间自适应量化
|
||||||
|
'-temporal_aq', '1', # 时间自适应量化
|
||||||
|
'-rc-lookahead', '8', # 前瞻帧数
|
||||||
|
'-surfaces', '8', # 编码表面数量
|
||||||
|
])
|
||||||
|
else:
|
||||||
|
# 其他GPU编码器的通用配置
|
||||||
|
ffmpeg_cmd.extend([
|
||||||
|
'-c:v', self.gpu_encoder,
|
||||||
|
'-pix_fmt', 'yuv420p',
|
||||||
|
'-preset', 'ultrafast',
|
||||||
|
'-b:v', '2M',
|
||||||
|
])
|
||||||
|
else:
|
||||||
|
# CPU编码配置(回退方案)
|
||||||
|
ffmpeg_cmd.extend([
|
||||||
|
'-c:v', 'libx264',
|
||||||
|
'-pix_fmt', 'yuv420p',
|
||||||
|
'-preset', 'ultrafast',
|
||||||
|
'-tune', 'zerolatency', # 零延迟调优
|
||||||
|
'-crf', '23', # 恒定质量因子
|
||||||
|
'-maxrate', '2M', # 最大码率
|
||||||
|
'-bufsize', '4M', # 缓冲区大小
|
||||||
|
])
|
||||||
|
|
||||||
|
# 输出格式和URL
|
||||||
|
ffmpeg_cmd.extend([
|
||||||
|
'-f', 'flv',
|
||||||
|
self.output_url
|
||||||
|
])
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.ffmpeg_process = subprocess.Popen(
|
||||||
|
ffmpeg_cmd,
|
||||||
|
stdin=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
bufsize=0 # 无缓冲,降低延迟
|
||||||
|
)
|
||||||
|
|
||||||
|
encoder_info = self.gpu_encoder if self.gpu_encoder else "libx264 (CPU)"
|
||||||
|
logger.info(f"流媒体服务启动: {self.output_url}, 编码器: {encoder_info}")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"启动流媒体失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def send_frame(self, frame: np.ndarray):
|
||||||
|
"""发送帧到流媒体服务器"""
|
||||||
|
if self.ffmpeg_process and self.ffmpeg_process.stdin:
|
||||||
|
try:
|
||||||
|
self.ffmpeg_process.stdin.write(frame.tobytes())
|
||||||
|
self.ffmpeg_process.stdin.flush()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"发送帧失败: {e}")
|
||||||
|
|
||||||
|
def stop_streaming(self):
|
||||||
|
"""停止流媒体推送"""
|
||||||
|
if self.ffmpeg_process:
|
||||||
|
self.ffmpeg_process.stdin.close()
|
||||||
|
self.ffmpeg_process.wait()
|
||||||
|
self.ffmpeg_process = None
|
||||||
|
logger.info("流媒体服务已停止")
|
||||||
|
|
||||||
|
class RTSPProcessor:
|
||||||
|
"""RTSP流处理器"""
|
||||||
|
def __init__(self,
|
||||||
|
rtsp_url: str,
|
||||||
|
model_path: str = "yolov8n.pt",
|
||||||
|
detection_interval: int = 5,
|
||||||
|
alarm_config: AlarmConfig = None,
|
||||||
|
output_stream_url: str = "rtmp://localhost:1935/live/stream"):
|
||||||
|
|
||||||
|
self.rtsp_url = rtsp_url
|
||||||
|
self.detection_interval = detection_interval
|
||||||
|
self.frame_count = 0
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
# 初始化组件
|
||||||
|
self.detector = YOLODetector(model_path)
|
||||||
|
self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
|
||||||
|
self.frame_buffer = FrameBuffer()
|
||||||
|
self.stream_server = StreamServer(output_stream_url, use_gpu=True) # 启用GPU加速
|
||||||
|
|
||||||
|
# 最后的检测结果
|
||||||
|
self.last_detection = None
|
||||||
|
|
||||||
|
# 线程安全队列
|
||||||
|
self.frame_queue = queue.Queue(maxsize=100)
|
||||||
|
|
||||||
|
def draw_detections(self, frame: np.ndarray, detection_result: DetectionResult) -> np.ndarray:
|
||||||
|
"""在帧上绘制检测结果"""
|
||||||
|
if detection_result is None or len(detection_result.boxes) == 0:
|
||||||
|
return frame
|
||||||
|
|
||||||
|
annotated_frame = frame.copy()
|
||||||
|
|
||||||
|
for box, confidence, class_name in zip(
|
||||||
|
detection_result.boxes,
|
||||||
|
detection_result.confidences,
|
||||||
|
detection_result.class_names
|
||||||
|
):
|
||||||
|
x1, y1, x2, y2 = box.astype(int)
|
||||||
|
|
||||||
|
# 绘制边界框
|
||||||
|
color = (0, 255, 0) # 绿色
|
||||||
|
if class_name in self.alarm_manager.config.target_classes:
|
||||||
|
color = (0, 0, 255) # 目标类别用红色
|
||||||
|
|
||||||
|
cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
|
||||||
|
|
||||||
|
# 绘制标签
|
||||||
|
label = f"{class_name}: {confidence:.2f}"
|
||||||
|
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
|
||||||
|
cv2.rectangle(annotated_frame, (x1, y1 - label_size[1] - 10),
|
||||||
|
(x1 + label_size[0], y1), color, -1)
|
||||||
|
cv2.putText(annotated_frame, label, (x1, y1 - 5),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
|
||||||
|
|
||||||
|
# 添加告警状态显示
|
||||||
|
if self.alarm_manager.is_alarming:
|
||||||
|
cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
|
||||||
|
|
||||||
|
# 添加时间戳
|
||||||
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
|
||||||
|
|
||||||
|
return annotated_frame
|
||||||
|
|
||||||
|
def process_frame(self, frame: np.ndarray) -> np.ndarray:
|
||||||
|
"""处理单帧"""
|
||||||
|
current_time = datetime.now()
|
||||||
|
detection_result = None
|
||||||
|
|
||||||
|
# 按间隔进行检测
|
||||||
|
if self.frame_count % self.detection_interval == 0:
|
||||||
|
detection_result = self.detector.detect(frame,
|
||||||
|
self.alarm_manager.config.confidence_threshold)
|
||||||
|
self.last_detection = detection_result
|
||||||
|
|
||||||
|
# 检查告警触发
|
||||||
|
if self.alarm_manager.check_alarm_trigger(detection_result):
|
||||||
|
if self.alarm_manager.start_alarm():
|
||||||
|
# 告警开始时的处理
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 使用最后的检测结果绘制
|
||||||
|
annotated_frame = self.draw_detections(frame, self.last_detection)
|
||||||
|
|
||||||
|
# 添加帧到缓冲区
|
||||||
|
self.frame_buffer.add_frame(annotated_frame, current_time)
|
||||||
|
|
||||||
|
# 检查告警结束
|
||||||
|
if self.alarm_manager.should_stop_alarm():
|
||||||
|
# 获取告警期间的帧
|
||||||
|
alarm_frames = self.frame_buffer.get_frames_in_range(
|
||||||
|
self.alarm_manager.alarm_start_time,
|
||||||
|
self.alarm_manager.config.alarm_duration
|
||||||
|
)
|
||||||
|
|
||||||
|
# 保存告警视频
|
||||||
|
if alarm_frames:
|
||||||
|
threading.Thread(
|
||||||
|
target=self.alarm_manager.save_alarm_video,
|
||||||
|
args=(alarm_frames, 25),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
self.alarm_manager.stop_alarm()
|
||||||
|
|
||||||
|
return annotated_frame
|
||||||
|
|
||||||
|
def capture_frames(self):
|
||||||
|
"""捕获RTSP流帧"""
|
||||||
|
cap = cv2.VideoCapture(self.rtsp_url)
|
||||||
|
|
||||||
|
if not cap.isOpened():
|
||||||
|
logger.error(f"无法打开RTSP流: {self.rtsp_url}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 设置缓冲区大小
|
||||||
|
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||||||
|
|
||||||
|
logger.info(f"开始捕获RTSP流: {self.rtsp_url}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while self.running:
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
logger.warning("读取帧失败,尝试重连...")
|
||||||
|
cap.release()
|
||||||
|
time.sleep(5)
|
||||||
|
cap = cv2.VideoCapture(self.rtsp_url)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not self.frame_queue.full():
|
||||||
|
self.frame_queue.put(frame)
|
||||||
|
else:
|
||||||
|
# 丢弃最旧的帧
|
||||||
|
try:
|
||||||
|
self.frame_queue.get_nowait()
|
||||||
|
self.frame_queue.put(frame)
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
finally:
|
||||||
|
cap.release()
|
||||||
|
logger.info("RTSP捕获已停止")
|
||||||
|
|
||||||
|
def process_frames(self):
|
||||||
|
"""处理帧线程"""
|
||||||
|
first_frame = True
|
||||||
|
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
frame = self.frame_queue.get(timeout=1)
|
||||||
|
|
||||||
|
# 初始化流媒体服务器
|
||||||
|
if first_frame:
|
||||||
|
height, width = frame.shape[:2]
|
||||||
|
if self.stream_server.start_streaming(width, height):
|
||||||
|
first_frame = False
|
||||||
|
else:
|
||||||
|
logger.error("流媒体服务器启动失败")
|
||||||
|
break
|
||||||
|
|
||||||
|
# 处理帧
|
||||||
|
processed_frame = self.process_frame(frame)
|
||||||
|
|
||||||
|
# 发送到流媒体服务器
|
||||||
|
self.stream_server.send_frame(processed_frame)
|
||||||
|
|
||||||
|
self.frame_count += 1
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"处理帧时出错: {e}")
|
||||||
|
|
||||||
|
logger.info("帧处理已停止")
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""启动处理器"""
|
||||||
|
self.running = True
|
||||||
|
|
||||||
|
# 启动捕获线程
|
||||||
|
capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
|
||||||
|
capture_thread.start()
|
||||||
|
|
||||||
|
# 启动处理线程
|
||||||
|
process_thread = threading.Thread(target=self.process_frames, daemon=True)
|
||||||
|
process_thread.start()
|
||||||
|
|
||||||
|
logger.info("RTSP处理器已启动")
|
||||||
|
|
||||||
|
return capture_thread, process_thread
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止处理器"""
|
||||||
|
self.running = False
|
||||||
|
self.stream_server.stop_streaming()
|
||||||
|
logger.info("RTSP处理器已停止")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
# 配置参数
|
||||||
|
RTSP_URL = "rtsp://10.0.0.17:8554/camera_test/2" # 替换为实际RTSP地址
|
||||||
|
MODEL_PATH = "yolov8n.pt" # YOLO模型路径
|
||||||
|
DETECTION_INTERVAL = 5 # 每5帧检测一次
|
||||||
|
OUTPUT_STREAM_URL = "rtmp://localhost:1935/live/processed" # 输出流地址
|
||||||
|
|
||||||
|
# 告警配置
|
||||||
|
alarm_config = AlarmConfig(
|
||||||
|
target_classes=["person", "car", "truck"], # 目标类别
|
||||||
|
confidence_threshold=0.6,
|
||||||
|
alarm_duration=15, # 告警录制15秒
|
||||||
|
cooldown_duration=60, # 冷却60秒
|
||||||
|
save_path="./alarm_videos"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 创建处理器
|
||||||
|
processor = RTSPProcessor(
|
||||||
|
rtsp_url=RTSP_URL,
|
||||||
|
model_path=MODEL_PATH,
|
||||||
|
detection_interval=DETECTION_INTERVAL,
|
||||||
|
alarm_config=alarm_config,
|
||||||
|
output_stream_url=OUTPUT_STREAM_URL
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 启动处理
|
||||||
|
threads = processor.start()
|
||||||
|
|
||||||
|
# 保持运行
|
||||||
|
logger.info("系统运行中,按Ctrl+C停止...")
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("收到停止信号")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
processor.stop()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in New Issue
Block a user