OrangePi3588Media/agent/internal/httpapi/server.go

1084 lines
34 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package httpapi
import (
	"context"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"mime"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"rk3588sys/agent/internal/audit"
	"rk3588sys/agent/internal/config"
	"rk3588sys/agent/internal/files"
	"rk3588sys/agent/internal/mediaserver"
	"rk3588sys/agent/internal/metrics"
	"rk3588sys/agent/internal/modelstore"
	"rk3588sys/agent/internal/procctl"
	"rk3588sys/agent/internal/sysinfo"
	"rk3588sys/agent/internal/tasks"
)
// Server is the agent's HTTP API server. It holds the agent configuration,
// the collaborators the request handlers call into, and the identity and
// build metadata reported by the info endpoint.
type Server struct {
	// Configuration and collaborators wired in by New.
	agentCfg config.AgentConfig
	ms       *mediaserver.Client
	store    *modelstore.Store
	proc     procctl.ProcessController
	audit    *audit.Recorder
	tasks    *tasks.Registry

	// Filesystem locations. execPath is the symlink-resolved path of the
	// running executable (when it could be determined); resourcesDir falls
	// back to "<baseDir>/resources" when the config leaves it empty.
	baseDir      string
	execPath     string
	resourcesDir string

	// Device identity and listening ports.
	deviceID  string
	hostname  string
	agentPort int
	mediaPort int

	// Build metadata.
	version   string
	buildID   string
	buildType string
	gitSHA    string

	// CPU sampling state. NOTE(review): cpuMu presumably guards lastCPU and
	// lastCPUTS; their users are outside this part of the file — confirm.
	cpuMu     sync.Mutex
	lastCPU   metrics.CPUStat
	lastCPUTS time.Time

	// handler is the router that ServeHTTP delegates to.
	handler http.Handler
}
// ServeHTTP implements http.Handler by handing every request to the handler
// installed on the server.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	inner := s.handler
	inner.ServeHTTP(w, r)
}
// InfoResponse is the JSON document returned by the /v1/info endpoint.
type InfoResponse struct {
	// Device identity and network location.
	DeviceID   string `json:"device_id"`
	DeviceName string `json:"device_name"`
	Hostname   string `json:"hostname"`
	IP         string `json:"ip"`
	AgentPort  int    `json:"agent_port"`
	MediaPort  int    `json:"media_port"`

	// Build metadata of the running agent.
	Version   string `json:"version"`
	BuildID   string `json:"build_id"`
	BuildType string `json:"build_type"`
	GitSHA    string `json:"git_sha"`

	// Paths of the active config and of its last-known-good copy.
	ConfigPath         string `json:"config_path"`
	PreviousConfigPath string `json:"previous_config_path"`

	// Optional fields filled in from the config metadata summary when one
	// can be read; omitted from the JSON otherwise.
	ConfigID            string   `json:"config_id,omitempty"`
	ConfigVersion       string   `json:"config_version,omitempty"`
	BusinessName        string   `json:"business_name,omitempty"`
	Template            string   `json:"template,omitempty"`
	Profile             string   `json:"profile,omitempty"`
	Overlays            []string `json:"overlays,omitempty"`
	InstanceName        string   `json:"instance_name,omitempty"`
	InstanceDisplayName string   `json:"instance_display_name,omitempty"`

	UptimeSec int64 `json:"uptime_sec"`
}
// New builds a Server from the agent configuration and its collaborators and
// registers every /v1 route on a fresh ServeMux.
//
// As a one-time side effect it copies face_gallery.db from the models
// directory to "<resourcesDir>/face_gallery/" when the old file exists and the
// new one does not; failures of that migration are ignored (best effort).
func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, store *modelstore.Store, deviceID string, agentPort int, mediaPort int, version, buildID, buildType, gitSHA string) *Server {
	// Choose how the media server process is managed.
	var controller procctl.ProcessController
	if !agentCfg.MediaServerProcess.Enable {
		// Manage the media server through systemctl.
		controller = procctl.NewSystemCtlController("media-server")
	} else {
		// Use the traditional fork/exec mode.
		controller = procctl.New(agentCfg, baseDir)
	}

	// Resolve the agent's own executable path, following symlinks when possible.
	exe, _ := os.Executable()
	if exe != "" {
		if realPath, err := filepath.EvalSymlinks(exe); err == nil {
			exe = realPath
		}
	}

	resDir := strings.TrimSpace(agentCfg.ResourcesDir)
	if resDir == "" {
		resDir = filepath.Join(baseDir, "resources")
	}

	s := &Server{
		agentCfg:     agentCfg,
		ms:           ms,
		store:        store,
		proc:         controller,
		audit:        audit.NewRecorder(defaultAuditPath(baseDir)),
		tasks:        tasks.NewRegistry(),
		baseDir:      baseDir,
		execPath:     exe,
		resourcesDir: resDir,
		deviceID:     deviceID,
		hostname:     sysinfo.Hostname(),
		agentPort:    agentPort,
		mediaPort:    mediaPort,
		version:      version,
		buildID:      buildID,
		buildType:    buildType,
		gitSHA:       gitSHA,
	}

	routes := []struct {
		pattern string
		handle  func(http.ResponseWriter, *http.Request)
	}{
		{"/v1/info", s.handleInfo},
		{"/v1/config/status", s.handleConfigStatus},
		{"/v1/config/candidate", s.handleConfigCandidate},
		{"/v1/config/candidate/apply", s.handleConfigCandidateApply},
		{"/v1/config", s.handleConfig},
		{"/v1/config/ui/schema", s.handleConfigUISchema},
		{"/v1/config/ui/state", s.handleConfigUIState},
		{"/v1/config/ui/plan", s.handleConfigUIPlan},
		{"/v1/config/ui/apply", s.handleConfigUIApply},
		{"/v1/face-gallery", s.handleFaceGallery},
		{"/v1/face-gallery/reload", s.handleFaceGalleryReload},
		{"/v1/alarms/report", s.handleAlarmReport},
		{"/v1/alarms/recent", s.handleAlarmsRecent},
		{"/v1/resources/status", s.handleResourcesStatus},
		{"/v1/resources/", s.handleResourceUpload},
		{"/v1/models", s.handleModelsList},
		{"/v1/models/status", s.handleModelsStatus},
		{"/v1/models/", s.handleModelUpload},
		{"/v1/media-server/reload", s.handleMediaReload},
		{"/v1/media-server/rollback", s.handleMediaRollback},
		{"/v1/media-server/start", s.handleMediaStart},
		{"/v1/media-server/restart", s.handleMediaRestart},
		{"/v1/media-server/stop", s.handleMediaStop},
		{"/v1/media-server/status", s.handleMediaStatus},
		{"/v1/media-server/configs/", s.handleMediaConfigUpload},
		{"/v1/media-server/binary", s.handleMediaBinaryUpdate},
		{"/v1/media-server/binary/rollback", s.handleMediaBinaryRollback},
		{"/v1/agent/binary", s.handleAgentBinaryUpdate},
		{"/v1/graph-node-types", s.handleGraphNodeTypes},
		{"/v1/graphs", s.handleGraphs},
		{"/v1/graphs/", s.handleGraphDetail},
		{"/v1/logs/recent", s.handleLogsRecent},
		{"/v1/metrics", s.handleMetrics},
		{"/v1/versions", s.handleVersions},
		{"/v1/assets", s.handleAssets},
		{"/v1/tasks/", s.handleTask},
	}
	mux := http.NewServeMux()
	for _, rt := range routes {
		mux.HandleFunc(rt.pattern, rt.handle)
	}

	// Migrate face_gallery.db from its old location (modelsDir) to the new
	// one (resourcesDir). Every step is best effort: errors are dropped.
	legacyDB := filepath.Join(agentCfg.ModelsDir, "face_gallery.db")
	galleryDir := filepath.Join(resDir, "face_gallery")
	targetDB := filepath.Join(galleryDir, "face_gallery.db")
	if _, legacyErr := os.Stat(legacyDB); legacyErr == nil {
		if _, targetErr := os.Stat(targetDB); os.IsNotExist(targetErr) {
			_ = files.EnsureDir(galleryDir, 0o755)
			contents, readErr := os.ReadFile(legacyDB)
			if readErr == nil {
				_ = files.WriteFileAtomic(targetDB, contents, 0o644)
			}
		}
	}

	s.handler = mux
	return s
}
// handleInfo serves GET /v1/info: device identity, ports, build metadata,
// config paths and uptime, plus the config metadata summary when one can be
// read from the active config file.
func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	primaryIP := sysinfo.PrimaryIPv4()
	cfgPath := s.agentCfg.ConfigPath
	info := InfoResponse{
		DeviceID:           s.deviceID,
		DeviceName:         s.agentCfg.DeviceName,
		Hostname:           s.hostname,
		IP:                 primaryIP,
		AgentPort:          s.agentPort,
		MediaPort:          s.mediaPort,
		Version:            s.version,
		BuildID:            s.buildID,
		BuildType:          s.buildType,
		GitSHA:             s.gitSHA,
		ConfigPath:         cfgPath,
		PreviousConfigPath: cfgPath + ".last_good.json",
		UptimeSec:          sysinfo.UptimeSec(),
	}

	// The metadata summary is optional; leave those fields empty without it.
	if meta := readConfigMetadataSummary(cfgPath); meta != nil {
		info.ConfigID = meta.ConfigID
		info.ConfigVersion = meta.ConfigVersion
		info.BusinessName = meta.BusinessName
		info.Template = meta.Template
		info.Profile = meta.Profile
		info.Overlays = copyStringSlice(meta.Overlays)
		info.InstanceName = meta.InstanceName
		info.InstanceDisplayName = meta.InstanceDisplayName
	}
	writeJSON(w, http.StatusOK, info)
}
// handleConfigCandidate serves PUT /v1/config/candidate. It validates the
// uploaded root config JSON and stores it (plus a trailing newline) at the
// candidate path without touching the active config. The response reports
// the stored file's path, sha256, size, mtime and the document's metadata.
func (s *Server) handleConfigCandidate(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPut:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	mediaType, _, ctErr := mime.ParseMediaType(r.Header.Get("Content-Type"))
	if ctErr != nil || mediaType != "application/json" {
		errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/json")
		return
	}

	// Cap the body at the configured upload size, or 20 MiB when unset.
	limit := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
	if limit <= 0 {
		limit = int64(20 << 20)
	}
	r.Body = http.MaxBytesReader(w, r.Body, limit)
	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		if strings.Contains(readErr.Error(), "request body too large") {
			errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
		} else {
			errorJSON(w, http.StatusBadRequest, "invalid json: "+readErr.Error())
		}
		return
	}

	doc, valErr := validateRootConfigJSON(payload)
	if valErr != nil {
		errorJSON(w, http.StatusBadRequest, valErr.Error())
		return
	}

	target := s.configCandidatePath()
	if writeErr := files.WriteFileAtomic(target, append(payload, '\n'), 0o644); writeErr != nil {
		s.recordAudit(r, "config.candidate.update", false, writeErr.Error())
		errorJSON(w, http.StatusInternalServerError, "internal error: write candidate failed: "+writeErr.Error())
		return
	}

	stored := readConfigFileStatus(target)
	s.recordAudit(r, "config.candidate.update", true, target)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":       true,
		"path":     filepath.ToSlash(target),
		"sha256":   stored.Sha256,
		"size":     stored.Size,
		"mtime_ms": stored.MtimeMS,
		"metadata": doc.Metadata,
	})
}
// handleConfigCandidateApply serves POST /v1/config/candidate/apply. It
// re-validates the stored candidate, applies it as the active config and, on
// success, removes the candidate file and returns the config status payload.
func (s *Server) handleConfigCandidateApply(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	source := s.configCandidatePath()
	candidate, readErr := os.ReadFile(source)
	switch {
	case readErr == nil:
		// Candidate loaded; continue below.
	case os.IsNotExist(readErr):
		errorJSON(w, http.StatusNotFound, "candidate config not found")
		return
	default:
		errorJSON(w, http.StatusInternalServerError, "internal error: read candidate failed: "+readErr.Error())
		return
	}

	// The file may have changed on disk since upload, so validate again.
	if _, valErr := validateRootConfigJSON(candidate); valErr != nil {
		errorJSON(w, http.StatusBadRequest, valErr.Error())
		return
	}

	if applyErr := s.applyCandidateConfigBytes(r.Context(), candidate); applyErr != nil {
		s.recordAudit(r, "config.candidate.apply", false, applyErr.Error())
		errorJSON(w, http.StatusInternalServerError, "internal error: "+applyErr.Error())
		return
	}

	// Removing the consumed candidate is best effort.
	_ = os.Remove(source)
	s.recordAudit(r, "config.candidate.apply", true, source)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":     true,
		"status": s.configStatusPayload(),
	})
}
// handleConfig serves /v1/config.
//
// GET returns the active config file verbatim after checking it parses as
// JSON. PUT accepts any syntactically valid JSON body (at most 20 MiB),
// writes it as the active config and reloads the media server. Any other
// method yields 405.
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		if !s.authorize(r, false) {
			errorJSON(w, http.StatusUnauthorized, "unauthorized")
			return
		}
		raw, readErr := os.ReadFile(s.agentCfg.ConfigPath)
		switch {
		case readErr == nil:
			// Loaded; continue below.
		case os.IsNotExist(readErr):
			errorJSON(w, http.StatusNotFound, "not found")
			return
		default:
			errorJSON(w, http.StatusInternalServerError, "internal error: read config failed: "+readErr.Error())
			return
		}
		var probe any
		if parseErr := json.Unmarshal(raw, &probe); parseErr != nil {
			errorJSON(w, http.StatusInternalServerError, "internal error: config is not valid json: "+parseErr.Error())
			return
		}
		writeRawJSON(w, http.StatusOK, raw)
		return
	}

	if r.Method != http.MethodPut {
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}

	// PUT: replace the active config.
	if !s.authorize(r, true) {
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}
	mediaType, _, ctErr := mime.ParseMediaType(r.Header.Get("Content-Type"))
	if ctErr != nil || mediaType != "application/json" {
		errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/json")
		return
	}

	const maxConfigBytes = int64(20 << 20)
	r.Body = http.MaxBytesReader(w, r.Body, maxConfigBytes)
	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		if strings.Contains(readErr.Error(), "request body too large") {
			errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
		} else {
			errorJSON(w, http.StatusBadRequest, "invalid json: "+readErr.Error())
		}
		return
	}
	if len(payload) == 0 {
		errorJSON(w, http.StatusBadRequest, "validation failed: empty body")
		return
	}
	var probe any
	if parseErr := json.Unmarshal(payload, &probe); parseErr != nil {
		errorJSON(w, http.StatusBadRequest, "invalid json: "+parseErr.Error())
		return
	}

	if applyErr := s.applyRootConfigBytes(r.Context(), payload); applyErr != nil {
		s.recordAudit(r, "config.update", false, applyErr.Error())
		errorJSON(w, http.StatusInternalServerError, "internal error: "+applyErr.Error())
		return
	}
	s.recordAudit(r, "config.update", true, "")
	writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
