duan8/v7_p1.py
2025-01-08 17:56:55 +08:00

958 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ctypes
import os
import shutil
import random
import sys
import threading
import time
import cv2
import numpy as np
import pycuda.autoinit
import pycuda.driver as cuda
import tensorrt as trt
import queue
from minio import Minio
import yaml
import threading
import subprocess
import uuid
import requests
import json
import datetime
from PIL import Image
CONF_THRESH = 0.6
IOU_THRESHOLD = 0.6

# Load deployment configuration (MinIO credentials, platform URLs, camera info).
with open('config.yaml', 'r') as file:
    configData = yaml.safe_load(file)

# MinIO object-storage client (shared by the upload threads).
client = Minio(
    endpoint=configData['minioConfig']['endpoint'],
    access_key=configData['minioConfig']['access_key'],
    secret_key=configData['minioConfig']['secret_key'],
    secure=configData['minioConfig']['secure']
)

# Cached platform token; filled in and refreshed by get_token().
tokenResult = {}
getTokenUrl = configData['dataConfig']['getTokenUrl']
putMessageUrl = configData['dataConfig']['putMessageUrl']
ip = configData['video_config']['v1_ip']
vod_path = configData['video_config']['v1_path']
vod_channelNo = configData['video_config']['v1_channelNo']
time_interval = configData['dataConfig']['timeInterval']

# ffmpeg command line: read JPEG frames from stdin and publish an HLS stream.
command_mid = [
    'ffmpeg',
    '-i', '-',                  # read video frames from stdin
    '-c:v', 'libx264',          # H.264 encoding
    '-b:v', '500k',             # video bitrate
    '-preset', 'superfast',     # encoding speed
    '-tune', 'zerolatency',     # low latency
    '-crf', '23',               # CRF mode to control quality
    '-s', '1280x720',           # output resolution
    '-an',                      # disable audio
    # '-f', 'flv',              # output format
    # 'rtmp://127.0.0.1:1935/live/1'  # push to an RTMP server
    '-hls_time', '4',
    '-hls_list_size', '2',
    '-hls_flags', 'delete_segments',
    '-f', 'hls',
    # 'level', 'error',
    '/home/admin-root/hls_data/mid/' + ip + '/index.m3u8'
]
# Long-lived ffmpeg process; FramePushThread feeds its stdin.
pipeline_mid = subprocess.Popen(command_mid, shell=False, stdin=subprocess.PIPE)
# NOTE(review): `frames` appears unused in this file — possibly a leftover.
frames = [None] * 6
# Shared queue of decoded RTSP frames (producer: rtspInputFrame, consumer: inferThread).
rtsp_frame_buffer = queue.Queue(maxsize=300)
# Fetch the token and its timestamp and cache both in the dict
def get_token(tokenResult):
    """Return a cached platform token, refreshing it when missing or stale.

    ``tokenResult`` is a mutable dict used as the cache. On a successful
    refresh it holds 'token' and 'current_time' ("%Y-%m-%d %H:%M:%S");
    on a failed refresh an 'error' key is set and the previous token
    (or None) is returned instead of raising.

    Fixes the original behavior where an entirely empty ``tokenResult``
    skipped the request and then crashed with KeyError on ``['token']``.
    """
    needs_refresh = 'token' not in tokenResult or 'current_time' not in tokenResult
    if not needs_refresh:
        token_time = datetime.datetime.strptime(tokenResult['current_time'],
                                                "%Y-%m-%d %H:%M:%S")
        # Tokens are considered valid for 20 minutes.
        elapsed = (datetime.datetime.now() - token_time).total_seconds()
        needs_refresh = elapsed > 20 * 60
    if needs_refresh:
        response = requests.post(getTokenUrl)
        if response.status_code == 200:
            data = json.loads(response.text)
            if data.get('retCode') == '200':
                tokenResult['token'] = data['responseBody']['token']
                tokenResult['current_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            else:
                tokenResult['error'] = data['errorDesc']
        else:
            tokenResult['error'] = response.status_code
    # .get avoids a KeyError when the very first refresh failed.
    return tokenResult.get('token')
def send_post_request(url, token, msg, picUrl, videoUrl):
    """POST an alarm record to the platform.

    Args:
        url: platform endpoint (putMessageUrl).
        token: auth token, sent in the X-Access-Token header.
        msg: alarm content string (e.g. "face-shoe").
        picUrl: bucket-relative URL of the snapshot ('' when absent).
        videoUrl: bucket-relative URL of the clip ('' when absent).
    """
    payload = {
        "tenantCode": "220",
        "channelNo": vod_channelNo,
        "alarmContent": msg,
        "alarmTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "picInfo": [
            {"url": picUrl}
        ],
        "videoInfo": [
            {"url": videoUrl}
        ]
    }
    headers = {
        'X-Access-Token': token,
        'Content-Type': 'application/json'
    }
    # json= lets requests serialize the payload; the timeout keeps a dead
    # platform endpoint from blocking the alarm-upload thread forever.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    print(response)
