import json from flask import Flask, request,jsonify import os import time from datetime import datetime import numpy as np import base64 import cv2 """ 使用说明 一,安装 (1)请首先确保您已经具备Python运行环境 目前支持 Python 3.8 至 Python 3.12 (2)安装Python基础依赖库(注:下面依赖库版本是Windows-Python3.10环境下的,其他环境需要自行解决可能发生的错误) pip install Flask==3.0.3 -i https://pypi.tuna.tsinghua.edu.cn/simple pip install numpy==1.26.2 -i https://pypi.tuna.tsinghua.edu.cn/simple pip install opencv-python==4.8.1.78 -i https://pypi.tuna.tsinghua.edu.cn/simple pip install openai==1.66.3 -i https://pypi.tuna.tsinghua.edu.cn/simple 二,启动 python api_behaviour.py """ app = Flask(__name__) @app.route('/', methods=['GET']) def index(): return "This is the api_behaviour.py service" @app.route('/receiveDetectResults', methods=['POST', 'GET']) def receiveDetectResults(): print("receiveDetectResults()") """ 更新时间 2024/12/12 适用于v4.503及其之后的版本 适用于【模式一】,【模式二】,【模式三】,【模式四】的API行为算法调用示例1 """ ret = False msg = "未知错误" happen = False # True表示发生了报警,False表示未发生报警 happenScore = 0.0 # 发生报警的置信度 result_indices = [] # 返回过滤后检测目标的序号数组 try: print("request_params=", datetime.now()) request_params = request.get_json() # print(request_params) detects = request_params.get("detects") image_base64 = request_params.get("image_base64", None) # 接收base64编码的图片并转换成cv2的图片格式 print("image_base64=", datetime.now(), image_base64) if detects: if len(detects) > 0: # 如果检测目标数大于0,只提取第一个目标的序号并返回 result_indices = [0] ret = True msg = "success" except Exception as e: msg = str(e) response_data = { "code": 1000 if ret else 0, "msg": msg, "result": { "happen": happen, "happenScore": happenScore, "indices": result_indices, } } return jsonify(response_data) @app.route('/receiveDetectResults2', methods=['POST', 'GET']) def receiveDetectResults2(): print("receiveDetectResults2()") """ 更新时间 2025/01/08 适用于v4.607及其之后的版本 适用于【模式一】,【模式二】,【模式三】,【模式四】的API行为算法调用示例2 最新一次更新说明:新增userdata自定义字段 """ ret = False msg = "未知错误" happen = False # True表示发生了报警,False表示未发生报警 happenScore = 0.0 # 发生报警的置信度 result_detects = [] # 返回检测的目标数组 try: print("request_params=", datetime.now()) request_params = request.get_json() print(request_params) detects = request_params.get("detects") if detects or True: # 添加一个模拟产生的目标 result_detects.append({ "x1": 100, "y1": 100, "x2": 300, "y2": 300, "class_score": 0.93456, "class_id": 0, "class_name": "test", "user_data": "hahahah", }) happen = True happenScore = 1 ret = True msg = "success" except Exception as e: msg = str(e) response_data = { "code": 1000 if ret else 0, "msg": msg, "result": { "happen": happen, "happenScore": happenScore, "detects": result_detects } } return jsonify(response_data) @app.route('/receiveImage', methods=['POST', 'GET']) def receiveImage(): print("receiveImage()") """ 更新时间 2024/12/12 适用于v4.503及其之后的版本 适用于【模式五】的API行为算法调用示例 """ ret = False msg = "未知错误" happen = False # True表示发生了报警,False表示未发生报警 happenScore = 0.0 # 发生报警的置信度 happenDesc = "" # 发生报警的描述(v4.635支持) result_detects = [] try: request_params = request.get_json() # print("request_params=", datetime.now(), request_params) image_base64 = request_params.get("image_base64", None) # 接收base64编码的图片并转换成cv2的图片格式 if image_base64: encoded_image_byte = base64.b64decode(image_base64) image_array = np.frombuffer(encoded_image_byte, np.uint8) # image = turboJpeg.decode(image_array) # turbojpeg 解码 image = cv2.imdecode(image_array, cv2.COLOR_RGB2BGR) # opencv 解码 # video_height,video_width,video_channels = image.shape # 输入图像的的高,宽,channel print(image.shape) result_detects.append({ "x1": 100, "y1": 100, "x2": 500, "y2": 500, "class_score": 0.93456, "class_id": 0, "class_name": "test", "region_index": 2, }) happen = True # True 表示发生了报警 happenScore = 1.0 # 发生报警的置信度 ret = True msg = "success" except Exception as e: msg = str(e) response_data = { "code": 1000 if ret else 0, "msg": msg, "result": { "happen": happen, "happenScore": happenScore, "happenDesc": happenDesc, "detects": result_detects } } # return json.dumps(response_data) return jsonify(response_data) if __name__ == '__main__': # app.debug = True # 推荐的json格式化网页工具:https://www.sojson.com/ # 接口列表 # http://127.0.0.1:8070/receiveDetectResults # http://127.0.0.1:8070/receiveDetectResults2 # http://127.0.0.1:8070/receiveImage app.run(host='0.0.0.0', port=8070, debug=False)