开发sprint6阶段,第一次开发

This commit is contained in:
sladro 2025-12-31 09:40:36 +08:00
parent f1a290998f
commit 811a57c361
13 changed files with 1409 additions and 1 deletions

View File

@ -54,6 +54,7 @@ set(SRC_FILES
src/graph_manager.cpp
src/plugin_loader.cpp
src/ai_scheduler.cpp
src/http_server.cpp
src/utils/dma_alloc.cpp
src/utils/config_expand.cpp
)
@ -66,6 +67,10 @@ target_include_directories(media-server
${CMAKE_SOURCE_DIR}/third_party
)
target_link_libraries(media-server PRIVATE project_options Threads::Threads)
if(WIN32)
target_link_libraries(media-server PRIVATE ws2_32)
endif()
target_compile_definitions(media-server PRIVATE
RK_PROJECT_VERSION="${PROJECT_VERSION}"
RK_GIT_SHA="${RK_GIT_SHA}"

View File

@ -0,0 +1,161 @@
{
"global": {
"hot_reload": { "enable": true }
},
"queue": { "size": 8, "strategy": "drop_oldest" },
"templates": {
"security_pipeline": {
"nodes": [
{
"id": "in",
"type": "input_rtsp",
"role": "source",
"enable": true,
"url": "${url}",
"fps": 25,
"width": 1920,
"height": 1080,
"use_mpp": false,
"use_ffmpeg": true,
"force_tcp": true
},
{
"id": "pre",
"type": "preprocess",
"role": "filter",
"enable": true,
"dst_w": 640,
"dst_h": 640,
"dst_format": "rgb",
"keep_ratio": false,
"use_rga": true
},
{
"id": "ai",
"type": "ai_yolo",
"role": "filter",
"enable": true,
"model_path": "${model_path}",
"model_version": "v8",
"num_classes": 80,
"conf": 0.5,
"nms": 0.45,
"class_filter": []
},
{
"id": "alarm",
"type": "alarm",
"role": "sink",
"enable": true,
"labels": [],
"rules": [
{
"name": "object_detection",
"class_ids": [0],
"roi": { "x": 0.1, "y": 0.1, "w": 0.8, "h": 0.8 },
"min_duration_ms": 500,
"cooldown_ms": 5000
}
],
"actions": {
"log": { "enable": true, "level": "info" },
"http": {
"enable": false,
"url": "http://127.0.0.1:8080/api/alarm",
"timeout_ms": 3000,
"include_media_url": true
},
"snapshot": {
"enable": true,
"format": "jpg",
"quality": 85,
"upload": { "type": "local", "path": "/tmp/alarms" }
},
"clip": {
"enable": false,
"pre_sec": 5,
"post_sec": 10,
"format": "mp4",
"fps": 25,
"upload": { "type": "local", "path": "/tmp/alarms" }
}
}
},
{
"id": "osd",
"type": "osd",
"role": "filter",
"enable": true,
"draw_bbox": true,
"draw_text": true,
"line_width": 2,
"font_scale": 1,
"labels": []
},
{
"id": "post",
"type": "preprocess",
"role": "filter",
"enable": true,
"dst_w": 1920,
"dst_h": 1080,
"dst_format": "nv12",
"keep_ratio": false,
"use_rga": true
},
{
"id": "storage",
"type": "storage",
"role": "sink",
"enable": true,
"mode": "continuous",
"format": "mp4",
"codec": "h264",
"segment_sec": 300,
"path": "${rec_path}",
"filename_pattern": "%Y%m%d/%H%M%S",
"fps": 25,
"bitrate_kbps": 2000
},
{
"id": "pub",
"type": "publish",
"role": "sink",
"enable": true,
"codec": "h264",
"fps": 25,
"gop": 50,
"bitrate_kbps": 2000,
"use_mpp": true,
"use_ffmpeg_mux": true,
"outputs": [
{ "proto": "rtsp_server", "port": 8554, "path": "/live/${name}" }
]
}
],
"edges": [
["in", "pre"],
["pre", "ai"],
["ai", "alarm"],
["ai", "osd"],
["osd", "post"],
["post", "storage"],
["post", "pub"]
]
}
},
"instances": [
{
"name": "cam1",
"template": "security_pipeline",
"params": {
"name": "cam1",
"url": "rtsp://10.0.0.9:8554/cam",
"model_path": "/models/yolov8n.rknn",
"rec_path": "/rec/cam1"
}
}
]
}

View File

@ -0,0 +1,55 @@
{
"global": {
"hot_reload": { "enable": true }
},
"queue": { "size": 8, "strategy": "drop_oldest" },
"templates": {
"transcode_gateway": {
"nodes": [
{
"id": "in",
"type": "input_rtsp",
"role": "source",
"enable": true,
"url": "${url}",
"fps": 25,
"width": 1920,
"height": 1080,
"use_mpp": false,
"use_ffmpeg": true,
"force_tcp": true
},
{
"id": "pub",
"type": "publish",
"role": "sink",
"enable": true,
"codec": "h264",
"fps": 25,
"gop": 50,
"bitrate_kbps": 2000,
"use_mpp": true,
"use_ffmpeg_mux": true,
"outputs": [
{ "proto": "rtsp_server", "port": 8554, "path": "/live/${name}" }
]
}
],
"edges": [
["in", "pub"]
]
}
},
"instances": [
{
"name": "cam1",
"template": "transcode_gateway",
"params": {
"name": "cam1",
"url": "rtsp://10.0.0.9:8554/cam"
}
}
]
}

View File

@ -1,6 +1,10 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <mutex>
#include <condition_variable>
@ -14,6 +18,49 @@
namespace rk3588 {
struct QueueSnapshot {
size_t size = 0;
size_t capacity = 0;
uint64_t pushed_total = 0;
uint64_t popped_total = 0;
uint64_t dropped_total = 0;
bool stopped = false;
double pushed_fps = 0.0;
double popped_fps = 0.0;
};
struct EdgeSnapshot {
std::string from;
std::string to;
QueueSnapshot queue;
};
struct NodeSnapshot {
std::string graph;
std::string id;
std::string type;
std::string role;
bool enabled = true;
QueueSnapshot input_queue;
double input_fps = 0.0;
double output_fps = 0.0;
uint64_t ok_total = 0;
uint64_t drop_total = 0;
uint64_t error_total = 0;
double avg_process_time_ms = 0.0;
};
struct GraphSnapshot {
std::string name;
bool running = false;
uint64_t timestamp_ms = 0;
double total_fps = 0.0;
std::vector<NodeSnapshot> nodes;
std::vector<EdgeSnapshot> edges;
};
class Graph {
public:
explicit Graph(std::string name);
@ -27,6 +74,9 @@ public:
const std::string& Name() const { return name_; }
const SimpleJson& GraphConfig() const { return graph_cfg_; }
GraphSnapshot Snapshot() const;
bool FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const;
// Attempt in-place update via INode::UpdateConfig for nodes whose config changed.
// Returns true if fully updated without rebuild.
// Returns false with empty err if rebuild is required.
@ -35,6 +85,13 @@ public:
QueueDropStrategy default_strategy, std::string& err);
private:
struct NodeMetrics {
std::atomic<uint64_t> ok_total{0};
std::atomic<uint64_t> drop_total{0};
std::atomic<uint64_t> error_total{0};
std::atomic<uint64_t> process_time_ns_total{0};
};
struct NodeEntry {
std::string id;
std::string type;
@ -44,6 +101,14 @@ private:
NodeContext context;
std::unique_ptr<INode> node;
std::thread worker;
std::shared_ptr<NodeMetrics> metrics;
};
struct EdgeEntry {
std::string from;
std::string to;
std::shared_ptr<SpscQueue<FramePtr>> queue;
};
std::string name_;
@ -51,8 +116,17 @@ private:
size_t built_default_queue_size_ = 8;
QueueDropStrategy built_default_strategy_ = QueueDropStrategy::DropOldest;
std::vector<NodeEntry> nodes_;
std::vector<EdgeEntry> edges_;
std::atomic<bool> running_{false};
std::atomic<bool> stop_requested_{false};
// Mutable rate state (updated on snapshot collection).
mutable std::mutex rate_mu_;
mutable std::chrono::steady_clock::time_point last_rate_tp_{};
mutable std::map<std::string, uint64_t> last_node_in_popped_;
mutable std::map<std::string, uint64_t> last_node_out_pushed_;
mutable std::map<std::string, uint64_t> last_edge_pushed_;
mutable std::map<std::string, uint64_t> last_edge_popped_;
};
class GraphManager {
@ -70,6 +144,12 @@ public:
bool ReloadFromFile(const std::string& path, std::string& err);
std::vector<GraphSnapshot> ListGraphSnapshots();
bool GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err);
// If graph is not set: auto-match when unique, otherwise return false with err describing ambiguity.
bool GetNodeSnapshot(const std::string& node_id, const std::optional<std::string>& graph,
NodeSnapshot& out, std::string& err);
private:
bool running_ = false;
PluginLoader loader_;

32
include/http_server.h Normal file
View File

@ -0,0 +1,32 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <string>
#include <thread>
#include "graph_manager.h"
namespace rk3588 {
class HttpServer {
public:
HttpServer(GraphManager& gm, int port, std::string web_root);
~HttpServer();
bool Start();
void Stop();
private:
void ServerLoop();
GraphManager& gm_;
int port_ = 9000;
std::string web_root_;
std::atomic<bool> running_{false};
std::atomic<int64_t> listen_sock_{-1};
std::thread worker_;
};
} // namespace rk3588

View File

@ -5,6 +5,7 @@
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
namespace rk3588 {
@ -17,6 +18,15 @@ enum class QueueDropStrategy {
template <typename T>
class SpscQueue {
public:
struct Stats {
size_t size = 0;
size_t capacity = 0;
size_t dropped = 0;
size_t pushed = 0;
size_t popped = 0;
bool stopped = false;
};
SpscQueue(size_t capacity, QueueDropStrategy strategy)
: capacity_(capacity), strategy_(strategy) {}
@ -38,6 +48,7 @@ public:
}
}
queue_.push_back(std::move(item));
++pushed_;
data_cv_.notify_one();
return true;
}
@ -50,6 +61,7 @@ public:
if (queue_.empty()) return false;
out = std::move(queue_.front());
queue_.pop_front();
++popped_;
space_cv_.notify_one();
return true;
}
@ -78,6 +90,28 @@ public:
return dropped_;
}
size_t PushedCount() const {
std::lock_guard<std::mutex> lock(mu_);
return pushed_;
}
size_t PoppedCount() const {
std::lock_guard<std::mutex> lock(mu_);
return popped_;
}
Stats GetStats() const {
std::lock_guard<std::mutex> lock(mu_);
Stats s;
s.size = queue_.size();
s.capacity = capacity_;
s.dropped = dropped_;
s.pushed = pushed_;
s.popped = popped_;
s.stopped = stop_;
return s;
}
private:
size_t capacity_ = 0;
QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest;
@ -87,6 +121,8 @@ private:
std::deque<T> queue_;
bool stop_ = false;
size_t dropped_ = 0;
size_t pushed_ = 0;
size_t popped_ = 0;
};
} // namespace rk3588

