diff --git a/CMakeLists.txt b/CMakeLists.txt index f934439..e517d5f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,8 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) +find_package(Threads REQUIRED) + # Helper target for common warnings add_library(project_options INTERFACE) if(MSVC) @@ -49,14 +51,18 @@ endif() set(SRC_FILES src/main.cpp src/media_server_app.cpp + src/graph_manager.cpp + src/plugin_loader.cpp ) add_executable(media-server ${SRC_FILES}) +set_target_properties(media-server PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) target_include_directories(media-server PRIVATE ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/third_party ) -target_link_libraries(media-server PRIVATE project_options) +target_link_libraries(media-server PRIVATE project_options Threads::Threads) target_compile_definitions(media-server PRIVATE RK_PROJECT_VERSION="${PROJECT_VERSION}" RK_GIT_SHA="${RK_GIT_SHA}" @@ -68,5 +74,7 @@ if(BUILD_SAMPLES) add_subdirectory(samples) endif() +add_subdirectory(plugins) + include(GNUInstallDirs) install(FILES configs/sample_cam1.json DESTINATION ${CMAKE_INSTALL_DATADIR}/rk3588-media-server) diff --git a/configs/sample_cam1.json b/configs/sample_cam1.json index ac62457..182426f 100644 --- a/configs/sample_cam1.json +++ b/configs/sample_cam1.json @@ -1,4 +1,5 @@ { + "queue": { "size": 8, "strategy": "drop_oldest" }, "graphs": [ { "name": "cam1", @@ -8,13 +9,20 @@ "type": "input_rtsp", "role": "source", "enable": true, - "url": "rtsp://user:pass@ip/stream1" + "url": "rtsp://user:pass@ip/stream1", + "fps": 25, + "width": 1920, + "height": 1080 }, { "id": "pub_cam1", "type": "publish", "role": "sink", "enable": true, + "codec": "h264", + "fps": 25, + "gop": 50, + "bitrate_kbps": 4000, "outputs": [ { "proto": "rtsp", "port": 8554, "path": "/live/cam1" } ] diff --git a/include/frame/frame.h b/include/frame/frame.h index 006bc1a..ba9e0be 100644 --- a/include/frame/frame.h +++ b/include/frame/frame.h @@ -17,6 +17,8 @@ struct Frame { int height = 0; PixelFormat format = PixelFormat::UNKNOWN; + int stride = 0; + int dma_fd = -1; uint64_t pts = 0; uint64_t frame_id = 0; diff --git a/include/graph_manager.h b/include/graph_manager.h new file mode 100644 index 0000000..6fe36ba --- /dev/null +++ b/include/graph_manager.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "node.h" +#include "plugin_loader.h" +#include "utils/simple_json.h" + +namespace rk3588 { + +class Graph { +public: + explicit Graph(std::string name); + ~Graph(); + + bool Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t default_queue_size, + QueueDropStrategy default_strategy, std::string& err); + bool Start(); + void Stop(); + +private: + struct NodeEntry { + std::string id; + std::string type; + bool enabled = true; + SimpleJson config; + NodeContext context; + std::unique_ptr node; + }; + + std::string name_; + std::vector nodes_; +}; + +class GraphManager { +public: + explicit GraphManager(std::string plugin_dir = "plugins"); + ~GraphManager(); + + static bool LoadConfigFile(const std::string& path, SimpleJson& out, std::string& err); + + bool Build(const SimpleJson& root_cfg, std::string& err); + bool StartAll(); + void StopAll(); + void RequestStop(); + void BlockUntilStop(); + +private: + bool running_ = false; + PluginLoader loader_; + std::vector> graphs_; + std::mutex mu_; + std::condition_variable cv_; +}; + +} // namespace rk3588 diff --git a/include/media_server_app.h b/include/media_server_app.h new file mode 100644 index 0000000..c8564e0 --- /dev/null +++ b/include/media_server_app.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include "graph_manager.h" + +namespace rk3588 { + +class MediaServerApp { +public: + explicit MediaServerApp(std::string config_path); + int Start(); + +private: + std::string config_path_; + GraphManager graph_manager_; +}; + +} // namespace rk3588 diff --git a/include/node.h b/include/node.h new file mode 100644 index 0000000..e481d8e --- /dev/null +++ b/include/node.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include + +#include "frame/frame.h" +#include "utils/simple_json.h" +#include "utils/spsc_queue.h" + +namespace rk3588 { + +using FramePtr = std::shared_ptr; + +struct NodeContext { + // For nodes with upstream input. + std::shared_ptr> input_queue; + // For nodes that produce downstream outputs (one queue per outgoing edge). + std::vector>> output_queues; +}; + +class INode { +public: + virtual ~INode() = default; + virtual std::string Id() const = 0; + virtual std::string Type() const = 0; + virtual bool Init(const SimpleJson& config, const NodeContext& ctx) = 0; + virtual bool Start() = 0; + virtual void Stop() = 0; +}; + +constexpr int kNodeAbiVersion = 1; + +using CreateNodeFn = INode* (*)(); +using DestroyNodeFn = void (*)(INode*); +using GetTypeFn = const char* (*)(); +using GetAbiFn = int (*)(); + +#define REGISTER_NODE(NodeClass, NodeTypeStr) \ + extern "C" rk3588::INode* CreateNode() { \ + return new NodeClass(); \ + } \ + extern "C" const char* GetNodeType() { \ + return NodeTypeStr; \ + } \ + extern "C" int GetAbiVersion() { \ + return rk3588::kNodeAbiVersion; \ + } + +} // namespace rk3588 diff --git a/include/plugin_loader.h b/include/plugin_loader.h new file mode 100644 index 0000000..2e6a331 --- /dev/null +++ b/include/plugin_loader.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +#include "node.h" + +namespace rk3588 { + +class PluginLoader { +public: + explicit PluginLoader(std::string plugin_dir); + ~PluginLoader(); + + std::unique_ptr Create(const std::string& type, std::string& err); + +private: + struct PluginHandle { + void* handle = nullptr; + CreateNodeFn create = nullptr; + GetTypeFn get_type = nullptr; + GetAbiFn get_abi = nullptr; + }; + + std::string BuildLibraryPath(const std::string& type) const; + bool LoadPlugin(const std::string& type, PluginHandle& out, std::string& err); + + std::string plugin_dir_; + std::map cache_; +}; + +} // namespace rk3588 diff --git a/include/utils/simple_json.h b/include/utils/simple_json.h new file mode 100644 index 0000000..71634a7 --- /dev/null +++ b/include/utils/simple_json.h @@ -0,0 +1,304 @@ +// Minimal JSON DOM and parser for configuration use. +// Supports objects, arrays, strings, numbers, booleans, and null. +// Not a full JSON implementation but sufficient for structured config files. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace rk3588 { + +class SimpleJson { +public: + enum class Type { Null, Bool, Number, String, Array, Object }; + using Array = std::vector; + using Object = std::map; + using Variant = std::variant; + + SimpleJson() : value_(nullptr) {} + explicit SimpleJson(std::nullptr_t) : value_(nullptr) {} + explicit SimpleJson(bool b) : value_(b) {} + explicit SimpleJson(double n) : value_(n) {} + explicit SimpleJson(std::string s) : value_(std::move(s)) {} + explicit SimpleJson(Array a) : value_(std::move(a)) {} + explicit SimpleJson(Object o) : value_(std::move(o)) {} + + Type type() const { + return static_cast(value_.index()); + } + + bool IsNull() const { return type() == Type::Null; } + bool IsBool() const { return type() == Type::Bool; } + bool IsNumber() const { return type() == Type::Number; } + bool IsString() const { return type() == Type::String; } + bool IsArray() const { return type() == Type::Array; } + bool IsObject() const { return type() == Type::Object; } + + bool AsBool(bool def = false) const { + return std::holds_alternative(value_) ? std::get(value_) : def; + } + + double AsNumber(double def = 0.0) const { + return std::holds_alternative(value_) ? std::get(value_) : def; + } + + int AsInt(int def = 0) const { + return static_cast(AsNumber(static_cast(def))); + } + + const std::string& AsString(const std::string& def = EmptyString()) const { + if (const auto* s = std::get_if(&value_)) { + return *s; + } + return def; + } + + const Array& AsArray() const { + static const Array kEmpty; + if (const auto* a = std::get_if(&value_)) { + return *a; + } + return kEmpty; + } + + const Object& AsObject() const { + static const Object kEmpty; + if (const auto* o = std::get_if(&value_)) { + return *o; + } + return kEmpty; + } + + const SimpleJson* Find(const std::string& key) const { + if (const auto* o = std::get_if(&value_)) { + auto it = o->find(key); + if (it != o->end()) { + return &it->second; + } + } + return nullptr; + } + + template + T ValueOr(const std::string& key, const T& def) const { + const SimpleJson* child = Find(key); + if (!child) return def; + if constexpr (std::is_same_v) { + return child->AsString(def); + } else if constexpr (std::is_same_v) { + return child->AsNumber(def); + } else if constexpr (std::is_same_v) { + return child->AsInt(def); + } else if constexpr (std::is_same_v) { + return child->AsBool(def); + } else { + return def; + } + } + +private: + Variant value_; + + static const std::string& EmptyString() { + static const std::string empty; + return empty; + } +}; + +inline void SkipWhitespace(std::string_view text, size_t& pos) { + while (pos < text.size() && std::isspace(static_cast(text[pos]))) { + ++pos; + } +} + +inline bool ConsumeLiteral(std::string_view text, size_t& pos, std::string_view lit) { + if (text.substr(pos, lit.size()) == lit) { + pos += lit.size(); + return true; + } + return false; +} + +inline bool ParseString(std::string_view text, size_t& pos, std::string& out, std::string& err) { + if (text[pos] != '"') return false; + ++pos; // skip opening quote + while (pos < text.size()) { + char c = text[pos++]; + if (c == '"') { + return true; + } + if (c == '\\') { + if (pos >= text.size()) { + err = "Unexpected end of input in string escape"; + return false; + } + char esc = text[pos++]; + switch (esc) { + case '"': out.push_back('"'); break; + case '\\': out.push_back('\\'); break; + case '/': out.push_back('/'); break; + case 'b': out.push_back('\b'); break; + case 'f': out.push_back('\f'); break; + case 'n': out.push_back('\n'); break; + case 'r': out.push_back('\r'); break; + case 't': out.push_back('\t'); break; + default: + err = "Unsupported escape sequence"; + return false; + } + } else { + out.push_back(c); + } + } + err = "Unterminated string"; + return false; +} + +inline bool ParseNumber(std::string_view text, size_t& pos, double& out) { + size_t start = pos; + if (text[pos] == '-') ++pos; + while (pos < text.size() && std::isdigit(static_cast(text[pos]))) ++pos; + if (pos < text.size() && text[pos] == '.') { + ++pos; + while (pos < text.size() && std::isdigit(static_cast(text[pos]))) ++pos; + } + if (pos < text.size() && (text[pos] == 'e' || text[pos] == 'E')) { + ++pos; + if (pos < text.size() && (text[pos] == '+' || text[pos] == '-')) ++pos; + while (pos < text.size() && std::isdigit(static_cast(text[pos]))) ++pos; + } + try { + out = std::stod(std::string{text.substr(start, pos - start)}); + return true; + } catch (...) { + return false; + } +} + +inline bool ParseValue(std::string_view text, size_t& pos, SimpleJson& out, std::string& err); + +inline bool ParseArray(std::string_view text, size_t& pos, SimpleJson& out, std::string& err) { + if (text[pos] != '[') return false; + ++pos; + SkipWhitespace(text, pos); + SimpleJson::Array arr; + if (pos < text.size() && text[pos] == ']') { + ++pos; + out = SimpleJson(std::move(arr)); + return true; + } + while (pos < text.size()) { + SimpleJson element; + if (!ParseValue(text, pos, element, err)) return false; + arr.push_back(std::move(element)); + SkipWhitespace(text, pos); + if (pos < text.size() && text[pos] == ',') { + ++pos; + SkipWhitespace(text, pos); + continue; + } + if (pos < text.size() && text[pos] == ']') { + ++pos; + out = SimpleJson(std::move(arr)); + return true; + } + err = "Expected ',' or ']' in array"; + return false; + } + err = "Unterminated array"; + return false; +} + +inline bool ParseObject(std::string_view text, size_t& pos, SimpleJson& out, std::string& err) { + if (text[pos] != '{') return false; + ++pos; + SkipWhitespace(text, pos); + SimpleJson::Object obj; + if (pos < text.size() && text[pos] == '}') { + ++pos; + out = SimpleJson(std::move(obj)); + return true; + } + while (pos < text.size()) { + SkipWhitespace(text, pos); + if (pos >= text.size() || text[pos] != '"') { + err = "Expected string key"; + return false; + } + std::string key; + if (!ParseString(text, pos, key, err)) return false; + SkipWhitespace(text, pos); + if (pos >= text.size() || text[pos] != ':') { + err = "Expected ':' after object key"; + return false; + } + ++pos; + SkipWhitespace(text, pos); + SimpleJson value; + if (!ParseValue(text, pos, value, err)) return false; + obj.emplace(std::move(key), std::move(value)); + SkipWhitespace(text, pos); + if (pos < text.size() && text[pos] == ',') { + ++pos; + SkipWhitespace(text, pos); + continue; + } + if (pos < text.size() && text[pos] == '}') { + ++pos; + out = SimpleJson(std::move(obj)); + return true; + } + err = "Expected ',' or '}' in object"; + return false; + } + err = "Unterminated object"; + return false; +} + +inline bool ParseValue(std::string_view text, size_t& pos, SimpleJson& out, std::string& err) { + SkipWhitespace(text, pos); + if (pos >= text.size()) { + err = "Unexpected end of input"; + return false; + } + char c = text[pos]; + if (c == 'n') { + if (ConsumeLiteral(text, pos, "null")) { out = SimpleJson(nullptr); return true; } + } else if (c == 't') { + if (ConsumeLiteral(text, pos, "true")) { out = SimpleJson(true); return true; } + } else if (c == 'f') { + if (ConsumeLiteral(text, pos, "false")) { out = SimpleJson(false); return true; } + } else if (c == '"') { + std::string s; + if (ParseString(text, pos, s, err)) { out = SimpleJson(std::move(s)); return true; } + return false; + } else if (c == '[') { + return ParseArray(text, pos, out, err); + } else if (c == '{') { + return ParseObject(text, pos, out, err); + } else if (c == '-' || std::isdigit(static_cast(c))) { + double num = 0.0; + if (ParseNumber(text, pos, num)) { out = SimpleJson(num); return true; } + } + err = "Invalid JSON value"; + return false; +} + +inline bool ParseSimpleJson(const std::string& input, SimpleJson& out, std::string& err) { + size_t pos = 0; + if (!ParseValue(input, pos, out, err)) return false; + SkipWhitespace(input, pos); + if (pos != input.size()) { + err = "Unexpected characters after JSON document"; + return false; + } + return true; +} + +} // namespace rk3588 diff --git a/include/utils/spsc_queue.h b/include/utils/spsc_queue.h new file mode 100644 index 0000000..630bffa --- /dev/null +++ b/include/utils/spsc_queue.h @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace rk3588 { + +enum class QueueDropStrategy { + DropOldest, + Block +}; + +template +class SpscQueue { +public: + SpscQueue(size_t capacity, QueueDropStrategy strategy) + : capacity_(capacity), strategy_(strategy) {} + + bool Push(T item) { + std::unique_lock lock(mu_); + if (queue_.size() >= capacity_) { + if (strategy_ == QueueDropStrategy::DropOldest) { + queue_.pop_front(); + ++dropped_; + } else { + // Block until space is available + space_cv_.wait(lock, [&] { return queue_.size() < capacity_ || stop_; }); + if (stop_) return false; + } + } + queue_.push_back(std::move(item)); + data_cv_.notify_one(); + return true; + } + + bool Pop(T& out, std::chrono::milliseconds timeout) { + std::unique_lock lock(mu_); + if (!data_cv_.wait_for(lock, timeout, [&] { return !queue_.empty() || stop_; })) { + return false; + } + if (queue_.empty()) return false; + out = std::move(queue_.front()); + queue_.pop_front(); + space_cv_.notify_one(); + return true; + } + + void Stop() { + std::lock_guard lock(mu_); + stop_ = true; + data_cv_.notify_all(); + space_cv_.notify_all(); + } + + size_t Size() const { + std::lock_guard lock(mu_); + return queue_.size(); + } + + size_t Capacity() const { return capacity_; } + + size_t DroppedCount() const { return dropped_; } + +private: + size_t capacity_ = 0; + QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest; + mutable std::mutex mu_; + std::condition_variable data_cv_; + std::condition_variable space_cv_; + std::deque queue_; + bool stop_ = false; + size_t dropped_ = 0; +}; + +} // namespace rk3588 diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt new file mode 100644 index 0000000..40e3d35 --- /dev/null +++ b/plugins/CMakeLists.txt @@ -0,0 +1,24 @@ +set(RK_PLUGIN_OUTPUT_DIR ${CMAKE_BINARY_DIR}/plugins) + +add_library(input_rtsp SHARED input_rtsp/input_rtsp_node.cpp) +target_include_directories(input_rtsp PRIVATE ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/third_party) +target_link_libraries(input_rtsp PRIVATE project_options Threads::Threads) +set_target_properties(input_rtsp PROPERTIES + OUTPUT_NAME "input_rtsp" + LIBRARY_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} + RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} +) + +add_library(publish SHARED publish/publish_node.cpp) +target_include_directories(publish PRIVATE ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/third_party) +target_link_libraries(publish PRIVATE project_options Threads::Threads) +set_target_properties(publish PROPERTIES + OUTPUT_NAME "publish" + LIBRARY_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} + RUNTIME_OUTPUT_DIRECTORY ${RK_PLUGIN_OUTPUT_DIR} +) + +install(TARGETS input_rtsp publish + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/rk3588-media-server/plugins + RUNTIME DESTINATION ${CMAKE_INSTALL_LIBDIR}/rk3588-media-server/plugins +) diff --git a/plugins/input_rtsp/input_rtsp_node.cpp b/plugins/input_rtsp/input_rtsp_node.cpp new file mode 100644 index 0000000..b5516ae --- /dev/null +++ b/plugins/input_rtsp/input_rtsp_node.cpp @@ -0,0 +1,77 @@ +#include +#include +#include +#include + +#include "node.h" + +namespace rk3588 { + +class InputRtspNode : public INode { +public: + std::string Id() const override { return id_; } + std::string Type() const override { return "input_rtsp"; } + + bool Init(const SimpleJson& config, const NodeContext& ctx) override { + id_ = config.ValueOr("id", "input_rtsp"); + url_ = config.ValueOr("url", ""); + fps_ = config.ValueOr("fps", 25); + width_ = config.ValueOr("width", 1920); + height_ = config.ValueOr("height", 1080); + if (ctx.output_queues.empty()) { + std::cerr << "[input_rtsp] no downstream queue configured for node " << id_ << "\n"; + return false; + } + out_queue_ = ctx.output_queues[0]; + return true; + } + + bool Start() override { + if (!out_queue_) return false; + running_.store(true); + worker_ = std::thread(&InputRtspNode::Loop, this); + std::cout << "[input_rtsp] start url=" << url_ << " fps=" << fps_ << "\n"; + return true; + } + + void Stop() override { + running_.store(false); + if (out_queue_) out_queue_->Stop(); + if (worker_.joinable()) worker_.join(); + } + +private: + void Loop() { + using namespace std::chrono; + auto frame_interval = fps_ > 0 ? milliseconds(1000 / fps_) : milliseconds(40); + while (running_.load()) { + auto frame = std::make_shared(); + frame->width = width_; + frame->height = height_; + frame->format = PixelFormat::NV12; + frame->frame_id = ++frame_id_; + frame->pts = duration_cast(steady_clock::now().time_since_epoch()).count(); + out_queue_->Push(frame); + + if (frame_id_ % 100 == 0) { + std::cout << "[input_rtsp] generated frame " << frame_id_ << " queue=" + << out_queue_->Size() << " drops=" << out_queue_->DroppedCount() << "\n"; + } + std::this_thread::sleep_for(frame_interval); + } + } + + std::string id_; + std::string url_; + int fps_ = 25; + int width_ = 1920; + int height_ = 1080; + std::atomic running_{false}; + std::shared_ptr> out_queue_; + std::thread worker_; + uint64_t frame_id_ = 0; +}; + +REGISTER_NODE(InputRtspNode, "input_rtsp"); + +} // namespace rk3588 diff --git a/plugins/publish/publish_node.cpp b/plugins/publish/publish_node.cpp new file mode 100644 index 0000000..cbd6c80 --- /dev/null +++ b/plugins/publish/publish_node.cpp @@ -0,0 +1,86 @@ +#include +#include +#include +#include + +#include "node.h" + +namespace rk3588 { + +class PublishNode : public INode { +public: + std::string Id() const override { return id_; } + std::string Type() const override { return "publish"; } + + bool Init(const SimpleJson& config, const NodeContext& ctx) override { + id_ = config.ValueOr("id", "publish"); + input_queue_ = ctx.input_queue; + codec_ = config.ValueOr("codec", "h264"); + fps_ = config.ValueOr("fps", 25); + gop_ = config.ValueOr("gop", 50); + bitrate_kbps_ = config.ValueOr("bitrate_kbps", 4000); + // Outputs array optional; log first entry if present. + const SimpleJson* outputs = config.Find("outputs"); + if (outputs && outputs->IsArray() && !outputs->AsArray().empty()) { + const auto& first = outputs->AsArray().front(); + if (first.IsObject()) { + port_ = first.ValueOr("port", 8554); + path_ = first.ValueOr("path", "/live/cam1"); + } + } + if (!input_queue_) { + std::cerr << "[publish] no input queue for node " << id_ << "\n"; + return false; + } + return true; + } + + bool Start() override { + if (!input_queue_) return false; + running_.store(true); + worker_ = std::thread(&PublishNode::Loop, this); + std::cout << "[publish] start codec=" << codec_ << " fps=" << fps_ << " gop=" << gop_ + << " bitrate=" << bitrate_kbps_ << "kbps target rtsp://0.0.0.0:" << port_ + << path_ << "\n"; + return true; + } + + void Stop() override { + running_.store(false); + if (input_queue_) input_queue_->Stop(); + if (worker_.joinable()) worker_.join(); + } + +private: + void Loop() { + using namespace std::chrono; + FramePtr frame; + while (running_.load()) { + if (!input_queue_->Pop(frame, milliseconds(200))) { + continue; + } + ++encoded_frames_; + if (encoded_frames_ % 100 == 0 && frame) { + std::cout << "[publish] encoded frame " << frame->frame_id + << " queue=" << input_queue_->Size() + << " drops=" << input_queue_->DroppedCount() << "\n"; + } + } + } + + std::string id_; + std::string codec_ = "h264"; + int fps_ = 25; + int gop_ = 50; + int bitrate_kbps_ = 4000; + int port_ = 8554; + std::string path_ = "/live/cam1"; + std::atomic running_{false}; + std::shared_ptr> input_queue_; + std::thread worker_; + uint64_t encoded_frames_ = 0; +}; + +REGISTER_NODE(PublishNode, "publish"); + +} // namespace rk3588 diff --git a/src/graph_manager.cpp b/src/graph_manager.cpp new file mode 100644 index 0000000..4cff3ea --- /dev/null +++ b/src/graph_manager.cpp @@ -0,0 +1,215 @@ +#include "graph_manager.h" + +#include +#include +#include +#include + +namespace rk3588 { + +Graph::Graph(std::string name) : name_(std::move(name)) {} + +Graph::~Graph() { Stop(); } + +bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t default_queue_size, + QueueDropStrategy default_strategy, std::string& err) { + const auto& obj = graph_cfg.AsObject(); + auto name_it = obj.find("name"); + if (name_it != obj.end() && name_it->second.IsString()) { + name_ = name_it->second.AsString(name_); + } + + // Parse nodes + auto nodes_it = obj.find("nodes"); + if (nodes_it == obj.end() || !nodes_it->second.IsArray()) { + err = "Graph missing 'nodes' array"; + return false; + } + + for (const auto& node_val : nodes_it->second.AsArray()) { + if (!node_val.IsObject()) { + err = "Node entry is not object"; + return false; + } + NodeEntry entry; + entry.config = node_val; + entry.id = node_val.ValueOr("id", ""); + entry.type = node_val.ValueOr("type", ""); + entry.enabled = node_val.ValueOr("enable", true); + if (entry.id.empty() || entry.type.empty()) { + err = "Node missing id or type"; + return false; + } + nodes_.push_back(std::move(entry)); + } + + // Parse edges + auto edges_it = obj.find("edges"); + if (edges_it == obj.end() || !edges_it->second.IsArray()) { + err = "Graph missing 'edges' array"; + return false; + } + + std::map id_to_node; + for (auto& n : nodes_) { + id_to_node[n.id] = &n; + } + + for (const auto& edge_val : edges_it->second.AsArray()) { + if (!edge_val.IsArray() || edge_val.AsArray().size() != 2) { + err = "Edge must be [from, to]"; + return false; + } + const auto& edge_arr = edge_val.AsArray(); + std::string from = edge_arr[0].AsString(""); + std::string to = edge_arr[1].AsString(""); + if (from.empty() || to.empty()) { + err = "Edge has empty endpoint"; + return false; + } + auto from_it = id_to_node.find(from); + auto to_it = id_to_node.find(to); + if (from_it == id_to_node.end() || to_it == id_to_node.end()) { + err = "Edge references unknown node"; + return false; + } + size_t qsize = default_queue_size; + QueueDropStrategy strategy = default_strategy; + if (const auto* qcfg = edge_val.Find("queue")) { + if (qcfg->IsObject()) { + qsize = static_cast(qcfg->ValueOr("size", static_cast(default_queue_size))); + std::string strat = qcfg->ValueOr("strategy", "drop_oldest"); + if (strat == "drop_oldest") strategy = QueueDropStrategy::DropOldest; + else strategy = QueueDropStrategy::Block; + } + } + auto queue = std::make_shared>(qsize, strategy); + from_it->second->context.output_queues.push_back(queue); + // For now support single input per node; first edge wins. + if (!to_it->second->context.input_queue) { + to_it->second->context.input_queue = queue; + } + } + + // Instantiate nodes via plugins + for (auto& entry : nodes_) { + if (!entry.enabled) continue; + std::string load_err; + entry.node = loader.Create(entry.type, load_err); + if (!entry.node) { + err = load_err; + return false; + } + if (!entry.node->Init(entry.config, entry.context)) { + err = "Init failed for node " + entry.id; + return false; + } + } + + return true; +} + +bool Graph::Start() { + for (auto& entry : nodes_) { + if (!entry.enabled || !entry.node) continue; + if (!entry.node->Start()) { + std::cerr << "[Graph] failed to start node: " << entry.id << "\n"; + return false; + } + } + std::cout << "[Graph] started graph " << name_ << " with " << nodes_.size() << " nodes\n"; + return true; +} + +void Graph::Stop() { + for (auto it = nodes_.rbegin(); it != nodes_.rend(); ++it) { + if (it->node) { + it->node->Stop(); + } + if (it->context.input_queue) it->context.input_queue->Stop(); + for (auto& q : it->context.output_queues) q->Stop(); + } +} + +GraphManager::GraphManager(std::string plugin_dir) + : loader_(std::move(plugin_dir)) {} + +GraphManager::~GraphManager() { StopAll(); } + +bool GraphManager::LoadConfigFile(const std::string& path, SimpleJson& out, std::string& err) { + std::ifstream ifs(path); + if (!ifs.is_open()) { + err = "Failed to open config: " + path; + return false; + } + std::string content((std::istreambuf_iterator(ifs)), std::istreambuf_iterator()); + return ParseSimpleJson(content, out, err); +} + +bool GraphManager::Build(const SimpleJson& root_cfg, std::string& err) { + auto graphs_it = root_cfg.AsObject().find("graphs"); + if (graphs_it == root_cfg.AsObject().end() || !graphs_it->second.IsArray()) { + err = "Root config missing 'graphs' array"; + return false; + } + + size_t default_queue_size = 8; + QueueDropStrategy default_strategy = QueueDropStrategy::DropOldest; + if (const auto* queue_cfg = root_cfg.Find("queue")) { + if (queue_cfg->IsObject()) { + default_queue_size = static_cast(queue_cfg->ValueOr("size", 8)); + std::string strategy = queue_cfg->ValueOr("strategy", "drop_oldest"); + if (strategy == "block") default_strategy = QueueDropStrategy::Block; + } + } + + for (const auto& graph_val : graphs_it->second.AsArray()) { + if (!graph_val.IsObject()) { + err = "Graph entry is not object"; + return false; + } + std::string name = graph_val.ValueOr("name", "noname"); + auto graph = std::make_unique(name); + if (!graph->Build(graph_val, loader_, default_queue_size, default_strategy, err)) { + return false; + } + graphs_.push_back(std::move(graph)); + } + return true; +} + +bool GraphManager::StartAll() { + for (auto& g : graphs_) { + if (!g->Start()) { + return false; + } + } + { + std::lock_guard lock(mu_); + running_ = true; + } + return true; +} + +void GraphManager::StopAll() { + { + std::lock_guard lock(mu_); + if (!running_) return; + running_ = false; + } + for (auto& g : graphs_) { + g->Stop(); + } + cv_.notify_all(); +} + +void GraphManager::RequestStop() { + StopAll(); +} + +void GraphManager::BlockUntilStop() { + std::unique_lock lock(mu_); + cv_.wait(lock, [&] { return !running_; }); +} + +} // namespace rk3588 diff --git a/src/media_server_app.cpp b/src/media_server_app.cpp index 5225eca..897c5aa 100644 --- a/src/media_server_app.cpp +++ b/src/media_server_app.cpp @@ -1,15 +1,52 @@ #include "media_server_app.h" #include +#include +#include namespace rk3588 { +namespace { + +GraphManager* g_manager = nullptr; + +void HandleSignal(int) { + if (g_manager) { + std::cerr << "\n[MediaServerApp] Caught signal, stopping...\n"; + g_manager->RequestStop(); + } +} + +} // namespace + MediaServerApp::MediaServerApp(std::string config_path) : config_path_(std::move(config_path)) {} int MediaServerApp::Start() { - std::cout << "[MediaServerApp] config=" << config_path_ - << " (pipeline execution not yet implemented)\n"; + SimpleJson root_cfg; + std::string err; + if (!GraphManager::LoadConfigFile(config_path_, root_cfg, err)) { + std::cerr << "[MediaServerApp] Failed to load config: " << err << "\n"; + return 1; + } + + if (!graph_manager_.Build(root_cfg, err)) { + std::cerr << "[MediaServerApp] Failed to build graphs: " << err << "\n"; + return 1; + } + + g_manager = &graph_manager_; + signal(SIGINT, HandleSignal); + signal(SIGTERM, HandleSignal); + + if (!graph_manager_.StartAll()) { + std::cerr << "[MediaServerApp] Failed to start graphs\n"; + return 1; + } + + std::cout << "[MediaServerApp] Running. Press Ctrl+C to stop.\n"; + graph_manager_.BlockUntilStop(); + std::cout << "[MediaServerApp] Shutdown complete.\n"; return 0; } diff --git a/src/plugin_loader.cpp b/src/plugin_loader.cpp new file mode 100644 index 0000000..ae82803 --- /dev/null +++ b/src/plugin_loader.cpp @@ -0,0 +1,128 @@ +#include "plugin_loader.h" + +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#endif + +namespace rk3588 { + +namespace { + +void* LoadLibraryHandle(const std::string& path, std::string& err) { +#ifdef _WIN32 + HMODULE handle = LoadLibraryA(path.c_str()); + if (!handle) { + err = "LoadLibrary failed for " + path; + } + return handle; +#else + void* handle = dlopen(path.c_str(), RTLD_LAZY); + if (!handle) { + const char* dl_err = dlerror(); + err = dl_err ? dl_err : "dlopen failed"; + } + return handle; +#endif +} + +void CloseLibraryHandle(void* handle) { + if (!handle) return; +#ifdef _WIN32 + FreeLibrary(static_cast(handle)); +#else + dlclose(handle); +#endif +} + +void* LoadSymbol(void* handle, const char* name) { +#ifdef _WIN32 + return reinterpret_cast(GetProcAddress(static_cast(handle), name)); +#else + return dlsym(handle, name); +#endif +} + +std::string SharedLibExtension() { +#ifdef _WIN32 + return ".dll"; +#elif __APPLE__ + return ".dylib"; +#else + return ".so"; +#endif +} + +} // namespace + +PluginLoader::PluginLoader(std::string plugin_dir) + : plugin_dir_(std::move(plugin_dir)) {} + +PluginLoader::~PluginLoader() { + for (auto& kv : cache_) { + CloseLibraryHandle(kv.second.handle); + } +} + +std::string PluginLoader::BuildLibraryPath(const std::string& type) const { +#ifdef _WIN32 + return plugin_dir_ + "/" + type + SharedLibExtension(); +#else + return plugin_dir_ + "/lib" + type + SharedLibExtension(); +#endif +} + +bool PluginLoader::LoadPlugin(const std::string& type, PluginHandle& out, std::string& err) { + std::string path = BuildLibraryPath(type); + void* handle = LoadLibraryHandle(path, err); + if (!handle) return false; + + auto create = reinterpret_cast(LoadSymbol(handle, "CreateNode")); + auto get_type = reinterpret_cast(LoadSymbol(handle, "GetNodeType")); + auto get_abi = reinterpret_cast(LoadSymbol(handle, "GetAbiVersion")); + if (!create || !get_type || !get_abi) { + err = "Missing required symbols in plugin: " + path; + CloseLibraryHandle(handle); + return false; + } + if (get_abi() != kNodeAbiVersion) { + std::ostringstream oss; + oss << "ABI mismatch for plugin " << type << ": expected " << kNodeAbiVersion + << " got " << get_abi(); + err = oss.str(); + CloseLibraryHandle(handle); + return false; + } + out.handle = handle; + out.create = create; + out.get_type = get_type; + out.get_abi = get_abi; + return true; +} + +std::unique_ptr PluginLoader::Create(const std::string& type, std::string& err) { + auto it = cache_.find(type); + if (it == cache_.end()) { + PluginHandle handle; + if (!LoadPlugin(type, handle, err)) { + std::cerr << "[PluginLoader] Failed to load plugin '" << type << "': " << err + << "\n"; + return nullptr; + } + it = cache_.emplace(type, std::move(handle)).first; + } + + PluginHandle& handle = it->second; + std::unique_ptr node(handle.create()); + if (!node) { + err = "CreateNode returned null for type " + type; + return nullptr; + } + return node; +} + +} // namespace rk3588