haikang_api/util/haikang_util.py

617 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import hmac
import json
import base64
import os
import requests
import httpx
from fastapi import HTTPException
from datetime import datetime, timezone
from email.utils import format_datetime
from urllib.parse import urlparse
from dateutil.relativedelta import relativedelta
from config.env import HaiKangConfig
# from utils.log_util import logger
class HaikangUtil:
"""
海康平台工具类
"""
# def __init__(self):
# self.HAIKANG_URL = HaiKangConfig.HAIKANG_URL
# self.HAIKANG_PORT = HaiKangConfig.HAIKANG_PORT
# self.HAIKANG_AK = HaiKangConfig.HAIKANG_AK
# self.HAIKANG_SK = HaiKangConfig.HAIKANG_SK
# 通用请求头
@classmethod
def build_signed_headers(cls, method, url, body, app_key, app_secret):
"""
返回一个 dict包含所有签名请求所需的 header。
参数:
- method: HTTP 方法,例如 "POST"
- url: URL完整url(代码中提取相对位置, 保留 path 和 query
- body: 请求主体字符串(如 JSON可为空。
- app_key: AK
- app_secret: SK
"""
# 1. 基本 headers
accept = "*/*"
content_type = "application/json"
# 2. 计算 MD5可选如果 body 存在)
content_md5 = ""
if body:
# .digest()返回原始二进制md5
md5_digest = hashlib.md5(body.encode("utf-8")).digest()
content_md5 = base64.b64encode(md5_digest).decode("utf-8")
# 3. 生成 Date headerHTTP 规范格式)
now = datetime.now(timezone.utc)
date = format_datetime(now, usegmt=True)
# 4. 构造 httpHeaders 部分
http_headers_str = "\n".join(
[method.upper(), accept, content_md5, content_type, date, ""]
)
# 5. 自定义 headers 部分
custom_headers_str = f"x-ca-key:{app_key}\n"
# 6. 拼接 path + query
parsed = urlparse(url)
path_and_query = parsed.path
if parsed.query:
path_and_query += "?" + parsed.query
# 7. 构造签名字符串
string_to_sign = http_headers_str + custom_headers_str + path_and_query
# 8. 使用 HmacSHA256 + Base64 签名
h = hmac.new(
app_secret.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256
)
signature = base64.b64encode(h.digest()).decode("utf-8")
# 9. 返回完整 headers
headers = {
"Accept": accept,
"Content-MD5": content_md5,
"Content-Type": content_type,
"Date": date,
"X-Ca-Key": app_key,
"X-Ca-Signature": signature,
"X-Ca-Signature-Headers": "x-ca-key",
}
return headers
# 发送请求
@classmethod
async def send_request(cls, method: str, url: str, headers: dict, body: str | None, json=True):
    """Send one HTTP request and return the parsed JSON or the raw response.

    Args:
        method: HTTP method for the initial request.
        url: Absolute request URL.
        headers: Headers to send (normally the signed header set).
        body: Raw request body, or None for no body.
        json: When true (default) return ``response.json()``; otherwise
            return the ``httpx.Response`` object itself.

    Returns:
        The decoded JSON payload, or the response object when ``json`` is false.

    Raises:
        httpx.HTTPStatusError: If the final response is not a success status
            (including a redirect that carries no Location header).
    """
    request_timeout = 60
    # NOTE(review): verify=False disables TLS certificate verification. Kept
    # as in the original; confirm this is intentional (e.g. a self-signed
    # platform certificate) before using it over untrusted networks.
    async with httpx.AsyncClient(verify=False) as client:
        response = await client.request(
            method, url, headers=headers, content=body, timeout=request_timeout
        )
        if response.status_code in (301, 302, 303, 307, 308):
            location = response.headers.get("Location")
            if location:
                # Location may be relative; resolve it against the URL that
                # produced the redirect. The follow-up stays a GET, as before,
                # and now uses the same timeout as the first request.
                redirect_url = response.url.join(location)
                response = await client.get(
                    redirect_url, headers=headers, timeout=request_timeout
                )
            # Without a Location header there is nothing to follow; the
            # redirect status is reported by raise_for_status() below.
        response.raise_for_status()
        if json:
            return response.json()
        return response