def clear_folder(ip, base_dir='/home/admin-root/hls_data/mid'):
    """Delete everything inside the per-camera HLS output folder.

    Args:
        ip: camera identifier; the target folder is ``base_dir``/``ip``.
        base_dir: root of the HLS output tree. Defaults to the original
            hard-coded path so existing callers are unaffected; passing it
            explicitly makes the function reusable and testable.

    Missing folders are reported and left alone; empty folders are reported
    and skipped.
    """
    folder_path1 = os.path.join(base_dir, ip)
    if not os.path.exists(folder_path1):
        print(f"文件夹 {folder_path1} 不存在!")
        return
    if not os.listdir(folder_path1):
        print(f"文件夹 {folder_path1} 为空,无需清空!")
    else:
        # Remove files and subdirectories, but keep the folder itself.
        for filename in os.listdir(folder_path1):
            file_path = os.path.join(folder_path1, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        print(f"已清空文件夹 {folder_path1} 的内容!")
def restart_program():
    """Re-exec the current script with the same interpreter and arguments."""
    interpreter = sys.executable  # path of the running Python interpreter
    time.sleep(1)  # brief pause so any final console output is visible
    # Replace the current process image; never returns on success.
    os.execl(interpreter, interpreter, *sys.argv)
def verify_bbox_class(classid_list, save_flag):
    """Record the first alarm-worthy class found in ``classid_list``.

    Checks 'face', then 'shoe', then 'phone' (in that priority order);
    appends the first match to ``save_flag`` and returns True. Returns
    False when none of them is present.
    """
    for alarm_class in ("face", "shoe", "phone"):
        if alarm_class in classid_list:
            save_flag.append(alarm_class)
            return True
    return False
# Returns True when after_time is within time_num seconds of before_time, else False
def verify_timenum(before_time, after_time, time_num):
    """Return True when ``after_time`` is within ``time_num`` seconds after
    ``before_time``, otherwise False."""
    return (after_time - before_time) < time_num
# Determines whether two frames are (effectively) identical
def compare_frames(frame1, frame2, threshold):
    """Return True when two BGR frames are effectively identical.

    The absolute pixel difference is converted to grayscale and binarized at
    ``threshold``; frames count as equal when no pixel survives the cut.
    """
    diff = cv2.absdiff(frame1, frame2)
    diff_gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
    _, binarized = cv2.threshold(diff_gray, threshold, 350, cv2.THRESH_BINARY)
    return not np.any(binarized)
def skin_color_ratio(image):
    """Return the fraction of ``image`` pixels falling in a simple HSV skin range."""
    # Convert BGR -> HSV; the skin range below is a deliberately simple heuristic.
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower_skin = np.array([0, 20, 70], dtype=np.uint8)
    upper_skin = np.array([20, 255, 255], dtype=np.uint8)
    # Mask: skin pixels -> 255, everything else -> 0.
    mask = cv2.inRange(hsv, lower_skin, upper_skin)
    # Ratio of skin pixels over the whole image area.
    matched = np.count_nonzero(mask)
    area = image.shape[0] * image.shape[1]
    return matched / area
def white_color_ratio(image):
    """Return the fraction of ``image`` pixels in a white / light-gray HSV range."""
    # BGR -> HSV; the white range below is a simple heuristic.
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower_white = np.array([0, 0, 221], dtype=np.uint8)
    upper_white = np.array([180, 30, 255], dtype=np.uint8)
    # Mask: white-ish pixels -> 255, everything else -> 0.
    mask = cv2.inRange(hsv, lower_white, upper_white)
    matched = np.count_nonzero(mask)
    area = image.shape[0] * image.shape[1]
    return matched / area
def shoe_color_ratio(image):
    """Return the fraction of ``image`` pixels in a dark/black HSV range."""
    # BGR -> HSV; the black range below is a simple heuristic.
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower_black = np.array([0, 0, 0], dtype=np.uint8)
    upper_black = np.array([180, 255, 80], dtype=np.uint8)
    # Mask: dark pixels -> 255, everything else -> 0.
    mask = cv2.inRange(hsv, lower_black, upper_black)
    matched = np.count_nonzero(mask)
    area = image.shape[0] * image.shape[1]
    return matched / area
def get_img_path_batches(batch_size, img_dir):
    """Recursively collect every file under ``img_dir`` and group the paths
    into batches of at most ``batch_size`` (the last batch may be shorter)."""
    all_paths = []
    for root, _dirs, filenames in os.walk(img_dir):
        all_paths.extend(os.path.join(root, name) for name in filenames)
    return [all_paths[i:i + batch_size] for i in range(0, len(all_paths), batch_size)]
# Draws the box; returns the alarm class name for alarm-worthy detections, '' otherwise
def plot_one_box(x, img, color=None, label=None, line_thickness=None):
    """Draw an alarm bounding box on ``img`` for detection box ``x``.

    Args:
        x: box coordinates [x1, y1, x2, y2].
        img: BGR image, modified in place when an alarm box is drawn.
        color: kept for interface compatibility (alarm boxes are always red).
            Changed from a mutable list default to None (anti-pattern fix);
            the effective default is unchanged.
        label: detected class name ('face', 'shoe', 'phone', 'e-bike', ...).
        line_thickness: kept for interface compatibility; unused (was only
            read by commented-out code in the original).

    Returns:
        The alarm class string when the detection qualifies as an alarm
        ('face', 'shoe', 'phone', 'e-bike'), or '' when no alarm is raised.
        Non-alarm detections leave ``img`` untouched.
    """
    if color is None:
        color = [0, 255, 0]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    alarm_color = [0, 0, 255]  # red in BGR; used for every alarm box

    if label == 'face':
        region_of_interest = img[c1[1]:c2[1], c1[0]:c2[0]]
        face_source = skin_color_ratio(region_of_interest)
        # Skin must cover more than 80% of the box to count as a face alarm.
        if face_source > 0.8:
            cv2.rectangle(img, c1, c2, alarm_color, thickness=1, lineType=cv2.LINE_AA)
            print("face")
            return 'face'
    if label == 'shoe':
        print("识别到鞋")
        region_of_interest = img[c1[1]:c2[1], c1[0]:c2[0]]
        source = shoe_color_ratio(region_of_interest)
        print(f'黑色={source}')
        # Alarm only when the box is mostly NOT black (< 30% black pixels).
        if source < 0.3:
            cv2.rectangle(img, c1, c2, alarm_color, thickness=1, lineType=cv2.LINE_AA)
            return 'shoe'
    if label == 'phone':
        print("识别到手机")
        cv2.rectangle(img, c1, c2, alarm_color, thickness=1, lineType=cv2.LINE_AA)
        return 'phone'
    if label == 'e-bike':
        print("识别到电动车")
        cv2.rectangle(img, c1, c2, alarm_color, thickness=1, lineType=cv2.LINE_AA)
        return 'e-bike'
    return ''
def check_save_flag(save_flag):
    """Build the alarm message from ``save_flag``.

    Joins the known categories present in ``save_flag`` with '-', always in
    the fixed order face, shoe, phone, e-bike (input order is irrelevant).
    """
    known_order = ("face", "shoe", "phone", "e-bike")
    present = [name for name in known_order if name in save_flag]
    return "-".join(present)
# Args: ip, frame buffer, list of detected alarm flags (face/shoe/...), upload type
class SaveAndUploadMP4Thread(threading.Thread):
    """Background thread that drains a frame buffer and uploads alarm evidence.

    Short alarms (``save_frame_type_min=True``) upload a single JPEG snapshot
    of the first frame; long alarms additionally encode all buffered frames
    into an MP4 clip and upload it. Every upload is reported to the platform
    via ``send_post_request``.

    Bug fixes vs. the original:
    - the short-alarm image object name lacked the f-string prefix, so every
      upload literally used the name "{uuid_str}_{vod_channelNo}_{putMsg}.jpg";
    - the long-alarm path read ``mp4_file_path`` before it was assigned
      (UnboundLocalError, silently killing the thread before the video upload);
    - the MP4 bytes were re-uploaded under the image object name, overwriting
      the snapshot.
    """

    # Scratch directory for temporary JPEG/MP4 files before upload.
    MP4_DIR = "/home/admin-root/220_project/tensorrtx-master/yolov8/mp4"

    def __init__(self, ip, frame_buffer, save_flag, save_frame_type_min, token=None):
        threading.Thread.__init__(self)
        self.ip = ip
        self.frame_buffer = frame_buffer          # queue of frames to upload
        self.save_flag = save_flag                # alarm categories detected
        self.save_frame_type_min = save_frame_type_min  # True => short alarm
        self.temp_file_path = None
        self.temp_file_size = None
        self.minio_client = client                # shared module-level client
        self.token = token

    def _upload_snapshot(self, frame, base_name):
        """Write ``frame`` to a temp JPEG, upload it to MinIO, return its bucket URL."""
        img_file_path = f"{self.MP4_DIR}/{base_name}.jpg"
        img_object_name = f"{base_name}.jpg"
        cv2.imwrite(img_file_path, frame)
        file_size = os.path.getsize(img_file_path)
        with open(img_file_path, 'rb') as file_data:
            self.minio_client.put_object(configData['minioConfig']['bucket_name'],
                                         img_object_name, file_data, file_size)
        os.remove(img_file_path)
        print('上传图片成功')
        return configData['minioConfig']['bucket_name'] + '/' + img_object_name

    def _upload_clip(self, frames, base_name):
        """Encode ``frames`` to an MP4 at 25 fps, upload it, return its bucket URL."""
        mp4_file_path = f'{self.MP4_DIR}/{base_name}.mp4'
        mp4_object_name = f"{base_name}.mp4"
        height, width, _ = frames[0].shape
        out = cv2.VideoWriter(mp4_file_path, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height))
        for frame in frames:
            out.write(frame)
        out.release()
        print('保存视频成功')
        self.temp_file_path = os.path.abspath(mp4_file_path)
        self.temp_file_size = os.path.getsize(mp4_file_path)
        with open(mp4_file_path, 'rb') as file_data:
            self.minio_client.put_object(configData['minioConfig']['bucket_name'],
                                         mp4_object_name, file_data, self.temp_file_size)
        print('上传图片视频成功')
        os.remove(mp4_file_path)
        return configData['minioConfig']['bucket_name'] + '/' + mp4_object_name

    def run(self):
        putMsg = check_save_flag(self.save_flag)
        uuid_str = str(uuid.uuid4())[:6]  # short unique prefix for object names
        frames = []
        while not self.frame_buffer.empty():
            frames.append(self.frame_buffer.get())
        if not frames:
            return
        base_name = f"{uuid_str}_{vod_channelNo}_{putMsg}"
        if self.save_frame_type_min:
            # Short alarm: snapshot only.
            print('短警告')
            img_url = self._upload_snapshot(frames[0], base_name)
            send_post_request(putMessageUrl, self.token, putMsg, img_url, '')
            print("执行发送短警告完毕,警告信息:" + putMsg)
        else:
            # Long alarm: snapshot first (the original also pushed an
            # image-only message before the clip upload), then the MP4 clip.
            print('长警告')
            img_url = self._upload_snapshot(frames[0], base_name)
            send_post_request(putMessageUrl, self.token, putMsg, img_url, '')
            print("执行发送短警告完毕,警告信息:" + putMsg)
            mp4_url = self._upload_clip(frames, base_name)
            print('mp4:', mp4_url + '\n', 'img:', img_url)
            send_post_request(putMessageUrl, self.token, putMsg, img_url, mp4_url)
            print("执行发送长警告完毕,警告信息:" + putMsg)
