// OrangePi3588Media/plugins/alarm/alarm_node.cpp
//
// Listing metadata carried over from the code-hosting page: 707 lines, 28 KiB, C++.

#include <atomic>
#include <chrono>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <algorithm>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "utils/logger.h"
#include "face/face_result.h"
#include "node.h"
#include "rule_engine.h"
#include "packet_ring_buffer.h"
#include "actions/action_base.h"
#include "actions/log_action.h"
#include "actions/http_action.h"
#include "actions/external_api_action.h"
#include "actions/snapshot_action.h"
#include "actions/clip_action.h"
namespace rk3588 {
namespace {
// Returns the current std::chrono::system_clock time as a millisecond count
// measured from that clock's epoch.
uint64_t NowEpochMs() {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto as_millis = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch);
    return static_cast<uint64_t>(as_millis.count());
}
// Decorator around another alarm action that drops Execute() calls arriving less
// than `min_interval_ms` after the previously accepted call. A zero interval
// disables limiting. Every other IAlarmAction call is forwarded to the wrapped
// action unchanged; a null wrapped action turns every call into a no-op.
class RateLimitedAction final : public IAlarmAction {
public:
    // inner:           action to wrap (ownership is taken).
    // min_interval_ms: minimum spacing between two forwarded Execute() calls.
    RateLimitedAction(std::unique_ptr<IAlarmAction> inner, uint64_t min_interval_ms)
        : inner_(std::move(inner)), min_interval_ms_(min_interval_ms) {}

    bool Init(const SimpleJson& config) override {
        // Inner is expected to be initialized already; keep compatibility if called.
        return inner_ ? inner_->Init(config) : false;
    }

    void Execute(AlarmEvent& event, std::shared_ptr<Frame> frame) override {
        if (!inner_) return;
        if (min_interval_ms_ == 0) {
            inner_->Execute(event, std::move(frame));
            return;
        }
        const uint64_t now = NowEpochMs();
        uint64_t last = last_exec_ms_.load(std::memory_order_relaxed);
        while (true) {
            // `last == 0` means "never executed". `now >= last` (rather than `>`) makes a
            // call landing in the same millisecond as the accepted one count as inside
            // the interval; this is also what stops a thread that lost the CAS below
            // from executing as well. If the wall clock moved backwards (now < last)
            // the call is let through and the stored timestamp is reset.
            if (last != 0 && now >= last && (now - last) < min_interval_ms_) {
                return;
            }
            // Best-effort: only one thread should win the update.
            if (last_exec_ms_.compare_exchange_weak(last, now, std::memory_order_relaxed)) {
                break;
            }
            // `last` was refreshed by compare_exchange_weak; loop and re-check.
        }
        inner_->Execute(event, std::move(frame));
    }

    void Drain() override {
        if (inner_) inner_->Drain();
    }

    void Stop() override {
        if (inner_) inner_->Stop();
    }

