From 811a57c361b8cb2acec5f122aa5029d9c23e9d17 Mon Sep 17 00:00:00 2001 From: sladro Date: Wed, 31 Dec 2025 09:40:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=8F=91sprint6=E9=98=B6=E6=AE=B5?= =?UTF-8?q?=EF=BC=8C=E7=AC=AC=E4=B8=80=E6=AC=A1=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 5 + configs/sprint5_cam1_security_instances.json | 161 +++++ configs/sprint5_cam1_transcode_instances.json | 55 ++ include/graph_manager.h | 80 +++ include/http_server.h | 32 + include/utils/spsc_queue.h | 36 ++ src/graph_manager.cpp | 228 ++++++- src/http_server.cpp | 562 ++++++++++++++++++ src/media_server_app.cpp | 13 + web/app.js | 115 ++++ web/graph.html | 53 ++ web/index.html | 26 + web/style.css | 44 ++ 13 files changed, 1409 insertions(+), 1 deletion(-) create mode 100644 configs/sprint5_cam1_security_instances.json create mode 100644 configs/sprint5_cam1_transcode_instances.json create mode 100644 include/http_server.h create mode 100644 src/http_server.cpp create mode 100644 web/app.js create mode 100644 web/graph.html create mode 100644 web/index.html create mode 100644 web/style.css diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f03544..9424c30 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,6 +54,7 @@ set(SRC_FILES src/graph_manager.cpp src/plugin_loader.cpp src/ai_scheduler.cpp + src/http_server.cpp src/utils/dma_alloc.cpp src/utils/config_expand.cpp ) @@ -66,6 +67,10 @@ target_include_directories(media-server ${CMAKE_SOURCE_DIR}/third_party ) target_link_libraries(media-server PRIVATE project_options Threads::Threads) + +if(WIN32) + target_link_libraries(media-server PRIVATE ws2_32) +endif() target_compile_definitions(media-server PRIVATE RK_PROJECT_VERSION="${PROJECT_VERSION}" RK_GIT_SHA="${RK_GIT_SHA}" diff --git a/configs/sprint5_cam1_security_instances.json b/configs/sprint5_cam1_security_instances.json new file mode 100644 index 0000000..a2e52fb --- /dev/null +++ b/configs/sprint5_cam1_security_instances.json @@ -0,0 +1,161 @@ +{ + "global": { + "hot_reload": { "enable": true } + }, + "queue": { "size": 8, "strategy": "drop_oldest" }, + + "templates": { + "security_pipeline": { + "nodes": [ + { + "id": "in", + "type": "input_rtsp", + "role": "source", + "enable": true, + "url": "${url}", + "fps": 25, + "width": 1920, + "height": 1080, + "use_mpp": false, + "use_ffmpeg": true, + "force_tcp": true + }, + { + "id": "pre", + "type": "preprocess", + "role": "filter", + "enable": true, + "dst_w": 640, + "dst_h": 640, + "dst_format": "rgb", + "keep_ratio": false, + "use_rga": true + }, + { + "id": "ai", + "type": "ai_yolo", + "role": "filter", + "enable": true, + "model_path": "${model_path}", + "model_version": "v8", + "num_classes": 80, + "conf": 0.5, + "nms": 0.45, + "class_filter": [] + }, + { + "id": "alarm", + "type": "alarm", + "role": "sink", + "enable": true, + "labels": [], + "rules": [ + { + "name": "object_detection", + "class_ids": [0], + "roi": { "x": 0.1, "y": 0.1, "w": 0.8, "h": 0.8 }, + "min_duration_ms": 500, + "cooldown_ms": 5000 + } + ], + "actions": { + "log": { "enable": true, "level": "info" }, + "http": { + "enable": false, + "url": "http://127.0.0.1:8080/api/alarm", + "timeout_ms": 3000, + "include_media_url": true + }, + "snapshot": { + "enable": true, + "format": "jpg", + "quality": 85, + "upload": { "type": "local", "path": "/tmp/alarms" } + }, + "clip": { + "enable": false, + "pre_sec": 5, + "post_sec": 10, + "format": "mp4", + "fps": 25, + "upload": { "type": "local", "path": "/tmp/alarms" } + } + } + }, + { + "id": "osd", + "type": "osd", + "role": "filter", + "enable": true, + "draw_bbox": true, + "draw_text": true, + "line_width": 2, + "font_scale": 1, + "labels": [] + }, + { + "id": "post", + "type": "preprocess", + "role": "filter", + "enable": true, + "dst_w": 1920, + "dst_h": 1080, + "dst_format": "nv12", + "keep_ratio": false, + "use_rga": true + }, + { + "id": "storage", + "type": "storage", + "role": "sink", + "enable": true, + "mode": "continuous", + "format": "mp4", + "codec": "h264", + "segment_sec": 300, + "path": "${rec_path}", + "filename_pattern": "%Y%m%d/%H%M%S", + "fps": 25, + "bitrate_kbps": 2000 + }, + { + "id": "pub", + "type": "publish", + "role": "sink", + "enable": true, + "codec": "h264", + "fps": 25, + "gop": 50, + "bitrate_kbps": 2000, + "use_mpp": true, + "use_ffmpeg_mux": true, + "outputs": [ + { "proto": "rtsp_server", "port": 8554, "path": "/live/${name}" } + ] + } + ], + "edges": [ + ["in", "pre"], + ["pre", "ai"], + ["ai", "alarm"], + ["ai", "osd"], + ["osd", "post"], + ["post", "storage"], + ["post", "pub"] + ] + } + }, + + "instances": [ + { + "name": "cam1", + "template": "security_pipeline", + "params": { + "name": "cam1", + "url": "rtsp://10.0.0.9:8554/cam", + "model_path": "/models/yolov8n.rknn", + "rec_path": "/rec/cam1" + } + } + ] +} diff --git a/configs/sprint5_cam1_transcode_instances.json b/configs/sprint5_cam1_transcode_instances.json new file mode 100644 index 0000000..e3f9607 --- /dev/null +++ b/configs/sprint5_cam1_transcode_instances.json @@ -0,0 +1,55 @@ +{ + "global": { + "hot_reload": { "enable": true } + }, + "queue": { "size": 8, "strategy": "drop_oldest" }, + + "templates": { + "transcode_gateway": { + "nodes": [ + { + "id": "in", + "type": "input_rtsp", + "role": "source", + "enable": true, + "url": "${url}", + "fps": 25, + "width": 1920, + "height": 1080, + "use_mpp": false, + "use_ffmpeg": true, + "force_tcp": true + }, + { + "id": "pub", + "type": "publish", + "role": "sink", + "enable": true, + "codec": "h264", + "fps": 25, + "gop": 50, + "bitrate_kbps": 2000, + "use_mpp": true, + "use_ffmpeg_mux": true, + "outputs": [ + { "proto": "rtsp_server", "port": 8554, "path": "/live/${name}" } + ] + } + ], + "edges": [ + ["in", "pub"] + ] + } + }, + + "instances": [ + { + "name": "cam1", + "template": "transcode_gateway", + "params": { + "name": "cam1", + "url": "rtsp://10.0.0.9:8554/cam" + } + } + ] +} diff --git a/include/graph_manager.h b/include/graph_manager.h index 2e7943d..1976e19 100644 --- a/include/graph_manager.h +++ b/include/graph_manager.h @@ -1,6 +1,10 @@ #pragma once +#include +#include +#include #include +#include #include #include #include @@ -14,6 +18,49 @@ namespace rk3588 { +struct QueueSnapshot { + size_t size = 0; + size_t capacity = 0; + uint64_t pushed_total = 0; + uint64_t popped_total = 0; + uint64_t dropped_total = 0; + bool stopped = false; + double pushed_fps = 0.0; + double popped_fps = 0.0; +}; + +struct EdgeSnapshot { + std::string from; + std::string to; + QueueSnapshot queue; +}; + +struct NodeSnapshot { + std::string graph; + std::string id; + std::string type; + std::string role; + bool enabled = true; + + QueueSnapshot input_queue; + double input_fps = 0.0; + double output_fps = 0.0; + + uint64_t ok_total = 0; + uint64_t drop_total = 0; + uint64_t error_total = 0; + double avg_process_time_ms = 0.0; +}; + +struct GraphSnapshot { + std::string name; + bool running = false; + uint64_t timestamp_ms = 0; + double total_fps = 0.0; + std::vector nodes; + std::vector edges; +}; + class Graph { public: explicit Graph(std::string name); @@ -27,6 +74,9 @@ public: const std::string& Name() const { return name_; } const SimpleJson& GraphConfig() const { return graph_cfg_; } + GraphSnapshot Snapshot() const; + bool FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const; + // Attempt in-place update via INode::UpdateConfig for nodes whose config changed. // Returns true if fully updated without rebuild. // Returns false with empty err if rebuild is required. @@ -35,6 +85,13 @@ public: QueueDropStrategy default_strategy, std::string& err); private: + struct NodeMetrics { + std::atomic ok_total{0}; + std::atomic drop_total{0}; + std::atomic error_total{0}; + std::atomic process_time_ns_total{0}; + }; + struct NodeEntry { std::string id; std::string type; @@ -44,6 +101,14 @@ private: NodeContext context; std::unique_ptr node; std::thread worker; + + std::shared_ptr metrics; + }; + + struct EdgeEntry { + std::string from; + std::string to; + std::shared_ptr> queue; }; std::string name_; @@ -51,8 +116,17 @@ private: size_t built_default_queue_size_ = 8; QueueDropStrategy built_default_strategy_ = QueueDropStrategy::DropOldest; std::vector nodes_; + std::vector edges_; std::atomic running_{false}; std::atomic stop_requested_{false}; + + // Mutable rate state (updated on snapshot collection). + mutable std::mutex rate_mu_; + mutable std::chrono::steady_clock::time_point last_rate_tp_{}; + mutable std::map last_node_in_popped_; + mutable std::map last_node_out_pushed_; + mutable std::map last_edge_pushed_; + mutable std::map last_edge_popped_; }; class GraphManager { @@ -70,6 +144,12 @@ public: bool ReloadFromFile(const std::string& path, std::string& err); + std::vector ListGraphSnapshots(); + bool GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err); + // If graph is not set: auto-match when unique, otherwise return false with err describing ambiguity. + bool GetNodeSnapshot(const std::string& node_id, const std::optional& graph, + NodeSnapshot& out, std::string& err); + private: bool running_ = false; PluginLoader loader_; diff --git a/include/http_server.h b/include/http_server.h new file mode 100644 index 0000000..f7103ab --- /dev/null +++ b/include/http_server.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include +#include + +#include "graph_manager.h" + +namespace rk3588 { + +class HttpServer { +public: + HttpServer(GraphManager& gm, int port, std::string web_root); + ~HttpServer(); + + bool Start(); + void Stop(); + +private: + void ServerLoop(); + + GraphManager& gm_; + int port_ = 9000; + std::string web_root_; + + std::atomic running_{false}; + std::atomic listen_sock_{-1}; + std::thread worker_; +}; + +} // namespace rk3588 diff --git a/include/utils/spsc_queue.h b/include/utils/spsc_queue.h index 62bfa01..a03e0f5 100644 --- a/include/utils/spsc_queue.h +++ b/include/utils/spsc_queue.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace rk3588 { @@ -17,6 +18,15 @@ enum class QueueDropStrategy { template class SpscQueue { public: + struct Stats { + size_t size = 0; + size_t capacity = 0; + size_t dropped = 0; + size_t pushed = 0; + size_t popped = 0; + bool stopped = false; + }; + SpscQueue(size_t capacity, QueueDropStrategy strategy) : capacity_(capacity), strategy_(strategy) {} @@ -38,6 +48,7 @@ public: } } queue_.push_back(std::move(item)); + ++pushed_; data_cv_.notify_one(); return true; } @@ -50,6 +61,7 @@ public: if (queue_.empty()) return false; out = std::move(queue_.front()); queue_.pop_front(); + ++popped_; space_cv_.notify_one(); return true; } @@ -78,6 +90,28 @@ public: return dropped_; } + size_t PushedCount() const { + std::lock_guard lock(mu_); + return pushed_; + } + + size_t PoppedCount() const { + std::lock_guard lock(mu_); + return popped_; + } + + Stats GetStats() const { + std::lock_guard lock(mu_); + Stats s; + s.size = queue_.size(); + s.capacity = capacity_; + s.dropped = dropped_; + s.pushed = pushed_; + s.popped = popped_; + s.stopped = stop_; + return s; + } + private: size_t capacity_ = 0; QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest; @@ -87,6 +121,8 @@ private: std::deque queue_; bool stop_ = false; size_t dropped_ = 0; + size_t pushed_ = 0; + size_t popped_ = 0; }; } // namespace rk3588 diff --git a/src/graph_manager.cpp b/src/graph_manager.cpp index f339c30..4c29cbb 100644 --- a/src/graph_manager.cpp +++ b/src/graph_manager.cpp @@ -111,6 +111,17 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa built_default_queue_size_ = default_queue_size; built_default_strategy_ = default_strategy; + nodes_.clear(); + edges_.clear(); + { + std::lock_guard lock(rate_mu_); + last_rate_tp_ = {}; + last_node_in_popped_.clear(); + last_node_out_pushed_.clear(); + last_edge_pushed_.clear(); + last_edge_popped_.clear(); + } + const auto& obj = graph_cfg.AsObject(); auto name_it = obj.find("name"); if (name_it != obj.end() && name_it->second.IsString()) { @@ -135,6 +146,7 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa entry.type = node_val.ValueOr("type", ""); entry.role = node_val.ValueOr("role", ""); entry.enabled = node_val.ValueOr("enable", true); + entry.metrics = std::make_shared(); if (entry.id.empty() || entry.type.empty()) { err = "Node missing id or type"; @@ -201,6 +213,7 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa } enabled_edges.emplace_back(e.from, e.to); + edges_.push_back(EdgeEntry{e.from, e.to, queue}); } // Role validation @@ -253,6 +266,8 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa return false; } + } + // Instantiation for (auto& entry : nodes_) { @@ -419,7 +434,27 @@ bool Graph::Start() { FramePtr frame; while (true) { if (entry.context.input_queue->Pop(frame, std::chrono::milliseconds(100))) { - if (frame) entry.node->Process(frame); + if (!frame) { + continue; + } + + const auto t0 = std::chrono::steady_clock::now(); + NodeStatus st = entry.node->Process(frame); + const auto t1 = std::chrono::steady_clock::now(); + const auto ns = std::chrono::duration_cast(t1 - t0).count(); + if (entry.metrics) { + if (ns > 0) { + entry.metrics->process_time_ns_total.fetch_add(static_cast(ns), + std::memory_order_relaxed); + } + if (st == NodeStatus::OK) { + entry.metrics->ok_total.fetch_add(1, std::memory_order_relaxed); + } else if (st == NodeStatus::DROP) { + entry.metrics->drop_total.fetch_add(1, std::memory_order_relaxed); + } else { + entry.metrics->error_total.fetch_add(1, std::memory_order_relaxed); + } + } continue; } @@ -482,6 +517,140 @@ void Graph::Stop() { running_.store(false); } +namespace { + +uint64_t NowEpochMs() { + using namespace std::chrono; + return static_cast(duration_cast(system_clock::now().time_since_epoch()).count()); +} + +QueueSnapshot ToQueueSnapshot(const SpscQueue::Stats& st) { + QueueSnapshot q; + q.size = st.size; + q.capacity = st.capacity; + q.dropped_total = static_cast(st.dropped); + q.pushed_total = static_cast(st.pushed); + q.popped_total = static_cast(st.popped); + q.stopped = st.stopped; + return q; +} + +} // namespace + +GraphSnapshot Graph::Snapshot() const { + GraphSnapshot snap; + snap.name = name_; + snap.running = running_.load(); + snap.timestamp_ms = NowEpochMs(); + + const auto now_tp = std::chrono::steady_clock::now(); + double dt_sec = 0.0; + { + std::lock_guard lock(rate_mu_); + if (last_rate_tp_.time_since_epoch().count() != 0) { + dt_sec = std::chrono::duration_cast>(now_tp - last_rate_tp_).count(); + } + last_rate_tp_ = now_tp; + } + + double total_fps = 0.0; + for (const auto& n : nodes_) { + if (!n.enabled || !n.node) continue; + + NodeSnapshot ns; + ns.graph = name_; + ns.id = n.id; + ns.type = n.type; + ns.role = n.role; + ns.enabled = n.enabled; + + uint64_t in_popped = 0; + if (n.context.input_queue) { + auto st = n.context.input_queue->GetStats(); + ns.input_queue = ToQueueSnapshot(st); + in_popped = ns.input_queue.popped_total; + } + + uint64_t out_pushed = 0; + for (const auto& oq : n.context.output_queues) { + if (!oq) continue; + out_pushed += static_cast(oq->PushedCount()); + } + + { + std::lock_guard lock(rate_mu_); + const uint64_t last_in = last_node_in_popped_[n.id]; + const uint64_t last_out = last_node_out_pushed_[n.id]; + if (dt_sec > 0.0) { + if (in_popped >= last_in) ns.input_fps = static_cast(in_popped - last_in) / dt_sec; + if (out_pushed >= last_out) ns.output_fps = static_cast(out_pushed - last_out) / dt_sec; + } + last_node_in_popped_[n.id] = in_popped; + last_node_out_pushed_[n.id] = out_pushed; + } + + if (n.metrics) { + ns.ok_total = n.metrics->ok_total.load(std::memory_order_relaxed); + ns.drop_total = n.metrics->drop_total.load(std::memory_order_relaxed); + ns.error_total = n.metrics->error_total.load(std::memory_order_relaxed); + } + const uint64_t proc_cnt = ns.ok_total + ns.drop_total + ns.error_total; + const uint64_t ns_total = n.metrics ? n.metrics->process_time_ns_total.load(std::memory_order_relaxed) : 0; + if (proc_cnt > 0) { + ns.avg_process_time_ms = (static_cast(ns_total) / 1e6) / static_cast(proc_cnt); + } + + const bool source_like = (n.role == "source") || !n.context.input_queue; + if (source_like) { + total_fps += ns.output_fps; + } + + snap.nodes.push_back(std::move(ns)); + } + + for (const auto& e : edges_) { + if (!e.queue) continue; + EdgeSnapshot es; + es.from = e.from; + es.to = e.to; + const auto st = e.queue->GetStats(); + es.queue = ToQueueSnapshot(st); + const std::string key = e.from + "->" + e.to; + + { + std::lock_guard lock(rate_mu_); + const uint64_t last_pushed = last_edge_pushed_[key]; + const uint64_t last_popped = last_edge_popped_[key]; + if (dt_sec > 0.0) { + if (es.queue.pushed_total >= last_pushed) { + es.queue.pushed_fps = static_cast(es.queue.pushed_total - last_pushed) / dt_sec; + } + if (es.queue.popped_total >= last_popped) { + es.queue.popped_fps = static_cast(es.queue.popped_total - last_popped) / dt_sec; + } + } + last_edge_pushed_[key] = es.queue.pushed_total; + last_edge_popped_[key] = es.queue.popped_total; + } + + snap.edges.push_back(std::move(es)); + } + + snap.total_fps = total_fps; + return snap; +} + +bool Graph::FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const { + auto snap = Snapshot(); + for (auto& n : snap.nodes) { + if (n.id == node_id) { + out = std::move(n); + return true; + } + } + return false; +} + GraphManager::GraphManager(std::string plugin_dir) : loader_(std::move(plugin_dir)) {} @@ -725,4 +894,61 @@ bool GraphManager::ReloadFromFile(const std::string& path, std::string& err) { return true; } +std::vector GraphManager::ListGraphSnapshots() { + std::vector out; + std::lock_guard lock(graphs_mu_); + out.reserve(graphs_.size()); + for (const auto& g : graphs_) { + if (!g) continue; + out.push_back(g->Snapshot()); + } + return out; +} + +bool GraphManager::GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err) { + std::lock_guard lock(graphs_mu_); + for (const auto& g : graphs_) { + if (g && g->Name() == name) { + out = g->Snapshot(); + return true; + } + } + err = "graph not found: " + name; + return false; +} + +bool GraphManager::GetNodeSnapshot(const std::string& node_id, const std::optional& graph, + NodeSnapshot& out, std::string& err) { + std::lock_guard lock(graphs_mu_); + if (graph && !graph->empty()) { + for (const auto& g : graphs_) { + if (!g || g->Name() != *graph) continue; + if (g->FindNodeSnapshotById(node_id, out)) return true; + err = "node not found: " + node_id + " in graph " + *graph; + return false; + } + err = "graph not found: " + *graph; + return false; + } + + // Auto-match when unique. + size_t hits = 0; + for (const auto& g : graphs_) { + if (!g) continue; + NodeSnapshot tmp; + if (g->FindNodeSnapshotById(node_id, tmp)) { + out = std::move(tmp); + ++hits; + if (hits > 1) break; + } + } + if (hits == 1) return true; + if (hits == 0) { + err = "node not found: " + node_id; + return false; + } + err = "node id is not unique; specify ?graph="; + return false; +} + } // namespace rk3588 diff --git a/src/http_server.cpp b/src/http_server.cpp new file mode 100644 index 0000000..898655b --- /dev/null +++ b/src/http_server.cpp @@ -0,0 +1,562 @@ +#include "http_server.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(_WIN32) +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +namespace rk3588 { + +namespace { + +#if defined(_WIN32) +using Sock = SOCKET; +constexpr Sock kInvalidSock = INVALID_SOCKET; +static bool InitSockets() { + WSADATA wsa; + return WSAStartup(MAKEWORD(2, 2), &wsa) == 0; +} +static void CleanupSockets() { + WSACleanup(); +} +static void CloseSock(Sock s) { + if (s != kInvalidSock) closesocket(s); +} +static bool SetNonBlocking(Sock s) { + u_long mode = 1; + return ioctlsocket(s, FIONBIO, &mode) == 0; +} +#else +using Sock = int; +constexpr Sock kInvalidSock = -1; +static bool InitSockets() { return true; } +static void CleanupSockets() {} +static void CloseSock(Sock s) { + if (s != kInvalidSock) close(s); +} +static bool SetNonBlocking(Sock s) { + int flags = fcntl(s, F_GETFL, 0); + if (flags < 0) return false; + return fcntl(s, F_SETFL, flags | O_NONBLOCK) == 0; +} +#endif + +static std::string ToLower(std::string s) { + for (char& c : s) c = static_cast(std::tolower(static_cast(c))); + return s; +} + +static void Trim(std::string& s) { + auto not_space = [](unsigned char ch) { return !std::isspace(ch); }; + s.erase(s.begin(), std::find_if(s.begin(), s.end(), not_space)); + s.erase(std::find_if(s.rbegin(), s.rend(), not_space).base(), s.end()); +} + +static std::map ParseQuery(const std::string& q) { + std::map out; + size_t i = 0; + while (i < q.size()) { + size_t amp = q.find('&', i); + std::string part = q.substr(i, (amp == std::string::npos) ? std::string::npos : amp - i); + size_t eq = part.find('='); + std::string k = (eq == std::string::npos) ? part : part.substr(0, eq); + std::string v = (eq == std::string::npos) ? "" : part.substr(eq + 1); + if (!k.empty()) out[k] = v; + if (amp == std::string::npos) break; + i = amp + 1; + } + return out; +} + +static std::string JsonEscape(std::string_view s) { + std::string out; + out.reserve(s.size() + 8); + for (char c : s) { + switch (c) { + case '\\': out += "\\\\"; break; + case '"': out += "\\\""; break; + case '\n': out += "\\n"; break; + case '\r': out += "\\r"; break; + case '\t': out += "\\t"; break; + default: + if (static_cast(c) < 0x20) { + out += "?"; + } else { + out.push_back(c); + } + } + } + return out; +} + +static std::string StatusText(int code) { + switch (code) { + case 200: return "OK"; + case 400: return "Bad Request"; + case 404: return "Not Found"; + case 405: return "Method Not Allowed"; + case 409: return "Conflict"; + case 500: return "Internal Server Error"; + default: return ""; + } +} + +static std::string ContentTypeForPath(const std::string& path) { + auto pos = path.find_last_of('.'); + std::string ext = (pos == std::string::npos) ? "" : ToLower(path.substr(pos + 1)); + if (ext == "html") return "text/html; charset=utf-8"; + if (ext == "js") return "application/javascript; charset=utf-8"; + if (ext == "css") return "text/css; charset=utf-8"; + if (ext == "json") return "application/json; charset=utf-8"; + return "application/octet-stream"; +} + +static bool IsSafeStaticPath(const std::string& path) { + if (path.empty() || path[0] != '/') return false; + if (path.find('\\') != std::string::npos) return false; + if (path.find("..") != std::string::npos) return false; + return true; +} + +static bool ReadFile(const std::string& file_path, std::string& out) { + std::ifstream ifs(file_path, std::ios::binary); + if (!ifs.is_open()) return false; + std::ostringstream oss; + oss << ifs.rdbuf(); + out = oss.str(); + return true; +} + +static std::string QueueJson(const QueueSnapshot& q) { + std::ostringstream oss; + oss << "{"; + oss << "\"size\":" << q.size << ','; + oss << "\"capacity\":" << q.capacity << ','; + oss << "\"pushed_total\":" << q.pushed_total << ','; + oss << "\"popped_total\":" << q.popped_total << ','; + oss << "\"dropped_total\":" << q.dropped_total << ','; + oss << "\"stopped\":" << (q.stopped ? "true" : "false") << ','; + oss << "\"pushed_fps\":" << q.pushed_fps << ','; + oss << "\"popped_fps\":" << q.popped_fps; + oss << "}"; + return oss.str(); +} + +static std::string GraphSnapshotJson(const GraphSnapshot& g) { + std::ostringstream oss; + oss << "{"; + oss << "\"name\":\"" << JsonEscape(g.name) << "\","; + oss << "\"running\":" << (g.running ? "true" : "false") << ','; + oss << "\"timestamp_ms\":" << g.timestamp_ms << ','; + oss << "\"total_fps\":" << g.total_fps << ','; + + oss << "\"nodes\":["; + for (size_t i = 0; i < g.nodes.size(); ++i) { + const auto& n = g.nodes[i]; + if (i) oss << ','; + oss << "{"; + oss << "\"graph\":\"" << JsonEscape(n.graph) << "\","; + oss << "\"id\":\"" << JsonEscape(n.id) << "\","; + oss << "\"type\":\"" << JsonEscape(n.type) << "\","; + oss << "\"role\":\"" << JsonEscape(n.role) << "\","; + oss << "\"enabled\":" << (n.enabled ? "true" : "false") << ','; + oss << "\"input_fps\":" << n.input_fps << ','; + oss << "\"output_fps\":" << n.output_fps << ','; + oss << "\"ok_total\":" << n.ok_total << ','; + oss << "\"drop_total\":" << n.drop_total << ','; + oss << "\"error_total\":" << n.error_total << ','; + oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ','; + oss << "\"input_queue\":" << QueueJson(n.input_queue); + oss << "}"; + } + oss << "],"; + + oss << "\"edges\":["; + for (size_t i = 0; i < g.edges.size(); ++i) { + const auto& e = g.edges[i]; + if (i) oss << ','; + oss << "{"; + oss << "\"from\":\"" << JsonEscape(e.from) << "\","; + oss << "\"to\":\"" << JsonEscape(e.to) << "\","; + oss << "\"queue\":" << QueueJson(e.queue); + oss << "}"; + } + oss << "]"; + + oss << "}"; + return oss.str(); +} + +static std::string NodeSnapshotJson(const NodeSnapshot& n) { + std::ostringstream oss; + oss << "{"; + oss << "\"graph\":\"" << JsonEscape(n.graph) << "\","; + oss << "\"id\":\"" << JsonEscape(n.id) << "\","; + oss << "\"type\":\"" << JsonEscape(n.type) << "\","; + oss << "\"role\":\"" << JsonEscape(n.role) << "\","; + oss << "\"enabled\":" << (n.enabled ? "true" : "false") << ','; + oss << "\"input_fps\":" << n.input_fps << ','; + oss << "\"output_fps\":" << n.output_fps << ','; + oss << "\"ok_total\":" << n.ok_total << ','; + oss << "\"drop_total\":" << n.drop_total << ','; + oss << "\"error_total\":" << n.error_total << ','; + oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ','; + oss << "\"input_queue\":" << QueueJson(n.input_queue); + oss << "}"; + return oss.str(); +} + +static std::string ErrorJson(const std::string& msg) { + std::ostringstream oss; + oss << "{\"error\":\"" << JsonEscape(msg) << "\"}"; + return oss.str(); +} + +struct HttpRequest { + std::string method; + std::string target; + std::string path; + std::string query; + std::map headers; + std::string body; +}; + +struct HttpResponse { + int status = 200; + std::string content_type = "application/json; charset=utf-8"; + std::string body; +}; + +static bool RecvUntil(Sock s, std::string& data, const std::string& needle, size_t max_bytes) { + char buf[4096]; + while (data.find(needle) == std::string::npos) { + if (data.size() > max_bytes) return false; +#if defined(_WIN32) + int n = recv(s, buf, static_cast(sizeof(buf)), 0); +#else + int n = static_cast(recv(s, buf, sizeof(buf), 0)); +#endif + if (n <= 0) return false; + data.append(buf, buf + n); + } + return true; +} + +static bool ParseHttpRequest(Sock s, HttpRequest& req, std::string& err) { + std::string data; + if (!RecvUntil(s, data, "\r\n\r\n", 1024 * 1024)) { + err = "failed to read headers"; + return false; + } + + size_t header_end = data.find("\r\n\r\n"); + std::string header_part = data.substr(0, header_end); + std::string remain = data.substr(header_end + 4); + + std::istringstream iss(header_part); + std::string line; + if (!std::getline(iss, line)) { + err = "empty request"; + return false; + } + if (!line.empty() && line.back() == '\r') line.pop_back(); + { + std::istringstream ls(line); + std::string version; + if (!(ls >> req.method >> req.target >> version)) { + err = "bad request line"; + return false; + } + } + while (std::getline(iss, line)) { + if (!line.empty() && line.back() == '\r') line.pop_back(); + if (line.empty()) continue; + auto pos = line.find(':'); + if (pos == std::string::npos) continue; + std::string k = ToLower(line.substr(0, pos)); + std::string v = line.substr(pos + 1); + Trim(v); + req.headers[k] = v; + } + + // split path/query + { + auto qpos = req.target.find('?'); + if (qpos == std::string::npos) { + req.path = req.target; + } else { + req.path = req.target.substr(0, qpos); + req.query = req.target.substr(qpos + 1); + } + } + + size_t content_len = 0; + if (auto it = req.headers.find("content-length"); it != req.headers.end()) { + try { + content_len = static_cast(std::stoul(it->second)); + } catch (...) { + content_len = 0; + } + } + req.body = remain; + while (req.body.size() < content_len) { + char buf[4096]; +#if defined(_WIN32) + int n = recv(s, buf, static_cast(sizeof(buf)), 0); +#else + int n = static_cast(recv(s, buf, sizeof(buf), 0)); +#endif + if (n <= 0) break; + req.body.append(buf, buf + n); + } + if (req.body.size() > content_len) req.body.resize(content_len); + return true; +} + +static bool SendAll(Sock s, const std::string& data) { + size_t sent = 0; + while (sent < data.size()) { +#if defined(_WIN32) + int n = send(s, data.data() + sent, static_cast(data.size() - sent), 0); +#else + ssize_t n = send(s, data.data() + sent, data.size() - sent, 0); +#endif + if (n <= 0) return false; + sent += static_cast(n); + } + return true; +} + +static std::string MakeHttpResponse(const HttpResponse& res) { + std::ostringstream oss; + oss << "HTTP/1.1 " << res.status << " " << StatusText(res.status) << "\r\n"; + oss << "Content-Type: " << res.content_type << "\r\n"; + oss << "Content-Length: " << res.body.size() << "\r\n"; + oss << "Connection: close\r\n"; + oss << "\r\n"; + oss << res.body; + return oss.str(); +} + +} // namespace + +HttpServer::HttpServer(GraphManager& gm, int port, std::string web_root) + : gm_(gm), port_(port), web_root_(std::move(web_root)) {} + +HttpServer::~HttpServer() { + Stop(); +} + +bool HttpServer::Start() { + bool expected = false; + if (!running_.compare_exchange_strong(expected, true)) { + return true; + } + worker_ = std::thread(&HttpServer::ServerLoop, this); + return true; +} + +void HttpServer::Stop() { + bool was_running = running_.exchange(false); + if (!was_running) return; + + const int64_t ls = listen_sock_.exchange(-1); + if (ls != -1) { + CloseSock(static_cast(ls)); + } + if (worker_.joinable()) worker_.join(); +} + +void HttpServer::ServerLoop() { + if (!InitSockets()) { + std::cerr << "[HttpServer] socket init failed\n"; + running_.store(false); + return; + } + + Sock srv = socket(AF_INET, SOCK_STREAM, 0); + if (srv == kInvalidSock) { + std::cerr << "[HttpServer] socket() failed\n"; + CleanupSockets(); + running_.store(false); + return; + } + + int opt = 1; +#if defined(_WIN32) + setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&opt), sizeof(opt)); +#else + setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); +#endif + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(static_cast(port_)); + if (bind(srv, reinterpret_cast(&addr), sizeof(addr)) != 0) { + std::cerr << "[HttpServer] bind failed on port " << port_ << "\n"; + CloseSock(srv); + CleanupSockets(); + running_.store(false); + return; + } + if (listen(srv, 16) != 0) { + std::cerr << "[HttpServer] listen failed\n"; + CloseSock(srv); + CleanupSockets(); + running_.store(false); + return; + } + (void)SetNonBlocking(srv); + listen_sock_.store(static_cast(srv)); + std::cout << "[HttpServer] listening on 0.0.0.0:" << port_ << " (web_root=" << web_root_ << ")\n"; + + while (running_.load()) { + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(srv, &rfds); + timeval tv{}; + tv.tv_sec = 0; + tv.tv_usec = 200000; +#if defined(_WIN32) + int ret = select(0, &rfds, nullptr, nullptr, &tv); +#else + int ret = select(srv + 1, &rfds, nullptr, nullptr, &tv); +#endif + if (!running_.load()) break; + if (ret <= 0) continue; + if (!FD_ISSET(srv, &rfds)) continue; + + sockaddr_in cli{}; +#if defined(_WIN32) + int len = sizeof(cli); +#else + socklen_t len = sizeof(cli); +#endif + Sock c = accept(srv, reinterpret_cast(&cli), &len); + if (c == kInvalidSock) continue; + + HttpRequest req; + std::string perr; + HttpResponse resp; + + if (!ParseHttpRequest(c, req, perr)) { + resp.status = 400; + resp.body = ErrorJson(perr); + auto raw = MakeHttpResponse(resp); + (void)SendAll(c, raw); + CloseSock(c); + continue; + } + + // Dispatch + if (req.path.rfind("/api/", 0) == 0) { + if (req.method != "GET") { + resp.status = 405; + resp.body = ErrorJson("method not allowed"); + } else if (req.path == "/api/graphs") { + auto snaps = gm_.ListGraphSnapshots(); + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < snaps.size(); ++i) { + if (i) oss << ','; + oss << "{"; + oss << "\"name\":\"" << JsonEscape(snaps[i].name) << "\","; + oss << "\"running\":" << (snaps[i].running ? "true" : "false") << ','; + oss << "\"total_fps\":" << snaps[i].total_fps; + oss << "}"; + } + oss << "]"; + resp.body = oss.str(); + } else if (req.path.rfind("/api/graphs/", 0) == 0) { + std::string name = req.path.substr(std::string("/api/graphs/").size()); + GraphSnapshot gs; + std::string gerr; + if (!gm_.GetGraphSnapshot(name, gs, gerr)) { + resp.status = 404; + resp.body = ErrorJson(gerr); + } else { + resp.body = GraphSnapshotJson(gs); + } + } else if (req.path.rfind("/api/nodes/", 0) == 0) { + // /api/nodes/{id}/metrics + const std::string prefix = "/api/nodes/"; + const std::string suffix = "/metrics"; + auto pos = req.path.rfind(suffix); + if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) { + resp.status = 404; + resp.body = ErrorJson("not found"); + } else { + std::string node_id = req.path.substr(prefix.size(), pos - prefix.size()); + auto q = ParseQuery(req.query); + std::optional graph; + if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) { + graph = it->second; + } + NodeSnapshot ns; + std::string nerr; + if (!gm_.GetNodeSnapshot(node_id, graph, ns, nerr)) { + resp.status = (nerr.find("not unique") != std::string::npos) ? 409 : 404; + resp.body = ErrorJson(nerr); + } else { + resp.body = NodeSnapshotJson(ns); + } + } + } else { + resp.status = 404; + resp.body = ErrorJson("not found"); + } + } else { + std::string path = req.path; + if (path == "/") path = "/index.html"; + if (!IsSafeStaticPath(path)) { + resp.status = 404; + resp.body = ErrorJson("not found"); + } else { + std::string full = web_root_; + if (!full.empty() && (full.back() == '/' || full.back() == '\\')) { + full.pop_back(); + } + full += path; + std::string content; + if (!ReadFile(full, content)) { + resp.status = 404; + resp.body = ErrorJson("not found"); + } else { + resp.status = 200; + resp.content_type = ContentTypeForPath(path); + resp.body = std::move(content); + } + } + } + + auto raw = MakeHttpResponse(resp); + (void)SendAll(c, raw); + CloseSock(c); + } + + listen_sock_.store(-1); + CloseSock(srv); + CleanupSockets(); +} + +} // namespace rk3588 diff --git a/src/media_server_app.cpp b/src/media_server_app.cpp index 8b624ec..74f97a7 100644 --- a/src/media_server_app.cpp +++ b/src/media_server_app.cpp @@ -22,6 +22,7 @@ namespace fs = std::filesystem; #endif #include "graph_manager.h" +#include "http_server.h" #include "utils/simple_json.h" namespace rk3588 { @@ -181,6 +182,13 @@ int MediaServerApp::Start() { return 1; } + int metrics_port = 9000; + std::string web_root = "web"; + if (const SimpleJson* g = root_cfg.Find("global")) { + metrics_port = g->ValueOr("metrics_port", metrics_port); + web_root = g->ValueOr("web_root", web_root); + } + if (!graph_manager_.Build(root_cfg, err)) { std::cerr << "[MediaServerApp] Failed to build graphs: " << err << "\n"; return 1; @@ -195,6 +203,9 @@ int MediaServerApp::Start() { return 1; } + HttpServer http(graph_manager_, metrics_port, web_root); + (void)http.Start(); + std::cout << "[MediaServerApp] Running. Press Ctrl+C to stop.\n"; std::atomic stop_watch{false}; @@ -214,6 +225,8 @@ int MediaServerApp::Start() { graph_manager_.BlockUntilStop(); + http.Stop(); + stop_watch.store(true); if (watcher.joinable()) watcher.join(); std::cout << "[MediaServerApp] Shutdown complete.\n"; diff --git a/web/app.js b/web/app.js new file mode 100644 index 0000000..890cffa --- /dev/null +++ b/web/app.js @@ -0,0 +1,115 @@ +function qs(name) { + const url = new URL(window.location.href); + return url.searchParams.get(name); +} + +async function fetchJson(url) { + const res = await fetch(url, { cache: "no-store" }); + const text = await res.text(); + let data; + try { + data = JSON.parse(text); + } catch { + throw new Error(`Bad JSON from ${url}: ${text.slice(0, 200)}`); + } + if (!res.ok) { + const msg = data && data.error ? data.error : `HTTP ${res.status}`; + throw new Error(msg); + } + return data; +} + +function fmt(n, digits = 1) { + if (typeof n !== "number" || !isFinite(n)) return "-"; + return n.toFixed(digits); +} + +async function updateIndex() { + const body = document.getElementById("graphs-body"); + if (!body) return; + + let graphs = []; + try { + graphs = await fetchJson("/api/graphs"); + } catch (e) { + body.innerHTML = `${e.message}`; + return; + } + + body.innerHTML = ""; + for (const g of graphs) { + const tr = document.createElement("tr"); + const statusCls = g.running ? "status-running" : "status-stopped"; + tr.innerHTML = ` + ${g.name} + ${g.running ? "Running" : "Stopped"} + ${fmt(g.total_fps)} + `; + body.appendChild(tr); + } +} + +async function updateGraph() { + const name = qs("name"); + const nodesBody = document.getElementById("nodes-body"); + const edgesBody = document.getElementById("edges-body"); + if (!name || !nodesBody || !edgesBody) return; + + const title = document.getElementById("graph-title"); + if (title) title.textContent = `Graph: ${name}`; + + let g; + try { + g = await fetchJson(`/api/graphs/${encodeURIComponent(name)}`); + } catch (e) { + nodesBody.innerHTML = `${e.message}`; + edgesBody.innerHTML = `${e.message}`; + return; + } + + const meta = document.getElementById("graph-meta"); + if (meta) { + meta.textContent = `running=${g.running} total_fps=${fmt(g.total_fps)} ts_ms=${g.timestamp_ms}`; + } + + nodesBody.innerHTML = ""; + for (const n of g.nodes) { + const tr = document.createElement("tr"); + tr.innerHTML = ` + ${n.id} + ${n.type} + ${n.role || ""} + ${fmt(n.input_fps)} + ${fmt(n.output_fps)} + ${n.input_queue ? `${n.input_queue.size}/${n.input_queue.capacity}` : "-"} + ${n.drop_total} + ${n.error_total} + ${fmt(n.avg_process_time_ms, 3)} + `; + nodesBody.appendChild(tr); + } + + edgesBody.innerHTML = ""; + for (const e of g.edges) { + const tr = document.createElement("tr"); + tr.innerHTML = ` + ${e.from} + ${e.to} + ${e.queue.size} + ${e.queue.capacity} + ${e.queue.dropped_total} + ${fmt(e.queue.pushed_fps)} + ${fmt(e.queue.popped_fps)} + `; + edgesBody.appendChild(tr); + } +} + +function boot() { + updateIndex(); + updateGraph(); + setInterval(updateIndex, 2000); + setInterval(updateGraph, 1000); +} + +window.addEventListener("load", boot); diff --git a/web/graph.html b/web/graph.html new file mode 100644 index 0000000..2780747 --- /dev/null +++ b/web/graph.html @@ -0,0 +1,53 @@ + + + + + + RK3588 Media Server - Graph + + + +
+ ← Back +
+ +

Graph

+
+ +

Nodes

+ + + + + + + + + + + + + + + +
IDTypeRoleInput FPSOutput FPSInQ SizeDropErrAvg ms
+ +

Edges

+ + + + + + + + + + + + + +
FromToQ SizeCapacityDropPush FPSPop FPS
+ + + + diff --git a/web/index.html b/web/index.html new file mode 100644 index 0000000..7059fce --- /dev/null +++ b/web/index.html @@ -0,0 +1,26 @@ + + + + + + RK3588 Media Server - Graphs + + + +

Graphs

+
Auto refresh every 2s.
+ + + + + + + + + + +
NameStatusTotal FPS
+ + + + diff --git a/web/style.css b/web/style.css new file mode 100644 index 0000000..b027849 --- /dev/null +++ b/web/style.css @@ -0,0 +1,44 @@ +body { + font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial; + margin: 16px; + color: #111; +} + +h1, h2 { + margin: 12px 0; +} + +.hint { + color: #555; + margin: 6px 0 12px; +} + +.toolbar { + margin-bottom: 8px; +} + +table { + border-collapse: collapse; + width: 100%; +} + +th, td { + border: 1px solid #ddd; + padding: 6px 8px; + text-align: left; + font-size: 14px; +} + +th { + background: #f6f6f6; +} + +.status-running { + color: #0a7a0a; + font-weight: 600; +} + +.status-stopped { + color: #b00020; + font-weight: 600; +}