#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "utils/logger.h" #include "face/face_result.h" #include "node.h" #include "rule_engine.h" #include "packet_ring_buffer.h" #include "actions/action_base.h" #include "actions/log_action.h" #include "actions/http_action.h" #include "actions/external_api_action.h" #include "actions/snapshot_action.h" #include "actions/clip_action.h" namespace rk3588 { namespace { struct FaceTrackAggregationConfig { int known_min_hits = 3; int known_hit_window_ms = 3000; int known_reentry_cooldown_ms = 0; int unknown_min_track_age_ms = 2000; int unknown_min_quality_hits = 4; int state_expire_ms = 5000; }; struct FaceTrackState { int track_id = -1; uint64_t first_seen_ms = 0; uint64_t last_seen_ms = 0; int best_known_person_id = -1; std::string best_known_name; int quality_hits = 0; bool reported_known = false; bool reported_unknown = false; uint64_t known_reentry_suppressed_until_ms = 0; std::deque known_hit_times; }; struct FaceTrackDecision { bool trigger_known = false; bool trigger_unknown = false; }; void PruneKnownHitTimes( FaceTrackState& state, const FaceTrackAggregationConfig& cfg, uint64_t now_ms) { const uint64_t known_window_ms = static_cast(std::max(0, cfg.known_hit_window_ms)); if (known_window_ms == 0) return; while (!state.known_hit_times.empty() && now_ms > state.known_hit_times.front() && (now_ms - state.known_hit_times.front()) > known_window_ms) { state.known_hit_times.pop_front(); } } bool IsKnownLeaningTrack( const FaceTrackState& state, const FaceTrackAggregationConfig& cfg) { return !state.reported_known && !state.known_hit_times.empty() && static_cast(state.known_hit_times.size()) < std::max(1, cfg.known_min_hits) && (state.best_known_person_id >= 0 || !state.best_known_name.empty()); } void ResetFaceTrackState(FaceTrackState& state, int track_id, uint64_t now_ms) { state = FaceTrackState{}; state.track_id = track_id; state.first_seen_ms = now_ms; state.last_seen_ms = now_ms; } FaceTrackAggregationConfig ParseFaceTrackAggregationConfig(const SimpleJson& config) { FaceTrackAggregationConfig cfg; const SimpleJson* agg = config.Find("face_track_aggregation"); if (!agg || !agg->IsObject()) return cfg; cfg.known_min_hits = std::max(1, agg->ValueOr("known_min_hits", cfg.known_min_hits)); cfg.known_hit_window_ms = std::max(0, agg->ValueOr("known_hit_window_ms", cfg.known_hit_window_ms)); cfg.known_reentry_cooldown_ms = std::max(0, agg->ValueOr("known_reentry_cooldown_ms", cfg.known_reentry_cooldown_ms)); cfg.unknown_min_track_age_ms = std::max(0, agg->ValueOr("unknown_min_track_age_ms", cfg.unknown_min_track_age_ms)); cfg.unknown_min_quality_hits = std::max(1, agg->ValueOr("unknown_min_quality_hits", cfg.unknown_min_quality_hits)); cfg.state_expire_ms = std::max(0, agg->ValueOr("state_expire_ms", cfg.state_expire_ms)); if (const SimpleJson* known = agg->Find("known"); known && known->IsObject()) { cfg.known_min_hits = std::max(1, known->ValueOr("min_hits", cfg.known_min_hits)); cfg.known_hit_window_ms = std::max(0, known->ValueOr("hit_window_ms", cfg.known_hit_window_ms)); cfg.known_reentry_cooldown_ms = std::max(0, known->ValueOr("reentry_cooldown_ms", cfg.known_reentry_cooldown_ms)); } if (const SimpleJson* unknown = agg->Find("unknown"); unknown && unknown->IsObject()) { cfg.unknown_min_track_age_ms = std::max(0, unknown->ValueOr("min_track_age_ms", cfg.unknown_min_track_age_ms)); cfg.unknown_min_quality_hits = std::max(1, unknown->ValueOr("min_quality_hits", cfg.unknown_min_quality_hits)); } return cfg; } FaceTrackDecision UpdateFaceTrackState( const FaceTrackAggregationConfig& cfg, FaceTrackState& state, std::unordered_map& known_identity_last_alarm_ms, const FaceRecogItem& item, uint64_t now_ms) { FaceTrackDecision decision; if (item.person_track_id < 0) return decision; const uint64_t expire_ms = static_cast(std::max(0, cfg.state_expire_ms)); const bool expired = expire_ms > 0 && state.last_seen_ms > 0 && now_ms > state.last_seen_ms && (now_ms - state.last_seen_ms) > expire_ms; if (state.track_id != item.person_track_id || expired) { ResetFaceTrackState(state, item.person_track_id, now_ms); } else if (state.first_seen_ms == 0) { state.first_seen_ms = now_ms; state.last_seen_ms = now_ms; } else { state.last_seen_ms = now_ms; } PruneKnownHitTimes(state, cfg, now_ms); if (FaceRecogStateIsKnown(item.state)) { const bool same_identity = state.best_known_person_id == item.best_person_id && state.best_known_name == item.best_name; if (!same_identity) { ResetFaceTrackState(state, item.person_track_id, now_ms); state.best_known_person_id = item.best_person_id; state.best_known_name = item.best_name; } state.known_hit_times.push_back(now_ms); state.quality_hits = 0; if (!state.reported_known && static_cast(state.known_hit_times.size()) >= std::max(1, cfg.known_min_hits)) { if (cfg.known_reentry_cooldown_ms > 0 && state.best_known_person_id >= 0) { const auto it_last = known_identity_last_alarm_ms.find(state.best_known_person_id); if (it_last != known_identity_last_alarm_ms.end() && now_ms >= it_last->second && (now_ms - it_last->second) < static_cast(cfg.known_reentry_cooldown_ms)) { state.known_reentry_suppressed_until_ms = std::max(state.known_reentry_suppressed_until_ms, it_last->second + static_cast(cfg.known_reentry_cooldown_ms)); return decision; } } state.reported_known = true; state.reported_unknown = false; if (state.best_known_person_id >= 0) { known_identity_last_alarm_ms[state.best_known_person_id] = now_ms; } decision.trigger_known = true; } return decision; } if (state.reported_known) return decision; if (!state.reported_known && state.best_known_person_id >= 0 && state.known_reentry_suppressed_until_ms > 0) { return decision; } if (state.known_reentry_suppressed_until_ms > now_ms) return decision; if (IsKnownLeaningTrack(state, cfg)) return decision; ++state.quality_hits; if (!state.reported_unknown && now_ms >= state.first_seen_ms && static_cast(now_ms - state.first_seen_ms) >= std::max(0, cfg.unknown_min_track_age_ms) && state.quality_hits >= std::max(1, cfg.unknown_min_quality_hits)) { state.reported_unknown = true; decision.trigger_unknown = true; } return decision; } uint64_t NowEpochMs() { using namespace std::chrono; return static_cast(duration_cast(system_clock::now().time_since_epoch()).count()); } std::string SanitizeEventIdPart(const std::string& value) { std::string out; out.reserve(value.size()); for (unsigned char ch : value) { out.push_back(std::isalnum(ch) ? static_cast(ch) : '_'); } return out.empty() ? "event" : out; } class RateLimitedAction final : public IAlarmAction { public: RateLimitedAction(std::unique_ptr inner, uint64_t min_interval_ms) : inner_(std::move(inner)), min_interval_ms_(min_interval_ms) {} bool Init(const SimpleJson& config) override { // Inner is expected to be initialized already; keep compatibility if called. return inner_ ? inner_->Init(config) : false; } void Execute(AlarmEvent& event, std::shared_ptr frame) override { if (!inner_) return; if (min_interval_ms_ == 0) { inner_->Execute(event, std::move(frame)); return; } const uint64_t now = NowEpochMs(); uint64_t last = last_exec_ms_.load(std::memory_order_relaxed); while (true) { if (last != 0 && now > last && (now - last) < min_interval_ms_) { return; } // Best-effort: only one thread should win the update. if (last_exec_ms_.compare_exchange_weak(last, now, std::memory_order_relaxed)) { break; } // last is updated by compare_exchange_weak; loop and re-check. } inner_->Execute(event, std::move(frame)); } void Drain() override { if (inner_) inner_->Drain(); } void Stop() override { if (inner_) inner_->Stop(); } std::string Name() const override { return inner_ ? inner_->Name() : std::string("unknown"); } private: std::unique_ptr inner_; uint64_t min_interval_ms_ = 0; std::atomic last_exec_ms_{0}; }; std::unique_ptr WrapIfRateLimited(std::unique_ptr action, const SimpleJson& cfg) { if (!action) return nullptr; const uint64_t min_interval_ms = static_cast(std::max( 0, static_cast(cfg.ValueOr("min_interval_ms", 0)))); if (min_interval_ms == 0) return action; return std::make_unique(std::move(action), min_interval_ms); } } // namespace class AlarmNode : public INode { private: struct AlarmJob { AlarmEvent event; std::shared_ptr frame; }; static std::shared_ptr TryGetEncodedMeta(const std::shared_ptr& frame) { if (!frame || !frame->user_meta) return nullptr; auto meta = std::static_pointer_cast(frame->user_meta); if (!meta || meta->magic != EncodedVideoFrameMeta::kMagic) return nullptr; if (!meta->codec || meta->pkt.data.empty() || meta->pkt.pts_ms <= 0) return nullptr; return meta; } public: std::string Id() const override { return id_; } std::string Type() const override { return "alarm"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "alarm"); ParseFaceRules(config); track_agg_cfg_ = ParseFaceTrackAggregationConfig(config); const FaceDebugConfig face_debug = ParseFaceDebugConfig(config); face_debug_log_unknown_candidates_ = face_debug.log_unknown_candidates; face_debug_unknown_candidate_interval_ms_ = face_debug.unknown_candidate_interval_ms; last_unknown_candidate_log_ms_ = 0; face_track_states_.clear(); known_identity_last_alarm_ms_.clear(); // Optional evaluation throttle (for alarm side). 0 = disabled. eval_interval_ms_ = std::max(0, static_cast(config.ValueOr("eval_interval_ms", 0))); if (eval_interval_ms_ <= 0) { const double eval_fps = config.ValueOr("eval_fps", 0.0); if (eval_fps > 0.0) { eval_interval_ms_ = static_cast(1000.0 / eval_fps); if (eval_interval_ms_ < 1) eval_interval_ms_ = 1; } } // Parse labels for class name mapping if (const SimpleJson* labels_cfg = config.Find("labels")) { for (const auto& label : labels_cfg->AsArray()) { labels_.push_back(label.AsString("")); } } // Initialize rule engine if (const SimpleJson* rules_cfg = config.Find("rules")) { if (!rule_engine_.Init(*rules_cfg, labels_)) { LogError("[alarm] failed to init rule engine"); return false; } } // Derive packet ring window from clip action config int pre_sec = 5; int post_sec = 0; int fps_hint = 25; if (const SimpleJson* actions_cfg = config.Find("actions")) { if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { pre_sec = clip_cfg->ValueOr("pre_sec", 5); post_sec = clip_cfg->ValueOr("post_sec", 0); fps_hint = clip_cfg->ValueOr("fps", 25); } } const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2); packet_buffer_ = std::make_shared(window_sec, fps_hint); // Alarm worker queue (avoid blocking Process()) size_t alarm_q_size = 32; QueueDropStrategy alarm_q_strategy = QueueDropStrategy::DropOldest; if (const SimpleJson* q = config.Find("event_queue")) { if (q->IsObject()) { alarm_q_size = static_cast(q->ValueOr("size", static_cast(alarm_q_size))); const std::string strat = q->ValueOr("strategy", "drop_oldest"); if (strat == "drop_newest") alarm_q_strategy = QueueDropStrategy::DropNewest; else if (strat == "block") alarm_q_strategy = QueueDropStrategy::Block; else alarm_q_strategy = QueueDropStrategy::DropOldest; } } alarm_queue_size_ = alarm_q_size; alarm_queue_strategy_ = alarm_q_strategy; alarm_queue_ = std::make_shared>(alarm_queue_size_, alarm_queue_strategy_); // Initialize actions if (const SimpleJson* actions_cfg = config.Find("actions")) { // Log action if (const SimpleJson* log_cfg = actions_cfg->Find("log")) { if (log_cfg->ValueOr("enable", true)) { auto action = std::make_unique(); if (action->Init(*log_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *log_cfg)); } } } // Snapshot action (must be before HTTP to fill snapshot_url) if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) { if (snap_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*snap_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *snap_cfg)); } } } // Clip action (must be before HTTP to fill clip_url) if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { if (clip_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*clip_cfg)) { auto* clip_ptr = static_cast(action.get()); clip_ptr->SetPacketBuffer(packet_buffer_); actions_.push_back(WrapIfRateLimited(std::move(action), *clip_cfg)); } } } // HTTP action (should be after snapshot/clip to include media URLs) if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*http_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *http_cfg)); } } } // External API action (token + message). Should be after snapshot/clip to include media URLs. if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) { if (ext_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*ext_cfg)) { actions_.push_back(WrapIfRateLimited(std::move(action), *ext_cfg)); } } } } else { // Default: just log action auto action = std::make_unique(); SimpleJson empty_cfg; action->Init(empty_cfg); actions_.push_back(std::move(action)); } input_queue_ = ctx.input_queue; if (!input_queue_) { LogError("[alarm] no input queue for node " + id_); return false; } LogInfo("[alarm] initialized with " + std::to_string(actions_.size()) + " actions"); return true; } bool Start() override { worker_running_.store(true); worker_ = std::thread(&AlarmNode::WorkerLoop, this); LogInfo("[alarm] started"); return true; } void Stop() override { worker_running_.store(false); if (alarm_queue_) alarm_queue_->Stop(); if (worker_.joinable()) worker_.join(); // Ensure all actions fully stop (Drain only clears pending work; Stop must reclaim threads/resources). for (auto& action : actions_) action->Drain(); for (auto& action : actions_) action->Stop(); LogInfo("[alarm] stopped, processed " + std::to_string(processed_frames_) + " frames, triggered " + std::to_string(alarm_count_) + " alarms"); } void Drain() override { // First drain alarm jobs (so snapshot/clip are finished before draining HTTP queue). while (alarm_queue_ && (alarm_queue_->Size() > 0 || in_flight_.load() > 0)) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } for (auto& action : actions_) action->Drain(); } bool UpdateConfig(const SimpleJson& new_config) override { const std::string new_id = new_config.ValueOr("id", id_); if (!new_id.empty() && new_id != id_) { return false; } // Parse labels std::vector new_labels; if (const SimpleJson* labels_cfg = new_config.Find("labels")) { for (const auto& label : labels_cfg->AsArray()) { new_labels.push_back(label.AsString("")); } } // Build new rule engine RuleEngine new_engine; if (const SimpleJson* rules_cfg = new_config.Find("rules")) { if (!new_engine.Init(*rules_cfg, new_labels)) { return false; } } // Parse face rules (independent from detection rules) std::vector new_face_rules = ParseFaceRulesFromConfig(new_config); FaceTrackAggregationConfig new_track_agg_cfg = ParseFaceTrackAggregationConfig(new_config); const FaceDebugConfig new_face_debug = ParseFaceDebugConfig(new_config); // Packet ring window settings int pre_sec = 5; int post_sec = 0; int fps_hint = 25; if (const SimpleJson* actions_cfg = new_config.Find("actions")) { if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { pre_sec = clip_cfg->ValueOr("pre_sec", 5); post_sec = clip_cfg->ValueOr("post_sec", 0); fps_hint = clip_cfg->ValueOr("fps", 25); } } const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2); auto new_packet_buffer = std::make_shared(window_sec, fps_hint); // Initialize new actions std::vector> new_actions; if (const SimpleJson* actions_cfg = new_config.Find("actions")) { if (const SimpleJson* log_cfg = actions_cfg->Find("log")) { if (log_cfg->ValueOr("enable", true)) { auto action = std::make_unique(); if (action->Init(*log_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) { if (snap_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*snap_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { if (clip_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*clip_cfg)) { auto* clip_ptr = static_cast(action.get()); clip_ptr->SetPacketBuffer(new_packet_buffer); new_actions.push_back(std::move(action)); } } } if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*http_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) { if (ext_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*ext_cfg)) { new_actions.push_back(std::move(action)); } } } } if (new_actions.empty()) { auto action = std::make_unique(); SimpleJson empty_cfg; if (action->Init(empty_cfg)) { new_actions.push_back(std::move(action)); } } // Stop worker thread to avoid executing actions while swapping. std::shared_ptr> old_queue; std::thread old_worker; { std::lock_guard lock(mu_); worker_running_.store(false); old_queue = alarm_queue_; old_worker = std::move(worker_); } if (old_queue) old_queue->Stop(); if (old_worker.joinable()) old_worker.join(); std::vector> old_actions; { std::lock_guard lock(mu_); labels_ = std::move(new_labels); rule_engine_ = std::move(new_engine); face_rules_ = std::move(new_face_rules); track_agg_cfg_ = new_track_agg_cfg; face_debug_log_unknown_candidates_ = new_face_debug.log_unknown_candidates; face_debug_unknown_candidate_interval_ms_ = new_face_debug.unknown_candidate_interval_ms; last_unknown_candidate_log_ms_ = 0; face_last_trigger_.clear(); face_person_last_trigger_.clear(); face_track_states_.clear(); known_identity_last_alarm_ms_.clear(); packet_buffer_ = std::move(new_packet_buffer); old_actions = std::move(actions_); actions_ = std::move(new_actions); in_flight_.store(0); last_eval_pts_ms_ = 0; alarm_queue_ = std::make_shared>(alarm_queue_size_, alarm_queue_strategy_); worker_running_.store(true); worker_ = std::thread(&AlarmNode::WorkerLoop, this); } for (auto& action : old_actions) action->Drain(); for (auto& action : old_actions) action->Stop(); return true; } bool GetCustomMetrics(SimpleJson& out) const override { std::lock_guard lock(mu_); SimpleJson::Object o; o["alarm_total"] = SimpleJson(static_cast(alarm_count_)); o["processed"] = SimpleJson(static_cast(processed_frames_)); out = SimpleJson(std::move(o)); return true; } NodeStatus Process(FramePtr frame) override { if (!frame) return NodeStatus::DROP; if (packet_buffer_) { if (auto meta = TryGetEncodedMeta(frame)) { packet_buffer_->Push(std::move(meta)); } } { std::lock_guard lock(mu_); // 调试:检查 frame 和 det if (frame->det) { int no_boots_count = 0; for (const auto& d : frame->det->items) { if (d.cls_id == 10) no_boots_count++; } // Log throttled } if (eval_interval_ms_ > 0 && frame->pts > 0) { const int64_t pts_ms = static_cast(frame->pts / 1000ULL); if (last_eval_pts_ms_ > 0 && (pts_ms - last_eval_pts_ms_) < eval_interval_ms_) { ++processed_frames_; return NodeStatus::OK; } last_eval_pts_ms_ = pts_ms; } auto result = rule_engine_.Evaluate(frame); if (result.matched) { LogInfo("[alarm] RULE MATCHED: " + result.rule_name); TriggerAlarm(result, frame); } EvaluateFaceRulesLocked(frame); ++processed_frames_; } return NodeStatus::OK; } private: struct FaceRule { enum class Kind { Unknown, Person }; std::string name; Kind kind = Kind::Unknown; std::vector persons; // used when kind==Person; empty => any known person int cooldown_ms = 5000; int per_person_cooldown_ms = 0; float min_face_area_ratio = 0.0f; float min_face_aspect = 0.0f; float max_face_aspect = 0.0f; float min_sim = 0.0f; float max_known_sim = 0.0f; }; struct FaceRuleEval { bool matched = false; std::string reject_reason; std::string detail; double face_area_ratio = 0.0; double face_aspect = 0.0; }; struct FaceDebugConfig { bool log_unknown_candidates = false; uint64_t unknown_candidate_interval_ms = 0; }; static std::string Fixed3(double value) { std::ostringstream oss; oss << std::fixed << std::setprecision(3) << value; return oss.str(); } static std::string Fixed6(double value) { std::ostringstream oss; oss << std::fixed << std::setprecision(6) << value; return oss.str(); } static FaceDebugConfig ParseFaceDebugConfig(const SimpleJson& config) { FaceDebugConfig out; const SimpleJson* debug = config.Find("face_debug"); if (!debug || !debug->IsObject()) return out; out.log_unknown_candidates = debug->ValueOr("log_unknown_candidates", out.log_unknown_candidates); out.unknown_candidate_interval_ms = static_cast(std::max( 0, static_cast(debug->ValueOr("unknown_candidate_interval_ms", 0)))); return out; } static std::vector ParseFaceRulesFromConfig(const SimpleJson& config) { std::vector rules; const SimpleJson* fr = config.Find("face_rules"); if (!fr || !fr->IsArray()) return rules; for (const auto& item : fr->AsArray()) { if (!item.IsObject()) continue; FaceRule r; r.name = item.ValueOr("name", "face"); std::string type = item.ValueOr("type", "unknown"); for (auto& c : type) c = static_cast(std::tolower(static_cast(c))); r.kind = (type == "person") ? FaceRule::Kind::Person : FaceRule::Kind::Unknown; r.cooldown_ms = std::max(0, item.ValueOr("cooldown_ms", r.cooldown_ms)); r.per_person_cooldown_ms = std::max(0, item.ValueOr("per_person_cooldown_ms", r.per_person_cooldown_ms)); r.min_face_area_ratio = static_cast( item.ValueOr("min_face_area_ratio", static_cast(r.min_face_area_ratio))); r.min_face_aspect = static_cast( item.ValueOr("min_face_aspect", static_cast(r.min_face_aspect))); r.max_face_aspect = static_cast( item.ValueOr("max_face_aspect", static_cast(r.max_face_aspect))); r.min_sim = static_cast(item.ValueOr("min_sim", static_cast(r.min_sim))); r.max_known_sim = static_cast( item.ValueOr("max_known_sim", static_cast(r.max_known_sim))); if (const SimpleJson* persons = item.Find("persons"); persons && persons->IsArray()) { for (const auto& p : persons->AsArray()) { const std::string s = p.AsString(""); if (!s.empty()) r.persons.push_back(s); } } rules.push_back(std::move(r)); } return rules; } void ParseFaceRules(const SimpleJson& config) { face_rules_ = ParseFaceRulesFromConfig(config); face_last_trigger_.clear(); face_person_last_trigger_.clear(); face_track_states_.clear(); known_identity_last_alarm_ms_.clear(); } static bool FaceItemMatchesRule( const FaceRule& rule, const FaceRecogItem& it, double img_area, FaceRuleEval* eval = nullptr) { FaceRuleEval local; FaceRuleEval& out = eval ? *eval : local; out = FaceRuleEval{}; if (rule.min_face_area_ratio > 0.0f && img_area > 0.0) { const double a = static_cast(it.bbox.w) * static_cast(it.bbox.h); const double r = a / img_area; out.face_area_ratio = r; if (r < static_cast(rule.min_face_area_ratio)) { out.reject_reason = "min_face_area_ratio"; out.detail = "area_ratio=" + Fixed6(r) + " min=" + Fixed6(rule.min_face_area_ratio); return false; } } if (rule.min_face_aspect > 0.0f || rule.max_face_aspect > 0.0f) { const double h = std::max(1e-6, static_cast(it.bbox.h)); const double aspect = static_cast(it.bbox.w) / h; out.face_aspect = aspect; if (rule.min_face_aspect > 0.0f && aspect < static_cast(rule.min_face_aspect)) { out.reject_reason = "min_face_aspect"; out.detail = "aspect=" + Fixed3(aspect) + " min=" + Fixed3(rule.min_face_aspect); return false; } if (rule.max_face_aspect > 0.0f && aspect > static_cast(rule.max_face_aspect)) { out.reject_reason = "max_face_aspect"; out.detail = "aspect=" + Fixed3(aspect) + " max=" + Fixed3(rule.max_face_aspect); return false; } } if (rule.kind == FaceRule::Kind::Unknown) { if (FaceRecogStateIsKnown(it.state)) { out.reject_reason = "known_state"; out.detail = "status=known"; return false; } if (rule.max_known_sim > 0.0f && it.best_sim >= rule.max_known_sim) { out.reject_reason = "max_known_sim"; out.detail = "best_sim=" + Fixed3(it.best_sim) + " max=" + Fixed3(rule.max_known_sim); return false; } } else { if (!FaceRecogStateIsKnown(it.state)) { out.reject_reason = "not_known"; out.detail = std::string("status=") + FaceRecogStateName(it.state); return false; } if (it.best_sim < rule.min_sim) { out.reject_reason = "min_sim"; out.detail = "best_sim=" + Fixed3(it.best_sim) + " min=" + Fixed3(rule.min_sim); return false; } if (!rule.persons.empty()) { bool ok = false; for (const auto& p : rule.persons) { if (p == it.best_name) { ok = true; break; } } if (!ok) { out.reject_reason = "person_filter"; out.detail = "best_name=" + it.best_name; return false; } } } if (it.person_track_id < 0) { out.reject_reason = "missing_track_id"; out.detail = "person_track_id=-1"; return false; } out.matched = true; return true; } void EvaluateFaceRulesLocked(const FramePtr& frame) { if (face_rules_.empty()) return; if (!frame || !frame->face_recog || frame->face_recog->items.empty()) return; const auto now = std::chrono::steady_clock::now(); const uint64_t now_epoch_ms = NowEpochMs(); const int img_w = frame->face_recog->img_w > 0 ? frame->face_recog->img_w : frame->width; const int img_h = frame->face_recog->img_h > 0 ? frame->face_recog->img_h : frame->height; const double img_area = (img_w > 0 && img_h > 0) ? static_cast(img_w) * static_cast(img_h) : 0.0; if (track_agg_cfg_.state_expire_ms > 0) { const uint64_t expire_ms = static_cast(track_agg_cfg_.state_expire_ms); for (auto it = face_track_states_.begin(); it != face_track_states_.end();) { const auto& state = it->second; const bool expired = state.last_seen_ms > 0 && now_epoch_ms > state.last_seen_ms && (now_epoch_ms - state.last_seen_ms) > expire_ms; if (expired) { it = face_track_states_.erase(it); } else { ++it; } } } std::vector qualifying_observations(frame->face_recog->items.size(), false); for (const auto& rule : face_rules_) { if (rule.per_person_cooldown_ms <= 0) { const auto it_last = face_last_trigger_.find(rule.name); if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) { const auto elapsed = std::chrono::duration_cast(now - it_last->second).count(); if (elapsed < rule.cooldown_ms) { continue; } } } for (size_t i = 0; i < frame->face_recog->items.size(); ++i) { const auto& it = frame->face_recog->items[i]; if (!FaceItemMatchesRule(rule, it, img_area)) continue; if (rule.per_person_cooldown_ms > 0) { const std::string key = BuildFaceTrackKey(rule, it.person_track_id); auto it_last = face_person_last_trigger_.find(key); if (it_last != face_person_last_trigger_.end()) { const auto elapsed = std::chrono::duration_cast(now - it_last->second).count(); if (elapsed < rule.per_person_cooldown_ms) { continue; } } } qualifying_observations[i] = true; } } std::vector track_decisions(frame->face_recog->items.size()); std::vector have_track_decision(frame->face_recog->items.size(), false); for (size_t i = 0; i < frame->face_recog->items.size(); ++i) { if (!qualifying_observations[i]) continue; const auto& it = frame->face_recog->items[i]; auto& state = face_track_states_[it.person_track_id]; track_decisions[i] = UpdateFaceTrackState( track_agg_cfg_, state, known_identity_last_alarm_ms_, it, now_epoch_ms); have_track_decision[i] = true; } if (face_debug_log_unknown_candidates_) { for (const auto& rule : face_rules_) { if (rule.kind != FaceRule::Kind::Unknown) continue; for (size_t i = 0; i < frame->face_recog->items.size(); ++i) { FaceRuleEval eval; const bool rule_matched = FaceItemMatchesRule( rule, frame->face_recog->items[i], img_area, &eval); LogUnknownCandidateDebug( rule, frame, i, img_area, eval, rule_matched, have_track_decision[i] ? &track_decisions[i] : nullptr, now_epoch_ms); } } } for (const auto& rule : face_rules_) { if (rule.per_person_cooldown_ms <= 0) { const auto it_last = face_last_trigger_.find(rule.name); if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) { const auto elapsed = std::chrono::duration_cast(now - it_last->second).count(); if (elapsed < rule.cooldown_ms) { continue; } } } bool matched = false; std::string matched_name; std::vector dets; std::vector trigger_keys; std::vector face_matches; dets.reserve(3); trigger_keys.reserve(3); face_matches.reserve(3); for (size_t i = 0; i < frame->face_recog->items.size(); ++i) { const auto& it = frame->face_recog->items[i]; if (!FaceItemMatchesRule(rule, it, img_area)) continue; const std::string key = BuildFaceTrackKey(rule, it.person_track_id); if (rule.per_person_cooldown_ms > 0) { auto it_last = face_person_last_trigger_.find(key); if (it_last != face_person_last_trigger_.end()) { const auto elapsed = std::chrono::duration_cast(now - it_last->second).count(); if (elapsed < rule.per_person_cooldown_ms) { continue; } } } if (!have_track_decision[i]) continue; const FaceTrackDecision& decision = track_decisions[i]; if (rule.kind == FaceRule::Kind::Person && !decision.trigger_known) continue; if (rule.kind == FaceRule::Kind::Unknown && !decision.trigger_unknown) continue; matched = true; if (rule.kind == FaceRule::Kind::Person) matched_name = it.best_name; Detection d; d.cls_id = -1; d.score = it.best_sim; d.bbox = it.bbox; d.track_id = it.person_track_id >= 0 ? it.person_track_id : it.best_person_id; dets.push_back(std::move(d)); trigger_keys.push_back(key); FaceAlarmMatch fm; fm.status = (rule.kind == FaceRule::Kind::Unknown) ? FaceAlarmStatus::Unknown : (FaceRecogStateIsKnown(it.state) ? FaceAlarmStatus::Known : FaceAlarmStatus::Uncertain); fm.candidate_person_id = it.candidate_person_id; fm.candidate_name = it.candidate_name; fm.best_sim = it.best_sim; fm.second_sim = it.second_sim; face_matches.push_back(std::move(fm)); if (dets.size() >= 3) break; } if (!matched) continue; if (rule.per_person_cooldown_ms > 0) { for (const auto& key : trigger_keys) { face_person_last_trigger_[key] = now; } } else { const auto it_last = face_last_trigger_.find(rule.name); if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) { const auto elapsed = std::chrono::duration_cast(now - it_last->second).count(); if (elapsed < rule.cooldown_ms) { continue; } } face_last_trigger_[rule.name] = now; } RuleMatchResult fake; fake.matched = true; fake.rule_name = rule.name; if (rule.kind == FaceRule::Kind::Person && !matched_name.empty()) { fake.rule_name = rule.name + ":" + matched_name; } fake.matched_detections = std::move(dets); TriggerAlarm(fake, frame, std::move(face_matches)); } } void LogUnknownCandidateDebug( const FaceRule& rule, const FramePtr& frame, size_t item_index, double img_area, const FaceRuleEval& eval, bool rule_matched, const FaceTrackDecision* decision, uint64_t now_epoch_ms) { if (face_debug_unknown_candidate_interval_ms_ > 0 && last_unknown_candidate_log_ms_ > 0 && now_epoch_ms > last_unknown_candidate_log_ms_ && (now_epoch_ms - last_unknown_candidate_log_ms_) < face_debug_unknown_candidate_interval_ms_) { return; } last_unknown_candidate_log_ms_ = now_epoch_ms; const auto& it = frame->face_recog->items[item_index]; const auto state_it = face_track_states_.find(it.person_track_id); const FaceTrackState* state = (state_it != face_track_states_.end()) ? &state_it->second : nullptr; const uint64_t track_age_ms = (state && state->first_seen_ms > 0 && now_epoch_ms >= state->first_seen_ms) ? (now_epoch_ms - state->first_seen_ms) : 0; const int quality_hits = state ? state->quality_hits : 0; const bool known_leaning = state && (state->reported_known || state->known_hit_times.size() >= static_cast(track_agg_cfg_.known_min_hits)); std::string gate = "rule_rejected"; if (rule_matched && !decision) { gate = "no_track_decision"; } else if (decision && decision->trigger_unknown) { gate = "trigger_unknown"; } else if (state && state->reported_unknown) { gate = "already_reported"; } else if (known_leaning) { gate = "known_leaning"; } else if (rule_matched && track_age_ms < static_cast(track_agg_cfg_.unknown_min_track_age_ms)) { gate = "waiting_track_age"; } else if (rule_matched && quality_hits < track_agg_cfg_.unknown_min_quality_hits) { gate = "waiting_quality_hits"; } const double area_ratio = eval.face_area_ratio > 0.0 ? eval.face_area_ratio : ((img_area > 0.0) ? (static_cast(it.bbox.w) * static_cast(it.bbox.h) / img_area) : 0.0); const double aspect = eval.face_aspect > 0.0 ? eval.face_aspect : (static_cast(it.bbox.w) / std::max(1e-6, static_cast(it.bbox.h))); std::ostringstream oss; oss << "[alarm] unknown_candidate" << " rule=" << rule.name << " frame=" << frame->frame_id << " item=" << item_index << " status=" << FaceRecogStateName(it.state) << " track_id=" << it.person_track_id << " candidate_id=" << it.candidate_person_id << " candidate=" << (it.candidate_name.empty() ? "-" : it.candidate_name) << " best_sim=" << Fixed3(it.best_sim) << " second_sim=" << Fixed3(it.second_sim) << " margin=" << Fixed3(static_cast(it.best_sim) - static_cast(it.second_sim)) << " bbox=(" << Fixed3(it.bbox.x) << "," << Fixed3(it.bbox.y) << "," << Fixed3(it.bbox.w) << "," << Fixed3(it.bbox.h) << ")" << " area_ratio=" << Fixed6(area_ratio) << " aspect=" << Fixed3(aspect) << " rule_matched=" << (rule_matched ? "true" : "false") << " reject_reason=" << (eval.reject_reason.empty() ? "-" : eval.reject_reason) << " reject_detail=" << (eval.detail.empty() ? "-" : eval.detail) << " track_age_ms=" << track_age_ms << " quality_hits=" << quality_hits << "/" << track_agg_cfg_.unknown_min_quality_hits << " min_track_age_ms=" << track_agg_cfg_.unknown_min_track_age_ms << " reported_known=" << (state && state->reported_known ? "true" : "false") << " reported_unknown=" << (state && state->reported_unknown ? "true" : "false") << " gate=" << gate; LogInfo(oss.str()); } static std::string BuildFaceTrackKey(const FaceRule& rule, int track_id) { return rule.name + "#track:" + std::to_string(track_id); } void WorkerLoop() { while (worker_running_.load()) { AlarmJob job; if (!alarm_queue_) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); continue; } if (!alarm_queue_->Pop(job, std::chrono::milliseconds(100))) { continue; } in_flight_.fetch_add(1); for (auto& action : actions_) { action->Execute(job.event, job.frame); } in_flight_.fetch_sub(1); } } void TriggerAlarm(const RuleMatchResult& result, FramePtr frame, std::vector face_matches = {}) { ++alarm_count_; AlarmEvent event; event.node_id = id_; event.rule_name = result.rule_name; event.timestamp_ms = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); event.frame_id = frame->frame_id; const uint64_t seq = next_event_seq_.fetch_add(1, std::memory_order_relaxed) + 1; event.event_id = id_ + "_" + SanitizeEventIdPart(result.rule_name) + "_" + std::to_string(event.frame_id) + "_" + std::to_string(seq); event.detections = result.matched_detections; event.face_matches = std::move(face_matches); event.behavior_events = result.matched_behavior_events; LogInfo("[alarm] trigger event_id=" + event.event_id + " rule=" + event.rule_name + " frame=" + std::to_string(event.frame_id)); AlarmJob job; job.event = std::move(event); job.frame = std::move(frame); if (alarm_queue_) { alarm_queue_->Push(std::move(job)); } } std::string id_; std::vector labels_; RuleEngine rule_engine_; std::vector face_rules_; std::map face_last_trigger_; std::map face_person_last_trigger_; FaceTrackAggregationConfig track_agg_cfg_; std::map face_track_states_; std::unordered_map known_identity_last_alarm_ms_; bool face_debug_log_unknown_candidates_ = false; uint64_t face_debug_unknown_candidate_interval_ms_ = 0; uint64_t last_unknown_candidate_log_ms_ = 0; std::shared_ptr packet_buffer_; std::vector> actions_; mutable std::mutex mu_; std::shared_ptr> alarm_queue_; size_t alarm_queue_size_ = 32; QueueDropStrategy alarm_queue_strategy_ = QueueDropStrategy::DropOldest; std::atomic worker_running_{false}; std::thread worker_; std::atomic in_flight_{0}; std::shared_ptr> input_queue_; uint64_t processed_frames_ = 0; uint64_t alarm_count_ = 0; std::atomic next_event_seq_{0}; int64_t eval_interval_ms_ = 0; int64_t last_eval_pts_ms_ = 0; }; REGISTER_NODE(AlarmNode, "alarm"); } // namespace rk3588