    // Reports the wrapped action's name, or "unknown" when there is none.
    std::string Name() const override {
        return inner_ ? inner_->Name() : std::string("unknown");
    }

private:
    std::unique_ptr<IAlarmAction> inner_;
    uint64_t min_interval_ms_ = 0;
    // NowEpochMs() value of the last forwarded Execute(); 0 until the first one.
    std::atomic<uint64_t> last_exec_ms_{0};
};
std::unique_ptr<IAlarmAction> WrapIfRateLimited(std::unique_ptr<IAlarmAction> action, const SimpleJson& cfg) {
if (!action) return nullptr;
const uint64_t min_interval_ms = static_cast<uint64_t>(std::max<int64_t>(
0, static_cast<int64_t>(cfg.ValueOr<int>("min_interval_ms", 0))));
if (min_interval_ms == 0) return action;
return std::make_unique<RateLimitedAction>(std::move(action), min_interval_ms);
}
} // namespace
class AlarmNode : public INode {
private:
// One unit of work handed from TriggerAlarm() to the worker thread through
// alarm_queue_: the alarm description plus the frame that raised it.
struct AlarmJob {
// Event passed (by reference) to every action's Execute().
AlarmEvent event;
// Frame that triggered the alarm; passed alongside the event to each action.
std::shared_ptr<Frame> frame;
};
// Returns the frame's user_meta viewed as EncodedVideoFrameMeta, or nullptr when
// the frame carries no usable encoded packet.
// The pointer cast is unchecked; the magic comparison is the only guard this
// function applies against metadata of some other type.
static std::shared_ptr<EncodedVideoFrameMeta> TryGetEncodedMeta(const std::shared_ptr<Frame>& frame) {
    if (!frame || !frame->user_meta) {
        return nullptr;
    }
    auto candidate = std::static_pointer_cast<EncodedVideoFrameMeta>(frame->user_meta);
    if (!candidate) {
        return nullptr;
    }
    if (candidate->magic != EncodedVideoFrameMeta::kMagic) {
        return nullptr;
    }
    // Reject packets without codec info, without payload, or without a positive pts.
    if (!candidate->codec) {
        return nullptr;
    }
    if (candidate->pkt.data.empty()) {
        return nullptr;
    }
    if (candidate->pkt.pts_ms <= 0) {
        return nullptr;
    }
    return candidate;
}
public:
// Node identifier; set by Init() from the "id" config key (default "alarm").
std::string Id() const override {
    return id_;
}
// Node type string; always "alarm".
std::string Type() const override {
    return "alarm";
}
// Configures the node from `config` and binds it to the pipeline context.
//
// Config keys read here: "id", "face_rules", "eval_interval_ms" / "eval_fps",
// "labels", "rules", "actions" (log / snapshot / clip / http / external_api)
// and "event_queue" (size / strategy).
//
// Returns false when the rule engine rejects "rules" or when `ctx` provides no
// input queue. An individual action whose Init() fails is skipped, not fatal.
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
    id_ = config.ValueOr<std::string>("id", "alarm");
    ParseFaceRules(config);

    // Optional evaluation throttle (for alarm side). 0 = disabled.
    // "eval_interval_ms" wins; "eval_fps" is only consulted when it is absent or <= 0.
    eval_interval_ms_ = std::max<int64_t>(0, static_cast<int64_t>(config.ValueOr<int>("eval_interval_ms", 0)));
    if (eval_interval_ms_ <= 0) {
        const double eval_fps = config.ValueOr<double>("eval_fps", 0.0);
        if (eval_fps > 0.0) {
            eval_interval_ms_ = static_cast<int64_t>(1000.0 / eval_fps);
            if (eval_interval_ms_ < 1) eval_interval_ms_ = 1;
        }
    }

    // Parse labels for class name mapping
    if (const SimpleJson* labels_cfg = config.Find("labels")) {
        for (const auto& label : labels_cfg->AsArray()) {
            labels_.push_back(label.AsString(""));
        }
    }

    // Initialize rule engine
    if (const SimpleJson* rules_cfg = config.Find("rules")) {
        if (!rule_engine_.Init(*rules_cfg, labels_)) {
            LogError("[alarm] failed to init rule engine");
            return false;
        }
    }

