diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index de183c8..7ad7c5b 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -482,6 +482,18 @@ set_target_properties(action_recog PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} ) +# event_fusion plugin (behavior event id and lifecycle normalization) +add_library(event_fusion SHARED + event_fusion/event_fusion_node.cpp +) +target_include_directories(event_fusion PRIVATE ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/third_party) +target_link_libraries(event_fusion PRIVATE project_options Threads::Threads) +set_target_properties(event_fusion PROPERTIES + OUTPUT_NAME "event_fusion" + LIBRARY_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} + RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} +) + # storage plugin (continuous recording with segment management) add_library(storage SHARED storage/storage_node.cpp @@ -553,7 +565,7 @@ set_target_properties(ai_shoe_det PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} ) -install(TARGETS input_rtsp input_file publish preprocess ai_yolo ai_face_det ai_scrfd ai_scrfd_sliding ai_face_recog tracker gate osd alarm logic_gate region_event action_recog storage ai_scheduler ai_shoe_det +install(TARGETS input_rtsp input_file publish preprocess ai_yolo ai_face_det ai_scrfd ai_scrfd_sliding ai_face_recog tracker gate osd alarm logic_gate region_event action_recog event_fusion storage ai_scheduler ai_shoe_det LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/rk3588-media-server/plugins RUNTIME DESTINATION ${CMAKE_INSTALL_LIBDIR}/rk3588-media-server/plugins ) diff --git a/plugins/event_fusion/event_fusion_node.cpp b/plugins/event_fusion/event_fusion_node.cpp new file mode 100644 index 0000000..d17df50 --- /dev/null +++ b/plugins/event_fusion/event_fusion_node.cpp @@ -0,0 +1,139 @@ +#include "event_fusion_node.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "behavior/behavior_event.h" + +namespace rk3588 { +namespace { + +struct ActiveEvent { + int event_id = -1; + BehaviorEventItem last_item{}; + bool seen_in_frame = false; +}; + +static std::string TypeName(BehaviorEventType type) { + switch (type) { + case BehaviorEventType::Intrusion: return "intrusion"; + case BehaviorEventType::Climb: return "climb"; + case BehaviorEventType::Fall: return "fall"; + case BehaviorEventType::Fight: return "fight"; + } + return "unknown"; +} + +static std::string MakeKey(const BehaviorEventItem& item) { + std::vector track_ids = item.track_ids; + std::sort(track_ids.begin(), track_ids.end()); + + std::ostringstream oss; + oss << TypeName(item.type) << "|" << item.region_id << "|" << item.source; + for (int track_id : track_ids) { + oss << "|" << track_id; + } + return oss.str(); +} + +} // namespace + +struct EventFusionNode::Impl { + int next_event_id = 1; + std::map active_events; +}; + +EventFusionNode::EventFusionNode() : impl_(std::make_unique()) {} + +EventFusionNode::~EventFusionNode() = default; + +std::string EventFusionNode::Id() const { + return id_; +} + +std::string EventFusionNode::Type() const { + return "event_fusion"; +} + +bool EventFusionNode::Init(const SimpleJson& config, const NodeContext& ctx) { + id_ = config.ValueOr("id", "event_fusion"); + output_queues_ = ctx.output_queues; + return true; +} + +bool EventFusionNode::Start() { + return true; +} + +void EventFusionNode::Stop() {} + +NodeStatus EventFusionNode::Process(FramePtr frame) { + if (!frame) return NodeStatus::DROP; + + EnsureBehaviorEvents(*frame); + for (auto& [_, active] : impl_->active_events) { + active.seen_in_frame = false; + } + + std::vector normalized; + normalized.reserve(frame->behavior_events->items.size() + impl_->active_events.size()); + + for (auto item : frame->behavior_events->items) { + const std::string key = MakeKey(item); + auto& active = impl_->active_events[key]; + if (active.event_id < 0) { + active.event_id = impl_->next_event_id++; + } + active.seen_in_frame = true; + item.event_id = active.event_id; + active.last_item = item; + normalized.push_back(std::move(item)); + } + + std::vector to_remove; + for (auto& [key, active] : impl_->active_events) { + if (active.seen_in_frame) continue; + + BehaviorEventItem ended = active.last_item; + ended.status = BehaviorEventStatus::Ended; + ended.event_id = active.event_id; + ended.last_pts = frame->pts; + if (frame->pts >= ended.start_pts) { + ended.duration_ms = frame->pts - ended.start_pts; + } + normalized.push_back(std::move(ended)); + to_remove.push_back(key); + } + + for (const auto& key : to_remove) { + impl_->active_events.erase(key); + } + + frame->behavior_events->items = std::move(normalized); + PushToDownstream(frame); + return NodeStatus::OK; +} + +void EventFusionNode::EnsureBehaviorEvents(Frame& frame) { + if (!frame.behavior_events) { + frame.behavior_events = std::make_shared(); + } +} + +void EventFusionNode::PushToDownstream(const FramePtr& frame) { + for (auto& q : output_queues_) { + if (q) q->Push(frame); + } +} + +#ifndef RK3588_TEST_BUILD +REGISTER_NODE(EventFusionNode, "event_fusion"); +#endif + +} // namespace rk3588 diff --git a/plugins/event_fusion/event_fusion_node.h b/plugins/event_fusion/event_fusion_node.h new file mode 100644 index 0000000..702a097 --- /dev/null +++ b/plugins/event_fusion/event_fusion_node.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +#include "node.h" + +namespace rk3588 { + +class EventFusionNode final : public INode { +public: + EventFusionNode(); + ~EventFusionNode() override; + + std::string Id() const override; + std::string Type() const override; + bool Init(const SimpleJson& config, const NodeContext& ctx) override; + bool Start() override; + void Stop() override; + NodeStatus Process(FramePtr frame) override; + +private: + void EnsureBehaviorEvents(Frame& frame); + void PushToDownstream(const FramePtr& frame); + + struct Impl; + std::unique_ptr impl_; + std::string id_; + std::vector>> output_queues_; +}; + +} // namespace rk3588 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b20b6da..60115ed 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -40,6 +40,7 @@ add_executable(rk3588_gtests test_behavior_event_model.cpp test_region_event.cpp test_action_recog.cpp + test_event_fusion.cpp test_infer_backend.cpp test_image_processor.cpp test_codec_backend.cpp @@ -51,6 +52,7 @@ add_executable(rk3588_gtests ${CMAKE_SOURCE_DIR}/src/utils/dma_alloc.cpp ${CMAKE_SOURCE_DIR}/plugins/region_event/region_event_node.cpp ${CMAKE_SOURCE_DIR}/plugins/action_recog/action_recog_node.cpp + ${CMAKE_SOURCE_DIR}/plugins/event_fusion/event_fusion_node.cpp ) target_include_directories(rk3588_gtests PRIVATE diff --git a/tests/test_event_fusion.cpp b/tests/test_event_fusion.cpp new file mode 100644 index 0000000..37f09dd --- /dev/null +++ b/tests/test_event_fusion.cpp @@ -0,0 +1,111 @@ +#include + +#include +#include + +#include "behavior/behavior_event.h" +#include "frame/frame.h" +#include "node.h" +#include "utils/simple_json.h" +#include "../plugins/event_fusion/event_fusion_node.h" + +namespace rk3588 { +namespace { + +SimpleJson ParseFusionConfig(const std::string& text) { + SimpleJson config; + std::string err; + const bool ok = ParseSimpleJson(text, config, err); + EXPECT_TRUE(ok) << err; + return config; +} + +TEST(EventFusionTest, AssignsStableEventIdAcrossFramesForSameTrackAndType) { + EventFusionNode node; + + const std::string config_text = R"({ + "id": "event_fusion" + })"; + + SimpleJson config = ParseFusionConfig(config_text); + NodeContext ctx; + auto out = std::make_shared>(4, QueueDropStrategy::DropOldest); + ctx.output_queues.push_back(out); + + ASSERT_TRUE(node.Init(config, ctx)); + ASSERT_TRUE(node.Start()); + + auto frame1 = std::make_shared(); + frame1->pts = 1000; + frame1->behavior_events = std::make_shared(); + { + BehaviorEventItem item; + item.type = BehaviorEventType::Fall; + item.status = BehaviorEventStatus::Active; + item.track_ids = {42}; + item.source = "action_recog"; + frame1->behavior_events->items.push_back(item); + } + + auto frame2 = std::make_shared(); + frame2->pts = 1100; + frame2->behavior_events = std::make_shared(); + { + BehaviorEventItem item; + item.type = BehaviorEventType::Fall; + item.status = BehaviorEventStatus::Active; + item.track_ids = {42}; + item.source = "action_recog"; + frame2->behavior_events->items.push_back(item); + } + + EXPECT_EQ(static_cast(node.Process(frame1)), static_cast(NodeStatus::OK)); + EXPECT_EQ(static_cast(node.Process(frame2)), static_cast(NodeStatus::OK)); + ASSERT_EQ(frame1->behavior_events->items.size(), 1u); + ASSERT_EQ(frame2->behavior_events->items.size(), 1u); + EXPECT_GT(frame1->behavior_events->items[0].event_id, 0); + EXPECT_EQ(frame1->behavior_events->items[0].event_id, frame2->behavior_events->items[0].event_id); +} + +TEST(EventFusionTest, MarksEventEndedWhenSourceStopsEmittingIt) { + EventFusionNode node; + + const std::string config_text = R"({ + "id": "event_fusion" + })"; + + SimpleJson config = ParseFusionConfig(config_text); + NodeContext ctx; + auto out = std::make_shared>(4, QueueDropStrategy::DropOldest); + ctx.output_queues.push_back(out); + + ASSERT_TRUE(node.Init(config, ctx)); + ASSERT_TRUE(node.Start()); + + auto frame1 = std::make_shared(); + frame1->pts = 1000; + frame1->behavior_events = std::make_shared(); + { + BehaviorEventItem item; + item.type = BehaviorEventType::Intrusion; + item.status = BehaviorEventStatus::Active; + item.track_ids = {9}; + item.source = "region_event"; + item.region_id = "zone_a"; + frame1->behavior_events->items.push_back(item); + } + + auto frame2 = std::make_shared(); + frame2->pts = 1300; + frame2->behavior_events = std::make_shared(); + + EXPECT_EQ(static_cast(node.Process(frame1)), static_cast(NodeStatus::OK)); + EXPECT_EQ(static_cast(node.Process(frame2)), static_cast(NodeStatus::OK)); + ASSERT_EQ(frame2->behavior_events->items.size(), 1u); + EXPECT_EQ(frame2->behavior_events->items[0].type, BehaviorEventType::Intrusion); + EXPECT_EQ(frame2->behavior_events->items[0].status, BehaviorEventStatus::Ended); + EXPECT_GT(frame2->behavior_events->items[0].event_id, 0); +} + +} // namespace +} // namespace rk3588