OrangePi3588Media/include/graph_manager.h
sladro 1737419aed
Some checks are pending
CI / host-build (push) Waiting to run
CI / rk3588-cross-build (push) Waiting to run
更新agent
2026-01-10 22:21:48 +08:00

187 lines
5.6 KiB
C++

#pragma once
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <atomic>
#include "node.h"
#include "plugin_loader.h"
#include "utils/simple_json.h"
namespace rk3588 {
// Value-type snapshot of one queue's counters.
//
// Plain aggregate: a default-constructed snapshot has every numeric field at
// zero and `stopped` cleared. Field meanings below are inferred from the
// names; the code that fills the snapshot is not visible in this header, so
// confirm details (units, sampling window) against the producer.
struct QueueSnapshot {
    size_t size{0};             // presumably the occupancy at capture time
    size_t capacity{0};         // presumably the queue's maximum occupancy
    uint64_t pushed_total{0};   // running count of pushes
    uint64_t popped_total{0};   // running count of pops
    uint64_t dropped_total{0};  // running count of drops
    bool stopped{false};        // set when the queue is reported as stopped
    double pushed_fps{0.0};     // push rate; NOTE(review): window defined by producer
    double popped_fps{0.0};     // pop rate; same caveat as pushed_fps
};
// Value-type snapshot of one graph edge: the two endpoint identifiers plus
// the counters of the queue attached to that edge.
//
// NOTE(review): `from` / `to` are presumably node ids; confirm against the
// code that produces the snapshot (not visible in this header).
struct EdgeSnapshot {
    std::string from{};     // source endpoint identifier
    std::string to{};       // destination endpoint identifier
    QueueSnapshot queue{};  // counters of the edge's queue (all zero by default)
};
// Value-type snapshot of a single node: identity, input-queue counters,
// throughput figures and result counters.
//
// Plain aggregate; numeric fields default to zero and `enabled` to true.
// Field meanings are inferred from the names; the producer of the snapshot is
// not visible in this header, so confirm units and sampling windows there.
struct NodeSnapshot {
    // --- identity ---
    std::string graph{};  // presumably the name of the owning graph
    std::string id{};
    std::string type{};
    std::string role{};
    bool enabled{true};

    // --- throughput ---
    QueueSnapshot input_queue{};
    double input_fps{0.0};
    double output_fps{0.0};

    // --- cumulative result counters ---
    uint64_t ok_total{0};
    uint64_t drop_total{0};
    uint64_t error_total{0};
    double avg_process_time_ms{0.0};  // milliseconds, per the field name