    // Derive packet ring window from clip action config
    int pre_sec = 5;
    int post_sec = 0;
    int fps_hint = 25;
    if (const SimpleJson* actions_cfg = config.Find("actions")) {
        if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
            pre_sec = clip_cfg->ValueOr<int>("pre_sec", 5);
            post_sec = clip_cfg->ValueOr<int>("post_sec", 0);
            fps_hint = clip_cfg->ValueOr<int>("fps", 25);
        }
    }
    // Negative pre/post values count as 0; two extra seconds of slack are added.
    const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2);
    packet_buffer_ = std::make_shared<PacketRingBuffer>(window_sec, fps_hint);

    // Alarm worker queue (avoid blocking Process())
    size_t alarm_q_size = 32;
    QueueDropStrategy alarm_q_strategy = QueueDropStrategy::DropOldest;
    if (const SimpleJson* q = config.Find("event_queue")) {
        if (q->IsObject()) {
            const int requested_size = q->ValueOr<int>("size", static_cast<int>(alarm_q_size));
            if (requested_size > 0) {
                alarm_q_size = static_cast<size_t>(requested_size);
            } else {
                // A zero or negative int must not reach the size_t cast: a negative
                // value would wrap around to an enormous capacity.
                LogError("[alarm] invalid event_queue.size " + std::to_string(requested_size) +
                         ", using default " + std::to_string(alarm_q_size));
            }
            const std::string strat = q->ValueOr<std::string>("strategy", "drop_oldest");
            if (strat == "drop_newest") alarm_q_strategy = QueueDropStrategy::DropNewest;
            else if (strat == "block") alarm_q_strategy = QueueDropStrategy::Block;
            else alarm_q_strategy = QueueDropStrategy::DropOldest;
        }
    }
    alarm_queue_size_ = alarm_q_size;
    alarm_queue_strategy_ = alarm_q_strategy;
    alarm_queue_ = std::make_shared<SpscQueue<AlarmJob>>(alarm_queue_size_, alarm_queue_strategy_);

    // Initialize actions. The order of actions_ is the order the worker executes
    // them in, so snapshot/clip come before the actions that report media URLs.
    if (const SimpleJson* actions_cfg = config.Find("actions")) {
        // Log action (enabled unless explicitly disabled)
        if (const SimpleJson* log_cfg = actions_cfg->Find("log")) {
            if (log_cfg->ValueOr<bool>("enable", true)) {
                auto action = std::make_unique<LogAction>();
                if (action->Init(*log_cfg)) {
                    actions_.push_back(WrapIfRateLimited(std::move(action), *log_cfg));
                }
            }
        }
        // Snapshot action (must be before HTTP to fill snapshot_url)
        if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) {
            if (snap_cfg->ValueOr<bool>("enable", false)) {
                auto action = std::make_unique<SnapshotAction>();
                if (action->Init(*snap_cfg)) {
                    actions_.push_back(WrapIfRateLimited(std::move(action), *snap_cfg));
                }
            }
        }
        // Clip action (must be before HTTP to fill clip_url)
        if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
            if (clip_cfg->ValueOr<bool>("enable", false)) {
                auto action = std::make_unique<ClipAction>();
                if (action->Init(*clip_cfg)) {
                    // Hand over the packet ring before the action is type-erased.
                    action->SetPacketBuffer(packet_buffer_);
                    actions_.push_back(WrapIfRateLimited(std::move(action), *clip_cfg));
                }
            }
        }
        // HTTP action (should be after snapshot/clip to include media URLs)
        if (const SimpleJson* http_cfg = actions_cfg->Find("http")) {
            if (http_cfg->ValueOr<bool>("enable", false)) {
                auto action = std::make_unique<HttpAction>();
                if (action->Init(*http_cfg)) {
                    actions_.push_back(WrapIfRateLimited(std::move(action), *http_cfg));
                }
            }
        }
        // External API action (token + message). Should be after snapshot/clip to include media URLs.
        if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) {
            if (ext_cfg->ValueOr<bool>("enable", false)) {
                auto action = std::make_unique<ExternalApiAction>();
                if (action->Init(*ext_cfg)) {
                    actions_.push_back(WrapIfRateLimited(std::move(action), *ext_cfg));
                }
            }
        }
    } else {
        // Default: just log action. Only keep it when its Init() succeeds, the
        // same way UpdateConfig() treats its fallback log action.
        auto action = std::make_unique<LogAction>();
        SimpleJson empty_cfg;
        if (action->Init(empty_cfg)) {
            actions_.push_back(std::move(action));
        }
    }

    input_queue_ = ctx.input_queue;
    if (!input_queue_) {
        LogError("[alarm] no input queue for node " + id_);
        return false;
    }
    LogInfo("[alarm] initialized with " + std::to_string(actions_.size()) + " actions");
    return true;
}
// Launches the worker thread that executes queued alarm jobs. Always returns true.
// Calling it while a worker already exists is a no-op.
bool Start() override {
    if (worker_.joinable()) {
        // A worker is already attached (repeated Start(), or UpdateConfig() spawned
        // one). Move-assigning over a joinable std::thread calls std::terminate.
        return true;
    }
    worker_running_.store(true);
    worker_ = std::thread(&AlarmNode::WorkerLoop, this);
    LogInfo("[alarm] started");
    return true;
}
void Stop() override {
worker_running_.store(false);
if (alarm_queue_) alarm_queue_->Stop();
if (worker_.joinable()) worker_.join();
// Ensure all actions fully stop (Drain only clears pending work; Stop must reclaim threads/resources).
for (auto& action : actions_) action->Drain();
for (auto& action : actions_) action->Stop();
LogInfo("[alarm] stopped, processed " + std::to_string(processed_frames_) +
" frames, triggered " + std::to_string(alarm_count_) + " alarms");
}
void Drain() override {
// First drain alarm jobs (so snapshot/clip are finished before draining HTTP queue).
while (alarm_queue_ && (alarm_queue_->Size() > 0 || in_flight_.load() > 0)) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
for (auto& action : actions_) action->Drain();
}
bool UpdateConfig(const SimpleJson& new_config) override {
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
if (!new_id.empty() && new_id != id_) {
return false;
}
// Parse labels
std::vector<std::string> new_labels;
if (const SimpleJson* labels_cfg = new_config.Find("labels")) {
for (const auto& label : labels_cfg->AsArray()) {
new_labels.push_back(label.AsString(""));
}
}
// Build new rule engine
RuleEngine new_engine;
if (const SimpleJson* rules_cfg = new_config.Find("rules")) {
if (!new_engine.Init(*rules_cfg, new_labels)) {
return false;
}
}
// Parse face rules (independent from detection rules)
std::vector<FaceRule> new_face_rules = ParseFaceRulesFromConfig(new_config);
// Packet ring window settings
int pre_sec = 5;
int post_sec = 0;
int fps_hint = 25;
if (const SimpleJson* actions_cfg = new_config.Find("actions")) {
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
pre_sec = clip_cfg->ValueOr<int>("pre_sec", 5);
post_sec = clip_cfg->ValueOr<int>("post_sec", 0);
fps_hint = clip_cfg->ValueOr<int>("fps", 25);
}
}
const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2);
auto new_packet_buffer = std::make_shared<PacketRingBuffer>(window_sec, fps_hint);
// Initialize new actions
std::vector<std::unique_ptr<IAlarmAction>> new_actions;
if (const SimpleJson* actions_cfg = new_config.Find("actions")) {
if (const SimpleJson* log_cfg = actions_cfg->Find("log")) {
if (log_cfg->ValueOr<bool>("enable", true)) {
auto action = std::make_unique<LogAction>();
if (action->Init(*log_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) {
if (snap_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<SnapshotAction>();
if (action->Init(*snap_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
if (clip_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<ClipAction>();
if (action->Init(*clip_cfg)) {
auto* clip_ptr = static_cast<ClipAction*>(action.get());
clip_ptr->SetPacketBuffer(new_packet_buffer);
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* http_cfg = actions_cfg->Find("http")) {
if (http_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<HttpAction>();
if (action->Init(*http_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) {
if (ext_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<ExternalApiAction>();
if (action->Init(*ext_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
}
if (new_actions.empty()) {
auto action = std::make_unique<LogAction>();
SimpleJson empty_cfg;
if (action->Init(empty_cfg)) {
new_actions.push_back(std::move(action));
}
}
// Stop worker thread to avoid executing actions while swapping.
std::shared_ptr<SpscQueue<AlarmJob>> old_queue;
std::thread old_worker;
{
std::lock_guard<std::mutex> lock(mu_);
worker_running_.store(false);
old_queue = alarm_queue_;
old_worker = std::move(worker_);
}
if (old_queue) old_queue->Stop();
if (old_worker.joinable()) old_worker.join();
std::vector<std::unique_ptr<IAlarmAction>> old_actions;
{
std::lock_guard<std::mutex> lock(mu_);
labels_ = std::move(new_labels);
rule_engine_ = std::move(new_engine);
face_rules_ = std::move(new_face_rules);
face_last_trigger_.clear();
packet_buffer_ = std::move(new_packet_buffer);
old_actions = std::move(actions_);
actions_ = std::move(new_actions);
in_flight_.store(0);
last_eval_pts_ms_ = 0;
alarm_queue_ = std::make_shared<SpscQueue<AlarmJob>>(alarm_queue_size_, alarm_queue_strategy_);
worker_running_.store(true);
worker_ = std::thread(&AlarmNode::WorkerLoop, this);
}
for (auto& action : old_actions) action->Drain();
for (auto& action : old_actions) action->Stop();
return true;
}
// Writes an object with "alarm_total" and "processed" (both as doubles) into
// `out`. Takes mu_ for the duration of the call. Always returns true.
bool GetCustomMetrics(SimpleJson& out) const override {
    std::lock_guard<std::mutex> guard(mu_);
    const double alarms_raised = static_cast<double>(alarm_count_);
    const double frames_seen = static_cast<double>(processed_frames_);

    SimpleJson::Object metrics;
    metrics["alarm_total"] = SimpleJson(alarms_raised);
    metrics["processed"] = SimpleJson(frames_seen);
    out = SimpleJson(std::move(metrics));
    return true;
}
// Handles one frame: records its encoded packet (if any) in the packet ring, then
// evaluates detection rules and face rules and queues an alarm for each match.
//
// Returns DROP for a null frame, OK otherwise (including frames skipped by the
// evaluation throttle).
NodeStatus Process(FramePtr frame) override {
    if (!frame) return NodeStatus::DROP;

    // UpdateConfig() reassigns packet_buffer_ under mu_, so take a private copy of
    // the pointer under the same lock instead of reading the member unguarded.
    // The Push() itself stays outside the lock, as before.
    std::shared_ptr<PacketRingBuffer> ring;
    {
        std::lock_guard<std::mutex> lock(mu_);
        ring = packet_buffer_;
    }
    if (ring) {
        if (auto meta = TryGetEncodedMeta(frame)) {
            ring->Push(std::move(meta));
        }
    }

    std::lock_guard<std::mutex> lock(mu_);

    // Evaluation throttle: skip rule evaluation until at least eval_interval_ms_
    // of frame time has passed since the last evaluated frame.
    // NOTE(review): pts is divided by 1000 to obtain milliseconds, so it is
    // presumably in microseconds; confirm against the producer.
    if (eval_interval_ms_ > 0 && frame->pts > 0) {
        const int64_t pts_ms = static_cast<int64_t>(frame->pts / 1000ULL);
        if (last_eval_pts_ms_ > 0 && (pts_ms - last_eval_pts_ms_) < eval_interval_ms_) {
            ++processed_frames_;
            return NodeStatus::OK;
        }
        last_eval_pts_ms_ = pts_ms;
    }

    auto result = rule_engine_.Evaluate(frame);
    if (result.matched) {
        LogInfo("[alarm] RULE MATCHED: " + result.rule_name);
        TriggerAlarm(result, frame);
    }
    EvaluateFaceRulesLocked(frame);
    ++processed_frames_;
    return NodeStatus::OK;
}
private:
// One entry of the "face_rules" config array.
struct FaceRule {
    // Unknown: fires on faces flagged as unknown. Person: fires on recognised faces.
    enum class Kind { Unknown, Person };

    std::string name;
    Kind kind{Kind::Unknown};
    // Used when kind==Person; empty => any known person.
    std::vector<std::string> persons;

    // Rule-wide cooldown; consulted only when per_person_cooldown_ms is 0.
    int cooldown_ms{5000};
    // When > 0, cooldown is tracked per vote key instead of per rule.
    int per_person_cooldown_ms{0};

    // Voting: require min_hits matches within hit_window_ms.
    // Disabled when min_hits <= 1 or hit_window_ms <= 0.
    int min_hits{1};
    int hit_window_ms{0};

    // Geometry filters; a value of 0 disables the corresponding check.
    float min_face_area_ratio{0.0f};
    float min_face_aspect{0.0f};
    float max_face_aspect{0.0f};

    // Faces whose best similarity is below this value are ignored.
    float min_sim{0.0f};
};
static std::vector<FaceRule> ParseFaceRulesFromConfig(const SimpleJson& config) {
std::vector<FaceRule> rules;
const SimpleJson* fr = config.Find("face_rules");
if (!fr || !fr->IsArray()) return rules;
for (const auto& item : fr->AsArray()) {
if (!item.IsObject()) continue;
FaceRule r;
r.name = item.ValueOr<std::string>("name", "face");
std::string type = item.ValueOr<std::string>("type", "unknown");
for (auto& c : type) c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
r.kind = (type == "person") ? FaceRule::Kind::Person : FaceRule::Kind::Unknown;
r.cooldown_ms = std::max(0, item.ValueOr<int>("cooldown_ms", r.cooldown_ms));
r.per_person_cooldown_ms = std::max(0, item.ValueOr<int>("per_person_cooldown_ms", r.per_person_cooldown_ms));
r.min_hits = std::max(1, item.ValueOr<int>("min_hits", r.min_hits));
r.hit_window_ms = std::max(0, item.ValueOr<int>("hit_window_ms", r.hit_window_ms));
r.min_face_area_ratio = item.ValueOr<double>("min_face_area_ratio", static_cast<double>(r.min_face_area_ratio));
r.min_face_aspect = item.ValueOr<double>("min_face_aspect", static_cast<double>(r.min_face_aspect));
r.max_face_aspect = item.ValueOr<double>("max_face_aspect", static_cast<double>(r.max_face_aspect));
r.min_sim = item.ValueOr<double>("min_sim", static_cast<double>(r.min_sim));
if (const SimpleJson* persons = item.Find("persons"); persons && persons->IsArray()) {
for (const auto& p : persons->AsArray()) {
const std::string s = p.AsString("");
if (!s.empty()) r.persons.push_back(s);
}
}
rules.push_back(std::move(r));
}
return rules;
}
// Replaces the active face rules with those found in `config` and discards all
// cooldown and vote bookkeeping accumulated for the previous rule set.
void ParseFaceRules(const SimpleJson& config) {
    face_vote_history_.clear();
    face_person_last_trigger_.clear();
    face_last_trigger_.clear();
    face_rules_ = ParseFaceRulesFromConfig(config);
}
void EvaluateFaceRulesLocked(const FramePtr& frame) {
if (face_rules_.empty()) return;
if (!frame || !frame->face_recog || frame->face_recog->items.empty()) return;
const auto now = std::chrono::steady_clock::now();
const int img_w = frame->face_recog->img_w > 0 ? frame->face_recog->img_w : frame->width;
const int img_h = frame->face_recog->img_h > 0 ? frame->face_recog->img_h : frame->height;
const double img_area = (img_w > 0 && img_h > 0) ? static_cast<double>(img_w) * static_cast<double>(img_h) : 0.0;
for (const auto& rule : face_rules_) {
bool matched = false;
std::string matched_name;
std::vector<Detection> dets;
dets.reserve(3);
for (const auto& it : frame->face_recog->items) {
if (rule.min_face_area_ratio > 0.0f && img_area > 0.0) {
const double a = static_cast<double>(it.bbox.w) * static_cast<double>(it.bbox.h);
const double r = a / img_area;
if (r < static_cast<double>(rule.min_face_area_ratio)) continue;
}
if (rule.min_face_aspect > 0.0f || rule.max_face_aspect > 0.0f) {
const double h = std::max(1e-6, static_cast<double>(it.bbox.h));
const double aspect = static_cast<double>(it.bbox.w) / h;
if (rule.min_face_aspect > 0.0f && aspect < static_cast<double>(rule.min_face_aspect)) continue;
if (rule.max_face_aspect > 0.0f && aspect > static_cast<double>(rule.max_face_aspect)) continue;
}
if (rule.kind == FaceRule::Kind::Unknown) {
if (!it.unknown) continue;
if (it.best_sim < rule.min_sim) continue;
} else {
if (it.unknown) continue;
if (it.best_sim < rule.min_sim) continue;
if (!rule.persons.empty()) {
bool ok = false;
for (const auto& p : rule.persons) {
if (p == it.best_name) {
ok = true;
break;
}
}
if (!ok) continue;
}
}
const std::string key = BuildFaceVoteKey(rule, it);
if (rule.per_person_cooldown_ms > 0) {
auto it_last = face_person_last_trigger_.find(key);
if (it_last != face_person_last_trigger_.end()) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.per_person_cooldown_ms) {
continue;
}
}
}
if (!CheckFaceVote(rule, key, now)) {
continue;
}
matched = true;
if (rule.kind == FaceRule::Kind::Person) matched_name = it.best_name;
Detection d;
d.cls_id = -1;
d.score = it.best_sim;
d.bbox = it.bbox;
d.track_id = it.best_person_id;
dets.push_back(std::move(d));
if (dets.size() >= 3) break;
}
if (!matched) continue;
if (rule.per_person_cooldown_ms > 0) {
for (const auto& d : dets) {
const std::string key = BuildFaceVoteKey(rule, d.track_id, matched_name);
face_person_last_trigger_[key] = now;
}
} else {
const auto it_last = face_last_trigger_.find(rule.name);
if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.cooldown_ms) {
continue;
}
}
face_last_trigger_[rule.name] = now;
}
RuleMatchResult fake;
fake.matched = true;
fake.rule_name = rule.name;
if (rule.kind == FaceRule::Kind::Person && !matched_name.empty()) {
fake.rule_name = rule.name + ":" + matched_name;
}
fake.matched_detections = std::move(dets);
TriggerAlarm(fake, frame);
}
}
// Key under which votes and per-person cooldowns are tracked for `item`:
//   Person rule, id >= 0 : "<rule>#<person id>"
//   Person rule, id <  0 : "<rule>#<best name>"
//   Unknown rule         : "<rule>#unknown"
static std::string BuildFaceVoteKey(const FaceRule& rule, const FaceRecogItem& item) {
    std::string key = rule.name + "#";
    if (rule.kind != FaceRule::Kind::Person) {
        key += "unknown";
        return key;
    }
    if (item.best_person_id >= 0) {
        key += std::to_string(item.best_person_id);
    } else {
        key += item.best_name;
    }
    return key;
}
// Same key scheme as the FaceRecogItem overload, built from an explicit person
// id and name: "<rule>#<id>" when the id is non-negative, "<rule>#<name>"
// otherwise, and "<rule>#unknown" for rules that are not of kind Person.
static std::string BuildFaceVoteKey(const FaceRule& rule, int person_id, const std::string& name) {
    std::string key = rule.name + "#";
    if (rule.kind != FaceRule::Kind::Person) {
        key += "unknown";
    } else if (person_id >= 0) {
        key += std::to_string(person_id);
    } else {
        key += name;
    }
    return key;
}
// Records a hit for `key` at time `now` and reports whether the rule's vote is
// satisfied, i.e. at least min_hits hits lie within the last hit_window_ms.
// Voting is off (always true, nothing recorded) when min_hits <= 1 or
// hit_window_ms <= 0.
bool CheckFaceVote(const FaceRule& rule, const std::string& key, const std::chrono::steady_clock::time_point& now) {
    const bool voting_enabled = rule.min_hits > 1 && rule.hit_window_ms > 0;
    if (!voting_enabled) {
        return true;
    }

    std::deque<std::chrono::steady_clock::time_point>& hits = face_vote_history_[key];
    const std::chrono::milliseconds window(rule.hit_window_ms);

    // Hits are appended in time order, so expired ones are always at the front.
    for (; !hits.empty(); hits.pop_front()) {
        if ((now - hits.front()) <= window) {
            break;
        }
    }
    hits.push_back(now);

    const int hit_count = static_cast<int>(hits.size());
    return hit_count >= rule.min_hits;
}
void WorkerLoop() {
while (worker_running_.load()) {
AlarmJob job;
if (!alarm_queue_) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
continue;
}
if (!alarm_queue_->Pop(job, std::chrono::milliseconds(100))) {
continue;
}
in_flight_.fetch_add(1);
for (auto& action : actions_) {
action->Execute(job.event, job.frame);
}
in_flight_.fetch_sub(1);
}
}
// Counts the alarm, builds an AlarmEvent for `result` stamped with the current
// system_clock time in milliseconds, and hands it with `frame` to the worker
// queue. `frame` is dereferenced without a null check. The alarm is counted
// even when there is no queue to push to.
void TriggerAlarm(const RuleMatchResult& result, FramePtr frame) {
    ++alarm_count_;

    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();

    AlarmJob job;
    job.event.node_id = id_;
    job.event.rule_name = result.rule_name;
    job.event.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    job.event.frame_id = frame->frame_id;
    job.event.detections = result.matched_detections;
    job.frame = std::move(frame);

    if (!alarm_queue_) {
        return;
    }
    alarm_queue_->Push(std::move(job));
}
// Node identifier, set by Init() from the "id" config key.
std::string id_;
// Class labels from the "labels" config array; passed to the rule engine's Init().
std::vector<std::string> labels_;
// Detection-rule evaluator, run on every non-throttled frame in Process().
RuleEngine rule_engine_;
// Face rules parsed from the "face_rules" config array.
std::vector<FaceRule> face_rules_;
// Last trigger time per rule name (rule-wide cooldown).
std::map<std::string, std::chrono::steady_clock::time_point> face_last_trigger_;
// Last trigger time per vote key (per-person cooldown).
std::map<std::string, std::chrono::steady_clock::time_point> face_person_last_trigger_;
// Recent hit timestamps per vote key, maintained by CheckFaceVote().
std::map<std::string, std::deque<std::chrono::steady_clock::time_point>> face_vote_history_;
// Ring of recent encoded packets; fed by Process() and shared with the clip action.
std::shared_ptr<PacketRingBuffer> packet_buffer_;
// Actions run by the worker for every alarm, in this order.
std::vector<std::unique_ptr<IAlarmAction>> actions_;
// Held by Process() during rule evaluation, by UpdateConfig() while swapping
// state, and by GetCustomMetrics() while reading the counters.
mutable std::mutex mu_;
// Jobs produced by TriggerAlarm() and consumed by WorkerLoop().
std::shared_ptr<SpscQueue<AlarmJob>> alarm_queue_;
// Queue capacity and drop strategy from the "event_queue" config; reused when
// UpdateConfig() recreates the queue.
size_t alarm_queue_size_ = 32;
QueueDropStrategy alarm_queue_strategy_ = QueueDropStrategy::DropOldest;
// Run flag polled by WorkerLoop().
std::atomic<bool> worker_running_{false};
std::thread worker_;
// Number of jobs the worker is currently executing; polled by Drain().
std::atomic<size_t> in_flight_{0};
// Taken from NodeContext in Init(); not read anywhere else in this class.
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
// Frames seen by Process(), including those skipped by the evaluation throttle.
uint64_t processed_frames_ = 0;
// Alarms raised through TriggerAlarm().
uint64_t alarm_count_ = 0;
// Minimum frame-time spacing between rule evaluations; 0 disables the throttle.
int64_t eval_interval_ms_ = 0;
// Frame time (ms) of the last evaluated frame; 0 until one has been evaluated.
int64_t last_eval_pts_ms_ = 0;
};
REGISTER_NODE(AlarmNode, "alarm");
} // namespace rk3588