# Source file: jingzhu_rk3588/main.py
# (Header reconstructed from web-viewer residue; the original paste included
# viewer chrome — "1107 lines / 47 KiB / Raw Permalink Blame History" and a
# Unicode-confusables warning — which is not part of the program.)
# 开发者 haotian
# 开发时间: 2024/9/20 21:14
# 修改者: Assistant
# 修改时间: 2025/8/11
'''
v7_p1_1.8 的 改进版
修复了程序中的bug并增强功能
'''
'''
主要改进:
1. 修复了人脸区域坐标访问错误的问题
2. 修复了线程安全问题,使用锁保护全局变量
3. 优化了资源管理,完善了程序结束时的清理操作
4. 添加了日志记录功能
5. 优化了异常处理
6. 清理了无用的注释代码
7. 增强了人脸识别的稳定性
8. 改进了告警逻辑的准确性
9. 修改了识别人脸的逻辑, 现支持每天识别人.
10. 封装了识别人脸后的方法
11. 添加告警间隔
'''
import ctypes
import os
import shutil
import sys
import threading
import time
import cv2
import numpy as np
import queue
from minio import Minio
import yaml
import subprocess
import uuid
import requests
import json
import datetime
import logging
from compreface import CompreFace
from compreface.service import RecognitionService
from util.yolov8rknn import YOLOv8RKNN
# Logging setup: console only; the file handler is intentionally left disabled.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    # handlers=[
    # logging.FileHandler("v7_p1_1.9.log"),
    # logging.StreamHandler()
    # ]
)
logger = logging.getLogger(__name__)
# Load runtime configuration from the working directory.
with open('config.yaml', 'r') as file:
    configData = yaml.safe_load(file)