// rootConfigDocument captures the three top-level sections of a root config
// that validateRootConfigJSON inspects. Section contents stay untyped.
type rootConfigDocument struct {
	// Templates is the "templates" object.
	Templates map[string]any `json:"templates"`
	// Instances is the "instances" array.
	Instances []any `json:"instances"`
	// Metadata is the "metadata" object.
	Metadata map[string]any `json:"metadata"`
}
// validateRootConfigJSON parses body as a root config and checks its shape:
// the body must not be blank, must be valid JSON, "templates" must be a
// non-empty object, "instances" must be present as an array (it may be
// empty), and "metadata" must be a non-empty object.
//
// It returns the decoded document on success; on failure the document is the
// zero value and the error message begins with "validation failed: " or
// "invalid json: " (the latter wraps the decoder error).
func validateRootConfigJSON(body []byte) (rootConfigDocument, error) {
	var none rootConfigDocument
	if strings.TrimSpace(string(body)) == "" {
		return none, errors.New("validation failed: empty body")
	}

	var doc rootConfigDocument
	if err := json.Unmarshal(body, &doc); err != nil {
		return none, fmt.Errorf("invalid json: %w", err)
	}

	switch {
	case len(doc.Templates) == 0:
		return none, errors.New("validation failed: templates must be a non-empty object")
	case doc.Instances == nil:
		// A missing or null "instances" decodes to a nil slice.
		return none, errors.New("validation failed: instances must be an array")
	case len(doc.Metadata) == 0:
		return none, errors.New("validation failed: metadata must be a non-empty object")
	}
	return doc, nil
}
// applyRootConfigBytes writes body as the active config and reloads the media
// server. The current config, when it exists and is non-empty, is passed
// along as the restore copy; a missing config file is not an error.
func (s *Server) applyRootConfigBytes(ctx context.Context, body []byte) error {
	current, readErr := os.ReadFile(s.agentCfg.ConfigPath)
	switch {
	case readErr == nil:
		// Loaded; may still be empty.
	case os.IsNotExist(readErr):
		current = nil
	default:
		return fmt.Errorf("read current config failed: %w", readErr)
	}
	if len(current) == 0 {
		// Nothing worth restoring.
		current = nil
	}
	return s.writeConfigAndReload(ctx, body, current)
}
// writeConfigAndReload writes body to the active config path and asks the
// media server to reload, allowing 10 seconds for the reload.
//
// If the reload fails and restoreBody is non-empty, restoreBody is written
// back and a second reload is attempted with a fresh 10 second timeout that
// is independent of ctx. The function returns an error whenever the first
// reload failed, even if the restore succeeded; the message says which steps
// failed, and the first reload error is wrapped so callers can inspect it
// with errors.Is / errors.As.
//
// Written files end in exactly one added-or-existing newline: a body that
// already ends in '\n' is written unchanged, so repeated apply/rollback
// cycles do not grow the file and an applied candidate is byte-identical to
// the stored candidate file.
func (s *Server) writeConfigAndReload(ctx context.Context, body []byte, restoreBody []byte) error {
	// withNewline returns b if it already ends in '\n'; otherwise a fresh
	// copy with '\n' appended, so the caller's backing array is never written.
	withNewline := func(b []byte) []byte {
		if n := len(b); n > 0 && b[n-1] == '\n' {
			return b
		}
		out := make([]byte, 0, len(b)+1)
		out = append(out, b...)
		return append(out, '\n')
	}

	if err := files.WriteFileAtomic(s.agentCfg.ConfigPath, withNewline(body), 0o644); err != nil {
		return fmt.Errorf("write config failed: %w", err)
	}

	reloadCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	reloadErr := s.ms.Reload(reloadCtx)
	if reloadErr == nil {
		return nil
	}

	// Reload failed: without a restore copy there is nothing more to do.
	if len(restoreBody) == 0 {
		return fmt.Errorf("reload failed: %w", reloadErr)
	}
	if werr := files.WriteFileAtomic(s.agentCfg.ConfigPath, withNewline(restoreBody), 0o644); werr != nil {
		return fmt.Errorf("reload failed: %w; restore write failed: %v", reloadErr, werr)
	}

	// The request context may already be cancelled or timed out, so the
	// restore reload runs on its own background-derived context.
	restoreCtx, restoreCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer restoreCancel()
	if restoreErr := s.ms.Reload(restoreCtx); restoreErr != nil {
		return fmt.Errorf("reload failed: %w; restore reload failed: %v", reloadErr, restoreErr)
	}
	return fmt.Errorf("reload failed: %w; restored previous config", reloadErr)
}
// applyCandidateConfigBytes promotes body to the active config. When a
// non-empty current config exists it is first saved to
// "<ConfigPath>.last_good.json" and also handed to writeConfigAndReload as
// the restore copy. A missing current config is not an error.
//
// The snapshot is a faithful copy of the current config: a newline is added
// only when the current file does not already end with one, so last_good
// does not grow by a byte on every apply.
func (s *Server) applyCandidateConfigBytes(ctx context.Context, body []byte) error {
	current, err := os.ReadFile(s.agentCfg.ConfigPath)
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("read current config failed: %w", err)
	}

	if err == nil && len(current) > 0 {
		snapshot := current
		if current[len(current)-1] != '\n' {
			// Copy before appending so current's backing array is untouched.
			snapshot = make([]byte, 0, len(current)+1)
			snapshot = append(snapshot, current...)
			snapshot = append(snapshot, '\n')
		}
		if werr := files.WriteFileAtomic(s.agentCfg.ConfigPath+".last_good.json", snapshot, 0o644); werr != nil {
			return fmt.Errorf("write last_good failed: %w", werr)
		}
	}
	return s.writeConfigAndReload(ctx, body, current)
}
// Name patterns for uploaded files: one or more ASCII letters, digits, dots,
// underscores or hyphens, and nothing else.
var (
	// modelNameRE validates model names taken from /v1/models/<name>.
	modelNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
	// configNameRE validates media-server config names.
	configNameRE = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)
)
// handleModelUpload serves PUT /v1/models/<name>. The body must be
// application/octet-stream with a positive Content-Length. An optional
// X-Model-Sha256 header (64 hex characters) is passed to the model store as
// the expected checksum. The upload is tracked as a "model_upload" task; on
// failure the task id is returned in the X-Task-Id response header.
func (s *Server) handleModelUpload(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPut:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	mediaType, _, ctErr := mime.ParseMediaType(r.Header.Get("Content-Type"))
	if ctErr != nil || mediaType != "application/octet-stream" {
		errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/octet-stream")
		return
	}

	// The model name is the single path segment after the route prefix.
	modelName := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, "/v1/models/"))
	nameInvalid := modelName == "" ||
		strings.Contains(modelName, "/") ||
		strings.Contains(modelName, "\\") ||
		!modelNameRE.MatchString(modelName)
	if nameInvalid {
		errorJSON(w, http.StatusBadRequest, "validation failed: invalid name")
		return
	}

	if r.ContentLength <= 0 {
		errorJSON(w, http.StatusBadRequest, "validation failed: missing Content-Length")
		return
	}

	// The checksum header is optional, but must be well-formed when present.
	wantSum := strings.TrimSpace(r.Header.Get("X-Model-Sha256"))
	if wantSum != "" {
		wellFormed := len(wantSum) == 64
		if wellFormed {
			_, hexErr := hex.DecodeString(wantSum)
			wellFormed = hexErr == nil
		}
		if !wellFormed {
			errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Model-Sha256")
			return
		}
	}

	job := s.tasks.Start("model_upload")
	stored, upErr := s.store.Upload(modelName, r.Body, r.ContentLength, wantSum)
	if upErr != nil {
		_, _ = s.tasks.Finish(job.ID, nil, upErr)
		w.Header().Set("X-Task-Id", job.ID)
		s.recordAudit(r, "model.upload", false, upErr.Error())
		switch {
		case errors.Is(upErr, modelstore.ErrPayloadTooLarge):
			errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
		case strings.Contains(upErr.Error(), "sha256 mismatch"):
			errorJSON(w, http.StatusBadRequest, "validation failed: sha256 mismatch")
		default:
			errorJSON(w, http.StatusInternalServerError, "internal error: "+upErr.Error())
		}
		return
	}

	_, _ = s.tasks.Finish(job.ID, stored, nil)
	s.recordAudit(r, "model.upload", true, modelName)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":      true,
		"name":    stored.Name,
		"sha256":  stored.Sha256,
		"path":    stored.Path,
		"size":    stored.Size,
		"task_id": job.ID,
	})
}
// handleModelsList serves GET /v1/models with whatever the model store's
// List call returns, encoded as JSON.
func (s *Server) handleModelsList(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	listing, listErr := s.store.List()
	if listErr != nil {
		errorJSON(w, http.StatusInternalServerError, "internal error: "+listErr.Error())
		return
	}
	writeJSON(w, http.StatusOK, listing)
}
// handleMediaConfigUpload serves PUT /v1/media-server/configs/<name>. It
// stores a JSON document in the media server's configs directory under the
// normalized name (".json" is appended when missing) and reports the stored
// file's name, path, size and mtime.
//
// The body is limited to the configured MaxUploadMB. When that setting is
// zero or negative a 20 MiB default applies, matching the candidate-config
// endpoint; without the default a zero limit would reject every non-empty
// body as too large.
func (s *Server) handleMediaConfigUpload(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPut {
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	if !s.authorize(r, true) {
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}
	if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/json" {
		errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/json")
		return
	}

	name := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, "/v1/media-server/configs/"))
	finalName, err := normalizeConfigName(name)
	if err != nil {
		errorJSON(w, http.StatusBadRequest, "validation failed: invalid name")
		return
	}
	configsDir, err := s.resolveConfigsDir()
	if err != nil {
		// No configs directory configured: the feature is unavailable.
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}

	maxBytes := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
	if maxBytes <= 0 {
		maxBytes = int64(20 << 20)
	}
	r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
	body, err := io.ReadAll(r.Body)
	if err != nil {
		if strings.Contains(err.Error(), "request body too large") {
			errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
			return
		}
		errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error())
		return
	}
	if len(body) == 0 {
		errorJSON(w, http.StatusBadRequest, "validation failed: empty body")
		return
	}
	// Only syntactic validity is checked; the document's shape is not.
	var tmp any
	if err := json.Unmarshal(body, &tmp); err != nil {
		errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error())
		return
	}

	dst := filepath.Join(configsDir, finalName)
	if err := files.WriteFileAtomic(dst, append(body, '\n'), 0o644); err != nil {
		s.recordAudit(r, "media.config.upload", false, err.Error())
		errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
		return
	}
	st, err := os.Stat(dst)
	if err != nil {
		s.recordAudit(r, "media.config.upload", false, err.Error())
		errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
		return
	}

	s.recordAudit(r, "media.config.upload", true, finalName)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":       true,
		"name":     finalName,
		"path":     filepath.ToSlash(dst),
		"size":     st.Size(),
		"mtime_ms": st.ModTime().UnixMilli(),
	})
}
// handleMediaReload serves POST /v1/media-server/reload by asking the media
// server client to reload, and records the outcome in the audit log.
func (s *Server) handleMediaReload(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	reloadErr := s.ms.Reload(r.Context())
	if reloadErr != nil {
		s.recordAudit(r, "media.reload", false, reloadErr.Error())
		errorJSON(w, http.StatusInternalServerError, "internal error: "+reloadErr.Error())
		return
	}
	s.recordAudit(r, "media.reload", true, "")
	writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
// handleMediaRollback serves POST /v1/media-server/rollback. It loads
// "<ConfigPath>.last_good.json", validates it as a root config, writes it as
// the active config and reloads the media server. No restore copy is passed,
// so a failed reload is not undone. Every outcome is audited.
func (s *Server) handleMediaRollback(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	// reject audits the failure and then sends the error response.
	reject := func(status int, auditDetail, message string) {
		s.recordAudit(r, "media.rollback", false, auditDetail)
		errorJSON(w, status, message)
	}

	lastGood := s.agentCfg.ConfigPath + ".last_good.json"
	snapshot, readErr := os.ReadFile(lastGood)
	if readErr != nil {
		if os.IsNotExist(readErr) {
			reject(http.StatusNotFound, "previous config not found", "previous config not found")
		} else {
			reject(http.StatusInternalServerError, readErr.Error(), "internal error: read previous config failed: "+readErr.Error())
		}
		return
	}

	if _, valErr := validateRootConfigJSON(snapshot); valErr != nil {
		reject(http.StatusBadRequest, valErr.Error(), valErr.Error())
		return
	}

	if applyErr := s.writeConfigAndReload(r.Context(), snapshot, nil); applyErr != nil {
		reject(http.StatusInternalServerError, applyErr.Error(), "internal error: "+applyErr.Error())
		return
	}

	s.recordAudit(r, "media.rollback", true, "")
	writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
// mediaProcReq is the optional JSON body of the media-server start and
// restart endpoints.
type mediaProcReq struct {
	// Config is passed as-is to the process controller's Start/Restart.
	Config string `json:"config"`
}
// StartMediaServer starts the media server with the given config name. It is
// intended for callers outside the HTTP handlers. It fails when there is no
// process controller or the controller reports itself as not enabled.
func (s *Server) StartMediaServer(configName string) error {
	controllerUsable := s.proc != nil && s.proc.Enabled()
	if !controllerUsable {
		return errors.New("media server process management not enabled")
	}
	_, startErr := s.proc.Start(configName)
	return startErr
}
// handleMediaStart serves POST /v1/media-server/start. An optional JSON body
// may name the config to start with. Controller errors map to 409
// (conflict), 400 (invalid or missing config), 501 (not supported) or 500.
func (s *Server) handleMediaStart(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	case s.proc == nil || !s.proc.Enabled():
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}

	params, parseErr := readOptionalJSON[mediaProcReq](w, r, 1<<20)
	if parseErr != nil {
		errorJSON(w, http.StatusBadRequest, parseErr.Error())
		return
	}

	state, startErr := s.proc.Start(params.Config)
	if startErr != nil {
		s.recordAudit(r, "media.start", false, startErr.Error())
		switch {
		case errors.Is(startErr, procctl.ErrConflict):
			errorJSON(w, http.StatusConflict, startErr.Error())
		case errors.Is(startErr, procctl.ErrInvalidConfig), errors.Is(startErr, procctl.ErrConfigNotFound):
			errorJSON(w, http.StatusBadRequest, "validation failed: "+startErr.Error())
		case errors.Is(startErr, procctl.ErrNotSupported):
			errorJSON(w, http.StatusNotImplemented, "not supported")
		default:
			errorJSON(w, http.StatusInternalServerError, "internal error: "+startErr.Error())
		}
		return
	}

	s.recordAudit(r, "media.start", true, params.Config)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":            true,
		"running":       state.Running,
		"pid":           state.Pid,
		"config_path":   state.ConfigPath,
		"started_at_ms": state.StartedAtMS,
	})
}
// handleMediaRestart serves POST /v1/media-server/restart. An optional JSON
// body may name the config to restart with. Controller errors map to 400
// (invalid or missing config), 501 (not supported) or 500.
func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	case s.proc == nil || !s.proc.Enabled():
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}

	params, parseErr := readOptionalJSON[mediaProcReq](w, r, 1<<20)
	if parseErr != nil {
		errorJSON(w, http.StatusBadRequest, parseErr.Error())
		return
	}

	state, restartErr := s.proc.Restart(params.Config)
	if restartErr != nil {
		s.recordAudit(r, "media.restart", false, restartErr.Error())
		switch {
		case errors.Is(restartErr, procctl.ErrInvalidConfig), errors.Is(restartErr, procctl.ErrConfigNotFound):
			errorJSON(w, http.StatusBadRequest, "validation failed: "+restartErr.Error())
		case errors.Is(restartErr, procctl.ErrNotSupported):
			errorJSON(w, http.StatusNotImplemented, "not supported")
		default:
			errorJSON(w, http.StatusInternalServerError, "internal error: "+restartErr.Error())
		}
		return
	}

	s.recordAudit(r, "media.restart", true, params.Config)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":            true,
		"running":       state.Running,
		"pid":           state.Pid,
		"config_path":   state.ConfigPath,
		"started_at_ms": state.StartedAtMS,
	})
}
// handleMediaStop serves POST /v1/media-server/stop and returns the process
// status reported by the controller after the stop request.
func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	case s.proc == nil || !s.proc.Enabled():
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}

	state, stopErr := s.proc.Stop()
	if stopErr != nil {
		s.recordAudit(r, "media.stop", false, stopErr.Error())
		if errors.Is(stopErr, procctl.ErrNotSupported) {
			errorJSON(w, http.StatusNotImplemented, "not supported")
		} else {
			errorJSON(w, http.StatusInternalServerError, "internal error: "+stopErr.Error())
		}
		return
	}

	s.recordAudit(r, "media.stop", true, "")
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":            true,
		"running":       state.Running,
		"pid":           state.Pid,
		"config_path":   state.ConfigPath,
		"started_at_ms": state.StartedAtMS,
	})
}
// handleMediaStatus serves GET /v1/media-server/status with the controller's
// process status and version. A failure of either lookup fails the request.
func (s *Server) handleMediaStatus(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	case s.proc == nil || !s.proc.Enabled():
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}

	state, statusErr := s.proc.Status()
	if statusErr != nil {
		if errors.Is(statusErr, procctl.ErrNotSupported) {
			errorJSON(w, http.StatusNotImplemented, "not supported")
		} else {
			errorJSON(w, http.StatusInternalServerError, "internal error: "+statusErr.Error())
		}
		return
	}

	version, versionErr := s.proc.Version()
	if versionErr != nil {
		errorJSON(w, http.StatusInternalServerError, "internal error: "+versionErr.Error())
		return
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"ok":            true,
		"running":       state.Running,
		"pid":           state.Pid,
		"config_path":   state.ConfigPath,
		"started_at_ms": state.StartedAtMS,
		"version":       version,
	})
}
// handleMediaBinaryUpdate serves PUT /v1/media-server/binary. The body must
// be application/octet-stream with a positive Content-Length. An optional
// X-Binary-Sha256 header (64 hex characters) is passed to the controller as
// the expected checksum. The update is tracked as a "media_binary_update"
// task; on failure the task id is returned in the X-Task-Id header.
//
// The body is limited to the configured MaxUploadMB. When that setting is
// zero or negative a 20 MiB default applies, matching the candidate-config
// endpoint; without the default a zero limit would make every upload fail as
// too large.
func (s *Server) handleMediaBinaryUpdate(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPut {
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	if !s.authorize(r, true) {
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}
	if s.proc == nil || !s.proc.Enabled() {
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}
	if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/octet-stream" {
		errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/octet-stream")
		return
	}
	if r.ContentLength <= 0 {
		errorJSON(w, http.StatusBadRequest, "validation failed: missing Content-Length")
		return
	}

	maxBytes := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
	if maxBytes <= 0 {
		maxBytes = int64(20 << 20)
	}
	r.Body = http.MaxBytesReader(w, r.Body, maxBytes)

	// The checksum header is optional, but must be well-formed when present.
	expected := strings.TrimSpace(r.Header.Get("X-Binary-Sha256"))
	if expected != "" {
		if len(expected) != 64 {
			errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
			return
		}
		if _, err := hex.DecodeString(expected); err != nil {
			errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
			return
		}
	}

	task := s.tasks.Start("media_binary_update")
	res, err := s.proc.UpdateBinary(r.Body, r.ContentLength, expected)
	if err != nil {
		_, _ = s.tasks.Finish(task.ID, nil, err)
		w.Header().Set("X-Task-Id", task.ID)
		s.recordAudit(r, "media.binary.update", false, err.Error())
		switch {
		case errors.Is(err, procctl.ErrConflict):
			errorJSON(w, http.StatusConflict, err.Error())
		case strings.Contains(err.Error(), "payload too large"), strings.Contains(err.Error(), "request body too large"):
			errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
		case strings.Contains(err.Error(), "sha256 mismatch"):
			errorJSON(w, http.StatusBadRequest, "validation failed: sha256 mismatch")
		default:
			errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
		}
		return
	}

	_, _ = s.tasks.Finish(task.ID, res, nil)
	s.recordAudit(r, "media.binary.update", true, res.Path)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":          true,
		"path":        res.Path,
		"sha256":      res.Sha256,
		"size":        res.Size,
		"mtime_ms":    res.MtimeMS,
		"backup_path": res.BackupPath,
		"task_id":     task.ID,
	})
}
// mediaBinaryRollbackReq is the JSON body of the media-server binary
// rollback endpoint.
type mediaBinaryRollbackReq struct {
	// BackupPath is passed as-is to the process controller's RollbackBinary.
	BackupPath string `json:"backup_path"`
}
// handleMediaBinaryRollback serves POST /v1/media-server/binary/rollback. It
// requires a JSON body and asks the controller to roll the binary back to
// the given backup path. The operation is tracked as a
// "media_binary_rollback" task; on failure the task id is returned in the
// X-Task-Id header.
func (s *Server) handleMediaBinaryRollback(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, true):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	case s.proc == nil || !s.proc.Enabled():
		errorJSON(w, http.StatusNotImplemented, "not supported")
		return
	}

	params, parseErr := readRequiredJSON[mediaBinaryRollbackReq](w, r, 1<<20)
	if parseErr != nil {
		errorJSON(w, http.StatusBadRequest, parseErr.Error())
		return
	}

	job := s.tasks.Start("media_binary_rollback")
	outcome, rollbackErr := s.proc.RollbackBinary(params.BackupPath)
	if rollbackErr != nil {
		_, _ = s.tasks.Finish(job.ID, nil, rollbackErr)
		w.Header().Set("X-Task-Id", job.ID)
		s.recordAudit(r, "media.binary.rollback", false, rollbackErr.Error())
		if errors.Is(rollbackErr, procctl.ErrConflict) {
			errorJSON(w, http.StatusConflict, rollbackErr.Error())
		} else {
			errorJSON(w, http.StatusInternalServerError, "internal error: "+rollbackErr.Error())
		}
		return
	}

	_, _ = s.tasks.Finish(job.ID, outcome, nil)
	s.recordAudit(r, "media.binary.rollback", true, outcome.Path)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":          true,
		"path":        outcome.Path,
		"sha256":      outcome.Sha256,
		"size":        outcome.Size,
		"mtime_ms":    outcome.MtimeMS,
		"backup_path": outcome.BackupPath,
		"task_id":     job.ID,
	})
}
// normalizeConfigName validates a media-server config file name and returns
// it with a ".json" suffix, appending one unless the name already ends in
// ".json" (case-insensitively).
//
// It returns the error "invalid name" for an empty name, a name containing a
// path separator ('/' or '\') or "..", or a name that does not match
// configNameRE.
func normalizeConfigName(name string) (string, error) {
	switch {
	case name == "",
		strings.Contains(name, "/"),
		strings.Contains(name, "\\"),
		strings.Contains(name, ".."):
		return "", errors.New("invalid name")
	case !configNameRE.MatchString(name):
		return "", errors.New("invalid name")
	}

	if strings.HasSuffix(strings.ToLower(name), ".json") {
		return name, nil
	}
	return name + ".json", nil
}
// resolveConfigsDir returns the media server's configs directory. An
// absolute configured path is returned as-is; a relative one is joined onto
// baseDir, or merely cleaned when baseDir is empty. An empty (or
// whitespace-only) setting yields the error "configs_dir is empty".
func (s *Server) resolveConfigsDir() (string, error) {
	configured := strings.TrimSpace(s.agentCfg.MediaServerProcess.ConfigsDir)
	switch {
	case configured == "":
		return "", errors.New("configs_dir is empty")
	case filepath.IsAbs(configured):
		return configured, nil
	case s.baseDir == "":
		return filepath.Clean(configured), nil
	default:
		return filepath.Join(s.baseDir, configured), nil
	}
}
// readOptionalJSON decodes an optional JSON request body into a T.
//
// A nil, empty or whitespace-only body yields the zero T and no error,
// whatever the Content-Type. A non-blank body must be declared as
// application/json and must decode into T. At most maxBytes are read; a
// larger body yields the error "payload too large". On any error the
// returned value is the zero T.
func readOptionalJSON[T any](w http.ResponseWriter, r *http.Request, maxBytes int64) (T, error) {
	var none T
	if r.Body == nil {
		return none, nil
	}

	r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
	raw, readErr := io.ReadAll(r.Body)
	switch {
	case readErr == nil:
		// Body fully read; continue below.
	case strings.Contains(readErr.Error(), "request body too large"):
		return none, errors.New("payload too large")
	default:
		return none, fmt.Errorf("invalid json: %v", readErr)
	}

	// A blank body means "no parameters"; Content-Type is not checked then.
	if strings.TrimSpace(string(raw)) == "" {
		return none, nil
	}

	mediaType, _, ctErr := mime.ParseMediaType(r.Header.Get("Content-Type"))
	if ctErr != nil || mediaType != "application/json" {
		return none, errors.New("validation failed: Content-Type must be application/json")
	}

	var decoded T
	if parseErr := json.Unmarshal(raw, &decoded); parseErr != nil {
		return none, fmt.Errorf("invalid json: %v", parseErr)
	}
	return decoded, nil
}
// handleGraphs serves GET /v1/graphs by relaying the media server's graph
// list: the status code and body returned by the client are written
// unchanged.
func (s *Server) handleGraphs(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	status, payload, fetchErr := s.ms.GetGraphs(r.Context())
	if fetchErr != nil {
		errorJSON(w, http.StatusInternalServerError, "internal error: "+fetchErr.Error())
		return
	}
	writeRawJSON(w, status, payload)
}
// handleGraphDetail serves GET /v1/graphs/<name> by relaying the media
// server's response for that graph. An empty name yields 404.
//
// NOTE(review): the name is forwarded without validation (it may contain
// '/' or ".."); whether that is safe depends on how the media server client
// builds its request — confirm.
func (s *Server) handleGraphDetail(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	graphName := strings.TrimPrefix(r.URL.Path, "/v1/graphs/")
	if graphName == "" {
		errorJSON(w, http.StatusNotFound, "not found")
		return
	}

	status, payload, fetchErr := s.ms.GetGraph(r.Context(), graphName)
	if fetchErr != nil {
		errorJSON(w, http.StatusInternalServerError, "internal error: "+fetchErr.Error())
		return
	}
	writeRawJSON(w, status, payload)
}
// handleLogsRecent serves GET /v1/logs/recent by relaying the media server's
// recent log entries. The "limit" query parameter selects how many entries
// to request; it defaults to 200 when absent, non-numeric or not positive.
func (s *Server) handleLogsRecent(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	// Atoi rejects the empty string, so a missing parameter keeps the default.
	maxEntries := 200
	requested, convErr := strconv.Atoi(strings.TrimSpace(r.URL.Query().Get("limit")))
	if convErr == nil && requested > 0 {
		maxEntries = requested
	}

	status, payload, fetchErr := s.ms.GetLogsRecent(r.Context(), maxEntries)
	if fetchErr != nil {
		errorJSON(w, http.StatusInternalServerError, "internal error: "+fetchErr.Error())
		return
	}
	writeRawJSON(w, status, payload)
}
// authorize reports whether the request may proceed.
//
// Write requests always need a token; read requests need one only when
// RequireTokenForRead is set. The token is taken from the X-RK-Token header
// and must be non-empty and equal to the configured token, so an empty
// configured token never authorizes anything.
//
// The comparison uses subtle.ConstantTimeCompare so that response timing does
// not reveal how many leading bytes of a guessed token were correct. (Only
// the token length can still be inferred, as the comparison returns early on
// a length mismatch.)
func (s *Server) authorize(r *http.Request, write bool) bool {
	if !write && !s.agentCfg.RequireTokenForRead {
		return true
	}
	presented := r.Header.Get("X-RK-Token")
	if presented == "" {
		return false
	}
	return subtle.ConstantTimeCompare([]byte(presented), []byte(s.agentCfg.Token)) == 1
}
// writeJSON sends v as a JSON response with the given status code. An
// encoding failure is ignored: the status line has already been sent by
// then.
func writeJSON(w http.ResponseWriter, status int, v any) {
	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	encoder := json.NewEncoder(w)
	_ = encoder.Encode(v)
}
// writeRawJSON sends raw, which the caller has already encoded as JSON, as
// the response body with the given status code. The bytes are written
// unchanged and a write failure is ignored.
func writeRawJSON(w http.ResponseWriter, status int, raw []byte) {
	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(status)
	_, _ = w.Write(raw)
}
// errorJSON sends an error response of the form {"error": msg} with the
// given status code.
func errorJSON(w http.ResponseWriter, status int, msg string) {
	payload := map[string]any{"error": msg}
	writeJSON(w, status, payload)
}