# 获取access_token
@classmethod
async def get_access_token(cls):
    """Request an access token from the platform.

    Returns:
        list: ``[True, response]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, response]``. ``response`` is the full decoded
        JSON reply in both cases.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_ACCESS_TOKEN_URL}"
    )
    # This request carries no body, so None is signed and sent.
    signed = cls.build_signed_headers(
        "POST", endpoint, None, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, None)
    succeeded = reply["code"] == "0"
    return [succeeded, reply]
# 查询门禁点列表v2
@classmethod
async def get_door_list_v2(cls, pageNo: int = 1, pageSize: int = 10):
    """Query the door (access point) list, v2.

    Args:
        pageNo: Page number; values below 1 are raised to 1. Defaults to 1.
        pageSize: Items per page; clamped to the range 1..999. Defaults to 10.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    url = f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}/artemis{HaiKangConfig.HAIKANG_DOOR_SEARCH}"
    body_dict = {
        "pageNo": max(pageNo, 1),
        # Apply a lower bound as well as the upper one, so a zero or negative
        # page size is not forwarded to the platform.
        "pageSize": max(1, min(pageSize, 999)),
    }
    # Compact separators: the exact string signed must be the string sent.
    body_json = json.dumps(body_dict, separators=(",", ":"))
    headers = cls.build_signed_headers(
        "POST", url, body_json, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    back = await cls.send_request("POST", url, headers, body_json)
    if back["code"] == "0":
        return [True, back["data"]]
    return [False, back["code"], back["msg"]]
# 查询门禁状态
@classmethod
async def get_door_status(cls, door_index_codes: list):
    """Query the status of the given doors.

    Args:
        door_index_codes: Unique identifiers of the doors to query.

    Returns:
        list: ``[True, authDoorList, noAuthDoorIndexCodeList]`` when the
        response ``code`` is ``"0"``, otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_DOOR_STATES_URL}"
    )
    payload = json.dumps({"doorIndexCodes": door_index_codes}, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    data = reply["data"]
    return [True, data["authDoorList"], data["noAuthDoorIndexCodeList"]]
# 门禁点反控
@classmethod
async def door_do_control(cls, door_index_code: list, control_type: int):
    """Send a control command to one or more doors.

    Args:
        door_index_code: Unique identifiers of the doors to control.
        control_type: Operation type: 0 = keep open, 1 = close,
            2 = open, 3 = keep closed. Type 3 is deliberately rejected.

    Returns:
        list: ``[False, 400, "control_type参数错误"]`` for a rejected
        ``control_type``; ``[True, data]`` when the response ``code`` is
        ``"0"``; otherwise ``[False, code, msg]``.
    """
    allowed_types = (0, 1, 2)
    if control_type not in allowed_types:
        return [False, 400, "control_type参数错误"]

    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_DOOR_DOCONTROL_URL}"
    )
    payload = json.dumps(
        {"doorIndexCodes": door_index_code, "controlType": control_type},
        separators=(",", ":"),
    )
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    # Example reply:
    #   {"code": "0", "msg": "success",
    #    "data": [{"doorIndexCode": "2c95c028a809448f962a969e3ab34f",
    #              "controlResultCode": 0,
    #              "controlResultDesc": "success"}]}
    # controlResultCode 0 means the door accepted the command; any other value
    # is a per-door failure, so callers should inspect it even on [True, ...].
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
@classmethod
async def query_door_events_v2(cls, door_index_code: list, **args):
    """Query door events (v2), newest first.

    Args:
        door_index_code: Door identifier(s), sent as ``doorIndexCode``.
            NOTE(review): annotated as a list but previously documented as a
            single string; confirm which one the endpoint expects.
        **args: Optional settings:
            pageNo: Page number, floored at 1. Defaults to 1 (also when None).
            pageSize: Items per page, clamped to 1..999. Defaults to 10
                (also when None).
            startTime, endTime, eventType, personName: Filters; each is sent
                only when given a truthy value.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    url = f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}/artemis{HaiKangConfig.HAIKANG_DOOR_ENVENTS_URL}"

    # An explicit None (e.g. an unset query parameter passed through by the
    # caller) must fall back to the default rather than reach max()/min().
    page_no = args.get("pageNo")
    page_size = args.get("pageSize")
    body_dict = {
        "pageNo": 1 if page_no is None else max(page_no, 1),
        "pageSize": 10 if page_size is None else max(1, min(page_size, 999)),
        "doorIndexCode": door_index_code,
        # Sort by event time, descending.
        "sort": "eventTime",
        "order": "desc",
    }
    # Optional filters are included only when truthy, as before.
    for key in ("startTime", "endTime", "eventType", "personName"):
        value = args.get(key)
        if value:
            body_dict[key] = value

    body_json = json.dumps(body_dict, separators=(",", ":"))
    headers = cls.build_signed_headers(
        "POST", url, body_json, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    back = await cls.send_request("POST", url, headers, body_json)
    if back["code"] == "0":
        return [True, back["data"]]
    return [False, back["code"], back["msg"]]
# 查看门禁在线状态
@classmethod
async def door_online_status(cls, indexCodes: list):
    """Query the online status for the given index codes.

    Args:
        indexCodes: Index codes to look up, sent as ``indexCodes``.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_DOOR_ONLINE_STATUS}"
    )
    payload = json.dumps({"indexCodes": indexCodes}, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 人脸分组1vN检索
@classmethod
async def face_group_1vN_search(
    cls,
    facePicBinaryData: str,  # Base64-encoded string
    pageNo: int = 1,
    pageSize: int = 20,
    searchNum: int = 99,
    minSimilarity: int = 50,
    faceGroupIndexCodes: list[str] = None,
):
    """Run a 1-vs-N face search against face groups.

    Args:
        facePicBinaryData: Face picture data as a Base64-encoded string.
        pageNo: Page number. Defaults to 1.
        pageSize: Items per page. Defaults to 20.
        searchNum: Maximum number of search results. Defaults to 99.
        minSimilarity: Minimum similarity. Defaults to 50.
        faceGroupIndexCodes: Face groups to search. Defaults to None, which
            is sent as JSON ``null``.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_APPLICATION_ONETOMANY_URL}"
    )
    query = dict(
        facePicBinaryData=facePicBinaryData,
        pageNo=pageNo,
        pageSize=pageSize,
        searchNum=searchNum,
        minSimilarity=minSimilarity,
        faceGroupIndexCodes=faceGroupIndexCodes,
    )
    payload = json.dumps(query, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 人脸评分
@classmethod
async def face_picture_check(
    cls,
    facePicBinaryData: str,
):
    """Request a face-quality score for a picture.

    Args:
        facePicBinaryData: Binary face image data, Base64-encoded as a string.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_PICTURE_CHECK_URL}"
    )
    payload = json.dumps({"facePicBinaryData": facePicBinaryData}, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 按条件查询人员识别事件
@classmethod
async def get_face_capture_list_event(cls, startTime, endTime):
"""按条件查询人员识别事件, 暂无实现.
Args:
startTime (_type_): _description_
endTime (_type_): _description_
"""
pass
# 按条件查询人脸分组
@classmethod
async def get_face_group(cls):
    """Query face groups with an empty filter.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_FACE_GROUP_URL}"
    )
    # An empty JSON object is both signed and sent as the body.
    payload = json.dumps({})
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 查询访客预约记录, visitorStatus: 0 待审核, 1 正常, 2 迟到, 3 失效, 4 审核退回, 9 审核失效, 10 邀约中, 11 邀约失效
@classmethod
async def query_visitor_record(
    cls, **args
):
    """Query visitor appointment records.

    Args:
        **args: Query fields (e.g. pageNo, pageSize, visitorStatus). They are
            serialized and sent exactly as given; this method applies no
            defaults and no validation of its own.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_VISITOR_RECORD_SEARCH}"
    )
    payload = json.dumps(args, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 获取访客记录中的图片
@classmethod
async def query_visitor_record_pictures(cls, svrIndexCode: str, picUri: str, save_path):
    """Download a visitor-record picture and save it to disk.

    The response body is written to ``<save_path>/<name>.jpg``, where
    ``<name>`` is the last ``/``-separated segment of ``picUri``.

    Args:
        svrIndexCode: Sent as ``svrIndexCode`` in the request body.
        picUri: Picture URI; sent as ``picUri`` and used to derive the file name.
        save_path: Target directory; created if it does not exist.

    Returns:
        None
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_VISITOR_RECORD_PICTURES}"
    )
    payload = json.dumps(
        {"svrIndexCode": svrIndexCode, "picUri": picUri}, separators=(",", ":")
    )
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    # json=False: the raw response object is needed, not a decoded payload.
    response = await cls.send_request("POST", endpoint, signed, payload, False)

    os.makedirs(save_path, exist_ok=True)
    file_name = picUri.rsplit("/", 1)[-1] + ".jpg"
    target = os.path.join(save_path, file_name)
    with open(target, "wb") as out:
        # Skip empty chunks; write everything else in arrival order.
        out.writelines(
            chunk for chunk in response.iter_bytes(chunk_size=8192) if chunk
        )
