#include #include #include #include #include #include #include #include #include #include "node.h" #include "rule_engine.h" #include "frame_ring_buffer.h" #include "actions/action_base.h" #include "actions/log_action.h" #include "actions/http_action.h" #include "actions/snapshot_action.h" #include "actions/clip_action.h" #include "utils/dma_alloc.h" namespace rk3588 { class AlarmNode : public INode { private: struct AlarmJob { AlarmEvent event; std::shared_ptr frame; }; static std::shared_ptr CloneFrameForRingBuffer(const std::shared_ptr& src) { if (!src) return nullptr; if (src->width <= 0 || src->height <= 0) return nullptr; if (!src->data) return nullptr; const int dma_fd = src->dma_fd; if (dma_fd >= 0) DmaSyncStartFd(dma_fd); struct SyncGuard { int fd; ~SyncGuard() { if (fd >= 0) DmaSyncEndFd(fd); } } guard{dma_fd}; auto out = std::make_shared(); out->width = src->width; out->height = src->height; out->format = src->format; out->pts = src->pts; out->frame_id = src->frame_id; out->det = src->det; out->user_meta = src->user_meta; out->dma_fd = -1; const int w = src->width; const int h = src->height; auto plane_ptr = [&](int idx) -> const uint8_t* { if (idx < 0 || idx >= src->plane_count) return nullptr; if (src->planes[idx].data) return src->planes[idx].data; const int off = src->planes[idx].offset; if (off < 0) return nullptr; return src->data + static_cast(off); }; auto plane_stride = [&](int idx, int fallback) -> int { if (idx >= 0 && idx < src->plane_count && src->planes[idx].stride > 0) return src->planes[idx].stride; if (src->stride > 0) return src->stride; return fallback; }; if (src->format == PixelFormat::NV12) { const uint8_t* sy = plane_ptr(0); const uint8_t* suv = plane_ptr(1); if (!sy) return nullptr; const int sy_stride = plane_stride(0, w); const int suv_stride = plane_stride(1, w); if (!suv) { suv = src->data + static_cast(sy_stride) * static_cast(h); } const int y_stride = w; const int uv_stride = w; const size_t y_bytes = static_cast(y_stride) * static_cast(h); const size_t uv_bytes = static_cast(uv_stride) * static_cast(h / 2); auto buf = std::make_shared>(y_bytes + uv_bytes); out->data_owner = buf; out->data = buf->data(); out->data_size = buf->size(); out->stride = y_stride; out->plane_count = 2; out->planes[0] = {out->data, y_stride, static_cast(y_bytes), 0}; out->planes[1] = {out->data + y_bytes, uv_stride, static_cast(uv_bytes), static_cast(y_bytes)}; // Copy Y for (int row = 0; row < h; ++row) { std::memcpy(out->planes[0].data + row * y_stride, sy + row * sy_stride, static_cast(w)); } // Copy UV (interleaved) for (int row = 0; row < h / 2; ++row) { std::memcpy(out->planes[1].data + row * uv_stride, suv + row * suv_stride, static_cast(w)); } return out; } if (src->format == PixelFormat::YUV420) { const uint8_t* sy = plane_ptr(0); const uint8_t* su = plane_ptr(1); const uint8_t* sv = plane_ptr(2); if (!sy || !su || !sv) return nullptr; const int sy_stride = plane_stride(0, w); const int su_stride = plane_stride(1, w / 2); const int sv_stride = plane_stride(2, w / 2); const int y_stride = w; const int uv_stride = w / 2; const size_t y_bytes = static_cast(y_stride) * static_cast(h); const size_t u_bytes = static_cast(uv_stride) * static_cast(h / 2); const size_t v_bytes = u_bytes; auto buf = std::make_shared>(y_bytes + u_bytes + v_bytes); out->data_owner = buf; out->data = buf->data(); out->data_size = buf->size(); out->stride = y_stride; out->plane_count = 3; out->planes[0] = {out->data, y_stride, static_cast(y_bytes), 0}; out->planes[1] = {out->data + y_bytes, uv_stride, static_cast(u_bytes), static_cast(y_bytes)}; out->planes[2] = {out->data + y_bytes + u_bytes, uv_stride, static_cast(v_bytes), static_cast(y_bytes + u_bytes)}; for (int row = 0; row < h; ++row) { std::memcpy(out->planes[0].data + row * y_stride, sy + row * sy_stride, static_cast(w)); } for (int row = 0; row < h / 2; ++row) { std::memcpy(out->planes[1].data + row * uv_stride, su + row * su_stride, static_cast(w / 2)); std::memcpy(out->planes[2].data + row * uv_stride, sv + row * sv_stride, static_cast(w / 2)); } return out; } if (src->format == PixelFormat::RGB || src->format == PixelFormat::BGR) { const uint8_t* s = plane_ptr(0); if (!s) s = src->data; if (!s) return nullptr; const int s_stride = plane_stride(0, w * 3); const int dst_stride = w * 3; const size_t bytes = static_cast(dst_stride) * static_cast(h); auto buf = std::make_shared>(bytes); out->data_owner = buf; out->data = buf->data(); out->data_size = buf->size(); out->stride = dst_stride; out->plane_count = 1; out->planes[0] = {out->data, dst_stride, static_cast(bytes), 0}; for (int row = 0; row < h; ++row) { std::memcpy(out->data + row * dst_stride, s + row * s_stride, static_cast(dst_stride)); } return out; } return nullptr; } public: std::string Id() const override { return id_; } std::string Type() const override { return "alarm"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "alarm"); // Parse labels for class name mapping if (const SimpleJson* labels_cfg = config.Find("labels")) { for (const auto& label : labels_cfg->AsArray()) { labels_.push_back(label.AsString("")); } } // Initialize rule engine if (const SimpleJson* rules_cfg = config.Find("rules")) { if (!rule_engine_.Init(*rules_cfg, labels_)) { std::cerr << "[alarm] failed to init rule engine\n"; return false; } } // Get pre/post buffer settings from clip action config int pre_sec = 5; int post_sec = 0; int fps_hint = 25; if (const SimpleJson* actions_cfg = config.Find("actions")) { if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { pre_sec = clip_cfg->ValueOr("pre_sec", 5); post_sec = clip_cfg->ValueOr("post_sec", 0); fps_hint = clip_cfg->ValueOr("fps", 25); } } frame_buffer_ = std::make_shared(pre_sec, post_sec, fps_hint); // Alarm worker queue (avoid blocking Process()) size_t alarm_q_size = 32; QueueDropStrategy alarm_q_strategy = QueueDropStrategy::DropOldest; if (const SimpleJson* q = config.Find("event_queue")) { if (q->IsObject()) { alarm_q_size = static_cast(q->ValueOr("size", static_cast(alarm_q_size))); const std::string strat = q->ValueOr("strategy", "drop_oldest"); if (strat == "drop_newest") alarm_q_strategy = QueueDropStrategy::DropNewest; else if (strat == "block") alarm_q_strategy = QueueDropStrategy::Block; else alarm_q_strategy = QueueDropStrategy::DropOldest; } } alarm_queue_size_ = alarm_q_size; alarm_queue_strategy_ = alarm_q_strategy; alarm_queue_ = std::make_shared>(alarm_queue_size_, alarm_queue_strategy_); // Initialize actions if (const SimpleJson* actions_cfg = config.Find("actions")) { // Log action if (const SimpleJson* log_cfg = actions_cfg->Find("log")) { if (log_cfg->ValueOr("enable", true)) { auto action = std::make_unique(); if (action->Init(*log_cfg)) { actions_.push_back(std::move(action)); } } } // Snapshot action (must be before HTTP to fill snapshot_url) if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) { if (snap_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*snap_cfg)) { actions_.push_back(std::move(action)); } } } // Clip action (must be before HTTP to fill clip_url) if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { if (clip_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*clip_cfg)) { auto* clip_ptr = static_cast(action.get()); clip_ptr->SetFrameBuffer(frame_buffer_); actions_.push_back(std::move(action)); } } } // HTTP action (should be last to include media URLs) if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*http_cfg)) { actions_.push_back(std::move(action)); } } } } else { // Default: just log action auto action = std::make_unique(); SimpleJson empty_cfg; action->Init(empty_cfg); actions_.push_back(std::move(action)); } input_queue_ = ctx.input_queue; if (!input_queue_) { std::cerr << "[alarm] no input queue for node " << id_ << "\n"; return false; } std::cout << "[alarm] initialized with " << actions_.size() << " actions\n"; return true; } bool Start() override { worker_running_.store(true); worker_ = std::thread(&AlarmNode::WorkerLoop, this); std::cout << "[alarm] started\n"; return true; } void Stop() override { worker_running_.store(false); if (alarm_queue_) alarm_queue_->Stop(); if (worker_.joinable()) worker_.join(); // Ensure all actions fully stop (Drain only clears pending work; Stop must reclaim threads/resources). for (auto& action : actions_) action->Drain(); for (auto& action : actions_) action->Stop(); std::cout << "[alarm] stopped, processed " << processed_frames_ << " frames, triggered " << alarm_count_ << " alarms\n"; } void Drain() override { // First drain alarm jobs (so snapshot/clip are finished before draining HTTP queue). while (alarm_queue_ && (alarm_queue_->Size() > 0 || in_flight_.load() > 0)) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } for (auto& action : actions_) action->Drain(); } bool UpdateConfig(const SimpleJson& new_config) override { const std::string new_id = new_config.ValueOr("id", id_); if (!new_id.empty() && new_id != id_) { return false; } // Parse labels std::vector new_labels; if (const SimpleJson* labels_cfg = new_config.Find("labels")) { for (const auto& label : labels_cfg->AsArray()) { new_labels.push_back(label.AsString("")); } } // Build new rule engine RuleEngine new_engine; if (const SimpleJson* rules_cfg = new_config.Find("rules")) { if (!new_engine.Init(*rules_cfg, new_labels)) { return false; } } // Pre/post buffer settings int pre_sec = 5; int post_sec = 0; int fps_hint = 25; if (const SimpleJson* actions_cfg = new_config.Find("actions")) { if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { pre_sec = clip_cfg->ValueOr("pre_sec", 5); post_sec = clip_cfg->ValueOr("post_sec", 0); fps_hint = clip_cfg->ValueOr("fps", 25); } } auto new_frame_buffer = std::make_shared(pre_sec, post_sec, fps_hint); // Initialize new actions std::vector> new_actions; if (const SimpleJson* actions_cfg = new_config.Find("actions")) { if (const SimpleJson* log_cfg = actions_cfg->Find("log")) { if (log_cfg->ValueOr("enable", true)) { auto action = std::make_unique(); if (action->Init(*log_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) { if (snap_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*snap_cfg)) { new_actions.push_back(std::move(action)); } } } if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) { if (clip_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*clip_cfg)) { auto* clip_ptr = static_cast(action.get()); clip_ptr->SetFrameBuffer(new_frame_buffer); new_actions.push_back(std::move(action)); } } } if (const SimpleJson* http_cfg = actions_cfg->Find("http")) { if (http_cfg->ValueOr("enable", false)) { auto action = std::make_unique(); if (action->Init(*http_cfg)) { new_actions.push_back(std::move(action)); } } } } if (new_actions.empty()) { auto action = std::make_unique(); SimpleJson empty_cfg; if (action->Init(empty_cfg)) { new_actions.push_back(std::move(action)); } } // Stop worker thread to avoid executing actions while swapping. std::shared_ptr> old_queue; std::thread old_worker; { std::lock_guard lock(mu_); worker_running_.store(false); old_queue = alarm_queue_; old_worker = std::move(worker_); } if (old_queue) old_queue->Stop(); if (old_worker.joinable()) old_worker.join(); std::vector> old_actions; { std::lock_guard lock(mu_); labels_ = std::move(new_labels); rule_engine_ = std::move(new_engine); frame_buffer_ = std::move(new_frame_buffer); old_actions = std::move(actions_); actions_ = std::move(new_actions); in_flight_.store(0); alarm_queue_ = std::make_shared>(alarm_queue_size_, alarm_queue_strategy_); worker_running_.store(true); worker_ = std::thread(&AlarmNode::WorkerLoop, this); } for (auto& action : old_actions) action->Drain(); for (auto& action : old_actions) action->Stop(); return true; } bool GetCustomMetrics(SimpleJson& out) const override { std::lock_guard lock(mu_); SimpleJson::Object o; o["alarm_total"] = SimpleJson(static_cast(alarm_count_)); o["processed"] = SimpleJson(static_cast(processed_frames_)); out = SimpleJson(std::move(o)); return true; } NodeStatus Process(FramePtr frame) override { if (!frame) return NodeStatus::DROP; // Copy pointer out of lock; FrameRingBuffer is internally synchronized. std::shared_ptr fb; { std::lock_guard lock(mu_); fb = frame_buffer_; } if (fb) { using namespace std::chrono; const uint64_t ts_ms = static_cast(duration_cast(steady_clock::now().time_since_epoch()).count()); // IMPORTANT: store a CPU copy so the ring buffer doesn't extend DMA-BUF lifetime. if (auto copied = CloneFrameForRingBuffer(frame)) { fb->Push(std::move(copied), ts_ms); } } { std::lock_guard lock(mu_); auto result = rule_engine_.Evaluate(frame); if (result.matched) TriggerAlarm(result, frame); ++processed_frames_; } return NodeStatus::OK; } private: void WorkerLoop() { while (worker_running_.load()) { AlarmJob job; if (!alarm_queue_) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); continue; } if (!alarm_queue_->Pop(job, std::chrono::milliseconds(100))) { continue; } in_flight_.fetch_add(1); for (auto& action : actions_) { action->Execute(job.event, job.frame); } in_flight_.fetch_sub(1); } } void TriggerAlarm(const RuleMatchResult& result, FramePtr frame) { ++alarm_count_; AlarmEvent event; event.node_id = id_; event.rule_name = result.rule_name; event.timestamp_ms = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); event.frame_id = frame->frame_id; event.detections = result.matched_detections; AlarmJob job; job.event = std::move(event); job.frame = std::move(frame); if (alarm_queue_) { alarm_queue_->Push(std::move(job)); } } std::string id_; std::vector labels_; RuleEngine rule_engine_; std::shared_ptr frame_buffer_; std::vector> actions_; mutable std::mutex mu_; std::shared_ptr> alarm_queue_; size_t alarm_queue_size_ = 32; QueueDropStrategy alarm_queue_strategy_ = QueueDropStrategy::DropOldest; std::atomic worker_running_{false}; std::thread worker_; std::atomic in_flight_{0}; std::shared_ptr> input_queue_; uint64_t processed_frames_ = 0; uint64_t alarm_count_ = 0; }; REGISTER_NODE(AlarmNode, "alarm"); } // namespace rk3588