class FramePushThread(threading.Thread):
    """Forever forwards encoded frame bytes from a queue into ffmpeg's stdin."""

    def __init__(self, frame_buffer, process_mid):
        super().__init__()
        self.frame_buffer = frame_buffer  # queue of JPEG-encoded frame bytes
        self.process_mid = process_mid    # subprocess.Popen with stdin=PIPE

    def run(self):
        # Blocking get keeps the loop idle until a frame arrives.
        while True:
            chunk = self.frame_buffer.get()
            self.process_mid.stdin.write(chunk)
class YoLov8TRT(object):
    """YOLOv8 TensorRT inference wrapper.

    Creates a dedicated CUDA context on device 0, deserializes a serialized
    engine and pre-allocates page-locked host buffers plus device buffers for
    every binding. ``infer`` runs one BGR frame through the network and draws
    alarm boxes via ``plot_one_box``; ``destroy`` must be called to pop the
    CUDA context when the wrapper is no longer needed.
    """

    def __init__(self, engine_file_path):
        # Create a context on this device and make it current for setup.
        self.ctx = cuda.Device(0).make_context()
        stream = cuda.Stream()
        TRT_LOGGER = trt.Logger(trt.Logger.INFO)
        runtime = trt.Runtime(TRT_LOGGER)
        # Deserialize the engine from file.
        with open(engine_file_path, "rb") as f:
            engine = runtime.deserialize_cuda_engine(f.read())
        context = engine.create_execution_context()
        host_inputs = []
        cuda_inputs = []
        host_outputs = []
        cuda_outputs = []
        bindings = []
        for binding in engine:
            print('bingding:', binding, engine.get_binding_shape(binding))
            size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size
            dtype = trt.nptype(engine.get_binding_dtype(binding))
            # Allocate page-locked host memory and the matching device buffer.
            host_mem = cuda.pagelocked_empty(size, dtype)
            cuda_mem = cuda.mem_alloc(host_mem.nbytes)
            # The bindings list holds raw device pointers for execute_async.
            bindings.append(int(cuda_mem))
            if engine.binding_is_input(binding):
                # Network input spatial size, read from the input binding shape.
                self.input_w = engine.get_binding_shape(binding)[-1]
                self.input_h = engine.get_binding_shape(binding)[-2]
                host_inputs.append(host_mem)
                cuda_inputs.append(cuda_mem)
            else:
                host_outputs.append(host_mem)
                cuda_outputs.append(cuda_mem)
        # Store everything needed at inference time.
        self.stream = stream
        self.context = context
        self.engine = engine
        self.host_inputs = host_inputs
        self.cuda_inputs = cuda_inputs
        self.host_outputs = host_outputs
        self.cuda_outputs = cuda_outputs
        self.bindings = bindings
        self.batch_size = engine.max_batch_size

    def infer(self, image_frame):
        """Run detection on one BGR frame.

        Returns (batch_image_raw, result_list): the raw frame(s) with alarm
        boxes drawn in place, and the alarm class string returned by
        plot_one_box for each surviving detection ('' for non-alarm boxes).
        """
        # BUGFIX: removed a stray `threading.Thread.__init__(self)` call left
        # over from an earlier thread-based version; this class is not a Thread.
        # Make self's CUDA context current, pushing it on the context stack.
        self.ctx.push()
        # Restore references stored by __init__.
        stream = self.stream
        context = self.context
        engine = self.engine
        host_inputs = self.host_inputs
        cuda_inputs = self.cuda_inputs
        host_outputs = self.host_outputs
        cuda_outputs = self.cuda_outputs
        bindings = self.bindings
        # Image preprocessing: letterbox-resize, normalize, HWC->NCHW.
        batch_image_raw = []
        batch_origin_h = []
        batch_origin_w = []
        batch_input_image = np.empty(shape=[self.batch_size, 3, self.input_h, self.input_w])
        input_image, image_raw, origin_h, origin_w = self.preprocess_image(image_frame)
        batch_image_raw.append(image_frame)
        batch_origin_h.append(origin_h)
        batch_origin_w.append(origin_w)
        np.copyto(batch_input_image[0], input_image)
        batch_input_image = np.ascontiguousarray(batch_input_image)
        # Copy the input image into the page-locked host buffer.
        np.copyto(host_inputs[0], batch_input_image.ravel())
        # Transfer input to the GPU, run async inference, fetch the output.
        cuda.memcpy_htod_async(cuda_inputs[0], host_inputs[0], stream)
        context.execute_async(batch_size=self.batch_size, bindings=bindings, stream_handle=stream.handle)
        cuda.memcpy_dtoh_async(host_outputs[0], cuda_outputs[0], stream)
        # Wait for the stream, then release the context.
        stream.synchronize()
        self.ctx.pop()
        output = host_outputs[0]
        # Post-process each batch element: NMS, then draw/classify each box.
        for i in range(self.batch_size):
            # 38001 floats per image: a detection count followed by rows of 38
            # values — inferred from the slicing below; confirm against the
            # engine's actual output layout.
            result_boxes, result_scores, result_classid = self.post_process(
                output[i * 38001: (i + 1) * 38001], batch_origin_h[i], batch_origin_w[i]
            )
            result_list = []
            for j in range(len(result_boxes)):
                box = result_boxes[j]
                # Guard: class ids above the category table occasionally appear
                # in the raw output — skip them rather than index out of range.
                if int(result_classid[j]) < len(categories):
                    result = plot_one_box(
                        box,
                        batch_image_raw[i],
                        label="{}".format(categories[int(result_classid[j])])
                    )
                    result_list.append(result)
        # NOTE(review): with batch_size == 1 this is equivalent to returning
        # inside the loop; for larger batches only the last element's results
        # are returned — confirm intent if multi-batch engines are ever used.
        return batch_image_raw, result_list

    def destroy(self):
        """Pop this wrapper's CUDA context off the context stack."""
        self.ctx.pop()

    def get_raw_image_zeros(self, image_path_batch=None):
        """Yield black frames of the network's input size (warm-up data)."""
        for _ in range(self.batch_size):
            yield np.zeros([self.input_h, self.input_w, 3], dtype=np.uint8)

    def preprocess_image(self, raw_bgr_image):
        """Letterbox-resize a BGR frame to the network input and normalize it.

        Returns (nchw_float_image, original_image, original_h, original_w).
        """
        image_raw = raw_bgr_image
        h, w, c = image_raw.shape
        image = cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB)
        # Compute the scale and the padding needed on each side.
        r_w = self.input_w / w
        r_h = self.input_h / h
        if r_h > r_w:
            tw = self.input_w
            th = int(r_w * h)
            tx1 = tx2 = 0
            ty1 = int((self.input_h - th) / 2)
            ty2 = self.input_h - th - ty1
        else:
            tw = int(r_h * w)
            th = self.input_h
            tx1 = int((self.input_w - tw) / 2)
            tx2 = self.input_w - tw - tx1
            ty1 = ty2 = 0
        # Resize along the long side, keeping the aspect ratio.
        image = cv2.resize(image, (tw, th))
        # Pad the short side with gray (128, 128, 128).
        image = cv2.copyMakeBorder(
            image, ty1, ty2, tx1, tx2, cv2.BORDER_CONSTANT, None, (128, 128, 128)
        )
        image = image.astype(np.float32)
        # Normalize to [0, 1].
        image /= 255.0
        # HWC -> CHW, then CHW -> NCHW, then ensure C-contiguous layout.
        image = np.transpose(image, [2, 0, 1])
        image = np.expand_dims(image, axis=0)
        image = np.ascontiguousarray(image)
        return image, image_raw, h, w

    def xywh2xyxy(self, origin_h, origin_w, x):
        """Map letterboxed-network box coordinates back to original-image pixels.

        param:
            origin_h: height of the original image
            origin_w: width of the original image
            x: nx4 boxes numpy array in network coordinates
        return:
            y: nx4 boxes numpy array [x1, y1, x2, y2] in original-image pixels
        """
        y = np.zeros_like(x)
        r_w = self.input_w / origin_w
        r_h = self.input_h / origin_h
        if r_h > r_w:
            # Width-limited: undo the vertical letterbox offset, then unscale.
            y[:, 0] = x[:, 0]
            y[:, 2] = x[:, 2]
            y[:, 1] = x[:, 1] - (self.input_h - r_w * origin_h) / 2
            y[:, 3] = x[:, 3] - (self.input_h - r_w * origin_h) / 2
            y /= r_w
        else:
            # Height-limited: undo the horizontal letterbox offset, then unscale.
            y[:, 0] = x[:, 0] - (self.input_w - r_h * origin_w) / 2
            y[:, 2] = x[:, 2] - (self.input_w - r_h * origin_w) / 2
            y[:, 1] = x[:, 1]
            y[:, 3] = x[:, 3]
            y /= r_h
        return y

    def post_process(self, output, origin_h, origin_w):
        """Decode one image's raw output into (boxes, scores, class_ids) after NMS."""
        # First value is the number of detections in this image.
        num = int(output[0])
        # Remaining values are rows of 38 floats per candidate detection.
        pred = np.reshape(output[1:], (-1, 38))[:num, :]
        # Confidence filtering + NMS.
        boxes = self.non_max_suppression(pred, origin_h, origin_w, conf_thres=CONF_THRESH, nms_thres=IOU_THRESHOLD)
        result_boxes = boxes[:, :4] if len(boxes) else np.array([])
        result_scores = boxes[:, 4] if len(boxes) else np.array([])
        result_classid = boxes[:, 5] if len(boxes) else np.array([])
        return result_boxes, result_scores, result_classid

    def bbox_iou(self, box1, box2, x1y1x2y2=True):
        """Compute the IoU of two sets of bounding boxes.

        param:
            box1: box coordinates, (x1, y1, x2, y2) or (x, y, w, h)
            box2: box coordinates, (x1, y1, x2, y2) or (x, y, w, h)
            x1y1x2y2: True when the boxes are already corner-format
        return:
            iou: element-wise IoU array
        """
        if not x1y1x2y2:
            # Transform from center/size to corner coordinates.
            b1_x1, b1_x2 = box1[:, 0] - box1[:, 2] / 2, box1[:, 0] + box1[:, 2] / 2
            b1_y1, b1_y2 = box1[:, 1] - box1[:, 3] / 2, box1[:, 1] + box1[:, 3] / 2
            b2_x1, b2_x2 = box2[:, 0] - box2[:, 2] / 2, box2[:, 0] + box2[:, 2] / 2
            b2_y1, b2_y2 = box2[:, 1] - box2[:, 3] / 2, box2[:, 1] + box2[:, 3] / 2
        else:
            b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3]
            b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3]
        # Coordinates of the intersection rectangle.
        inter_rect_x1 = np.maximum(b1_x1, b2_x1)
        inter_rect_y1 = np.maximum(b1_y1, b2_y1)
        inter_rect_x2 = np.minimum(b1_x2, b2_x2)
        inter_rect_y2 = np.minimum(b1_y2, b2_y2)
        # Intersection area (the +1 treats coordinates as inclusive pixels).
        inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, None) * \
            np.clip(inter_rect_y2 - inter_rect_y1 + 1, 0, None)
        # Union area, with a small epsilon against division by zero.
        b1_area = (b1_x2 - b1_x1 + 1) * (b1_y2 - b1_y1 + 1)
        b2_area = (b2_x2 - b2_x1 + 1) * (b2_y2 - b2_y1 + 1)
        iou = inter_area / (b1_area + b2_area - inter_area + 1e-16)
        return iou

    def non_max_suppression(self, prediction, origin_h, origin_w, conf_thres=0.5, nms_thres=0.4):
        """Filter predictions by confidence, rescale boxes, and apply class-wise NMS."""
        boxes = prediction[prediction[:, 4] >= conf_thres]
        # Map to original-image corner coordinates and clip to image bounds.
        boxes[:, :4] = self.xywh2xyxy(origin_h, origin_w, boxes[:, :4])
        boxes[:, 0] = np.clip(boxes[:, 0], 0, origin_w - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, origin_w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, origin_h - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, origin_h - 1)
        # Sort by descending confidence, then greedily keep the best box and
        # drop same-class boxes that overlap it beyond nms_thres.
        confs = boxes[:, 4]
        boxes = boxes[np.argsort(-confs)]
        keep_boxes = []
        while boxes.shape[0]:
            large_overlap = self.bbox_iou(np.expand_dims(boxes[0, :4], 0), boxes[:, :4]) > nms_thres
            label_match = boxes[0, -1] == boxes[:, -1]
            invalid = large_overlap & label_match
            keep_boxes += [boxes[0]]
            boxes = boxes[~invalid]
        boxes = np.stack(keep_boxes, 0) if len(keep_boxes) else np.array([])
        return boxes