    // Free-form, node-specific metrics; the schema is not defined here.
    SimpleJson custom_metrics;
};
// Value-type snapshot of a whole graph: a few graph-level figures plus one
// NodeSnapshot per node and one EdgeSnapshot per edge.
//
// Plain aggregate; numeric fields default to zero, `running` to false and
// both vectors to empty. How the aggregate figures (total_fps, alarm_total,
// publish_clients) are derived is decided by the producer of the snapshot,
// which is not visible in this header.
struct GraphSnapshot {
    std::string name{};
    bool running{false};
    uint64_t timestamp_ms{0};  // milliseconds, per the field name; epoch/clock not shown here
    double total_fps{0.0};
    uint64_t alarm_total{0};
    uint64_t publish_clients{0};
    std::vector<NodeSnapshot> nodes{};
    std::vector<EdgeSnapshot> edges{};
};
// A single named graph: a list of node entries (each owning an INode, a worker
// thread and a metrics block) and a list of edge entries (each holding a
// shared SpscQueue<FramePtr>).
//
// Only the declaration lives here; all non-inline members are defined in the
// implementation file. The class holds std::atomic and std::mutex members and
// declares a destructor, so it is neither copyable nor movable.
class Graph {
public:
// Creates a graph with the given name. Nothing is built or started here as far
// as this header shows.
explicit Graph(std::string name);
~Graph();
// Builds the graph from `graph_cfg`, using `loader` and the supplied queue
// defaults. Returns a success flag; `err` is an output parameter.
// NOTE(review): presumably `err` is filled only on failure, and the defaults are
// remembered in built_default_queue_size_ / built_default_strategy_ — confirm
// in the implementation.
bool Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t default_queue_size,
QueueDropStrategy default_strategy, std::string& err);
// Start / stop the graph.
// NOTE(review): presumably these launch and join the per-node worker threads
// (NodeEntry::worker) — confirm in the implementation.
bool Start();
void Stop();
// Returns the name passed to the constructor (stored in name_).
const std::string& Name() const { return name_; }
// Returns a reference to the stored graph configuration (graph_cfg_). The
// reference is only valid while this Graph is alive.
const SimpleJson& GraphConfig() const { return graph_cfg_; }
// Collects a snapshot of the graph. Const, but it may update the `mutable`
// rate-tracking members declared at the bottom of the class.
GraphSnapshot Snapshot() const;
// Looks up one node by id and writes its snapshot to `out`.
// NOTE(review): presumably returns false when no node has that id — confirm.
bool FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const;
// Applies a new configuration to the node identified by `node_id`. Returns a
// success flag; `err` is an output parameter (presumably set on failure).
bool UpdateNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg, std::string& err);
// Attempt in-place update via INode::UpdateConfig for nodes whose config changed.
// Returns true if fully updated without rebuild.
// Returns false with empty err if rebuild is required.
// Returns false with non-empty err if update failed.
// Callers must therefore inspect `err` to tell "rebuild needed" from "failed".
bool TryUpdateInPlace(const SimpleJson& new_graph_cfg, size_t default_queue_size,
QueueDropStrategy default_strategy, std::string& err);
private:
// Per-node counters. All fields are atomics, so they can be read and written
// without an external lock. Held through a shared_ptr in NodeEntry.
struct NodeMetrics {
std::atomic<uint64_t> ok_total{0};
std::atomic<uint64_t> drop_total{0};
std::atomic<uint64_t> error_total{0};
// Accumulated processing time; nanoseconds per the field name.
std::atomic<uint64_t> process_time_ns_total{0};
};
// Everything the graph keeps for one node. Contains a std::thread and a
// std::unique_ptr, so the entry is move-only.
struct NodeEntry {
std::string id;
std::string type;
std::string role;
bool enabled = true;
// Configuration associated with this node.
SimpleJson config;
// Context object for the node (type declared in node.h).
NodeContext context;
// The node instance itself, exclusively owned by this entry.
std::unique_ptr<INode> node;
// Thread associated with this node.
// NOTE(review): presumably started in Start() and joined in Stop() — confirm.
std::thread worker;
// Shared so the counters can outlive / be referenced outside this entry.
// NOTE(review): the other owner(s) are not visible in this header.
std::shared_ptr<NodeMetrics> metrics;
};
// One edge: two endpoint identifiers and the queue that carries frames.
struct EdgeEntry {
std::string from;
std::string to;
std::shared_ptr<SpscQueue<FramePtr>> queue;
};
std::string name_;
SimpleJson graph_cfg_;
// Queue defaults; initial values are 8 and DropOldest.
// NOTE(review): presumably overwritten with the arguments of the last Build().
size_t built_default_queue_size_ = 8;
QueueDropStrategy built_default_strategy_ = QueueDropStrategy::DropOldest;
std::vector<NodeEntry> nodes_;
std::vector<EdgeEntry> edges_;
// Lifecycle flags; atomic, so they can be read without holding a lock.
std::atomic<bool> running_{false};
std::atomic<bool> stop_requested_{false};
// Mutable rate state (updated on snapshot collection).
// Declared `mutable` so const members such as Snapshot() can modify it.
// NOTE(review): rate_mu_ presumably guards the time point and the four maps
// below, which hold the previously observed counter values keyed by string
// (node or edge identifier) — confirm in the implementation.
mutable std::mutex rate_mu_;
mutable std::chrono::steady_clock::time_point last_rate_tp_{};
mutable std::map<std::string, uint64_t> last_node_in_popped_;
mutable std::map<std::string, uint64_t> last_node_out_pushed_;
mutable std::map<std::string, uint64_t> last_edge_pushed_;
mutable std::map<std::string, uint64_t> last_edge_popped_;
};
// Owns a PluginLoader and a set of Graph objects (held by unique_ptr), and
// exposes build / start / stop / reload / rollback / snapshot operations over
// them.
//
// Only the declaration lives here; all non-inline members are defined in the
// implementation file. The class holds std::mutex and std::condition_variable
// members and declares a destructor, so it is neither copyable nor movable.
class GraphManager {
public:
// `plugin_dir` defaults to "plugins".
// NOTE(review): presumably forwarded to loader_ — confirm in the implementation.
explicit GraphManager(std::string plugin_dir = "plugins");
~GraphManager();
// Static helper: loads the file at `path` into `out`. Returns a success flag;
// `err` is an output parameter (presumably set on failure).
static bool LoadConfigFile(const std::string& path, SimpleJson& out, std::string& err);
// Build from an in-memory root config or from a file path. Both return a
// success flag and use `err` as an output parameter.
bool Build(const SimpleJson& root_cfg, std::string& err);
bool BuildFromFile(const std::string& path, std::string& err);
// Start / stop every managed graph.
bool StartAll();
void StopAll();
// NOTE(review): presumably RequestStop() signals cv_ and BlockUntilStop() waits
// on it — confirm in the implementation.
void RequestStop();
void BlockUntilStop();
// Reloads configuration from `path`. Returns a success flag; `err` is an
// output parameter.
bool ReloadFromFile(const std::string& path, std::string& err);
// Accessors for the stored paths; the returned references are valid while this
// manager is alive.
const std::string& ConfigPath() const { return config_path_; }
const std::string& LastGoodPath() const { return last_good_path_; }
// NOTE(review): presumably restores the configuration kept in the last_good_*
// members — confirm in the implementation.
bool RollbackFromLastGood(std::string& err);
// Updates one node's configuration. `graph` optionally names the graph that
// contains the node.
// NOTE(review): behavior when `graph` is empty is presumably the same
// auto-match rule documented on GetNodeSnapshot below — confirm.
bool UpdateNodeConfig(const std::string& node_id, const std::optional<std::string>& graph,
const SimpleJson& new_node_cfg, std::string& err);
// Snapshot accessors. These are non-const member functions.
std::vector<GraphSnapshot> ListGraphSnapshots();
bool GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err);
// If graph is not set: auto-match when unique, otherwise return false with err describing ambiguity.
bool GetNodeSnapshot(const std::string& node_id, const std::optional<std::string>& graph,
NodeSnapshot& out, std::string& err);
private:
// NOTE(review): plain bool, whereas Graph uses std::atomic<bool> for its own
// running flag; confirm every access is serialized by one of the mutexes below.
bool running_ = false;
PluginLoader loader_;
std::vector<std::unique_ptr<Graph>> graphs_;
// Persisted "source" root config (may contain templates/instances).
SimpleJson last_good_source_root_;
// Expanded root config used for running graphs (instances expanded into graphs).
SimpleJson last_good_expanded_root_;
std::string config_path_;
std::string last_good_path_;
// Queue defaults; initial values are 8 and DropOldest.
// NOTE(review): presumably overridden from the root config and passed to
// Graph::Build — confirm in the implementation.
size_t default_queue_size_ = 8;
QueueDropStrategy default_strategy_ = QueueDropStrategy::DropOldest;
// NOTE(review): presumably graphs_mu_ guards graphs_, while mu_ and cv_ back the
// RequestStop()/BlockUntilStop() handshake — confirm in the implementation.
std::mutex graphs_mu_;
std::mutex mu_;
std::condition_variable cv_;
};
} // namespace rk3588