#include #include #include #include #include #include #include #include #include #include #include #include #include #include "face/face_result.h" #include "node.h" #include "rule_engine.h" #include "packet_ring_buffer.h" #include "actions/action_base.h" #include "actions/log_action.h" #include "actions/http_action.h" #include "actions/external_api_action.h" #include "actions/snapshot_action.h" #include "actions/clip_action.h" namespace rk3588 { namespace { uint64_t NowEpochMs() { using namespace std::chrono; return static_cast(duration_cast(system_clock::now().time_since_epoch()).count()); } class RateLimitedAction final : public IAlarmAction { public: RateLimitedAction(std::unique_ptr inner, uint64_t min_interval_ms) : inner_(std::move(inner)), min_interval_ms_(min_interval_ms) {} bool Init(const SimpleJson& config) override { // Inner is expected to be initialized already; keep compatibility if called. return inner_ ? inner_->Init(config) : false; } void Execute(AlarmEvent& event, std::shared_ptr frame) override { if (!inner_) return; if (min_interval_ms_ == 0) { inner_->Execute(event, std::move(frame)); return; } const uint64_t now = NowEpochMs(); uint64_t last = last_exec_ms_.load(std::memory_order_relaxed); if (last != 0 && now > last && (now - last) < min_interval_ms_) { return; } // Best-effort: only one thread should update; compare-exchange avoids lost updates. last_exec_ms_.store(now, std::memory_order_relaxed); inner_->Execute(event, std::move(frame)); } void Drain() override { if (inner_) inner_->Drain(); } void Stop() override { if (inner_) inner_->Stop(); } std::string Name() const override { return inner_ ? inner_->Name() : std::string("unknown"); } private: std::unique_ptr inner_; uint64_t min_interval_ms_ = 0; std::atomic last_exec_ms_{0}; }; std::unique_ptr WrapIfRateLimited(std::unique_ptr action, const SimpleJson& cfg) { if (!action) return nullptr; const uint64_t min_interval_ms = static_cast(std::max( 0, static_cast(cfg.ValueOr("min_interval_ms", 0)))); if (min_interval_ms == 0) return action; return std::make_unique(std::move(action), min_interval_ms); } } // namespace class AlarmNode : public INode { private: struct AlarmJob { AlarmEvent event; std::shared_ptr frame; }; static std::shared_ptr TryGetEncodedMeta(const std::shared_ptr& frame) { if (!frame || !frame->user_meta) return nullptr; auto meta = std::static_pointer_cast(frame->user_meta); if (!meta || meta->magic != EncodedVideoFrameMeta::kMagic) return nullptr; if (!meta->codec || meta->pkt.data.empty() || meta->pkt.pts_ms <= 0) return nullptr; return meta; } public: std::string Id() const override { return id_; } std::string Type() const override { return "alarm"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "alarm"); ParseFaceRules(config); // Optional evaluation throttle (for alarm side). 0 = disabled. eval_interval_ms_ = std::max(0, static_cast(config.ValueOr("eval_interval_ms", 0))); if (eval_interval_ms_ <= 0) { const double eval_fps = config.ValueOr("eval_fps", 0.0); if (eval_fps > 0.0) { eval_interval_ms_ = static_cast(1000.0 / eval_fps); if (eval_interval_ms_ < 1) eval_interval_ms_ = 1; } } // Parse labels for class name mapping if (const SimpleJson* labels_cfg = config.Find("labels")) { for (const auto& label : labels_cfg->AsArray()) { labels_.push_back(label.AsString("")); } } // Initialize rule engine if (const SimpleJson* rules_cfg = config.Find("rules")) { if (!rule_engine_.Init(*rules_cfg, labels_)) { std::cerr << "[alarm] failed to init rule engine\n"; return false; } } // Derive packet ring window from clip action config int pre_sec = 5; int post_sec = 0; int fps_hint = 25; if (const SimpleJson* actions_cfg = config.Find("actions")) { if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { pre_sec = clip_cfg->ValueOr("pre_sec", 5); post_sec = clip_cfg->ValueOr("post_sec", 0); fps_hint = clip_cfg->ValueOr("fps", 25); } } const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2); packet_buffer_ = std::make_shared(window_sec, fps_hint); // Alarm worker queue (avoid blocking Process()) size_t alarm_q_size = 32; QueueDropStrategy alarm_q_strategy = QueueDropStrategy::DropOldest; if (const SimpleJson* q = config.Find("event_queue")) { if (q->IsObject()) { alarm_q_size = static_cast(q->ValueOr("size", static_cast(alarm_q_size))); const std::string strat = q->ValueOr("strategy", "drop_oldest"); if (strat == "drop_newest") alarm_q_strategy = QueueDropStrategy::DropNewest; else if (strat == "block") alarm_q_strategy = QueueDropStrategy::Block; else alarm_q_strategy = QueueDropStrategy::DropOldest; } } alarm_queue_size_ = alarm_q_size; alarm_queue_strategy_ = alarm_q_strategy; alarm_queue_ = std::make_shared>(alarm_queue_size_, alarm_queue_strategy_); // Initialize actions if (const SimpleJson* actions_cfg = config.Find("actions")) { // Log action if (const SimpleJson* log_cfg = actions_cfg->Find("log")) { if (log_cfg->ValueOr("enable", true)) { auto action = std::make_unique(); if (action->Init(*log_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *log_cfg)); } } } // Snapshot action (must be before HTTP to fill snapshot_url) if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) { if (snap_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*snap_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *snap_cfg)); } } } // Clip action (must be before HTTP to fill clip_url) if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { if (clip_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*clip_cfg)) { auto* clip_ptr = static_cast(action.get()); clip_ptr->SetPacketBuffer(packet_buffer_); actions_.push_back(WrapIfRateLimited(std::move(action), *clip_cfg)); } } } // HTTP action (should be after snapshot/clip to include media URLs) if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*http_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *http_cfg)); } } } // External API action (token + message). Should be after snapshot/clip to include media URLs. if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) { if (ext_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*ext_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *ext_cfg)); } } } } else { // Default: just log action auto action = std::make_unique(); SimpleJson empty_cfg; action->Init(empty_cfg); actions_.push_back(std::move(action)); } input_queue_ = ctx.input_queue; if (!input_queue_) { std::cerr << "[alarm] no input queue for node " << id_ << "\n"; return false; } std::cout << "[alarm] initialized with " << actions_.size() << " actions\n"; return true; } bool Start() override { worker_running_.store(true); worker_ = std::thread(&AlarmNode::WorkerLoop, this); std::cout << "[alarm] started\n"; return true; } void Stop() override { worker_running_.store(false); if (alarm_queue_) alarm_queue_->Stop(); if (worker_.joinable()) worker_.join(); // Ensure all actions fully stop (Drain only clears pending work; Stop must reclaim threads/resources). for (auto& action : actions_) action->Drain(); for (auto& action : actions_) action->Stop(); std::cout << "[alarm] stopped, processed " << processed_frames_ << " frames, triggered " << alarm_count_ << " alarms\n"; } void Drain() override { // First drain alarm jobs (so snapshot/clip are finished before draining HTTP queue). while (alarm_queue_ && (alarm_queue_->Size() > 0 || in_flight_.load() > 0)) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } for (auto& action : actions_) action->Drain(); } bool UpdateConfig(const SimpleJson& new_config) override { const std::string new_id = new_config.ValueOr("id", id_); if (!new_id.empty() && new_id != id_) { return false; } // Parse labels std::vector new_labels; if (const SimpleJson* labels_cfg = new_config.Find("labels")) { for (const auto& label : labels_cfg->AsArray()) { new_labels.push_back(label.AsString("")); } } // Build new rule engine RuleEngine new_engine; if (const SimpleJson* rules_cfg = new_config.Find("rules")) { if (!new_engine.Init(*rules_cfg, new_labels)) { return false; } } // Parse face rules (independent from detection rules) std::vector new_face_rules = ParseFaceRulesFromConfig(new_config); // Packet ring window settings int pre_sec = 5; int post_sec = 0; int fps_hint = 25; if (const SimpleJson* actions_cfg = new_config.Find("actions")) { if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { pre_sec = clip_cfg->ValueOr("pre_sec", 5); post_sec = clip_cfg->ValueOr("post_sec", 0); fps_hint = clip_cfg->ValueOr("fps", 25); } } const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2); auto new_packet_buffer = std::make_shared(window_sec, fps_hint); // Initialize new actions std::vector> new_actions; if (const SimpleJson* actions_cfg = new_config.Find("actions")) { if (const SimpleJson* log_cfg = actions_cfg->Find("log")) { if (log_cfg->ValueOr("enable", true)) { auto action = std::make_unique(); if (action->Init(*log_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) { if (snap_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*snap_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { if (clip_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*clip_cfg)) { auto* clip_ptr = static_cast(action.get()); clip_ptr->SetPacketBuffer(new_packet_buffer); new_actions.push_back(std::move(action)); } } } if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*http_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) { if (ext_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*ext_cfg)) { new_actions.push_back(std::move(action)); } } } } if (new_actions.empty()) { auto action = std::make_unique(); SimpleJson empty_cfg; if (action->Init(empty_cfg)) { new_actions.push_back(std::move(action)); } } // Stop worker thread to avoid executing actions while swapping. std::shared_ptr> old_queue; std::thread old_worker; { std::lock_guard lock(mu_); worker_running_.store(false); old_queue = alarm_queue_; old_worker = std::move(worker_); } if (old_queue) old_queue->Stop(); if (old_worker.joinable()) old_worker.join(); std::vector> old_actions; { std::lock_guard lock(mu_); labels_ = std::move(new_labels); rule_engine_ = std::move(new_engine); face_rules_ = std::move(new_face_rules); face_last_trigger_.clear(); packet_buffer_ = std::move(new_packet_buffer); old_actions = std::move(actions_); actions_ = std::move(new_actions); in_flight_.store(0); last_eval_pts_ms_ = 0; alarm_queue_ = std::make_shared>(alarm_queue_size_, alarm_queue_strategy_); worker_running_.store(true); worker_ = std::thread(&AlarmNode::WorkerLoop, this); } for (auto& action : old_actions) action->Drain(); for (auto& action : old_actions) action->Stop(); return true; } bool GetCustomMetrics(SimpleJson& out) const override { std::lock_guard lock(mu_); SimpleJson::Object o; o["alarm_total"] = SimpleJson(static_cast(alarm_count_)); o["processed"] = SimpleJson(static_cast(processed_frames_)); out = SimpleJson(std::move(o)); return true; } NodeStatus Process(FramePtr frame) override { if (!frame) return NodeStatus::DROP; if (packet_buffer_) { if (auto meta = TryGetEncodedMeta(frame)) { packet_buffer_->Push(std::move(meta)); } } { std::lock_guard lock(mu_); if (eval_interval_ms_ > 0 && frame->pts > 0) { const int64_t pts_ms = static_cast(frame->pts / 1000ULL); if (last_eval_pts_ms_ > 0 && (pts_ms - last_eval_pts_ms_) < eval_interval_ms_) { ++processed_frames_; return NodeStatus::OK; } last_eval_pts_ms_ = pts_ms; } auto result = rule_engine_.Evaluate(frame); if (result.matched) TriggerAlarm(result, frame); EvaluateFaceRulesLocked(frame); ++processed_frames_; } return NodeStatus::OK; } private: struct FaceRule { enum class Kind { Unknown, Person }; std::string name; Kind kind = Kind::Unknown; std::vector persons; // used when kind==Person; empty => any known person int cooldown_ms = 5000; float min_sim = 0.0f; }; static std::vector ParseFaceRulesFromConfig(const SimpleJson& config) { std::vector rules; const SimpleJson* fr = config.Find("face_rules"); if (!fr || !fr->IsArray()) return rules; for (const auto& item : fr->AsArray()) { if (!item.IsObject()) continue; FaceRule r; r.name = item.ValueOr("name", "face"); std::string type = item.ValueOr("type", "unknown"); for (auto& c : type) c = static_cast(std::tolower(static_cast(c))); r.kind = (type == "person") ? FaceRule::Kind::Person : FaceRule::Kind::Unknown; r.cooldown_ms = std::max(0, item.ValueOr("cooldown_ms", r.cooldown_ms)); r.min_sim = item.ValueOr("min_sim", static_cast(r.min_sim)); if (const SimpleJson* persons = item.Find("persons"); persons && persons->IsArray()) { for (const auto& p : persons->AsArray()) { const std::string s = p.AsString(""); if (!s.empty()) r.persons.push_back(s); } } rules.push_back(std::move(r)); } return rules; } void ParseFaceRules(const SimpleJson& config) { face_rules_ = ParseFaceRulesFromConfig(config); face_last_trigger_.clear(); } void EvaluateFaceRulesLocked(const FramePtr& frame) { if (face_rules_.empty()) return; if (!frame || !frame->face_recog || frame->face_recog->items.empty()) return; const auto now = std::chrono::steady_clock::now(); for (const auto& rule : face_rules_) { bool matched = false; std::string matched_name; std::vector dets; dets.reserve(3); for (const auto& it : frame->face_recog->items) { if (rule.kind == FaceRule::Kind::Unknown) { if (!it.unknown) continue; if (it.best_sim < rule.min_sim) continue; matched = true; } else { if (it.unknown) continue; if (it.best_sim < rule.min_sim) continue; if (!rule.persons.empty()) { bool ok = false; for (const auto& p : rule.persons) { if (p == it.best_name) { ok = true; break; } } if (!ok) continue; } matched = true; matched_name = it.best_name; } Detection d; d.cls_id = -1; d.score = it.best_sim; d.bbox = it.bbox; d.track_id = it.best_person_id; dets.push_back(std::move(d)); if (dets.size() >= 3) break; } if (!matched) continue; const auto it_last = face_last_trigger_.find(rule.name); if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) { const auto elapsed = std::chrono::duration_cast(now - it_last->second).count(); if (elapsed < rule.cooldown_ms) { continue; } } face_last_trigger_[rule.name] = now; RuleMatchResult fake; fake.matched = true; fake.rule_name = rule.name; if (rule.kind == FaceRule::Kind::Person && !matched_name.empty()) { fake.rule_name = rule.name + ":" + matched_name; } fake.matched_detections = std::move(dets); TriggerAlarm(fake, frame); } } void WorkerLoop() { while (worker_running_.load()) { AlarmJob job; if (!alarm_queue_) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); continue; } if (!alarm_queue_->Pop(job, std::chrono::milliseconds(100))) { continue; } in_flight_.fetch_add(1); for (auto& action : actions_) { action->Execute(job.event, job.frame); } in_flight_.fetch_sub(1); } } void TriggerAlarm(const RuleMatchResult& result, FramePtr frame) { ++alarm_count_; AlarmEvent event; event.node_id = id_; event.rule_name = result.rule_name; event.timestamp_ms = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); event.frame_id = frame->frame_id; event.detections = result.matched_detections; AlarmJob job; job.event = std::move(event); job.frame = std::move(frame); if (alarm_queue_) { alarm_queue_->Push(std::move(job)); } } std::string id_; std::vector labels_; RuleEngine rule_engine_; std::vector face_rules_; std::map face_last_trigger_; std::shared_ptr packet_buffer_; std::vector> actions_; mutable std::mutex mu_; std::shared_ptr> alarm_queue_; size_t alarm_queue_size_ = 32; QueueDropStrategy alarm_queue_strategy_ = QueueDropStrategy::DropOldest; std::atomic worker_running_{false}; std::thread worker_; std::atomic in_flight_{0}; std::shared_ptr> input_queue_; uint64_t processed_frames_ = 0; uint64_t alarm_count_ = 0; int64_t eval_interval_ms_ = 0; int64_t last_eval_pts_ms_ = 0; }; REGISTER_NODE(AlarmNode, "alarm"); } // namespace rk3588