开发sprint5,完成开发,等待测试
This commit is contained in:
parent
4eeb501681
commit
f1a290998f
@ -55,6 +55,7 @@ set(SRC_FILES
|
||||
src/plugin_loader.cpp
|
||||
src/ai_scheduler.cpp
|
||||
src/utils/dma_alloc.cpp
|
||||
src/utils/config_expand.cpp
|
||||
)
|
||||
|
||||
add_executable(media-server ${SRC_FILES})
|
||||
|
||||
@ -24,6 +24,16 @@ public:
|
||||
bool Start();
|
||||
void Stop();
|
||||
|
||||
const std::string& Name() const { return name_; }
|
||||
const SimpleJson& GraphConfig() const { return graph_cfg_; }
|
||||
|
||||
// Attempt in-place update via INode::UpdateConfig for nodes whose config changed.
|
||||
// Returns true if fully updated without rebuild.
|
||||
// Returns false with empty err if rebuild is required.
|
||||
// Returns false with non-empty err if update failed.
|
||||
bool TryUpdateInPlace(const SimpleJson& new_graph_cfg, size_t default_queue_size,
|
||||
QueueDropStrategy default_strategy, std::string& err);
|
||||
|
||||
private:
|
||||
struct NodeEntry {
|
||||
std::string id;
|
||||
@ -37,6 +47,9 @@ private:
|
||||
};
|
||||
|
||||
std::string name_;
|
||||
SimpleJson graph_cfg_;
|
||||
size_t built_default_queue_size_ = 8;
|
||||
QueueDropStrategy built_default_strategy_ = QueueDropStrategy::DropOldest;
|
||||
std::vector<NodeEntry> nodes_;
|
||||
std::atomic<bool> running_{false};
|
||||
std::atomic<bool> stop_requested_{false};
|
||||
@ -55,10 +68,16 @@ public:
|
||||
void RequestStop();
|
||||
void BlockUntilStop();
|
||||
|
||||
bool ReloadFromFile(const std::string& path, std::string& err);
|
||||
|
||||
private:
|
||||
bool running_ = false;
|
||||
PluginLoader loader_;
|
||||
std::vector<std::unique_ptr<Graph>> graphs_;
|
||||
SimpleJson last_good_root_;
|
||||
size_t default_queue_size_ = 8;
|
||||
QueueDropStrategy default_strategy_ = QueueDropStrategy::DropOldest;
|
||||
std::mutex graphs_mu_;
|
||||
std::mutex mu_;
|
||||
std::condition_variable cv_;
|
||||
};
|
||||
|
||||
@ -13,8 +13,15 @@ public:
|
||||
explicit PluginLoader(std::string plugin_dir);
|
||||
~PluginLoader();
|
||||
|
||||
PluginLoader(const PluginLoader&) = delete;
|
||||
PluginLoader& operator=(const PluginLoader&) = delete;
|
||||
|
||||
std::unique_ptr<INode> Create(const std::string& type, std::string& err);
|
||||
|
||||
// Switch plugin directory. This will unload any cached plugins.
|
||||
void SetPluginDir(std::string plugin_dir);
|
||||
const std::string& PluginDir() const { return plugin_dir_; }
|
||||
|
||||
private:
|
||||
struct PluginHandle {
|
||||
void* handle = nullptr;
|
||||
|
||||
19
include/utils/config_expand.h
Normal file
19
include/utils/config_expand.h
Normal file
@ -0,0 +1,19 @@
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
|
||||
#include "utils/simple_json.h"
|
||||
|
||||
namespace rk3588 {
|
||||
|
||||
// Expand root config supporting:
|
||||
// - templates/instances -> graphs
|
||||
// - ${placeholder} replacement from instance.params
|
||||
// The output is a normalized root containing at least {"queue", "graphs"}.
|
||||
bool ExpandRootConfig(const SimpleJson& in_root, SimpleJson& out_root, std::string& err);
|
||||
|
||||
// Deep structural equality for SimpleJson (used to decide whether a node's config changed).
|
||||
bool JsonDeepEqual(const SimpleJson& a, const SimpleJson& b);
|
||||
|
||||
} // namespace rk3588
|
||||
@ -2,6 +2,7 @@
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
@ -132,22 +133,116 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
bool UpdateConfig(const SimpleJson& new_config) override {
|
||||
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
|
||||
if (!new_id.empty() && new_id != id_) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Parse labels
|
||||
std::vector<std::string> new_labels;
|
||||
if (const SimpleJson* labels_cfg = new_config.Find("labels")) {
|
||||
for (const auto& label : labels_cfg->AsArray()) {
|
||||
new_labels.push_back(label.AsString(""));
|
||||
}
|
||||
}
|
||||
|
||||
// Build new rule engine
|
||||
RuleEngine new_engine;
|
||||
if (const SimpleJson* rules_cfg = new_config.Find("rules")) {
|
||||
if (!new_engine.Init(*rules_cfg, new_labels)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Pre-event buffer settings
|
||||
int pre_sec = 5;
|
||||
int fps_hint = 25;
|
||||
if (const SimpleJson* actions_cfg = new_config.Find("actions")) {
|
||||
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
|
||||
pre_sec = clip_cfg->ValueOr<int>("pre_sec", 5);
|
||||
fps_hint = clip_cfg->ValueOr<int>("fps", 25);
|
||||
}
|
||||
}
|
||||
auto new_frame_buffer = std::make_shared<FrameRingBuffer>(pre_sec, fps_hint);
|
||||
|
||||
// Initialize new actions
|
||||
std::vector<std::unique_ptr<IAlarmAction>> new_actions;
|
||||
if (const SimpleJson* actions_cfg = new_config.Find("actions")) {
|
||||
if (const SimpleJson* log_cfg = actions_cfg->Find("log")) {
|
||||
if (log_cfg->ValueOr<bool>("enable", true)) {
|
||||
auto action = std::make_unique<LogAction>();
|
||||
if (action->Init(*log_cfg)) {
|
||||
new_actions.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (const SimpleJson* snap_cfg = actions_cfg->Find("snapshot")) {
|
||||
if (snap_cfg->ValueOr<bool>("enable", false)) {
|
||||
auto action = std::make_unique<SnapshotAction>();
|
||||
if (action->Init(*snap_cfg)) {
|
||||
new_actions.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (const SimpleJson* clip_cfg = actions_cfg->Find("clip")) {
|
||||
if (clip_cfg->ValueOr<bool>("enable", false)) {
|
||||
auto action = std::make_unique<ClipAction>();
|
||||
if (action->Init(*clip_cfg)) {
|
||||
auto* clip_ptr = static_cast<ClipAction*>(action.get());
|
||||
clip_ptr->SetFrameBuffer(new_frame_buffer);
|
||||
new_actions.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (const SimpleJson* http_cfg = actions_cfg->Find("http")) {
|
||||
if (http_cfg->ValueOr<bool>("enable", false)) {
|
||||
auto action = std::make_unique<HttpAction>();
|
||||
if (action->Init(*http_cfg)) {
|
||||
new_actions.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (new_actions.empty()) {
|
||||
auto action = std::make_unique<LogAction>();
|
||||
SimpleJson empty_cfg;
|
||||
if (action->Init(empty_cfg)) {
|
||||
new_actions.push_back(std::move(action));
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::unique_ptr<IAlarmAction>> old_actions;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
labels_ = std::move(new_labels);
|
||||
rule_engine_ = std::move(new_engine);
|
||||
frame_buffer_ = std::move(new_frame_buffer);
|
||||
old_actions = std::move(actions_);
|
||||
actions_ = std::move(new_actions);
|
||||
clip_action_ = nullptr;
|
||||
for (auto& a : actions_) {
|
||||
if (auto* p = dynamic_cast<ClipAction*>(a.get())) {
|
||||
clip_action_ = p;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& action : old_actions) action->Drain();
|
||||
for (auto& action : old_actions) action->Stop();
|
||||
return true;
|
||||
}
|
||||
|
||||
NodeStatus Process(FramePtr frame) override {
|
||||
if (!frame) return NodeStatus::DROP;
|
||||
|
||||
// Always push to ring buffer for pre-event recording
|
||||
frame_buffer_->Push(frame);
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
if (frame_buffer_) frame_buffer_->Push(frame);
|
||||
if (clip_action_) clip_action_->PushPostEventFrame(frame);
|
||||
|
||||
// Push to clip action for post-event collection if active
|
||||
if (clip_action_) {
|
||||
clip_action_->PushPostEventFrame(frame);
|
||||
}
|
||||
|
||||
// Evaluate rules
|
||||
auto result = rule_engine_.Evaluate(frame);
|
||||
if (result.matched) {
|
||||
TriggerAlarm(result, frame);
|
||||
}
|
||||
if (result.matched) TriggerAlarm(result, frame);
|
||||
|
||||
++processed_frames_;
|
||||
return NodeStatus::OK;
|
||||
@ -180,6 +275,8 @@ private:
|
||||
std::vector<std::unique_ptr<IAlarmAction>> actions_;
|
||||
ClipAction* clip_action_ = nullptr;
|
||||
|
||||
std::mutex mu_;
|
||||
|
||||
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
|
||||
uint64_t processed_frames_ = 0;
|
||||
uint64_t alarm_count_ = 0;
|
||||
|
||||
@ -660,6 +660,72 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
bool UpdateConfig(const SimpleJson& new_config) override {
|
||||
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
|
||||
if (!new_id.empty() && new_id != id_) return false;
|
||||
|
||||
const std::string new_codec = new_config.ValueOr<std::string>("codec", codec_);
|
||||
bool new_use_mpp = new_config.ValueOr<bool>("use_mpp", use_mpp_);
|
||||
bool new_use_ffmpeg_mux = new_config.ValueOr<bool>("use_ffmpeg_mux", use_ffmpeg_mux_);
|
||||
|
||||
// outputs change requires rebuild (ports/paths might conflict)
|
||||
if (const SimpleJson* outs = new_config.Find("outputs")) {
|
||||
if (!outs->IsArray()) return false;
|
||||
std::vector<OutputConfig> new_outputs;
|
||||
for (const auto& ov : outs->AsArray()) {
|
||||
if (!ov.IsObject()) continue;
|
||||
OutputConfig cfg;
|
||||
cfg.proto = ov.ValueOr<std::string>("proto", cfg.proto);
|
||||
cfg.host = ov.ValueOr<std::string>("host", cfg.host);
|
||||
cfg.port = ov.ValueOr<int>("port", cfg.port);
|
||||
cfg.path = ov.ValueOr<std::string>("path", cfg.path);
|
||||
cfg.segment_sec = ov.ValueOr<int>("segment_sec", cfg.segment_sec);
|
||||
new_outputs.push_back(std::move(cfg));
|
||||
}
|
||||
if (new_outputs.empty()) new_outputs.push_back(OutputConfig{});
|
||||
|
||||
if (new_outputs.size() != outputs_.size()) return false;
|
||||
for (size_t i = 0; i < outputs_.size(); ++i) {
|
||||
if (new_outputs[i].proto != outputs_[i].proto || new_outputs[i].host != outputs_[i].host ||
|
||||
new_outputs[i].port != outputs_[i].port || new_outputs[i].path != outputs_[i].path ||
|
||||
new_outputs[i].segment_sec != outputs_[i].segment_sec) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int new_fps = new_config.ValueOr<int>("fps", fps_);
|
||||
int new_gop = new_config.ValueOr<int>("gop", gop_);
|
||||
int new_bitrate = new_config.ValueOr<int>("bitrate_kbps", bitrate_kbps_);
|
||||
|
||||
if (new_codec != codec_) return false;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
fps_ = new_fps;
|
||||
gop_ = new_gop;
|
||||
bitrate_kbps_ = new_bitrate;
|
||||
use_mpp_ = new_use_mpp;
|
||||
use_ffmpeg_mux_ = new_use_ffmpeg_mux;
|
||||
|
||||
// Force re-init on next frame to apply new params.
|
||||
#if defined(RK3588_ENABLE_FFMPEG)
|
||||
if (mux_mgr_) mux_mgr_->Close();
|
||||
mux_mgr_.reset();
|
||||
#endif
|
||||
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
|
||||
for (auto& p : zlm_pubs_) p->Close();
|
||||
zlm_pubs_.clear();
|
||||
#endif
|
||||
#if defined(RK3588_ENABLE_MPP)
|
||||
if (mpp_encoder_) mpp_encoder_->Shutdown();
|
||||
encoder_ready_ = false;
|
||||
#endif
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Stop() override {
|
||||
if (input_queue_) input_queue_->Stop();
|
||||
|
||||
@ -678,6 +744,8 @@ public:
|
||||
NodeStatus Process(FramePtr frame) override {
|
||||
if (!frame) return NodeStatus::DROP;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
|
||||
#if defined(RK3588_ENABLE_MPP)
|
||||
if (use_mpp_) {
|
||||
ProcessMpp(frame);
|
||||
@ -1148,6 +1216,8 @@ private:
|
||||
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
|
||||
uint64_t encoded_frames_ = 0;
|
||||
|
||||
std::mutex mu_;
|
||||
|
||||
#if defined(RK3588_ENABLE_MPP)
|
||||
std::unique_ptr<MppVencEncoder> mpp_encoder_;
|
||||
bool encoder_ready_ = false;
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
|
||||
#include "node.h"
|
||||
|
||||
@ -294,12 +295,53 @@ public:
|
||||
}
|
||||
|
||||
void Drain() override {
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
CloseCurrentFile();
|
||||
}
|
||||
|
||||
bool UpdateConfig(const SimpleJson& new_config) override {
|
||||
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
|
||||
if (!new_id.empty() && new_id != id_) return false;
|
||||
|
||||
// We only support a safe subset of updates; unsupported changes trigger graph rebuild.
|
||||
const std::string new_mode = new_config.ValueOr<std::string>("mode", mode_);
|
||||
const std::string new_format = new_config.ValueOr<std::string>("format", format_);
|
||||
const std::string new_codec = new_config.ValueOr<std::string>("codec", codec_);
|
||||
if (new_mode != mode_ || new_format != format_ || new_codec != codec_) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int new_segment = new_config.ValueOr<int>("segment_sec", segment_sec_);
|
||||
std::string new_path = new_config.ValueOr<std::string>("path", base_path_);
|
||||
std::string new_pattern = new_config.ValueOr<std::string>("filename_pattern", filename_pattern_);
|
||||
int new_fps = new_config.ValueOr<int>("fps", fps_);
|
||||
int new_bitrate = new_config.ValueOr<int>("bitrate_kbps", bitrate_kbps_);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
segment_sec_ = new_segment;
|
||||
base_path_ = std::move(new_path);
|
||||
filename_pattern_ = std::move(new_pattern);
|
||||
fps_ = new_fps;
|
||||
bitrate_kbps_ = new_bitrate;
|
||||
|
||||
try {
|
||||
std::filesystem::create_directories(base_path_);
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Apply on next segment.
|
||||
CloseCurrentFile();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
NodeStatus Process(FramePtr frame) override {
|
||||
if (!frame) return NodeStatus::DROP;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
ProcessFrame(frame);
|
||||
++total_frames_;
|
||||
return NodeStatus::OK;
|
||||
@ -468,6 +510,8 @@ private:
|
||||
int fps_ = 25;
|
||||
int bitrate_kbps_ = 2000;
|
||||
|
||||
std::mutex mu_;
|
||||
|
||||
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
|
||||
uint64_t total_frames_ = 0;
|
||||
|
||||
|
||||
@ -4,9 +4,12 @@
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <deque>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include "utils/config_expand.h"
|
||||
|
||||
namespace rk3588 {
|
||||
|
||||
namespace {
|
||||
@ -34,6 +37,68 @@ void ApplyQueueConfig(const SimpleJson* queue_cfg, size_t default_queue_size,
|
||||
}
|
||||
}
|
||||
|
||||
struct ParsedEdge {
|
||||
std::string from;
|
||||
std::string to;
|
||||
const SimpleJson* queue_cfg = nullptr; // can be null
|
||||
SimpleJson queue_value; // normalized copy (null if none)
|
||||
};
|
||||
|
||||
bool ParseEdges(const SimpleJson& edges_json, std::vector<ParsedEdge>& out, std::string& err) {
|
||||
out.clear();
|
||||
if (!edges_json.IsArray()) {
|
||||
err = "edges must be array";
|
||||
return false;
|
||||
}
|
||||
for (const auto& edge_val : edges_json.AsArray()) {
|
||||
ParsedEdge e;
|
||||
if (edge_val.IsArray()) {
|
||||
const auto& edge_arr = edge_val.AsArray();
|
||||
if (edge_arr.size() < 2) {
|
||||
err = "Edge array must be [from, to] or [from, to, {...}]";
|
||||
return false;
|
||||
}
|
||||
e.from = edge_arr[0].AsString("");
|
||||
e.to = edge_arr[1].AsString("");
|
||||
if (edge_arr.size() >= 3 && edge_arr[2].IsObject()) {
|
||||
const SimpleJson* q = edge_arr[2].Find("queue");
|
||||
e.queue_cfg = q ? q : &edge_arr[2];
|
||||
}
|
||||
} else if (edge_val.IsObject()) {
|
||||
e.from = edge_val.ValueOr<std::string>("from", "");
|
||||
e.to = edge_val.ValueOr<std::string>("to", "");
|
||||
e.queue_cfg = edge_val.Find("queue");
|
||||
} else {
|
||||
err = "Edge must be an array or object";
|
||||
return false;
|
||||
}
|
||||
if (e.from.empty() || e.to.empty()) {
|
||||
err = "Edge has empty endpoint";
|
||||
return false;
|
||||
}
|
||||
if (e.queue_cfg) e.queue_value = *e.queue_cfg;
|
||||
out.push_back(std::move(e));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
struct EdgeKey {
|
||||
std::string from;
|
||||
std::string to;
|
||||
bool operator<(const EdgeKey& o) const {
|
||||
if (from != o.from) return from < o.from;
|
||||
return to < o.to;
|
||||
}
|
||||
};
|
||||
|
||||
bool EffectiveQueueForEdge(const ParsedEdge& e, const SimpleJson* from_node_queue_cfg,
|
||||
size_t default_queue_size, QueueDropStrategy default_strategy,
|
||||
size_t& out_size, QueueDropStrategy& out_strategy) {
|
||||
const SimpleJson* chosen = e.queue_cfg ? e.queue_cfg : from_node_queue_cfg;
|
||||
ApplyQueueConfig(chosen, default_queue_size, default_strategy, out_size, out_strategy);
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Graph::Graph(std::string name) : name_(std::move(name)) {}
|
||||
@ -42,6 +107,10 @@ Graph::~Graph() { Stop(); }
|
||||
|
||||
bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t default_queue_size,
|
||||
QueueDropStrategy default_strategy, std::string& err) {
|
||||
graph_cfg_ = graph_cfg;
|
||||
built_default_queue_size_ = default_queue_size;
|
||||
built_default_strategy_ = default_strategy;
|
||||
|
||||
const auto& obj = graph_cfg.AsObject();
|
||||
auto name_it = obj.find("name");
|
||||
if (name_it != obj.end() && name_it->second.IsString()) {
|
||||
@ -88,86 +157,106 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& edge_val : edges_it->second.AsArray()) {
|
||||
std::string from;
|
||||
std::string to;
|
||||
const SimpleJson* queue_cfg = nullptr;
|
||||
std::vector<ParsedEdge> parsed_edges;
|
||||
if (!ParseEdges(edges_it->second, parsed_edges, err)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Supported forms:
|
||||
// 1) ["from", "to"]
|
||||
// 2) ["from", "to", {"queue": {...}}]
|
||||
// 3) {"from": "a", "to": "b", "queue": {...}}
|
||||
if (edge_val.IsArray()) {
|
||||
const auto& edge_arr = edge_val.AsArray();
|
||||
if (edge_arr.size() < 2) {
|
||||
err = "Edge array must be [from, to] or [from, to, {...}]";
|
||||
return false;
|
||||
}
|
||||
from = edge_arr[0].AsString("");
|
||||
to = edge_arr[1].AsString("");
|
||||
if (edge_arr.size() >= 3 && edge_arr[2].IsObject()) {
|
||||
// Third element can be {queue:{...}} or the queue object itself.
|
||||
queue_cfg = edge_arr[2].Find("queue");
|
||||
if (!queue_cfg) queue_cfg = &edge_arr[2];
|
||||
}
|
||||
} else if (edge_val.IsObject()) {
|
||||
from = edge_val.ValueOr<std::string>("from", "");
|
||||
to = edge_val.ValueOr<std::string>("to", "");
|
||||
queue_cfg = edge_val.Find("queue");
|
||||
} else {
|
||||
err = "Edge must be an array or object";
|
||||
return false;
|
||||
}
|
||||
// Track enabled-edge connectivity for validation.
|
||||
std::vector<std::pair<std::string, std::string>> enabled_edges;
|
||||
|
||||
if (from.empty() || to.empty()) {
|
||||
err = "Edge has empty endpoint";
|
||||
return false;
|
||||
}
|
||||
|
||||
auto from_it = id_to_node.find(from);
|
||||
auto to_it = id_to_node.find(to);
|
||||
for (const auto& e : parsed_edges) {
|
||||
auto from_it = id_to_node.find(e.from);
|
||||
auto to_it = id_to_node.find(e.to);
|
||||
|
||||
if (from_it == id_to_node.end() || to_it == id_to_node.end()) {
|
||||
// Check if nodes exist but are disabled
|
||||
bool from_exists = false;
|
||||
bool to_exists = false;
|
||||
for(const auto& n : nodes_) {
|
||||
if(n.id == from) from_exists = true;
|
||||
if(n.id == to) to_exists = true;
|
||||
if(n.id == e.from) from_exists = true;
|
||||
if(n.id == e.to) to_exists = true;
|
||||
}
|
||||
|
||||
if (!from_exists || !to_exists) {
|
||||
err = "Edge references unknown node: " + from + " -> " + to;
|
||||
err = "Edge references unknown node: " + e.from + " -> " + e.to;
|
||||
return false;
|
||||
}
|
||||
// At least one is disabled, skip edge
|
||||
continue;
|
||||
}
|
||||
|
||||
const SimpleJson* from_node_queue = from_it->second->config.Find("queue");
|
||||
size_t qsize = default_queue_size;
|
||||
QueueDropStrategy strategy = default_strategy;
|
||||
ApplyQueueConfig(queue_cfg, default_queue_size, default_strategy, qsize, strategy);
|
||||
EffectiveQueueForEdge(e, from_node_queue, default_queue_size, default_strategy, qsize, strategy);
|
||||
auto queue = std::make_shared<SpscQueue<FramePtr>>(qsize, strategy);
|
||||
from_it->second->context.output_queues.push_back(queue);
|
||||
|
||||
if (!to_it->second->context.input_queue) {
|
||||
to_it->second->context.input_queue = queue;
|
||||
} else {
|
||||
std::cerr << "[Graph] Warning: Node " << to << " already has input. Ignoring edge from " << from << "\n";
|
||||
err = "Node " + e.to + " has multiple inputs; only 1 is supported";
|
||||
return false;
|
||||
}
|
||||
|
||||
enabled_edges.emplace_back(e.from, e.to);
|
||||
}
|
||||
|
||||
// Role validation
|
||||
for (auto& entry : nodes_) {
|
||||
if (!entry.enabled) continue;
|
||||
|
||||
if (entry.role == "source" && entry.context.input_queue) {
|
||||
err = "Source node " + entry.id + " cannot have input";
|
||||
return false;
|
||||
}
|
||||
if (entry.role == "sink" && !entry.context.output_queues.empty()) {
|
||||
err = "Sink node " + entry.id + " cannot have output";
|
||||
return false;
|
||||
}
|
||||
if ((entry.role == "filter" || entry.role == "sink") && !entry.context.input_queue) {
|
||||
err = "Node " + entry.id + " role=" + entry.role + " must have input";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Role validation & Instantiation
|
||||
for (auto& entry : nodes_) {
|
||||
if (!entry.enabled) continue;
|
||||
|
||||
if (entry.role == "source") {
|
||||
if (entry.context.input_queue) {
|
||||
err = "Source node " + entry.id + " cannot have input";
|
||||
return false;
|
||||
// Cycle detection on enabled nodes.
|
||||
{
|
||||
std::map<std::string, int> indeg;
|
||||
std::map<std::string, std::vector<std::string>> adj;
|
||||
for (const auto& n : nodes_) {
|
||||
if (!n.enabled) continue;
|
||||
indeg[n.id] = 0;
|
||||
}
|
||||
for (const auto& pr : enabled_edges) {
|
||||
adj[pr.first].push_back(pr.second);
|
||||
indeg[pr.second] += 1;
|
||||
}
|
||||
std::deque<std::string> q;
|
||||
for (const auto& kv : indeg) {
|
||||
if (kv.second == 0) q.push_back(kv.first);
|
||||
}
|
||||
size_t visited = 0;
|
||||
while (!q.empty()) {
|
||||
auto cur = q.front();
|
||||
q.pop_front();
|
||||
++visited;
|
||||
for (const auto& nxt : adj[cur]) {
|
||||
auto it = indeg.find(nxt);
|
||||
if (it == indeg.end()) continue;
|
||||
if (--it->second == 0) q.push_back(nxt);
|
||||
}
|
||||
}
|
||||
if (visited != indeg.size()) {
|
||||
err = "Graph contains a cycle (DAG required)";
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
// Instantiation
|
||||
for (auto& entry : nodes_) {
|
||||
if (!entry.enabled) continue;
|
||||
std::string load_err;
|
||||
entry.node = loader.Create(entry.type, load_err);
|
||||
if (!entry.node) {
|
||||
@ -183,6 +272,130 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Graph::TryUpdateInPlace(const SimpleJson& new_graph_cfg, size_t default_queue_size,
|
||||
QueueDropStrategy default_strategy, std::string& err) {
|
||||
err.clear();
|
||||
if (!new_graph_cfg.IsObject()) {
|
||||
err = "new graph config is not object";
|
||||
return false;
|
||||
}
|
||||
|
||||
const SimpleJson* new_nodes = new_graph_cfg.Find("nodes");
|
||||
const SimpleJson* new_edges = new_graph_cfg.Find("edges");
|
||||
const SimpleJson* old_nodes = graph_cfg_.Find("nodes");
|
||||
const SimpleJson* old_edges = graph_cfg_.Find("edges");
|
||||
if (!new_nodes || !new_edges || !old_nodes || !old_edges) {
|
||||
err = "graph missing nodes/edges";
|
||||
return false;
|
||||
}
|
||||
|
||||
// If topology changed, require rebuild.
|
||||
std::vector<ParsedEdge> pe_new;
|
||||
std::vector<ParsedEdge> pe_old;
|
||||
std::string parse_err;
|
||||
if (!ParseEdges(*new_edges, pe_new, parse_err) || !ParseEdges(*old_edges, pe_old, parse_err)) {
|
||||
err = parse_err;
|
||||
return false;
|
||||
}
|
||||
|
||||
auto build_edge_map = [](const std::vector<ParsedEdge>& pes, std::map<EdgeKey, ParsedEdge>& out) {
|
||||
out.clear();
|
||||
for (const auto& e : pes) {
|
||||
out[{e.from, e.to}] = e;
|
||||
}
|
||||
};
|
||||
std::map<EdgeKey, ParsedEdge> m_new;
|
||||
std::map<EdgeKey, ParsedEdge> m_old;
|
||||
build_edge_map(pe_new, m_new);
|
||||
build_edge_map(pe_old, m_old);
|
||||
if (m_new.size() != m_old.size()) {
|
||||
return false; // rebuild
|
||||
}
|
||||
|
||||
// Build node config maps by id.
|
||||
std::map<std::string, SimpleJson> new_node_cfg;
|
||||
std::map<std::string, SimpleJson> old_node_cfg;
|
||||
std::map<std::string, bool> new_enabled;
|
||||
std::map<std::string, bool> old_enabled;
|
||||
std::map<std::string, std::string> new_type;
|
||||
std::map<std::string, std::string> old_type;
|
||||
|
||||
for (const auto& nv : new_nodes->AsArray()) {
|
||||
if (!nv.IsObject()) continue;
|
||||
std::string id = nv.ValueOr<std::string>("id", "");
|
||||
if (id.empty()) continue;
|
||||
new_node_cfg[id] = nv;
|
||||
new_enabled[id] = nv.ValueOr<bool>("enable", true);
|
||||
new_type[id] = nv.ValueOr<std::string>("type", "");
|
||||
}
|
||||
for (const auto& ov : old_nodes->AsArray()) {
|
||||
if (!ov.IsObject()) continue;
|
||||
std::string id = ov.ValueOr<std::string>("id", "");
|
||||
if (id.empty()) continue;
|
||||
old_node_cfg[id] = ov;
|
||||
old_enabled[id] = ov.ValueOr<bool>("enable", true);
|
||||
old_type[id] = ov.ValueOr<std::string>("type", "");
|
||||
}
|
||||
|
||||
if (new_node_cfg.size() != old_node_cfg.size()) {
|
||||
return false; // rebuild
|
||||
}
|
||||
for (const auto& kv : old_node_cfg) {
|
||||
auto it = new_node_cfg.find(kv.first);
|
||||
if (it == new_node_cfg.end()) return false; // rebuild
|
||||
if (new_type[kv.first] != old_type[kv.first]) return false; // rebuild
|
||||
if (new_enabled[kv.first] != old_enabled[kv.first]) return false; // rebuild
|
||||
}
|
||||
|
||||
// Compare effective queue specs.
|
||||
for (const auto& kv : m_old) {
|
||||
auto itn = m_new.find(kv.first);
|
||||
if (itn == m_new.end()) return false;
|
||||
|
||||
const auto& eo = kv.second;
|
||||
const auto& en = itn->second;
|
||||
|
||||
const SimpleJson* from_old_queue = nullptr;
|
||||
const SimpleJson* from_new_queue = nullptr;
|
||||
if (auto fn = old_node_cfg.find(eo.from); fn != old_node_cfg.end()) {
|
||||
from_old_queue = fn->second.Find("queue");
|
||||
}
|
||||
if (auto fn = new_node_cfg.find(en.from); fn != new_node_cfg.end()) {
|
||||
from_new_queue = fn->second.Find("queue");
|
||||
}
|
||||
|
||||
size_t osz = 0, nsz = 0;
|
||||
QueueDropStrategy ostrat = QueueDropStrategy::DropOldest;
|
||||
QueueDropStrategy nstrat = QueueDropStrategy::DropOldest;
|
||||
EffectiveQueueForEdge(eo, from_old_queue, built_default_queue_size_, built_default_strategy_, osz, ostrat);
|
||||
EffectiveQueueForEdge(en, from_new_queue, default_queue_size, default_strategy, nsz, nstrat);
|
||||
if (osz != nsz || ostrat != nstrat) {
|
||||
return false; // rebuild
|
||||
}
|
||||
}
|
||||
|
||||
// In-place update: only apply for nodes whose full config changed.
|
||||
for (auto& entry : nodes_) {
|
||||
if (!entry.enabled || !entry.node) continue;
|
||||
auto it_new = new_node_cfg.find(entry.id);
|
||||
auto it_old = old_node_cfg.find(entry.id);
|
||||
if (it_new == new_node_cfg.end() || it_old == old_node_cfg.end()) return false;
|
||||
|
||||
if (!JsonDeepEqual(it_new->second, it_old->second)) {
|
||||
if (!entry.node->UpdateConfig(it_new->second)) {
|
||||
// If any node cannot update in place, request rebuild.
|
||||
return false;
|
||||
}
|
||||
entry.config = it_new->second;
|
||||
}
|
||||
}
|
||||
|
||||
graph_cfg_ = new_graph_cfg;
|
||||
built_default_queue_size_ = default_queue_size;
|
||||
built_default_strategy_ = default_strategy;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Graph::Start() {
|
||||
bool expected = false;
|
||||
if (!running_.compare_exchange_strong(expected, true)) {
|
||||
@ -285,15 +498,20 @@ bool GraphManager::LoadConfigFile(const std::string& path, SimpleJson& out, std:
|
||||
}
|
||||
|
||||
bool GraphManager::Build(const SimpleJson& root_cfg, std::string& err) {
|
||||
auto graphs_it = root_cfg.AsObject().find("graphs");
|
||||
if (graphs_it == root_cfg.AsObject().end() || !graphs_it->second.IsArray()) {
|
||||
SimpleJson expanded;
|
||||
if (!ExpandRootConfig(root_cfg, expanded, err)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto graphs_it = expanded.AsObject().find("graphs");
|
||||
if (graphs_it == expanded.AsObject().end() || !graphs_it->second.IsArray()) {
|
||||
err = "Root config missing 'graphs' array";
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t default_queue_size = 8;
|
||||
QueueDropStrategy default_strategy = QueueDropStrategy::DropOldest;
|
||||
if (const auto* queue_cfg = root_cfg.Find("queue")) {
|
||||
if (const auto* queue_cfg = expanded.Find("queue")) {
|
||||
if (queue_cfg->IsObject()) {
|
||||
default_queue_size = static_cast<size_t>(queue_cfg->ValueOr<int>("size", 8));
|
||||
std::string strategy = queue_cfg->ValueOr<std::string>("strategy", "drop_oldest");
|
||||
@ -301,6 +519,17 @@ bool GraphManager::Build(const SimpleJson& root_cfg, std::string& err) {
|
||||
}
|
||||
}
|
||||
|
||||
// Apply plugin_path only on initial Build (safe: no nodes exist yet).
|
||||
if (const SimpleJson* g = expanded.Find("global")) {
|
||||
const std::string plugin_path = g->ValueOr<std::string>("plugin_path", "");
|
||||
if (!plugin_path.empty()) {
|
||||
loader_.SetPluginDir(plugin_path);
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(graphs_mu_);
|
||||
graphs_.clear();
|
||||
|
||||
for (const auto& graph_val : graphs_it->second.AsArray()) {
|
||||
if (!graph_val.IsObject()) {
|
||||
err = "Graph entry is not object";
|
||||
@ -313,10 +542,15 @@ bool GraphManager::Build(const SimpleJson& root_cfg, std::string& err) {
|
||||
}
|
||||
graphs_.push_back(std::move(graph));
|
||||
}
|
||||
|
||||
last_good_root_ = expanded;
|
||||
default_queue_size_ = default_queue_size;
|
||||
default_strategy_ = default_strategy;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool GraphManager::StartAll() {
|
||||
std::lock_guard<std::mutex> lock(graphs_mu_);
|
||||
for (auto& g : graphs_) {
|
||||
if (!g->Start()) {
|
||||
return false;
|
||||
@ -335,8 +569,11 @@ void GraphManager::StopAll() {
|
||||
if (!running_) return;
|
||||
running_ = false;
|
||||
}
|
||||
for (auto& g : graphs_) {
|
||||
g->Stop();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(graphs_mu_);
|
||||
for (auto& g : graphs_) {
|
||||
g->Stop();
|
||||
}
|
||||
}
|
||||
cv_.notify_all();
|
||||
}
|
||||
@ -350,4 +587,142 @@ void GraphManager::BlockUntilStop() {
|
||||
cv_.wait(lock, [&] { return !running_; });
|
||||
}
|
||||
|
||||
bool GraphManager::ReloadFromFile(const std::string& path, std::string& err) {
|
||||
SimpleJson root_cfg;
|
||||
if (!LoadConfigFile(path, root_cfg, err)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SimpleJson expanded;
|
||||
if (!ExpandRootConfig(root_cfg, expanded, err)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto graphs_it = expanded.AsObject().find("graphs");
|
||||
if (graphs_it == expanded.AsObject().end() || !graphs_it->second.IsArray()) {
|
||||
err = "Root config missing 'graphs' array";
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t new_default_queue_size = 8;
|
||||
QueueDropStrategy new_default_strategy = QueueDropStrategy::DropOldest;
|
||||
if (const auto* queue_cfg = expanded.Find("queue")) {
|
||||
if (queue_cfg->IsObject()) {
|
||||
new_default_queue_size = static_cast<size_t>(queue_cfg->ValueOr<int>("size", 8));
|
||||
std::string strategy = queue_cfg->ValueOr<std::string>("strategy", "drop_oldest");
|
||||
new_default_strategy = ParseDropStrategy(strategy, new_default_strategy);
|
||||
}
|
||||
}
|
||||
|
||||
const std::string new_plugin_path = [&]() -> std::string {
|
||||
if (const SimpleJson* g = expanded.Find("global")) {
|
||||
return g->ValueOr<std::string>("plugin_path", "");
|
||||
}
|
||||
return "";
|
||||
}();
|
||||
|
||||
std::lock_guard<std::mutex> lock(graphs_mu_);
|
||||
|
||||
// If plugin_dir changes, do full stop+rebuild to avoid unloading libs while nodes exist.
|
||||
if (!new_plugin_path.empty() && new_plugin_path != loader_.PluginDir()) {
|
||||
for (auto& g : graphs_) g->Stop();
|
||||
graphs_.clear();
|
||||
loader_.SetPluginDir(new_plugin_path);
|
||||
|
||||
for (const auto& graph_val : graphs_it->second.AsArray()) {
|
||||
std::string name = graph_val.ValueOr<std::string>("name", "noname");
|
||||
auto graph = std::make_unique<Graph>(name);
|
||||
if (!graph->Build(graph_val, loader_, new_default_queue_size, new_default_strategy, err)) {
|
||||
return false;
|
||||
}
|
||||
graphs_.push_back(std::move(graph));
|
||||
}
|
||||
for (auto& g : graphs_) {
|
||||
if (!g->Start()) {
|
||||
err = "Failed to start graph after full rebuild";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
last_good_root_ = expanded;
|
||||
default_queue_size_ = new_default_queue_size;
|
||||
default_strategy_ = new_default_strategy;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Index existing graphs by name.
|
||||
std::map<std::string, size_t> old_index;
|
||||
for (size_t i = 0; i < graphs_.size(); ++i) {
|
||||
old_index[graphs_[i]->Name()] = i;
|
||||
}
|
||||
|
||||
// Track graphs referenced by new config.
|
||||
std::set<std::string> seen;
|
||||
|
||||
// Update or rebuild existing graphs.
|
||||
for (const auto& graph_val : graphs_it->second.AsArray()) {
|
||||
if (!graph_val.IsObject()) {
|
||||
err = "Graph entry is not object";
|
||||
return false;
|
||||
}
|
||||
std::string name = graph_val.ValueOr<std::string>("name", "noname");
|
||||
seen.insert(name);
|
||||
|
||||
auto it = old_index.find(name);
|
||||
if (it == old_index.end()) {
|
||||
// New graph: build+start.
|
||||
auto graph = std::make_unique<Graph>(name);
|
||||
if (!graph->Build(graph_val, loader_, new_default_queue_size, new_default_strategy, err)) {
|
||||
return false;
|
||||
}
|
||||
if (!graph->Start()) {
|
||||
err = "Failed to start new graph: " + name;
|
||||
return false;
|
||||
}
|
||||
graphs_.push_back(std::move(graph));
|
||||
continue;
|
||||
}
|
||||
|
||||
auto& g = graphs_[it->second];
|
||||
std::string upd_err;
|
||||
if (g->TryUpdateInPlace(graph_val, new_default_queue_size, new_default_strategy, upd_err)) {
|
||||
continue;
|
||||
}
|
||||
if (!upd_err.empty()) {
|
||||
err = "UpdateConfig failed for graph " + name + ": " + upd_err;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Need rebuild.
|
||||
g->Stop();
|
||||
auto new_g = std::make_unique<Graph>(name);
|
||||
if (!new_g->Build(graph_val, loader_, new_default_queue_size, new_default_strategy, err)) {
|
||||
// Rollback: keep old stopped graph is not acceptable; attempt restart old.
|
||||
(void)g->Start();
|
||||
return false;
|
||||
}
|
||||
if (!new_g->Start()) {
|
||||
err = "Failed to start rebuilt graph: " + name;
|
||||
(void)g->Start();
|
||||
return false;
|
||||
}
|
||||
g = std::move(new_g);
|
||||
}
|
||||
|
||||
// Stop and remove graphs not present anymore.
|
||||
for (auto itg = graphs_.begin(); itg != graphs_.end();) {
|
||||
if (!seen.count((*itg)->Name())) {
|
||||
(*itg)->Stop();
|
||||
itg = graphs_.erase(itg);
|
||||
} else {
|
||||
++itg;
|
||||
}
|
||||
}
|
||||
|
||||
last_good_root_ = expanded;
|
||||
default_queue_size_ = new_default_queue_size;
|
||||
default_strategy_ = new_default_strategy;
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace rk3588
|
||||
|
||||
@ -4,10 +4,21 @@
|
||||
#include <signal.h>
|
||||
#include <string>
|
||||
#include <cstdlib>
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <vector>
|
||||
|
||||
#if __has_include(<filesystem>)
|
||||
#include <filesystem>
|
||||
namespace fs = std::filesystem;
|
||||
#endif
|
||||
|
||||
#if defined(__linux__)
|
||||
#include <limits.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/inotify.h>
|
||||
#include <fcntl.h>
|
||||
#endif
|
||||
|
||||
#include "graph_manager.h"
|
||||
@ -39,11 +50,6 @@ std::string ResolvePluginDir() {
|
||||
return std::string("plugins");
|
||||
}
|
||||
|
||||
GraphManager& GetManager() {
|
||||
static GraphManager mgr(ResolvePluginDir());
|
||||
return mgr;
|
||||
}
|
||||
|
||||
void HandleSignal(int) {
|
||||
if (g_manager) {
|
||||
std::cerr << "\n[MediaServerApp] Caught signal, stopping...\n";
|
||||
@ -51,10 +57,121 @@ void HandleSignal(int) {
|
||||
}
|
||||
}
|
||||
|
||||
bool HotReloadEnabled(const SimpleJson& root_cfg) {
|
||||
if (const SimpleJson* g = root_cfg.Find("global")) {
|
||||
if (const SimpleJson* hr = g->Find("hot_reload")) {
|
||||
return hr->ValueOr<bool>("enable", false);
|
||||
}
|
||||
// Backward compatible: allow global.hot_reload_enable
|
||||
if (g->Find("hot_reload_enable")) {
|
||||
return g->ValueOr<bool>("hot_reload_enable", false);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
#if defined(__linux__)
|
||||
static void SplitDirFile(const std::string& path, std::string& out_dir, std::string& out_file) {
|
||||
auto pos = path.find_last_of('/');
|
||||
if (pos == std::string::npos) {
|
||||
out_dir = ".";
|
||||
out_file = path;
|
||||
return;
|
||||
}
|
||||
out_dir = path.substr(0, pos);
|
||||
out_file = path.substr(pos + 1);
|
||||
if (out_dir.empty()) out_dir = "/";
|
||||
}
|
||||
|
||||
void InotifyWatchLoop(const std::string& path, std::atomic<bool>& stop_flag, GraphManager& mgr) {
|
||||
std::string dir;
|
||||
std::string file;
|
||||
SplitDirFile(path, dir, file);
|
||||
|
||||
int fd = inotify_init1(IN_NONBLOCK);
|
||||
if (fd < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Watch directory to catch atomic replace (write temp + rename) patterns.
|
||||
int wd = inotify_add_watch(fd, dir.c_str(), IN_CLOSE_WRITE | IN_MOVED_TO | IN_CREATE | IN_MODIFY);
|
||||
if (wd < 0) {
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<char> buf(4096);
|
||||
while (!stop_flag.load()) {
|
||||
int n = read(fd, buf.data(), static_cast<unsigned int>(buf.size()));
|
||||
if (n <= 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
continue;
|
||||
}
|
||||
|
||||
bool matched = false;
|
||||
for (char* p = buf.data(); p < buf.data() + n;) {
|
||||
auto* ev = reinterpret_cast<inotify_event*>(p);
|
||||
if (ev->len > 0 && ev->name[0] != '\0') {
|
||||
if (file == ev->name) {
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
p += sizeof(inotify_event) + ev->len;
|
||||
}
|
||||
|
||||
if (matched) {
|
||||
std::string err;
|
||||
if (!mgr.ReloadFromFile(path, err)) {
|
||||
std::cerr << "[MediaServerApp] hot reload failed: " << err << "\n";
|
||||
} else {
|
||||
std::cout << "[MediaServerApp] hot reload succeeded\n";
|
||||
}
|
||||
// Debounce burst events.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
}
|
||||
|
||||
inotify_rm_watch(fd, wd);
|
||||
close(fd);
|
||||
}
|
||||
#endif
|
||||
|
||||
void PollWatchLoop(const std::string& path, std::atomic<bool>& stop_flag, GraphManager& mgr) {
|
||||
#if __has_include(<filesystem>)
|
||||
std::error_code ec;
|
||||
auto last = fs::exists(path, ec) ? fs::last_write_time(path, ec) : fs::file_time_type{};
|
||||
#endif
|
||||
|
||||
while (!stop_flag.load()) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
if (stop_flag.load()) break;
|
||||
|
||||
#if __has_include(<filesystem>)
|
||||
std::error_code e2;
|
||||
if (!fs::exists(path, e2)) continue;
|
||||
auto cur = fs::last_write_time(path, e2);
|
||||
if (e2) continue;
|
||||
if (cur == last) continue;
|
||||
last = cur;
|
||||
|
||||
std::string err;
|
||||
if (!mgr.ReloadFromFile(path, err)) {
|
||||
std::cerr << "[MediaServerApp] hot reload failed: " << err << "\n";
|
||||
} else {
|
||||
std::cout << "[MediaServerApp] hot reload succeeded\n";
|
||||
}
|
||||
#else
|
||||
(void)path;
|
||||
(void)mgr;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
MediaServerApp::MediaServerApp(std::string config_path)
|
||||
: config_path_(std::move(config_path)) {}
|
||||
: config_path_(std::move(config_path)), graph_manager_(ResolvePluginDir()) {}
|
||||
|
||||
int MediaServerApp::Start() {
|
||||
SimpleJson root_cfg;
|
||||
@ -64,24 +181,41 @@ int MediaServerApp::Start() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
auto& manager = GetManager();
|
||||
|
||||
if (!manager.Build(root_cfg, err)) {
|
||||
if (!graph_manager_.Build(root_cfg, err)) {
|
||||
std::cerr << "[MediaServerApp] Failed to build graphs: " << err << "\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
g_manager = &manager;
|
||||
g_manager = &graph_manager_;
|
||||
signal(SIGINT, HandleSignal);
|
||||
signal(SIGTERM, HandleSignal);
|
||||
|
||||
if (!manager.StartAll()) {
|
||||
if (!graph_manager_.StartAll()) {
|
||||
std::cerr << "[MediaServerApp] Failed to start graphs\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::cout << "[MediaServerApp] Running. Press Ctrl+C to stop.\n";
|
||||
manager.BlockUntilStop();
|
||||
|
||||
std::atomic<bool> stop_watch{false};
|
||||
std::thread watcher;
|
||||
if (HotReloadEnabled(root_cfg)) {
|
||||
#if defined(__linux__)
|
||||
watcher = std::thread([&] {
|
||||
// Prefer inotify; if it fails, fall back to polling.
|
||||
InotifyWatchLoop(config_path_, stop_watch, graph_manager_);
|
||||
PollWatchLoop(config_path_, stop_watch, graph_manager_);
|
||||
});
|
||||
#else
|
||||
watcher = std::thread([&] { PollWatchLoop(config_path_, stop_watch, graph_manager_); });
|
||||
#endif
|
||||
std::cout << "[MediaServerApp] Hot reload enabled\n";
|
||||
}
|
||||
|
||||
graph_manager_.BlockUntilStop();
|
||||
|
||||
stop_watch.store(true);
|
||||
if (watcher.joinable()) watcher.join();
|
||||
std::cout << "[MediaServerApp] Shutdown complete.\n";
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -68,6 +68,15 @@ PluginLoader::~PluginLoader() {
|
||||
}
|
||||
}
|
||||
|
||||
void PluginLoader::SetPluginDir(std::string plugin_dir) {
|
||||
if (plugin_dir == plugin_dir_) return;
|
||||
for (auto& kv : cache_) {
|
||||
CloseLibraryHandle(kv.second.handle);
|
||||
}
|
||||
cache_.clear();
|
||||
plugin_dir_ = std::move(plugin_dir);
|
||||
}
|
||||
|
||||
std::string PluginLoader::BuildLibraryPath(const std::string& type) const {
|
||||
#ifdef _WIN32
|
||||
return plugin_dir_ + "/" + type + SharedLibExtension();
|
||||
|
||||
306
src/utils/config_expand.cpp
Normal file
306
src/utils/config_expand.cpp
Normal file
@ -0,0 +1,306 @@
|
||||
#include "utils/config_expand.h"
|
||||
|
||||
#include <sstream>
|
||||
|
||||
namespace rk3588 {
|
||||
|
||||
namespace {
|
||||
|
||||
std::string ToStringForPlaceholder(const SimpleJson& v) {
|
||||
if (v.IsString()) return v.AsString("");
|
||||
if (v.IsBool()) return v.AsBool(false) ? "true" : "false";
|
||||
if (v.IsNumber()) {
|
||||
std::ostringstream oss;
|
||||
// Keep simple (no locale, no trailing zeros control).
|
||||
oss << v.AsNumber(0.0);
|
||||
return oss.str();
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
std::string ReplaceAllPlaceholders(std::string s, const std::map<std::string, std::string>& vars) {
|
||||
// Replace occurrences of ${key}.
|
||||
for (const auto& kv : vars) {
|
||||
const std::string needle = "${" + kv.first + "}";
|
||||
if (needle.size() <= 3) continue;
|
||||
size_t pos = 0;
|
||||
while ((pos = s.find(needle, pos)) != std::string::npos) {
|
||||
s.replace(pos, needle.size(), kv.second);
|
||||
pos += kv.second.size();
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
SimpleJson ReplacePlaceholders(const SimpleJson& in, const std::map<std::string, std::string>& vars) {
|
||||
if (in.IsString()) {
|
||||
return SimpleJson(ReplaceAllPlaceholders(in.AsString(""), vars));
|
||||
}
|
||||
if (in.IsArray()) {
|
||||
SimpleJson::Array out;
|
||||
out.reserve(in.AsArray().size());
|
||||
for (const auto& el : in.AsArray()) {
|
||||
out.push_back(ReplacePlaceholders(el, vars));
|
||||
}
|
||||
return SimpleJson(std::move(out));
|
||||
}
|
||||
if (in.IsObject()) {
|
||||
SimpleJson::Object out;
|
||||
for (const auto& kv : in.AsObject()) {
|
||||
out.emplace(kv.first, ReplacePlaceholders(kv.second, vars));
|
||||
}
|
||||
return SimpleJson(std::move(out));
|
||||
}
|
||||
// Null/bool/number unchanged.
|
||||
return in;
|
||||
}
|
||||
|
||||
SimpleJson DeepMerge(const SimpleJson& base, const SimpleJson& override_v) {
|
||||
if (!base.IsObject() || !override_v.IsObject()) return override_v;
|
||||
|
||||
SimpleJson::Object merged = base.AsObject();
|
||||
for (const auto& kv : override_v.AsObject()) {
|
||||
auto it = merged.find(kv.first);
|
||||
if (it != merged.end() && it->second.IsObject() && kv.second.IsObject()) {
|
||||
it->second = DeepMerge(it->second, kv.second);
|
||||
} else {
|
||||
it->second = kv.second;
|
||||
}
|
||||
}
|
||||
return SimpleJson(std::move(merged));
|
||||
}
|
||||
|
||||
bool HasKey(const SimpleJson& obj, const std::string& key) {
|
||||
return obj.IsObject() && obj.AsObject().find(key) != obj.AsObject().end();
|
||||
}
|
||||
|
||||
bool ExpandInstances(const SimpleJson& in_root, SimpleJson::Array& out_graphs, std::string& err) {
|
||||
const SimpleJson* templates_cfg = in_root.Find("templates");
|
||||
const SimpleJson* instances_cfg = in_root.Find("instances");
|
||||
if (!instances_cfg) return true;
|
||||
if (!instances_cfg->IsArray()) {
|
||||
err = "Root 'instances' must be an array";
|
||||
return false;
|
||||
}
|
||||
|
||||
const auto& templates_obj = templates_cfg ? templates_cfg->AsObject() : SimpleJson::Object{};
|
||||
|
||||
for (const auto& inst_val : instances_cfg->AsArray()) {
|
||||
if (!inst_val.IsObject()) {
|
||||
err = "Instance entry is not object";
|
||||
return false;
|
||||
}
|
||||
const std::string inst_name = inst_val.ValueOr<std::string>("name", "");
|
||||
const std::string tpl_name = inst_val.ValueOr<std::string>("template", "");
|
||||
if (inst_name.empty() || tpl_name.empty()) {
|
||||
err = "Instance missing 'name' or 'template'";
|
||||
return false;
|
||||
}
|
||||
|
||||
auto tpl_it = templates_obj.find(tpl_name);
|
||||
if (tpl_it == templates_obj.end() || !tpl_it->second.IsObject()) {
|
||||
err = "Template not found or invalid: " + tpl_name;
|
||||
return false;
|
||||
}
|
||||
const SimpleJson& tpl = tpl_it->second;
|
||||
|
||||
const SimpleJson* tpl_nodes = tpl.Find("nodes");
|
||||
const SimpleJson* tpl_edges = tpl.Find("edges");
|
||||
if (!tpl_nodes || !tpl_nodes->IsArray() || !tpl_edges || !tpl_edges->IsArray()) {
|
||||
err = "Template must contain arrays: 'nodes' and 'edges'";
|
||||
return false;
|
||||
}
|
||||
|
||||
// Placeholder vars.
|
||||
std::map<std::string, std::string> vars;
|
||||
vars.emplace("name", inst_name);
|
||||
if (const SimpleJson* params = inst_val.Find("params"); params && params->IsObject()) {
|
||||
for (const auto& kv : params->AsObject()) {
|
||||
vars.emplace(kv.first, ToStringForPlaceholder(kv.second));
|
||||
}
|
||||
}
|
||||
|
||||
// Override object.
|
||||
const SimpleJson* override_nodes = nullptr;
|
||||
if (const SimpleJson* ov = inst_val.Find("override")) {
|
||||
if (!ov->IsObject()) {
|
||||
err = "Instance override must be object";
|
||||
return false;
|
||||
}
|
||||
if (const SimpleJson* nodes_ov = ov->Find("nodes")) {
|
||||
if (!nodes_ov->IsObject()) {
|
||||
err = "Instance override.nodes must be object";
|
||||
return false;
|
||||
}
|
||||
override_nodes = nodes_ov;
|
||||
}
|
||||
}
|
||||
|
||||
// Build id mapping for nodes.
|
||||
std::map<std::string, std::string> id_map;
|
||||
SimpleJson::Array inst_nodes;
|
||||
inst_nodes.reserve(tpl_nodes->AsArray().size());
|
||||
|
||||
for (const auto& node_val : tpl_nodes->AsArray()) {
|
||||
if (!node_val.IsObject()) {
|
||||
err = "Template node entry is not object";
|
||||
return false;
|
||||
}
|
||||
const std::string old_id = node_val.ValueOr<std::string>("id", "");
|
||||
if (old_id.empty()) {
|
||||
err = "Template node missing 'id'";
|
||||
return false;
|
||||
}
|
||||
const std::string new_id = inst_name + "_" + old_id;
|
||||
id_map.emplace(old_id, new_id);
|
||||
|
||||
// Apply placeholders first.
|
||||
SimpleJson expanded_node = ReplacePlaceholders(node_val, vars);
|
||||
|
||||
// Apply per-node override by template node id.
|
||||
if (override_nodes) {
|
||||
if (const SimpleJson* n_ov = override_nodes->Find(old_id)) {
|
||||
if (!n_ov->IsObject()) {
|
||||
err = "override.nodes." + old_id + " must be object";
|
||||
return false;
|
||||
}
|
||||
expanded_node = DeepMerge(expanded_node, *n_ov);
|
||||
}
|
||||
}
|
||||
|
||||
// Rewrite id.
|
||||
SimpleJson::Object obj = expanded_node.AsObject();
|
||||
obj["id"] = SimpleJson(new_id);
|
||||
inst_nodes.emplace_back(SimpleJson(std::move(obj)));
|
||||
}
|
||||
|
||||
// Rewrite edges.
|
||||
SimpleJson::Array inst_edges;
|
||||
inst_edges.reserve(tpl_edges->AsArray().size());
|
||||
for (const auto& e : tpl_edges->AsArray()) {
|
||||
if (e.IsArray()) {
|
||||
const auto& arr = e.AsArray();
|
||||
if (arr.size() < 2) {
|
||||
err = "Template edge array must be [from, to]";
|
||||
return false;
|
||||
}
|
||||
std::string from = arr[0].AsString("");
|
||||
std::string to = arr[1].AsString("");
|
||||
auto fi = id_map.find(from);
|
||||
auto ti = id_map.find(to);
|
||||
if (fi == id_map.end() || ti == id_map.end()) {
|
||||
err = "Template edge references unknown node: " + from + " -> " + to;
|
||||
return false;
|
||||
}
|
||||
|
||||
SimpleJson::Array out_arr;
|
||||
out_arr.emplace_back(SimpleJson(fi->second));
|
||||
out_arr.emplace_back(SimpleJson(ti->second));
|
||||
// Preserve optional third element (queue config) as-is.
|
||||
if (arr.size() >= 3) {
|
||||
out_arr.push_back(ReplacePlaceholders(arr[2], vars));
|
||||
}
|
||||
inst_edges.emplace_back(SimpleJson(std::move(out_arr)));
|
||||
} else if (e.IsObject()) {
|
||||
std::string from = e.ValueOr<std::string>("from", "");
|
||||
std::string to = e.ValueOr<std::string>("to", "");
|
||||
auto fi = id_map.find(from);
|
||||
auto ti = id_map.find(to);
|
||||
if (fi == id_map.end() || ti == id_map.end()) {
|
||||
err = "Template edge references unknown node: " + from + " -> " + to;
|
||||
return false;
|
||||
}
|
||||
SimpleJson obj = ReplacePlaceholders(e, vars);
|
||||
SimpleJson::Object out_obj = obj.AsObject();
|
||||
out_obj["from"] = SimpleJson(fi->second);
|
||||
out_obj["to"] = SimpleJson(ti->second);
|
||||
inst_edges.emplace_back(SimpleJson(std::move(out_obj)));
|
||||
} else {
|
||||
err = "Template edge must be array or object";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
SimpleJson::Object graph;
|
||||
graph.emplace("name", SimpleJson(inst_name));
|
||||
graph.emplace("nodes", SimpleJson(std::move(inst_nodes)));
|
||||
graph.emplace("edges", SimpleJson(std::move(inst_edges)));
|
||||
out_graphs.emplace_back(SimpleJson(std::move(graph)));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool ExpandRootConfig(const SimpleJson& in_root, SimpleJson& out_root, std::string& err) {
|
||||
if (!in_root.IsObject()) {
|
||||
err = "Root config must be an object";
|
||||
return false;
|
||||
}
|
||||
|
||||
SimpleJson::Object out;
|
||||
|
||||
// Preserve queue if provided.
|
||||
if (const SimpleJson* q = in_root.Find("queue")) {
|
||||
out.emplace("queue", *q);
|
||||
}
|
||||
|
||||
// Preserve global if provided (used for plugin_path/hot_reload).
|
||||
if (const SimpleJson* g = in_root.Find("global")) {
|
||||
out.emplace("global", *g);
|
||||
}
|
||||
|
||||
SimpleJson::Array graphs;
|
||||
|
||||
// Expand instances (optional).
|
||||
if (!ExpandInstances(in_root, graphs, err)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Append explicit graphs (optional).
|
||||
if (const SimpleJson* g = in_root.Find("graphs")) {
|
||||
if (!g->IsArray()) {
|
||||
err = "Root 'graphs' must be an array";
|
||||
return false;
|
||||
}
|
||||
for (const auto& gv : g->AsArray()) {
|
||||
graphs.push_back(gv);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure graphs exists.
|
||||
out.emplace("graphs", SimpleJson(std::move(graphs)));
|
||||
out_root = SimpleJson(std::move(out));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool JsonDeepEqual(const SimpleJson& a, const SimpleJson& b) {
|
||||
if (a.type() != b.type()) return false;
|
||||
if (a.IsNull()) return true;
|
||||
if (a.IsBool()) return a.AsBool(false) == b.AsBool(false);
|
||||
if (a.IsNumber()) return a.AsNumber(0.0) == b.AsNumber(0.0);
|
||||
if (a.IsString()) return a.AsString("") == b.AsString("");
|
||||
if (a.IsArray()) {
|
||||
const auto& aa = a.AsArray();
|
||||
const auto& bb = b.AsArray();
|
||||
if (aa.size() != bb.size()) return false;
|
||||
for (size_t i = 0; i < aa.size(); ++i) {
|
||||
if (!JsonDeepEqual(aa[i], bb[i])) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (a.IsObject()) {
|
||||
const auto& ao = a.AsObject();
|
||||
const auto& bo = b.AsObject();
|
||||
if (ao.size() != bo.size()) return false;
|
||||
for (const auto& kv : ao) {
|
||||
auto it = bo.find(kv.first);
|
||||
if (it == bo.end()) return false;
|
||||
if (!JsonDeepEqual(kv.second, it->second)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace rk3588
|
||||
Loading…
Reference in New Issue
Block a user