添加rtsp断流重连功能

This commit is contained in:
haotian 2025-09-17 11:21:52 +08:00
parent e09432f0e4
commit cfc8f80954

View File

@ -313,45 +313,45 @@ class RTSPProcessor:
detection_interval: int = 5,
alarm_config: AlarmConfig = None,
output_stream_url: str = "rtmp://localhost:1935/live/stream"):
self.rtsp_url = rtsp_url
self.detection_interval = detection_interval
self.frame_count = 0
self.running = False
# 初始化组件
self.detector = YOLODetector(model_path)
self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
self.frame_buffer = FrameBuffer()
self.stream_server = StreamServer(output_stream_url, use_gpu=True) # 启用GPU加速
# 最后的检测结果
self.last_detection = None
# 线程安全队列
self.frame_queue = queue.Queue(maxsize=100)
def draw_detections(self, frame: np.ndarray, detection_result: DetectionResult) -> np.ndarray:
"""在帧上绘制检测结果"""
if detection_result is None or len(detection_result.boxes) == 0:
return frame
annotated_frame = frame.copy()
for box, confidence, class_name in zip(
detection_result.boxes,
detection_result.confidences,
detection_result.class_names
):
x1, y1, x2, y2 = box.astype(int)
# 绘制边界框
color = (0, 255, 0) # 绿色
if class_name in self.alarm_manager.config.target_classes:
color = (0, 0, 255) # 目标类别用红色
cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
# 绘制标签
label = f"{class_name}: {confidence:.2f}"
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
@ -359,42 +359,42 @@ class RTSPProcessor:
(x1 + label_size[0], y1), color, -1)
cv2.putText(annotated_frame, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
# 添加告警状态显示
if self.alarm_manager.is_alarming:
cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
# 添加时间戳
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return annotated_frame
def process_frame(self, frame: np.ndarray) -> np.ndarray:
"""处理单帧"""
current_time = datetime.now()
detection_result = None
# 按间隔进行检测
if self.frame_count % self.detection_interval == 0:
detection_result = self.detector.detect(frame,
self.alarm_manager.config.confidence_threshold)
self.last_detection = detection_result
# 检查告警触发
if self.alarm_manager.check_alarm_trigger(detection_result):
if self.alarm_manager.start_alarm():
# 告警开始时的处理
pass
# 使用最后的检测结果绘制
annotated_frame = self.draw_detections(frame, self.last_detection)
# 添加帧到缓冲区
self.frame_buffer.add_frame(annotated_frame, current_time)
# 检查告警结束
if self.alarm_manager.should_stop_alarm():
# 获取告警期间的帧
@ -402,7 +402,7 @@ class RTSPProcessor:
self.alarm_manager.alarm_start_time,
self.alarm_manager.config.alarm_duration
)
# 保存告警视频
if alarm_frames:
threading.Thread(
@ -410,65 +410,65 @@ class RTSPProcessor:
args=(alarm_frames, 25),
daemon=True
).start()
self.alarm_manager.stop_alarm()
return annotated_frame
def connect_to_rtsp_stream(self, url):
""" 尝试连接到 RTSP 流 """
cap = cv2.VideoCapture(url)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
# cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
if not cap.isOpened():
logger.error(f"Failed to connect to {url}")
return None
return cap
def capture_frames(self):
"""捕获RTSP流帧"""
cap = cv2.VideoCapture(self.rtsp_url)
if not cap.isOpened():
logger.error(f"无法打开RTSP流: {self.rtsp_url}")
return
cap = self.connect_to_rtsp_stream(self.rtsp_url)
# 设置缓冲区大小
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
logger.info(f"开始捕获RTSP流: {self.rtsp_url}")
try:
while self.running:
ret, frame = cap.read()
if not ret:
logger.warning("读取帧失败,尝试重连...")
cap.release()
time.sleep(5)
cap = cv2.VideoCapture(self.rtsp_url)
continue
if not self.frame_queue.full():
self.frame_queue.put(frame)
else:
# 丢弃最旧的帧
try:
self.frame_queue.get_nowait()
if cap:
ret, frame = cap.read()
if not ret:
logger.warning("读取帧失败,尝试重连...")
cap.release()
time.sleep(1)
cap = None
continue
if not self.frame_queue.full():
self.frame_queue.put(frame)
except queue.Empty:
pass
else:
# 丢弃最旧的帧
try:
self.frame_queue.get_nowait()
self.frame_queue.put(frame)
except queue.Empty:
pass
else:
cap = self.connect_to_rtsp_stream(self.rtsp_url)
time.sleep(5)
finally:
cap.release()
logger.info("RTSP捕获已停止")
def process_frames(self):
"""处理帧线程"""
first_frame = True
while self.running:
try:
frame = self.frame_queue.get(timeout=1)
# 初始化流媒体服务器
if first_frame:
height, width = frame.shape[:2]
@ -477,38 +477,38 @@ class RTSPProcessor:
else:
logger.error("流媒体服务器启动失败")
break
# 处理帧
processed_frame = self.process_frame(frame)
# 发送到流媒体服务器
self.stream_server.send_frame(processed_frame)
self.frame_count += 1
except queue.Empty:
continue
except Exception as e:
logger.error(f"处理帧时出错: {e}")
logger.info("帧处理已停止")
def start(self):
"""启动处理器"""
self.running = True
# 启动捕获线程
capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
capture_thread.start()
# 启动处理线程
process_thread = threading.Thread(target=self.process_frames, daemon=True)
process_thread.start()
logger.info("RTSP处理器已启动")
return capture_thread, process_thread
def stop(self):
"""停止处理器"""
self.running = False