OrangePi3588Media/plugins/gate/gate_node.cpp
2026-01-13 21:14:25 +08:00

209 lines
6.5 KiB
C++

#include <atomic>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include "node.h"
#include "utils/logger.h"
#include "utils/shared_state.h"
namespace rk3588 {
namespace {
struct ConfigSnapshot {
std::string id;
std::string state_key;
std::set<int> class_ids; // empty => any
bool require_confirmed = true;
int min_count = 1;
int64_t max_age_ms = 800; // state freshness
int64_t min_interval_ms = 300; // throttle output
float min_box_area_ratio = 0.0f; // optional filter
bool pass_through_if_no_state = false;
};
static bool ParseIntSet(const SimpleJson& arr, std::set<int>& out, std::string& err) {
if (!arr.IsArray()) {
err = "expected array";
return false;
}
out.clear();
for (const auto& it : arr.AsArray()) {
const int v = it.AsInt(-1);
if (v < 0) continue;
out.insert(v);
}
return true;
}
static bool BuildConfigSnapshot(const SimpleJson& config, std::shared_ptr<const ConfigSnapshot>& out,
std::string& err) {
auto snap = std::make_shared<ConfigSnapshot>();
snap->id = config.ValueOr<std::string>("id", "gate");
snap->state_key = config.ValueOr<std::string>("state_key", "");
snap->require_confirmed = config.ValueOr<bool>("require_confirmed", true);
snap->min_count = config.ValueOr<int>("min_count", 1);
snap->max_age_ms = static_cast<int64_t>(config.ValueOr<int>("max_age_ms", 800));
snap->min_interval_ms = static_cast<int64_t>(config.ValueOr<int>("min_interval_ms", 300));
snap->min_box_area_ratio = config.ValueOr<float>("min_box_area_ratio", 0.0f);
snap->pass_through_if_no_state = config.ValueOr<bool>("pass_through_if_no_state", false);
if (const SimpleJson* ids = config.Find("class_ids")) {
if (!ParseIntSet(*ids, snap->class_ids, err)) {
err = "class_ids: " + err;
return false;
}
}
if (snap->min_count < 1) snap->min_count = 1;
if (snap->max_age_ms < 0) snap->max_age_ms = 0;
if (snap->min_interval_ms < 0) snap->min_interval_ms = 0;
if (snap->min_box_area_ratio < 0.0f) snap->min_box_area_ratio = 0.0f;
if (snap->min_box_area_ratio > 1.0f) snap->min_box_area_ratio = 1.0f;
out = std::move(snap);
return true;
}
} // namespace
class GateNode final : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "gate"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
std::string err;
std::shared_ptr<const ConfigSnapshot> snap;
if (!BuildConfigSnapshot(config, snap, err)) {
LogError("[GateNode] invalid config: " + err);
return false;
}
id_ = snap->id;
ctx_ = ctx;
{
std::lock_guard<std::mutex> lk(mu_);
cfg_ = std::move(snap);
}
return true;
}
bool Start() override { return true; }
void Stop() override {}
bool UpdateConfig(const SimpleJson& new_config) override {
std::string err;
std::shared_ptr<const ConfigSnapshot> snap;
if (!BuildConfigSnapshot(new_config, snap, err)) {
LogWarn("[GateNode] UpdateConfig ignored: " + err);
return false;
}
if (snap->id != id_) {
LogWarn("[GateNode] UpdateConfig ignored: id mismatch");
return false;
}
std::lock_guard<std::mutex> lk(mu_);
cfg_ = std::move(snap);
return true;
}
NodeStatus Process(FramePtr frame) override {
if (!frame) return NodeStatus::DROP;
std::shared_ptr<const ConfigSnapshot> cfg;
{
std::lock_guard<std::mutex> lk(mu_);
cfg = cfg_;
}
if (!cfg) return NodeStatus::DROP;
if (cfg->state_key.empty()) {
// Without a state source, this node cannot decide; choose conservative behavior.
if (cfg->pass_through_if_no_state) {
PushToDownstream(std::move(frame));
return NodeStatus::OK;
}
return NodeStatus::DROP;
}
const uint64_t now_us = NowSteadyUs();
auto snap = SharedState::Instance().GetTargets(cfg->state_key);
if (!snap) {
if (cfg->pass_through_if_no_state) {
PushToDownstream(std::move(frame));
return NodeStatus::OK;
}
return NodeStatus::DROP;
}
if (cfg->max_age_ms > 0) {
const uint64_t age_us = (now_us > snap->update_steady_us) ? (now_us - snap->update_steady_us) : 0;
if (age_us > static_cast<uint64_t>(cfg->max_age_ms) * 1000ULL) {
return NodeStatus::DROP;
}
}
const double img_area = (snap->img_w > 0 && snap->img_h > 0) ?
(static_cast<double>(snap->img_w) * static_cast<double>(snap->img_h)) :
0.0;
int match_count = 0;
for (const auto& obj : snap->objects) {
if (!cfg->class_ids.empty() && cfg->class_ids.count(obj.cls_id) == 0) continue;
if (cfg->require_confirmed && !obj.confirmed) continue;
if (cfg->min_box_area_ratio > 0.0f && img_area > 0.0) {
const double a = static_cast<double>(obj.bbox.w) * static_cast<double>(obj.bbox.h);
const double r = a / img_area;
if (r < static_cast<double>(cfg->min_box_area_ratio)) continue;
}
++match_count;
if (match_count >= cfg->min_count) break;
}
if (match_count < cfg->min_count) {
return NodeStatus::DROP;
}
if (cfg->min_interval_ms > 0) {
const uint64_t last = last_emit_us_.load(std::memory_order_relaxed);
if (last != 0 && now_us < last + static_cast<uint64_t>(cfg->min_interval_ms) * 1000ULL) {
return NodeStatus::DROP;
}
last_emit_us_.store(now_us, std::memory_order_relaxed);
}
PushToDownstream(std::move(frame));
return NodeStatus::OK;
}
private:
void PushToDownstream(FramePtr frame) {
for (auto& q : ctx_.output_queues) {
if (!q) continue;
q->push(frame);
}
}
std::string id_;
NodeContext ctx_{};
mutable std::mutex mu_;
std::shared_ptr<const ConfigSnapshot> cfg_;
std::atomic<uint64_t> last_emit_us_{0};
};
REGISTER_NODE(GateNode, "gate");
} // namespace rk3588