OrangePi3588Media/scripts/mock_alarm_server.py
2026-02-26 17:01:45 +08:00

184 lines
5.1 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
模拟告警服务器
用于测试 RK3588 Media Server 的 alarm 节点 external_api 功能
提供两个接口:
1. POST /api/getToken - 获取访问令牌
2. POST /api/putMessage - 接收告警信息
使用方法:
python3 mock_alarm_server.py
默认监听0.0.0.0:8080
"""
from flask import Flask, request, jsonify
from datetime import datetime
import json
import sys
app = Flask(__name__)
# 模拟 token 存储 - 使用纯ASCII字符避免转义问题
mock_token = "mocktoken123456789abcdef123456789"
token_expire_time = 30 * 60 # 30分钟过期时间
# 统计信息
stats = {
"token_requests": 0,
"alarm_requests": 0,
"last_alarm": None
}
@app.route('/api/getToken', methods=['POST'])
def get_token():
"""
模拟获取 token 接口
返回格式与 d8_1.py 中一致
"""
stats["token_requests"] += 1
# 打印请求信息
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 收到 Token 请求")
# 简化的响应纯ASCII避免转义问题
response = {
"errorDesc": "",
"message": "",
"responseBody": {
"expireTime": str(token_expire_time),
"token": mock_token,
"refreshToken": mock_token + "_refresh"
},
"retCode": "200"
}
# 手动序列化并打印原始JSON
raw_json = json.dumps(response, ensure_ascii=True)
print(f" 返回 JSON: {raw_json}")
# 返回纯文本JSON避免Flask jsonify的额外处理
return raw_json, 200, {'Content-Type': 'application/json'}
@app.route('/api/putMessage', methods=['POST'])
def put_message():
"""
模拟接收告警信息接口
接收格式与 d8_1.py 中 send_post_request 一致
"""
stats["alarm_requests"] += 1
# 获取请求头中的 token
token = request.headers.get('X-Access-Token', 'None')
# 解析请求体
try:
data = request.json if request.is_json else json.loads(request.data)
except:
data = {}
# 打印告警信息
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 收到告警信息")
print(f" Token: {token[:30]}...")
print(f" 租户代码: {data.get('tenantCode', 'N/A')}")
print(f" 频道号: {data.get('channelNo', 'N/A')}")
print(f" 告警内容: {data.get('alarmContent', 'N/A')}")
print(f" 告警时间: {data.get('alarmTime', 'N/A')}")
# 打印图片/视频信息
pic_info = data.get('picInfo', [])
video_info = data.get('videoInfo', [])
if pic_info:
print(f" 图片地址: {len(pic_info)}")
for i, pic in enumerate(pic_info[:3]): # 只打印前3个
print(f" [{i+1}] {pic.get('url', 'N/A')}")
if len(pic_info) > 3:
print(f" ... 还有 {len(pic_info) - 3}")
if video_info:
print(f" 视频地址: {len(video_info)}")
for i, video in enumerate(video_info[:3]):
print(f" [{i+1}] {video.get('url', 'N/A')}")
# 更新统计
stats["last_alarm"] = {
"time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"content": data.get('alarmContent', 'N/A'),
"channel": data.get('channelNo', 'N/A')
}
# 返回成功响应
response = {
"responseBody": "1",
"message": None,
"retCode": "200",
"errorDesc": None
}
print(f" 响应: 告警接收成功")
return jsonify(response)
@app.route('/stats', methods=['GET'])
def get_stats():
"""查看统计信息"""
return jsonify({
"token_requests": stats["token_requests"],
"alarm_requests": stats["alarm_requests"],
"last_alarm": stats["last_alarm"]
})
@app.route('/', methods=['GET'])
def index():
"""首页说明"""
return """
<h1>RK3588 模拟告警服务器</h1>
<p>可用接口:</p>
<ul>
<li><b>POST /api/getToken</b> - 获取访问令牌</li>
<li><b>POST /api/putMessage</b> - 接收告警信息</li>
<li><b>GET /stats</b> - 查看统计信息</li>
</ul>
<p>当前统计:</p>
<ul>
<li>Token 请求次数: {token_requests}</li>
<li>告警请求次数: {alarm_requests}</li>
</ul>
""".format(**stats)
def main():
host = "0.0.0.0"
port = 8080
print("=" * 60)
print("RK3588 模拟告警服务器")
print("=" * 60)
print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"监听地址: http://{host}:{port}")
print("")
print("接口列表:")
print(f" 1. POST http://{host}:{port}/api/getToken")
print(f" 2. POST http://{host}:{port}/api/putMessage")
print(f" 3. GET http://{host}:{port}/stats")
print("")
print("按 Ctrl+C 停止服务")
print("=" * 60)
try:
app.run(host=host, port=port, debug=False, threaded=True)
except KeyboardInterrupt:
print("\n\n服务已停止")
print(f"总计 Token 请求: {stats['token_requests']}")
print(f"总计告警请求: {stats['alarm_requests']}")
sys.exit(0)
if __name__ == '__main__':
main()