View File

@ -111,6 +111,17 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa
built_default_queue_size_ = default_queue_size;
built_default_strategy_ = default_strategy;
nodes_.clear();
edges_.clear();
{
std::lock_guard<std::mutex> lock(rate_mu_);
last_rate_tp_ = {};
last_node_in_popped_.clear();
last_node_out_pushed_.clear();
last_edge_pushed_.clear();
last_edge_popped_.clear();
}
const auto& obj = graph_cfg.AsObject();
auto name_it = obj.find("name");
if (name_it != obj.end() && name_it->second.IsString()) {
@ -135,6 +146,7 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa
entry.type = node_val.ValueOr<std::string>("type", "");
entry.role = node_val.ValueOr<std::string>("role", "");
entry.enabled = node_val.ValueOr<bool>("enable", true);
entry.metrics = std::make_shared<NodeMetrics>();
if (entry.id.empty() || entry.type.empty()) {
err = "Node missing id or type";
@ -201,6 +213,7 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa
}
enabled_edges.emplace_back(e.from, e.to);
edges_.push_back(EdgeEntry{e.from, e.to, queue});
}
// Role validation
@ -253,6 +266,8 @@ bool Graph::Build(const SimpleJson& graph_cfg, PluginLoader& loader, size_t defa
return false;
}
}
// Instantiation
for (auto& entry : nodes_) {
@ -419,7 +434,27 @@ bool Graph::Start() {
FramePtr frame;
while (true) {
if (entry.context.input_queue->Pop(frame, std::chrono::milliseconds(100))) {
if (frame) entry.node->Process(frame);
if (!frame) {
continue;
}
const auto t0 = std::chrono::steady_clock::now();
NodeStatus st = entry.node->Process(frame);
const auto t1 = std::chrono::steady_clock::now();
const auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
if (entry.metrics) {
if (ns > 0) {
entry.metrics->process_time_ns_total.fetch_add(static_cast<uint64_t>(ns),
std::memory_order_relaxed);
}
if (st == NodeStatus::OK) {
entry.metrics->ok_total.fetch_add(1, std::memory_order_relaxed);
} else if (st == NodeStatus::DROP) {
entry.metrics->drop_total.fetch_add(1, std::memory_order_relaxed);
} else {
entry.metrics->error_total.fetch_add(1, std::memory_order_relaxed);
}
}
continue;
}
@ -482,6 +517,140 @@ void Graph::Stop() {
running_.store(false);
}
namespace {
uint64_t NowEpochMs() {
using namespace std::chrono;
return static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
}
QueueSnapshot ToQueueSnapshot(const SpscQueue<FramePtr>::Stats& st) {
QueueSnapshot q;
q.size = st.size;
q.capacity = st.capacity;
q.dropped_total = static_cast<uint64_t>(st.dropped);
q.pushed_total = static_cast<uint64_t>(st.pushed);
q.popped_total = static_cast<uint64_t>(st.popped);
q.stopped = st.stopped;
return q;
}
} // namespace
GraphSnapshot Graph::Snapshot() const {
GraphSnapshot snap;
snap.name = name_;
snap.running = running_.load();
snap.timestamp_ms = NowEpochMs();
const auto now_tp = std::chrono::steady_clock::now();
double dt_sec = 0.0;
{
std::lock_guard<std::mutex> lock(rate_mu_);
if (last_rate_tp_.time_since_epoch().count() != 0) {
dt_sec = std::chrono::duration_cast<std::chrono::duration<double>>(now_tp - last_rate_tp_).count();
}
last_rate_tp_ = now_tp;
}
double total_fps = 0.0;
for (const auto& n : nodes_) {
if (!n.enabled || !n.node) continue;
NodeSnapshot ns;
ns.graph = name_;
ns.id = n.id;
ns.type = n.type;
ns.role = n.role;
ns.enabled = n.enabled;
uint64_t in_popped = 0;
if (n.context.input_queue) {
auto st = n.context.input_queue->GetStats();
ns.input_queue = ToQueueSnapshot(st);
in_popped = ns.input_queue.popped_total;
}
uint64_t out_pushed = 0;
for (const auto& oq : n.context.output_queues) {
if (!oq) continue;
out_pushed += static_cast<uint64_t>(oq->PushedCount());
}
{
std::lock_guard<std::mutex> lock(rate_mu_);
const uint64_t last_in = last_node_in_popped_[n.id];
const uint64_t last_out = last_node_out_pushed_[n.id];
if (dt_sec > 0.0) {
if (in_popped >= last_in) ns.input_fps = static_cast<double>(in_popped - last_in) / dt_sec;
if (out_pushed >= last_out) ns.output_fps = static_cast<double>(out_pushed - last_out) / dt_sec;
}
last_node_in_popped_[n.id] = in_popped;
last_node_out_pushed_[n.id] = out_pushed;
}
if (n.metrics) {
ns.ok_total = n.metrics->ok_total.load(std::memory_order_relaxed);
ns.drop_total = n.metrics->drop_total.load(std::memory_order_relaxed);
ns.error_total = n.metrics->error_total.load(std::memory_order_relaxed);
}
const uint64_t proc_cnt = ns.ok_total + ns.drop_total + ns.error_total;
const uint64_t ns_total = n.metrics ? n.metrics->process_time_ns_total.load(std::memory_order_relaxed) : 0;
if (proc_cnt > 0) {
ns.avg_process_time_ms = (static_cast<double>(ns_total) / 1e6) / static_cast<double>(proc_cnt);
}
const bool source_like = (n.role == "source") || !n.context.input_queue;
if (source_like) {
total_fps += ns.output_fps;
}
snap.nodes.push_back(std::move(ns));
}
for (const auto& e : edges_) {
if (!e.queue) continue;
EdgeSnapshot es;
es.from = e.from;
es.to = e.to;
const auto st = e.queue->GetStats();
es.queue = ToQueueSnapshot(st);
const std::string key = e.from + "->" + e.to;
{
std::lock_guard<std::mutex> lock(rate_mu_);
const uint64_t last_pushed = last_edge_pushed_[key];
const uint64_t last_popped = last_edge_popped_[key];
if (dt_sec > 0.0) {
if (es.queue.pushed_total >= last_pushed) {
es.queue.pushed_fps = static_cast<double>(es.queue.pushed_total - last_pushed) / dt_sec;
}
if (es.queue.popped_total >= last_popped) {
es.queue.popped_fps = static_cast<double>(es.queue.popped_total - last_popped) / dt_sec;
}
}
last_edge_pushed_[key] = es.queue.pushed_total;
last_edge_popped_[key] = es.queue.popped_total;
}
snap.edges.push_back(std::move(es));
}
snap.total_fps = total_fps;
return snap;
}
bool Graph::FindNodeSnapshotById(const std::string& node_id, NodeSnapshot& out) const {
auto snap = Snapshot();
for (auto& n : snap.nodes) {
if (n.id == node_id) {
out = std::move(n);
return true;
}
}
return false;
}
GraphManager::GraphManager(std::string plugin_dir)
: loader_(std::move(plugin_dir)) {}
@ -725,4 +894,61 @@ bool GraphManager::ReloadFromFile(const std::string& path, std::string& err) {
return true;
}
std::vector<GraphSnapshot> GraphManager::ListGraphSnapshots() {
std::vector<GraphSnapshot> out;
std::lock_guard<std::mutex> lock(graphs_mu_);
out.reserve(graphs_.size());
for (const auto& g : graphs_) {
if (!g) continue;
out.push_back(g->Snapshot());
}
return out;
}
bool GraphManager::GetGraphSnapshot(const std::string& name, GraphSnapshot& out, std::string& err) {
std::lock_guard<std::mutex> lock(graphs_mu_);
for (const auto& g : graphs_) {
if (g && g->Name() == name) {
out = g->Snapshot();
return true;
}
}
err = "graph not found: " + name;
return false;
}
bool GraphManager::GetNodeSnapshot(const std::string& node_id, const std::optional<std::string>& graph,
NodeSnapshot& out, std::string& err) {
std::lock_guard<std::mutex> lock(graphs_mu_);
if (graph && !graph->empty()) {
for (const auto& g : graphs_) {
if (!g || g->Name() != *graph) continue;
if (g->FindNodeSnapshotById(node_id, out)) return true;
err = "node not found: " + node_id + " in graph " + *graph;
return false;
}
err = "graph not found: " + *graph;
return false;
}
// Auto-match when unique.
size_t hits = 0;
for (const auto& g : graphs_) {
if (!g) continue;
NodeSnapshot tmp;
if (g->FindNodeSnapshotById(node_id, tmp)) {
out = std::move(tmp);
++hits;
if (hits > 1) break;
}
}
if (hits == 1) return true;
if (hits == 0) {
err = "node not found: " + node_id;
return false;
}
err = "node id is not unique; specify ?graph=<name>";
return false;
}
} // namespace rk3588

562
src/http_server.cpp Normal file
View File

@ -0,0 +1,562 @@
#include "http_server.h"
#include <algorithm>
#include <cctype>
#include <fstream>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <vector>
#if defined(_WIN32)
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#endif
namespace rk3588 {
namespace {
#if defined(_WIN32)
using Sock = SOCKET;
constexpr Sock kInvalidSock = INVALID_SOCKET;
static bool InitSockets() {
WSADATA wsa;
return WSAStartup(MAKEWORD(2, 2), &wsa) == 0;
}
static void CleanupSockets() {
WSACleanup();
}
static void CloseSock(Sock s) {
if (s != kInvalidSock) closesocket(s);
}
static bool SetNonBlocking(Sock s) {
u_long mode = 1;
return ioctlsocket(s, FIONBIO, &mode) == 0;
}
#else
using Sock = int;
constexpr Sock kInvalidSock = -1;
static bool InitSockets() { return true; }
static void CleanupSockets() {}
static void CloseSock(Sock s) {
if (s != kInvalidSock) close(s);
}
static bool SetNonBlocking(Sock s) {
int flags = fcntl(s, F_GETFL, 0);
if (flags < 0) return false;
return fcntl(s, F_SETFL, flags | O_NONBLOCK) == 0;
}
#endif
static std::string ToLower(std::string s) {
for (char& c : s) c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
return s;
}
static void Trim(std::string& s) {
auto not_space = [](unsigned char ch) { return !std::isspace(ch); };
s.erase(s.begin(), std::find_if(s.begin(), s.end(), not_space));
s.erase(std::find_if(s.rbegin(), s.rend(), not_space).base(), s.end());
}
static std::map<std::string, std::string> ParseQuery(const std::string& q) {
std::map<std::string, std::string> out;
size_t i = 0;
while (i < q.size()) {
size_t amp = q.find('&', i);
std::string part = q.substr(i, (amp == std::string::npos) ? std::string::npos : amp - i);
size_t eq = part.find('=');
std::string k = (eq == std::string::npos) ? part : part.substr(0, eq);
std::string v = (eq == std::string::npos) ? "" : part.substr(eq + 1);
if (!k.empty()) out[k] = v;
if (amp == std::string::npos) break;
i = amp + 1;
}
return out;
}
static std::string JsonEscape(std::string_view s) {
std::string out;
out.reserve(s.size() + 8);
for (char c : s) {
switch (c) {
case '\\': out += "\\\\"; break;
case '"': out += "\\\""; break;
case '\n': out += "\\n"; break;
case '\r': out += "\\r"; break;
case '\t': out += "\\t"; break;
default:
if (static_cast<unsigned char>(c) < 0x20) {
out += "?";
} else {
out.push_back(c);
}
}
}
return out;
}
static std::string StatusText(int code) {
switch (code) {
case 200: return "OK";
case 400: return "Bad Request";
case 404: return "Not Found";
case 405: return "Method Not Allowed";
case 409: return "Conflict";
case 500: return "Internal Server Error";
default: return "";
}
}
static std::string ContentTypeForPath(const std::string& path) {
auto pos = path.find_last_of('.');
std::string ext = (pos == std::string::npos) ? "" : ToLower(path.substr(pos + 1));
if (ext == "html") return "text/html; charset=utf-8";
if (ext == "js") return "application/javascript; charset=utf-8";
if (ext == "css") return "text/css; charset=utf-8";
if (ext == "json") return "application/json; charset=utf-8";
return "application/octet-stream";
}
static bool IsSafeStaticPath(const std::string& path) {
if (path.empty() || path[0] != '/') return false;
if (path.find('\\') != std::string::npos) return false;
if (path.find("..") != std::string::npos) return false;
return true;
}
static bool ReadFile(const std::string& file_path, std::string& out) {
std::ifstream ifs(file_path, std::ios::binary);
if (!ifs.is_open()) return false;
std::ostringstream oss;
oss << ifs.rdbuf();
out = oss.str();
return true;
}
static std::string QueueJson(const QueueSnapshot& q) {
std::ostringstream oss;
oss << "{";
oss << "\"size\":" << q.size << ',';
oss << "\"capacity\":" << q.capacity << ',';
oss << "\"pushed_total\":" << q.pushed_total << ',';
oss << "\"popped_total\":" << q.popped_total << ',';
oss << "\"dropped_total\":" << q.dropped_total << ',';
oss << "\"stopped\":" << (q.stopped ? "true" : "false") << ',';
oss << "\"pushed_fps\":" << q.pushed_fps << ',';
oss << "\"popped_fps\":" << q.popped_fps;
oss << "}";
return oss.str();
}
static std::string GraphSnapshotJson(const GraphSnapshot& g) {
std::ostringstream oss;
oss << "{";
oss << "\"name\":\"" << JsonEscape(g.name) << "\",";
oss << "\"running\":" << (g.running ? "true" : "false") << ',';
oss << "\"timestamp_ms\":" << g.timestamp_ms << ',';
oss << "\"total_fps\":" << g.total_fps << ',';
oss << "\"nodes\":[";
for (size_t i = 0; i < g.nodes.size(); ++i) {
const auto& n = g.nodes[i];
if (i) oss << ',';
oss << "{";
oss << "\"graph\":\"" << JsonEscape(n.graph) << "\",";
oss << "\"id\":\"" << JsonEscape(n.id) << "\",";
oss << "\"type\":\"" << JsonEscape(n.type) << "\",";
oss << "\"role\":\"" << JsonEscape(n.role) << "\",";
oss << "\"enabled\":" << (n.enabled ? "true" : "false") << ',';
oss << "\"input_fps\":" << n.input_fps << ',';
oss << "\"output_fps\":" << n.output_fps << ',';
oss << "\"ok_total\":" << n.ok_total << ',';
oss << "\"drop_total\":" << n.drop_total << ',';
oss << "\"error_total\":" << n.error_total << ',';
oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ',';
oss << "\"input_queue\":" << QueueJson(n.input_queue);
oss << "}";
}
oss << "],";
oss << "\"edges\":[";
for (size_t i = 0; i < g.edges.size(); ++i) {
const auto& e = g.edges[i];
if (i) oss << ',';
oss << "{";
oss << "\"from\":\"" << JsonEscape(e.from) << "\",";
oss << "\"to\":\"" << JsonEscape(e.to) << "\",";
oss << "\"queue\":" << QueueJson(e.queue);
oss << "}";
}
oss << "]";
oss << "}";
return oss.str();
}
static std::string NodeSnapshotJson(const NodeSnapshot& n) {
std::ostringstream oss;
oss << "{";
oss << "\"graph\":\"" << JsonEscape(n.graph) << "\",";
oss << "\"id\":\"" << JsonEscape(n.id) << "\",";
oss << "\"type\":\"" << JsonEscape(n.type) << "\",";
oss << "\"role\":\"" << JsonEscape(n.role) << "\",";
oss << "\"enabled\":" << (n.enabled ? "true" : "false") << ',';
oss << "\"input_fps\":" << n.input_fps << ',';
oss << "\"output_fps\":" << n.output_fps << ',';
oss << "\"ok_total\":" << n.ok_total << ',';
oss << "\"drop_total\":" << n.drop_total << ',';
oss << "\"error_total\":" << n.error_total << ',';
oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ',';
oss << "\"input_queue\":" << QueueJson(n.input_queue);
oss << "}";
return oss.str();
}
static std::string ErrorJson(const std::string& msg) {
std::ostringstream oss;
oss << "{\"error\":\"" << JsonEscape(msg) << "\"}";
return oss.str();
}
struct HttpRequest {
std::string method;
std::string target;
std::string path;
std::string query;
std::map<std::string, std::string> headers;
std::string body;
};
struct HttpResponse {
int status = 200;
std::string content_type = "application/json; charset=utf-8";
std::string body;
};
static bool RecvUntil(Sock s, std::string& data, const std::string& needle, size_t max_bytes) {
char buf[4096];
while (data.find(needle) == std::string::npos) {
if (data.size() > max_bytes) return false;
#if defined(_WIN32)
int n = recv(s, buf, static_cast<int>(sizeof(buf)), 0);
#else
int n = static_cast<int>(recv(s, buf, sizeof(buf), 0));
#endif
if (n <= 0) return false;
data.append(buf, buf + n);
}
return true;
}
static bool ParseHttpRequest(Sock s, HttpRequest& req, std::string& err) {
std::string data;
if (!RecvUntil(s, data, "\r\n\r\n", 1024 * 1024)) {
err = "failed to read headers";
return false;
}
size_t header_end = data.find("\r\n\r\n");
std::string header_part = data.substr(0, header_end);
std::string remain = data.substr(header_end + 4);
std::istringstream iss(header_part);
std::string line;
if (!std::getline(iss, line)) {
err = "empty request";
return false;
}
if (!line.empty() && line.back() == '\r') line.pop_back();
{
std::istringstream ls(line);
std::string version;
if (!(ls >> req.method >> req.target >> version)) {
err = "bad request line";
return false;
}
}
while (std::getline(iss, line)) {
if (!line.empty() && line.back() == '\r') line.pop_back();
if (line.empty()) continue;
auto pos = line.find(':');
if (pos == std::string::npos) continue;
std::string k = ToLower(line.substr(0, pos));
std::string v = line.substr(pos + 1);
Trim(v);
req.headers[k] = v;
}
// split path/query
{
auto qpos = req.target.find('?');
if (qpos == std::string::npos) {
req.path = req.target;
} else {
req.path = req.target.substr(0, qpos);
req.query = req.target.substr(qpos + 1);
}
}
size_t content_len = 0;
if (auto it = req.headers.find("content-length"); it != req.headers.end()) {
try {
content_len = static_cast<size_t>(std::stoul(it->second));
} catch (...) {
content_len = 0;
}
}
req.body = remain;
while (req.body.size() < content_len) {
char buf[4096];
#if defined(_WIN32)
int n = recv(s, buf, static_cast<int>(sizeof(buf)), 0);
#else
int n = static_cast<int>(recv(s, buf, sizeof(buf), 0));
#endif
if (n <= 0) break;
req.body.append(buf, buf + n);
}
if (req.body.size() > content_len) req.body.resize(content_len);
return true;
}
static bool SendAll(Sock s, const std::string& data) {
size_t sent = 0;
while (sent < data.size()) {
#if defined(_WIN32)
int n = send(s, data.data() + sent, static_cast<int>(data.size() - sent), 0);
#else
ssize_t n = send(s, data.data() + sent, data.size() - sent, 0);
#endif
if (n <= 0) return false;
sent += static_cast<size_t>(n);
}
return true;
}
static std::string MakeHttpResponse(const HttpResponse& res) {
std::ostringstream oss;
oss << "HTTP/1.1 " << res.status << " " << StatusText(res.status) << "\r\n";
oss << "Content-Type: " << res.content_type << "\r\n";
oss << "Content-Length: " << res.body.size() << "\r\n";
oss << "Connection: close\r\n";
oss << "\r\n";
oss << res.body;
return oss.str();
}
} // namespace
HttpServer::HttpServer(GraphManager& gm, int port, std::string web_root)
: gm_(gm), port_(port), web_root_(std::move(web_root)) {}
HttpServer::~HttpServer() {
Stop();
}
bool HttpServer::Start() {
bool expected = false;
if (!running_.compare_exchange_strong(expected, true)) {
return true;
}
worker_ = std::thread(&HttpServer::ServerLoop, this);
return true;
}
void HttpServer::Stop() {
bool was_running = running_.exchange(false);
if (!was_running) return;
const int64_t ls = listen_sock_.exchange(-1);
if (ls != -1) {
CloseSock(static_cast<Sock>(ls));
}
if (worker_.joinable()) worker_.join();
}
void HttpServer::ServerLoop() {
if (!InitSockets()) {
std::cerr << "[HttpServer] socket init failed\n";
running_.store(false);
return;
}
Sock srv = socket(AF_INET, SOCK_STREAM, 0);
if (srv == kInvalidSock) {
std::cerr << "[HttpServer] socket() failed\n";
CleanupSockets();
running_.store(false);
return;
}
int opt = 1;
#if defined(_WIN32)
setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&opt), sizeof(opt));
#else
setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
#endif
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(static_cast<uint16_t>(port_));
if (bind(srv, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) {
std::cerr << "[HttpServer] bind failed on port " << port_ << "\n";
CloseSock(srv);
CleanupSockets();
running_.store(false);
return;
}
if (listen(srv, 16) != 0) {
std::cerr << "[HttpServer] listen failed\n";
CloseSock(srv);
CleanupSockets();
running_.store(false);
return;
}
(void)SetNonBlocking(srv);
listen_sock_.store(static_cast<int64_t>(srv));
std::cout << "[HttpServer] listening on 0.0.0.0:" << port_ << " (web_root=" << web_root_ << ")\n";
while (running_.load()) {
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(srv, &rfds);
timeval tv{};
tv.tv_sec = 0;
tv.tv_usec = 200000;
#if defined(_WIN32)
int ret = select(0, &rfds, nullptr, nullptr, &tv);
#else
int ret = select(srv + 1, &rfds, nullptr, nullptr, &tv);
#endif
if (!running_.load()) break;
if (ret <= 0) continue;
if (!FD_ISSET(srv, &rfds)) continue;
sockaddr_in cli{};
#if defined(_WIN32)
int len = sizeof(cli);
#else
socklen_t len = sizeof(cli);
#endif
Sock c = accept(srv, reinterpret_cast<sockaddr*>(&cli), &len);
if (c == kInvalidSock) continue;
HttpRequest req;
std::string perr;
HttpResponse resp;
if (!ParseHttpRequest(c, req, perr)) {
resp.status = 400;
resp.body = ErrorJson(perr);
auto raw = MakeHttpResponse(resp);
(void)SendAll(c, raw);
CloseSock(c);
continue;
}
// Dispatch
if (req.path.rfind("/api/", 0) == 0) {
if (req.method != "GET") {
resp.status = 405;
resp.body = ErrorJson("method not allowed");
} else if (req.path == "/api/graphs") {
auto snaps = gm_.ListGraphSnapshots();
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < snaps.size(); ++i) {
if (i) oss << ',';
oss << "{";
oss << "\"name\":\"" << JsonEscape(snaps[i].name) << "\",";
oss << "\"running\":" << (snaps[i].running ? "true" : "false") << ',';
oss << "\"total_fps\":" << snaps[i].total_fps;
oss << "}";
}
oss << "]";
resp.body = oss.str();
} else if (req.path.rfind("/api/graphs/", 0) == 0) {
std::string name = req.path.substr(std::string("/api/graphs/").size());
GraphSnapshot gs;
std::string gerr;
if (!gm_.GetGraphSnapshot(name, gs, gerr)) {
resp.status = 404;
resp.body = ErrorJson(gerr);
} else {
resp.body = GraphSnapshotJson(gs);
}
} else if (req.path.rfind("/api/nodes/", 0) == 0) {
// /api/nodes/{id}/metrics
const std::string prefix = "/api/nodes/";
const std::string suffix = "/metrics";
auto pos = req.path.rfind(suffix);
if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
std::string node_id = req.path.substr(prefix.size(), pos - prefix.size());
auto q = ParseQuery(req.query);
std::optional<std::string> graph;
if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) {
graph = it->second;
}
NodeSnapshot ns;
std::string nerr;
if (!gm_.GetNodeSnapshot(node_id, graph, ns, nerr)) {
resp.status = (nerr.find("not unique") != std::string::npos) ? 409 : 404;
resp.body = ErrorJson(nerr);
} else {
resp.body = NodeSnapshotJson(ns);
}
}
} else {
resp.status = 404;
resp.body = ErrorJson("not found");
}
} else {
std::string path = req.path;
if (path == "/") path = "/index.html";
if (!IsSafeStaticPath(path)) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
std::string full = web_root_;
if (!full.empty() && (full.back() == '/' || full.back() == '\\')) {
full.pop_back();
}
full += path;
std::string content;
if (!ReadFile(full, content)) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
resp.status = 200;
resp.content_type = ContentTypeForPath(path);
resp.body = std::move(content);
}
}
}
auto raw = MakeHttpResponse(resp);
(void)SendAll(c, raw);
CloseSock(c);
}
listen_sock_.store(-1);
CloseSock(srv);
CleanupSockets();
}
} // namespace rk3588