# MinIO object-storage client used to upload alarm pictures/videos.
client = Minio(
    endpoint=configData['minioConfig']['endpoint'],
    access_key=configData['minioConfig']['access_key'],
    secret_key=configData['minioConfig']['secret_key'],
    secure=configData['minioConfig']['secure']
)
# Detection tuning parameters (config keys with fallback defaults).
CONF_THRESH = configData['video_config'].get('v0_conf', 0.65)  # YOLO confidence threshold
IOU_THRESHOLD = configData['video_config'].get('v0_iou', 0.65)  # YOLO NMS IoU threshold
LOW_BLUR = configData['video_config'].get('v0_blur', 40)  # minimum face sharpness (Laplacian variance)
LOW_SIMILARITY = configData['video_config'].get('v0_similarity', 0.97)  # similarity cut-off for accepting a face match
# Minimum number of seconds between two successive alarms (default 20 min).
ALERT_INTERVAL = configData['video_config'].get('v0_alert_interval', 20*60)
# Remote platform endpoints.
getTokenUrl = configData['dataConfig']['getTokenUrl']
putMessageUrl = configData['dataConfig']['putMessageUrl']
# Video / stream configuration.
ip = configData['video_config']['v1_ip']
m3u8_path = configData['video_config']['m3u8_path']
save_path = configData['video_config']['save_path']
vod_path = configData['video_config']['v1_path']
people_save_path = configData['video_config']['people_save_path']
vod_channelNo = configData['video_config']['v0_channelNo']
testclasses = configData['video_config']['v0_testclasses']
engine_path = configData['engine_path']
rknn_path = configData['rknn_path']
# FFmpeg command: reads raw JPEG frames from stdin and produces an HLS stream
# using Rockchip hardware acceleration (RKMPP decode/encode, RGA scaling).
command_mid = [
    'ffmpeg',
    '-hwaccel', 'rkmpp',  # enable RKMPP hardware-accelerated decoding
    '-i', '-',  # read video frames from stdin
    '-vf', 'scale_rkrga=1280:720',  # RGA hardware-accelerated scaling (zero copy)
    '-c:v', 'h264_rkmpp',  # RKMPP hardware H.264 encoder (instead of libx264)
    '-b:v', '500k',  # video bitrate
    '-g', '50',  # GOP size (tunable); needed for hardware decoding
    '-bf', '0',  # disable B-frames to reduce latency; needed for hardware decoding
    # '-preset', 'superfast',  # encoding speed, needed for CPU encoding
    # '-tune', 'zerolatency',  # low latency, needed for CPU encoding
    # '-crf', '23',  # CRF quality control, needed for CPU encoding
    # '-s', '1280x720',  # resolution, needed for CPU encoding
    '-an',  # disable audio
    '-loglevel', 'error',
    '-hls_time', '4',
    '-hls_list_size', '2',
    '-hls_flags', 'delete_segments',
    '-f', 'hls',
    f'{m3u8_path}' + ip + '/index.m3u8'
]
# CompreFace face-recognition service configuration.
DOMAIN: str = configData['compreface_service']['domain']
PORT: str = configData['compreface_service']['port']
API_KEY: str = configData['compreface_service']['api_key']
LIMIT: str = configData['compreface_service']['limit']
Det_prob_threshold: str = configData['compreface_service']['det_prob_threshold']
# CompreFace client and the recognition service handle used by worker threads.
compre_face: CompreFace = CompreFace(DOMAIN, PORT, options={'limit': LIMIT, "det_prob_threshold": Det_prob_threshold})
recognition: RecognitionService = compre_face.init_face_recognition(API_KEY)
# Launch the FFmpeg process; frames are written to its stdin by FramePushThread.
pipeline_mid = subprocess.Popen(command_mid, shell=False, stdin=subprocess.PIPE)
# Globals shared between threads.
frames = [None] * 6
rtsp_frame_buffer = queue.Queue(maxsize=300)
# Per-day face check-in registry: subject name -> date of last check-in.
d_face = dict()
# Per-day safety-shoe check registry: subject name -> date of last check.
d_face_shoe = dict()
# Locks protecting the shared dictionaries / counters above.
d_face_lock = threading.Lock()
d_face_shoe_lock = threading.Lock()
p_s_num_lock = threading.Lock()
# Token cache shared with worker threads (see get_token).
tokenResult = {}
# 获取token和对应时间
def get_token(tokenResult):
    """Return a valid API token, refreshing the cache when necessary.

    Fix over the previous version: the token is now also fetched when the
    cache dict is empty/missing keys, not only when an existing token has
    expired (previously an empty ``tokenResult`` silently returned '').

    :param tokenResult: mutable dict cache holding 'token' and 'current_time'
                        ("%Y-%m-%d %H:%M:%S"); an 'error' entry is stored on
                        failed refresh attempts
    :return: the cached token string, or '' when no token could be obtained
    """
    needs_refresh = True
    if 'token' in tokenResult and 'current_time' in tokenResult:
        token_time = datetime.datetime.strptime(tokenResult['current_time'],
                                                "%Y-%m-%d %H:%M:%S")
        time_diff = datetime.datetime.now() - token_time
        # Tokens are treated as valid for 20 minutes.
        needs_refresh = time_diff.total_seconds() > 20 * 60
    if needs_refresh:
        # Cache miss or expiry: request a fresh token from the platform.
        try:
            response = requests.post(getTokenUrl, timeout=10)
            if response.status_code == 200:
                data = json.loads(response.text)
                if 'retCode' in data and data['retCode'] == '200':
                    tokenResult['token'] = data['responseBody']['token']
                    tokenResult['current_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    logger.info("成功获取新的token")
                else:
                    logger.error(f"获取token失败: {data.get('errorDesc', '未知错误')}")
                    tokenResult['error'] = data.get('errorDesc', '未知错误')
            else:
                logger.error(f"获取token请求失败状态码: {response.status_code}")
                tokenResult['error'] = response.status_code
        except Exception as e:
            logger.error(f"获取token时发生异常: {e}")
            tokenResult['error'] = str(e)
    return tokenResult.get('token', '')
def send_post_request(url, token, msg, picUrl, videoUrl):
    """POST an alarm message (text plus picture/video URLs) to the platform.

    :param url: platform endpoint to post to
    :param token: X-Access-Token header value
    :param msg: alarm text
    :param picUrl: picture URL (may be '')
    :param videoUrl: video URL (may be '')
    :return: the ``requests`` response object, or None on exception
    """
    headers = {
        'X-Access-Token': token,
        'Content-Type': 'application/json'
    }
    body = {
        "tenantCode": "32",
        "channelNo": vod_channelNo,
        "alarmContent": msg,
        "alarmTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "picInfo": [{"url": picUrl}],
        "videoInfo": [{"url": videoUrl}],
    }
    try:
        resp = requests.post(url, headers=headers, data=json.dumps(body), timeout=10)
        logger.info(f"发送POST请求状态码: {resp.status_code}")
        return resp
    except Exception as e:
        logger.error(f"发送POST请求时发生异常: {e}")
        return None
# Alarm POST used for worker face check-in and safety-shoe statistics.
def send_post_request_fs(url, token, msg, picUrl, videoUrl, personName, meetLevel):
    """POST an alarm message that additionally carries the recognized person.

    Same as send_post_request, plus ``personName`` (recognized subject) and
    ``meetLevel`` (recognition similarity). Returns the response or None on
    exception.
    """
    headers = {
        'X-Access-Token': token,
        'Content-Type': 'application/json'
    }
    body = {
        "tenantCode": "32",
        "channelNo": vod_channelNo,
        "alarmContent": msg,
        "alarmTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "personName": personName,
        "meetLevel": meetLevel,
        "picInfo": [{"url": picUrl}],
        "videoInfo": [{"url": videoUrl}],
    }
    try:
        resp = requests.post(url, headers=headers, data=json.dumps(body), timeout=10)
        logger.info(f"发送POST请求状态码: {resp.status_code}")
        return resp
    except Exception as e:
        logger.error(f"发送POST请求时发生异常: {e}")
        return None
def clear_folder(ip):
    """Ensure the HLS output folder for this camera exists and is empty.

    Creates the folder when it is missing; otherwise deletes every file and
    subdirectory inside it, logging (and skipping) entries that fail to
    delete.
    """
    folder_path1 = m3u8_path + ip
    # Missing folder: create it and we are done.
    if not os.path.exists(folder_path1):
        logger.info(f"文件夹 {folder_path1} 不存在,创建文件夹")
        os.makedirs(folder_path1)
        return
    entries = os.listdir(folder_path1)
    if not entries:
        logger.info(f"文件夹 {folder_path1} 为空,无需清空!")
        return
    # Remove every entry; failures are logged but do not abort the sweep.
    for filename in entries:
        file_path = os.path.join(folder_path1, filename)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            logger.error(f"删除文件 {file_path} 时出错: {e}")
    logger.info(f"已清空文件夹 {folder_path1} 的内容!")
def restart_program():
    """Re-exec the current script with the same interpreter and argv."""
    interpreter = sys.executable  # path of the running Python interpreter
    logger.info("重启程序...")
    # Short pause so the log line is visible before the process image is replaced.
    time.sleep(1)
    os.execl(interpreter, interpreter, *sys.argv)
def verify_bbox_class(classid_list, save_flag):
    """Check detections for alarm-relevant classes.

    Scans ``classid_list`` for "face", "shoe" and "phone" — in that priority
    order. The first class found is appended to ``save_flag`` (mutated in
    place) and True is returned; False when none are present.
    """
    for alarm_class in ("face", "shoe", "phone"):
        if alarm_class in classid_list:
            save_flag.append(alarm_class)
            return True
    return False
# Is after_time within time_num seconds of before_time? True if yes, False otherwise.
def verify_timenum(before_time, after_time, time_num):
    """Return True when ``after_time - before_time`` is strictly less than
    ``time_num`` seconds; a ``before_time`` of None always yields True."""
    if before_time is None:
        return True
    return (after_time - before_time) < time_num
# Decide whether two frames are effectively identical.
def compare_frames(frame1, frame2, threshold):
    """Return True when no pixel of the grayscale absolute difference exceeds
    ``threshold`` (i.e. the frames are the same up to the tolerance)."""
    diff = cv2.absdiff(frame1, frame2)
    gray_diff = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
    # NOTE(review): maxval 350 exceeds the uint8 range, but only zero/non-zero
    # of the mask matters here, so the value is irrelevant.
    _, binary_mask = cv2.threshold(gray_diff, threshold, 350, cv2.THRESH_BINARY)
    return np.sum(binary_mask) == 0
def white_color_ratio(image):
    """Fraction of pixels falling in a broad white/light-grey HSV range.

    :param image: BGR image
    :return: white-ish pixel count divided by total pixel count
    """
    frame = image.copy()
    # Work in HSV: white is low saturation, high value.
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    lower_white = np.array([0, 0, 200], dtype=np.uint8)
    upper_white = np.array([180, 30, 255], dtype=np.uint8)
    # Mask is white where the pixel is white/light-grey, black elsewhere.
    mask = cv2.inRange(hsv, lower_white, upper_white)
    total = frame.shape[0] * frame.shape[1]
    return np.count_nonzero(mask) / total
def shoe_color_ratio(image):
    """Fraction of pixels falling in a broad black HSV range.

    Used to judge whether a detected shoe is dark enough to be a safety shoe.

    :param image: BGR image (typically a shoe crop)
    :return: black pixel count divided by total pixel count
    """
    # Work in HSV: black is any hue/saturation with a low value channel.
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower_black = np.array([0, 0, 0], dtype=np.uint8)
    upper_black = np.array([180, 255, 80], dtype=np.uint8)
    # Mask is white where the pixel is dark, black elsewhere.
    mask = cv2.inRange(hsv, lower_black, upper_black)
    total = image.shape[0] * image.shape[1]
    return np.count_nonzero(mask) / total
def get_img_path_batches(batch_size, img_dir):
    """Recursively collect file paths under ``img_dir``, grouped into batches.

    Every batch except possibly the last contains exactly ``batch_size``
    paths; an empty directory yields an empty list.
    """
    batches = []
    current = []
    for root, _dirs, files in os.walk(img_dir):
        for filename in files:
            # Flush a full batch before starting the next one.
            if len(current) == batch_size:
                batches.append(current)
                current = []
            current.append(os.path.join(root, filename))
    if current:
        batches.append(current)
    return batches
# Draw a box; returns 1 when the detection needs an alarm, 0 otherwise.
def plot_one_box(x, img, color=[0, 255, 0], label=None, line_thickness=2):
    """For a 'shoe' detection, draw a red alarm box when too little of the
    crop is black (i.e. likely not a safety shoe).

    :param x: box as (x1, y1, x2, y2)
    :param img: BGR image drawn on in place
    :param label: class label; only 'shoe' triggers any processing
    :return: 1 when an alarm box was drawn, 0 otherwise
    """
    c1 = (int(x[0]), int(x[1]))
    c2 = (int(x[2]), int(x[3]))
    if label == 'shoe':
        logger.debug('识别到鞋子')
        crop = img[c1[1]:c2[1], c1[0]:c2[0]]
        # Skip degenerate/empty crops.
        if crop.size > 0:
            source = shoe_color_ratio(crop)
            logger.debug(f'识别到鞋子, 比率:{source}')
            # A safety shoe must be at least 40% black inside its box.
            if source < 0.4:
                cv2.rectangle(img, c1, c2, [0, 0, 255], thickness=line_thickness, lineType=cv2.LINE_AA)
                return 1
    return 0
def check_save_flag(save_flag):
    """Return a '-'-joined string of known alarm categories present in
    ``save_flag``; currently only the "shoe" category is reported."""
    known_categories = ["shoe"]
    hits = [c for c in known_categories if c in save_flag]
    return "-".join(hits)
# Compute an image sharpness metric.
def calculate_blur(frame):
    """Variance of the Laplacian of the grayscale frame — higher means
    sharper, lower means blurrier."""
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    return cv2.Laplacian(grayscale, cv2.CV_64F).var()
# Stranger count from the previous face-recognition pass; shared across
# FaceRecUpload worker threads and protected by p_s_num_lock.
p_s_num = 0
class FaceRecUpload(threading.Thread):
    """One-shot worker thread: runs CompreFace recognition on a single frame,
    records daily check-ins / safety-shoe checks, and uploads alarm images.

    Shares the module-level ``d_face`` / ``d_face_shoe`` registries (guarded
    by their locks) and the global stranger counter ``p_s_num``.
    """

    def __init__(self, ip, frame, token, yolov8_wrapper):
        threading.Thread.__init__(self)
        self.ip = ip
        # Private copy of the frame so later pipeline writes don't affect us.
        self.frame = frame.copy()
        self.new_face = False
        self.token = token
        self.minio_client = client
        # YOLO wrapper used for the shoe sub-detection.
        self.yolov8_wrapper = yolov8_wrapper
        # Alert throttling state (NOTE(review): set here but never used in
        # this class; throttling actually happens in inferThread).
        self.alert_interval = ALERT_INTERVAL  # 20 minutes = 1200 seconds
        self.last_alert_time = time.time() - 3600  # last alert time, initialized far in the past

    def upload_minio(self, img_object, uuid_str, msg, file_type='jpg', personName=None, meetLevel=0.0):
        """Upload a local image to MinIO, delete the local file, then POST the
        alarm/check-in message (with the object URL) to the platform."""
        try:
            with open(img_object, 'rb') as file_data:
                # Determine file size by seeking to the end, then rewind.
                file_data.seek(0, os.SEEK_END)
                file_size = file_data.tell()
                file_data.seek(0)
                upload_name = img_object.split('/')[-1]
                self.minio_client.put_object(configData['minioConfig']['bucket_name'],
                                             upload_name, file_data,
                                             file_size)
                # Local copy is no longer needed once uploaded.
                os.remove(img_object)
                upload_http_url_img = configData['minioConfig'][
                    'bucket_name'] +'/'+ upload_name
                # send_post_request(putMessageUrl, self.token, msg, upload_http_url_img, '')
                send_post_request_fs(putMessageUrl, self.token, msg, upload_http_url_img, '', personName, meetLevel)
                logger.info(msg)
        except Exception as e:
            logger.error(f"上传minio时出错: {e}")

    def save_shoe(self, result, i):
        """Run the shoe check for recognized person ``i`` and record/upload the
        result (both 'wearing' and 'not wearing' outcomes are reported)."""
        # NOTE(review): height/width are computed but unused.
        height, width = self.frame.shape[:2]
        x1, y1, x2, y2 = result['result'][i]['box']['x_min'], result['result'][i]['box']['y_min'], \
            result['result'][i]['box']['x_max'], result['result'][i]['box']['y_max']
        # Crop a full-height vertical slab around the person (±50 px margin).
        cropped = self.frame[:, x1-50:x2+50]
        flag, is_shoe = self.yolov8_wrapper.infer_shoe(cropped)
        if flag:
            # Mark this subject as shoe-checked for today.
            d_face_shoe[result['result'][i]['subjects'][0]['subject']] = datetime.datetime.now().date()
            logger.info(f'识别到人和劳保鞋, 穿的不是劳保鞋{is_shoe}')
            # Append to today's shoe-statistics log file.
            with open(f'{people_save_path}{datetime.datetime.now().date()}_shoe.txt', 'a') as f:
                f.write(
                    f'劳保鞋统计, 员工名:{result["result"][i]["subjects"][0]["subject"]} ,未穿戴劳保鞋:{is_shoe},打卡时间{datetime.datetime.now()} \n')
            # UUID prefix alone collides easily, so a timestamp is appended.
            uuid_str = str(uuid.uuid4())[:6] + str(int(time.time()))
            img_object_name = f"{save_path}{result['result'][i]['subjects'][0]['subject']}_shoe_{uuid_str}_{vod_channelNo}_.jpg"
            cv2.imwrite(img_object_name, cropped)
            if is_shoe:
                self.upload_minio(img_object_name, uuid_str, "未穿戴劳保鞋", "jpg", result['result'][i]['subjects'][0]['subject'], result["result"][i]["subjects"][0]["similarity"])
            else:
                self.upload_minio(img_object_name, uuid_str, "已穿戴劳保鞋", "jpg", result['result'][i]['subjects'][0]['subject'], result["result"][i]["subjects"][0]["similarity"])
            # send_post_request()

    def save_people(self, result, i):
        """Record the daily check-in for recognized person ``i``: annotate the
        frame, log to the attendance file, and upload the image."""
        self.new_face = True
        logger.info(
            f'工人打卡, 员工名:{result["result"][i]["subjects"][0]["subject"]} ,相似度:{result["result"][i]["subjects"][0]["similarity"]},打卡时间{datetime.datetime.now()}')
        uuid_str = str(uuid.uuid4())[:6] + str(int(time.time()))
        img_object_name = f"{save_path}{result['result'][i]['subjects'][0]['subject']}_{uuid_str}_{vod_channelNo}_.jpg"
        x1, y1, x2, y2 = result['result'][i]['box']['x_min'], result['result'][i]['box']['y_min'], \
            result['result'][i]['box']['x_max'], result['result'][i]['box']['y_max']
        # Draw the face box and the similarity score on the frame.
        cv2.rectangle(self.frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
        # Text rendering parameters.
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 1
        color = (255, 0, 0)  # BGR color
        thickness = 2
        cv2.putText(self.frame, f'similarity: {result["result"][i]["subjects"][0]["similarity"]}',
                    (x1, y1 - 20), font, font_scale, color, thickness,
                    cv2.LINE_AA)
        cv2.imwrite(img_object_name, self.frame)
        # Append to today's attendance log file (parsed by get_attendance_p).
        with open(
                f'{people_save_path}{datetime.datetime.now().date()}.txt',
                'a') as f:
            f.write(
                f'工人打卡, 员工名:{result["result"][i]["subjects"][0]["subject"]} ,相似度:{result["result"][i]["subjects"][0]["similarity"]},打卡时间{datetime.datetime.now()} \n')
        self.upload_minio(img_object_name, uuid_str, "人员到岗", "jpg", result['result'][i]['subjects'][0]['subject'], result['result'][i]['subjects'][0]['similarity'])

    def run(self):
        """Recognize faces in the frame, then dispatch check-in / shoe-check /
        stranger-alarm handling per detected face."""
        global p_s_num
        et, jpeg_frame = cv2.imencode('.jpg', self.frame)
        now_time = time.time()
        try:
            result = recognition.recognize(jpeg_frame.tobytes())
        except Exception as e:
            logger.error(f"人脸识别时发生异常: {e}")
            result = None
        # A 'message' key in the response indicates a service-side error.
        if result and 'message' not in result:
            warning = list()
            # Face recognition results present?
            if 'result' in result:
                for i in range(len(result['result'])):
                    # Face box coordinates.
                    x1, y1, x2, y2 = result['result'][i]['box']['x_min'], result['result'][i]['box']['y_min'], \
                        result['result'][i]['box']['x_max'], result['result'][i]['box']['y_max']
                    # Coordinate-order fix: slice rows first — [y1:y2+1, x1:x2+1].
                    if self.frame[y1:y2+1, x1:x2+1].size > 0:
                        blur = calculate_blur(self.frame[y1:y2+1, x1:x2+1])
                    else:
                        blur = 0
                    logger.debug(f'人脸模糊度{blur}')
                    # Sharpness gate; threshold lowered because of the local camera.
                    # if blur > 40:
                    if blur > LOW_BLUR:
                        # Similarity in (0.3, 0.4): treat as a stranger warning.
                        if 0.3 < result['result'][i]['subjects'][0]['similarity'] < 0.4:
                            # Text rendering parameters.
                            font = cv2.FONT_HERSHEY_SIMPLEX
                            font_scale = 1
                            color = (0, 255, 0)  # BGR color
                            thickness = 2
                            cv2.rectangle(self.frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
                            cv2.putText(self.frame, f'warning {result["result"][i]["subjects"][0]["similarity"]}',
                                        (x1, y1 - 20), font, font_scale, color, thickness,
                                        cv2.LINE_AA)
                            warning.append([(x1, y1), (x2, y2)])
                        # Below the acceptance threshold: discard the frame.
                        elif result['result'][i]['subjects'][0]['similarity'] <= LOW_SIMILARITY:
                            logger.debug('丢弃的人脸识别帧')
                        else:
                            with d_face_lock:
                                if result['result'][i]['subjects'][0]['subject'] not in d_face:
                                    # First ever check-in for this person.
                                    d_face[result['result'][i]['subjects'][0]['subject']] = datetime.datetime.now().date()
                                    # Persist the check-in.
                                    self.save_people(result, i)
                                else:
                                    # New day: refresh the check-in date.
                                    now_date = datetime.datetime.now().date()
                                    if d_face[result['result'][i]['subjects'][0]['subject']] != now_date:
                                        d_face[result['result'][i]['subjects'][0]['subject']] = now_date
                                        self.save_people(result, i)
                            with d_face_shoe_lock:
                                if result['result'][i]['subjects'][0]['subject'] not in d_face_shoe:
                                    self.save_shoe(result, i)
                                else:
                                    now_date = datetime.datetime.now().date()
                                    if d_face_shoe[result['result'][i]['subjects'][0]['subject']] != now_date:
                                        self.save_shoe(result, i)
                    else:
                        logger.debug('模糊帧丢弃')
            # Update the shared stranger count; only alarm when it grows.
            with p_s_num_lock:
                if len(warning) > p_s_num:
                    p_s_num = len(warning)
                    # UUID prefix alone collides easily, so a timestamp is appended.
                    uuid_str = str(uuid.uuid4())[:6] + str(int(time.time()))
                    img_object_name = f"{save_path}face_{uuid_str}_{vod_channelNo}.jpg"
                    cv2.imwrite(img_object_name, self.frame)
                    self.upload_minio(img_object_name, uuid_str, '陌生人警告', 'jpg')
                elif len(warning) < p_s_num:
                    p_s_num = len(warning)
        # Reset the flag once this one-shot task is done.
        self.new_face = False
        logger.debug(f'人脸识别任务完成,完成时间:{time.time() - now_time}')
class NewSaveAndUploadMP4Thread(threading.Thread):
    """One-shot worker: snapshots a buffer of abnormal frames and uploads them
    to MinIO as either a single picture or an MP4 clip, then notifies the
    platform via send_post_request."""

    def __init__(self, ip, frame_buffer, save_type, token):
        '''
        :param ip: camera identifier
        :param frame_buffer: queue of abnormal frames
        :param save_type: 'picture' or 'video'
        '''
        threading.Thread.__init__(self)
        self.ip = ip
        # Snapshot the queue contents now: keeping a reference to the queue
        # would alias the caller's object, and since the thread may not run
        # immediately, the caller could clear the queue before we read it.
        self.frames = list(frame_buffer.queue)
        self.save_type = save_type
        self.minio_client = client
        self.token = token

    def run(self):
        # A 6-char UUID prefix collides often, so append a timestamp.
        now_time = str(int(time.time()))
        uuid_str = str(uuid.uuid4())[:6] + now_time
        if self.save_type == 'picture':
            # Picture mode: upload only the first buffered frame.
            first_frame = self.frames[0]
            img_object_name = f"{save_path}{uuid_str}_{vod_channelNo}_.jpg"
            cv2.imwrite(img_object_name, first_frame)
            try:
                with open(img_object_name, 'rb') as file_data:
                    # Determine file size by seeking to the end, then rewind.
                    file_data.seek(0, os.SEEK_END)
                    file_size = file_data.tell()
                    file_data.seek(0)
                    self.minio_client.put_object(configData['minioConfig']['bucket_name'], f'{uuid_str}_{vod_channelNo}_.jpg', file_data,
                                                 file_size)
                    upload_http_url_img = configData['minioConfig']['bucket_name'] + f'/{uuid_str}_{vod_channelNo}_.jpg'
                    send_post_request(putMessageUrl, self.token, '未穿戴劳保鞋', upload_http_url_img, '')
                    os.remove(img_object_name)
                    logger.info('上传图片完成')
            except Exception as e:
                logger.error(f"上传图片时发生异常: {e}")
        elif self.save_type == 'video':
            # Video mode: encode all buffered frames into an MP4 at 25 fps.
            mp4_file_path = f'{save_path}{uuid_str}_{vod_channelNo}.mp4'
            height, width, _ = self.frames[0].shape
            out = cv2.VideoWriter(mp4_file_path, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height), isColor=True)
            for frame in self.frames:
                out.write(frame)
            out.release()
            mp4_object_name = f"{uuid_str}_{vod_channelNo}_.mp4"
            try:
                self.temp_file_size = os.path.getsize(mp4_file_path)
                with open(mp4_file_path, 'rb') as file_data:
                    # Upload the clip to MinIO.
                    self.minio_client.put_object(configData['minioConfig']['bucket_name'], mp4_object_name, file_data,
                                                 self.temp_file_size)
                upload_http_url_mp4 = configData['minioConfig']['bucket_name'] + f'/{uuid_str}_{vod_channelNo}_.mp4'
                # Post the alarm with the uploaded video URL.
                send_post_request(putMessageUrl, self.token, '未穿戴劳保鞋', '', upload_http_url_mp4)
                logger.info('上传视频成功')
            except Exception as e:
                logger.error(f"上传视频时发生异常: {e}")
            finally:
                # Always remove the temporary local file.
                if os.path.exists(mp4_file_path):
                    os.remove(mp4_file_path)
        else:
            logger.error('异常类型')
class FramePushThread(threading.Thread):
    """Daemon worker that drains a queue of encoded frames into the stdin
    pipe of the ffmpeg subprocess."""

    def __init__(self, frame_buffer, process_mid):
        threading.Thread.__init__(self)
        self.frame_buffer = frame_buffer  # queue of JPEG-encoded byte strings
        self.process_mid = process_mid  # ffmpeg subprocess with an open stdin pipe

    def run(self):
        """Forward frames forever; individual write failures are logged and
        the loop continues with the next frame."""
        while True:
            try:
                encoded = self.frame_buffer.get()
                self.process_mid.stdin.write(encoded)
                # Flush so ffmpeg sees the data immediately.
                self.process_mid.stdin.flush()
            except Exception as e:
                logger.error(f"推送帧数据时发生异常: {e}")
def _load_record_file(path, target, lock):
    """Parse one attendance log file into ``target`` (name -> date) under ``lock``.

    Each line looks like '工人打卡, 员工名:<name> ,相似度:<s>,打卡时间<datetime>';
    the name and the date are extracted by fixed-prefix slicing.
    Returns True when the file existed and was parsed, False otherwise.
    """
    try:
        with open(path, 'r') as f:
            with lock:
                for line in f:
                    parts = line.strip().split(' ')
                    if len(parts) > 1:
                        # Strip the '员工名:' prefix (4 characters) to get the name.
                        employee_name = parts[1][4:]
                        # The last comma-separated field is '打卡时间<date>...'.
                        time_date = parts[2].split(',')[-1][4:]
                        date_obj = datetime.datetime.strptime(time_date, "%Y-%m-%d").date()
                        target[employee_name] = date_obj
        return True
    except FileNotFoundError:
        logger.info("未找到考勤记录文件,可能是第一次运行")
        return False
    except Exception as e:
        logger.error(f"加载考勤记录时发生异常: {e}")
        return False

def get_attendance_p():
    """Reload today's check-in records from disk after a restart, so people
    who already checked in are not recorded again.

    Fix over the previous version: the two files are now loaded
    independently — previously one shared try meant a missing attendance
    file silently aborted loading of the shoe-statistics file as well.
    """
    today = datetime.datetime.now().date()
    loaded_face = _load_record_file(f'{people_save_path}{today}.txt', d_face, d_face_lock)
    loaded_shoe = _load_record_file(f'{people_save_path}{today}_shoe.txt', d_face_shoe, d_face_shoe_lock)
    if loaded_face or loaded_shoe:
        logger.info("成功加载考勤记录")
def connect_to_rtsp_stream(url):
    """ Try to connect to an RTSP stream.

    Sets an HEVC FourCC hint on the capture; returns the opened
    ``cv2.VideoCapture`` or None (with an error logged) on failure.
    """
    capture = cv2.VideoCapture(url)
    capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
    if capture.isOpened():
        return capture
    logger.error(f"Failed to connect to {url}")
    return None
class rtspInputFrame(threading.Thread):
    """Reader thread: pulls frames from the RTSP stream, resizes them to
    1280x720 and feeds them into the shared frame queue, reconnecting
    automatically on failure."""

    def __init__(self, video_path, rtsp_frame_buffer):
        threading.Thread.__init__(self)
        self.video_path = video_path
        self.rtsp_frame_buffer = rtsp_frame_buffer  # queue buffering incoming RTSP frames
        self.running = True  # cleared by stop() to end the loop

    def run(self):
        cap = connect_to_rtsp_stream(self.video_path)
        while self.running:
            if cap is None:
                # Connection failed: wait 5 seconds and retry.
                logger.warning("无法连接到RTSP流5秒后重试...")
                time.sleep(5)
                cap = connect_to_rtsp_stream(self.video_path)
                continue
            else:
                # Read one frame from the stream.
                ret, frame = cap.read()
                # ret is True when the frame was read successfully.
                if not ret:
                    logger.warning("无法读取帧,尝试重新连接...")
                    # Release the current VideoCapture.
                    cap.release()
                    # Force a reconnect on the next iteration.
                    cap = None
                    continue
                else:
                    # Drop the frame silently when the buffer is full.
                    if not self.rtsp_frame_buffer.full():
                        frame = cv2.resize(frame, (1280, 720))
                        try:
                            self.rtsp_frame_buffer.put(frame, timeout=1)
                        except queue.Full:
                            logger.warning("RTSP帧缓冲区已满丢弃帧")
                    time.sleep(0.01)  # brief sleep to avoid hogging the CPU
        # Release resources on shutdown.
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()

    def stop(self):
        """Signal the read loop to exit."""
        self.running = False
class inferThread(threading.Thread):
    """Main detection thread.

    Consumes frames from the RTSP buffer and, on a fixed cadence, runs either
    face recognition (every 26th frame, via a FaceRecUpload worker) or YOLO
    anomaly detection (every other frame). Every frame — processed or not —
    is JPEG-encoded and pushed to the ffmpeg HLS pipeline. Anomaly alarms are
    buffered and uploaded as video/picture by NewSaveAndUploadMP4Thread,
    throttled by ALERT_INTERVAL.
    """

    def __init__(self, yolov8_wrapper, video_path, ip, rtsp_frame_buffer, tokenResult, ffmpeg_buffer_size=300,
                 save_buffer_size=2500):
        threading.Thread.__init__(self)
        self.ip = ip
        self.yolov8_wrapper = yolov8_wrapper
        self.video_path = video_path
        self.rtsp_frame_buffer = rtsp_frame_buffer
        self.tokenResult = tokenResult
        self.ffmpeg_buffer_size = ffmpeg_buffer_size
        # Output buffer feeding the ffmpeg HLS pipeline (JPEG bytes).
        self.ffmpeg_frame_buffer = queue.Queue(maxsize=ffmpeg_buffer_size)
        # Buffer of abnormal frames awaiting upload as a clip/picture.
        self.save_frame_buffer = queue.Queue(maxsize=save_buffer_size)
        self.frame_count = 0  # number of abnormal frames currently buffered
        self.start_warning_time = None  # timestamp when the current alarm episode started
        self.latest_warning_time = None  # timestamp of the most recent abnormal frame
        self.old_update_warning_time = time.time() - 3600  # time of the last successful alarm
        # Minimum spacing between alarms (20 minutes = 1200 seconds).
        self.alert_interval = ALERT_INTERVAL
        self.last_alert_time = time.time() - 3600  # last alert time, initialized far in the past
        # Cadence counters: detection every 2nd frame, face recognition every 26th.
        self.det_gap = 0
        self.face_gap = 0
        self.p_num_error = 0  # abnormal count from the previous detection
        # Previous detection boxes, reused to annotate frames that skip inference.
        self.p_box_list = list()
        # Thread control flag, cleared by stop().
        self.running = True

    def run(self):
        logger.info('检测线程启动')
        # Start the daemon that pipes encoded frames into ffmpeg.
        frame_push_thread = FramePushThread(self.ffmpeg_frame_buffer, pipeline_mid)
        frame_push_thread.daemon = True
        frame_push_thread.start()
        # Previous frames used for change detection (detection / face paths).
        previous_frame = None
        previous_fram_face = None
        # Seed the token cache with an expired entry so the first get_token()
        # call performs a real fetch.
        self.tokenResult['token'] = 'token'
        self.tokenResult['current_time'] = '2024-01-01 00:00:00'
        while self.running:
            try:
                frame = self.rtsp_frame_buffer.get(timeout=5)
            except queue.Empty:
                logger.warning("RTSP帧缓冲区为空等待帧...")
                continue
            self.face_gap += 1
            self.det_gap += 1
            # --- Face-recognition path (every 26th frame) ---
            if self.face_gap > 25:
                # Refresh the API token if needed.
                token = get_token(self.tokenResult)
                # Forward the frame to ffmpeg unchanged.
                ret, jpeg_frame = cv2.imencode('.jpg', frame)
                try:
                    self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes(), timeout=1)
                except queue.Full:
                    logger.warning("FFmpeg帧缓冲区已满")
                # Skip recognition when the frame is unchanged from last time.
                if previous_fram_face is not None:
                    result_same = compare_frames(previous_fram_face, frame, 100)
                else:
                    result_same = False
                if not result_same:
                    face_save = FaceRecUpload(self.ip, frame, token, self.yolov8_wrapper)
                    face_save.start()
                    logger.debug('人脸识别任务启动')
                self.face_gap = 0
            # --- Anomaly-detection path (every 2nd frame) ---
            elif self.det_gap > 1:
                self.det_gap = 0
                if previous_frame is not None:
                    result_same = compare_frames(previous_frame, frame, 100)
                else:
                    result_same = False
                if not result_same:
                    token = get_token(self.tokenResult)
                    # logger.debug(f"获取token: {token}")
                    try:
                        # NOTE(review): r_list appears to be the abnormal-
                        # detection count (compared numerically below) —
                        # confirm against YOLOv8RKNN.infer.
                        batch_image_raw, r_list, box_list = self.yolov8_wrapper.infer(frame, CONF_THRESH, IOU_THRESHOLD)
                    except Exception as e:
                        logger.error(f"目标检测时发生异常: {e}")
                        # Even on failure, keep the stream alive by pushing the frame.
                        ret, jpeg_frame = cv2.imencode('.jpg', frame)
                        try:
                            self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes(), timeout=1)
                        except queue.Full:
                            logger.warning("FFmpeg帧缓冲区已满")
                        continue
                    # Remember this detection for annotation of skipped frames.
                    self.p_box_list = box_list
                    previous_frame = frame
                    # Push the processed frame to ffmpeg.
                    ret, jpeg_frame = cv2.imencode('.jpg', frame)
                    try:
                        self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes(), timeout=1)
                    except queue.Full:
                        logger.warning("FFmpeg帧缓冲区已满")
                    now_time = time.time()
                    # Case 1: abnormal count increased -> start/extend an episode.
                    if r_list > self.p_num_error:
                        self.p_num_error = r_list
                        try:
                            self.save_frame_buffer.put(frame, timeout=1)
                        except queue.Full:
                            logger.warning("保存帧缓冲区已满")
                        self.frame_count += 1
                        logger.info(f'异常人数增长, 当前异常帧数量: {self.frame_count}')
                        # Record the episode start time.
                        self.start_warning_time = now_time
                        # Alarm-throttling check.
                        current_time = time.time()
                        if current_time - self.last_alert_time >= self.alert_interval:
                            if self.save_frame_buffer.full():
                                logger.info('告警队列满-执行保存')
                                save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'video', token)
                                save_thread.start()
                                self.last_alert_time = current_time  # update alert time
                                self.frame_count = 0
                                self.save_frame_buffer.queue.clear()
                                self.start_warning_time = None
                        else:
                            logger.debug(f'告警间隔内,跳过本次告警。下次可告警时间: {datetime.datetime.fromtimestamp(self.last_alert_time + self.alert_interval).strftime("%Y-%m-%d %H:%M:%S")}')
                            # Clear the buffer even when not alarming, to avoid growth.
                            self.frame_count = 0
                            self.save_frame_buffer.queue.clear()
                            self.start_warning_time = None
                        # Record the latest warning time.
                        self.latest_warning_time = now_time
                    # Case 2: abnormal count unchanged and non-zero.
                    elif r_list == self.p_num_error and self.p_num_error != 0:
                        token = get_token(self.tokenResult)
                        # Record the latest warning time.
                        self.latest_warning_time = now_time
                        # Still within 10 s of the episode start?
                        upload = verify_timenum(self.start_warning_time, now_time, 10)
                        # Within 10 s: keep buffering abnormal frames.
                        if upload:
                            try:
                                self.save_frame_buffer.put(frame, timeout=1)
                            except queue.Full:
                                logger.warning("保存帧缓冲区已满")
                            self.frame_count += 1
                            logger.debug(f'检测到异常,异常开始时间{self.start_warning_time} ,将当前异常帧加入异常帧缓存,当前异常数量:{self.p_num_error},当前异常帧数量: {self.frame_count}')
                        # Past the 10 s window: flush the buffered episode.
                        else:
                            # Alarm-throttling check before uploading the buffered frames.
                            current_time = time.time()
                            if current_time - self.last_alert_time >= self.alert_interval:
                                if not self.save_frame_buffer.empty():
                                    logger.info('10s 视频开始保存')
                                    save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer,
                                                                            'video', token)
                                    save_thread.start()
                                    self.last_alert_time = current_time  # update alert time
                                    self.frame_count = 0
                                    self.save_frame_buffer.queue.clear()
                                    self.start_warning_time = now_time - 11
                            else:
                                logger.debug(f'告警间隔内,跳过本次告警。下次可告警时间: {datetime.datetime.fromtimestamp(self.last_alert_time + self.alert_interval).strftime("%Y-%m-%d %H:%M:%S")}')
                                # Clear the buffer even when not alarming, to avoid growth.
                                self.frame_count = 0
                                self.save_frame_buffer.queue.clear()
                                self.start_warning_time = now_time - 11
                        # Buffer-full safeguard (runs on both branches above).
                        current_time = time.time()
                        if current_time - self.last_alert_time >= self.alert_interval:
                            if self.save_frame_buffer.full():
                                logger.info('告警缓存满,保存为视频')
                                # Upload the buffered episode as a video.
                                save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'video', token)
                                save_thread.start()
                                self.last_alert_time = current_time  # update alert time
                                self.frame_count = 0
                                self.save_frame_buffer.queue.clear()
                                self.start_warning_time = now_time
                        else:
                            logger.debug(f'告警间隔内,跳过本次告警。下次可告警时间: {datetime.datetime.fromtimestamp(self.last_alert_time + self.alert_interval).strftime("%Y-%m-%d %H:%M:%S")}')
                            # Clear the buffer even when not alarming, to avoid growth.
                            self.frame_count = 0
                            self.save_frame_buffer.queue.clear()
                            self.start_warning_time = now_time
                    # Case 3: no anomalies at all.
                    elif r_list == self.p_num_error and self.p_num_error == 0:
                        logger.debug('无异常')
                    # Case 4: abnormal count dropped -> episode may have ended.
                    elif r_list < self.p_num_error:
                        # Drops within 4 s of the last warning are treated as misdetections.
                        change_flag = verify_timenum(self.latest_warning_time, now_time, 4)
                        if not change_flag:
                            self.p_num_error = r_list
                            logger.info(f'异常人数减少,当前异常人数:{self.p_num_error},当前异常帧数量:{self.frame_count}')
                            # Alarm-throttling check.
                            current_time = time.time()
                            if current_time - self.last_alert_time >= self.alert_interval:
                                # Flush buffered abnormal frames: video when long enough, else a picture.
                                if self.save_frame_buffer.qsize() > 20:
                                    save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'video', token)
                                    save_thread.start()
                                    self.last_alert_time = current_time  # update alert time
                                else:
                                    save_thread = NewSaveAndUploadMP4Thread(self.ip, self.save_frame_buffer, 'picture', token)
                                    save_thread.start()
                                    self.last_alert_time = current_time  # update alert time
                                self.frame_count = 0
                                self.save_frame_buffer.queue.clear()
                                self.start_warning_time = None
                            else:
                                logger.debug(f'告警间隔内,跳过本次告警。下次可告警时间: {datetime.datetime.fromtimestamp(self.last_alert_time + self.alert_interval).strftime("%Y-%m-%d %H:%M:%S")}')
                                # Clear the buffer even when not alarming, to avoid growth.
                                self.frame_count = 0
                                self.save_frame_buffer.queue.clear()
                                self.start_warning_time = None
                        else:
                            logger.debug('程序可能的误判')
                else:
                    # Identical frame: just forward it to ffmpeg, no inference.
                    logger.debug('相同帧不需要推理')
                    ret, jpeg_frame = cv2.imencode('.jpg', frame)
                    try:
                        self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes(), timeout=1)
                    except queue.Full:
                        logger.warning("FFmpeg帧缓冲区已满")
            else:
                # Frames between inference steps: annotate with the previous
                # detection's boxes and forward to ffmpeg.
                for i in range(len(self.p_box_list)):
                    c1, c2 = (int(self.p_box_list[i][0]), int(self.p_box_list[i][1])), (int(self.p_box_list[i][2]), int(self.p_box_list[i][3]))
                    cv2.rectangle(frame, c1, c2, (0, 0, 255), 2)
                ret, jpeg_frame = cv2.imencode('.jpg', frame)
                try:
                    self.ffmpeg_frame_buffer.put(jpeg_frame.tobytes(), timeout=1)
                except queue.Full:
                    logger.warning("FFmpeg帧缓冲区已满")

    def stop(self):
        """Signal the processing loop to exit."""
        self.running = False