def connect_to_rtsp_stream(url):
    """Try to open an RTSP stream; return the capture object, or None on failure."""
    capture = cv2.VideoCapture(url)
    # Request HEVC decoding for the incoming stream.
    capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
    if capture.isOpened():
        return capture
    print(f"Failed to connect to {url}")
    return None
class rtspInputFrame(threading.Thread):
    """Reader thread: pulls frames from an RTSP stream into a bounded queue.

    Frames are resized to 1280x720 before queueing. When the queue is full the
    current (newest) frame is dropped. The stream is reconnected automatically,
    with a 5-second back-off, whenever opening or reading fails.
    """

    def __init__(self, video_path, rtsp_frame_buffer):
        threading.Thread.__init__(self)
        self.video_path = video_path
        self.rtsp_frame_buffer = rtsp_frame_buffer  # shared RTSP input queue

    def run(self):
        cap = connect_to_rtsp_stream(self.video_path)
        while True:
            if cap is None:
                # Initial connect failed: retry every 5 seconds.
                cap = connect_to_rtsp_stream(self.video_path)
                time.sleep(5)
                continue
            ret, frame = cap.read()
            if not ret:
                print("无法读取帧,尝试重新连接...")
                cap.release()
                # Reconnect loop with a 5-second back-off.
                while True:
                    cap = connect_to_rtsp_stream(self.video_path)
                    if cap is not None:
                        break
                    time.sleep(5)
                continue
            # BUGFIX: use the queue passed to the constructor instead of the
            # module-level global of the same name.
            if self.rtsp_frame_buffer.full():
                continue  # queue full: drop this (newest) frame
            frame = cv2.resize(frame, (1280, 720))
            self.rtsp_frame_buffer.put(frame)
