From edb8f48f8bc324f94aaff31ca67125dbd39bef2e Mon Sep 17 00:00:00 2001 From: sladro Date: Wed, 31 Dec 2025 11:00:08 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E8=A1=A5=E5=85=85=E5=BC=80?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/graph_manager.h | 18 + include/node.h | 6 +- include/utils/config_schema.h | 206 +++++++++++ include/utils/logger.h | 99 ++++++ include/utils/simple_json_writer.h | 80 +++++ include/utils/thread_affinity.h | 71 ++++ plugins/alarm/alarm_node.cpp | 11 +- plugins/input_rtsp/input_rtsp_node.cpp | 323 +++++++++++------- plugins/publish/publish_node.cpp | 22 +- .../collect_metrics.cpython-311.pyc | Bin 0 -> 5173 bytes .../gen_multi_instances.cpython-311.pyc | Bin 0 -> 3660 bytes scripts/stress/collect_metrics.py | 71 ++++ scripts/stress/gen_multi_instances.py | 58 ++++ src/graph_manager.cpp | 205 +++++++++++ src/http_server.cpp | 193 ++++++++--- src/media_server_app.cpp | 2 +- 16 files changed, 1184 insertions(+), 181 deletions(-) create mode 100644 include/utils/config_schema.h create mode 100644 include/utils/logger.h create mode 100644 include/utils/simple_json_writer.h create mode 100644 include/utils/thread_affinity.h create mode 100644 scripts/stress/__pycache__/collect_metrics.cpython-311.pyc create mode 100644 scripts/stress/__pycache__/gen_multi_instances.cpython-311.pyc create mode 100644 scripts/stress/collect_metrics.py create mode 100644 scripts/stress/gen_multi_instances.py diff --git a/include/graph_manager.h b/include/graph_manager.h index 1976e19..5ae0b35 100644 --- a/include/graph_manager.h +++ b/include/graph_manager.h @@ -50,6 +50,8 @@ struct NodeSnapshot { uint64_t drop_total = 0; uint64_t error_total = 0; double avg_process_time_ms = 0.0; + + SimpleJson custom_metrics; }; struct GraphSnapshot { @@ -57,6 +59,10 @@ struct GraphSnapshot { bool running = false; uint64_t timestamp_ms = 0; double total_fps = 0.0; + + uint64_t alarm_total = 0; + uint64_t publish_clients = 0; + std::vector nodes; std::vector edges; }; @@ -77,6 +83,8 @@ public: GraphSnapshot Snapshot() const; bool FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const; + bool UpdateNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg, std::string& err); + // Attempt in-place update via INode::UpdateConfig for nodes whose config changed. // Returns true if fully updated without rebuild. // Returns false with empty err if rebuild is required. @@ -137,6 +145,7 @@ public: static bool LoadConfigFile(const std::string& path, SimpleJson& out, std::string& err); bool Build(const SimpleJson& root_cfg, std::string& err); + bool BuildFromFile(const std::string& path, std::string& err); bool StartAll(); void StopAll(); void RequestStop(); @@ -144,6 +153,13 @@ public: bool ReloadFromFile(const std::string& path, std::string& err); + const std::string& ConfigPath() const { return config_path_; } + const std::string& LastGoodPath() const { return last_good_path_; } + bool RollbackFromLastGood(std::string& err); + + bool UpdateNodeConfig(const std::string& node_id, const std::optional& graph, + const SimpleJson& new_node_cfg, std::string& err); + std::vector ListGraphSnapshots(); bool GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err); // If graph is not set: auto-match when unique, otherwise return false with err describing ambiguity. @@ -155,6 +171,8 @@ private: PluginLoader loader_; std::vector> graphs_; SimpleJson last_good_root_; + std::string config_path_; + std::string last_good_path_; size_t default_queue_size_ = 8; QueueDropStrategy default_strategy_ = QueueDropStrategy::DropOldest; std::mutex graphs_mu_; diff --git a/include/node.h b/include/node.h index fe81ff2..c03f613 100644 --- a/include/node.h +++ b/include/node.h @@ -44,11 +44,15 @@ public: // Dynamic config update without restart. Returns true if update succeeded. virtual bool UpdateConfig(const SimpleJson& /*new_config*/) { return false; } + // Optional custom metrics for graph-level aggregation/observability. + // Return true if out is filled. + virtual bool GetCustomMetrics(SimpleJson& /*out*/) const { return false; } + // Called before Stop() to flush internal buffers (e.g., finish writing files). virtual void Drain() {} }; -constexpr int kNodeAbiVersion = 1; +constexpr int kNodeAbiVersion = 2; using CreateNodeFn = INode* (*)(); using DestroyNodeFn = void (*)(INode*); diff --git a/include/utils/config_schema.h b/include/utils/config_schema.h new file mode 100644 index 0000000..a5d9ab0 --- /dev/null +++ b/include/utils/config_schema.h @@ -0,0 +1,206 @@ +#pragma once + +#include + +#include "utils/simple_json.h" + +namespace rk3588 { + +inline bool IsStringNonEmpty(const SimpleJson* v) { + return v && v->IsString() && !v->AsString("").empty(); +} + +inline bool IsDropStrategy(const std::string& s) { + return s == "drop_oldest" || s == "drop_newest" || s == "block"; +} + +inline bool ValidateQueueCfg(const SimpleJson& q, std::string& err) { + if (!q.IsObject()) { + err = "queue must be object"; + return false; + } + if (const SimpleJson* s = q.Find("size")) { + if (!s->IsNumber() || s->AsInt(0) <= 0) { + err = "queue.size must be positive number"; + return false; + } + } + std::string policy; + if (const SimpleJson* p = q.Find("policy")) { + if (!p->IsString()) { + err = "queue.policy must be string"; + return false; + } + policy = p->AsString(""); + } + if (policy.empty()) { + if (const SimpleJson* p = q.Find("strategy")) { + if (!p->IsString()) { + err = "queue.strategy must be string"; + return false; + } + policy = p->AsString(""); + } + } + if (!policy.empty() && !IsDropStrategy(policy)) { + err = "queue policy/strategy must be one of: drop_oldest, drop_newest, block"; + return false; + } + return true; +} + +inline bool ValidateNodeCfg(const SimpleJson& node, std::string& err) { + if (!node.IsObject()) { + err = "node entry must be object"; + return false; + } + if (!IsStringNonEmpty(node.Find("id"))) { + err = "node.id must be non-empty string"; + return false; + } + if (!IsStringNonEmpty(node.Find("type"))) { + err = "node.type must be non-empty string"; + return false; + } + if (const SimpleJson* en = node.Find("enable")) { + if (!en->IsBool()) { + err = "node.enable must be bool"; + return false; + } + } + if (const SimpleJson* role = node.Find("role")) { + if (!role->IsString()) { + err = "node.role must be string"; + return false; + } + const std::string r = role->AsString(""); + if (!r.empty() && r != "source" && r != "filter" && r != "sink") { + err = "node.role must be one of: source, filter, sink"; + return false; + } + } + if (const SimpleJson* q = node.Find("queue")) { + std::string qerr; + if (!ValidateQueueCfg(*q, qerr)) { + err = "node.queue invalid: " + qerr; + return false; + } + } + if (const SimpleJson* aff = node.Find("cpu_affinity")) { + if (!aff->IsArray()) { + err = "node.cpu_affinity must be array"; + return false; + } + for (const auto& c : aff->AsArray()) { + if (!c.IsNumber() || c.AsInt(-1) < 0) { + err = "node.cpu_affinity entries must be non-negative numbers"; + return false; + } + } + } + return true; +} + +inline bool ValidateEdgeCfg(const SimpleJson& edge, std::string& err) { + if (edge.IsArray()) { + const auto& a = edge.AsArray(); + if (a.size() < 2) { + err = "edge array must be [from, to]"; + return false; + } + if (!a[0].IsString() || !a[1].IsString()) { + err = "edge array entries must be strings"; + return false; + } + if (a.size() >= 3) { + std::string qerr; + if (!ValidateQueueCfg(a[2], qerr)) { + err = "edge queue invalid: " + qerr; + return false; + } + } + return true; + } + if (edge.IsObject()) { + if (!IsStringNonEmpty(edge.Find("from")) || !IsStringNonEmpty(edge.Find("to"))) { + err = "edge object must have non-empty string 'from' and 'to'"; + return false; + } + if (const SimpleJson* q = edge.Find("queue")) { + std::string qerr; + if (!ValidateQueueCfg(*q, qerr)) { + err = "edge.queue invalid: " + qerr; + return false; + } + } + return true; + } + err = "edge entry must be array or object"; + return false; +} + +inline bool ValidateExpandedRootConfig(const SimpleJson& root, std::string& err) { + if (!root.IsObject()) { + err = "root config must be object"; + return false; + } + + if (const SimpleJson* q = root.Find("queue")) { + std::string qerr; + if (!ValidateQueueCfg(*q, qerr)) { + err = "root.queue invalid: " + qerr; + return false; + } + } + + if (const SimpleJson* g = root.Find("global")) { + if (!g->IsObject()) { + err = "root.global must be object"; + return false; + } + } + + const SimpleJson* graphs = root.Find("graphs"); + if (!graphs || !graphs->IsArray()) { + err = "root missing 'graphs' array"; + return false; + } + + for (const auto& gv : graphs->AsArray()) { + if (!gv.IsObject()) { + err = "graph entry must be object"; + return false; + } + if (!IsStringNonEmpty(gv.Find("name"))) { + err = "graph.name must be non-empty string"; + return false; + } + const SimpleJson* nodes = gv.Find("nodes"); + const SimpleJson* edges = gv.Find("edges"); + if (!nodes || !nodes->IsArray()) { + err = "graph.nodes must be array"; + return false; + } + if (!edges || !edges->IsArray()) { + err = "graph.edges must be array"; + return false; + } + for (const auto& nv : nodes->AsArray()) { + std::string nerr; + if (!ValidateNodeCfg(nv, nerr)) { + err = "graph.nodes invalid: " + nerr; + return false; + } + } + for (const auto& ev : edges->AsArray()) { + std::string eerr; + if (!ValidateEdgeCfg(ev, eerr)) { + err = "graph.edges invalid: " + eerr; + return false; + } + } + } + return true; +} + +} // namespace rk3588 diff --git a/include/utils/logger.h b/include/utils/logger.h new file mode 100644 index 0000000..790d227 --- /dev/null +++ b/include/utils/logger.h @@ -0,0 +1,99 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rk3588 { + +enum class LogLevel { Debug, Info, Warn, Error }; + +class Logger { +public: + static Logger& Instance() { + static Logger inst; + return inst; + } + + void SetMaxLines(size_t n) { + std::lock_guard lock(mu_); + max_lines_ = (n == 0) ? 1 : n; + TrimLocked(); + } + + void Log(LogLevel lvl, const std::string& msg) { + const std::string line = FormatLine(lvl, msg); + { + std::lock_guard lock(mu_); + lines_.push_back(line); + TrimLocked(); + } + + // Keep existing stdout/stderr logging style for easy board-side debugging. + if (lvl == LogLevel::Warn || lvl == LogLevel::Error) { + std::cerr << line << "\n"; + } else { + std::cout << line << "\n"; + } + } + + std::vector RecentLines(size_t limit) const { + std::lock_guard lock(mu_); + if (limit == 0) return {}; + if (limit > lines_.size()) limit = lines_.size(); + std::vector out; + out.reserve(limit); + const size_t start = lines_.size() - limit; + for (size_t i = start; i < lines_.size(); ++i) { + out.push_back(lines_[i]); + } + return out; + } + +private: + Logger() = default; + + static const char* LevelText(LogLevel lvl) { + switch (lvl) { + case LogLevel::Debug: return "D"; + case LogLevel::Info: return "I"; + case LogLevel::Warn: return "W"; + case LogLevel::Error: return "E"; + } + return "I"; + } + + static std::string FormatLine(LogLevel lvl, const std::string& msg) { + using namespace std::chrono; + const auto now = system_clock::now(); + const auto ms = duration_cast(now.time_since_epoch()).count(); + std::ostringstream oss; + oss << "[" << ms << "]"; + oss << "[" << LevelText(lvl) << "] "; + oss << msg; + return oss.str(); + } + + void TrimLocked() { + while (lines_.size() > max_lines_) { + lines_.pop_front(); + } + } + + mutable std::mutex mu_; + std::deque lines_; + size_t max_lines_ = 2000; +}; + +inline void LogDebug(const std::string& msg) { Logger::Instance().Log(LogLevel::Debug, msg); } +inline void LogInfo(const std::string& msg) { Logger::Instance().Log(LogLevel::Info, msg); } +inline void LogWarn(const std::string& msg) { Logger::Instance().Log(LogLevel::Warn, msg); } +inline void LogError(const std::string& msg) { Logger::Instance().Log(LogLevel::Error, msg); } + +} // namespace rk3588 diff --git a/include/utils/simple_json_writer.h b/include/utils/simple_json_writer.h new file mode 100644 index 0000000..7461fc0 --- /dev/null +++ b/include/utils/simple_json_writer.h @@ -0,0 +1,80 @@ +#pragma once + +#include +#include +#include + +#include "utils/simple_json.h" + +namespace rk3588 { + +inline std::string JsonEscapeString(std::string_view s) { + std::string out; + out.reserve(s.size() + 8); + for (char c : s) { + switch (c) { + case '\\': out += "\\\\"; break; + case '"': out += "\\\""; break; + case '\n': out += "\\n"; break; + case '\r': out += "\\r"; break; + case '\t': out += "\\t"; break; + default: + if (static_cast(c) < 0x20) { + out += "?"; + } else { + out.push_back(c); + } + } + } + return out; +} + +inline void StringifySimpleJsonTo(const SimpleJson& v, std::ostringstream& oss); + +inline void StringifyArrayTo(const SimpleJson::Array& arr, std::ostringstream& oss) { + oss << '['; + for (size_t i = 0; i < arr.size(); ++i) { + if (i) oss << ','; + StringifySimpleJsonTo(arr[i], oss); + } + oss << ']'; +} + +inline void StringifyObjectTo(const SimpleJson::Object& obj, std::ostringstream& oss) { + oss << '{'; + bool first = true; + for (const auto& kv : obj) { + if (!first) oss << ','; + first = false; + oss << '"' << JsonEscapeString(kv.first) << '"' << ':'; + StringifySimpleJsonTo(kv.second, oss); + } + oss << '}'; +} + +inline void StringifySimpleJsonTo(const SimpleJson& v, std::ostringstream& oss) { + if (v.IsNull()) { + oss << "null"; + } else if (v.IsBool()) { + oss << (v.AsBool(false) ? "true" : "false"); + } else if (v.IsNumber()) { + // Keep simple. + oss << v.AsNumber(0.0); + } else if (v.IsString()) { + oss << '"' << JsonEscapeString(v.AsString("")) << '"'; + } else if (v.IsArray()) { + StringifyArrayTo(v.AsArray(), oss); + } else if (v.IsObject()) { + StringifyObjectTo(v.AsObject(), oss); + } else { + oss << "null"; + } +} + +inline std::string StringifySimpleJson(const SimpleJson& v) { + std::ostringstream oss; + StringifySimpleJsonTo(v, oss); + return oss.str(); +} + +} // namespace rk3588 diff --git a/include/utils/thread_affinity.h b/include/utils/thread_affinity.h new file mode 100644 index 0000000..4849dac --- /dev/null +++ b/include/utils/thread_affinity.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include + +#include "utils/simple_json.h" + +#if defined(__linux__) +#include +#include +#elif defined(_WIN32) +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +#endif + +namespace rk3588 { + +inline std::vector ParseCpuAffinity(const SimpleJson& node_cfg) { + std::vector out; + const SimpleJson* a = node_cfg.Find("cpu_affinity"); + if (!a || !a->IsArray()) return out; + for (const auto& v : a->AsArray()) { + if (!v.IsNumber()) continue; + const int cpu = v.AsInt(-1); + if (cpu >= 0) out.push_back(cpu); + } + return out; +} + +inline bool SetCurrentThreadAffinity(const std::vector& cpus, std::string& err) { + err.clear(); + if (cpus.empty()) return true; + +#if defined(__linux__) + cpu_set_t set; + CPU_ZERO(&set); + for (int cpu : cpus) { + if (cpu < 0 || cpu >= CPU_SETSIZE) continue; + CPU_SET(cpu, &set); + } + const int rc = pthread_setaffinity_np(pthread_self(), sizeof(set), &set); + if (rc != 0) { + err = "pthread_setaffinity_np failed"; + return false; + } + return true; +#elif defined(_WIN32) + // Windows uses a bitmask; best-effort for up to 64 CPUs in the current group. + DWORD_PTR mask = 0; + for (int cpu : cpus) { + if (cpu < 0 || cpu >= static_cast(8 * sizeof(DWORD_PTR))) continue; + mask |= (static_cast(1) << cpu); + } + if (mask == 0) return true; + const DWORD_PTR prev = SetThreadAffinityMask(GetCurrentThread(), mask); + if (prev == 0) { + err = "SetThreadAffinityMask failed"; + return false; + } + return true; +#else + (void)cpus; + err = "thread affinity not supported on this platform"; + return false; +#endif +} + +} // namespace rk3588 diff --git a/plugins/alarm/alarm_node.cpp b/plugins/alarm/alarm_node.cpp index 7bbc986..36f747f 100644 --- a/plugins/alarm/alarm_node.cpp +++ b/plugins/alarm/alarm_node.cpp @@ -234,6 +234,15 @@ public: return true; } + bool GetCustomMetrics(SimpleJson& out) const override { + std::lock_guard lock(mu_); + SimpleJson::Object o; + o["alarm_total"] = SimpleJson(static_cast(alarm_count_)); + o["processed"] = SimpleJson(static_cast(processed_frames_)); + out = SimpleJson(std::move(o)); + return true; + } + NodeStatus Process(FramePtr frame) override { if (!frame) return NodeStatus::DROP; @@ -275,7 +284,7 @@ private: std::vector> actions_; ClipAction* clip_action_ = nullptr; - std::mutex mu_; + mutable std::mutex mu_; std::shared_ptr> input_queue_; uint64_t processed_frames_ = 0; diff --git a/plugins/input_rtsp/input_rtsp_node.cpp b/plugins/input_rtsp/input_rtsp_node.cpp index 7aa078e..fcf2045 100644 --- a/plugins/input_rtsp/input_rtsp_node.cpp +++ b/plugins/input_rtsp/input_rtsp_node.cpp @@ -6,6 +6,7 @@ #include #include "node.h" +#include "utils/thread_affinity.h" #if defined(RK3588_ENABLE_MPP) extern "C" { @@ -42,6 +43,10 @@ public: use_ffmpeg_ = config.ValueOr("use_ffmpeg", false); use_mpp_ = config.ValueOr("use_mpp", true); ffmpeg_force_tcp_ = config.ValueOr("force_tcp", true); + reconnect_sec_ = config.ValueOr("reconnect_sec", 5); + reconnect_backoff_max_sec_ = config.ValueOr("reconnect_backoff_max_sec", 30); + fallback_to_stub_on_fail_ = config.ValueOr("fallback_to_stub_on_fail", false); + cpu_affinity_ = ParseCpuAffinity(config); if (ctx.output_queues.empty()) { std::cerr << "[input_rtsp] no downstream queue configured for node " << id_ << "\n"; return false; @@ -98,6 +103,14 @@ public: } private: + void ApplyAffinity() { + if (cpu_affinity_.empty()) return; + std::string aerr; + if (!SetCurrentThreadAffinity(cpu_affinity_, aerr)) { + std::cerr << "[input_rtsp] SetCurrentThreadAffinity failed: " << aerr << "\n"; + } + } + void PushToDownstream(FramePtr frame) { for (auto& q : out_queues_) { q->Push(frame); @@ -105,6 +118,7 @@ private: } void LoopStub() { + ApplyAffinity(); using namespace std::chrono; auto frame_interval = fps_ > 0 ? milliseconds(1000 / fps_) : milliseconds(40); while (running_.load()) { @@ -130,79 +144,103 @@ private: } #if defined(RK3588_ENABLE_FFMPEG) void LoopFfmpegCpu() { + ApplyAffinity(); using namespace std::chrono; - AVFormatContext* fmt_ctx = nullptr; - AVCodecContext* codec_ctx = nullptr; - AVPacket* pkt = av_packet_alloc(); - AVFrame* frm = av_frame_alloc(); - int video_stream = -1; - AVRational time_base{1, 1000}; - AVDictionary* opts = nullptr; - if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0); - if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) { - std::cerr << "[input_rtsp] avformat_open_input failed: " << url_ << "\n"; - av_dict_free(&opts); - Cleanup(fmt_ctx, codec_ctx, pkt, frm); - LoopStub(); - return; - } - av_dict_free(&opts); - if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { - std::cerr << "[input_rtsp] avformat_find_stream_info failed\n"; - Cleanup(fmt_ctx, codec_ctx, pkt, frm); - LoopStub(); - return; - } - for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { - if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { - video_stream = static_cast(i); - time_base = fmt_ctx->streams[i]->time_base; - break; - } - } - if (video_stream < 0) { - std::cerr << "[input_rtsp] no video stream\n"; - Cleanup(fmt_ctx, codec_ctx, pkt, frm); - LoopStub(); - return; - } - - const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id); - if (!codec) { - std::cerr << "[input_rtsp] decoder not found\n"; - Cleanup(fmt_ctx, codec_ctx, pkt, frm); - LoopStub(); - return; - } - codec_ctx = avcodec_alloc_context3(codec); - avcodec_parameters_to_context(codec_ctx, fmt_ctx->streams[video_stream]->codecpar); - if (avcodec_open2(codec_ctx, codec, nullptr) < 0) { - std::cerr << "[input_rtsp] avcodec_open2 failed\n"; - Cleanup(fmt_ctx, codec_ctx, pkt, frm); - LoopStub(); - return; - } + int backoff = std::max(1, reconnect_sec_); + const int backoff_max = std::max(backoff, reconnect_backoff_max_sec_); while (running_.load()) { - if (av_read_frame(fmt_ctx, pkt) < 0) { - std::this_thread::sleep_for(milliseconds(10)); + AVFormatContext* fmt_ctx = nullptr; + AVCodecContext* codec_ctx = nullptr; + AVPacket* pkt = av_packet_alloc(); + AVFrame* frm = av_frame_alloc(); + int video_stream = -1; + AVRational time_base{1, 1000}; + + AVDictionary* opts = nullptr; + if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0); + if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) { + std::cerr << "[input_rtsp] avformat_open_input failed: " << url_ << "\n"; + av_dict_free(&opts); + Cleanup(fmt_ctx, codec_ctx, pkt, frm); + if (fallback_to_stub_on_fail_) { + LoopStub(); + return; + } + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); continue; } - if (pkt->stream_index != video_stream) { - av_packet_unref(pkt); + av_dict_free(&opts); + if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { + std::cerr << "[input_rtsp] avformat_find_stream_info failed\n"; + Cleanup(fmt_ctx, codec_ctx, pkt, frm); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); continue; } - if (avcodec_send_packet(codec_ctx, pkt) == 0) { - while (avcodec_receive_frame(codec_ctx, frm) == 0) { - PushFrameFromAVFrame(*frm, time_base); - av_frame_unref(frm); + for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { + if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { + video_stream = static_cast(i); + time_base = fmt_ctx->streams[i]->time_base; + break; } } - av_packet_unref(pkt); - } + if (video_stream < 0) { + std::cerr << "[input_rtsp] no video stream\n"; + Cleanup(fmt_ctx, codec_ctx, pkt, frm); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } - Cleanup(fmt_ctx, codec_ctx, pkt, frm); + const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id); + if (!codec) { + std::cerr << "[input_rtsp] decoder not found\n"; + Cleanup(fmt_ctx, codec_ctx, pkt, frm); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + codec_ctx = avcodec_alloc_context3(codec); + avcodec_parameters_to_context(codec_ctx, fmt_ctx->streams[video_stream]->codecpar); + if (avcodec_open2(codec_ctx, codec, nullptr) < 0) { + std::cerr << "[input_rtsp] avcodec_open2 failed\n"; + Cleanup(fmt_ctx, codec_ctx, pkt, frm); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + + backoff = std::max(1, reconnect_sec_); // reset after successful open + int read_fail = 0; + while (running_.load()) { + if (av_read_frame(fmt_ctx, pkt) < 0) { + if (++read_fail >= 50) { + break; + } + std::this_thread::sleep_for(milliseconds(10)); + continue; + } + read_fail = 0; + if (pkt->stream_index != video_stream) { + av_packet_unref(pkt); + continue; + } + if (avcodec_send_packet(codec_ctx, pkt) == 0) { + while (avcodec_receive_frame(codec_ctx, frm) == 0) { + PushFrameFromAVFrame(*frm, time_base); + av_frame_unref(frm); + } + } + av_packet_unref(pkt); + } + + Cleanup(fmt_ctx, codec_ctx, pkt, frm); + if (!running_.load()) break; + std::this_thread::sleep_for(seconds(std::max(1, reconnect_sec_))); + } } void PushFrameFromAVFrame(const AVFrame& f, AVRational time_base) { @@ -383,78 +421,100 @@ private: }; void LoopFfmpegMpp() { + ApplyAffinity(); using namespace std::chrono; - AVFormatContext* fmt_ctx = nullptr; - AVPacket* pkt = av_packet_alloc(); - int video_stream = -1; - AVRational time_base{1, 1000}; - AVDictionary* opts = nullptr; - if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0); - if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) { - std::cerr << "[input_rtsp] avformat_open_input failed: " << url_ << "\n"; - av_dict_free(&opts); - Cleanup(fmt_ctx, nullptr, pkt, nullptr); - LoopStub(); - return; - } - av_dict_free(&opts); - if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { - std::cerr << "[input_rtsp] avformat_find_stream_info failed\n"; - Cleanup(fmt_ctx, nullptr, pkt, nullptr); - LoopStub(); - return; - } - for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { - if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { - video_stream = static_cast(i); - time_base = fmt_ctx->streams[i]->time_base; - break; - } - } - if (video_stream < 0) { - std::cerr << "[input_rtsp] no video stream\n"; - Cleanup(fmt_ctx, nullptr, pkt, nullptr); - LoopStub(); - return; - } - - MppCodingType coding = MPP_VIDEO_CodingAVC; - auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id; - if (codec_id == AV_CODEC_ID_H264) coding = MPP_VIDEO_CodingAVC; - else if (codec_id == AV_CODEC_ID_HEVC) coding = MPP_VIDEO_CodingHEVC; - else { - std::cerr << "[input_rtsp] unsupported codec for mpp\n"; - Cleanup(fmt_ctx, nullptr, pkt, nullptr); - LoopStub(); - return; - } - - MppDecoderWrapper dec; - if (!dec.Init(coding)) { - std::cerr << "[input_rtsp] mpp init failed\n"; - Cleanup(fmt_ctx, nullptr, pkt, nullptr); - LoopStub(); - return; - } + int backoff = std::max(1, reconnect_sec_); + const int backoff_max = std::max(backoff, reconnect_backoff_max_sec_); while (running_.load()) { - if (av_read_frame(fmt_ctx, pkt) < 0) { - std::this_thread::sleep_for(milliseconds(10)); - continue; - } - if (pkt->stream_index != video_stream) { - av_packet_unref(pkt); - continue; - } - int64_t pts_ms = pkt->pts == AV_NOPTS_VALUE ? 0 - : av_rescale_q(pkt->pts, time_base, {1, 1000}); - dec.Decode(pkt->data, pkt->size, false, pts_ms, - [&](MppFrame frm) { PushFrameFromMpp(frm); }); - av_packet_unref(pkt); - } + AVFormatContext* fmt_ctx = nullptr; + AVPacket* pkt = av_packet_alloc(); + int video_stream = -1; + AVRational time_base{1, 1000}; - Cleanup(fmt_ctx, nullptr, pkt, nullptr); + AVDictionary* opts = nullptr; + if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0); + if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) { + std::cerr << "[input_rtsp] avformat_open_input failed: " << url_ << "\n"; + av_dict_free(&opts); + Cleanup(fmt_ctx, nullptr, pkt, nullptr); + if (fallback_to_stub_on_fail_) { + LoopStub(); + return; + } + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + av_dict_free(&opts); + if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { + std::cerr << "[input_rtsp] avformat_find_stream_info failed\n"; + Cleanup(fmt_ctx, nullptr, pkt, nullptr); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { + if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { + video_stream = static_cast(i); + time_base = fmt_ctx->streams[i]->time_base; + break; + } + } + if (video_stream < 0) { + std::cerr << "[input_rtsp] no video stream\n"; + Cleanup(fmt_ctx, nullptr, pkt, nullptr); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + + MppCodingType coding = MPP_VIDEO_CodingAVC; + auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id; + if (codec_id == AV_CODEC_ID_H264) coding = MPP_VIDEO_CodingAVC; + else if (codec_id == AV_CODEC_ID_HEVC) coding = MPP_VIDEO_CodingHEVC; + else { + std::cerr << "[input_rtsp] unsupported codec for mpp\n"; + Cleanup(fmt_ctx, nullptr, pkt, nullptr); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + + MppDecoderWrapper dec; + if (!dec.Init(coding)) { + std::cerr << "[input_rtsp] mpp init failed\n"; + Cleanup(fmt_ctx, nullptr, pkt, nullptr); + std::this_thread::sleep_for(seconds(backoff)); + backoff = std::min(backoff_max, backoff * 2); + continue; + } + + backoff = std::max(1, reconnect_sec_); + int read_fail = 0; + while (running_.load()) { + if (av_read_frame(fmt_ctx, pkt) < 0) { + if (++read_fail >= 50) break; + std::this_thread::sleep_for(milliseconds(10)); + continue; + } + read_fail = 0; + if (pkt->stream_index != video_stream) { + av_packet_unref(pkt); + continue; + } + int64_t pts_ms = pkt->pts == AV_NOPTS_VALUE ? 0 + : av_rescale_q(pkt->pts, time_base, {1, 1000}); + dec.Decode(pkt->data, pkt->size, false, pts_ms, + [&](MppFrame frm) { PushFrameFromMpp(frm); }); + av_packet_unref(pkt); + } + + Cleanup(fmt_ctx, nullptr, pkt, nullptr); + if (!running_.load()) break; + std::this_thread::sleep_for(seconds(std::max(1, reconnect_sec_))); + } } void PushFrameFromMpp(MppFrame frm) { @@ -541,6 +601,11 @@ private: bool use_ffmpeg_ = false; bool use_mpp_ = true; bool ffmpeg_force_tcp_ = true; + + int reconnect_sec_ = 5; + int reconnect_backoff_max_sec_ = 30; + bool fallback_to_stub_on_fail_ = false; + std::vector cpu_affinity_; }; REGISTER_NODE(InputRtspNode, "input_rtsp"); diff --git a/plugins/publish/publish_node.cpp b/plugins/publish/publish_node.cpp index 4b1803e..3909bd2 100644 --- a/plugins/publish/publish_node.cpp +++ b/plugins/publish/publish_node.cpp @@ -726,6 +726,22 @@ public: return true; } + bool GetCustomMetrics(SimpleJson& out) const override { + std::lock_guard lock(mu_); + uint64_t clients = 0; +#if defined(RK3588_ENABLE_ZLMEDIAKIT) + for (const auto& p : zlm_pubs_) { + if (!p) continue; + clients += static_cast(p->TotalReaderCount()); + } +#endif + SimpleJson::Object o; + o["clients"] = SimpleJson(static_cast(clients)); + o["encoded_frames"] = SimpleJson(static_cast(encoded_frames_)); + out = SimpleJson(std::move(o)); + return true; + } + void Stop() override { if (input_queue_) input_queue_->Stop(); @@ -849,6 +865,10 @@ private: } } + int TotalReaderCount() const { + return media_ ? mk_media_total_reader_count(media_) : 0; + } + void Write(const EncodedPacket& pkt, const std::vector& header, bool is_h265) { if (!media_ || pkt.data.empty()) return; @@ -1216,7 +1236,7 @@ private: std::shared_ptr> input_queue_; uint64_t encoded_frames_ = 0; - std::mutex mu_; + mutable std::mutex mu_; #if defined(RK3588_ENABLE_MPP) std::unique_ptr mpp_encoder_; diff --git a/scripts/stress/__pycache__/collect_metrics.cpython-311.pyc b/scripts/stress/__pycache__/collect_metrics.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..509f3ef2365e86fb584fdf0f37113781946a93bf GIT binary patch literal 5173 zcmb6dZA=?ia@V_Sd;Q60fIz_kLU0;zzGxGggzym{Q4SEHR|V3#8@~lijP2W9m&9FP z=_%5Y$WrJjQR$a%Ps-&|1)XTsQ*-}%%H1WY`ePkginWrZN~hD^{iA9tq5QgeYu3hu zMD5#U-g`4|=FQH`n>Xf9PN$84wzfDHtcKCQ@Wg1A^}+XH7`#s)0+AtNJlB)sq&_!| zoAjOV)1Gll4Ntu=z znOJ6S!OA=N)d}-6q->B~kRzLAYT0}XG`*A8wu4AmWQ$_BtgskH#YR@FMa1<9qL5|T zS}ZeREwUMF@X6LbSz?YPiL(R=4+4sp9n#F*fdC(s z;w4-(8VUwDDHsX+X2eMN2@{hh+YZGYy^*lQho!c$OHtlSX*A*_PP0t&9O4C0z~=Lk z0=AK65%_3`3-F(knwb}bh#-1x8jXc%42sQ0ML>rG5yWd|2%u2#Jm&aSj29)%GHMKf z5{dF*jTT@B&5U@RfeA%8BzjqWKLPtyqt8dsC4FN7A0NSvkrW8TtYQ>~+?gV z-7SyYEy`TV?Mu6T3v{OO;DTk*m9jSc+g6fne{AzSvU!y9+XwGfJ=mt&JSp2q+BP!Z zn`Pa}`p0a;Bep@IZoj6o4Jo!G&33F3CZ^$TMaETjEqFC}_3g9~|`AT;7xm5`;`VD<2>lF>nS!>K!auZ{)MrTQ;6= zWJT<p;$*!Rtb9RPr!X=Z=I4c=B0GIK~oEX@LoClT9BHsOl5b=G+qYHW3nb{d#LAgt`PJRT^`GK-Dt! z5doY<`5osVL?ozRQ(Jcp4?EO=?oMM~tYI9`30iu= zpb`YEbsICZ9n~(^6U%pgnpa4*n^&NWzTjWi#B8 zmQA*A&~5M8Z2N>&w#pW?1K)i?BEibmKH~K^5;obUZ17w`FVwhcKf7!f$>jDecWaXE z;4`Y&mNF~b@ove3Px-t)L7`@8XJI#a(@h;D4Q|8l$=5df9adP-PP{`{h834R))<4l-ODc)>x=f_m@oDez{+HYL z!n3dQ@ZP<9jH+3$G43|BrfDJ2nIV=6)Nwz z)2LB!X%dQ;4#%WuO!DY6Pn46UZPmhR=-UOa$so2B&okUovZobfW<+C{8!_qvElE*Robr^m1Yp^ar*`bYUZ3HH5xlE?$|)`X^dW3YEFHLAMq0GC&n4< zG*8^r=@A^*r1(bYhaDY!Sd77Eb7CMEd_vl$KKuZ;zjk>WG%F`eML9v_HCMMV6`O@3 zcLYxb&A}n$2da+Km`O08q}lYf7!4{zplD_RD#l#Xxs}pF=+HG7YqwyFvJ;4{4=<7p2)JLY1X5*O{wg3 zik(ig)AKzU+Wgz$-wZDfs5_5*b!>h(MUSTGQI#Gw2&exTTM4Mg$1!OlO;4!w#6}WQ zyU$|Mc$ywp>2ZTJp!n5ShA?3`O%JQ|ut7Mb$ZF?#ObDdufJz54cIUODSC3xlN_Ih| zZg(dSUF*8qwR9k5Z%Er40A;aXetVUm8SkozV!c^M<>I7Lq1GSHy(vd$+R>@%?^@Pf zy-JX*w*az^QnhTC(hUIKl%qB6XjPwjp@3dnrGOFoAI*fTEP3|IL~`Pj152XPck>si z+WqO;{VC`E`Ti{HTtLa`B}xG!wx`(kG~2#PR5N>5c4UD~FZGvx`N-9jay4a3D{hQk zKl|Rq^$F!*s&ZGVv^8DYy3muUYf?s+jxO}Cno6ABY)zxmaeGYNeJuB;YP!-jU5n;S zlXua)crjJt$!xD%v}9`FM}VcYQwNLPncBMFcYN5PZ2REo$43|YGPOV@D|ck7Jz%BE z&TQ+xm7RC{)Wc)&K0vuX)%t3>_0=VdLVw^~a;{}-n=ubaG(g6?_b*K=+-EIcw5Y9z zjfG3F7(eSqs_cZpx4ZX&a_^){cBv)1*0SX_*N>#jvDuv*xe;a>-@LtJxkYV!6W*2X zFZ)05|7`G!!7qnDAO31U9eaIcICb!i^uafB3qa)Dd+x>jgLll9*I|LW>>S88?o_xx zwA^gjYB_k(iM<~16K!b^j$x-^w!D=wW2ds(V4F3ygQbv=uSC$(vBY0(UWmF z@v18)lP4EWW*sHTQ#Zt=0}6f9nW}0}SGA`czO=)qI($F+2>8KR$6e!+ROSF>hBB#<9|52iPKfoKV2^kx=r_) z>VbOt{tF}?*V_iomiw(0gQVsD5e6_1NCq$u%vL;hk-+u9U2)3FJZO6PWCQbXI|G=9 z4Nky3+*5XnVjdpDn6D`YFkf4HOfdPTiauRy`KF!(s9F7foLc;TuW3lYzv{epVLx=a zT!-s0&5`@(LA!nt==oA}t)tfeognmL4qP?qNm$ZQ0~ zLi{md0&uvf7e597ekMt9GFdY3#!rT@&+E^ILAgq-zZs%ZHQrUTgM`BZU@@R~3Z-{< vEA?(OsNYS3Kl5OmBxuJ%{1H{7QZ*UcI^Xwh|Kq#2U%x3IMU~b;(PFK%Y9toPUA2@IdB{T_x{s^w%l2uEMyfSZB&5|o@MZ)Lh^IYwlCk4# zSg|wX<9pBf&bi+`_uM=7ccD-K!S(mbW%-d0p?{H%dwJFyFFt|BR|q3arO~yuZ|WKa zeNWnxp|8;t!t|Hu8Y8io`ONXsf*vF>55e!T8w~-9y5xMK>@%XBic&U*B}q+Y)!WG? z4jRA&(eQa;uw-O|Y8h`d+n8GRJcQokrlyDrsOqwAxj+lkn87UeF8dxb-~peti+ZyM zhyCCY_&+>&zz3H@4}tk(!rr<5#|UA=>!N@mi#wLLy*0Z|$bQ#y_gjzE1Nh!8@y%Sc zxFFQNaz)DETh;<~qsbd@$9>EF?JN$XqQ~fH;cswmyRdn^XkZ2RLs+xN&2^)Y!jGVX zNN46fx9F>Ao}$oIs2<29UG=-MHh$yCraEAZ-j=<<7~5OD_ObL1Bhs?r9qn=MK#dpK z0yBYr`^%!YEvDDl(ei7LsSOvrZ{WGJJw9YI>_%DNH%!4>WFZ68;#h$#^|i4Z*oeQ6 z9&F0y9SZEtysO@g=%O!$ikz{_9StF4q}9fQfbDYo?XziT44L>t(QoW^u?Q8|c0@5y z@ZCrFoq`_^;9&Vp`fyX0GM6SA_el~ap+Bs8b@m_%|&^SW8Htm=`*AI`r z#g6HB(FMBT!MpLE<-PxPGI**RD%ZtBzV~frwe=pu=n~p`mk~=eI+Z^$kyF!hQZyt% zR0LVk4N*x-!jxdh8A%t6tdP_sV$I~zh8#=IiHahng=AKll~Z~g4tE|)dQy{BL(VFB zCKi(wo0b*za=!DtqUMZsX#9qrRid=b0NSuQO}d$rH3{3yoRn7cY%G?|88!_f-!+{x zHu>cPBnBZo-d7mFpcImO5iP31B(k|tzlo3766 zQj#KRFh4@gxUEVXd_%Dq2_925X;!|Kk4;JQO<%zocFiK}!_uq>=@}Dd2sL7gMm@G~ zUA@KxqUE%7Ocf17(v5dVWaDW{r5W4~wcB z!i=-@qNa(rZHAD~_qYf_$r{2eBoL1Y8acadD)EfGiEaA5_p$9&MNQ1;Fjq>pDOh)y zi%rc>J|j@j1|>z$X;MPeld^1kWd%!$@tO)mci5b$rNCR4?C@zVmCJzP0_kdYP{cR^ zR!2ul$*=n?Lp%TQ>6OSg4H%ACC=8&G<3 z5=yU$;K^JQYinq;ny92Co6d;0Y}P3mjlA7$9;TF7Umn{#o7FO+VS7atNMMcps)De& zjB2yMouj#f2c)rlIcMZ z!iMYh*u>cN)2e!XeE3OT#au9% z3$E#0b@$4s`O&AubkSlin#@Jl^vRh7Ykl&k@6VUlP%9L;wZD_ddG&(Zb2alckA& z`$J{CIJZPsQ>H&&^T#cJyn!fo=gO|taMz-|L@lHj(`C9rgU&HH;yHo5B%-C;s`cvC ztF6=xFA8nDckb@F%CBlc!3qkcv#We>MS5_##t&QkF!4FQ8tykEW3})xD}2o4jy>f& zN)z>PcLULEmm|s^;xzMMw8jrv{7?gV{9V;E-%b8y^3jK1Pkl4Bg1^r^867uA$Lrm_ z<%xQ*vwZed-N%;-PUw>>%=aNPauS|e_>>hsWpbyUhB~d#pc#r+d%hVgP1d({mCmn* zy8d*ia&u|)&%da3@3y*k*Ft-&(4GeJv+p>fOs$5uSBwQ^Q7O~)$dI{xKRnf4)ze>( zthGUw>GI?PU+?Vss_&0|OZ~OZVXJev>|O1MEbaMrwAQiL>ex$WFjDXBD^J!3hRfp> zZsFH;zQf{on*84SfM5;0`}4q|Cj*C8#2*g+aPx16&4EL;fz#H&>GEXdFkrdPpSk`g zT>nycjT^MMLDPAj_6eo&GGAkQwq9%xyB!vr3oiOABbA%g+{&jWccjK0vA849sB`{% zzPr8(vk+Pgl|u~{;J|ZTJSVM}&2G!ERm&*`*K;d?J_=jx$4Kw^82#fIGk(JN4~hc$ z&-ddW$LOD8%*O|PKYc)fw7Eor?8`(V>eU9pNh3eiY=- #include +#if __has_include() +#include +namespace fs = std::filesystem; +#endif + #include "utils/config_expand.h" +#include "utils/config_schema.h" +#include "utils/logger.h" +#include "utils/simple_json_writer.h" +#include "utils/thread_affinity.h" namespace rk3588 { namespace { +bool WriteTextFile(const std::string& path, const std::string& content, std::string& err) { + std::ofstream ofs(path, std::ios::binary | std::ios::trunc); + if (!ofs.is_open()) { + err = "Failed to open file for write: " + path; + return false; + } + ofs.write(content.data(), static_cast(content.size())); + if (!ofs.good()) { + err = "Failed to write file: " + path; + return false; + } + return true; +} + +bool WriteTextFileAtomic(const std::string& path, const std::string& content, std::string& err) { + err.clear(); +#if __has_include() + const std::string tmp = path + ".tmp"; + if (!WriteTextFile(tmp, content, err)) return false; + std::error_code ec; + fs::remove(path, ec); // best-effort (Windows rename requires target missing) + ec.clear(); + fs::rename(tmp, path, ec); + if (ec) { + err = "rename failed: " + ec.message(); + std::error_code ec2; + fs::remove(tmp, ec2); + return false; + } + return true; +#else + return WriteTextFile(path, content, err); +#endif +} + QueueDropStrategy ParseDropStrategy(const std::string& s, QueueDropStrategy def) { if (s == "drop_oldest") return QueueDropStrategy::DropOldest; if (s == "drop_newest") return QueueDropStrategy::DropNewest; @@ -411,6 +455,49 @@ bool Graph::TryUpdateInPlace(const SimpleJson& new_graph_cfg, size_t default_que return true; } +namespace { + +SimpleJson ReplaceNodeInGraphCfg(const SimpleJson& graph_cfg, const std::string& node_id, + const SimpleJson& new_node_cfg) { + if (!graph_cfg.IsObject()) return graph_cfg; + SimpleJson::Object obj = graph_cfg.AsObject(); + const SimpleJson* nodes = graph_cfg.Find("nodes"); + if (!nodes || !nodes->IsArray()) return graph_cfg; + SimpleJson::Array out_nodes; + out_nodes.reserve(nodes->AsArray().size()); + for (const auto& n : nodes->AsArray()) { + if (n.IsObject() && n.ValueOr("id", "") == node_id) { + out_nodes.push_back(new_node_cfg); + } else { + out_nodes.push_back(n); + } + } + obj["nodes"] = SimpleJson(std::move(out_nodes)); + return SimpleJson(std::move(obj)); +} + +} // namespace + +bool Graph::UpdateNodeConfig(const std::string& node_id, const SimpleJson& new_node_cfg, std::string& err) { + err.clear(); + for (auto& entry : nodes_) { + if (entry.id != node_id) continue; + if (!entry.enabled || !entry.node) { + err = "node not running or disabled: " + node_id; + return false; + } + if (!entry.node->UpdateConfig(new_node_cfg)) { + err = "UpdateConfig returned false"; + return false; + } + entry.config = new_node_cfg; + graph_cfg_ = ReplaceNodeInGraphCfg(graph_cfg_, node_id, new_node_cfg); + return true; + } + err = "node not found: " + node_id; + return false; +} + bool Graph::Start() { bool expected = false; if (!running_.compare_exchange_strong(expected, true)) { @@ -431,6 +518,13 @@ bool Graph::Start() { if (entry.role != "source") { if (entry.context.input_queue) { entry.worker = std::thread([this, &entry]() { + { + const auto cpus = ParseCpuAffinity(entry.config); + std::string aerr; + if (!cpus.empty() && !SetCurrentThreadAffinity(cpus, aerr)) { + LogWarn("[Graph] SetCurrentThreadAffinity failed for node " + entry.id + ": " + aerr); + } + } FramePtr frame; while (true) { if (entry.context.input_queue->Pop(frame, std::chrono::milliseconds(100))) { @@ -599,6 +693,21 @@ GraphSnapshot Graph::Snapshot() const { ns.avg_process_time_ms = (static_cast(ns_total) / 1e6) / static_cast(proc_cnt); } + { + SimpleJson cm; + if (n.node && n.node->GetCustomMetrics(cm)) { + ns.custom_metrics = cm; + if (cm.IsObject()) { + if (const SimpleJson* a = cm.Find("alarm_total"); a && a->IsNumber()) { + snap.alarm_total += static_cast(a->AsNumber(0.0)); + } + if (const SimpleJson* c = cm.Find("clients"); c && c->IsNumber()) { + snap.publish_clients += static_cast(c->AsNumber(0.0)); + } + } + } + } + const bool source_like = (n.role == "source") || !n.context.input_queue; if (source_like) { total_fps += ns.output_fps; @@ -671,6 +780,10 @@ bool GraphManager::Build(const SimpleJson& root_cfg, std::string& err) { return false; } + if (!ValidateExpandedRootConfig(expanded, err)) { + return false; + } + auto graphs_it = expanded.AsObject().find("graphs"); if (graphs_it == expanded.AsObject().end() || !graphs_it->second.IsArray()) { err = "Root config missing 'graphs' array"; @@ -714,9 +827,26 @@ bool GraphManager::Build(const SimpleJson& root_cfg, std::string& err) { last_good_root_ = expanded; default_queue_size_ = default_queue_size; default_strategy_ = default_strategy; + + if (!last_good_path_.empty()) { + std::string werr; + if (!WriteTextFileAtomic(last_good_path_, StringifySimpleJson(last_good_root_), werr)) { + LogWarn("[GraphManager] persist last_good failed: " + werr); + } + } return true; } +bool GraphManager::BuildFromFile(const std::string& path, std::string& err) { + config_path_ = path; + last_good_path_ = path + ".last_good.json"; + SimpleJson root_cfg; + if (!LoadConfigFile(path, root_cfg, err)) { + return false; + } + return Build(root_cfg, err); +} + bool GraphManager::StartAll() { std::lock_guard lock(graphs_mu_); for (auto& g : graphs_) { @@ -756,6 +886,11 @@ void GraphManager::BlockUntilStop() { } bool GraphManager::ReloadFromFile(const std::string& path, std::string& err) { + if (config_path_.empty()) { + config_path_ = path; + last_good_path_ = path + ".last_good.json"; + } + SimpleJson root_cfg; if (!LoadConfigFile(path, root_cfg, err)) { return false; @@ -766,6 +901,10 @@ bool GraphManager::ReloadFromFile(const std::string& path, std::string& err) { return false; } + if (!ValidateExpandedRootConfig(expanded, err)) { + return false; + } + auto graphs_it = expanded.AsObject().find("graphs"); if (graphs_it == expanded.AsObject().end() || !graphs_it->second.IsArray()) { err = "Root config missing 'graphs' array"; @@ -1009,9 +1148,75 @@ bool GraphManager::ReloadFromFile(const std::string& path, std::string& err) { last_good_root_ = expanded; default_queue_size_ = new_default_queue_size; default_strategy_ = new_default_strategy; + + if (!last_good_path_.empty()) { + std::string werr; + if (!WriteTextFileAtomic(last_good_path_, StringifySimpleJson(last_good_root_), werr)) { + LogWarn("[GraphManager] persist last_good failed: " + werr); + } + } return true; } +bool GraphManager::RollbackFromLastGood(std::string& err) { + err.clear(); + if (config_path_.empty()) { + err = "config_path not set"; + return false; + } + if (last_good_path_.empty()) { + err = "last_good_path not set"; + return false; + } + + SimpleJson last_good; + if (!LoadConfigFile(last_good_path_, last_good, err)) { + return false; + } + + std::string werr; + if (!WriteTextFileAtomic(config_path_, StringifySimpleJson(last_good), werr)) { + err = "failed to write rollback config: " + werr; + return false; + } + + return ReloadFromFile(config_path_, err); +} + +bool GraphManager::UpdateNodeConfig(const std::string& node_id, const std::optional& graph, + const SimpleJson& new_node_cfg, std::string& err) { + std::lock_guard lock(graphs_mu_); + if (graph && !graph->empty()) { + for (const auto& g : graphs_) { + if (!g || g->Name() != *graph) continue; + return g->UpdateNodeConfig(node_id, new_node_cfg, err); + } + err = "graph not found: " + *graph; + return false; + } + + size_t hits = 0; + Graph* found = nullptr; + for (const auto& g : graphs_) { + if (!g) continue; + NodeSnapshot tmp; + if (g->FindNodeSnapshotById(node_id, tmp)) { + found = g.get(); + ++hits; + if (hits > 1) break; + } + } + if (hits == 0 || !found) { + err = "node not found: " + node_id; + return false; + } + if (hits > 1) { + err = "node id not unique, specify ?graph="; + return false; + } + return found->UpdateNodeConfig(node_id, new_node_cfg, err); +} + std::vector GraphManager::ListGraphSnapshots() { std::vector out; std::lock_guard lock(graphs_mu_); diff --git a/src/http_server.cpp b/src/http_server.cpp index 898655b..710312c 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -9,6 +9,10 @@ #include #include +#include "utils/logger.h" +#include "utils/simple_json.h" +#include "utils/simple_json_writer.h" + #if defined(_WIN32) #ifndef NOMINMAX #define NOMINMAX @@ -168,6 +172,8 @@ static std::string GraphSnapshotJson(const GraphSnapshot& g) { oss << "\"running\":" << (g.running ? "true" : "false") << ','; oss << "\"timestamp_ms\":" << g.timestamp_ms << ','; oss << "\"total_fps\":" << g.total_fps << ','; + oss << "\"alarm_total\":" << g.alarm_total << ','; + oss << "\"publish_clients\":" << g.publish_clients << ','; oss << "\"nodes\":["; for (size_t i = 0; i < g.nodes.size(); ++i) { @@ -185,7 +191,8 @@ static std::string GraphSnapshotJson(const GraphSnapshot& g) { oss << "\"drop_total\":" << n.drop_total << ','; oss << "\"error_total\":" << n.error_total << ','; oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ','; - oss << "\"input_queue\":" << QueueJson(n.input_queue); + oss << "\"input_queue\":" << QueueJson(n.input_queue) << ','; + oss << "\"custom_metrics\":" << StringifySimpleJson(n.custom_metrics); oss << "}"; } oss << "],"; @@ -220,7 +227,8 @@ static std::string NodeSnapshotJson(const NodeSnapshot& n) { oss << "\"drop_total\":" << n.drop_total << ','; oss << "\"error_total\":" << n.error_total << ','; oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ','; - oss << "\"input_queue\":" << QueueJson(n.input_queue); + oss << "\"input_queue\":" << QueueJson(n.input_queue) << ','; + oss << "\"custom_metrics\":" << StringifySimpleJson(n.custom_metrics); oss << "}"; return oss.str(); } @@ -470,60 +478,149 @@ void HttpServer::ServerLoop() { // Dispatch if (req.path.rfind("/api/", 0) == 0) { - if (req.method != "GET") { - resp.status = 405; - resp.body = ErrorJson("method not allowed"); - } else if (req.path == "/api/graphs") { - auto snaps = gm_.ListGraphSnapshots(); - std::ostringstream oss; - oss << "["; - for (size_t i = 0; i < snaps.size(); ++i) { - if (i) oss << ','; + auto OkJson = [] { return std::string("{\"ok\":true}"); }; + + if (req.method == "GET") { + if (req.path == "/api/graphs") { + auto snaps = gm_.ListGraphSnapshots(); + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < snaps.size(); ++i) { + if (i) oss << ','; + oss << "{"; + oss << "\"name\":\"" << JsonEscape(snaps[i].name) << "\","; + oss << "\"running\":" << (snaps[i].running ? "true" : "false") << ','; + oss << "\"total_fps\":" << snaps[i].total_fps << ','; + oss << "\"alarm_total\":" << snaps[i].alarm_total << ','; + oss << "\"publish_clients\":" << snaps[i].publish_clients; + oss << "}"; + } + oss << "]"; + resp.body = oss.str(); + } else if (req.path.rfind("/api/graphs/", 0) == 0) { + std::string name = req.path.substr(std::string("/api/graphs/").size()); + GraphSnapshot gs; + std::string gerr; + if (!gm_.GetGraphSnapshot(name, gs, gerr)) { + resp.status = 404; + resp.body = ErrorJson(gerr); + } else { + resp.body = GraphSnapshotJson(gs); + } + } else if (req.path == "/api/logs/recent") { + auto q = ParseQuery(req.query); + size_t limit = 200; + if (auto it = q.find("limit"); it != q.end()) { + try { + limit = static_cast(std::stoul(it->second)); + } catch (...) { + limit = 200; + } + } + auto lines = Logger::Instance().RecentLines(limit); + std::ostringstream oss; oss << "{"; - oss << "\"name\":\"" << JsonEscape(snaps[i].name) << "\","; - oss << "\"running\":" << (snaps[i].running ? "true" : "false") << ','; - oss << "\"total_fps\":" << snaps[i].total_fps; - oss << "}"; - } - oss << "]"; - resp.body = oss.str(); - } else if (req.path.rfind("/api/graphs/", 0) == 0) { - std::string name = req.path.substr(std::string("/api/graphs/").size()); - GraphSnapshot gs; - std::string gerr; - if (!gm_.GetGraphSnapshot(name, gs, gerr)) { - resp.status = 404; - resp.body = ErrorJson(gerr); + oss << "\"lines\":["; + for (size_t i = 0; i < lines.size(); ++i) { + if (i) oss << ','; + oss << "\"" << JsonEscape(lines[i]) << "\""; + } + oss << "]}"; + resp.body = oss.str(); + } else if (req.path.rfind("/api/nodes/", 0) == 0) { + // /api/nodes/{id}/metrics + const std::string prefix = "/api/nodes/"; + const std::string suffix = "/metrics"; + auto pos = req.path.rfind(suffix); + if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) { + resp.status = 404; + resp.body = ErrorJson("not found"); + } else { + std::string node_id = req.path.substr(prefix.size(), pos - prefix.size()); + auto q = ParseQuery(req.query); + std::optional graph; + if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) { + graph = it->second; + } + NodeSnapshot ns; + std::string nerr; + if (!gm_.GetNodeSnapshot(node_id, graph, ns, nerr)) { + resp.status = (nerr.find("not unique") != std::string::npos) ? 409 : 404; + resp.body = ErrorJson(nerr); + } else { + resp.body = NodeSnapshotJson(ns); + } + } } else { - resp.body = GraphSnapshotJson(gs); - } - } else if (req.path.rfind("/api/nodes/", 0) == 0) { - // /api/nodes/{id}/metrics - const std::string prefix = "/api/nodes/"; - const std::string suffix = "/metrics"; - auto pos = req.path.rfind(suffix); - if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) { resp.status = 404; resp.body = ErrorJson("not found"); - } else { - std::string node_id = req.path.substr(prefix.size(), pos - prefix.size()); - auto q = ParseQuery(req.query); - std::optional graph; - if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) { - graph = it->second; } - NodeSnapshot ns; - std::string nerr; - if (!gm_.GetNodeSnapshot(node_id, graph, ns, nerr)) { - resp.status = (nerr.find("not unique") != std::string::npos) ? 409 : 404; - resp.body = ErrorJson(nerr); + } else if (req.method == "POST") { + if (req.path == "/api/config/reload") { + if (gm_.ConfigPath().empty()) { + resp.status = 500; + resp.body = ErrorJson("config_path not set"); + } else { + std::string rerr; + if (!gm_.ReloadFromFile(gm_.ConfigPath(), rerr)) { + resp.status = 500; + resp.body = ErrorJson(rerr); + } else { + resp.body = OkJson(); + } + } + } else if (req.path == "/api/config/rollback") { + std::string rerr; + if (!gm_.RollbackFromLastGood(rerr)) { + resp.status = 500; + resp.body = ErrorJson(rerr); + } else { + resp.body = OkJson(); + } + } else if (req.path.rfind("/api/nodes/", 0) == 0) { + // /api/nodes/{id}/config + const std::string prefix = "/api/nodes/"; + const std::string suffix = "/config"; + auto pos = req.path.rfind(suffix); + if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) { + resp.status = 404; + resp.body = ErrorJson("not found"); + } else { + std::string node_id = req.path.substr(prefix.size(), pos - prefix.size()); + auto q = ParseQuery(req.query); + std::optional graph; + if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) { + graph = it->second; + } + if (req.body.empty()) { + resp.status = 400; + resp.body = ErrorJson("empty body"); + } else { + SimpleJson body; + std::string jerr; + if (!ParseSimpleJson(req.body, body, jerr)) { + resp.status = 400; + resp.body = ErrorJson(jerr); + } else { + std::string uerr; + if (!gm_.UpdateNodeConfig(node_id, graph, body, uerr)) { + if (uerr.find("not unique") != std::string::npos) resp.status = 409; + else if (uerr.find("not found") != std::string::npos) resp.status = 404; + else resp.status = 400; + resp.body = ErrorJson(uerr); + } else { + resp.body = OkJson(); + } + } + } + } } else { - resp.body = NodeSnapshotJson(ns); - } + resp.status = 404; + resp.body = ErrorJson("not found"); } } else { - resp.status = 404; - resp.body = ErrorJson("not found"); + resp.status = 405; + resp.body = ErrorJson("method not allowed"); } } else { std::string path = req.path; diff --git a/src/media_server_app.cpp b/src/media_server_app.cpp index 74f97a7..23eafdd 100644 --- a/src/media_server_app.cpp +++ b/src/media_server_app.cpp @@ -189,7 +189,7 @@ int MediaServerApp::Start() { web_root = g->ValueOr("web_root", web_root); } - if (!graph_manager_.Build(root_cfg, err)) { + if (!graph_manager_.BuildFromFile(config_path_, err)) { std::cerr << "[MediaServerApp] Failed to build graphs: " << err << "\n"; return 1; }