// OrangePi3588Media/plugins/alarm/alarm_node.cpp
// 2026-04-18 20:58:31 +08:00
//
// 1162 lines
// 48 KiB
// C++

#include <atomic>
#include <chrono>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <algorithm>
#include <deque>
#include <iomanip>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "utils/logger.h"
#include "face/face_result.h"
#include "node.h"
#include "rule_engine.h"
#include "packet_ring_buffer.h"
#include "actions/action_base.h"
#include "actions/log_action.h"
#include "actions/http_action.h"
#include "actions/external_api_action.h"
#include "actions/snapshot_action.h"
#include "actions/clip_action.h"
namespace rk3588 {
namespace {
struct FaceTrackAggregationConfig {
    // Known faces: how many recognitions of the same identity are required inside
    // known_hit_window_ms before a "known" decision fires (a window of 0 disables pruning).
    int known_min_hits{3};
    int known_hit_window_ms{3000};
    // Minimum gap between two "known" decisions for the same person id (0 = no cooldown).
    int known_reentry_cooldown_ms{0};
    // Unknown faces: minimum track age and number of qualifying non-known observations
    // before an "unknown" decision fires.
    int unknown_min_track_age_ms{2000};
    int unknown_min_quality_hits{4};
    // Per-track state is discarded after this long without an observation (0 = never).
    int state_expire_ms{5000};
};
struct FaceTrackState {
    // Track this state belongs to; -1 means "not bound to any track yet".
    int track_id{-1};
    // Wall-clock epoch milliseconds of the first / most recent observation.
    uint64_t first_seen_ms{0};
    uint64_t last_seen_ms{0};
    // Identity the known-hit counter is currently accumulating for.
    int best_known_person_id{-1};
    std::string best_known_name{};
    // Qualifying non-known observations counted towards an "unknown" decision.
    int quality_hits{0};
    // One-shot latches so each track reports at most once per kind.
    bool reported_known{false};
    bool reported_unknown{false};
    // Set when a known decision was held back by the per-identity re-entry cooldown.
    uint64_t known_reentry_suppressed_until_ms{0};
    // Timestamps of recent known recognitions for the current identity.
    std::deque<uint64_t> known_hit_times{};
};
struct FaceTrackDecision {
    // Set when this observation completes the aggregation for a "known" alarm.
    bool trigger_known{false};
    // Set when this observation completes the aggregation for an "unknown" alarm.
    bool trigger_unknown{false};
};
void PruneKnownHitTimes(
    FaceTrackState& state,
    const FaceTrackAggregationConfig& cfg,
    uint64_t now_ms) {
    // Drops known-hit timestamps that are older than known_hit_window_ms relative to now_ms.
    // A window of 0 (or negative) means "keep everything".
    const int window = std::max(0, cfg.known_hit_window_ms);
    if (window == 0) return;
    const uint64_t window_ms = static_cast<uint64_t>(window);
    auto& hits = state.known_hit_times;
    while (!hits.empty()) {
        const uint64_t oldest = hits.front();
        // Stop at the first hit still inside the window; hits stamped at or after now_ms
        // (clock stepped backwards) are kept as well.
        if (now_ms <= oldest || (now_ms - oldest) <= window_ms) break;
        hits.pop_front();
    }
}
bool IsKnownLeaningTrack(
    const FaceTrackState& state,
    const FaceTrackAggregationConfig& cfg) {
    // A track "leans known" when it has started accumulating known hits for some identity
    // but has not yet reached known_min_hits (and has not already reported as known).
    if (state.reported_known || state.known_hit_times.empty()) return false;
    const int required_hits = std::max(1, cfg.known_min_hits);
    if (static_cast<int>(state.known_hit_times.size()) >= required_hits) return false;
    const bool has_identity = state.best_known_person_id >= 0 || !state.best_known_name.empty();
    return has_identity;
}
void ResetFaceTrackState(FaceTrackState& state, int track_id, uint64_t now_ms) {
    // Replace `state` with a pristine record bound to `track_id`, first/last seen at now_ms.
    FaceTrackState fresh;
    fresh.track_id = track_id;
    fresh.first_seen_ms = now_ms;
    fresh.last_seen_ms = now_ms;
    state = std::move(fresh);
}
FaceTrackAggregationConfig ParseFaceTrackAggregationConfig(const SimpleJson& config) {
    // Reads the optional "face_track_aggregation" object. Flat keys are applied first; the
    // nested "known" / "unknown" objects then override them. Every value is clamped to a
    // sane floor (counts >= 1, durations >= 0).
    FaceTrackAggregationConfig cfg;
    const SimpleJson* section = config.Find("face_track_aggregation");
    if (section == nullptr || !section->IsObject()) {
        return cfg;
    }
    // Reads `key` from `obj`, falling back to `current`, and clamps the result to >= floor.
    const auto read_clamped = [](const SimpleJson& obj, const char* key, int current, int floor) {
        return std::max(floor, obj.ValueOr<int>(key, current));
    };

    cfg.known_min_hits = read_clamped(*section, "known_min_hits", cfg.known_min_hits, 1);
    cfg.known_hit_window_ms = read_clamped(*section, "known_hit_window_ms", cfg.known_hit_window_ms, 0);
    cfg.known_reentry_cooldown_ms =
        read_clamped(*section, "known_reentry_cooldown_ms", cfg.known_reentry_cooldown_ms, 0);
    cfg.unknown_min_track_age_ms =
        read_clamped(*section, "unknown_min_track_age_ms", cfg.unknown_min_track_age_ms, 0);
    cfg.unknown_min_quality_hits =
        read_clamped(*section, "unknown_min_quality_hits", cfg.unknown_min_quality_hits, 1);
    cfg.state_expire_ms = read_clamped(*section, "state_expire_ms", cfg.state_expire_ms, 0);

    const SimpleJson* known = section->Find("known");
    if (known != nullptr && known->IsObject()) {
        cfg.known_min_hits = read_clamped(*known, "min_hits", cfg.known_min_hits, 1);
        cfg.known_hit_window_ms = read_clamped(*known, "hit_window_ms", cfg.known_hit_window_ms, 0);
        cfg.known_reentry_cooldown_ms =
            read_clamped(*known, "reentry_cooldown_ms", cfg.known_reentry_cooldown_ms, 0);
    }
    const SimpleJson* unknown = section->Find("unknown");
    if (unknown != nullptr && unknown->IsObject()) {
        cfg.unknown_min_track_age_ms =
            read_clamped(*unknown, "min_track_age_ms", cfg.unknown_min_track_age_ms, 0);
        cfg.unknown_min_quality_hits =
            read_clamped(*unknown, "min_quality_hits", cfg.unknown_min_quality_hits, 1);
    }
    return cfg;
}
FaceTrackDecision UpdateFaceTrackState(
    const FaceTrackAggregationConfig& cfg,
    FaceTrackState& state,
    std::unordered_map<int, uint64_t>& known_identity_last_alarm_ms,
    const FaceRecogItem& item,
    uint64_t now_ms) {
    // Feeds one qualifying face observation into the per-track aggregation state and reports
    // whether it completes a "known" or "unknown" alarm decision.
    //   cfg                          aggregation thresholds
    //   state                        per-track state (rebound/reset when the track changes)
    //   known_identity_last_alarm_ms last known-alarm time per person id (re-entry cooldown)
    //   item                         the observation; ignored when person_track_id < 0
    //   now_ms                       wall-clock epoch milliseconds
    FaceTrackDecision decision;
    if (item.person_track_id < 0) return decision;

    // --- Track bookkeeping: fresh state for a new track id or after a gap longer than
    // state_expire_ms; otherwise just refresh last_seen_ms.
    const uint64_t expire_ms = static_cast<uint64_t>(std::max(0, cfg.state_expire_ms));
    const bool expired =
        expire_ms > 0 &&
        state.last_seen_ms > 0 &&
        now_ms > state.last_seen_ms &&
        (now_ms - state.last_seen_ms) > expire_ms;
    if (state.track_id != item.person_track_id || expired) {
        ResetFaceTrackState(state, item.person_track_id, now_ms);
    } else if (state.first_seen_ms == 0) {
        state.first_seen_ms = now_ms;
        state.last_seen_ms = now_ms;
    } else {
        state.last_seen_ms = now_ms;
    }
    PruneKnownHitTimes(state, cfg, now_ms);

    const size_t known_min_hits = static_cast<size_t>(std::max(1, cfg.known_min_hits));

    // --- Known observation: accumulate hits for one identity.
    if (FaceRecogStateIsKnown(item.state)) {
        const bool same_identity =
            state.best_known_person_id == item.best_person_id &&
            state.best_known_name == item.best_name;
        if (!same_identity) {
            // A different identity restarts the aggregation for this track.
            ResetFaceTrackState(state, item.person_track_id, now_ms);
            state.best_known_person_id = item.best_person_id;
            state.best_known_name = item.best_name;
        }
        state.known_hit_times.push_back(now_ms);
        // Every consumer only compares the hit count against known_min_hits, so keeping the
        // newest known_min_hits timestamps is equivalent and keeps the deque bounded even when
        // known_hit_window_ms == 0 disables time-based pruning.
        while (state.known_hit_times.size() > known_min_hits) {
            state.known_hit_times.pop_front();
        }
        state.quality_hits = 0;
        if (!state.reported_known && state.known_hit_times.size() >= known_min_hits) {
            if (cfg.known_reentry_cooldown_ms > 0 && state.best_known_person_id >= 0) {
                const uint64_t cooldown_ms = static_cast<uint64_t>(cfg.known_reentry_cooldown_ms);
                const auto it_last = known_identity_last_alarm_ms.find(state.best_known_person_id);
                if (it_last != known_identity_last_alarm_ms.end() &&
                    now_ms >= it_last->second &&
                    (now_ms - it_last->second) < cooldown_ms) {
                    // Same person alarmed recently: hold back, and remember the suppression so
                    // this track does not fall through to an "unknown" alarm either.
                    state.known_reentry_suppressed_until_ms =
                        std::max(state.known_reentry_suppressed_until_ms, it_last->second + cooldown_ms);
                    return decision;
                }
            }
            state.reported_known = true;
            state.reported_unknown = false;
            if (state.best_known_person_id >= 0) {
                known_identity_last_alarm_ms[state.best_known_person_id] = now_ms;
            }
            decision.trigger_known = true;
        }
        return decision;
    }

    // --- Non-known observation: only counts towards "unknown" for tracks that never looked
    // like (or were suppressed as) a known person.
    if (state.reported_known) return decision;
    if (state.best_known_person_id >= 0 && state.known_reentry_suppressed_until_ms > 0) {
        return decision;
    }
    if (state.known_reentry_suppressed_until_ms > now_ms) return decision;
    if (IsKnownLeaningTrack(state, cfg)) return decision;
    ++state.quality_hits;
    // Compare track age in uint64: narrowing the difference to int overflowed for tracks
    // older than ~24 days and could block the decision.
    const uint64_t min_track_age_ms = static_cast<uint64_t>(std::max(0, cfg.unknown_min_track_age_ms));
    if (!state.reported_unknown &&
        now_ms >= state.first_seen_ms &&
        (now_ms - state.first_seen_ms) >= min_track_age_ms &&
        state.quality_hits >= std::max(1, cfg.unknown_min_quality_hits)) {
        state.reported_unknown = true;
        decision.trigger_unknown = true;
    }
    return decision;
}
uint64_t NowEpochMs() {
    // Wall-clock (system_clock) time since the Unix epoch, truncated to milliseconds.
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch);
    return static_cast<uint64_t>(millis.count());
}
std::string SanitizeEventIdPart(const std::string& value) {
    // Replaces every byte that is not alphanumeric (per std::isalnum in the current locale)
    // with '_'. An empty input yields the placeholder "event".
    if (value.empty()) return "event";
    std::string sanitized(value);
    for (char& c : sanitized) {
        if (!std::isalnum(static_cast<unsigned char>(c))) c = '_';
    }
    return sanitized;
}
// Decorator that forwards to an inner action at most once per `min_interval_ms` of wall-clock
// time (NowEpochMs); events arriving inside the interval are dropped. A min_interval_ms of 0
// disables throttling. Drain/Stop/Name are forwarded unchanged.
class RateLimitedAction final : public IAlarmAction {
public:
    RateLimitedAction(std::unique_ptr<IAlarmAction> inner, uint64_t min_interval_ms)
        : inner_(std::move(inner)), min_interval_ms_(min_interval_ms) {}

