# Source: duan8_services/d8_4.py
# Last modified: 2025-01-09 16:25:17 +08:00
# (Repository-viewer metadata: 1204 lines, 47 KiB, Python. The original viewer
# page warned that this file contains ambiguous Unicode characters that might
# be confused with other characters.)

# 开发者 haotian
# 开发时间: 2024/9/20 21:14
'''
v7_p1_1.5.1 的 改进版
将文件路径等做成可配置的
'''
'''
在1.5 的基础上修改了判断人脸的逻辑
'''
'''
人脸识别直接新开一个线程算了
'''
'''
添加人脸识别,新的告警逻辑。
改进了告警的逻辑。
没写上传minio
加上上传minio
加上post警告测试环境注释掉了。
测试环境
1.注释掉获取get_token
2.注释掉minio上传文件 self.minio_client.put_object
3.注释掉 send_post_request
4.注释掉 os.remove
'''
import ctypes
import os
import shutil
import random
import sys
import threading
import time
import cv2
import numpy as np
import pycuda.autoinit
import pycuda.driver as cuda
import tensorrt as trt
import queue
from minio import Minio
import yaml
import threading
import subprocess
import uuid
import requests
import json
import datetime
from compreface import CompreFace
from compreface.service import RecognitionService
# from PIL import Image
CONF_THRESH = 0.65
IOU_THRESHOLD = 0.4
with open('config.yaml', 'r') as file:
configData = yaml.safe_load(file)
# Minio实例化, 用于云端存储文件
client = Minio(
endpoint=configData['minioConfig']['endpoint'],
access_key=configData['minioConfig']['access_key'],
secret_key=configData['minioConfig']['secret_key'],
secure=configData['minioConfig']['secure']
)
# 保存token
tokenResult = {}
getTokenUrl = configData['dataConfig']['getTokenUrl']
# 告警信息url
putMessageUrl = configData['dataConfig']['putMessageUrl']
# ip和文件目录标识符
ip = configData['video_config']['v4_ip']
# 所有模型类别
categories = configData['video_config']['categories']
# 输出m3u8文件地址
m3u8_path = configData['video_config']['m3u8_path_0']
# 图片/视频文件的暂存路径
save_path = configData['video_config']['save_path']
# 视频源地址
vod_path = configData['video_config']['v0_path']
# 人脸识别暂存文件地址
people_save_path = configData['video_config']['people_save_path']
# 模型文件地址
engine_path = configData['engine_path']
vod_channelNo = configData['video_config']['v0_channelNo']
# 要检测的类别
testclasses = configData['video_config']['v0_testclasses']
# 请求超时时间
time_interval = configData['dataConfig']['timeInterval']
command_mid = [
'ffmpeg',
'-i', '-', # 从标准输入读取视频帧
'-c:v', 'libx264', # 使用 H.264 编码
'-b:v', '500k', # 设置视频比特率
'-preset', 'superfast', # 编码速度
'-tune', 'zerolatency', # 低延迟
'-crf', '23', # 使用 CRF 模式来控制视频质量
'-s', '1280x720', # 设置分辨率
'-an', # 禁用音频
'-loglevel', 'error',
# '-f', 'flv', # 输出格式
# 'rtmp://127.0.0.1:1935/live/1' # 输出到 RTMP 服务器
'-hls_time', '4',
'-hls_list_size', '2',
'-hls_flags', 'delete_segments',
'-f', 'hls',
f'{m3u8_path}'+ ip + '/index.m3u8'
]
# 人脸识别部分
DOMAIN: str = configData['compreface_service']['domain']
PORT: str = configData['compreface_service']['port']
API_KEY: str = configData['compreface_service']['api_key']
LIMIT: str = configData['compreface_service']['limit']
Det_prob_threshold: str = configData['compreface_service']['det_prob_threshold']
# 人脸识别客户端
compre_face: CompreFace = CompreFace(DOMAIN, PORT,options={'limit': LIMIT,"det_prob_threshold":Det_prob_threshold})
recognition: RecognitionService = compre_face.init_face_recognition(API_KEY)
pipeline_mid = subprocess.Popen(command_mid, shell=False, stdin=subprocess.PIPE)
frames = [None] * 6
# 拉流缓存
rtsp_frame_buffer = queue.Queue(maxsize=300)
# 全局人名字典每天0点清空
d_face = dict()
# 获取token和对应时间 存入字典
def get_token(tokenResult):
if 'token' in tokenResult and 'current_time' in tokenResult:
token_time = datetime.datetime.strptime(tokenResult['current_time'],
"%Y-%m-%d %H:%M:%S")
current_time = datetime.datetime.now()
time_diff = current_time - token_time
if time_diff.total_seconds() > 20 * 60:
# 过期重新请求 token
# print("token 已过期")
response = requests.post(getTokenUrl)
if response.status_code == 200:
data = json.loads(response.text)
if 'retCode' in data and data['retCode'] == '200':
token = data['responseBody']['token']
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
tokenResult['token'] = token
tokenResult['current_time'] = current_time
else:
tokenResult['error'] = data['errorDesc']
else:
tokenResult['error'] = response.status_code
token = tokenResult['token']
return token
def send_post_request(url, token, msg, picUrl, videoUrl):
payload = {
"tenantCode": "32",
"channelNo": vod_channelNo,
"alarmContent": msg,
"alarmTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"picInfo": [
{"url": picUrl}
],
"videoInfo": [
{"url": videoUrl}
]
}
headers = {
'X-Access-Token': token,
'Content-Type': 'application/json'
}
# print(url)
# print(headers)
# print(payload)
response = requests.post(url, headers=headers, data=json.dumps(payload))
#print(response)
def clear_folder(ip):
folder_path1 = m3u8_path+ip
# 判断文件夹是否存在
if not os.path.exists(folder_path1):
print(f"文件夹 {folder_path1} 不存在!")
os.mkdir(folder_path1)
# return
# 判断文件夹是否为空
if not os.listdir(folder_path1):
print(f"文件夹 {folder_path1} 为空,无需清空!")
else:
# 清空文件夹1
for filename in os.listdir(folder_path1):
file_path = os.path.join(folder_path1, filename)
if os.path.isfile(file_path):
os.remove(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
print(f"已清空文件夹 {folder_path1} 的内容!")
def restart_program():
"""重新启动当前程序"""
python = sys.executable # 获取当前 Python 解释器的路径
# print("Restarting program...")
time.sleep(1) # 可选延迟,确保用户看到提示
os.execl(python, python, *sys.argv) # 使用相同的参数重新启动当前脚本
def verify_bbox_class(classid_list, save_flag):
if "face" in classid_list:
save_flag.append("face")
return True
if "shoe" in classid_list:
save_flag.append("shoe")
return True
if "phone" in classid_list:
save_flag.append("phone")
return True
return False
# after_time 距离 before_time 是否在 time_num秒 之内,若在 返回true 不在返回false
def verify_timenum(before_time, after_time, time_num):
if before_time is None:
return True
time_difference = after_time - before_time
if time_difference < time_num:
return True
else:
return False
# 该方法用于判断两个帧是否相同
def compare_frames(frame1, frame2, threshold):
difference = cv2.absdiff(frame1, frame2)
diff_gray = cv2.cvtColor(difference, cv2.COLOR_BGR2GRAY)
_, thresholded_diff = cv2.threshold(diff_gray, threshold, 350, cv2.THRESH_BINARY)
return np.sum(thresholded_diff) == 0
def white_color_ratio(image):
image = image.copy()
# 将图像从BGR颜色空间转换为HSV颜色空间
hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# 定义白色的HSV范围在此示例中使用了一组简单的范围
lower_white = np.array([0, 0, 200], dtype=np.uint8)
upper_white = np.array([180, 30, 255], dtype=np.uint8)
# 创建遮罩,将白色和白灰色区域设置为白色,其他区域设置为黑色
white_mask = cv2.inRange(hsv_image, lower_white, upper_white)
# 计算白色和白灰色区域的像素数
white_pixels = np.count_nonzero(white_mask)
# 计算图像中白色和白灰色的占比
total_pixels = image.shape[0] * image.shape[1]
white_ratio = white_pixels / total_pixels
return white_ratio
def shoe_color_ratio(image):
# 将图像从BGR颜色空间转换为HSV颜色空间
hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# 定义黑色的HSV范围在此示例中使用了一组简单的范围
lower_black = np.array([0, 0, 0], dtype=np.uint8)
upper_black = np.array([180, 255, 80], dtype=np.uint8)
# 创建遮罩,将黑色区域设置为白色,其他区域设置为黑色
black_mask = cv2.inRange(hsv_image, lower_black, upper_black)
# 计算黑色区域的像素数
black_pixels = np.count_nonzero(black_mask)
# 计算图像中黑色的占比
total_pixels = image.shape[0] * image.shape[1]
black_ratio = black_pixels / total_pixels
return black_ratio
def skin_color_ratio(image):
# 将图像从BGR颜色空间转换为HSV颜色空间
# 定义肤色的HSV范围在此示例中使用了一组简单的范围
hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
lower_skin = np.array([0, 20, 70], dtype=np.uint8)
upper_skin = np.array([20, 255, 255], dtype=np.uint8)
# 创建遮罩,将肤色区域设置为白色,其他区域设置为黑色
skin_mask = cv2.inRange(hsv_image, lower_skin, upper_skin)
# 计算肤色区域的像素数
skin_pixels = np.count_nonzero(skin_mask)
# 计算图像中人体肤色的占比
total_pixels = image.shape[0] * image.shape[1]
skin_ratio = skin_pixels / total_pixels
return skin_ratio
def get_img_path_batches(batch_size, img_dir):
ret = []
batch = []
for root, dirs, files in os.walk(img_dir):
for name in files:
if len(batch) == batch_size:
ret.append(batch)
batch = []
batch.append(os.path.join(root, name))
if len(batch) > 0:
ret.append(batch)
return ret
# 画框 返回True 为需要报警的选项返回Flase为不需要报警的
def plot_one_box(x, img, color=[0, 255, 0], label=None, line_thickness=2):
# print("label: ", label)
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
# msg = ''
if label == 'shoe':
print('识别到鞋子')
region_of_interest = img[c1[1]:c2[1], c1[0]:c2[0]]
source = shoe_color_ratio(region_of_interest)
print(f'识别到鞋子, 比率:{source}')
# white_sorce = white_color_ratio(region_of_interest)
# 鞋子黑色面积必须大于整体框的百分之50 并且框的宽度小于40高度小于30
if source < 0.4:
# 画框
color = [0, 0, 255]
cv2.rectangle(img, c1, c2, color, thickness=line_thickness, lineType=cv2.LINE_AA)
return [1, '未穿戴劳保鞋']
if label == 'face':
region_of_interest = img[c1[1]:c2[1], c1[0]:c2[0]]
face_source = skin_color_ratio(region_of_interest)
print("识别到人脸, 人脸比率: ", face_source)
# white_sorce = white_color_ratio(region_of_interest)
# print("判断存在脸,且肤色为" + str(face_source))
# 肤色面积必须大于整体框的百分之70 框的宽度大于10
if face_source > 0.8:
# 画框
color = [0, 0, 255]
cv2.rectangle(img, c1, c2, color, thickness=1, lineType=cv2.LINE_AA)
print("face")
return [1, '未佩戴口罩']
if label == 'phone':
print("phone")
color = [0, 0, 255]
cv2.rectangle(img, c1, c2, color, thickness=line_thickness, lineType=cv2.LINE_AA)
return [1, '识别到手机']
if label == 'e-bike':
print("e-bike")
color = [0, 0, 255]
cv2.rectangle(img, c1, c2, color, thickness=line_thickness, lineType=cv2.LINE_AA)
return [1, '识别到电动车']
return [0, ""]
# def check_save_flag(save_flag):
# # 定义需要检查的类别
# categories = ["shoe"]
# # 找出在save_flag中的类别
# matched_categories = [category for category in categories if category in save_flag]
# # 用'-'连接匹配的类别并返回
# return "-".join(matched_categories)
# 计算图像的模糊度
def calculate_blur(frame):
# 将图片转换为灰度图
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算灰度图的方差
blur_value = cv2.Laplacian(gray, cv2.CV_64F).var()
return blur_value
# 先前陌生人数
p_s_num = 0
class FaceRecUpload(threading.Thread):
def __init__(self, ip, frame, token):
threading.Thread.__init__(self)
self.ip = ip
self.frame = frame.copy()
# self.result = result
self.new_face = False
self.token = token
self.minio_client = client
def run(self):
global p_s_num
# blur = calculate_blur(self.frame)
# print(f'\n模糊度:{blur}')
et, jpeg_frame = cv2.imencode('.jpg', self.frame)
# self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
now_time = time.time()
result = recognition.recognize(jpeg_frame.tobytes())
if 'message' not in result:
warning = list()
# 人脸识别检测
for i in range(len(result['result'])):
# print(self.result['result'][i]['subjects'][0]['subject'], self.result['result'][i]['subjects'][0]['similarity'])
# print(self.result['result'][i]['box'])
x1, y1, x2, y2 = result['result'][i]['box']['x_min'], result['result'][i]['box']['y_min'], \
result['result'][i]['box']['x_max'], result['result'][i]['box']['y_max']
if self.frame[x1:x2+1, y1:y2+1].size > 0:
blur = calculate_blur(self.frame[x1:x2+1, y1:y2+1])
else:
blur = 0
# print(f'人脸模糊度{blur}')
if blur > 10:
# 人脸识别库中一定要有一张图片, 不然会报错
# print(len(result['result'][i]['subjects']), result['result'][i]['subjects'])
if 0.3 < result['result'][i]['subjects'][0]['similarity'] < 0.4:
# 设置文本参数
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
color = (255, 0, 0) # BGR颜色
thickness = 2
cv2.rectangle(self.frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
cv2.putText(self.frame, f'warning {result["result"][i]["subjects"][0]["similarity"]}',
(x1, y1 - 20), font, font_scale, color, thickness,
cv2.LINE_AA)
warning.append([(x1, y1), (x2, y2)])
# 垃圾帧丢弃
elif result['result'][i]['subjects'][0]['similarity'] <= 0.3:
# print('\n丢弃的人脸识别帧')
pass
else:
if result['result'][i]['subjects'][0]['subject'] not in d_face:
# 记录每人第一次人脸识别记录
d_face[result['result'][i]['subjects'][0]['subject']] = time.time()
self.new_face = True
print(
f'\n新人打卡, 员工名:{result["result"][i]["subjects"][0]["subject"]} ,相似度:{result["result"][i]["subjects"][0]["similarity"]},打卡时间{datetime.datetime.now()}')
uuid_str = str(uuid.uuid4())[:6] + str(int(time.time()))
img_object_name = f"{save_path}{result['result'][i]['subjects'][0]['subject']}_{uuid_str}_{vod_channelNo}.jpg"
cv2.imwrite(img_object_name, self.frame)
# print(f'\n/home/admin-root/haotian/jingzhu1.1/tensorrtx-master/yolov8/attendance/{datetime.datetime.now().date()}.txt')
with open(
f'{people_save_path}{datetime.datetime.now().date()}.txt',
'a') as f:
f.write(
f'新人打卡, 员工名:{result["result"][i]["subjects"][0]["subject"]} ,相似度:{result["result"][i]["subjects"][0]["similarity"]},打卡时间{datetime.datetime.now()} \n')
else:
pass
# print('\r模糊帧丢弃', end='')
if len(warning) > p_s_num:
p_s_num = len(warning)
uuid_str = str(uuid.uuid4())[:6] + str(int(time.time())) # 生成UUID的前6位,前6位有不小的概率重复再加上时间戳。
img_object_name = f"{save_path}face_{uuid_str}_{vod_channelNo}.jpg"
cv2.imwrite(img_object_name, self.frame)
# with open(img_object_name, 'rb') as file_data:
# file_data.seek(0, os.SEEK_END)
# file_size = file_data.tell()
# file_data.seek(0)
# self.minio_client.put_object(configData['minioConfig']['bucket_name'],
# f'{uuid_str}_{vod_channelNo}_stranger_.jpg', file_data,
# file_size)
# os.remove(img_object_name)
upload_http_url_img = configData['minioConfig'][
'bucket_name'] + f'/{uuid_str}_{vod_channelNo}_stranger_.jpg'
# send_post_request(putMessageUrl, self.token, '陌生人警告', upload_http_url_img, '')
print('\n陌生人警告')
#print('陌生人图片保存完成')
elif len(warning) < p_s_num:
p_s_num = len(warning)
self.new_face = False
#print(f'\r人脸识别任务完成完成时间{time.time() - now_time}', end='')
# if blur > 750:
#
# else:
# print('\r模糊帧不做人脸识别处理', end='')
class NewSaveAndUploadMP4Thread(threading.Thread):
def __init__(self, ip, frame_buffer, save_type, token, msg_list = None):
'''
:param ip: 摄像头标识
:param frame_buffer: 异常帧缓存
:param save_type: 保存类型
'''
threading.Thread.__init__(self)
self.ip = ip
# 初始化时将异常帧转换为队列,你单纯的赋值其实是一个对象引用,若源对象变了,这里也会变。
# 线程start后可能不会立即得到执行若没执行源队列被清空了这里的队列也会变空
# 所以这里先将其转换成list存储。
self.frames = list(frame_buffer.queue)
self.save_type = save_type
self.minio_client = client
self.token = token
self.msg_list = msg_list
def run(self):
# 所以这里是可能出现重复的啊300次生成有10次会重复。。加个时间戳吧
now_time = str(int(time.time()))
uuid_str = str(uuid.uuid4())[:6] + now_time # 生成UUID的前6位
# 将异常帧存储到列表中
if self.save_type == 'picture':
print("图片上传消息", self.msg_list, "视频长度: ", len(self.frames))
first_frame = self.frames[0]
img_file_path = f"{save_path}{uuid_str}_{vod_channelNo}_.jpg"
cv2.imwrite(img_file_path, first_frame)
# with open(img_file_path, 'rb') as file_data:
# file_data.seek(0, os.SEEK_END)
# file_size = file_data.tell()
# file_data.seek(0)
# self.minio_client.put_object(configData['minioConfig']['bucket_name'], f'{uuid_str}_{vod_channelNo}_.jpg', file_data,
# file_size)
upload_http_url_img = configData['minioConfig']['bucket_name'] + f'/{uuid_str}_{vod_channelNo}_.jpg'
msg = self.msg_list[0] if len(self.msg_list) > 0 else ""
# upload_http_url_img = configData['minioConfig']['bucket_name'] + f'/{uuid_str}_{vod_channelNo}_.jpg'
# send_post_request(putMessageUrl, self.token, msg, upload_http_url_img, '')
# os.remove(img_object_name)
print('\n上传图片完成')
self.msg_list.clear()
elif self.save_type == 'video':
print("视频上传消息", self.msg_list, "视频长度: ", len(self.frames))
mp4_file_path = f'{save_path}{uuid_str}_{vod_channelNo}_.mp4'
height, width, _ = self.frames[0].shape
# 保存视频
out = cv2.VideoWriter(mp4_file_path, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height), isColor=True)
for frame in self.frames:
out.write(frame)
out.release()
mp4_object_name = f"{uuid_str}_{vod_channelNo}_.mp4"
# self.temp_file_path = os.path.abspath(mp4_file_path)
self.temp_file_size = os.path.getsize(mp4_file_path)
# with open(mp4_file_path, 'rb') as file_data:
# #上传到minio
# self.minio_client.put_object(configData['minioConfig']['bucket_name'], mp4_object_name, file_data,
# self.temp_file_size)
upload_http_url_mp4 = configData['minioConfig']['bucket_name'] + f'/{uuid_str}_{vod_channelNo}_.mp4'
msg = self.msg_list[0]
# 上传警告。
# send_post_request(putMessageUrl, self.token, msg, '', upload_http_url_mp4)
# os.remove(mp4_file_path)
print('\n上传视频成功')
self.msg_list.clear()
else:
print('\n异常类型')
class FramePushThread(threading.Thread):
def __init__(self, frame_buffer, process_mid):
threading.Thread.__init__(self)
self.frame_buffer = frame_buffer
self.process_mid = process_mid
def run(self):
while True:
frame_data = self.frame_buffer.get()
self.process_mid.stdin.write(frame_data)
class YoLov8TRT(object):
def __init__(self, engine_file_path):
# Create a Context on this device,
self.ctx = cuda.Device(0).make_context()
stream = cuda.Stream()
TRT_LOGGER = trt.Logger(trt.Logger.INFO)
runtime = trt.Runtime(TRT_LOGGER)
# Deserialize the engine from file
with open(engine_file_path, "rb") as f:
engine = runtime.deserialize_cuda_engine(f.read())
context = engine.create_execution_context()
host_inputs = []
cuda_inputs = []
host_outputs = []
cuda_outputs = []
bindings = []
for binding in engine:
print('bingding:', binding, engine.get_binding_shape(binding))
size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size
dtype = trt.nptype(engine.get_binding_dtype(binding))
# Allocate host and device buffers
host_mem = cuda.pagelocked_empty(size, dtype)
cuda_mem = cuda.mem_alloc(host_mem.nbytes)
# Append the device buffer to device bindings.
bindings.append(int(cuda_mem))
# Append to the appropriate list.
if engine.binding_is_input(binding):
self.input_w = engine.get_binding_shape(binding)[-1]
self.input_h = engine.get_binding_shape(binding)[-2]
host_inputs.append(host_mem)
cuda_inputs.append(cuda_mem)
else:
host_outputs.append(host_mem)
cuda_outputs.append(cuda_mem)
# Store
self.stream = stream
self.context = context
self.engine = engine
self.host_inputs = host_inputs
self.cuda_inputs = cuda_inputs
self.host_outputs = host_outputs
self.cuda_outputs = cuda_outputs
self.bindings = bindings
self.batch_size = engine.max_batch_size
def infer(self, image_frame):
threading.Thread.__init__(self)
# Make self the active context, pushing it on top of the context stack.
self.ctx.push()
# Restore
stream = self.stream
context = self.context
engine = self.engine
host_inputs = self.host_inputs
cuda_inputs = self.cuda_inputs
host_outputs = self.host_outputs
cuda_outputs = self.cuda_outputs
bindings = self.bindings
# Do image preprocess
batch_image_raw = []
batch_origin_h = []
batch_origin_w = []
batch_input_image = np.empty(shape=[self.batch_size, 3, self.input_h, self.input_w])
# 方法遍历由 raw_image_generator 生成的图像,对每个图像执行预处理,并收集处理后的图像数据以及原始图像的尺寸。
# 预处理可能包括调整大小、标准化等步骤,为推理准备适当的输入格式。
# for i, image_raw in enumerate(image_frame):
input_image, image_raw, origin_h, origin_w = self.preprocess_image(image_frame)
batch_image_raw.append(image_frame)
batch_origin_h.append(origin_h)
batch_origin_w.append(origin_w)
np.copyto(batch_input_image[0], input_image)
batch_input_image = np.ascontiguousarray(batch_input_image)
# Copy input image to host buffer
np.copyto(host_inputs[0], batch_input_image.ravel())
# 记录推理开始时间
# start = time.time()
# 将输入数据传输到GPU
cuda.memcpy_htod_async(cuda_inputs[0], host_inputs[0], stream)
# 执行异步推理
context.execute_async(batch_size=self.batch_size, bindings=bindings, stream_handle=stream.handle)
# 将预测结果传回主机
cuda.memcpy_dtoh_async(host_outputs[0], cuda_outputs[0], stream)
# 同步CUDA流
stream.synchronize()
# 并移除上下文
self.ctx.pop()
output = host_outputs[0]
# 后处理和结果解析:
for i in range(self.batch_size):
# for循环中 调用self.post_process方法进行后处理提取结果框、得分和类别ID。 此处不需要特别注意 了解获得返回结果即可
result_boxes, result_scores, result_classid = self.post_process(
output[i * 38001: (i + 1) * 38001], batch_origin_h[i], batch_origin_w[i]
)
result_list = 0
result_boxes_list = list()
msg_list = list()
for j in range(len(result_boxes)):
box = result_boxes[j]
# !!!!!!!!!!!这个推理的地方 后续可判断一张图片中是否需要保存 若保存调用保存方法
# plot_one_box 方法在原始图像上绘制检测到的每个对象的边界框和标签
t = int(result_classid[j])
# print("t: ", t)
if t in testclasses:
result, msg = plot_one_box(
box,
batch_image_raw[i],
label="{}".format(categories[t])
)
# print("result: ", result)
# 将检测结果保存到minio中
result_list += result
if result == 1:
result_boxes_list.append(box)
msg_list.append(msg)
return batch_image_raw, result_list, result_boxes_list, msg_list
def destroy(self):
# Remove any context from the top of the context stack, deactivating it.
self.ctx.pop()
def get_raw_image_zeros(self, image_path_batch=None):
"""
description: Ready data for warmup
"""
for _ in range(self.batch_size):
yield np.zeros([self.input_h, self.input_w, 3], dtype=np.uint8)
def preprocess_image(self, raw_bgr_image):
image_raw = raw_bgr_image
h, w, c = image_raw.shape
image = cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB)
# Calculate widht and height and paddings
r_w = self.input_w / w
r_h = self.input_h / h
if r_h > r_w:
tw = self.input_w
th = int(r_w * h)
tx1 = tx2 = 0
ty1 = int((self.input_h - th) / 2)
ty2 = self.input_h - th - ty1
else:
tw = int(r_h * w)
th = self.input_h
tx1 = int((self.input_w - tw) / 2)
tx2 = self.input_w - tw - tx1
ty1 = ty2 = 0
# Resize the image with long side while maintaining ratio
image = cv2.resize(image, (tw, th))
# Pad the short side with (128,128,128)
image = cv2.copyMakeBorder(
image, ty1, ty2, tx1, tx2, cv2.BORDER_CONSTANT, None, (128, 128, 128)
)
image = image.astype(np.float32)
# Normalize to [0,1]
image /= 255.0
# HWC to CHW format:
image = np.transpose(image, [2, 0, 1])
# CHW to NCHW format
image = np.expand_dims(image, axis=0)
# Convert the image to row-major order, also known as "C order":
image = np.ascontiguousarray(image)
return image, image_raw, h, w
def xywh2xyxy(self, origin_h, origin_w, x):
"""
description: Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
param:
origin_h: height of original image
origin_w: width of original image
x: A boxes numpy, each row is a box [center_x, center_y, w, h]
return:
y: A boxes numpy, each row is a box [x1, y1, x2, y2]
"""
y = np.zeros_like(x)
r_w = self.input_w / origin_w
r_h = self.input_h / origin_h
if r_h > r_w:
y[:, 0] = x[:, 0]
y[:, 2] = x[:, 2]
y[:, 1] = x[:, 1] - (self.input_h - r_w * origin_h) / 2
y[:, 3] = x[:, 3] - (self.input_h - r_w * origin_h) / 2
y /= r_w
else:
y[:, 0] = x[:, 0] - (self.input_w - r_h * origin_w) / 2
y[:, 2] = x[:, 2] - (self.input_w - r_h * origin_w) / 2
y[:, 1] = x[:, 1]
y[:, 3] = x[:, 3]
y /= r_h
return y
def post_process(self, output, origin_h, origin_w):
# Get the num of boxes detected
num = int(output[0])
# Reshape to a two dimentional ndarray
pred = np.reshape(output[1:], (-1, 38))[:num, :]
# Do nms
boxes = self.non_max_suppression(pred, origin_h, origin_w, conf_thres=CONF_THRESH, nms_thres=IOU_THRESHOLD)
result_boxes = boxes[:, :4] if len(boxes) else np.array([])
result_scores = boxes[:, 4] if len(boxes) else np.array([])
result_classid = boxes[:, 5] if len(boxes) else np.array([])
return result_boxes, result_scores, result_classid
def bbox_iou(self, box1, box2, x1y1x2y2=True):
"""
description: compute the IoU of two bounding boxes
param:
box1: A box coordinate (can be (x1, y1, x2, y2) or (x, y, w, h))
box2: A box coordinate (can be (x1, y1, x2, y2) or (x, y, w, h))
x1y1x2y2: select the coordinate format
return:
iou: computed iou
box_iou
"""
if not x1y1x2y2:
# Transform from center and width to exact coordinates
b1_x1, b1_x2 = box1[:, 0] - box1[:, 2] / 2, box1[:, 0] + box1[:, 2] / 2
b1_y1, b1_y2 = box1[:, 1] - box1[:, 3] / 2, box1[:, 1] + box1[:, 3] / 2
b2_x1, b2_x2 = box2[:, 0] - box2[:, 2] / 2, box2[:, 0] + box2[:, 2] / 2
b2_y1, b2_y2 = box2[:, 1] - box2[:, 3] / 2, box2[:, 1] + box2[:, 3] / 2
else:
# Get the coordinates of bounding boxes
b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3]
b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3]
# Get the coordinates of the intersection rectangle
inter_rect_x1 = np.maximum(b1_x1, b2_x1)
inter_rect_y1 = np.maximum(b1_y1, b2_y1)
inter_rect_x2 = np.minimum(b1_x2, b2_x2)
inter_rect_y2 = np.minimum(b1_y2, b2_y2)
# Intersection area
inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, None) * \
np.clip(inter_rect_y2 - inter_rect_y1 + 1, 0, None)
# Union Area
b1_area = (b1_x2 - b1_x1 + 1) * (b1_y2 - b1_y1 + 1)
b2_area = (b2_x2 - b2_x1 + 1) * (b2_y2 - b2_y1 + 1)
iou = inter_area / (b1_area + b2_area - inter_area + 1e-16)
return iou
def non_max_suppression(self, prediction, origin_h, origin_w, conf_thres=0.5, nms_thres=0.4):
boxes = prediction[prediction[:, 4] >= conf_thres]
boxes[:, :4] = self.xywh2xyxy(origin_h, origin_w, boxes[:, :4])
boxes[:, 0] = np.clip(boxes[:, 0], 0, origin_w - 1)
boxes[:, 2] = np.clip(boxes[:, 2], 0, origin_w - 1)
boxes[:, 1] = np.clip(boxes[:, 1], 0, origin_h - 1)
boxes[:, 3] = np.clip(boxes[:, 3], 0, origin_h - 1)
confs = boxes[:, 4]
boxes = boxes[np.argsort(-confs)]
keep_boxes = []
while boxes.shape[0]:
large_overlap = self.bbox_iou(np.expand_dims(boxes[0, :4], 0), boxes[:, :4]) > nms_thres
label_match = boxes[0, -1] == boxes[:, -1]
# Indices of boxes with lower confidence scores, large IOUs and matching labels
invalid = large_overlap & label_match
keep_boxes += [boxes[0]]
boxes = boxes[~invalid]
boxes = np.stack(keep_boxes, 0) if len(keep_boxes) else np.array([])
return boxes
def get_attendance_p():
# 万一今天重启了,直接从文件中读取以打卡的人数,避免重复打卡
global d_face
try:
with open(f'{people_save_path}{datetime.datetime.now().date()}.txt','r') as f:
for line in f:
d_face[line.strip().split(' ')[1][4:]] = 1
except:
pass
def connect_to_rtsp_stream(url):
""" 尝试连接到 RTSP 流 """
cap = cv2.VideoCapture(url)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
if not cap.isOpened():
print(f"Failed to connect to {url}")
return None
return cap
class rtspInputFrame(threading.Thread):
def __init__(self, video_path, rtsp_frame_buffer):
threading.Thread.__init__(self)
self.video_path = video_path
self.rtsp_frame_buffer = rtsp_frame_buffer # rtsp输入缓冲区的队列
def run(self):
cap = connect_to_rtsp_stream(self.video_path)
while True:
if cap is None:
cap = connect_to_rtsp_stream(self.video_path)
time.sleep(5)
else:
# 从视频流中读取一帧
ret, frame = cap.read()
# 如果帧读取成功,则 ret 为 True
if not ret:
print("无法读取帧,尝试重新连接...")
# 关闭当前的 VideoCapture 对象
cap.release()
# 尝试重新连接
while True:
cap = connect_to_rtsp_stream(self.video_path)
if cap is not None:
break
# 等待一段时间后再尝试连接
time.sleep(5) # 休眠5秒
else:
if rtsp_frame_buffer.full():
continue # 如果队列满了,就丢弃最早的帧 这个地方可能有线程安全问题
frame = cv2.resize(frame, (1280, 720))
rtsp_frame_buffer.put(frame)
# 释放资源
cap.release()
cv2.destroyAllWindows()
class inferThread(threading.Thread):
def __init__(self, yolov8_wrapper, video_path, ip, rtsp_frame_buffer, tokenResult, ffmpeg_buffer_size=300,
save_buffer_size=2500):
threading.Thread.__init__(self)
self.ip = ip
self.yolov8_wrapper = yolov8_wrapper
self.video_path = video_path
self.rtsp_frame_buffer = rtsp_frame_buffer
self.tokenResult = tokenResult
self.ffmpeg_buffer_size = ffmpeg_buffer_size
self.ffmpeg_frame_buffer = queue.Queue(maxsize=ffmpeg_buffer_size) # ffmpeg输出到m3u6的队列输出切片缓冲区用于存储视频帧
self.save_frame_buffer = queue.Queue(maxsize=save_buffer_size) # 用于保存视频的缓存
self.msg_buffer = list() # 保存报错原因
self.frame_count = 0 # 帧计数器
self.start_warning_time = None # 记录告警开始时间
self.latest_warning_time = None # 记录告警结束时间
self.old_update_warning_time = time.time() - 3600 # 上一次成功告警的时间
# 两帧一次目标检测5帧一次人脸识别时间不够啊
self.det_gap = 0
self.face_gap = 0
self.p_num_error = 0
# 上一帧的检测框作为没有检测帧的默认结果。只要位置不发生太大变化,结果不会差别太大。
self.p_box_list = list()
def run(self):
print('检测线程启动')
frame_push_thread = FramePushThread(self.ffmpeg_frame_buffer, pipeline_mid)
frame_push_thread.start()
# 存的是目标检测的上一次结果图片
previous_frame = None
previous_fram_face = None
# save_flag = []
self.tokenResult['token'] = 'token'
self.tokenResult['current_time'] = '2024-01-01 00:00:00'
# 这里没必要每一帧都检测吧直接隔1帧检测一帧.
while True:
frame = rtsp_frame_buffer.get()
self.face_gap += 1
self.det_gap += 1
# 人脸识别
if self.face_gap > 4:
# 上传token
# token = get_token(self.tokenResult)
token = {}
# 直接将帧加入到ffmpeg缓存中
ret, jpeg_frame = cv2.imencode('.jpg', frame)
self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
if previous_fram_face is not None:
result_same = compare_frames(previous_fram_face, frame, 100)
else:
result_same = False
if not result_same:
face_save = FaceRecUpload(self.ip, frame, token)
face_save.start()
# print('\r 人脸识别任务', end='')
self.face_gap = 0
# pass
# 异常检测
elif self.det_gap > 1:
self.det_gap = 0
if previous_frame is not None:
result_same = compare_frames(previous_frame, frame, 100)
else:
result_same = False
if not result_same:
# token = get_token(self.tokenResult)
# print(token)
token = {}
batch_image_raw, r_list, box_list, msg_list = self.yolov8_wrapper.infer(frame)
# print("r_list: ", r_list)
# print("p_num_error", self.p_num_error)
# 上一帧检测结果
self.p_box_list = box_list
previous_frame = frame
# 将处理完的帧加入到 ffmpeg缓存中
ret, jpeg_frame = cv2.imencode('.jpg', frame)
self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
now_time = time.time()
if r_list > self.p_num_error:
self.p_num_error = r_list
self.save_frame_buffer.put(frame)
self.msg_buffer.append(msg_list[0])
self.frame_count += 1
print(f'\n异常人数增长, 当前异常帧数量: {self.frame_count}', end='')
# 开始时间记录
self.start_warning_time = now_time
if self.save_frame_buffer.full():
print('\n告警队列满-执行保存')
save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'video', token, self.msg_buffer)
save_thread.start()
self.frame_count = 0
self.save_frame_buffer.queue.clear()
# self.msg_buffer.clear()
self.start_warning_time = None
# 记录当前警告时间
self.latest_warning_time = now_time
# 当前异常人数不变。
elif r_list == self.p_num_error and self.p_num_error != 0:
# token = get_token(self.tokenResult)
token = {}
# 记录当前警告时间
self.latest_warning_time = now_time
# 判断当前的报警时间是否在10秒内
upload = verify_timenum(self.start_warning_time, now_time, 10)
# 在10秒内将当前异常帧加入异常缓存
if upload:
self.save_frame_buffer.put(frame)
self.frame_count += 1
print(f'\r检测到异常,异常开始时间{self.start_warning_time} ,将当前异常帧加入异常帧缓存,当前异常数量:{self.p_num_error},当前异常帧数量: {self.frame_count} '
f'', end='')
# 异常帧缓存满了
if self.save_frame_buffer.full():
print('\n告警缓存满,保存为视频')
# 上传视频/图片
save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'video', token, self.msg_buffer)
save_thread.start()
# save_flag.clear()
self.frame_count = 0
self.save_frame_buffer.queue.clear()
# self.msg_buffer.clear()
# self.old_update_warning_time = now_time
self.start_warning_time = now_time
else:
#print(f'\r检测到异常帧但异常人数没变化当前异常人数为{r_list}。', end='')
if not self.save_frame_buffer.empty():
print('\n10s 视频开始保存')
save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer,
'video', token, self.msg_buffer)
save_thread.start()
# save_flag.clear()
self.frame_count = 0
self.save_frame_buffer.queue.clear()
# self.msg_buffer.clear()
# self.old_update_warning_time = now_time
self.start_warning_time = now_time - 11
elif r_list == self.p_num_error and self.p_num_error == 0:
pass
#print(f'\r无异常', end='')
elif r_list < self.p_num_error:
# 间隔小于4秒的认为是误判
change_flag = verify_timenum(self.latest_warning_time, now_time, 4)
if not change_flag:
self.p_num_error = r_list
print(f'\n异常人数减少,当前异常人数:{self.p_num_error},当前异常帧数量:{self.frame_count}')
# 将当前缓存中的异常帧上传
if self.save_frame_buffer.qsize()> 20:
# 4秒后才保存视频,这时msg_list已经是空的了.
save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'video', token, self.msg_buffer)
save_thread.start()
elif self.save_frame_buffer.qsize()> 0:
save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'picture', token, self.msg_buffer)
save_thread.start()
self.frame_count = 0
self.save_frame_buffer.queue.clear()
# self.msg_buffer.clear()
self.start_warning_time = None
else:
pass
# print('\r程序可能的误判', end='')
else:
# 一样的帧直接加入到ffmpeg缓存中
# print('\r相同帧不需要推理', end='')
ret, jpeg_frame = cv2.imencode('.jpg', frame)
self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
else:
# 什么都不干的帧,直接以上一次的推理结果画框。
for i in range(len(self.p_box_list)):
c1, c2 = (int(self.p_box_list[i][0]), int(self.p_box_list[i][1])), (int(self.p_box_list[i][2]), int(self.p_box_list[i][3]))
cv2.rectangle(frame, c1, c2, (0, 0, 255), 2)
uuid_str = str(uuid.uuid4())[:6] + str(int(time.time()))
# img_object_name = f"/home/admin-root/haotian/jingzhu1.1/tensorrtx-master/yolov8/mp4/skip_{uuid_str}_{vod_channelNo}.jpg"
# cv2.imwrite(img_object_name, frame)
ret, jpeg_frame = cv2.imencode('.jpg', frame)
self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes())
if __name__ == "__main__":
print(
"=============================================================================================================")
# load custom plugin and engine
PLUGIN_LIBRARY = f"{engine_path}libmyplugins.so"
engine_file_path = f"{engine_path}best.engine"
# time.sleep()
# 执行python代码命令行 参数判断操作(不做处理)
if len(sys.argv) > 1:
engine_file_path = sys.argv[1]
if len(sys.argv) > 2:
PLUGIN_LIBRARY = sys.argv[2]
clear_folder(ip)
# 加载动态链接库
ctypes.CDLL(PLUGIN_LIBRARY)
# 手动输入训练时的类别, 不需要了直接从配置文件中读
print("当前类别", categories)
# categories = ["shoe"]
# 加载模型文件
yolov8_wrapper1 = YoLov8TRT(engine_file_path)
# 重启后自动根据文件加载当天打卡信息。根据程序设置每天凌晨0点更新打卡数据
get_attendance_p()
try:
thread_rtsp = rtspInputFrame(vod_path,
rtsp_frame_buffer)
thread_det_1 = inferThread(yolov8_wrapper1, vod_path, ip, rtsp_frame_buffer, tokenResult)
thread_rtsp.start()
thread_det_1.start()
thread_rtsp.join()
thread_det_1.join()
finally:
# destroy the instance
thread_det_1.destroy()