# 获取人员列表
@classmethod
async def get_person_list(cls, pageNo: int = 1, pageSize: int = 10):
    """Fetch one page of the person list.

    Args:
        pageNo: Page number, sent unchanged. Defaults to 1.
        pageSize: Items per page, sent unchanged. Defaults to 10.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_PERSON_LIST}"
    )
    payload = json.dumps({"pageNo": pageNo, "pageSize": pageSize}, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 提取人员图片
@classmethod
async def get_person_picture(cls, serverIndexCode, picUri, save_path):
    """Download a person's picture and save it to disk.

    The response body is written to ``<save_path>/<name>.jpg``, where
    ``<name>`` is the last ``/``-separated segment of ``picUri``.

    Args:
        serverIndexCode: Sent as ``serverIndexCode`` in the request body.
        picUri: Picture URI; sent as ``picUri`` and used to derive the file name.
        save_path: Target directory; created if it does not exist.

    Returns:
        None
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_GET_PERSON_PICTURE}"
    )
    payload = json.dumps(
        {"serverIndexCode": serverIndexCode, "picUri": picUri}, separators=(",", ":")
    )
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    # json=False: the raw response object is needed, not a decoded payload.
    response = await cls.send_request("POST", endpoint, signed, payload, False)

    os.makedirs(save_path, exist_ok=True)
    file_name = picUri.rsplit("/", 1)[-1] + ".jpg"
    target = os.path.join(save_path, file_name)
    with open(target, "wb") as out:
        # Skip empty chunks; write everything else in arrival order.
        out.writelines(
            chunk for chunk in response.iter_bytes(chunk_size=8192) if chunk
        )
