修复对接api的问题
This commit is contained in:
parent
2ad8a934e0
commit
bb22c34696
@ -249,6 +249,7 @@ add_library(alarm SHARED
|
||||
alarm/packet_ring_buffer.cpp
|
||||
alarm/actions/log_action.cpp
|
||||
alarm/actions/http_action.cpp
|
||||
alarm/actions/external_api_action.cpp
|
||||
alarm/actions/snapshot_action.cpp
|
||||
alarm/actions/clip_action.cpp
|
||||
alarm/uploaders/local_uploader.cpp
|
||||
|
||||
405
plugins/alarm/actions/external_api_action.cpp
Normal file
405
plugins/alarm/actions/external_api_action.cpp
Normal file
@ -0,0 +1,405 @@
|
||||
#include "external_api_action.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <iomanip>
|
||||
#include <ctime>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/simple_json.h"
|
||||
#include "utils/simple_json_writer.h"
|
||||
|
||||
#if __has_include(<curl/curl.h>)
|
||||
#include <curl/curl.h>
|
||||
#define HAS_CURL 1
|
||||
#else
|
||||
#define HAS_CURL 0
|
||||
#endif
|
||||
|
||||
namespace rk3588 {
|
||||
namespace {
|
||||
|
||||
#if HAS_CURL
|
||||
size_t CurlWriteCb(char* ptr, size_t size, size_t nmemb, void* userdata) {
|
||||
if (!userdata) return 0;
|
||||
auto* out = static_cast<std::string*>(userdata);
|
||||
out->append(ptr, size * nmemb);
|
||||
return size * nmemb;
|
||||
}
|
||||
|
||||
bool CurlPostJson(const std::string& url,
|
||||
const std::vector<std::string>& headers,
|
||||
const std::string& json_body,
|
||||
int timeout_ms,
|
||||
std::string& resp_out,
|
||||
long& http_code_out,
|
||||
std::string& err) {
|
||||
http_code_out = 0;
|
||||
resp_out.clear();
|
||||
|
||||
CURL* curl = curl_easy_init();
|
||||
if (!curl) {
|
||||
err = "curl_easy_init failed";
|
||||
return false;
|
||||
}
|
||||
|
||||
struct curl_slist* hdrs = nullptr;
|
||||
for (const auto& h : headers) {
|
||||
hdrs = curl_slist_append(hdrs, h.c_str());
|
||||
}
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hdrs);
|
||||
curl_easy_setopt(curl, CURLOPT_POST, 1L);
|
||||
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_body.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast<long>(json_body.size()));
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
|
||||
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, std::max(500, timeout_ms / 2));
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &CurlWriteCb);
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &resp_out);
|
||||
|
||||
CURLcode res = curl_easy_perform(curl);
|
||||
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code_out);
|
||||
|
||||
curl_slist_free_all(hdrs);
|
||||
curl_easy_cleanup(curl);
|
||||
|
||||
if (res != CURLE_OK) {
|
||||
err = curl_easy_strerror(res);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
const SimpleJson* FindByDottedPath(const SimpleJson& root, const std::string& path) {
|
||||
const SimpleJson* cur = &root;
|
||||
size_t start = 0;
|
||||
while (start < path.size()) {
|
||||
size_t dot = path.find('.', start);
|
||||
const std::string key = (dot == std::string::npos) ? path.substr(start) : path.substr(start, dot - start);
|
||||
if (!cur || !cur->IsObject()) return nullptr;
|
||||
cur = cur->Find(key);
|
||||
if (dot == std::string::npos) break;
|
||||
start = dot + 1;
|
||||
}
|
||||
return cur;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
ExternalApiAction::~ExternalApiAction() {
|
||||
Drain();
|
||||
Stop();
|
||||
}
|
||||
|
||||
bool ExternalApiAction::Init(const SimpleJson& config) {
|
||||
get_token_url_ = config.ValueOr<std::string>("getTokenUrl", config.ValueOr<std::string>("get_token_url", ""));
|
||||
put_message_url_ = config.ValueOr<std::string>("putMessageUrl", config.ValueOr<std::string>("put_message_url", ""));
|
||||
|
||||
tenant_code_ = config.ValueOr<std::string>("tenantCode", config.ValueOr<std::string>("tenant_code", ""));
|
||||
channel_no_ = config.ValueOr<std::string>("channelNo", config.ValueOr<std::string>("channel_no", ""));
|
||||
|
||||
include_media_url_ = config.ValueOr<bool>("include_media_url", true);
|
||||
timeout_ms_ = config.ValueOr<int>("timeout_ms", 3000);
|
||||
|
||||
token_header_ = config.ValueOr<std::string>("token_header", "X-Access-Token");
|
||||
token_json_path_ = config.ValueOr<std::string>("token_json_path", "responseBody.token");
|
||||
token_cache_sec_ = std::max(0, config.ValueOr<int>("token_cache_sec", 600));
|
||||
|
||||
alarm_content_ = config.ValueOr<std::string>("alarmContent", config.ValueOr<std::string>("alarm_content", ""));
|
||||
use_rule_name_as_content_ = config.ValueOr<bool>("use_rule_name_as_content", alarm_content_.empty());
|
||||
|
||||
max_queue_size_ = static_cast<size_t>(
|
||||
std::max(0, config.ValueOr<int>("max_queue_size", static_cast<int>(max_queue_size_))));
|
||||
const std::string policy = config.ValueOr<std::string>("queue_policy", "drop_oldest");
|
||||
if (policy == "drop_newest") queue_policy_ = QueuePolicy::DropNewest;
|
||||
else queue_policy_ = QueuePolicy::DropOldest;
|
||||
|
||||
max_retries_ = std::max(0, config.ValueOr<int>("max_retries", 0));
|
||||
retry_backoff_ms_ = std::max(0, config.ValueOr<int>("retry_backoff_ms", 200));
|
||||
|
||||
if (get_token_url_.empty()) {
|
||||
std::cerr << "[ExternalApiAction] getTokenUrl is required\n";
|
||||
return false;
|
||||
}
|
||||
if (put_message_url_.empty()) {
|
||||
std::cerr << "[ExternalApiAction] putMessageUrl is required\n";
|
||||
return false;
|
||||
}
|
||||
if (tenant_code_.empty()) {
|
||||
std::cerr << "[ExternalApiAction] tenantCode is required\n";
|
||||
return false;
|
||||
}
|
||||
if (channel_no_.empty()) {
|
||||
std::cerr << "[ExternalApiAction] channelNo is required\n";
|
||||
return false;
|
||||
}
|
||||
|
||||
#if HAS_CURL
|
||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
#endif
|
||||
|
||||
running_.store(true);
|
||||
worker_ = std::thread(&ExternalApiAction::WorkerLoop, this);
|
||||
std::cout << "[ExternalApiAction] initialized, token_url=" << get_token_url_
|
||||
<< " msg_url=" << put_message_url_ << "\n";
|
||||
return true;
|
||||
}
|
||||
|
||||
void ExternalApiAction::Execute(AlarmEvent& event, std::shared_ptr<Frame> /*frame*/) {
|
||||
Job job;
|
||||
job.tenant_code = tenant_code_;
|
||||
job.channel_no = channel_no_;
|
||||
job.timestamp_ms = event.timestamp_ms;
|
||||
|
||||
if (!alarm_content_.empty()) {
|
||||
job.alarm_content = alarm_content_;
|
||||
} else if (use_rule_name_as_content_) {
|
||||
job.alarm_content = event.rule_name;
|
||||
} else {
|
||||
job.alarm_content = event.rule_name;
|
||||
}
|
||||
|
||||
if (include_media_url_) {
|
||||
job.pic_url = event.snapshot_url;
|
||||
job.video_url = event.clip_url;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queue_mutex_);
|
||||
if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_) {
|
||||
++dropped_total_;
|
||||
if (queue_policy_ == QueuePolicy::DropOldest) {
|
||||
queue_.pop();
|
||||
queue_.push(std::move(job));
|
||||
} else {
|
||||
// DropNewest: drop this request
|
||||
}
|
||||
} else {
|
||||
queue_.push(std::move(job));
|
||||
}
|
||||
}
|
||||
queue_cv_.notify_one();
|
||||
}
|
||||
|
||||
void ExternalApiAction::Drain() {
|
||||
if (!running_.load()) return;
|
||||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
queue_cv_.wait_for(lock, std::chrono::seconds(5), [this] { return queue_.empty() && in_flight_ == 0; });
|
||||
}
|
||||
|
||||
void ExternalApiAction::Stop() {
|
||||
bool was_running = running_.exchange(false);
|
||||
(void)was_running;
|
||||
queue_cv_.notify_all();
|
||||
if (worker_.joinable()) worker_.join();
|
||||
}
|
||||
|
||||
void ExternalApiAction::WorkerLoop() {
|
||||
while (running_.load()) {
|
||||
Job job;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
queue_cv_.wait_for(lock, std::chrono::milliseconds(100),
|
||||
[this] { return !queue_.empty() || !running_.load(); });
|
||||
if (!running_.load() && queue_.empty()) break;
|
||||
if (queue_.empty()) continue;
|
||||
job = std::move(queue_.front());
|
||||
queue_.pop();
|
||||
++in_flight_;
|
||||
}
|
||||
|
||||
std::string err;
|
||||
if (!EnsureToken(err)) {
|
||||
std::cerr << "[ExternalApiAction] ensure token failed: " << err << "\n";
|
||||
} else {
|
||||
std::string token;
|
||||
{
|
||||
std::lock_guard<std::mutex> tl(token_mu_);
|
||||
token = token_;
|
||||
}
|
||||
|
||||
const int max_attempts = std::max(1, max_retries_ + 1);
|
||||
for (int attempt = 0; attempt < max_attempts; ++attempt) {
|
||||
if (attempt > 0 && retry_backoff_ms_ > 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(retry_backoff_ms_ * attempt));
|
||||
}
|
||||
|
||||
long http_code = 0;
|
||||
std::string send_err;
|
||||
if (SendMessageWithToken(job, token, http_code, send_err)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Token may have expired/been revoked.
|
||||
if (http_code == 401 || http_code == 403) {
|
||||
std::string terr;
|
||||
{
|
||||
std::lock_guard<std::mutex> tl(token_mu_);
|
||||
token_.clear();
|
||||
token_expire_tp_ = std::chrono::steady_clock::time_point{};
|
||||
}
|
||||
if (EnsureToken(terr)) {
|
||||
std::lock_guard<std::mutex> tl(token_mu_);
|
||||
token = token_;
|
||||
continue;
|
||||
}
|
||||
std::cerr << "[ExternalApiAction] refresh token failed: " << terr << "\n";
|
||||
}
|
||||
|
||||
std::cerr << "[ExternalApiAction] send failed: " << send_err << " (http=" << http_code << ")\n";
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queue_mutex_);
|
||||
if (in_flight_ > 0) --in_flight_;
|
||||
}
|
||||
queue_cv_.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
bool ExternalApiAction::EnsureToken(std::string& err) {
|
||||
{
|
||||
std::lock_guard<std::mutex> tl(token_mu_);
|
||||
if (!token_.empty()) {
|
||||
if (token_cache_sec_ <= 0) {
|
||||
return true;
|
||||
}
|
||||
if (token_expire_tp_.time_since_epoch().count() == 0) {
|
||||
return true;
|
||||
}
|
||||
if (std::chrono::steady_clock::now() < token_expire_tp_) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string token;
|
||||
if (!FetchToken(token, err)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> tl(token_mu_);
|
||||
token_ = std::move(token);
|
||||
if (token_cache_sec_ > 0) {
|
||||
token_expire_tp_ = std::chrono::steady_clock::now() + std::chrono::seconds(token_cache_sec_);
|
||||
} else {
|
||||
token_expire_tp_ = std::chrono::steady_clock::time_point{};
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ExternalApiAction::FetchToken(std::string& token_out, std::string& err) {
|
||||
token_out.clear();
|
||||
|
||||
#if HAS_CURL
|
||||
std::vector<std::string> headers;
|
||||
headers.emplace_back("Content-Type: application/json");
|
||||
std::string resp;
|
||||
long code = 0;
|
||||
if (!CurlPostJson(get_token_url_, headers, "", timeout_ms_, resp, code, err)) {
|
||||
return false;
|
||||
}
|
||||
if (code < 200 || code >= 300) {
|
||||
err = "getToken http " + std::to_string(code);
|
||||
return false;
|
||||
}
|
||||
|
||||
SimpleJson root;
|
||||
std::string perr;
|
||||
if (!ParseSimpleJson(resp, root, perr)) {
|
||||
err = "getToken invalid json: " + perr;
|
||||
return false;
|
||||
}
|
||||
|
||||
const SimpleJson* v = FindByDottedPath(root, token_json_path_);
|
||||
if (!v || !v->IsString()) {
|
||||
// Fallbacks
|
||||
if (const SimpleJson* t = root.Find("token")) {
|
||||
v = t;
|
||||
}
|
||||
}
|
||||
if (!v || !v->IsString()) {
|
||||
err = "token not found at path: " + token_json_path_;
|
||||
return false;
|
||||
}
|
||||
token_out = v->AsString("");
|
||||
if (token_out.empty()) {
|
||||
err = "empty token";
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
#else
|
||||
err = "curl not available";
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool ExternalApiAction::SendMessageWithToken(const Job& job, const std::string& token, long& http_code, std::string& err) {
|
||||
http_code = 0;
|
||||
|
||||
#if HAS_CURL
|
||||
std::vector<std::string> headers;
|
||||
headers.emplace_back("Content-Type: application/json");
|
||||
if (!token.empty()) {
|
||||
headers.emplace_back(token_header_ + ": " + token);
|
||||
}
|
||||
|
||||
const std::string body = BuildMessageJson(job);
|
||||
std::string resp;
|
||||
if (!CurlPostJson(put_message_url_, headers, body, timeout_ms_, resp, http_code, err)) {
|
||||
return false;
|
||||
}
|
||||
if (http_code < 200 || http_code >= 300) {
|
||||
err = "putMessage http " + std::to_string(http_code);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
#else
|
||||
(void)job;
|
||||
(void)token;
|
||||
std::cout << "[ExternalApiAction] curl not available; would POST to " << put_message_url_ << "\n";
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
|
||||
std::string ExternalApiAction::FormatAlarmTime(uint64_t timestamp_ms) const {
|
||||
const std::time_t t = static_cast<std::time_t>(timestamp_ms / 1000ULL);
|
||||
std::tm* tm = std::localtime(&t);
|
||||
if (!tm) return "";
|
||||
std::ostringstream oss;
|
||||
oss << std::put_time(tm, "%Y-%m-%d %H:%M:%S");
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
std::string ExternalApiAction::BuildMessageJson(const Job& job) const {
|
||||
SimpleJson::Object payload;
|
||||
payload["tenantCode"] = SimpleJson(job.tenant_code);
|
||||
payload["channelNo"] = SimpleJson(job.channel_no);
|
||||
payload["alarmContent"] = SimpleJson(job.alarm_content);
|
||||
payload["alarmTime"] = SimpleJson(FormatAlarmTime(job.timestamp_ms));
|
||||
|
||||
SimpleJson::Array pic;
|
||||
if (!job.pic_url.empty()) {
|
||||
SimpleJson::Object o;
|
||||
o["url"] = SimpleJson(job.pic_url);
|
||||
pic.emplace_back(SimpleJson(std::move(o)));
|
||||
}
|
||||
payload["picInfo"] = SimpleJson(std::move(pic));
|
||||
|
||||
SimpleJson::Array video;
|
||||
if (!job.video_url.empty()) {
|
||||
SimpleJson::Object o;
|
||||
o["url"] = SimpleJson(job.video_url);
|
||||
video.emplace_back(SimpleJson(std::move(o)));
|
||||
}
|
||||
payload["videoInfo"] = SimpleJson(std::move(video));
|
||||
|
||||
return StringifySimpleJson(SimpleJson(std::move(payload)));
|
||||
}
|
||||
|
||||
} // namespace rk3588
|
||||
83
plugins/alarm/actions/external_api_action.h
Normal file
83
plugins/alarm/actions/external_api_action.h
Normal file
@ -0,0 +1,83 @@
|
||||
#pragma once
|
||||
|
||||
#include "action_base.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
namespace rk3588 {
|
||||
|
||||
class ExternalApiAction : public IAlarmAction {
|
||||
public:
|
||||
~ExternalApiAction() override;
|
||||
|
||||
bool Init(const SimpleJson& config) override;
|
||||
void Execute(AlarmEvent& event, std::shared_ptr<Frame> frame) override;
|
||||
void Drain() override;
|
||||
void Stop() override;
|
||||
std::string Name() const override { return "external_api"; }
|
||||
|
||||
private:
|
||||
struct Job {
|
||||
std::string tenant_code;
|
||||
std::string channel_no;
|
||||
std::string alarm_content;
|
||||
uint64_t timestamp_ms = 0;
|
||||
std::string pic_url;
|
||||
std::string video_url;
|
||||
};
|
||||
|
||||
enum class QueuePolicy {
|
||||
DropOldest,
|
||||
DropNewest,
|
||||
};
|
||||
|
||||
void WorkerLoop();
|
||||
bool EnsureToken(std::string& err);
|
||||
bool FetchToken(std::string& token_out, std::string& err);
|
||||
bool SendMessageWithToken(const Job& job, const std::string& token, long& http_code, std::string& err);
|
||||
|
||||
std::string BuildMessageJson(const Job& job) const;
|
||||
std::string FormatAlarmTime(uint64_t timestamp_ms) const;
|
||||
|
||||
std::string get_token_url_;
|
||||
std::string put_message_url_;
|
||||
|
||||
std::string tenant_code_;
|
||||
std::string channel_no_;
|
||||
|
||||
std::string alarm_content_;
|
||||
bool use_rule_name_as_content_ = true;
|
||||
|
||||
bool include_media_url_ = true;
|
||||
int timeout_ms_ = 3000;
|
||||
|
||||
std::string token_header_ = "X-Access-Token";
|
||||
std::string token_json_path_ = "responseBody.token";
|
||||
int token_cache_sec_ = 600;
|
||||
|
||||
int max_retries_ = 0;
|
||||
int retry_backoff_ms_ = 200;
|
||||
|
||||
std::atomic<bool> running_{false};
|
||||
std::thread worker_;
|
||||
std::mutex queue_mutex_;
|
||||
std::condition_variable queue_cv_;
|
||||
std::queue<Job> queue_;
|
||||
size_t in_flight_ = 0;
|
||||
|
||||
size_t max_queue_size_ = 0; // 0 = unlimited
|
||||
QueuePolicy queue_policy_ = QueuePolicy::DropOldest;
|
||||
uint64_t dropped_total_ = 0;
|
||||
|
||||
std::mutex token_mu_;
|
||||
std::string token_;
|
||||
std::chrono::steady_clock::time_point token_expire_tp_{};
|
||||
};
|
||||
|
||||
} // namespace rk3588
|
||||
@ -15,6 +15,7 @@
|
||||
#include "actions/action_base.h"
|
||||
#include "actions/log_action.h"
|
||||
#include "actions/http_action.h"
|
||||
#include "actions/external_api_action.h"
|
||||
#include "actions/snapshot_action.h"
|
||||
#include "actions/clip_action.h"
|
||||
|
||||
@ -131,7 +132,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// HTTP action (should be last to include media URLs)
|
||||
// HTTP action (should be after snapshot/clip to include media URLs)
|
||||
if (const SimpleJson* http_cfg = actions_cfg->Find("http")) {
|
||||
if (http_cfg->ValueOr<bool>("enable", false)) {
|
||||
auto action = std::make_unique<HttpAction>();
|
||||
@ -140,6 +141,16 @@ public:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// External API action (token + message). Should be after snapshot/clip to include media URLs.
|
||||
if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) {
|
||||
if (ext_cfg->ValueOr<bool>("enable", false)) {
|
||||
auto action = std::make_unique<ExternalApiAction>();
|
||||
if (action->Init(*ext_cfg)) {
|
||||
actions_.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Default: just log action
|
||||
auto action = std::make_unique<LogAction>();
|
||||
@ -259,6 +270,15 @@ public:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) {
|
||||
if (ext_cfg->ValueOr<bool>("enable", false)) {
|
||||
auto action = std::make_unique<ExternalApiAction>();
|
||||
if (action->Init(*ext_cfg)) {
|
||||
new_actions.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (new_actions.empty()) {
|
||||
auto action = std::make_unique<LogAction>();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user