class inferThread(threading.Thread):
    """Main detection loop.

    Pulls RTSP frames, runs YOLOv8-TRT inference (skipping frames identical to
    the previous one), pushes JPEG-encoded frames into the ffmpeg HLS pipeline,
    and drives the alarm state machine that hands buffered frames to
    SaveAndUploadMP4Thread for upload.
    """

    def __init__(self, yolov8_wrapper, video_path, ip, rtsp_frame_buffer, tokenResult,
                 ffmpeg_buffer_size=300, save_buffer_size=2500):
        threading.Thread.__init__(self)
        self.ip = ip
        self.yolov8_wrapper = yolov8_wrapper
        self.video_path = video_path
        self.rtsp_frame_buffer = rtsp_frame_buffer
        self.tokenResult = tokenResult
        self.ffmpeg_buffer_size = ffmpeg_buffer_size
        # Queue feeding the ffmpeg HLS muxer (consumed by FramePushThread).
        self.ffmpeg_frame_buffer = queue.Queue(maxsize=ffmpeg_buffer_size)
        # Rolling buffer of frames kept as alarm video evidence.
        self.save_frame_buffer = queue.Queue(maxsize=save_buffer_size)
        self.frame_count = 0  # alarm frames accumulated in the current event
        self.start_warning_time = None   # timestamp of the first alarm frame
        self.latest_warning_time = None  # timestamp of the latest alarm frame
        # Last successful alarm upload; seeded one hour in the past so the
        # first alarm is never suppressed by the cool-down interval.
        self.old_update_warning_time = time.time() - 3600

    def _reset_alarm_state(self, now_time):
        """Clear the alarm buffer and timestamps after an upload or discard."""
        self.frame_count = 0
        self.save_frame_buffer.queue.clear()
        self.old_update_warning_time = now_time
        self.start_warning_time = None
        self.latest_warning_time = None

    def run(self):
        frame_push_thread = FramePushThread(self.ffmpeg_frame_buffer, pipeline_mid)
        frame_push_thread.start()
        previous_frame = None
        save_flag = []
        # Seed the token cache with a dummy, already-expired entry so the
        # first get_token() call fetches a real token.
        self.tokenResult['token'] = 'token'
        self.tokenResult['current_time'] = '2024-01-01 00:00:00'
        while True:
            # BUGFIX: read from the queue passed to the constructor instead of
            # the module-level global of the same name.
            frame = self.rtsp_frame_buffer.get()
            if previous_frame is not None:
                if compare_frames(previous_frame, frame, 100):
                    # Identical frame: skip inference, just push to the stream.
                    ret, jpeg_frame = cv2.imencode('.jpg', frame)
                    self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
                else:
                    batch_image_raw, r_list = self.yolov8_wrapper.infer(frame)
                    ret, jpeg_frame = cv2.imencode('.jpg', frame)
                    self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
                    if r_list is not None:
                        now_time = time.time()
                        if self.save_frame_buffer.full():
                            # Evidence buffer full: flush immediately as a long alarm.
                            token = get_token(self.tokenResult)
                            print('执行保存')
                            save_thread = SaveAndUploadMP4Thread(self.ip, self.save_frame_buffer,
                                                                 save_flag, False, token)
                            save_thread.start()
                            save_flag.clear()
                            self._reset_alarm_state(now_time)
                        else:
                            # Suppress new alarms inside the cool-down window
                            # (time_interval seconds after the last upload).
                            upload_flag = verify_timenum(self.old_update_warning_time, now_time, time_interval)
                            if upload_flag is False:
                                flag = verify_bbox_class(r_list, save_flag)
                                if flag:
                                    # First alarm frame of a new event: set both timestamps.
                                    if self.latest_warning_time is None:
                                        self.start_warning_time = now_time
                                        self.latest_warning_time = now_time
                                    # Subsequent alarm frame: advance the latest timestamp.
                                    if self.latest_warning_time is not None:
                                        self.latest_warning_time = now_time
                                    self.save_frame_buffer.put(frame)
                                    self.frame_count += 1
                                if flag is False and self.latest_warning_time is not None:
                                    # Normal frame while an alarm event is still open.
                                    save_upload_flag = verify_timenum(self.latest_warning_time, now_time, 10)
                                    if save_upload_flag:
                                        # Within 10s of the last alarm frame: keep recording.
                                        self.save_frame_buffer.put(frame)
                                    else:
                                        if self.frame_count > 20:
                                            # Event over: short alarm (image only) when it
                                            # lasted under 5s, long alarm (image + video)
                                            # otherwise.
                                            save_frame_type_min = verify_timenum(self.start_warning_time,
                                                                                 self.latest_warning_time, 5)
                                            token = get_token(self.tokenResult)
                                            print('执行保存')
                                            save_thread = SaveAndUploadMP4Thread(self.ip, self.save_frame_buffer,
                                                                                 save_flag, save_frame_type_min,
                                                                                 token)
                                            save_thread.start()
                                            save_flag.clear()
                                            self._reset_alarm_state(now_time)
                                        else:
                                            # Too few alarm frames: discard as noise.
                                            self._reset_alarm_state(now_time)
                                if self.latest_warning_time is not None and self.start_warning_time is not None:
                                    # Events longer than 60s are force-flushed as long alarms.
                                    maxTimeMp4Flag = verify_timenum(self.start_warning_time,
                                                                    self.latest_warning_time, 60)
                                    if maxTimeMp4Flag is False:
                                        print('执行保存')
                                        token = get_token(self.tokenResult)
                                        save_thread = SaveAndUploadMP4Thread(self.ip, self.save_frame_buffer,
                                                                             save_flag, False, token)
                                        save_thread.start()
                                        save_flag.clear()
                                        self._reset_alarm_state(now_time)
            previous_frame = frame