    bool Init(const SimpleJson& config) override {
        // Inner is expected to be initialized already; keep compatibility if called.
        return inner_ ? inner_->Init(config) : false;
    }

    void Execute(AlarmEvent& event, std::shared_ptr<Frame> frame) override {
        if (!inner_) return;
        if (min_interval_ms_ == 0) {
            inner_->Execute(event, std::move(frame));
            return;
        }
        const uint64_t now = NowEpochMs();
        uint64_t last = last_exec_ms_.load(std::memory_order_relaxed);
        while (true) {
            // `now >= last` (not `>`): two events within the same millisecond must also be
            // throttled. If the clock stepped backwards (now < last) the event is let through
            // and re-bases the window.
            if (last != 0 && now >= last && (now - last) < min_interval_ms_) {
                return;
            }
            // Only one concurrent caller wins the slot; losers reload `last` and re-check.
            if (last_exec_ms_.compare_exchange_weak(last, now, std::memory_order_relaxed)) {
                break;
            }
        }
        inner_->Execute(event, std::move(frame));
    }

    void Drain() override {
        if (inner_) inner_->Drain();
    }

    void Stop() override {
        if (inner_) inner_->Stop();
    }

    std::string Name() const override {
        return inner_ ? inner_->Name() : std::string("unknown");
    }

private:
    std::unique_ptr<IAlarmAction> inner_;
    uint64_t min_interval_ms_ = 0;
    // Epoch ms of the last forwarded event; 0 means nothing has been forwarded yet.
    std::atomic<uint64_t> last_exec_ms_{0};
};
std::unique_ptr<IAlarmAction> WrapIfRateLimited(std::unique_ptr<IAlarmAction> action, const SimpleJson& cfg) {
if (!action) return nullptr;
const uint64_t min_interval_ms = static_cast<uint64_t>(std::max<int64_t>(
0, static_cast<int64_t>(cfg.ValueOr<int>("min_interval_ms", 0))));
if (min_interval_ms == 0) return action;
return std::make_unique<RateLimitedAction>(std::move(action), min_interval_ms);
}
} // namespace
class AlarmNode : public INode {
private:
struct AlarmJob {
    // Alarm payload handed to every configured action by the worker.
    AlarmEvent event;
    // Frame that triggered the alarm (may be null).
    std::shared_ptr<Frame> frame{};
};
static std::shared_ptr<EncodedVideoFrameMeta> TryGetEncodedMeta(const std::shared_ptr<Frame>& frame) {
    // Returns the frame's user_meta as EncodedVideoFrameMeta when it carries the expected
    // magic tag and a usable packet (codec set, non-empty data, positive pts); else nullptr.
    std::shared_ptr<EncodedVideoFrameMeta> candidate;
    if (frame && frame->user_meta) {
        candidate = std::static_pointer_cast<EncodedVideoFrameMeta>(frame->user_meta);
    }
    const bool tagged = candidate && candidate->magic == EncodedVideoFrameMeta::kMagic;
    if (!tagged) return nullptr;
    const bool unusable =
        !candidate->codec || candidate->pkt.data.empty() || candidate->pkt.pts_ms <= 0;
    if (unusable) return nullptr;
    return candidate;
}
public:
std::string Id() const override {
    // Node id as configured in Init() ("alarm" when not set).
    return id_;
}
std::string Type() const override {
    // Fixed node type name.
    return "alarm";
}
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "alarm");
ParseFaceRules(config);
track_agg_cfg_ = ParseFaceTrackAggregationConfig(config);
const FaceDebugConfig face_debug = ParseFaceDebugConfig(config);
face_debug_log_unknown_candidates_ = face_debug.log_unknown_candidates;
face_debug_unknown_candidate_interval_ms_ = face_debug.unknown_candidate_interval_ms;
last_unknown_candidate_log_ms_ = 0;
face_track_states_.clear();
known_identity_last_alarm_ms_.clear();
// Optional evaluation throttle (for alarm side). 0 = disabled.
eval_interval_ms_ = std::max<int64_t>(0, static_cast<int64_t>(config.ValueOr<int>("eval_interval_ms", 0)));
if (eval_interval_ms_ <= 0) {
const double eval_fps = config.ValueOr<double>("eval_fps", 0.0);
if (eval_fps > 0.0) {
eval_interval_ms_ = static_cast<int64_t>(1000.0 / eval_fps);
if (eval_interval_ms_ < 1) eval_interval_ms_ = 1;
}
}
// Parse labels for class name mapping
if (const SimpleJson* labels_cfg = config.Find("labels")) {
for (const auto& label : labels_cfg->AsArray()) {
labels_.push_back(label.AsString(""));
}
}
// Initialize rule engine
if (const SimpleJson* rules_cfg = config.Find("rules")) {
if (!rule_engine_.Init(*rules_cfg, labels_)) {
LogError("[alarm] failed to init rule engine");
return false;
}
}
// Derive packet ring window from clip action config
int pre_sec = 5;
int post_sec = 0;
int fps_hint = 25;
if (const SimpleJson* actions_cfg = config.Find("actions")) {
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
pre_sec = clip_cfg->ValueOr<int>("pre_sec", 5);
post_sec = clip_cfg->ValueOr<int>("post_sec", 0);
fps_hint = clip_cfg->ValueOr<int>("fps", 25);
}
}
const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2);
packet_buffer_ = std::make_shared<PacketRingBuffer>(window_sec, fps_hint);
// Alarm worker queue (avoid blocking Process())
size_t alarm_q_size = 32;
QueueDropStrategy alarm_q_strategy = QueueDropStrategy::DropOldest;
if (const SimpleJson* q = config.Find("event_queue")) {
if (q->IsObject()) {
alarm_q_size = static_cast<size_t>(q->ValueOr<int>("size", static_cast<int>(alarm_q_size)));
const std::string strat = q->ValueOr<std::string>("strategy", "drop_oldest");
if (strat == "drop_newest") alarm_q_strategy = QueueDropStrategy::DropNewest;
else if (strat == "block") alarm_q_strategy = QueueDropStrategy::Block;
else alarm_q_strategy = QueueDropStrategy::DropOldest;
}
}
alarm_queue_size_ = alarm_q_size;
alarm_queue_strategy_ = alarm_q_strategy;
alarm_queue_ = std::make_shared<SpscQueue<AlarmJob>>(alarm_queue_size_, alarm_queue_strategy_);
// Initialize actions
if (const SimpleJson* actions_cfg = config.Find("actions")) {
// Log action
if (const SimpleJson* log_cfg = actions_cfg->Find("log")) {
if (log_cfg->ValueOr<bool>("enable", true)) {
auto action = std::make_unique<LogAction>();
if (action->Init(*log_cfg)) {
actions_.push_back(WrapIfRateLimited(std::move(action), *log_cfg));
}
}
}
// Snapshot action (must be before HTTP to fill snapshot_url)
if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) {
if (snap_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<SnapshotAction>();
if (action->Init(*snap_cfg)) {
actions_.push_back(WrapIfRateLimited(std::move(action), *snap_cfg));
}
}
}
// Clip action (must be before HTTP to fill clip_url)
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
if (clip_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<ClipAction>();
if (action->Init(*clip_cfg)) {
auto* clip_ptr = static_cast<ClipAction*>(action.get());
clip_ptr->SetPacketBuffer(packet_buffer_);
actions_.push_back(WrapIfRateLimited(std::move(action), *clip_cfg));
}
}
}
// HTTP action (should be after snapshot/clip to include media URLs)
if (const SimpleJson* http_cfg = actions_cfg->Find("http")) {
if (http_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<HttpAction>();
if (action->Init(*http_cfg)) {
actions_.push_back(WrapIfRateLimited(std::move(action), *http_cfg));
}
}
}
// External API action (token + message). Should be after snapshot/clip to include media URLs.
if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) {
if (ext_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<ExternalApiAction>();
if (action->Init(*ext_cfg)) {
actions_.push_back(WrapIfRateLimited(std::move(action), *ext_cfg));
}
}
}
} else {
// Default: just log action
auto action = std::make_unique<LogAction>();
SimpleJson empty_cfg;
action->Init(empty_cfg);
actions_.push_back(std::move(action));
}
input_queue_ = ctx.input_queue;
if (!input_queue_) {
LogError("[alarm] no input queue for node " + id_);
return false;
}
LogInfo("[alarm] initialized with " + std::to_string(actions_.size()) + " actions");
return true;
}
bool Start() override {
worker_running_.store(true);
worker_ = std::thread(&AlarmNode::WorkerLoop, this);
LogInfo("[alarm] started");
return true;
}
void Stop() override {
    // Signal the worker, unblock and close its queue, then wait for it to exit.
    worker_running_.store(false);
    if (alarm_queue_) {
        alarm_queue_->Stop();
    }
    if (worker_.joinable()) {
        worker_.join();
    }
    // Drain() only discards pending work; Stop() is what reclaims each action's threads and
    // resources. Every action gets both, all drains before any stop.
    for (const auto& action : actions_) {
        action->Drain();
    }
    for (const auto& action : actions_) {
        action->Stop();
    }
    const std::string processed = std::to_string(processed_frames_);
    const std::string triggered = std::to_string(alarm_count_);
    LogInfo("[alarm] stopped, processed " + processed + " frames, triggered " + triggered + " alarms");
}
void Drain() override {
// First drain alarm jobs (so snapshot/clip are finished before draining HTTP queue).
while (alarm_queue_ && (alarm_queue_->Size() > 0 || in_flight_.load() > 0)) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
for (auto& action : actions_) action->Drain();
}
bool UpdateConfig(const SimpleJson& new_config) override {
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
if (!new_id.empty() && new_id != id_) {
return false;
}
// Parse labels
std::vector<std::string> new_labels;
if (const SimpleJson* labels_cfg = new_config.Find("labels")) {
for (const auto& label : labels_cfg->AsArray()) {
new_labels.push_back(label.AsString(""));
}
}
// Build new rule engine
RuleEngine new_engine;
if (const SimpleJson* rules_cfg = new_config.Find("rules")) {
if (!new_engine.Init(*rules_cfg, new_labels)) {
return false;
}
}
// Parse face rules (independent from detection rules)
std::vector<FaceRule> new_face_rules = ParseFaceRulesFromConfig(new_config);
FaceTrackAggregationConfig new_track_agg_cfg = ParseFaceTrackAggregationConfig(new_config);
const FaceDebugConfig new_face_debug = ParseFaceDebugConfig(new_config);
// Packet ring window settings
int pre_sec = 5;
int post_sec = 0;
int fps_hint = 25;
if (const SimpleJson* actions_cfg = new_config.Find("actions")) {
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
pre_sec = clip_cfg->ValueOr<int>("pre_sec", 5);
post_sec = clip_cfg->ValueOr<int>("post_sec", 0);
fps_hint = clip_cfg->ValueOr<int>("fps", 25);
}
}
const int window_sec = std::max(1, std::max(0, pre_sec) + std::max(0, post_sec) + 2);
auto new_packet_buffer = std::make_shared<PacketRingBuffer>(window_sec, fps_hint);
// Initialize new actions
std::vector<std::unique_ptr<IAlarmAction>> new_actions;
if (const SimpleJson* actions_cfg = new_config.Find("actions")) {
if (const SimpleJson* log_cfg = actions_cfg->Find("log")) {
if (log_cfg->ValueOr<bool>("enable", true)) {
auto action = std::make_unique<LogAction>();
if (action->Init(*log_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) {
if (snap_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<SnapshotAction>();
if (action->Init(*snap_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
if (clip_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<ClipAction>();
if (action->Init(*clip_cfg)) {
auto* clip_ptr = static_cast<ClipAction*>(action.get());
clip_ptr->SetPacketBuffer(new_packet_buffer);
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* http_cfg = actions_cfg->Find("http")) {
if (http_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<HttpAction>();
if (action->Init(*http_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
if (const SimpleJson* ext_cfg = actions_cfg->Find("external_api")) {
if (ext_cfg->ValueOr<bool>("enable", false)) {
auto action = std::make_unique<ExternalApiAction>();
if (action->Init(*ext_cfg)) {
new_actions.push_back(std::move(action));
}
}
}
}
if (new_actions.empty()) {
auto action = std::make_unique<LogAction>();
SimpleJson empty_cfg;
if (action->Init(empty_cfg)) {
new_actions.push_back(std::move(action));
}
}
// Stop worker thread to avoid executing actions while swapping.
std::shared_ptr<SpscQueue<AlarmJob>> old_queue;
std::thread old_worker;
{
std::lock_guard<std::mutex> lock(mu_);
worker_running_.store(false);
old_queue = alarm_queue_;
old_worker = std::move(worker_);
}
if (old_queue) old_queue->Stop();
if (old_worker.joinable()) old_worker.join();
std::vector<std::unique_ptr<IAlarmAction>> old_actions;
{
std::lock_guard<std::mutex> lock(mu_);
labels_ = std::move(new_labels);
rule_engine_ = std::move(new_engine);
face_rules_ = std::move(new_face_rules);
track_agg_cfg_ = new_track_agg_cfg;
face_debug_log_unknown_candidates_ = new_face_debug.log_unknown_candidates;
face_debug_unknown_candidate_interval_ms_ = new_face_debug.unknown_candidate_interval_ms;
last_unknown_candidate_log_ms_ = 0;
face_last_trigger_.clear();
face_person_last_trigger_.clear();
face_track_states_.clear();
known_identity_last_alarm_ms_.clear();
packet_buffer_ = std::move(new_packet_buffer);
old_actions = std::move(actions_);
actions_ = std::move(new_actions);
in_flight_.store(0);
last_eval_pts_ms_ = 0;
alarm_queue_ = std::make_shared<SpscQueue<AlarmJob>>(alarm_queue_size_, alarm_queue_strategy_);
worker_running_.store(true);
worker_ = std::thread(&AlarmNode::WorkerLoop, this);
}
for (auto& action : old_actions) action->Drain();
for (auto& action : old_actions) action->Stop();
return true;
}
bool GetCustomMetrics(SimpleJson& out) const override {
    // Publishes the alarm and processed-frame counters (as numbers) into `out`.
    SimpleJson::Object metrics;
    {
        std::lock_guard<std::mutex> guard(mu_);
        metrics["alarm_total"] = SimpleJson(static_cast<double>(alarm_count_));
        metrics["processed"] = SimpleJson(static_cast<double>(processed_frames_));
    }
    out = SimpleJson(std::move(metrics));
    return true;
}
NodeStatus Process(FramePtr frame) override {
    // Per-frame entry point. It records the frame's encoded packet (if any) into the clip ring
    // buffer, then, optionally throttled by pts, evaluates detection rules and face rules
    // under mu_. Returns DROP for a null frame, otherwise OK.
    if (!frame) return NodeStatus::DROP;
    // Copy the buffer handle under mu_: UpdateConfig() replaces packet_buffer_ while holding
    // mu_, so reading the shared_ptr unlocked would be a data race. The push stays outside
    // the lock and still happens before rule evaluation.
    decltype(packet_buffer_) packet_buffer;
    {
        std::lock_guard<std::mutex> lock(mu_);
        packet_buffer = packet_buffer_;
    }
    if (packet_buffer) {
        if (auto meta = TryGetEncodedMeta(frame)) {
            packet_buffer->Push(std::move(meta));
        }
    }
    std::lock_guard<std::mutex> lock(mu_);
    // Optional evaluation throttle keyed on frame pts (pts / 1000 is used as milliseconds;
    // presumably pts is in microseconds).
    if (eval_interval_ms_ > 0 && frame->pts > 0) {
        const int64_t pts_ms = static_cast<int64_t>(frame->pts / 1000ULL);
        const int64_t since_last_eval_ms = pts_ms - static_cast<int64_t>(last_eval_pts_ms_);
        // A negative delta means pts went backwards (e.g. the source restarted). Treat that as
        // a fresh start instead of skipping every frame until pts catches up again.
        if (last_eval_pts_ms_ > 0 && since_last_eval_ms >= 0 && since_last_eval_ms < eval_interval_ms_) {
            ++processed_frames_;
            return NodeStatus::OK;
        }
        last_eval_pts_ms_ = pts_ms;
    }
    auto result = rule_engine_.Evaluate(frame);
    if (result.matched) {
        LogInfo("[alarm] RULE MATCHED: " + result.rule_name);
        TriggerAlarm(result, frame);
    }
    EvaluateFaceRulesLocked(frame);
    ++processed_frames_;
    return NodeStatus::OK;
}
private:
struct FaceRule {
    // Unknown: fires on faces that are not recognized; Person: fires on recognized faces.
    enum class Kind { Unknown, Person };
    std::string name{};
    Kind kind{Kind::Unknown};
    // Used when kind == Person; empty => any known person.
    std::vector<std::string> persons{};
    // Rule-wide cooldown; only applies when per_person_cooldown_ms is 0.
    int cooldown_ms{5000};
    // Cooldown per (rule, track) key; 0 disables it in favour of cooldown_ms.
    int per_person_cooldown_ms{0};
    // Geometry gates; 0 disables each gate.
    float min_face_area_ratio{0.0f};
    float min_face_aspect{0.0f};
    float max_face_aspect{0.0f};
    // Similarity gates: min_sim for Person rules, max_known_sim (0 = off) for Unknown rules.
    float min_sim{0.0f};
    float max_known_sim{0.0f};
};
struct FaceRuleEval {
    // Outcome of matching one face observation against one rule.
    bool matched{false};
    // Short machine-readable rejection tag and a human-readable detail string.
    std::string reject_reason{};
    std::string detail{};
    // Geometry measured along the way (left at 0 when that gate was not evaluated).
    double face_area_ratio{0.0};
    double face_aspect{0.0};
};
struct FaceDebugConfig {
    // Emit diagnostic logs for Unknown-rule candidates.
    bool log_unknown_candidates{false};
    // Minimum spacing between those logs in milliseconds (0 = no throttling).
    uint64_t unknown_candidate_interval_ms{0};
};
static std::string Fixed3(double value) {
    // Fixed-point text with exactly three decimals, e.g. 3.14159 -> "3.142".
    std::ostringstream out;
    out.setf(std::ios::fixed, std::ios::floatfield);
    out.precision(3);
    out << value;
    return out.str();
}
static std::string Fixed6(double value) {
    // Fixed-point text with exactly six decimals, e.g. 0.5 -> "0.500000".
    std::ostringstream out;
    out.setf(std::ios::fixed, std::ios::floatfield);
    out.precision(6);
    out << value;
    return out.str();
}
static FaceDebugConfig ParseFaceDebugConfig(const SimpleJson& config) {
    // Reads the optional "face_debug" object; non-positive intervals are treated as 0.
    FaceDebugConfig parsed;
    const SimpleJson* section = config.Find("face_debug");
    if (section == nullptr || !section->IsObject()) {
        return parsed;
    }
    parsed.log_unknown_candidates =
        section->ValueOr<bool>("log_unknown_candidates", parsed.log_unknown_candidates);
    const int interval_ms = section->ValueOr<int>("unknown_candidate_interval_ms", 0);
    parsed.unknown_candidate_interval_ms = interval_ms > 0 ? static_cast<uint64_t>(interval_ms) : 0;
    return parsed;
}
static std::vector<FaceRule> ParseFaceRulesFromConfig(const SimpleJson& config) {
std::vector<FaceRule> rules;
const SimpleJson* fr = config.Find("face_rules");
if (!fr || !fr->IsArray()) return rules;
for (const auto& item : fr->AsArray()) {
if (!item.IsObject()) continue;
FaceRule r;
r.name = item.ValueOr<std::string>("name", "face");
std::string type = item.ValueOr<std::string>("type", "unknown");
for (auto& c : type) c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
r.kind = (type == "person") ? FaceRule::Kind::Person : FaceRule::Kind::Unknown;
r.cooldown_ms = std::max(0, item.ValueOr<int>("cooldown_ms", r.cooldown_ms));
r.per_person_cooldown_ms = std::max(0, item.ValueOr<int>("per_person_cooldown_ms", r.per_person_cooldown_ms));
r.min_face_area_ratio = static_cast<float>(
item.ValueOr<double>("min_face_area_ratio", static_cast<double>(r.min_face_area_ratio)));
r.min_face_aspect = static_cast<float>(
item.ValueOr<double>("min_face_aspect", static_cast<double>(r.min_face_aspect)));
r.max_face_aspect = static_cast<float>(
item.ValueOr<double>("max_face_aspect", static_cast<double>(r.max_face_aspect)));
r.min_sim = static_cast<float>(item.ValueOr<double>("min_sim", static_cast<double>(r.min_sim)));
r.max_known_sim = static_cast<float>(
item.ValueOr<double>("max_known_sim", static_cast<double>(r.max_known_sim)));
if (const SimpleJson* persons = item.Find("persons"); persons && persons->IsArray()) {
for (const auto& p : persons->AsArray()) {
const std::string s = p.AsString("");
if (!s.empty()) r.persons.push_back(s);
}
}
rules.push_back(std::move(r));
}
return rules;
}
void ParseFaceRules(const SimpleJson& config) {
    // A new rule set invalidates every cooldown stamp and all per-track aggregation state.
    known_identity_last_alarm_ms_.clear();
    face_track_states_.clear();
    face_person_last_trigger_.clear();
    face_last_trigger_.clear();
    face_rules_ = ParseFaceRulesFromConfig(config);
}
static bool FaceItemMatchesRule(
const FaceRule& rule,
const FaceRecogItem& it,
double img_area,
FaceRuleEval* eval = nullptr) {
FaceRuleEval local;
FaceRuleEval& out = eval ? *eval : local;
out = FaceRuleEval{};
if (rule.min_face_area_ratio > 0.0f && img_area > 0.0) {
const double a = static_cast<double>(it.bbox.w) * static_cast<double>(it.bbox.h);
const double r = a / img_area;
out.face_area_ratio = r;
if (r < static_cast<double>(rule.min_face_area_ratio)) {
out.reject_reason = "min_face_area_ratio";
out.detail = "area_ratio=" + Fixed6(r) + " min=" + Fixed6(rule.min_face_area_ratio);
return false;
}
}
if (rule.min_face_aspect > 0.0f || rule.max_face_aspect > 0.0f) {
const double h = std::max(1e-6, static_cast<double>(it.bbox.h));
const double aspect = static_cast<double>(it.bbox.w) / h;
out.face_aspect = aspect;
if (rule.min_face_aspect > 0.0f && aspect < static_cast<double>(rule.min_face_aspect)) {
out.reject_reason = "min_face_aspect";
out.detail = "aspect=" + Fixed3(aspect) + " min=" + Fixed3(rule.min_face_aspect);
return false;
}
if (rule.max_face_aspect > 0.0f && aspect > static_cast<double>(rule.max_face_aspect)) {
out.reject_reason = "max_face_aspect";
out.detail = "aspect=" + Fixed3(aspect) + " max=" + Fixed3(rule.max_face_aspect);
return false;
}
}
if (rule.kind == FaceRule::Kind::Unknown) {
if (FaceRecogStateIsKnown(it.state)) {
out.reject_reason = "known_state";
out.detail = "status=known";
return false;
}
if (rule.max_known_sim > 0.0f && it.best_sim >= rule.max_known_sim) {
out.reject_reason = "max_known_sim";
out.detail = "best_sim=" + Fixed3(it.best_sim) + " max=" + Fixed3(rule.max_known_sim);
return false;
}
} else {
if (!FaceRecogStateIsKnown(it.state)) {
out.reject_reason = "not_known";
out.detail = std::string("status=") + FaceRecogStateName(it.state);
return false;
}
if (it.best_sim < rule.min_sim) {
out.reject_reason = "min_sim";
out.detail = "best_sim=" + Fixed3(it.best_sim) + " min=" + Fixed3(rule.min_sim);
return false;
}
if (!rule.persons.empty()) {
bool ok = false;
for (const auto& p : rule.persons) {
if (p == it.best_name) {
ok = true;
break;
}
}
if (!ok) {
out.reject_reason = "person_filter";
out.detail = "best_name=" + it.best_name;
return false;
}
}
}
if (it.person_track_id < 0) {
out.reject_reason = "missing_track_id";
out.detail = "person_track_id=-1";
return false;
}
out.matched = true;
return true;
}
// Evaluates every configured face rule against the face-recognition items attached to
// `frame` and triggers alarms. Must be called with mu_ held (Process() calls it inside its
// lock_guard on mu_). The work happens in phases:
//   1. drop per-track aggregation state not seen for more than state_expire_ms;
//   2. mark every item that matches at least one rule which is not in cooldown;
//   3. advance the track state of each marked item exactly once per frame, so an item that
//      matches several rules is not counted several times;
//   4. optionally emit (throttled) debug logs for Unknown-rule candidates;
//   5. per rule, collect items whose track decision fired, stamp cooldowns, TriggerAlarm().
// Cooldowns use the steady clock (`now`); track aggregation uses wall-clock epoch ms.
void EvaluateFaceRulesLocked(const FramePtr& frame) {
if (face_rules_.empty()) return;
if (!frame || !frame->face_recog || frame->face_recog->items.empty()) return;
const auto now = std::chrono::steady_clock::now();
const uint64_t now_epoch_ms = NowEpochMs();
// Prefer the recognizer's image size; fall back to the frame size. 0 area disables the
// min_face_area_ratio gate inside FaceItemMatchesRule.
const int img_w = frame->face_recog->img_w > 0 ? frame->face_recog->img_w : frame->width;
const int img_h = frame->face_recog->img_h > 0 ? frame->face_recog->img_h : frame->height;
const double img_area = (img_w > 0 && img_h > 0) ? static_cast<double>(img_w) * static_cast<double>(img_h) : 0.0;
// Phase 1: evict track states whose last observation is older than state_expire_ms.
if (track_agg_cfg_.state_expire_ms > 0) {
const uint64_t expire_ms = static_cast<uint64_t>(track_agg_cfg_.state_expire_ms);
for (auto it = face_track_states_.begin(); it != face_track_states_.end();) {
const auto& state = it->second;
const bool expired =
state.last_seen_ms > 0 &&
now_epoch_ms > state.last_seen_ms &&
(now_epoch_ms - state.last_seen_ms) > expire_ms;
if (expired) {
it = face_track_states_.erase(it);
} else {
++it;
}
}
}
// Phase 2: an item "qualifies" if any rule not currently in cooldown matches it.
std::vector<bool> qualifying_observations(frame->face_recog->items.size(), false);
for (const auto& rule : face_rules_) {
// Rule-wide cooldown (only when no per-person cooldown is configured).
if (rule.per_person_cooldown_ms <= 0) {
const auto it_last = face_last_trigger_.find(rule.name);
if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.cooldown_ms) {
continue;
}
}
}
for (size_t i = 0; i < frame->face_recog->items.size(); ++i) {
const auto& it = frame->face_recog->items[i];
if (!FaceItemMatchesRule(rule, it, img_area)) continue;
// Per-(rule, track) cooldown keyed by BuildFaceTrackKey.
if (rule.per_person_cooldown_ms > 0) {
const std::string key = BuildFaceTrackKey(rule, it.person_track_id);
auto it_last = face_person_last_trigger_.find(key);
if (it_last != face_person_last_trigger_.end()) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.per_person_cooldown_ms) {
continue;
}
}
}
qualifying_observations[i] = true;
}
}
// Phase 3: one state update per qualifying item. operator[] creates a default state
// (track_id == -1) for new tracks, which UpdateFaceTrackState then rebinds to this track.
std::vector<FaceTrackDecision> track_decisions(frame->face_recog->items.size());
std::vector<bool> have_track_decision(frame->face_recog->items.size(), false);
for (size_t i = 0; i < frame->face_recog->items.size(); ++i) {
if (!qualifying_observations[i]) continue;
const auto& it = frame->face_recog->items[i];
auto& state = face_track_states_[it.person_track_id];
track_decisions[i] = UpdateFaceTrackState(
track_agg_cfg_, state, known_identity_last_alarm_ms_, it, now_epoch_ms);
have_track_decision[i] = true;
}
// Phase 4: diagnostics for Unknown rules (every item, matched or not; throttling is done
// inside LogUnknownCandidateDebug).
if (face_debug_log_unknown_candidates_) {
for (const auto& rule : face_rules_) {
if (rule.kind != FaceRule::Kind::Unknown) continue;
for (size_t i = 0; i < frame->face_recog->items.size(); ++i) {
FaceRuleEval eval;
const bool rule_matched = FaceItemMatchesRule(
rule, frame->face_recog->items[i], img_area, &eval);
LogUnknownCandidateDebug(
rule,
frame,
i,
img_area,
eval,
rule_matched,
have_track_decision[i] ? &track_decisions[i] : nullptr,
now_epoch_ms);
}
}
}
// Phase 5: per rule, gather items whose track decision fired for this rule's kind.
for (const auto& rule : face_rules_) {
if (rule.per_person_cooldown_ms <= 0) {
const auto it_last = face_last_trigger_.find(rule.name);
if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.cooldown_ms) {
continue;
}
}
}
bool matched = false;
std::string matched_name;
std::vector<Detection> dets;
std::vector<std::string> trigger_keys;
std::vector<FaceAlarmMatch> face_matches;
dets.reserve(3);
trigger_keys.reserve(3);
face_matches.reserve(3);
for (size_t i = 0; i < frame->face_recog->items.size(); ++i) {
const auto& it = frame->face_recog->items[i];
if (!FaceItemMatchesRule(rule, it, img_area)) continue;
const std::string key = BuildFaceTrackKey(rule, it.person_track_id);
if (rule.per_person_cooldown_ms > 0) {
auto it_last = face_person_last_trigger_.find(key);
if (it_last != face_person_last_trigger_.end()) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.per_person_cooldown_ms) {
continue;
}
}
}
// Only items whose aggregated track decision fired this frame produce an alarm.
if (!have_track_decision[i]) continue;
const FaceTrackDecision& decision = track_decisions[i];
if (rule.kind == FaceRule::Kind::Person && !decision.trigger_known) continue;
if (rule.kind == FaceRule::Kind::Unknown && !decision.trigger_unknown) continue;
matched = true;
// For Person rules the alarm name is suffixed with the last matched person's name.
if (rule.kind == FaceRule::Kind::Person) matched_name = it.best_name;
// Synthetic detection describing the face (cls_id -1, score = best similarity).
// FaceItemMatchesRule already rejected negative person_track_id, so the
// best_person_id fallback below is not expected to be taken here.
Detection d;
d.cls_id = -1;
d.score = it.best_sim;
d.bbox = it.bbox;
d.track_id = it.person_track_id >= 0 ? it.person_track_id : it.best_person_id;
dets.push_back(std::move(d));
trigger_keys.push_back(key);
// Person rules only match known faces (see FaceItemMatchesRule), so Uncertain is not
// expected from this branch in practice.
FaceAlarmMatch fm;
fm.status = (rule.kind == FaceRule::Kind::Unknown)
? FaceAlarmStatus::Unknown
: (FaceRecogStateIsKnown(it.state) ? FaceAlarmStatus::Known : FaceAlarmStatus::Uncertain);
fm.candidate_person_id = it.candidate_person_id;
fm.candidate_name = it.candidate_name;
fm.best_sim = it.best_sim;
fm.second_sim = it.second_sim;
face_matches.push_back(std::move(fm));
// At most 3 faces per alarm. NOTE(review): further items whose decision already fired
// are not reported, and because UpdateFaceTrackState latched reported_known /
// reported_unknown for them, they will not fire again on later frames — confirm intended.
if (dets.size() >= 3) break;
}
if (!matched) continue;
// Stamp cooldowns for what is about to be reported.
if (rule.per_person_cooldown_ms > 0) {
for (const auto& key : trigger_keys) {
face_person_last_trigger_[key] = now;
}
} else {
// Repeats the check made at the top of this loop; it only has an effect when two
// rules share the same name, since face_last_trigger_ is keyed by rule.name.
const auto it_last = face_last_trigger_.find(rule.name);
if (it_last != face_last_trigger_.end() && rule.cooldown_ms > 0) {
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - it_last->second).count();
if (elapsed < rule.cooldown_ms) {
continue;
}
}
face_last_trigger_[rule.name] = now;
}
// Report through the same path as detection rules using a synthesized match result.
RuleMatchResult fake;
fake.matched = true;
fake.rule_name = rule.name;
if (rule.kind == FaceRule::Kind::Person && !matched_name.empty()) {
fake.rule_name = rule.name + ":" + matched_name;
}
fake.matched_detections = std::move(dets);
TriggerAlarm(fake, frame, std::move(face_matches));
}
}
/// Emits one "[alarm] unknown_candidate" diagnostic line explaining how a face item
/// fared against a face rule and against the per-track aggregation gates.
///
/// @param rule          Rule being evaluated; only its name is logged.
/// @param frame         Frame containing the item; frame->face_recog->items[item_index]
///                      is described. A null frame, a missing face_recog, or an
///                      out-of-range index is ignored (nothing is logged and the
///                      rate-limit slot is not consumed).
/// @param item_index    Index of the face item within frame->face_recog->items.
/// @param img_area      Image area, used to derive area_ratio when eval has none.
/// @param eval          Rule evaluation details (area ratio, aspect, reject reason/detail).
/// @param rule_matched  Whether the rule accepted the item.
/// @param decision      Aggregation decision for the item's track, or nullptr if none.
/// @param now_epoch_ms  Current time in ms, used for rate limiting and track age.
///
/// The "gate" field reports the first applicable condition, in this order:
/// no_track_decision, trigger_unknown, already_reported, known_leaning,
/// waiting_track_age, waiting_quality_hits. If none applies it falls back to
/// "rule_rejected".
///
/// Rate limiting: when face_debug_unknown_candidate_interval_ms_ > 0, at most one line
/// is emitted per interval. This includes several calls within the same millisecond.
/// If the clock moves backwards, the limiter is reset rather than muting logs.
void LogUnknownCandidateDebug(
    const FaceRule& rule,
    const FramePtr& frame,
    size_t item_index,
    double img_area,
    const FaceRuleEval& eval,
    bool rule_matched,
    const FaceTrackDecision* decision,
    uint64_t now_epoch_ms) {
    // Validate before touching the limiter so a bad call cannot swallow the next real log.
    if (!frame || !frame->face_recog || item_index >= frame->face_recog->items.size()) {
        return;
    }
    // `>=` (not `>`) so calls in the same millisecond are also suppressed; the
    // ordering check also guards the unsigned subtraction below against underflow.
    if (face_debug_unknown_candidate_interval_ms_ > 0 &&
        last_unknown_candidate_log_ms_ > 0 &&
        now_epoch_ms >= last_unknown_candidate_log_ms_ &&
        (now_epoch_ms - last_unknown_candidate_log_ms_) < face_debug_unknown_candidate_interval_ms_) {
        return;
    }
    last_unknown_candidate_log_ms_ = now_epoch_ms;

    const auto& it = frame->face_recog->items[item_index];
    const auto state_it = face_track_states_.find(it.person_track_id);
    const FaceTrackState* state = (state_it != face_track_states_.end()) ? &state_it->second : nullptr;
    const uint64_t track_age_ms = (state && state->first_seen_ms > 0 && now_epoch_ms >= state->first_seen_ms)
        ? (now_epoch_ms - state->first_seen_ms)
        : 0;
    const int quality_hits = state ? state->quality_hits : 0;

    // Clamp config values before unsigned conversion: a negative int would otherwise
    // wrap to a huge threshold. The clamps mirror PruneKnownHitTimes / IsKnownLeaningTrack.
    const size_t known_min_hits = static_cast<size_t>(std::max(1, track_agg_cfg_.known_min_hits));
    const uint64_t min_track_age_ms =
        static_cast<uint64_t>(std::max(0, track_agg_cfg_.unknown_min_track_age_ms));

    const bool known_leaning = state &&
        (state->reported_known || state->known_hit_times.size() >= known_min_hits);

    std::string gate = "rule_rejected";
    if (rule_matched && !decision) {
        gate = "no_track_decision";
    } else if (decision && decision->trigger_unknown) {
        gate = "trigger_unknown";
    } else if (state && state->reported_unknown) {
        gate = "already_reported";
    } else if (known_leaning) {
        gate = "known_leaning";
    } else if (rule_matched && track_age_ms < min_track_age_ms) {
        gate = "waiting_track_age";
    } else if (rule_matched && quality_hits < track_agg_cfg_.unknown_min_quality_hits) {
        gate = "waiting_quality_hits";
    }

    // Prefer the values computed during rule evaluation; derive them from the bbox
    // only when the evaluation did not fill them in.
    const double area_ratio = eval.face_area_ratio > 0.0
        ? eval.face_area_ratio
        : ((img_area > 0.0) ? (static_cast<double>(it.bbox.w) * static_cast<double>(it.bbox.h) / img_area) : 0.0);
    const double aspect = eval.face_aspect > 0.0
        ? eval.face_aspect
        : (static_cast<double>(it.bbox.w) / std::max(1e-6, static_cast<double>(it.bbox.h)));

    std::ostringstream oss;
    oss << "[alarm] unknown_candidate"
        << " rule=" << rule.name
        << " frame=" << frame->frame_id
        << " item=" << item_index
        << " status=" << FaceRecogStateName(it.state)
        << " track_id=" << it.person_track_id
        << " candidate_id=" << it.candidate_person_id
        << " candidate=" << (it.candidate_name.empty() ? "-" : it.candidate_name)
        << " best_sim=" << Fixed3(it.best_sim)
        << " second_sim=" << Fixed3(it.second_sim)
        << " margin=" << Fixed3(static_cast<double>(it.best_sim) - static_cast<double>(it.second_sim))
        << " bbox=(" << Fixed3(it.bbox.x) << "," << Fixed3(it.bbox.y)
        << "," << Fixed3(it.bbox.w) << "," << Fixed3(it.bbox.h) << ")"
        << " area_ratio=" << Fixed6(area_ratio)
        << " aspect=" << Fixed3(aspect)
        << " rule_matched=" << (rule_matched ? "true" : "false")
        << " reject_reason=" << (eval.reject_reason.empty() ? "-" : eval.reject_reason)
        << " reject_detail=" << (eval.detail.empty() ? "-" : eval.detail)
        << " track_age_ms=" << track_age_ms
        << " quality_hits=" << quality_hits << "/" << track_agg_cfg_.unknown_min_quality_hits
        << " min_track_age_ms=" << track_agg_cfg_.unknown_min_track_age_ms
        << " reported_known=" << (state && state->reported_known ? "true" : "false")
        << " reported_unknown=" << (state && state->reported_unknown ? "true" : "false")
        << " gate=" << gate;
    LogInfo(oss.str());
}
/// Builds the composite key "<rule name>#track:<track id>" for a (rule, track) pair.
/// NOTE(review): presumably used to key per-track trigger/cooldown maps
/// (e.g. face_person_last_trigger_) — confirm at the call sites.
static std::string BuildFaceTrackKey(const FaceRule& rule, int track_id) {
    std::string key(rule.name);
    key.append("#track:");
    key.append(std::to_string(track_id));
    return key;
}
void WorkerLoop() {
while (worker_running_.load()) {
AlarmJob job;
if (!alarm_queue_) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
continue;
}
if (!alarm_queue_->Pop(job, std::chrono::milliseconds(100))) {
continue;
}
in_flight_.fetch_add(1);
for (auto& action : actions_) {
action->Execute(job.event, job.frame);
}
in_flight_.fetch_sub(1);
}
}
/// Turns a rule match into an AlarmEvent and queues it, with the frame, for the worker.
///
/// @param result        Matched rule. Its rule_name, matched_detections and
///                      matched_behavior_events are copied into the event.
/// @param frame         Triggering frame. Its frame_id is recorded, and ownership
///                      moves into the queued job.
/// @param face_matches  Optional per-face match details, moved into the event.
///
/// The event id has the form "<node id>_<sanitized rule name>_<frame id>_<seq>".
/// seq starts at 1 and grows by one per call. The trigger is counted and logged
/// before queuing; if alarm_queue_ is null, the job is simply not queued.
void TriggerAlarm(const RuleMatchResult& result, FramePtr frame, std::vector<FaceAlarmMatch> face_matches = {}) {
    alarm_count_ += 1;

    // Populate the event in place inside the job rather than building it separately.
    AlarmJob job;
    auto& ev = job.event;
    ev.node_id = id_;
    ev.rule_name = result.rule_name;
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    ev.timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    ev.frame_id = frame->frame_id;

    // fetch_add yields the previous counter value, so the first event gets seq == 1.
    const uint64_t seq = 1 + next_event_seq_.fetch_add(1, std::memory_order_relaxed);
    std::string event_id = id_;
    event_id += "_";
    event_id += SanitizeEventIdPart(result.rule_name);
    event_id += "_";
    event_id += std::to_string(ev.frame_id);
    event_id += "_";
    event_id += std::to_string(seq);
    ev.event_id = std::move(event_id);

    ev.detections = result.matched_detections;
    ev.face_matches = std::move(face_matches);
    ev.behavior_events = result.matched_behavior_events;

    std::string msg = "[alarm] trigger event_id=";
    msg += ev.event_id;
    msg += " rule=";
    msg += ev.rule_name;
    msg += " frame=";
    msg += std::to_string(ev.frame_id);
    LogInfo(std::move(msg));

    job.frame = std::move(frame);
    if (!alarm_queue_) {
        return;
    }
    alarm_queue_->Push(std::move(job));
}
// Node identifier; copied into AlarmEvent::node_id and used as the event_id prefix.
std::string id_;
// NOTE(review): not referenced in this part of the file; presumably class-id -> label names — confirm.
std::vector<std::string> labels_;
// Engine for non-face rules; NOTE(review): configured and used outside this view.
RuleEngine rule_engine_;
// Configured face rules (each has a name, a kind such as Person/Unknown, and cooldown settings).
std::vector<FaceRule> face_rules_;
// Last trigger time per rule name; enforces rule.cooldown_ms when per-person cooldown is off.
std::map<std::string, std::chrono::steady_clock::time_point> face_last_trigger_;
// Last trigger time per per-person trigger key; enforces rule.per_person_cooldown_ms.
std::map<std::string, std::chrono::steady_clock::time_point> face_person_last_trigger_;
// Thresholds for per-track known/unknown face aggregation.
FaceTrackAggregationConfig track_agg_cfg_;
// Per-track aggregation state, keyed by the face item's person_track_id.
std::map<int, FaceTrackState> face_track_states_;
// NOTE(review): not referenced here; presumably person id -> last known-identity alarm time (ms) — confirm.
std::unordered_map<int, uint64_t> known_identity_last_alarm_ms_;
// NOTE(review): not checked in this view; presumably enables LogUnknownCandidateDebug — confirm.
bool face_debug_log_unknown_candidates_ = false;
// Minimum spacing (ms) between "unknown_candidate" debug lines; 0 disables rate limiting.
uint64_t face_debug_unknown_candidate_interval_ms_ = 0;
// now_epoch_ms of the last emitted "unknown_candidate" line; 0 means none emitted yet.
uint64_t last_unknown_candidate_log_ms_ = 0;
// NOTE(review): not referenced here; presumably feeds clip/snapshot actions — confirm.
std::shared_ptr<PacketRingBuffer> packet_buffer_;
// Actions run, in order, by WorkerLoop for every dequeued alarm job.
std::vector<std::unique_ptr<IAlarmAction>> actions_;
// NOTE(review): not locked anywhere in this view; confirm which state it guards.
mutable std::mutex mu_;
// Alarm hand-off queue: TriggerAlarm pushes jobs, WorkerLoop pops them. If null, jobs are not queued.
std::shared_ptr<SpscQueue<AlarmJob>> alarm_queue_;
// NOTE(review): presumably capacity used when creating alarm_queue_ — confirm.
size_t alarm_queue_size_ = 32;
// NOTE(review): presumably the full-queue policy used when creating alarm_queue_ — confirm.
QueueDropStrategy alarm_queue_strategy_ = QueueDropStrategy::DropOldest;
// WorkerLoop keeps running while this is true.
std::atomic<bool> worker_running_{false};
// NOTE(review): presumably the thread running WorkerLoop — confirm where it is started and joined.
std::thread worker_;
// Number of jobs WorkerLoop is currently executing (0 or 1 for a single worker).
std::atomic<size_t> in_flight_{0};
// NOTE(review): not referenced here; presumably the node's incoming frame queue — confirm.
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
// NOTE(review): not updated in this view; presumably a processed-frame counter — confirm.
uint64_t processed_frames_ = 0;
// Incremented once per TriggerAlarm call (even when no queue exists).
uint64_t alarm_count_ = 0;
// Event sequence counter; TriggerAlarm appends (previous value + 1) to each event_id.
std::atomic<uint64_t> next_event_seq_{0};
// NOTE(review): not referenced here; presumably a minimum spacing between rule evaluations (ms) — confirm.
int64_t eval_interval_ms_ = 0;
// NOTE(review): not referenced here; presumably the pts (ms) of the last evaluated frame — confirm.
int64_t last_eval_pts_ms_ = 0;
};
// Registers AlarmNode under the name "alarm" via the REGISTER_NODE macro.
// NOTE(review): presumably lets pipeline configuration instantiate this node by that name — confirm in node.h.
REGISTER_NODE(AlarmNode, "alarm");
} // namespace rk3588