View File

@ -22,6 +22,7 @@ namespace fs = std::filesystem;
#endif
#include "graph_manager.h"
#include "http_server.h"
#include "utils/simple_json.h"
namespace rk3588 {
@ -181,6 +182,13 @@ int MediaServerApp::Start() {
return 1;
}
int metrics_port = 9000;
std::string web_root = "web";
if (const SimpleJson* g = root_cfg.Find("global")) {
metrics_port = g->ValueOr<int>("metrics_port", metrics_port);
web_root = g->ValueOr<std::string>("web_root", web_root);
}
if (!graph_manager_.Build(root_cfg, err)) {
std::cerr << "[MediaServerApp] Failed to build graphs: " << err << "\n";
return 1;
@ -195,6 +203,9 @@ int MediaServerApp::Start() {
return 1;
}
HttpServer http(graph_manager_, metrics_port, web_root);
(void)http.Start();
std::cout << "[MediaServerApp] Running. Press Ctrl+C to stop.\n";
std::atomic<bool> stop_watch{false};
@ -214,6 +225,8 @@ int MediaServerApp::Start() {
graph_manager_.BlockUntilStop();
http.Stop();
stop_watch.store(true);
if (watcher.joinable()) watcher.join();
std::cout << "[MediaServerApp] Shutdown complete.\n";

115
web/app.js Normal file
View File

@ -0,0 +1,115 @@
function qs(name) {
const url = new URL(window.location.href);
return url.searchParams.get(name);
}
async function fetchJson(url) {
const res = await fetch(url, { cache: "no-store" });
const text = await res.text();
let data;
try {
data = JSON.parse(text);
} catch {
throw new Error(`Bad JSON from ${url}: ${text.slice(0, 200)}`);
}
if (!res.ok) {
const msg = data && data.error ? data.error : `HTTP ${res.status}`;
throw new Error(msg);
}
return data;
}
function fmt(n, digits = 1) {
if (typeof n !== "number" || !isFinite(n)) return "-";
return n.toFixed(digits);
}
async function updateIndex() {
const body = document.getElementById("graphs-body");
if (!body) return;
let graphs = [];
try {
graphs = await fetchJson("/api/graphs");
} catch (e) {
body.innerHTML = `<tr><td colspan="3">${e.message}</td></tr>`;
return;
}
body.innerHTML = "";
for (const g of graphs) {
const tr = document.createElement("tr");
const statusCls = g.running ? "status-running" : "status-stopped";
tr.innerHTML = `
<td><a href="/graph.html?name=${encodeURIComponent(g.name)}">${g.name}</a></td>
<td class="${statusCls}">${g.running ? "Running" : "Stopped"}</td>
<td>${fmt(g.total_fps)}</td>
`;
body.appendChild(tr);
}
}
async function updateGraph() {
const name = qs("name");
const nodesBody = document.getElementById("nodes-body");
const edgesBody = document.getElementById("edges-body");
if (!name || !nodesBody || !edgesBody) return;
const title = document.getElementById("graph-title");
if (title) title.textContent = `Graph: ${name}`;
let g;
try {
g = await fetchJson(`/api/graphs/${encodeURIComponent(name)}`);
} catch (e) {
nodesBody.innerHTML = `<tr><td colspan="9">${e.message}</td></tr>`;
edgesBody.innerHTML = `<tr><td colspan="7">${e.message}</td></tr>`;
return;
}
const meta = document.getElementById("graph-meta");
if (meta) {
meta.textContent = `running=${g.running} total_fps=${fmt(g.total_fps)} ts_ms=${g.timestamp_ms}`;
}
nodesBody.innerHTML = "";
for (const n of g.nodes) {
const tr = document.createElement("tr");
tr.innerHTML = `
<td>${n.id}</td>
<td>${n.type}</td>
<td>${n.role || ""}</td>
<td>${fmt(n.input_fps)}</td>
<td>${fmt(n.output_fps)}</td>
<td>${n.input_queue ? `${n.input_queue.size}/${n.input_queue.capacity}` : "-"}</td>
<td>${n.drop_total}</td>
<td>${n.error_total}</td>
<td>${fmt(n.avg_process_time_ms, 3)}</td>
`;
nodesBody.appendChild(tr);
}
edgesBody.innerHTML = "";
for (const e of g.edges) {
const tr = document.createElement("tr");
tr.innerHTML = `
<td>${e.from}</td>
<td>${e.to}</td>
<td>${e.queue.size}</td>
<td>${e.queue.capacity}</td>
<td>${e.queue.dropped_total}</td>
<td>${fmt(e.queue.pushed_fps)}</td>
<td>${fmt(e.queue.popped_fps)}</td>
`;
edgesBody.appendChild(tr);
}
}
function boot() {
updateIndex();
updateGraph();
setInterval(updateIndex, 2000);
setInterval(updateGraph, 1000);
}
window.addEventListener("load", boot);

53
web/graph.html Normal file
View File

@ -0,0 +1,53 @@
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>RK3588 Media Server - Graph</title>
<link rel="stylesheet" href="/style.css" />
</head>
<body>
<div class="toolbar">
<a href="/">← Back</a>
</div>
<h1 id="graph-title">Graph</h1>
<div class="hint" id="graph-meta"></div>
<h2>Nodes</h2>
<table>
<thead>
<tr>
<th>ID</th>
<th>Type</th>
<th>Role</th>
<th>Input FPS</th>
<th>Output FPS</th>
<th>InQ Size</th>
<th>Drop</th>
<th>Err</th>
<th>Avg ms</th>
</tr>
</thead>
<tbody id="nodes-body"></tbody>
</table>
<h2>Edges</h2>
<table>
<thead>
<tr>
<th>From</th>
<th>To</th>
<th>Q Size</th>
<th>Capacity</th>
<th>Drop</th>
<th>Push FPS</th>
<th>Pop FPS</th>
</tr>
</thead>
<tbody id="edges-body"></tbody>
</table>
<script src="/app.js"></script>
</body>
</html>

26
web/index.html Normal file
View File

@ -0,0 +1,26 @@
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>RK3588 Media Server - Graphs</title>
<link rel="stylesheet" href="/style.css" />
</head>
<body>
<h1>Graphs</h1>
<div class="hint">Auto refresh every 2s.</div>
<table>
<thead>
<tr>
<th>Name</th>
<th>Status</th>
<th>Total FPS</th>
</tr>
</thead>
<tbody id="graphs-body"></tbody>
</table>
<script src="/app.js"></script>
</body>
</html>

44
web/style.css Normal file
View File

@ -0,0 +1,44 @@
body {
font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial;
margin: 16px;
color: #111;
}
h1, h2 {
margin: 12px 0;
}
.hint {
color: #555;
margin: 6px 0 12px;
}
.toolbar {
margin-bottom: 8px;
}
table {
border-collapse: collapse;
width: 100%;
}
th, td {
border: 1px solid #ddd;
padding: 6px 8px;
text-align: left;
font-size: 14px;
}
th {
background: #f6f6f6;
}
.status-running {
color: #0a7a0a;
font-weight: 600;
}
.status-stopped {
color: #b00020;
font-weight: 600;
}