diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index a084c19..b16d7de 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -249,6 +249,7 @@ add_library(alarm SHARED alarm/packet_ring_buffer.cpp alarm/actions/log_action.cpp alarm/actions/http_action.cpp + alarm/actions/external_api_action.cpp alarm/actions/snapshot_action.cpp alarm/actions/clip_action.cpp alarm/uploaders/local_uploader.cpp diff --git a/plugins/alarm/actions/external_api_action.cpp b/plugins/alarm/actions/external_api_action.cpp new file mode 100644 index 0000000..50a05f1 --- /dev/null +++ b/plugins/alarm/actions/external_api_action.cpp @@ -0,0 +1,405 @@ +#include "external_api_action.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "utils/simple_json.h" +#include "utils/simple_json_writer.h" + +#if __has_include() +#include +#define HAS_CURL 1 +#else +#define HAS_CURL 0 +#endif + +namespace rk3588 { +namespace { + +#if HAS_CURL +size_t CurlWriteCb(char* ptr, size_t size, size_t nmemb, void* userdata) { + if (!userdata) return 0; + auto* out = static_cast(userdata); + out->append(ptr, size * nmemb); + return size * nmemb; +} + +bool CurlPostJson(const std::string& url, + const std::vector& headers, + const std::string& json_body, + int timeout_ms, + std::string& resp_out, + long& http_code_out, + std::string& err) { + http_code_out = 0; + resp_out.clear(); + + CURL* curl = curl_easy_init(); + if (!curl) { + err = "curl_easy_init failed"; + return false; + } + + struct curl_slist* hdrs = nullptr; + for (const auto& h : headers) { + hdrs = curl_slist_append(hdrs, h.c_str()); + } + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hdrs); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_body.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast(json_body.size())); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, std::max(500, timeout_ms / 2)); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &CurlWriteCb); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &resp_out); + + CURLcode res = curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code_out); + + curl_slist_free_all(hdrs); + curl_easy_cleanup(curl); + + if (res != CURLE_OK) { + err = curl_easy_strerror(res); + return false; + } + return true; +} +#endif + +const SimpleJson* FindByDottedPath(const SimpleJson& root, const std::string& path) { + const SimpleJson* cur = &root; + size_t start = 0; + while (start < path.size()) { + size_t dot = path.find('.', start); + const std::string key = (dot == std::string::npos) ? path.substr(start) : path.substr(start, dot - start); + if (!cur || !cur->IsObject()) return nullptr; + cur = cur->Find(key); + if (dot == std::string::npos) break; + start = dot + 1; + } + return cur; +} + +} // namespace + +ExternalApiAction::~ExternalApiAction() { + Drain(); + Stop(); +} + +bool ExternalApiAction::Init(const SimpleJson& config) { + get_token_url_ = config.ValueOr("getTokenUrl", config.ValueOr("get_token_url", "")); + put_message_url_ = config.ValueOr("putMessageUrl", config.ValueOr("put_message_url", "")); + + tenant_code_ = config.ValueOr("tenantCode", config.ValueOr("tenant_code", "")); + channel_no_ = config.ValueOr("channelNo", config.ValueOr("channel_no", "")); + + include_media_url_ = config.ValueOr("include_media_url", true); + timeout_ms_ = config.ValueOr("timeout_ms", 3000); + + token_header_ = config.ValueOr("token_header", "X-Access-Token"); + token_json_path_ = config.ValueOr("token_json_path", "responseBody.token"); + token_cache_sec_ = std::max(0, config.ValueOr("token_cache_sec", 600)); + + alarm_content_ = config.ValueOr("alarmContent", config.ValueOr("alarm_content", "")); + use_rule_name_as_content_ = config.ValueOr("use_rule_name_as_content", alarm_content_.empty()); + + max_queue_size_ = static_cast( + std::max(0, config.ValueOr("max_queue_size", static_cast(max_queue_size_)))); + const std::string policy = config.ValueOr("queue_policy", "drop_oldest"); + if (policy == "drop_newest") queue_policy_ = QueuePolicy::DropNewest; + else queue_policy_ = QueuePolicy::DropOldest; + + max_retries_ = std::max(0, config.ValueOr("max_retries", 0)); + retry_backoff_ms_ = std::max(0, config.ValueOr("retry_backoff_ms", 200)); + + if (get_token_url_.empty()) { + std::cerr << "[ExternalApiAction] getTokenUrl is required\n"; + return false; + } + if (put_message_url_.empty()) { + std::cerr << "[ExternalApiAction] putMessageUrl is required\n"; + return false; + } + if (tenant_code_.empty()) { + std::cerr << "[ExternalApiAction] tenantCode is required\n"; + return false; + } + if (channel_no_.empty()) { + std::cerr << "[ExternalApiAction] channelNo is required\n"; + return false; + } + +#if HAS_CURL + curl_global_init(CURL_GLOBAL_DEFAULT); +#endif + + running_.store(true); + worker_ = std::thread(&ExternalApiAction::WorkerLoop, this); + std::cout << "[ExternalApiAction] initialized, token_url=" << get_token_url_ + << " msg_url=" << put_message_url_ << "\n"; + return true; +} + +void ExternalApiAction::Execute(AlarmEvent& event, std::shared_ptr /*frame*/) { + Job job; + job.tenant_code = tenant_code_; + job.channel_no = channel_no_; + job.timestamp_ms = event.timestamp_ms; + + if (!alarm_content_.empty()) { + job.alarm_content = alarm_content_; + } else if (use_rule_name_as_content_) { + job.alarm_content = event.rule_name; + } else { + job.alarm_content = event.rule_name; + } + + if (include_media_url_) { + job.pic_url = event.snapshot_url; + job.video_url = event.clip_url; + } + + { + std::lock_guard lock(queue_mutex_); + if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_) { + ++dropped_total_; + if (queue_policy_ == QueuePolicy::DropOldest) { + queue_.pop(); + queue_.push(std::move(job)); + } else { + // DropNewest: drop this request + } + } else { + queue_.push(std::move(job)); + } + } + queue_cv_.notify_one(); +} + +void ExternalApiAction::Drain() { + if (!running_.load()) return; + std::unique_lock lock(queue_mutex_); + queue_cv_.wait_for(lock, std::chrono::seconds(5), [this] { return queue_.empty() && in_flight_ == 0; }); +} + +void ExternalApiAction::Stop() { + bool was_running = running_.exchange(false); + (void)was_running; + queue_cv_.notify_all(); + if (worker_.joinable()) worker_.join(); +} + +void ExternalApiAction::WorkerLoop() { + while (running_.load()) { + Job job; + { + std::unique_lock lock(queue_mutex_); + queue_cv_.wait_for(lock, std::chrono::milliseconds(100), + [this] { return !queue_.empty() || !running_.load(); }); + if (!running_.load() && queue_.empty()) break; + if (queue_.empty()) continue; + job = std::move(queue_.front()); + queue_.pop(); + ++in_flight_; + } + + std::string err; + if (!EnsureToken(err)) { + std::cerr << "[ExternalApiAction] ensure token failed: " << err << "\n"; + } else { + std::string token; + { + std::lock_guard tl(token_mu_); + token = token_; + } + + const int max_attempts = std::max(1, max_retries_ + 1); + for (int attempt = 0; attempt < max_attempts; ++attempt) { + if (attempt > 0 && retry_backoff_ms_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(retry_backoff_ms_ * attempt)); + } + + long http_code = 0; + std::string send_err; + if (SendMessageWithToken(job, token, http_code, send_err)) { + break; + } + + // Token may have expired/been revoked. + if (http_code == 401 || http_code == 403) { + std::string terr; + { + std::lock_guard tl(token_mu_); + token_.clear(); + token_expire_tp_ = std::chrono::steady_clock::time_point{}; + } + if (EnsureToken(terr)) { + std::lock_guard tl(token_mu_); + token = token_; + continue; + } + std::cerr << "[ExternalApiAction] refresh token failed: " << terr << "\n"; + } + + std::cerr << "[ExternalApiAction] send failed: " << send_err << " (http=" << http_code << ")\n"; + } + } + + { + std::lock_guard lock(queue_mutex_); + if (in_flight_ > 0) --in_flight_; + } + queue_cv_.notify_all(); + } +} + +bool ExternalApiAction::EnsureToken(std::string& err) { + { + std::lock_guard tl(token_mu_); + if (!token_.empty()) { + if (token_cache_sec_ <= 0) { + return true; + } + if (token_expire_tp_.time_since_epoch().count() == 0) { + return true; + } + if (std::chrono::steady_clock::now() < token_expire_tp_) { + return true; + } + } + } + + std::string token; + if (!FetchToken(token, err)) { + return false; + } + + std::lock_guard tl(token_mu_); + token_ = std::move(token); + if (token_cache_sec_ > 0) { + token_expire_tp_ = std::chrono::steady_clock::now() + std::chrono::seconds(token_cache_sec_); + } else { + token_expire_tp_ = std::chrono::steady_clock::time_point{}; + } + return true; +} + +bool ExternalApiAction::FetchToken(std::string& token_out, std::string& err) { + token_out.clear(); + +#if HAS_CURL + std::vector headers; + headers.emplace_back("Content-Type: application/json"); + std::string resp; + long code = 0; + if (!CurlPostJson(get_token_url_, headers, "", timeout_ms_, resp, code, err)) { + return false; + } + if (code < 200 || code >= 300) { + err = "getToken http " + std::to_string(code); + return false; + } + + SimpleJson root; + std::string perr; + if (!ParseSimpleJson(resp, root, perr)) { + err = "getToken invalid json: " + perr; + return false; + } + + const SimpleJson* v = FindByDottedPath(root, token_json_path_); + if (!v || !v->IsString()) { + // Fallbacks + if (const SimpleJson* t = root.Find("token")) { + v = t; + } + } + if (!v || !v->IsString()) { + err = "token not found at path: " + token_json_path_; + return false; + } + token_out = v->AsString(""); + if (token_out.empty()) { + err = "empty token"; + return false; + } + return true; +#else + err = "curl not available"; + return false; +#endif +} + +bool ExternalApiAction::SendMessageWithToken(const Job& job, const std::string& token, long& http_code, std::string& err) { + http_code = 0; + +#if HAS_CURL + std::vector headers; + headers.emplace_back("Content-Type: application/json"); + if (!token.empty()) { + headers.emplace_back(token_header_ + ": " + token); + } + + const std::string body = BuildMessageJson(job); + std::string resp; + if (!CurlPostJson(put_message_url_, headers, body, timeout_ms_, resp, http_code, err)) { + return false; + } + if (http_code < 200 || http_code >= 300) { + err = "putMessage http " + std::to_string(http_code); + return false; + } + return true; +#else + (void)job; + (void)token; + std::cout << "[ExternalApiAction] curl not available; would POST to " << put_message_url_ << "\n"; + return true; +#endif +} + +std::string ExternalApiAction::FormatAlarmTime(uint64_t timestamp_ms) const { + const std::time_t t = static_cast(timestamp_ms / 1000ULL); + std::tm* tm = std::localtime(&t); + if (!tm) return ""; + std::ostringstream oss; + oss << std::put_time(tm, "%Y-%m-%d %H:%M:%S"); + return oss.str(); +} + +std::string ExternalApiAction::BuildMessageJson(const Job& job) const { + SimpleJson::Object payload; + payload["tenantCode"] = SimpleJson(job.tenant_code); + payload["channelNo"] = SimpleJson(job.channel_no); + payload["alarmContent"] = SimpleJson(job.alarm_content); + payload["alarmTime"] = SimpleJson(FormatAlarmTime(job.timestamp_ms)); + + SimpleJson::Array pic; + if (!job.pic_url.empty()) { + SimpleJson::Object o; + o["url"] = SimpleJson(job.pic_url); + pic.emplace_back(SimpleJson(std::move(o))); + } + payload["picInfo"] = SimpleJson(std::move(pic)); + + SimpleJson::Array video; + if (!job.video_url.empty()) { + SimpleJson::Object o; + o["url"] = SimpleJson(job.video_url); + video.emplace_back(SimpleJson(std::move(o))); + } + payload["videoInfo"] = SimpleJson(std::move(video)); + + return StringifySimpleJson(SimpleJson(std::move(payload))); +} + +} // namespace rk3588 diff --git a/plugins/alarm/actions/external_api_action.h b/plugins/alarm/actions/external_api_action.h new file mode 100644 index 0000000..b989b2f --- /dev/null +++ b/plugins/alarm/actions/external_api_action.h @@ -0,0 +1,83 @@ +#pragma once + +#include "action_base.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace rk3588 { + +class ExternalApiAction : public IAlarmAction { +public: + ~ExternalApiAction() override; + + bool Init(const SimpleJson& config) override; + void Execute(AlarmEvent& event, std::shared_ptr frame) override; + void Drain() override; + void Stop() override; + std::string Name() const override { return "external_api"; } + +private: + struct Job { + std::string tenant_code; + std::string channel_no; + std::string alarm_content; + uint64_t timestamp_ms = 0; + std::string pic_url; + std::string video_url; + }; + + enum class QueuePolicy { + DropOldest, + DropNewest, + }; + + void WorkerLoop(); + bool EnsureToken(std::string& err); + bool FetchToken(std::string& token_out, std::string& err); + bool SendMessageWithToken(const Job& job, const std::string& token, long& http_code, std::string& err); + + std::string BuildMessageJson(const Job& job) const; + std::string FormatAlarmTime(uint64_t timestamp_ms) const; + + std::string get_token_url_; + std::string put_message_url_; + + std::string tenant_code_; + std::string channel_no_; + + std::string alarm_content_; + bool use_rule_name_as_content_ = true; + + bool include_media_url_ = true; + int timeout_ms_ = 3000; + + std::string token_header_ = "X-Access-Token"; + std::string token_json_path_ = "responseBody.token"; + int token_cache_sec_ = 600; + + int max_retries_ = 0; + int retry_backoff_ms_ = 200; + + std::atomic running_{false}; + std::thread worker_; + std::mutex queue_mutex_; + std::condition_variable queue_cv_; + std::queue queue_; + size_t in_flight_ = 0; + + size_t max_queue_size_ = 0; // 0 = unlimited + QueuePolicy queue_policy_ = QueuePolicy::DropOldest; + uint64_t dropped_total_ = 0; + + std::mutex token_mu_; + std::string token_; + std::chrono::steady_clock::time_point token_expire_tp_{}; +}; + +} // namespace rk3588 diff --git a/plugins/alarm/alarm_node.cpp b/plugins/alarm/alarm_node.cpp index bb431af..f4ad8af 100644 --- a/plugins/alarm/alarm_node.cpp +++ b/plugins/alarm/alarm_node.cpp @@ -15,6 +15,7 @@ #include "actions/action_base.h" #include "actions/log_action.h" #include "actions/http_action.h" +#include "actions/external_api_action.h" #include "actions/snapshot_action.h" #include "actions/clip_action.h" @@ -131,7 +132,7 @@ public: } } - // HTTP action (should be last to include media URLs) + // HTTP action (should be after snapshot/clip to include media URLs) if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); @@ -140,6 +141,16 @@ public: } } } + + // External API action (token + message). Should be after snapshot/clip to include media URLs. + if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) { + if (ext_cfg->ValueOr("enable", false)) { + auto action = std::make_unique(); + if (action->Init(*ext_cfg)) { + actions_.push_back(std::move(action)); + } + } + } } else { // Default: just log action auto action = std::make_unique(); @@ -259,6 +270,15 @@ public: } } } + + if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) { + if (ext_cfg->ValueOr("enable", false)) { + auto action = std::make_unique(); + if (action->Init(*ext_cfg)) { + new_actions.push_back(std::move(action)); + } + } + } } if (new_actions.empty()) { auto action = std::make_unique();