OrangePi3588Media/include/graph_manager.h

234 lines
7.2 KiB
C++

#pragma once
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>
#include <atomic>
#include <utility>
#include "node.h"
#include "plugin_loader.h"
#include "utils/result.h"
#include "utils/simple_json.h"
namespace rk3588 {
class IInferBackend;
struct QueueSnapshot {
size_t size = 0;
size_t capacity = 0;
uint64_t pushed_total = 0;
uint64_t popped_total = 0;
uint64_t dropped_total = 0;
bool stopped = false;
double pushed_fps = 0.0;
double popped_fps = 0.0;
};
struct EdgeSnapshot {
std::string from;
std::string to;
QueueSnapshot queue;
};
struct NodeSnapshot {
std::string graph;
std::string id;
std::string type;
std::string role;
bool enabled = true;
QueueSnapshot input_queue;
double input_fps = 0.0;
double output_fps = 0.0;
uint64_t ok_total = 0;
uint64_t drop_total = 0;
uint64_t error_total = 0;
double avg_process_time_ms = 0.0;
SimpleJson custom_metrics;
};
struct GraphSnapshot {
std::string name;
bool running = false;
uint64_t timestamp_ms = 0;
double total_fps = 0.0;
uint64_t alarm_total = 0;
uint64_t publish_clients = 0;
std::vector<NodeSnapshot> nodes;
std::vector<EdgeSnapshot> edges;
};
class Graph {
public:
explicit Graph(std::string name);
~Graph();
bool Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t default_queue_size,
QueueDropStrategy default_strategy, std::string& err);
bool Start();
void Stop();
const std::string& Name() const { return name_; }
const SimpleJson& GraphConfig() const { return graph_cfg_; }
GraphSnapshot Snapshot() const;
bool FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const;
bool UpdateNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg, std::string& err);
// Attempt in-place update via INode::UpdateConfig for nodes whose config changed.
// Returns true if fully updated without rebuild.
// Returns false with empty err if rebuild is required.
// Returns false with non-empty err if update failed.
bool TryUpdateInPlace(const SimpleJson& new_graph_cfg, size_t default_queue_size,
QueueDropStrategy default_strategy, std::string& err);
private:
class Executor;
struct NodeMetrics {
std::atomic<uint64_t> ok_total{0};
std::atomic<uint64_t> drop_total{0};
std::atomic<uint64_t> error_total{0};
std::atomic<uint64_t> process_time_ns_total{0};
};
struct NodeEntry {
NodeEntry() = default;
NodeEntry(const NodeEntry&) = delete;
NodeEntry& operator=(const NodeEntry&) = delete;
NodeEntry(NodeEntry&& other) noexcept {
*this = std::move(other);
}
NodeEntry& operator=(NodeEntry&& other) noexcept {
if (this == &other) return *this;
id = std::move(other.id);
type = std::move(other.type);
role = std::move(other.role);
enabled = other.enabled;
config = std::move(other.config);
context = std::move(other.context);
node = std::move(other.node);
cpu_affinity = std::move(other.cpu_affinity);
sched_state.store(other.sched_state.load(std::memory_order_relaxed), std::memory_order_relaxed);
pool_index = other.pool_index;
metrics = std::move(other.metrics);
return *this;
}
std::string id;
std::string type;
std::string role;
bool enabled = true;
SimpleJson config;
NodeContext context;
NodePtr node;
std::vector<int> cpu_affinity;
// Scheduling state for Executor.
// Bit0: queued, Bit1: running.
std::atomic<uint32_t> sched_state{0};
uint32_t pool_index = 0;
std::shared_ptr<NodeMetrics> metrics;
};
struct EdgeEntry {
std::string from;
std::string to;
std::shared_ptr<SpscQueue<FramePtr>> queue;
};
std::string name_;
SimpleJson graph_cfg_;
size_t built_default_queue_size_ = 8;
QueueDropStrategy built_default_strategy_ = QueueDropStrategy::DropOldest;
std::shared_ptr<IInferBackend> infer_backend_;
std::vector<NodeEntry> nodes_;
std::vector<EdgeEntry> edges_;
std::unique_ptr<Executor> executor_;
std::atomic<bool> running_{false};
std::atomic<bool> stop_requested_{false};
// Mutable rate state (updated on snapshot collection).
mutable std::mutex rate_mu_;
mutable std::chrono::steady_clock::time_point last_rate_tp_{};
mutable std::map<std::string, uint64_t> last_node_in_popped_;
mutable std::map<std::string, uint64_t> last_node_out_pushed_;
mutable std::map<std::string, uint64_t> last_edge_pushed_;
mutable std::map<std::string, uint64_t> last_edge_popped_;
};
class GraphManager {
public:
explicit GraphManager(std::string plugin_dir = "plugins");
~GraphManager();
static bool LoadConfigFile(const std::string& path, SimpleJson& out, std::string& err);
static Result<SimpleJson> LoadConfig(const std::string& path);
bool Build(const SimpleJson& root_cfg, std::string& err);
bool BuildFromFile(const std::string& path, std::string& err);
Status BuildFromConfig(const SimpleJson& root_cfg);
Status BuildFromPath(const std::string& path);
bool StartAll();
void StopAll();
void RequestStop();
void BlockUntilStop();
bool ReloadFromFile(const std::string& path, std::string& err);
Status Reload(const std::string& path);
const std::string& ConfigPath() const { return config_path_; }
bool UpdateNodeConfig(const std::string& node_id, const std::optional<std::string>& graph,
const SimpleJson& new_node_cfg, std::string& err);
Status SetNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg,
const std::optional<std::string>& graph = std::nullopt);
std::vector<GraphSnapshot> ListGraphSnapshots();
bool GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err);
Result<GraphSnapshot> GetGraph(const std::string& name);
// If graph is not set: auto-match when unique, otherwise return false with err describing ambiguity.
bool GetNodeSnapshot(const std::string& node_id, const std::optional<std::string>& graph,
NodeSnapshot& out, std::string& err);
Result<NodeSnapshot> GetNode(const std::string& node_id,
const std::optional<std::string>& graph = std::nullopt);
private:
bool running_ = false;
PluginLoader loader_;
std::vector<std::unique_ptr<Graph>> graphs_;
// Persisted "source" root config (may contain templates/instances).
SimpleJson last_good_source_root_;
// Expanded root config used for running graphs (instances expanded into graphs).
SimpleJson last_good_expanded_root_;
std::string config_path_;
size_t default_queue_size_ = 8;
QueueDropStrategy default_strategy_ = QueueDropStrategy::DropOldest;
std::mutex graphs_mu_;
std::mutex mu_;
std::condition_variable cv_;
};
} // namespace rk3588