feat: add behavior event fusion plugin

This commit is contained in:
sladro 2026-04-01 10:20:36 +08:00
parent f275fc0e92
commit 0c98be39cd
5 changed files with 298 additions and 1 deletions

View File

@ -482,6 +482,18 @@ set_target_properties(action_recog PROPERTIES
RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR}
)
# event_fusion plugin (behavior event id and lifecycle normalization)
add_library(event_fusion SHARED
event_fusion/event_fusion_node.cpp
)
target_include_directories(event_fusion PRIVATE ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/third_party)
target_link_libraries(event_fusion PRIVATE project_options Threads::Threads)
set_target_properties(event_fusion PROPERTIES
OUTPUT_NAME "event_fusion"
LIBRARY_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR}
RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR}
)
# storage plugin (continuous recording with segment management)
add_library(storage SHARED
storage/storage_node.cpp
@ -553,7 +565,7 @@ set_target_properties(ai_shoe_det PROPERTIES
RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR}
)
install(TARGETS input_rtsp input_file publish preprocess ai_yolo ai_face_det ai_scrfd ai_scrfd_sliding ai_face_recog tracker gate osd alarm logic_gate region_event action_recog storage ai_scheduler ai_shoe_det
install(TARGETS input_rtsp input_file publish preprocess ai_yolo ai_face_det ai_scrfd ai_scrfd_sliding ai_face_recog tracker gate osd alarm logic_gate region_event action_recog event_fusion storage ai_scheduler ai_shoe_det
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/rk3588-media-server/plugins
RUNTIME DESTINATION ${CMAKE_INSTALL_LIBDIR}/rk3588-media-server/plugins
)

View File

@ -0,0 +1,139 @@
#include "event_fusion_node.h"
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include "behavior/behavior_event.h"
namespace rk3588 {
namespace {
struct ActiveEvent {
int event_id = -1;
BehaviorEventItem last_item{};
bool seen_in_frame = false;
};
static std::string TypeName(BehaviorEventType type) {
switch (type) {
case BehaviorEventType::Intrusion: return "intrusion";
case BehaviorEventType::Climb: return "climb";
case BehaviorEventType::Fall: return "fall";
case BehaviorEventType::Fight: return "fight";
}
return "unknown";
}
static std::string MakeKey(const BehaviorEventItem& item) {
std::vector<int> track_ids = item.track_ids;
std::sort(track_ids.begin(), track_ids.end());
std::ostringstream oss;
oss << TypeName(item.type) << "|" << item.region_id << "|" << item.source;
for (int track_id : track_ids) {
oss << "|" << track_id;
}
return oss.str();
}
} // namespace
struct EventFusionNode::Impl {
int next_event_id = 1;
std::map<std::string, ActiveEvent> active_events;
};
EventFusionNode::EventFusionNode() : impl_(std::make_unique<Impl>()) {}
EventFusionNode::~EventFusionNode() = default;
std::string EventFusionNode::Id() const {
return id_;
}
std::string EventFusionNode::Type() const {
return "event_fusion";
}
bool EventFusionNode::Init(const SimpleJson& config, const NodeContext& ctx) {
id_ = config.ValueOr<std::string>("id", "event_fusion");
output_queues_ = ctx.output_queues;
return true;
}
bool EventFusionNode::Start() {
return true;
}
void EventFusionNode::Stop() {}
NodeStatus EventFusionNode::Process(FramePtr frame) {
if (!frame) return NodeStatus::DROP;
EnsureBehaviorEvents(*frame);
for (auto& [_, active] : impl_->active_events) {
active.seen_in_frame = false;
}
std::vector<BehaviorEventItem> normalized;
normalized.reserve(frame->behavior_events->items.size() + impl_->active_events.size());
for (auto item : frame->behavior_events->items) {
const std::string key = MakeKey(item);
auto& active = impl_->active_events[key];
if (active.event_id < 0) {
active.event_id = impl_->next_event_id++;
}
active.seen_in_frame = true;
item.event_id = active.event_id;
active.last_item = item;
normalized.push_back(std::move(item));
}
std::vector<std::string> to_remove;
for (auto& [key, active] : impl_->active_events) {
if (active.seen_in_frame) continue;
BehaviorEventItem ended = active.last_item;
ended.status = BehaviorEventStatus::Ended;
ended.event_id = active.event_id;
ended.last_pts = frame->pts;
if (frame->pts >= ended.start_pts) {
ended.duration_ms = frame->pts - ended.start_pts;
}
normalized.push_back(std::move(ended));
to_remove.push_back(key);
}
for (const auto& key : to_remove) {
impl_->active_events.erase(key);
}
frame->behavior_events->items = std::move(normalized);
PushToDownstream(frame);
return NodeStatus::OK;
}
void EventFusionNode::EnsureBehaviorEvents(Frame& frame) {
if (!frame.behavior_events) {
frame.behavior_events = std::make_shared<BehaviorEventResult>();
}
}
void EventFusionNode::PushToDownstream(const FramePtr& frame) {
for (auto& q : output_queues_) {
if (q) q->Push(frame);
}
}
#ifndef RK3588_TEST_BUILD
REGISTER_NODE(EventFusionNode, "event_fusion");
#endif
} // namespace rk3588

View File

@ -0,0 +1,33 @@
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "node.h"
namespace rk3588 {
class EventFusionNode final : public INode {
public:
EventFusionNode();
~EventFusionNode() override;
std::string Id() const override;
std::string Type() const override;
bool Init(const SimpleJson& config, const NodeContext& ctx) override;
bool Start() override;
void Stop() override;
NodeStatus Process(FramePtr frame) override;
private:
void EnsureBehaviorEvents(Frame& frame);
void PushToDownstream(const FramePtr& frame);
struct Impl;
std::unique_ptr<Impl> impl_;
std::string id_;
std::vector<std::shared_ptr<SpscQueue<FramePtr>>> output_queues_;
};
} // namespace rk3588

View File

@ -40,6 +40,7 @@ add_executable(rk3588_gtests
test_behavior_event_model.cpp
test_region_event.cpp
test_action_recog.cpp
test_event_fusion.cpp
test_infer_backend.cpp
test_image_processor.cpp
test_codec_backend.cpp
@ -51,6 +52,7 @@ add_executable(rk3588_gtests
${CMAKE_SOURCE_DIR}/src/utils/dma_alloc.cpp
${CMAKE_SOURCE_DIR}/plugins/region_event/region_event_node.cpp
${CMAKE_SOURCE_DIR}/plugins/action_recog/action_recog_node.cpp
${CMAKE_SOURCE_DIR}/plugins/event_fusion/event_fusion_node.cpp
)
target_include_directories(rk3588_gtests PRIVATE

111
tests/test_event_fusion.cpp Normal file
View File

@ -0,0 +1,111 @@
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include "behavior/behavior_event.h"
#include "frame/frame.h"
#include "node.h"
#include "utils/simple_json.h"
#include "../plugins/event_fusion/event_fusion_node.h"
namespace rk3588 {
namespace {
SimpleJson ParseFusionConfig(const std::string& text) {
SimpleJson config;
std::string err;
const bool ok = ParseSimpleJson(text, config, err);
EXPECT_TRUE(ok) << err;
return config;
}
TEST(EventFusionTest, AssignsStableEventIdAcrossFramesForSameTrackAndType) {
EventFusionNode node;
const std::string config_text = R"({
"id": "event_fusion"
})";
SimpleJson config = ParseFusionConfig(config_text);
NodeContext ctx;
auto out = std::make_shared<SpscQueue<FramePtr>>(4, QueueDropStrategy::DropOldest);
ctx.output_queues.push_back(out);
ASSERT_TRUE(node.Init(config, ctx));
ASSERT_TRUE(node.Start());
auto frame1 = std::make_shared<Frame>();
frame1->pts = 1000;
frame1->behavior_events = std::make_shared<BehaviorEventResult>();
{
BehaviorEventItem item;
item.type = BehaviorEventType::Fall;
item.status = BehaviorEventStatus::Active;
item.track_ids = {42};
item.source = "action_recog";
frame1->behavior_events->items.push_back(item);
}
auto frame2 = std::make_shared<Frame>();
frame2->pts = 1100;
frame2->behavior_events = std::make_shared<BehaviorEventResult>();
{
BehaviorEventItem item;
item.type = BehaviorEventType::Fall;
item.status = BehaviorEventStatus::Active;
item.track_ids = {42};
item.source = "action_recog";
frame2->behavior_events->items.push_back(item);
}
EXPECT_EQ(static_cast<int>(node.Process(frame1)), static_cast<int>(NodeStatus::OK));
EXPECT_EQ(static_cast<int>(node.Process(frame2)), static_cast<int>(NodeStatus::OK));
ASSERT_EQ(frame1->behavior_events->items.size(), 1u);
ASSERT_EQ(frame2->behavior_events->items.size(), 1u);
EXPECT_GT(frame1->behavior_events->items[0].event_id, 0);
EXPECT_EQ(frame1->behavior_events->items[0].event_id, frame2->behavior_events->items[0].event_id);
}
TEST(EventFusionTest, MarksEventEndedWhenSourceStopsEmittingIt) {
EventFusionNode node;
const std::string config_text = R"({
"id": "event_fusion"
})";
SimpleJson config = ParseFusionConfig(config_text);
NodeContext ctx;
auto out = std::make_shared<SpscQueue<FramePtr>>(4, QueueDropStrategy::DropOldest);
ctx.output_queues.push_back(out);
ASSERT_TRUE(node.Init(config, ctx));
ASSERT_TRUE(node.Start());
auto frame1 = std::make_shared<Frame>();
frame1->pts = 1000;
frame1->behavior_events = std::make_shared<BehaviorEventResult>();
{
BehaviorEventItem item;
item.type = BehaviorEventType::Intrusion;
item.status = BehaviorEventStatus::Active;
item.track_ids = {9};
item.source = "region_event";
item.region_id = "zone_a";
frame1->behavior_events->items.push_back(item);
}
auto frame2 = std::make_shared<Frame>();
frame2->pts = 1300;
frame2->behavior_events = std::make_shared<BehaviorEventResult>();
EXPECT_EQ(static_cast<int>(node.Process(frame1)), static_cast<int>(NodeStatus::OK));
EXPECT_EQ(static_cast<int>(node.Process(frame2)), static_cast<int>(NodeStatus::OK));
ASSERT_EQ(frame2->behavior_events->items.size(), 1u);
EXPECT_EQ(frame2->behavior_events->items[0].type, BehaviorEventType::Intrusion);
EXPECT_EQ(frame2->behavior_events->items[0].status, BehaviorEventStatus::Ended);
EXPECT_GT(frame2->behavior_events->items[0].event_id, 0);
}
} // namespace
} // namespace rk3588