OrangePi3588Media/src/http_server.cpp
sladro a12f91da63
Some checks failed
CI / host-build (push) Has been cancelled
CI / rk3588-cross-build (push) Has been cancelled
修复功能bug,对齐prd
2026-01-03 16:12:03 +08:00

690 lines
24 KiB
C++

#include "http_server.h"
#include <algorithm>
#include <cctype>
#include <fstream>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <vector>
#include "utils/logger.h"
#include "utils/simple_json.h"
#include "utils/simple_json_writer.h"
#if defined(_WIN32)
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#endif
namespace rk3588 {
namespace {
// ---------------------------------------------------------------------------
// Minimal portability layer over Winsock and BSD sockets. Everything below the
// type aliases has the same signature on both platforms.
// ---------------------------------------------------------------------------
#if defined(_WIN32)
using Sock = SOCKET;
constexpr Sock kInvalidSock = INVALID_SOCKET;
#else
using Sock = int;
constexpr Sock kInvalidSock = -1;
#endif

// Starts the platform socket library. On Windows this requests Winsock 2.2;
// elsewhere there is nothing to do and the call always succeeds.
static bool InitSockets() {
#if defined(_WIN32)
    WSADATA startup_info;
    const int rc = WSAStartup(MAKEWORD(2, 2), &startup_info);
    return rc == 0;
#else
    return true;
#endif
}

// Counterpart of InitSockets(); a no-op outside Windows.
static void CleanupSockets() {
#if defined(_WIN32)
    WSACleanup();
#endif
}

// Closes `s` unless it is the invalid-socket sentinel.
static void CloseSock(Sock s) {
    if (s == kInvalidSock) {
        return;
    }
#if defined(_WIN32)
    closesocket(s);
#else
    close(s);
#endif
}

// Switches `s` to non-blocking mode. Returns false when the underlying
// ioctlsocket()/fcntl() call reports an error.
static bool SetNonBlocking(Sock s) {
#if defined(_WIN32)
    u_long enable = 1;
    return ioctlsocket(s, FIONBIO, &enable) == 0;
#else
    const int current = fcntl(s, F_GETFL, 0);
    return current >= 0 && fcntl(s, F_SETFL, current | O_NONBLOCK) == 0;
#endif
}
// Returns a copy of `s` with every byte passed through std::tolower.
// Bytes are widened via unsigned char so values >= 0x80 are well-defined.
static std::string ToLower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char ch) {
        return static_cast<char>(std::tolower(ch));
    });
    return s;
}
// Strips leading and trailing whitespace (as classified by std::isspace)
// from `s` in place. A string made only of whitespace becomes empty.
static void Trim(std::string& s) {
    const auto is_blank = [](unsigned char ch) { return std::isspace(ch) != 0; };

    std::size_t first = 0;
    while (first < s.size() && is_blank(s[first])) {
        ++first;
    }
    std::size_t last = s.size();
    while (last > first && is_blank(s[last - 1])) {
        --last;
    }

    // Cut the tail first so `first` still indexes the same character.
    s.erase(last);
    s.erase(0, first);
}
// Parses a URL query string ("a=1&b=two") into a key/value map.
//
// - Pairs are separated by '&'; the first '=' splits key from value.
// - A pair without '=' yields the key with an empty value.
// - Pairs with an empty key are skipped; for repeated keys the last one wins.
// - Keys and values are form-url-decoded: '+' becomes a space and "%XX"
//   becomes the byte 0xXX. A '%' not followed by two hex digits is kept
//   literally, so malformed input never causes a failure.
static std::map<std::string, std::string> ParseQuery(const std::string& q) {
    const auto hex_value = [](char ch) -> int {
        if (ch >= '0' && ch <= '9') return ch - '0';
        if (ch >= 'a' && ch <= 'f') return ch - 'a' + 10;
        if (ch >= 'A' && ch <= 'F') return ch - 'A' + 10;
        return -1;
    };
    const auto decode = [&hex_value](const std::string& in) {
        std::string decoded;
        decoded.reserve(in.size());
        for (std::size_t i = 0; i < in.size(); ++i) {
            const char ch = in[i];
            if (ch == '+') {
                decoded.push_back(' ');
                continue;
            }
            if (ch == '%' && i + 2 < in.size()) {
                const int hi = hex_value(in[i + 1]);
                const int lo = hex_value(in[i + 2]);
                if (hi >= 0 && lo >= 0) {
                    decoded.push_back(static_cast<char>((hi << 4) | lo));
                    i += 2;
                    continue;
                }
            }
            decoded.push_back(ch);
        }
        return decoded;
    };

    std::map<std::string, std::string> out;
    std::size_t begin = 0;
    while (begin < q.size()) {
        const std::size_t amp = q.find('&', begin);
        const std::size_t end = (amp == std::string::npos) ? q.size() : amp;
        const std::string pair = q.substr(begin, end - begin);

        // Split on the raw text first so an encoded '=' (%3D) stays in place.
        const std::size_t eq = pair.find('=');
        const std::string raw_key = (eq == std::string::npos) ? pair : pair.substr(0, eq);
        if (!raw_key.empty()) {
            out[decode(raw_key)] = (eq == std::string::npos) ? std::string() : decode(pair.substr(eq + 1));
        }

        if (amp == std::string::npos) break;
        begin = amp + 1;
    }
    return out;
}
// Escapes `s` for embedding between double quotes in a JSON document.
//
// Backslash and double quote are backslash-escaped; \b, \f, \n, \r and \t
// use their short forms; every other control character below 0x20 is written
// as \u00XX so no information is lost (JSON forbids raw control characters
// inside strings). Bytes >= 0x20 are copied through unchanged; the input is
// not validated as UTF-8.
static std::string JsonEscape(std::string_view s) {
    static constexpr char kHexDigits[] = "0123456789abcdef";

    std::string out;
    out.reserve(s.size() + 8);
    for (const char c : s) {
        switch (c) {
            case '\\': out += "\\\\"; break;
            case '"':  out += "\\\""; break;
            case '\b': out += "\\b"; break;
            case '\f': out += "\\f"; break;
            case '\n': out += "\\n"; break;
            case '\r': out += "\\r"; break;
            case '\t': out += "\\t"; break;
            default: {
                const auto byte = static_cast<unsigned char>(c);
                if (byte < 0x20) {
                    out += "\\u00";
                    out.push_back(kHexDigits[byte >> 4]);
                    out.push_back(kHexDigits[byte & 0x0F]);
                } else {
                    out.push_back(c);
                }
                break;
            }
        }
    }
    return out;
}
// Maps an HTTP status code to its reason phrase. Codes that are not in the
// table yield an empty string.
static std::string StatusText(int code) {
    struct Entry {
        int code;
        const char* phrase;
    };
    static constexpr Entry kPhrases[] = {
        {200, "OK"},
        {400, "Bad Request"},
        {404, "Not Found"},
        {405, "Method Not Allowed"},
        {409, "Conflict"},
        {500, "Internal Server Error"},
    };

    for (const Entry& entry : kPhrases) {
        if (entry.code == code) {
            return std::string(entry.phrase);
        }
    }
    return std::string();
}
// Picks a Content-Type header value from the extension of `path` (the text
// after the last '.', compared case-insensitively).
//
// Besides HTML/JS/CSS/JSON this covers the asset types a browser UI commonly
// ships (images, fonts, source maps, wasm); several of these are not handled
// reliably by browsers when delivered as application/octet-stream. Anything
// unrecognised, including paths without an extension, falls back to
// "application/octet-stream".
static std::string ContentTypeForPath(const std::string& path) {
    struct Mapping {
        const char* ext;
        const char* type;
    };
    static constexpr Mapping kTypes[] = {
        {"html", "text/html; charset=utf-8"},
        {"htm", "text/html; charset=utf-8"},
        {"js", "application/javascript; charset=utf-8"},
        {"mjs", "application/javascript; charset=utf-8"},
        {"css", "text/css; charset=utf-8"},
        {"json", "application/json; charset=utf-8"},
        {"map", "application/json; charset=utf-8"},
        {"txt", "text/plain; charset=utf-8"},
        {"svg", "image/svg+xml"},
        {"png", "image/png"},
        {"jpg", "image/jpeg"},
        {"jpeg", "image/jpeg"},
        {"gif", "image/gif"},
        {"webp", "image/webp"},
        {"ico", "image/x-icon"},
        {"woff", "font/woff"},
        {"woff2", "font/woff2"},
        {"wasm", "application/wasm"},
    };

    const auto dot = path.find_last_of('.');
    if (dot != std::string::npos) {
        std::string ext = path.substr(dot + 1);
        std::transform(ext.begin(), ext.end(), ext.begin(), [](unsigned char ch) {
            return static_cast<char>(std::tolower(ch));
        });
        for (const Mapping& m : kTypes) {
            if (ext == m.ext) return m.type;
        }
    }
    return "application/octet-stream";
}
// Decides whether a request path may be appended to the web root.
//
// A path is accepted only when it is absolute (starts with '/') and contains
// none of the following:
//   - ".." anywhere (deliberately strict: also rejects names like "a..b"),
//   - a backslash (alternative separator on Windows),
//   - a NUL or other control byte. An embedded NUL would truncate the name
//     handed to the OS, so the file opened could differ from the path that
//     was validated and used to choose the content type.
static bool IsSafeStaticPath(const std::string& path) {
    if (path.empty() || path.front() != '/') return false;
    if (path.find("..") != std::string::npos) return false;
    for (const char ch : path) {
        const auto byte = static_cast<unsigned char>(ch);
        if (ch == '\\' || byte < 0x20 || byte == 0x7f) return false;
    }
    return true;
}
// Reads the whole file at `file_path` into `out` as raw bytes.
//
// Returns false when the file cannot be opened or when the stream reports a
// hard read error (badbit). The latter matters for paths that open but
// cannot be read: with libstdc++ on Linux a directory opens successfully and
// fails on the first read, which would otherwise be served as an empty
// document. `out` is only modified on success.
static bool ReadFile(const std::string& file_path, std::string& out) {
    std::ifstream ifs(file_path, std::ios::binary);
    if (!ifs.is_open()) return false;

    std::string content;
    char chunk[8192];
    while (ifs) {
        ifs.read(chunk, static_cast<std::streamsize>(sizeof(chunk)));
        const std::streamsize got = ifs.gcount();
        if (got > 0) content.append(chunk, static_cast<std::size_t>(got));
    }
    // Reaching EOF sets eofbit/failbit only; badbit means the read itself failed.
    if (ifs.bad()) return false;

    out = std::move(content);
    return true;
}
// Renders a queue snapshot as a flat JSON object with the keys size,
// capacity, pushed_total, popped_total, dropped_total, stopped, pushed_fps
// and popped_fps, in that order. Numeric members use the stream's default
// formatting.
static std::string QueueJson(const QueueSnapshot& q) {
    const char* const stopped_text = q.stopped ? "true" : "false";

    std::ostringstream json;
    json << "{"
         << "\"size\":" << q.size << ','
         << "\"capacity\":" << q.capacity << ','
         << "\"pushed_total\":" << q.pushed_total << ','
         << "\"popped_total\":" << q.popped_total << ','
         << "\"dropped_total\":" << q.dropped_total << ','
         << "\"stopped\":" << stopped_text << ','
         << "\"pushed_fps\":" << q.pushed_fps << ','
         << "\"popped_fps\":" << q.popped_fps
         << "}";
    return json.str();
}
static std::string GraphSnapshotJson(const GraphSnapshot& g) {
std::ostringstream oss;
oss << "{";
oss << "\"name\":\"" << JsonEscape(g.name) << "\",";
oss << "\"running\":" << (g.running ? "true" : "false") << ',';
oss << "\"timestamp_ms\":" << g.timestamp_ms << ',';
oss << "\"total_fps\":" << g.total_fps << ',';
oss << "\"alarm_total\":" << g.alarm_total << ',';
oss << "\"publish_clients\":" << g.publish_clients << ',';
oss << "\"nodes\":[";
for (size_t i = 0; i < g.nodes.size(); ++i) {
const auto& n = g.nodes[i];
if (i) oss << ',';
oss << "{";
oss << "\"graph\":\"" << JsonEscape(n.graph) << "\",";
oss << "\"id\":\"" << JsonEscape(n.id) << "\",";
oss << "\"type\":\"" << JsonEscape(n.type) << "\",";
oss << "\"role\":\"" << JsonEscape(n.role) << "\",";
oss << "\"enabled\":" << (n.enabled ? "true" : "false") << ',';
oss << "\"input_fps\":" << n.input_fps << ',';
oss << "\"output_fps\":" << n.output_fps << ',';
oss << "\"ok_total\":" << n.ok_total << ',';
oss << "\"drop_total\":" << n.drop_total << ',';
oss << "\"error_total\":" << n.error_total << ',';
oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ',';
oss << "\"input_queue\":" << QueueJson(n.input_queue) << ',';
oss << "\"custom_metrics\":" << StringifySimpleJson(n.custom_metrics);
oss << "}";
}
oss << "],";
oss << "\"edges\":[";
for (size_t i = 0; i < g.edges.size(); ++i) {
const auto& e = g.edges[i];
if (i) oss << ',';
oss << "{";
oss << "\"from\":\"" << JsonEscape(e.from) << "\",";
oss << "\"to\":\"" << JsonEscape(e.to) << "\",";
oss << "\"queue\":" << QueueJson(e.queue);
oss << "}";
}
oss << "]";
oss << "}";
return oss.str();
}
static std::string NodeSnapshotJson(const NodeSnapshot& n) {
std::ostringstream oss;
oss << "{";
oss << "\"graph\":\"" << JsonEscape(n.graph) << "\",";
oss << "\"id\":\"" << JsonEscape(n.id) << "\",";
oss << "\"type\":\"" << JsonEscape(n.type) << "\",";
oss << "\"role\":\"" << JsonEscape(n.role) << "\",";
oss << "\"enabled\":" << (n.enabled ? "true" : "false") << ',';
oss << "\"input_fps\":" << n.input_fps << ',';
oss << "\"output_fps\":" << n.output_fps << ',';
oss << "\"ok_total\":" << n.ok_total << ',';
oss << "\"drop_total\":" << n.drop_total << ',';
oss << "\"error_total\":" << n.error_total << ',';
oss << "\"avg_process_time_ms\":" << n.avg_process_time_ms << ',';
oss << "\"input_queue\":" << QueueJson(n.input_queue) << ',';
oss << "\"custom_metrics\":" << StringifySimpleJson(n.custom_metrics);
oss << "}";
return oss.str();
}
// Wraps `msg` in the error envelope used by every failing response:
// {"error":"<escaped msg>"}.
static std::string ErrorJson(const std::string& msg) {
    std::string json = "{\"error\":\"";
    json += JsonEscape(msg);
    json += "\"}";
    return json;
}
// One parsed HTTP request, filled in by ParseHttpRequest().
struct HttpRequest {
// Request method token from the request line, e.g. "GET" or "POST".
std::string method;
// Raw request target from the request line (path plus optional "?query").
std::string target;
// Part of `target` before the first '?'.
std::string path;
// Part of `target` after the first '?'; empty when there is none.
std::string query;
// Header fields keyed by lower-cased name; values are whitespace-trimmed.
// A repeated header overwrites the earlier value.
std::map<std::string, std::string> headers;
// Request body bytes, limited to the declared Content-Length.
std::string body;
};
// Response under construction. Defaults describe a successful JSON reply, so
// handlers only touch the fields that differ.
struct HttpResponse {
    // HTTP status code written to the status line.
    int status{200};
    // Value of the Content-Type header.
    std::string content_type{"application/json; charset=utf-8"};
    // Payload; its size becomes the Content-Length header.
    std::string body{};
};
// Appends bytes received from `s` to `data` until `data` contains `needle`.
//
// Returns true as soon as the needle is present (immediately if it already
// is). Returns false when the peer closes the connection, recv() fails, or
// `max_bytes` have been buffered without seeing the needle; `data` never
// grows beyond `max_bytes` through this function.
//
// Each pass only searches the newly received tail (plus needle.size() - 1
// bytes of overlap) instead of rescanning the whole buffer, which keeps the
// cost linear for large or slowly trickling inputs.
static bool RecvUntil(Sock s, std::string& data, const std::string& needle, size_t max_bytes) {
    char buf[4096];
    size_t scan_from = 0;
    while (data.find(needle, scan_from) == std::string::npos) {
        if (data.size() >= max_bytes) return false;

        // A match that straddles old and new data can start no earlier than here.
        scan_from = (data.size() >= needle.size()) ? data.size() - needle.size() + 1 : 0;

        const size_t want = std::min(sizeof(buf), max_bytes - data.size());
#if defined(_WIN32)
        const int n = recv(s, buf, static_cast<int>(want), 0);
#else
        const int n = static_cast<int>(recv(s, buf, want, 0));
#endif
        if (n <= 0) return false;
        data.append(buf, buf + n);
    }
    return true;
}
// Reads one HTTP/1.x request from `s` and fills `req`.
//
// Steps: read up to the blank line that ends the header block, parse the
// request line and header fields, split the target into path and query, then
// read exactly Content-Length body bytes.
//
// Returns false and sets `err` when
//   - the header block cannot be read or exceeds 1 MiB,
//   - the request line does not have three tokens,
//   - Content-Length is not a plain decimal number (signs, whitespace inside
//     the number and trailing garbage are rejected),
//   - the declared body exceeds kMaxBodyBytes, or
//   - the connection ends before the whole body has arrived.
// Header lines without a ':' are ignored. Chunked transfer encoding is not
// supported; without Content-Length the body is treated as empty.
static bool ParseHttpRequest(Sock s, HttpRequest& req, std::string& err) {
    constexpr size_t kMaxHeaderBytes = 1024 * 1024;
    // Upper bound for request bodies; protects the single server thread from
    // being tied up (and memory exhausted) by a bogus Content-Length.
    constexpr size_t kMaxBodyBytes = 8 * 1024 * 1024;

    std::string data;
    if (!RecvUntil(s, data, "\r\n\r\n", kMaxHeaderBytes)) {
        err = "failed to read headers";
        return false;
    }
    const size_t header_end = data.find("\r\n\r\n");

    std::istringstream iss(data.substr(0, header_end));
    std::string line;
    if (!std::getline(iss, line)) {
        err = "empty request";
        return false;
    }
    if (!line.empty() && line.back() == '\r') line.pop_back();
    {
        std::istringstream ls(line);
        std::string version;
        if (!(ls >> req.method >> req.target >> version)) {
            err = "bad request line";
            return false;
        }
    }

    while (std::getline(iss, line)) {
        if (!line.empty() && line.back() == '\r') line.pop_back();
        if (line.empty()) continue;
        const auto colon = line.find(':');
        if (colon == std::string::npos) continue;
        std::string value = line.substr(colon + 1);
        Trim(value);
        req.headers[ToLower(line.substr(0, colon))] = value;
    }

    // Split the target into path and query at the first '?'.
    {
        const auto qpos = req.target.find('?');
        if (qpos == std::string::npos) {
            req.path = req.target;
        } else {
            req.path = req.target.substr(0, qpos);
            req.query = req.target.substr(qpos + 1);
        }
    }

    size_t content_len = 0;
    if (auto it = req.headers.find("content-length"); it != req.headers.end()) {
        const std::string& text = it->second;
        if (text.empty()) {
            err = "invalid content-length";
            return false;
        }
        for (const char ch : text) {
            if (ch < '0' || ch > '9') {
                err = "invalid content-length";
                return false;
            }
            content_len = content_len * 10 + static_cast<size_t>(ch - '0');
            // Checking on every digit also rules out arithmetic overflow.
            if (content_len > kMaxBodyBytes) {
                err = "request body too large";
                return false;
            }
        }
    }

    // Bytes that arrived together with the headers already belong to the body.
    req.body = data.substr(header_end + 4);
    if (req.body.size() > content_len) req.body.resize(content_len);

    char buf[4096];
    while (req.body.size() < content_len) {
        const size_t want = std::min(sizeof(buf), content_len - req.body.size());
#if defined(_WIN32)
        const int n = recv(s, buf, static_cast<int>(want), 0);
#else
        const int n = static_cast<int>(recv(s, buf, want, 0));
#endif
        if (n <= 0) {
            err = "failed to read body";
            return false;
        }
        req.body.append(buf, buf + n);
    }
    return true;
}
// Writes all of `data` to `s`, looping over partial sends.
// Returns false as soon as send() reports an error or makes no progress.
//
// Where the platform offers MSG_NOSIGNAL it is passed to send(), so a client
// that has already disconnected produces an error return instead of a
// SIGPIPE delivered to the process.
static bool SendAll(Sock s, const std::string& data) {
#if defined(MSG_NOSIGNAL)
    const int flags = MSG_NOSIGNAL;
#else
    const int flags = 0;
#endif
    size_t sent = 0;
    while (sent < data.size()) {
#if defined(_WIN32)
        // Winsock takes an int length; clamp so very large payloads cannot overflow it.
        const size_t remaining = data.size() - sent;
        const int chunk = static_cast<int>(std::min<size_t>(remaining, 0x40000000u));
        const int n = send(s, data.data() + sent, chunk, flags);
#else
        const ssize_t n = send(s, data.data() + sent, data.size() - sent, flags);
#endif
        if (n <= 0) return false;
        sent += static_cast<size_t>(n);
    }
    return true;
}
// Builds the complete wire representation of `res`: status line, the
// Content-Type / Content-Length / "Connection: close" headers, a blank line
// and then the body.
static std::string MakeHttpResponse(const HttpResponse& res) {
    std::ostringstream wire;
    wire << "HTTP/1.1 " << res.status << " " << StatusText(res.status) << "\r\n"
         << "Content-Type: " << res.content_type << "\r\n"
         << "Content-Length: " << res.body.size() << "\r\n"
         << "Connection: close\r\n"
         << "\r\n"
         << res.body;
    return wire.str();
}
} // namespace
// Records the graph manager, the TCP port to listen on and the directory
// static files are served from (`web_root` is moved into the member).
// Nothing else happens here: the body is empty, so no socket is opened and
// no thread is started until Start() is called.
HttpServer::HttpServer(GraphManager& gm, int port, std::string web_root)
: gm_(gm), port_(port), web_root_(std::move(web_root)) {}
// Shuts the server down via Stop() before the members are destroyed.
HttpServer::~HttpServer() {
Stop();
}
// Launches the server thread running ServerLoop().
//
// Idempotent: when the server is already marked as running the call does
// nothing. Always returns true; socket setup happens on the worker thread,
// so a bind/listen failure is reported through the log and by the worker
// clearing running_, not through this return value.
bool HttpServer::Start() {
    bool expected = false;
    if (!running_.compare_exchange_strong(expected, true)) {
        return true;
    }
    // A previous worker that gave up on its own (ServerLoop clears running_
    // when setup fails) is finished but still joinable. Assigning a new
    // thread over a joinable std::thread calls std::terminate, so reap it first.
    if (worker_.joinable()) worker_.join();
    worker_ = std::thread(&HttpServer::ServerLoop, this);
    return true;
}
// Requests shutdown and waits for the worker thread to finish.
//
// The worker is joined whenever it is joinable, including when it already
// cleared running_ itself after a failed socket setup. Skipping the join in
// that case would leave a joinable std::thread behind, and destroying one
// calls std::terminate.
//
// The listening socket is not closed here: ServerLoop() owns it, re-checks
// running_ after every select() timeout and closes the socket on its way
// out. Closing it from this thread as well would close the descriptor twice.
//
// Safe to call repeatedly; must not be called from several threads at once.
void HttpServer::Stop() {
    running_.store(false);
    if (worker_.joinable()) worker_.join();
}
void HttpServer::ServerLoop() {
if (!InitSockets()) {
LogError("[HttpServer] socket init failed");
running_.store(false);
return;
}
Sock srv = socket(AF_INET, SOCK_STREAM, 0);
if (srv == kInvalidSock) {
LogError("[HttpServer] socket() failed");
CleanupSockets();
running_.store(false);
return;
}
int opt = 1;
#if defined(_WIN32)
setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&opt), sizeof(opt));
#else
setsockopt(srv, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
#endif
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(static_cast<uint16_t>(port_));
if (bind(srv, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) {
LogError("[HttpServer] bind failed on port " + std::to_string(port_));
CloseSock(srv);
CleanupSockets();
running_.store(false);
return;
}
if (listen(srv, 16) != 0) {
LogError("[HttpServer] listen failed");
CloseSock(srv);
CleanupSockets();
running_.store(false);
return;
}
(void)SetNonBlocking(srv);
listen_sock_.store(static_cast<int64_t>(srv));
LogInfo("[HttpServer] listening on 0.0.0.0:" + std::to_string(port_) + " (web_root=" + web_root_ + ")");
while (running_.load()) {
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(srv, &rfds);
timeval tv{};
tv.tv_sec = 0;
tv.tv_usec = 200000;
#if defined(_WIN32)
int ret = select(0, &rfds, nullptr, nullptr, &tv);
#else
int ret = select(srv + 1, &rfds, nullptr, nullptr, &tv);
#endif
if (!running_.load()) break;
if (ret <= 0) continue;
if (!FD_ISSET(srv, &rfds)) continue;
sockaddr_in cli{};
#if defined(_WIN32)
int len = sizeof(cli);
#else
socklen_t len = sizeof(cli);
#endif
Sock c = accept(srv, reinterpret_cast<sockaddr*>(&cli), &len);
if (c == kInvalidSock) continue;
HttpRequest req;
std::string perr;
HttpResponse resp;
if (!ParseHttpRequest(c, req, perr)) {
resp.status = 400;
resp.body = ErrorJson(perr);
auto raw = MakeHttpResponse(resp);
(void)SendAll(c, raw);
CloseSock(c);
continue;
}
// Dispatch
if (req.path.rfind("/api/", 0) == 0) {
auto OkJson = [] { return std::string("{\"ok\":true}"); };
if (req.method == "GET") {
if (req.path == "/api/graphs") {
auto snaps = gm_.ListGraphSnapshots();
std::ostringstream oss;
oss << "[";
for (size_t i = 0; i < snaps.size(); ++i) {
if (i) oss << ',';
oss << "{";
oss << "\"name\":\"" << JsonEscape(snaps[i].name) << "\",";
oss << "\"running\":" << (snaps[i].running ? "true" : "false") << ',';
oss << "\"total_fps\":" << snaps[i].total_fps << ',';
oss << "\"alarm_total\":" << snaps[i].alarm_total << ',';
oss << "\"publish_clients\":" << snaps[i].publish_clients;
oss << "}";
}
oss << "]";
resp.body = oss.str();
} else if (req.path.rfind("/api/graphs/", 0) == 0) {
std::string name = req.path.substr(std::string("/api/graphs/").size());
GraphSnapshot gs;
std::string gerr;
if (!gm_.GetGraphSnapshot(name, gs, gerr)) {
resp.status = 404;
resp.body = ErrorJson(gerr);
} else {
resp.body = GraphSnapshotJson(gs);
}
} else if (req.path == "/api/logs/recent") {
auto q = ParseQuery(req.query);
size_t limit = 200;
if (auto it = q.find("limit"); it != q.end()) {
try {
limit = static_cast<size_t>(std::stoul(it->second));
} catch (...) {
limit = 200;
}
}
auto lines = Logger::Instance().RecentLines(limit);
std::ostringstream oss;
oss << "{";
oss << "\"lines\":[";
for (size_t i = 0; i < lines.size(); ++i) {
if (i) oss << ',';
oss << "\"" << JsonEscape(lines[i]) << "\"";
}
oss << "]}";
resp.body = oss.str();
} else if (req.path == "/api/log/level") {
const LogLevel lvl = Logger::Instance().GetLevel();
resp.body = std::string("{\"level\":\"") + LogLevelToString(lvl) + "\"}";
} else if (req.path.rfind("/api/nodes/", 0) == 0) {
// /api/nodes/{id}/metrics
const std::string prefix = "/api/nodes/";
const std::string suffix = "/metrics";
auto pos = req.path.rfind(suffix);
if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
std::string node_id = req.path.substr(prefix.size(), pos - prefix.size());
auto q = ParseQuery(req.query);
std::optional<std::string> graph;
if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) {
graph = it->second;
}
NodeSnapshot ns;
std::string nerr;
if (!gm_.GetNodeSnapshot(node_id, graph, ns, nerr)) {
resp.status = (nerr.find("not unique") != std::string::npos) ? 409 : 404;
resp.body = ErrorJson(nerr);
} else {
resp.body = NodeSnapshotJson(ns);
}
}
} else {
resp.status = 404;
resp.body = ErrorJson("not found");
}
} else if (req.method == "POST") {
if (req.path == "/api/config/reload") {
if (gm_.ConfigPath().empty()) {
resp.status = 500;
resp.body = ErrorJson("config_path not set");
} else {
std::string rerr;
if (!gm_.ReloadFromFile(gm_.ConfigPath(), rerr)) {
resp.status = 500;
resp.body = ErrorJson(rerr);
} else {
resp.body = OkJson();
}
}
} else if (req.path == "/api/config/rollback") {
std::string rerr;
if (!gm_.RollbackFromLastGood(rerr)) {
resp.status = 500;
resp.body = ErrorJson(rerr);
} else {
resp.body = OkJson();
}
} else if (req.path == "/api/log/level") {
if (req.body.empty()) {
resp.status = 400;
resp.body = ErrorJson("empty body");
} else {
SimpleJson body;
std::string jerr;
if (!ParseSimpleJson(req.body, body, jerr)) {
resp.status = 400;
resp.body = ErrorJson(jerr);
} else {
const std::string level = body.ValueOr<std::string>("level", "");
if (level.empty()) {
resp.status = 400;
resp.body = ErrorJson("missing 'level'");
} else {
LogLevel lvl;
if (!ParseLogLevel(level, lvl)) {
resp.status = 400;
resp.body = ErrorJson("invalid level; use debug|info|warn|error");
} else {
Logger::Instance().SetLevel(lvl);
resp.body = OkJson();
}
}
}
}
} else if (req.path.rfind("/api/nodes/", 0) == 0) {
// /api/nodes/{id}/config
const std::string prefix = "/api/nodes/";
const std::string suffix = "/config";
auto pos = req.path.rfind(suffix);
if (pos == std::string::npos || pos + suffix.size() != req.path.size() || pos <= prefix.size()) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
std::string node_id = req.path.substr(prefix.size(), pos - prefix.size());
auto q = ParseQuery(req.query);
std::optional<std::string> graph;
if (auto it = q.find("graph"); it != q.end() && !it->second.empty()) {
graph = it->second;
}
if (req.body.empty()) {
resp.status = 400;
resp.body = ErrorJson("empty body");
} else {
SimpleJson body;
std::string jerr;
if (!ParseSimpleJson(req.body, body, jerr)) {
resp.status = 400;
resp.body = ErrorJson(jerr);
} else {
std::string uerr;
if (!gm_.UpdateNodeConfig(node_id, graph, body, uerr)) {
if (uerr.find("not unique") != std::string::npos) resp.status = 409;
else if (uerr.find("not found") != std::string::npos) resp.status = 404;
else resp.status = 400;
resp.body = ErrorJson(uerr);
} else {
resp.body = OkJson();
}
}
}
}
} else {
resp.status = 404;
resp.body = ErrorJson("not found");
}
} else {
resp.status = 405;
resp.body = ErrorJson("method not allowed");
}
} else {
std::string path = req.path;
if (path == "/") path = "/index.html";
if (!IsSafeStaticPath(path)) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
std::string full = web_root_;
if (!full.empty() && (full.back() == '/' || full.back() == '\\')) {
full.pop_back();
}
full += path;
std::string content;
if (!ReadFile(full, content)) {
resp.status = 404;
resp.body = ErrorJson("not found");
} else {
resp.status = 200;
resp.content_type = ContentTypeForPath(path);
resp.body = std::move(content);
}
}
}
auto raw = MakeHttpResponse(resp);
(void)SendAll(c, raw);
CloseSock(c);
}
listen_sock_.store(-1);
CloseSock(srv);
CleanupSockets();
}
} // namespace rk3588