修改环境参数, 关闭https ssl证书校验

This commit is contained in:
haotianmingyue 2025-09-18 11:00:40 +08:00
parent 9d1c34bdb0
commit 960d5237da
4 changed files with 35 additions and 32 deletions

View File

@ -1,6 +1,7 @@
from util.haikang_util import HaikangUtil
import asyncio
import base64
import json
@ -12,42 +13,42 @@ async def get_door_list_service(pageNo: int = 1, pageSize: int = 10):
result = await HaikangUtil.get_door_list_v2(pageNo, pageSize)
print(result)
with open("get_door_list_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 查询门禁状态
async def get_door_status_service(door_index_codes):
result = await HaikangUtil.get_door_status(door_index_codes)
print(result)
with open("get_door_status_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 门禁控制
async def door_do_control_service(door_index_codes, control_type):
result = await HaikangUtil.door_do_control(door_index_codes, control_type)
print(result)
with open("door_do_control_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 查询门禁点事件
async def query_door_events_service(door_index_code,pageNo, pageSize, startTime, endTime):
result = await HaikangUtil.query_door_events_v2(door_index_code, pageNo=pageNo, pageSize=pageSize ,startTime=startTime, endTime=endTime)
print(result)
with open("query_door_events_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 查看门禁点在线状态
async def door_online_status_service(door_index_codes):
result = await HaikangUtil.door_online_status(door_index_codes)
print(result)
with open("door_online_status_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 按条件查询人脸分组, 很重要
async def get_face_group_service():
result = await HaikangUtil.get_face_group()
print(result)
with open("get_face_group_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 人脸分组1vN搜索
async def face_group_1vN_search_service(image_path):
@ -63,11 +64,11 @@ async def face_group_1vN_search_service(image_path):
pageSize=10,
searchNum=99,
minSimilarity=50,
faceGroupIndexCodes=['5dc82633-a4cb-4107-b55e-f21bf952f9']
faceGroupIndexCodes=['70cd3fcdcdb444a0b92e75ff66b549e6']
)
print(result)
with open("face_group_1vN_search_service.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 人脸评分
async def face_picture_check(image_path):
@ -77,38 +78,38 @@ async def face_picture_check(image_path):
encoded_image = base64.b64encode(image_data).decode('utf-8')
# print(encoded_image)
result = await HaikangUtil.face_picture_check(
facePicBinaryData=encoded_image
)
print(result)
with open("face_picture_check.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
# 查询访客预约记录
async def query_visitor_record():
result = await HaikangUtil.query_visitor_record()
result = await HaikangUtil.query_visitor_record(pageNo=1, pageSize = 10)
print(result)
with open("query_visitor_record.json", "w", encoding="utf-8") as f:
f.write(result[1])
f.write(json.dumps(result[1]))
if __name__ == '__main__':
# asyncio.run(get_door_list_service())
#asyncio.run(get_door_list_service())
# print("*"*100)
# asyncio.run(get_door_status_service(['D01']))
# asyncio.run(get_door_status_service(['d4a610b6554d4544bc35dd3138f128b0']))
# print("*"*100)
# asyncio.run(door_do_control_service(['D01'], 1))
# asyncio.run(query_door_events_service('D01',1,10,1640995200,1640995200))
# asyncio.run(query_door_events_service('d4a610b6554d4544bc35dd3138f128b0',1,10,'20025-09-10T17:30:08+08:00','2004-09-15T17:30:08+08:00'))
# asyncio.run(get_face_group_service())
#asyncio.run(get_face_group_service())
# image_path = "75c03e462769c81b6a8513d90ff2a27d.jpg"
# asyncio.run(face_group_1vN_search_service(image_path))
# asyncio.run(face_picture_check(image_path))
image_path = "./a7b3c794281902041c9b823df7667ce7.jpg"
asyncio.run(face_group_1vN_search_service(image_path))
#asyncio.run(face_picture_check(image_path))
# asyncio.run(query_visitor_record())
asyncio.run(door_online_status_service(["xxxxxxxx"]))
#asyncio.run(door_online_status_service(["d4a610b6554d4544bc35dd3138f128b0"]))

View File

@ -15,15 +15,14 @@ class HaiKangSettings:
"""
海康平台配置
"""
# HAIKANG_URL = 'http://localhost'
HAIKANG_URL = 'https://localhost'
# HAIKANG_PORT = 443
HAIKANG_URL = 'https://192.168.89.230'
HAIKANG_PORT = 443
HAIKANG_PORT = 9919
HAIKANG_AK = ''
HAIKANG_SK = ''
HAIKANG_AK = '29862915'
HAIKANG_SK = '3QlsmcyadBqu4OpVoxsJ'
HAIKANG_ACCESS_TOKEN_URL = '/api/v1/oauth/token'
HAIKANG_DOOR_STATES_URL = '/api/v1/door/states'
# HAIKANG_DOOR_STATES_URL = '/api/v1/door/states'
HAIKANG_DOOR_STATES_URL = '/api/acs/v1/door/states'
HAIKANG_DOOR_DOCONTROL_URL = '/api/acs/v1/door/doControl'
HAIKANG_DOOR_ENVENTS_URL = '/api/acs/v2/door/events'
HAIKANG_DOOR_SEARCH = '/api/resource/v2/door/search'
@ -32,9 +31,8 @@ class HaiKangSettings:
HAIKANG_PICTURE_CHECK_URL = '/api/frs/v1/face/picture/check'
HAIKANG_FACECAPATURE_SEARCH = '/api/frs/v1/event/face_capture/search'
HAIKANG_FACE_GROUP_URL = '/api/frs/v1/face/group'
HAIKANG_VISITOR_RECORD_SEARCH = '/api/visitor/v2/appointment/records'
class GetConfig:

View File

@ -91,7 +91,10 @@ class HaikangUtil:
# 发送请求
@classmethod
async def send_request(cls, method: str, url: str, headers: dict, body: str | None):
async with httpx.AsyncClient() as client:
async with httpx.AsyncClient(verify=False) as client:
response = await client.request(
method, url, headers=headers, content=body, timeout=60
)
@ -137,6 +140,7 @@ class HaikangUtil:
_type_: _description_
"""
url = f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}/artemis{HaiKangConfig.HAIKANG_DOOR_SEARCH}"
body_dict = {
"pageNo": max(pageNo, 1),
"pageSize": min(pageSize, 999),
@ -407,10 +411,10 @@ class HaikangUtil:
url = f"{HaiKangConfig.HAIKANG_URL}:{HaiKangConfig.HAIKANG_PORT}/artemis{HaiKangConfig.HAIKANG_FACE_GROUP_URL}"
headers = cls.build_signed_headers(
"POST", url, None, HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
"POST", url, json.dumps({}), HaiKangConfig.HAIKANG_AK, HaiKangConfig.HAIKANG_SK
)
back = await cls.send_request("POST", url, headers, None)
back = await cls.send_request("POST", url, headers, json.dumps({}))
if back["code"] == "0":
# logger.info(f"按条件查询人脸成功")