// OrangePi3588Media/plugins/alarm/actions/external_api_action.cpp
//
// (source-viewer metadata: 457 lines, 14 KiB, C++)

#include "external_api_action.h"
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <iomanip>
#include <ctime>
#include <mutex>
#include <sstream>
#include <vector>
#include "utils/simple_json.h"
#include "utils/simple_json_writer.h"
#include "utils/logger.h"
#if __has_include(<curl/curl.h>)
#include <curl/curl.h>
#define HAS_CURL 1
#else
#define HAS_CURL 0
#endif
namespace rk3588 {
namespace {
// Converts `t` to broken-down local time and stores it in `out`.
// Uses the platform's caller-buffer variant where one is available; otherwise
// falls back to std::localtime with calls through this helper serialized by a
// mutex. Returns true on success, false when the conversion routine fails.
bool SafeLocalTime(std::time_t t, std::tm& out) {
#if defined(_WIN32)
  // localtime_s reports success with a zero return value.
  const auto status = localtime_s(&out, &t);
  return status == 0;
#elif defined(__unix__) || defined(__APPLE__)
  // localtime_r fills the caller's buffer and yields nullptr on failure.
  const std::tm* const filled = localtime_r(&t, &out);
  return filled != nullptr;
#else
  // std::localtime hands back a pointer we must copy from while holding the lock.
  static std::mutex localtime_guard;
  const std::lock_guard<std::mutex> hold(localtime_guard);
  if (const std::tm* shared = std::localtime(&t)) {
    out = *shared;
    return true;
  }
  return false;
#endif
}
#if HAS_CURL
// Runs curl_global_init exactly once per process, no matter how many times or
// from how many threads this is called, and registers curl_global_cleanup to
// run at process exit.
void EnsureCurlGlobalInitOnce() {
  static std::once_flag init_guard;
  const auto init_and_register_cleanup = [] {
    curl_global_init(CURL_GLOBAL_DEFAULT);
    std::atexit([] { curl_global_cleanup(); });
  };
  std::call_once(init_guard, init_and_register_cleanup);
}
// libcurl write callback: appends the received chunk (`size * nmemb` bytes at
// `ptr`) to the std::string passed through `userdata`.
// Returns the number of bytes appended, or 0 when no sink was supplied.
size_t CurlWriteCb(char* ptr, size_t size, size_t nmemb, void* userdata) {
  auto* const sink = static_cast<std::string*>(userdata);
  if (sink == nullptr) {
    return 0;
  }
  const size_t chunk_bytes = size * nmemb;
  sink->append(ptr, chunk_bytes);
  return chunk_bytes;
}
bool CurlPostJson(const std::string& url,
const std::vector<std::string>& headers,
const std::string& json_body,
int timeout_ms,
std::string& resp_out,
long& http_code_out,
std::string& err) {
http_code_out = 0;
resp_out.clear();
CURL* curl = curl_easy_init();
if (!curl) {
err = "curl_easy_init failed";
return false;
}
struct curl_slist* hdrs = nullptr;
for (const auto& h : headers) {
hdrs = curl_slist_append(hdrs, h.c_str());
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hdrs);
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_body.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast<long>(json_body.size()));
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, std::max(500, timeout_ms / 2));
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &CurlWriteCb);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &resp_out);
CURLcode res = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code_out);
curl_slist_free_all(hdrs);
curl_easy_cleanup(curl);
if (res != CURLE_OK) {
err = curl_easy_strerror(res);
return false;
}
return true;
}
#endif
// Walks `root` along a dot-separated key path (e.g. "responseBody.token").
// Returns the node reached, `&root` for an empty path, or nullptr as soon as a
// segment has to be looked up in something that is not an object or the
// lookup itself yields nullptr.
const SimpleJson* FindByDottedPath(const SimpleJson& root, const std::string& path) {
  const SimpleJson* node = &root;
  for (std::string::size_type seg_begin = 0; seg_begin < path.size();) {
    if (node == nullptr || !node->IsObject()) {
      return nullptr;
    }
    const std::string::size_type seg_end = path.find('.', seg_begin);
    const bool is_last_segment = (seg_end == std::string::npos);
    const std::string segment =
        path.substr(seg_begin, is_last_segment ? std::string::npos : seg_end - seg_begin);
    node = node->Find(segment);
    if (is_last_segment) {
      return node;
    }
    seg_begin = seg_end + 1;
  }
  return node;
}
} // namespace
// Formats a single log line describing the outcome of one external API call.
// The line always carries the prefix, HTTP status and alarm content; the
// picture URL, video URL and error text are appended only when non-empty.
std::string BuildExternalApiResultLogLine(const std::string& prefix,
                                          const std::string& alarm_content,
                                          const std::string& pic_url,
                                          const std::string& video_url,
                                          long http_code,
                                          const std::string& err) {
  std::ostringstream line;
  const auto append_if_present = [&line](const char* label, const std::string& value) {
    if (value.empty()) {
      return;
    }
    line << label << value;
  };

  line << "[ExternalApiAction] " << prefix;
  line << " http=" << http_code;
  line << " alarm_content=" << alarm_content;
  append_if_present(" pic_url=", pic_url);
  append_if_present(" video_url=", video_url);
  append_if_present(" error=", err);
  return line.str();
}
// Destructor: lets pending work finish, then shuts the worker thread down.
ExternalApiAction::~ExternalApiAction() {
// Order matters: Drain() waits (bounded) for the queue and in-flight job to
// finish while the worker is still running; Stop() then clears the running
// flag and joins the worker thread.
Drain();
Stop();
}
bool ExternalApiAction::Init(const SimpleJson& config) {
get_token_url_ = config.ValueOr<std::string>("getTokenUrl", config.ValueOr<std::string>("get_token_url", ""));
put_message_url_ = config.ValueOr<std::string>("putMessageUrl", config.ValueOr<std::string>("put_message_url", ""));
tenant_code_ = config.ValueOr<std::string>("tenantCode", config.ValueOr<std::string>("tenant_code", ""));
channel_no_ = config.ValueOr<std::string>("channelNo", config.ValueOr<std::string>("channel_no", ""));
include_media_url_ = config.ValueOr<bool>("include_media_url", true);
timeout_ms_ = config.ValueOr<int>("timeout_ms", 3000);
token_header_ = config.ValueOr<std::string>("token_header", "X-Access-Token");
token_json_path_ = config.ValueOr<std::string>("token_json_path", "responseBody.token");
token_cache_sec_ = std::max(0, config.ValueOr<int>("token_cache_sec", 600));
alarm_content_ = config.ValueOr<std::string>("alarmContent", config.ValueOr<std::string>("alarm_content", ""));
use_rule_name_as_content_ = config.ValueOr<bool>("use_rule_name_as_content", alarm_content_.empty());
max_queue_size_ = static_cast<size_t>(
std::max(0, config.ValueOr<int>("max_queue_size", static_cast<int>(max_queue_size_))));
const std::string policy = config.ValueOr<std::string>("queue_policy", "drop_oldest");
if (policy == "drop_newest") queue_policy_ = QueuePolicy::DropNewest;
else queue_policy_ = QueuePolicy::DropOldest;
max_retries_ = std::max(0, config.ValueOr<int>("max_retries", 0));
retry_backoff_ms_ = std::max(0, config.ValueOr<int>("retry_backoff_ms", 200));
if (get_token_url_.empty()) {
LogError("[ExternalApiAction] getTokenUrl is required");
return false;
}
if (put_message_url_.empty()) {
LogError("[ExternalApiAction] putMessageUrl is required");
return false;
}
if (tenant_code_.empty()) {
LogError("[ExternalApiAction] tenantCode is required");
return false;
}
if (channel_no_.empty()) {
LogError("[ExternalApiAction] channelNo is required");
return false;
}
#if HAS_CURL
EnsureCurlGlobalInitOnce();
#endif
running_.store(true);
worker_ = std::thread(&ExternalApiAction::WorkerLoop, this);
LogInfo("[ExternalApiAction] initialized, token_url=" + get_token_url_ + " msg_url=" + put_message_url_);
return true;
}
// Queues one outgoing message for `event` and wakes the worker thread.
// Nothing is sent from the calling thread. When the queue is bounded
// (max_queue_size_ > 0) and full, either the oldest queued job is replaced or
// the new job is discarded, depending on queue_policy_; both cases count as
// a drop in dropped_total_. The frame argument is unused.
void ExternalApiAction::Execute(AlarmEvent& event, std::shared_ptr<Frame> /*frame*/) {
  Job job;
  job.tenant_code = tenant_code_;
  job.channel_no = channel_no_;
  job.timestamp_ms = event.timestamp_ms;

  // A configured alarm_content_ wins; otherwise the rule name is used.
  // NOTE(review): the previous code had separate `else if
  // (use_rule_name_as_content_)` and `else` branches that both assigned
  // event.rule_name, so that flag never influenced the result. The duplicate
  // branch is collapsed here without changing behavior - confirm whether the
  // flag was meant to override a configured alarm_content_.
  if (!alarm_content_.empty()) {
    job.alarm_content = alarm_content_;
  } else {
    job.alarm_content = event.rule_name;
  }

  if (include_media_url_) {
    job.pic_url = event.snapshot_url;
    job.video_url = event.clip_url;
  }

  {
    std::lock_guard<std::mutex> lock(queue_mutex_);
    const bool queue_full = max_queue_size_ > 0 && queue_.size() >= max_queue_size_;
    if (!queue_full) {
      queue_.push(std::move(job));
    } else {
      ++dropped_total_;
      if (queue_policy_ == QueuePolicy::DropOldest) {
        // Make room by discarding the oldest pending job.
        queue_.pop();
        queue_.push(std::move(job));
      }
      // DropNewest: the new job is simply not enqueued.
    }
  }
  queue_cv_.notify_one();
}
// Blocks for at most five seconds until the queue is empty and no job is
// being processed. Returns immediately when the action is not running.
// The wait result is not reported to the caller.
void ExternalApiAction::Drain() {
  if (running_.load()) {
    const auto is_idle = [this] { return in_flight_ == 0 && queue_.empty(); };
    std::unique_lock<std::mutex> guard(queue_mutex_);
    queue_cv_.wait_for(guard, std::chrono::seconds(5), is_idle);
  }
}
// Clears the running flag, wakes every waiter on the queue condition variable
// and joins the worker thread if one was started. Safe to call repeatedly.
void ExternalApiAction::Stop() {
  running_.store(false);
  queue_cv_.notify_all();
  if (!worker_.joinable()) {
    return;
  }
  worker_.join();
}
// Worker thread body: takes queued jobs one at a time and delivers them.
//
// For each job it makes sure a token is available, then tries to send the
// message up to max_retries_ + 1 times with a linearly growing backoff.
// A 401/403 response invalidates the cached token; after a successful refresh
// the job is re-sent once immediately, and that re-send does not count
// against the retry budget. Previously the refresh consumed an attempt, so
// with max_retries_ == 0 the job was dropped without ever being re-sent or
// logged as failed.
// in_flight_ is raised while a job is being processed so Drain() can wait for it.
void ExternalApiAction::WorkerLoop() {
  while (running_.load()) {
    Job job;
    {
      std::unique_lock<std::mutex> lock(queue_mutex_);
      // Timed wait: the loop re-checks running_ at least every 100 ms even if
      // a notification is missed.
      queue_cv_.wait_for(lock, std::chrono::milliseconds(100),
                         [this] { return !queue_.empty() || !running_.load(); });
      if (!running_.load() && queue_.empty()) break;
      if (queue_.empty()) continue;
      job = std::move(queue_.front());
      queue_.pop();
      ++in_flight_;
    }

    std::string err;
    if (!EnsureToken(err)) {
      LogWarn("[ExternalApiAction] ensure token failed: " + err);
    } else {
      std::string token;
      {
        std::lock_guard<std::mutex> tl(token_mu_);
        token = token_;
      }

      const int max_attempts = std::max(1, max_retries_ + 1);
      bool resent_after_refresh = false;  // the free re-send is granted once per job
      bool skip_backoff = false;          // set for the immediate re-send after a refresh
      int attempt = 0;
      while (attempt < max_attempts) {
        if (attempt > 0 && !skip_backoff && retry_backoff_ms_ > 0) {
          // Multiply as a duration so the product is not computed in `int`.
          std::this_thread::sleep_for(std::chrono::milliseconds(retry_backoff_ms_) * attempt);
        }
        skip_backoff = false;

        long http_code = 0;
        std::string send_err;
        if (SendMessageWithToken(job, token, http_code, send_err)) {
          LogInfo(BuildExternalApiResultLogLine(
              "send ok", job.alarm_content, job.pic_url, job.video_url, http_code, ""));
          break;
        }

        // Token may have expired/been revoked.
        const bool auth_rejected = (http_code == 401 || http_code == 403);
        if (auth_rejected && !resent_after_refresh) {
          std::string terr;
          {
            std::lock_guard<std::mutex> tl(token_mu_);
            token_.clear();
            token_expire_tp_ = std::chrono::steady_clock::time_point{};
          }
          if (EnsureToken(terr)) {
            {
              std::lock_guard<std::mutex> tl(token_mu_);
              token = token_;
            }
            // Re-send right away with the fresh token without consuming an attempt.
            resent_after_refresh = true;
            skip_backoff = true;
            continue;
          }
          LogWarn("[ExternalApiAction] refresh token failed: " + terr);
        }

        LogWarn(BuildExternalApiResultLogLine(
            "send failed", job.alarm_content, job.pic_url, job.video_url, http_code, send_err));
        ++attempt;
      }
    }

    {
      std::lock_guard<std::mutex> lock(queue_mutex_);
      if (in_flight_ > 0) --in_flight_;
    }
    // Wake Drain(), which waits on the same condition variable.
    queue_cv_.notify_all();
  }
}
// Makes sure a usable token is cached, fetching a new one when necessary.
// A non-empty cached token is reused when caching has no time limit
// (token_cache_sec_ <= 0 or a zero expiry time point) or when its expiry has
// not been reached yet. Otherwise FetchToken() is called outside the token
// lock and its result is stored together with a new expiry.
// Returns false, with `err` set by FetchToken(), when fetching fails.
bool ExternalApiAction::EnsureToken(std::string& err) {
  {
    std::lock_guard<std::mutex> guard(token_mu_);
    const bool has_token = !token_.empty();
    if (has_token) {
      const bool no_expiry =
          token_cache_sec_ <= 0 || token_expire_tp_.time_since_epoch().count() == 0;
      if (no_expiry || std::chrono::steady_clock::now() < token_expire_tp_) {
        return true;
      }
    }
  }

  std::string fresh_token;
  const bool fetched = FetchToken(fresh_token, err);
  if (!fetched) {
    return false;
  }
  LogInfo("[ExternalApiAction] token fetched successfully");

  std::lock_guard<std::mutex> guard(token_mu_);
  token_ = std::move(fresh_token);
  // A default-constructed time point marks "no time-based expiry".
  token_expire_tp_ = (token_cache_sec_ > 0)
                         ? std::chrono::steady_clock::now() + std::chrono::seconds(token_cache_sec_)
                         : std::chrono::steady_clock::time_point{};
  return true;
}
// Requests a token from get_token_url_ and extracts it from the JSON reply.
//
// The request is a POST with an empty body and a JSON content type. The token
// is looked up at token_json_path_; if that does not yield a string, a
// top-level "token" member is tried as a fallback.
// On success `token_out` holds a non-empty token and true is returned.
// On failure `token_out` is empty, `err` describes the problem and false is
// returned. Without libcurl support this always fails with "curl not available".
bool ExternalApiAction::FetchToken(std::string& token_out, std::string& err) {
  token_out.clear();
#if HAS_CURL
  std::vector<std::string> headers;
  headers.emplace_back("Content-Type: application/json");

  std::string resp;
  long code = 0;
  if (!CurlPostJson(get_token_url_, headers, "", timeout_ms_, resp, code, err)) {
    return false;
  }
  if (code < 200 || code >= 300) {
    err = "getToken http " + std::to_string(code);
    return false;
  }

  SimpleJson root;
  std::string perr;
  if (!ParseSimpleJson(resp, root, perr)) {
    err = "getToken invalid json: " + perr;
    return false;
  }

  const SimpleJson* v = FindByDottedPath(root, token_json_path_);
  // Fallback: a top-level "token" member. Only look it up when the root is an
  // object, matching the IsObject() guard FindByDottedPath applies before Find().
  if ((!v || !v->IsString()) && root.IsObject()) {
    if (const SimpleJson* t = root.Find("token")) {
      v = t;
    }
  }
  if (!v || !v->IsString()) {
    err = "token not found at path: " + token_json_path_;
    return false;
  }

  token_out = v->AsString("");
  if (token_out.empty()) {
    err = "empty token";
    return false;
  }
  return true;
#else
  err = "curl not available";
  return false;
#endif
}
// POSTs the JSON message for `job` to put_message_url_.
// A non-empty `token` is sent in the header named by token_header_.
// `http_code` is reset to 0 and then receives the response status.
// Returns true only for a completed transfer with a 2xx status; otherwise
// `err` describes the transport error or the non-2xx status.
// Without libcurl support nothing is sent: the call logs the target URL and
// reports success.
bool ExternalApiAction::SendMessageWithToken(const Job& job, const std::string& token, long& http_code, std::string& err) {
  http_code = 0;
#if HAS_CURL
  std::vector<std::string> request_headers{"Content-Type: application/json"};
  if (!token.empty()) {
    request_headers.push_back(token_header_ + ": " + token);
  }

  const std::string payload = BuildMessageJson(job);
  std::string response_body;
  const bool transferred =
      CurlPostJson(put_message_url_, request_headers, payload, timeout_ms_, response_body, http_code, err);
  if (!transferred) {
    return false;
  }

  const bool is_success_status = http_code >= 200 && http_code < 300;
  if (!is_success_status) {
    err = "putMessage http " + std::to_string(http_code);
  }
  return is_success_status;
#else
  (void)job;
  (void)token;
  LogInfo("[ExternalApiAction] curl not available; would POST to " + put_message_url_);
  return true;
#endif
}
// Renders a millisecond timestamp as local time in "YYYY-MM-DD HH:MM:SS" form.
// Sub-second precision is discarded. Returns an empty string when the local
// time conversion fails.
std::string ExternalApiAction::FormatAlarmTime(uint64_t timestamp_ms) const {
  std::tm local_parts{};
  const auto epoch_seconds = static_cast<std::time_t>(timestamp_ms / 1000ULL);
  if (!SafeLocalTime(epoch_seconds, local_parts)) {
    return "";
  }
  char text[64];
  const std::size_t written =
      std::strftime(text, sizeof(text), "%Y-%m-%d %H:%M:%S", &local_parts);
  return std::string(text, written);
}
std::string ExternalApiAction::BuildMessageJson(const Job& job) const {
SimpleJson::Object payload;
payload["tenantCode"] = SimpleJson(job.tenant_code);
payload["channelNo"] = SimpleJson(job.channel_no);
payload["alarmContent"] = SimpleJson(job.alarm_content);
payload["alarmTime"] = SimpleJson(FormatAlarmTime(job.timestamp_ms));
SimpleJson::Array pic;
if (!job.pic_url.empty()) {
SimpleJson::Object o;
o["url"] = SimpleJson(job.pic_url);
pic.emplace_back(SimpleJson(std::move(o)));
}
payload["picInfo"] = SimpleJson(std::move(pic));
SimpleJson::Array video;
if (!job.video_url.empty()) {
SimpleJson::Object o;
o["url"] = SimpleJson(job.video_url);
video.emplace_back(SimpleJson(std::move(o)));
}
payload["videoInfo"] = SimpleJson(std::move(video));
return StringifySimpleJson(SimpleJson(std::move(payload)));
}
} // namespace rk3588