if __name__ == "__main__":
    print("=============================================================================================================")
    # Default TensorRT plugin library and engine; overridable from argv.
    PLUGIN_LIBRARY = "build_250107/libmyplugins.so"
    engine_file_path = "build_250107/best.engine"
    if len(sys.argv) > 1:
        engine_file_path = sys.argv[1]
    if len(sys.argv) > 2:
        PLUGIN_LIBRARY = sys.argv[2]
    # Clear any leftover HLS segments for this camera.
    clear_folder(ip)
    # Load the custom-plugin shared library before deserializing the engine.
    ctypes.CDLL(PLUGIN_LIBRARY)
    # Class names, in training order (indexed by the model's class ids).
    categories = ["face", "shoe", "phone", "e-bike"]
    try:
        # Load the engine.
        yolov8_wrapper1 = YoLov8TRT(engine_file_path)
    except Exception as exc:  # BUGFIX: bare `except:` hid the actual failure
        print("加载文件失败", exc)
        restart_program()
    try:
        thread_rtsp = rtspInputFrame(vod_path, rtsp_frame_buffer)
        thread_det_1 = inferThread(yolov8_wrapper1, vod_path, ip, rtsp_frame_buffer, tokenResult)
        thread_rtsp.start()
        thread_det_1.start()
        thread_rtsp.join()
        thread_det_1.join()
    finally:
        # BUGFIX: release the TensorRT wrapper's CUDA context; the original
        # called destroy() on inferThread, which has no such method.
        yolov8_wrapper1.destroy()