# 单个添加人脸分组
@classmethod
async def face_group_addition(cls, name, description):
    """Create a single face group.

    Args:
        name: Group name, sent as ``name``.
        description: Group description, sent as ``description``.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_FACE_GROUP_ADDITION}"
    )
    payload = json.dumps({"name": name, "description": description}, separators=(",", ":"))
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]
# 向分组中单个添加人脸图片
@classmethod
async def face_single_addition(cls, faceGroupIndexCode: str, faceInfo: dict, facePic: dict):
    """Add a single face to a face group.

    Args:
        faceGroupIndexCode: Identifier of the target face group.
        faceInfo: Face metadata, forwarded unchanged as ``faceInfo``
            (e.g. ``{"name": ...}``).
        facePic: Picture descriptor, forwarded unchanged as ``facePic``.
            NOTE(review): the earlier hard-coded body used the keys
            ``facePicUrl`` / ``facePicBinaryData``; confirm the exact key
            names against the platform API documentation.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    url = f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}/artemis{HaiKangConfig.HAIKANG_FACE_SIGLE_ADDITION}"
    # Send what the caller supplied. The previous body ignored both arguments
    # and always submitted a fixed test name and a fixed public image URL.
    body_dict = {
        "faceGroupIndexCode": faceGroupIndexCode,
        "faceInfo": faceInfo,
        "facePic": facePic,
    }
    body_json = json.dumps(body_dict, separators=(",", ":"))
    headers = cls.build_signed_headers(
        "POST", url, body_json, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    back = await cls.send_request("POST", url, headers, body_json)
    if back["code"] == "0":
        return [True, back["data"]]
    return [False, back["code"], back["msg"]]
# 批量删除人脸
@classmethod
async def face_delete(cls, faceGroupIndexCode: str, indexCodes: list):
    """Delete faces from a face group in bulk.

    Args:
        faceGroupIndexCode: Identifier of the face group.
        indexCodes: Index codes of the faces to delete.

    Returns:
        list: ``[True, data]`` when the response ``code`` is ``"0"``,
        otherwise ``[False, code, msg]``.
    """
    endpoint = (
        f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}"
        f"/artemis{HaiKangConfig.HAIKANG_FACE_DELETE}"
    )
    payload = json.dumps(
        {"faceGroupIndexCode": faceGroupIndexCode, "indexCodes": indexCodes},
        separators=(",", ":"),
    )
    signed = cls.build_signed_headers(
        "POST", endpoint, payload, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
    )
    reply = await cls.send_request("POST", endpoint, signed, payload)
    if reply["code"] != "0":
        return [False, reply["code"], reply["msg"]]
    return [True, reply["data"]]