if __name__ == "__main__":
    logger.info("程序启动")
    logger.info("=============================================================================================================")
    # Create the output directories if they do not exist.
    os.makedirs(save_path, exist_ok=True)
    os.makedirs(people_save_path, exist_ok=True)
    os.makedirs(m3u8_path + ip, exist_ok=True)
    # load custom plugin and engine
    # PLUGIN_LIBRARY = f"{engine_path}libmyplugins.so"
    # engine_file_path = f"{engine_path}best.engine"
    # Optional CLI overrides (stored but not otherwise used at present).
    if len(sys.argv) > 1:
        engine_file_path = sys.argv[1]
    if len(sys.argv) > 2:
        PLUGIN_LIBRARY = sys.argv[2]
    clear_folder(ip)
    # # Load the dynamic plugin library (disabled).
    # try:
    #     ctypes.CDLL(PLUGIN_LIBRARY)
    # except Exception as e:
    #     logger.error(f"加载动态链接库失败: {e}")
    #     sys.exit(1)
    # Class names used at training time (entered manually).
    categories = ["class1", "class2"]
    # Load the RKNN model; abort on failure.
    try:
        yolov8_wrapper1 = YOLOv8RKNN(rknn_path, categories)
    except Exception as e:
        logger.error(f"加载模型文件失败: {e}")
        sys.exit(1)
    # After a restart, reload today's check-in records from disk; the daily
    # registries are keyed by date, so they refresh naturally at midnight.
    get_attendance_p()
    thread_rtsp = None
    thread_det_1 = None
    try:
        thread_rtsp = rtspInputFrame(vod_path, rtsp_frame_buffer)
        thread_det_1 = inferThread(yolov8_wrapper1, vod_path, ip, rtsp_frame_buffer, tokenResult)
        thread_rtsp.start()
        thread_det_1.start()
        # Block until the worker threads exit.
        thread_rtsp.join()
        thread_det_1.join()
    except KeyboardInterrupt:
        logger.info("收到键盘中断信号,正在停止程序...")
    except Exception as e:
        logger.error(f"程序运行时发生异常: {e}")
    finally:
        # Stop the worker threads.
        if thread_rtsp:
            thread_rtsp.stop()
        if thread_det_1:
            thread_det_1.stop()
        # Destroy the model wrapper if it was created.
        if 'yolov8_wrapper1' in locals():
            try:
                yolov8_wrapper1.destroy()
            except Exception as e:
                logger.error(f"销毁YOLOv8实例时发生异常: {e}")
        # Give the threads a moment to wind down.
        time.sleep(1)
        logger.info("程序已停止")