#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include "node.h" #include "plugin_loader.h" #include "utils/result.h" #include "utils/simple_json.h" namespace rk3588 { class IInferBackend; struct QueueSnapshot { size_t size = 0; size_t capacity = 0; uint64_t pushed_total = 0; uint64_t popped_total = 0; uint64_t dropped_total = 0; bool stopped = false; double pushed_fps = 0.0; double popped_fps = 0.0; }; struct EdgeSnapshot { std::string from; std::string to; QueueSnapshot queue; }; struct NodeSnapshot { std::string graph; std::string id; std::string type; std::string role; bool enabled = true; QueueSnapshot input_queue; double input_fps = 0.0; double output_fps = 0.0; uint64_t ok_total = 0; uint64_t drop_total = 0; uint64_t error_total = 0; double avg_process_time_ms = 0.0; SimpleJson custom_metrics; }; struct GraphSnapshot { std::string name; bool running = false; uint64_t timestamp_ms = 0; double total_fps = 0.0; uint64_t alarm_total = 0; uint64_t publish_clients = 0; std::vector nodes; std::vector edges; }; class Graph { public: explicit Graph(std::string name); ~Graph(); bool Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t default_queue_size, QueueDropStrategy default_strategy, std::string& err); bool Start(); void Stop(); const std::string& Name() const { return name_; } const SimpleJson& GraphConfig() const { return graph_cfg_; } GraphSnapshot Snapshot() const; bool FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const; bool UpdateNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg, std::string& err); // Attempt in-place update via INode::UpdateConfig for nodes whose config changed. // Returns true if fully updated without rebuild. // Returns false with empty err if rebuild is required. // Returns false with non-empty err if update failed. bool TryUpdateInPlace(const SimpleJson& new_graph_cfg, size_t default_queue_size, QueueDropStrategy default_strategy, std::string& err); private: class Executor; struct NodeMetrics { std::atomic ok_total{0}; std::atomic drop_total{0}; std::atomic error_total{0}; std::atomic process_time_ns_total{0}; }; struct NodeEntry { NodeEntry() = default; NodeEntry(const NodeEntry&) = delete; NodeEntry& operator=(const NodeEntry&) = delete; NodeEntry(NodeEntry&& other) noexcept { *this = std::move(other); } NodeEntry& operator=(NodeEntry&& other) noexcept { if (this == &other) return *this; id = std::move(other.id); type = std::move(other.type); role = std::move(other.role); enabled = other.enabled; config = std::move(other.config); context = std::move(other.context); node = std::move(other.node); cpu_affinity = std::move(other.cpu_affinity); sched_state.store(other.sched_state.load(std::memory_order_relaxed), std::memory_order_relaxed); pool_index = other.pool_index; metrics = std::move(other.metrics); return *this; } std::string id; std::string type; std::string role; bool enabled = true; SimpleJson config; NodeContext context; NodePtr node; std::vector cpu_affinity; // Scheduling state for Executor. // Bit0: queued, Bit1: running. std::atomic sched_state{0}; uint32_t pool_index = 0; std::shared_ptr metrics; }; struct EdgeEntry { std::string from; std::string to; std::shared_ptr> queue; }; std::string name_; SimpleJson graph_cfg_; size_t built_default_queue_size_ = 8; QueueDropStrategy built_default_strategy_ = QueueDropStrategy::DropOldest; std::shared_ptr infer_backend_; std::vector nodes_; std::vector edges_; std::unique_ptr executor_; std::atomic running_{false}; std::atomic stop_requested_{false}; // Mutable rate state (updated on snapshot collection). mutable std::mutex rate_mu_; mutable std::chrono::steady_clock::time_point last_rate_tp_{}; mutable std::map last_node_in_popped_; mutable std::map last_node_out_pushed_; mutable std::map last_edge_pushed_; mutable std::map last_edge_popped_; }; class GraphManager { public: explicit GraphManager(std::string plugin_dir = "plugins"); ~GraphManager(); static bool LoadConfigFile(const std::string& path, SimpleJson& out, std::string& err); static Result LoadConfig(const std::string& path); bool Build(const SimpleJson& root_cfg, std::string& err); bool BuildFromFile(const std::string& path, std::string& err); Status BuildFromConfig(const SimpleJson& root_cfg); Status BuildFromPath(const std::string& path); bool StartAll(); void StopAll(); void RequestStop(); void BlockUntilStop(); bool ReloadFromFile(const std::string& path, std::string& err); Status Reload(const std::string& path); const std::string& ConfigPath() const { return config_path_; } bool UpdateNodeConfig(const std::string& node_id, const std::optional& graph, const SimpleJson& new_node_cfg, std::string& err); Status SetNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg, const std::optional& graph = std::nullopt); std::vector ListGraphSnapshots(); bool GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err); Result GetGraph(const std::string& name); // If graph is not set: auto-match when unique, otherwise return false with err describing ambiguity. bool GetNodeSnapshot(const std::string& node_id, const std::optional& graph, NodeSnapshot& out, std::string& err); Result GetNode(const std::string& node_id, const std::optional& graph = std::nullopt); private: bool running_ = false; PluginLoader loader_; std::vector> graphs_; // Persisted "source" root config (may contain templates/instances). SimpleJson last_good_source_root_; // Expanded root config used for running graphs (instances expanded into graphs). SimpleJson last_good_expanded_root_; std::string config_path_; size_t default_queue_size_ = 8; QueueDropStrategy default_strategy_ = QueueDropStrategy::DropOldest; std::mutex graphs_mu_; std::mutex mu_; std::condition_variable cv_; }; } // namespace rk3588