OrangePi3588Media/agent/internal/httpapi/server.go
2026-04-18 12:16:10 +08:00

875 lines
26 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package httpapi
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
"rk3588sys/agent/internal/audit"
"rk3588sys/agent/internal/config"
"rk3588sys/agent/internal/files"
"rk3588sys/agent/internal/mediaserver"
"rk3588sys/agent/internal/metrics"
"rk3588sys/agent/internal/modelstore"
"rk3588sys/agent/internal/procctl"
"rk3588sys/agent/internal/sysinfo"
"rk3588sys/agent/internal/tasks"
)
type Server struct {
agentCfg config.AgentConfig
ms *mediaserver.Client
store *modelstore.Store
proc procctl.ProcessController
audit *audit.Recorder
tasks *tasks.Registry
baseDir string
execPath string
deviceID string
hostname string
agentPort int
mediaPort int
version string
buildID string
buildType string
gitSHA string
cpuMu sync.Mutex
lastCPU metrics.CPUStat
lastCPUTS time.Time
handler http.Handler
}
// ServeHTTP 实现 http.Handler 接口
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}
type InfoResponse struct {
DeviceID string `json:"device_id"`
DeviceName string `json:"device_name"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
AgentPort int `json:"agent_port"`
MediaPort int `json:"media_port"`
Version string `json:"version"`
BuildID string `json:"build_id"`
BuildType string `json:"build_type"`
GitSHA string `json:"git_sha"`
ConfigPath string `json:"config_path"`
LastGoodPath string `json:"last_good_path"`
UptimeSec int64 `json:"uptime_sec"`
}
func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, store *modelstore.Store, deviceID string, agentPort int, mediaPort int, version, buildID, buildType, gitSHA string) *Server {
var pc procctl.ProcessController
if agentCfg.MediaServerProcess.Enable {
// 使用传统的 fork/exec 模式
pc = procctl.New(agentCfg, baseDir)
} else {
// 使用 systemctl 模式管理 Media Server
pc = procctl.NewSystemCtlController("media-server")
}
execPath, _ := os.Executable()
if execPath != "" {
if resolved, err := filepath.EvalSymlinks(execPath); err == nil {
execPath = resolved
}
}
s := &Server{
agentCfg: agentCfg,
ms: ms,
store: store,
proc: pc,
audit: audit.NewRecorder(defaultAuditPath(baseDir)),
tasks: tasks.NewRegistry(),
baseDir: baseDir,
execPath: execPath,
deviceID: deviceID,
hostname: sysinfo.Hostname(),
agentPort: agentPort,
mediaPort: mediaPort,
version: version,
buildID: buildID,
buildType: buildType,
gitSHA: gitSHA,
}
mux := http.NewServeMux()
mux.HandleFunc("/v1/info", s.handleInfo)
mux.HandleFunc("/v1/config", s.handleConfig)
mux.HandleFunc("/v1/config/ui/schema", s.handleConfigUISchema)
mux.HandleFunc("/v1/config/ui/state", s.handleConfigUIState)
mux.HandleFunc("/v1/config/ui/plan", s.handleConfigUIPlan)
mux.HandleFunc("/v1/config/ui/apply", s.handleConfigUIApply)
mux.HandleFunc("/v1/face-gallery", s.handleFaceGallery)
mux.HandleFunc("/v1/face-gallery/reload", s.handleFaceGalleryReload)
mux.HandleFunc("/v1/models", s.handleModelsList)
mux.HandleFunc("/v1/models/", s.handleModelUpload)
mux.HandleFunc("/v1/media-server/reload", s.handleMediaReload)
mux.HandleFunc("/v1/media-server/rollback", s.handleMediaRollback)
mux.HandleFunc("/v1/media-server/start", s.handleMediaStart)
mux.HandleFunc("/v1/media-server/restart", s.handleMediaRestart)
mux.HandleFunc("/v1/media-server/stop", s.handleMediaStop)
mux.HandleFunc("/v1/media-server/status", s.handleMediaStatus)
mux.HandleFunc("/v1/media-server/configs/", s.handleMediaConfigUpload)
mux.HandleFunc("/v1/media-server/binary", s.handleMediaBinaryUpdate)
mux.HandleFunc("/v1/media-server/binary/rollback", s.handleMediaBinaryRollback)
mux.HandleFunc("/v1/agent/binary", s.handleAgentBinaryUpdate)
mux.HandleFunc("/v1/graphs", s.handleGraphs)
mux.HandleFunc("/v1/graphs/", s.handleGraphDetail)
mux.HandleFunc("/v1/logs/recent", s.handleLogsRecent)
mux.HandleFunc("/v1/metrics", s.handleMetrics)
mux.HandleFunc("/v1/versions", s.handleVersions)
mux.HandleFunc("/v1/assets", s.handleAssets)
mux.HandleFunc("/v1/tasks/", s.handleTask)
s.handler = mux
return s
}
func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
ip := sysinfo.PrimaryIPv4()
resp := InfoResponse{
DeviceID: s.deviceID,
DeviceName: s.agentCfg.DeviceName,
Hostname: s.hostname,
IP: ip,
AgentPort: s.agentPort,
MediaPort: s.mediaPort,
Version: s.version,
BuildID: s.buildID,
BuildType: s.buildType,
GitSHA: s.gitSHA,
ConfigPath: s.agentCfg.ConfigPath,
LastGoodPath: s.agentCfg.ConfigPath + ".last_good.json",
UptimeSec: sysinfo.UptimeSec(),
}
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
b, err := os.ReadFile(s.agentCfg.ConfigPath)
if err != nil {
if os.IsNotExist(err) {
errorJSON(w, http.StatusNotFound, "not found")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: read config failed: "+err.Error())
return
}
var tmp any
if err := json.Unmarshal(b, &tmp); err != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: config is not valid json: "+err.Error())
return
}
writeRawJSON(w, http.StatusOK, b)
return
case http.MethodPut:
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/json" {
errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/json")
return
}
const maxConfigBytes = int64(20 << 20)
r.Body = http.MaxBytesReader(w, r.Body, maxConfigBytes)
body, err := io.ReadAll(r.Body)
if err != nil {
if strings.Contains(err.Error(), "request body too large") {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
}
errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error())
return
}
if len(body) == 0 {
errorJSON(w, http.StatusBadRequest, "validation failed: empty body")
return
}
var tmp any
if err := json.Unmarshal(body, &tmp); err != nil {
errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error())
return
}
if err := s.applyRootConfigBytes(r.Context(), body); err != nil {
s.recordAudit(r, "config.update", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "config.update", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
return
default:
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
func (s *Server) applyRootConfigBytes(ctx context.Context, body []byte) error {
if err := files.WriteFileAtomic(s.agentCfg.ConfigPath, append(body, '\n'), 0o644); err != nil {
return fmt.Errorf("write config failed: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := s.ms.Reload(ctx); err != nil {
rerr := err
rbErr := s.ms.Rollback(ctx)
if rbErr != nil {
return fmt.Errorf("reload failed: %v; rollback failed: %v", rerr, rbErr)
}
return fmt.Errorf("reload failed: %v; rollback ok", rerr)
}
return nil
}
var modelNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
var configNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
func (s *Server) handleModelUpload(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/octet-stream" {
errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/octet-stream")
return
}
name := strings.TrimPrefix(r.URL.Path, "/v1/models/")
name = strings.TrimSpace(name)
if name == "" || strings.Contains(name, "/") || strings.Contains(name, "\\") {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid name")
return
}
if !modelNameRE.MatchString(name) {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid name")
return
}
if r.ContentLength <= 0 {
errorJSON(w, http.StatusBadRequest, "validation failed: missing Content-Length")
return
}
expected := strings.TrimSpace(r.Header.Get("X-Model-Sha256"))
if expected != "" {
if len(expected) != 64 {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Model-Sha256")
return
}
if _, err := hex.DecodeString(expected); err != nil {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Model-Sha256")
return
}
}
task := s.tasks.Start("model_upload")
item, err := s.store.Upload(name, r.Body, r.ContentLength, expected)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
w.Header().Set("X-Task-Id", task.ID)
s.recordAudit(r, "model.upload", false, err.Error())
if errors.Is(err, modelstore.ErrPayloadTooLarge) {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
}
if strings.Contains(err.Error(), "sha256 mismatch") {
errorJSON(w, http.StatusBadRequest, "validation failed: sha256 mismatch")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
_, _ = s.tasks.Finish(task.ID, item, nil)
s.recordAudit(r, "model.upload", true, name)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"name": item.Name,
"sha256": item.Sha256,
"path": item.Path,
"size": item.Size,
"task_id": task.ID,
})
}
func (s *Server) handleModelsList(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
m, err := s.store.List()
if err != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, m)
}
func (s *Server) handleMediaConfigUpload(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/json" {
errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/json")
return
}
name := strings.TrimPrefix(r.URL.Path, "/v1/media-server/configs/")
name = strings.TrimSpace(name)
finalName, err := normalizeConfigName(name)
if err != nil {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid name")
return
}
configsDir, err := s.resolveConfigsDir()
if err != nil {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
maxBytes := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
body, err := io.ReadAll(r.Body)
if err != nil {
if strings.Contains(err.Error(), "request body too large") {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
}
errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error())
return
}
if len(body) == 0 {
errorJSON(w, http.StatusBadRequest, "validation failed: empty body")
return
}
var tmp any
if err := json.Unmarshal(body, &tmp); err != nil {
errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error())
return
}
dst := filepath.Join(configsDir, finalName)
if err := files.WriteFileAtomic(dst, append(body, '\n'), 0o644); err != nil {
s.recordAudit(r, "media.config.upload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
st, err := os.Stat(dst)
if err != nil {
s.recordAudit(r, "media.config.upload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.config.upload", true, finalName)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"name": finalName,
"path": filepath.ToSlash(dst),
"size": st.Size(),
"mtime_ms": st.ModTime().UnixMilli(),
})
}
func (s *Server) handleMediaReload(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if err := s.ms.Reload(r.Context()); err != nil {
s.recordAudit(r, "media.reload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.reload", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
func (s *Server) handleMediaRollback(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if err := s.ms.Rollback(r.Context()); err != nil {
s.recordAudit(r, "media.rollback", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.rollback", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
type mediaProcReq struct {
Config string `json:"config"`
}
// StartMediaServer 启动 media-server供外部调用
func (s *Server) StartMediaServer(configName string) error {
if s.proc == nil || !s.proc.Enabled() {
return errors.New("media server process management not enabled")
}
_, err := s.proc.Start(configName)
return err
}
func (s *Server) handleMediaStart(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
req, err := readOptionalJSON[mediaProcReq](w, r, 1<<20)
if err != nil {
errorJSON(w, http.StatusBadRequest, err.Error())
return
}
st, err := s.proc.Start(req.Config)
if err != nil {
s.recordAudit(r, "media.start", false, err.Error())
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
}
if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) {
errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error())
return
}
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.start", true, req.Config)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath, "started_at_ms": st.StartedAtMS})
}
func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
req, err := readOptionalJSON[mediaProcReq](w, r, 1<<20)
if err != nil {
errorJSON(w, http.StatusBadRequest, err.Error())
return
}
st, err := s.proc.Restart(req.Config)
if err != nil {
s.recordAudit(r, "media.restart", false, err.Error())
if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) {
errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error())
return
}
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.restart", true, req.Config)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath, "started_at_ms": st.StartedAtMS})
}
func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
st, err := s.proc.Stop()
if err != nil {
s.recordAudit(r, "media.stop", false, err.Error())
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.stop", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath, "started_at_ms": st.StartedAtMS})
}
func (s *Server) handleMediaStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
st, err := s.proc.Status()
if err != nil {
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
ver, verr := s.proc.Version()
if verr != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+verr.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"running": st.Running,
"pid": st.Pid,
"config_path": st.ConfigPath,
"started_at_ms": st.StartedAtMS,
"version": ver,
})
}
func (s *Server) handleMediaBinaryUpdate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/octet-stream" {
errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/octet-stream")
return
}
if r.ContentLength <= 0 {
errorJSON(w, http.StatusBadRequest, "validation failed: missing Content-Length")
return
}
maxBytes := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
expected := strings.TrimSpace(r.Header.Get("X-Binary-Sha256"))
if expected != "" {
if len(expected) != 64 {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
return
}
if _, err := hex.DecodeString(expected); err != nil {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
return
}
}
task := s.tasks.Start("media_binary_update")
res, err := s.proc.UpdateBinary(r.Body, r.ContentLength, expected)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
w.Header().Set("X-Task-Id", task.ID)
s.recordAudit(r, "media.binary.update", false, err.Error())
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
}
if strings.Contains(err.Error(), "payload too large") || strings.Contains(err.Error(), "request body too large") {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
}
if strings.Contains(err.Error(), "sha256 mismatch") {
errorJSON(w, http.StatusBadRequest, "validation failed: sha256 mismatch")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
_, _ = s.tasks.Finish(task.ID, res, nil)
s.recordAudit(r, "media.binary.update", true, res.Path)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"path": res.Path,
"sha256": res.Sha256,
"size": res.Size,
"mtime_ms": res.MtimeMS,
"backup_path": res.BackupPath,
"task_id": task.ID,
})
}
type mediaBinaryRollbackReq struct {
BackupPath string `json:"backup_path"`
}
func (s *Server) handleMediaBinaryRollback(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
req, err := readRequiredJSON[mediaBinaryRollbackReq](w, r, 1<<20)
if err != nil {
errorJSON(w, http.StatusBadRequest, err.Error())
return
}
task := s.tasks.Start("media_binary_rollback")
res, err := s.proc.RollbackBinary(req.BackupPath)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
w.Header().Set("X-Task-Id", task.ID)
s.recordAudit(r, "media.binary.rollback", false, err.Error())
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
_, _ = s.tasks.Finish(task.ID, res, nil)
s.recordAudit(r, "media.binary.rollback", true, res.Path)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"path": res.Path,
"sha256": res.Sha256,
"size": res.Size,
"mtime_ms": res.MtimeMS,
"backup_path": res.BackupPath,
"task_id": task.ID,
})
}
func normalizeConfigName(name string) (string, error) {
if name == "" || strings.Contains(name, "/") || strings.Contains(name, "\\") || strings.Contains(name, "..") {
return "", errors.New("invalid name")
}
if !configNameRE.MatchString(name) {
return "", errors.New("invalid name")
}
if !strings.HasSuffix(strings.ToLower(name), ".json") {
name += ".json"
}
return name, nil
}
func (s *Server) resolveConfigsDir() (string, error) {
base := strings.TrimSpace(s.agentCfg.MediaServerProcess.ConfigsDir)
if base == "" {
return "", errors.New("configs_dir is empty")
}
if filepath.IsAbs(base) {
return base, nil
}
if s.baseDir == "" {
return filepath.Clean(base), nil
}
return filepath.Join(s.baseDir, base), nil
}
func readOptionalJSON[T any](w http.ResponseWriter, r *http.Request, maxBytes int64) (T, error) {
var zero T
if r.Body == nil {
return zero, nil
}
r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
body, err := io.ReadAll(r.Body)
if err != nil {
if strings.Contains(err.Error(), "request body too large") {
return zero, errors.New("payload too large")
}
return zero, fmt.Errorf("invalid json: %v", err)
}
if len(strings.TrimSpace(string(body))) == 0 {
return zero, nil
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/json" {
return zero, errors.New("validation failed: Content-Type must be application/json")
}
var v T
if err := json.Unmarshal(body, &v); err != nil {
return zero, fmt.Errorf("invalid json: %v", err)
}
return v, nil
}
func (s *Server) handleGraphs(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
st, b, err := s.ms.GetGraphs(r.Context())
if err != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeRawJSON(w, st, b)
}
func (s *Server) handleGraphDetail(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
name := strings.TrimPrefix(r.URL.Path, "/v1/graphs/")
if name == "" {
errorJSON(w, http.StatusNotFound, "not found")
return
}
st, b, err := s.ms.GetGraph(r.Context(), name)
if err != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeRawJSON(w, st, b)
}
func (s *Server) handleLogsRecent(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
limit := 200
if v := strings.TrimSpace(r.URL.Query().Get("limit")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
limit = n
}
}
st, b, err := s.ms.GetLogsRecent(r.Context(), limit)
if err != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeRawJSON(w, st, b)
}
func (s *Server) authorize(r *http.Request, write bool) bool {
need := write || s.agentCfg.RequireTokenForRead
if !need {
return true
}
tok := r.Header.Get("X-RK-Token")
return tok != "" && tok == s.agentCfg.Token
}
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}
func writeRawJSON(w http.ResponseWriter, status int, raw []byte) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_, _ = w.Write(raw)
}
func errorJSON(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, map[string]any{"error": msg})
}