diff --git a/API_Device_RemoteMgmt_InterfaceTable.md b/API_Device_RemoteMgmt_InterfaceTable.md index f81eb91..81624fb 100644 --- a/API_Device_RemoteMgmt_InterfaceTable.md +++ b/API_Device_RemoteMgmt_InterfaceTable.md @@ -236,13 +236,78 @@ Response 200:`{"ok":true}` 失败:500 + `{"error":"..."}` -## 6. 只读代理接口(agent 对外,推荐管理端统一走 agent) +## 6. 主程序进程控制(agent 对外) -### 6.1 `GET /v1/graphs` +> 说明:该能力用于“启动/重启/关闭主程序(media-server)并选择加载哪个配置文件”。 +> +> agent 启动 media-server 时会显式传入:`--config `,因此不依赖 media-server 内部默认配置。 + +### 6.1 `POST /v1/media-server/start` +用途:启动本机 media-server(若已运行则幂等返回 ok;若已运行但 config 不同则 409)。 + +**Auth**:必须(401) + +Body(可选,JSON): +```json +{"config":"cam1"} +``` + +`config` 解析规则: +- 为空/缺省:使用 `agent.config_path` +- 非空:只允许文件名/配置名(禁止包含 `/`、`\\`、`..`);若不带扩展名自动补 `.json`;最终从 `agent.media_server_process.configs_dir` 下解析为 `/.json` + +Response 200: +```json +{"ok":true,"running":true,"pid":1234,"config_path":"/etc/rk3588sys/config.json"} +``` + +失败: +- 400:config 不合法 / config 文件不存在 +- 409:已运行但 config 不同(提示使用 restart) +- 501:未启用进程控制(agent 配置 `agent.media_server_process.enable=false` 或未配置) +- 500:启动失败 / 写 pidfile 失败 + +### 6.2 `POST /v1/media-server/restart` +用途:重启本机 media-server(可切换 config)。 + +**Auth**:必须(401) + +Body(可选,JSON): +```json +{"config":"cam1"} +``` + +Response 200: +```json +{"ok":true,"running":true,"pid":1234,"config_path":"/home/orangepi/Desktop/OrangePi3588Media/configs/cam1.json"} +``` + +失败: +- 400:config 不合法 / config 文件不存在 +- 501:未启用进程控制 +- 500:停止/启动失败 + +### 6.3 `POST /v1/media-server/stop` +用途:停止本机 media-server(未运行也返回 ok)。 + +**Auth**:必须(401) + +Response 200: +```json +{"ok":true,"running":false,"pid":1234,"config_path":"/etc/rk3588sys/config.json"} +``` + +失败: +- 501:未启用进程控制 +- 500:停止失败 + +## 7. 只读代理接口(agent 对外,推荐管理端统一走 agent) + +### 7.1 `GET /v1/graphs` 代理 media-server:`GET /api/graphs` -### 6.2 `GET /v1/graphs/{name}` +### 7.2 `GET /v1/graphs/{name}` 代理 media-server:`GET /api/graphs/{name}` -### 6.3 `GET /v1/logs/recent?limit=200` +### 7.3 `GET /v1/logs/recent?limit=200` 代理 media-server:`GET /api/logs/recent?limit=...` diff --git a/agent/agent_cam1.config.json b/agent/agent_cam1.config.json index 826628f..9ce3fab 100644 --- a/agent/agent_cam1.config.json +++ b/agent/agent_cam1.config.json @@ -14,6 +14,14 @@ "max_upload_mb": 200, "config_path": "./test_cam1_strict_minio_alarm_rtsp_server.json", + "media_server_process": { + "enable": true, + "exec_path": "/home/orangepi/Desktop/OrangePi3588Media/build/media-server", + "work_dir": "/home/orangepi/Desktop/OrangePi3588Media", + "configs_dir": "/home/orangepi/Desktop/OrangePi3588Media/configs", + "pid_file": "/var/run/rk3588sys-media-server.pid", + "graceful_timeout_ms": 5000 + }, "media_server_base_url": "http://127.0.0.1:9000", "media_server_timeout_ms": 3000, "media_server_retry": { "max_attempts": 3, "backoff_ms": [200, 500] } diff --git a/agent/cmd/rk3588-agent/main.go b/agent/cmd/rk3588-agent/main.go index 6033c12..c6c2c86 100644 --- a/agent/cmd/rk3588-agent/main.go +++ b/agent/cmd/rk3588-agent/main.go @@ -2,8 +2,8 @@ package main import ( "context" - "fmt" "flag" + "fmt" "net" "net/http" "net/url" diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index 8933889..34ca814 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -15,19 +15,29 @@ type Config struct { } type AgentConfig struct { - Listen string `json:"listen"` - Token string `json:"token"` - RequireTokenForRead bool `json:"require_token_for_read"` - DiscoveryEnable bool `json:"discovery_enable"` - DiscoveryPort int `json:"discovery_port"` - DeviceName string `json:"device_name"` - DeviceIDPath string `json:"device_id_path"` - ModelsDir string `json:"models_dir"` - MaxUploadMB int `json:"max_upload_mb"` - ConfigPath string `json:"config_path"` - MediaServerBaseURL string `json:"media_server_base_url"` - MediaServerTimeout int `json:"media_server_timeout_ms"` - MediaServerRetry RetryConfig `json:"media_server_retry"` + Listen string `json:"listen"` + Token string `json:"token"` + RequireTokenForRead bool `json:"require_token_for_read"` + DiscoveryEnable bool `json:"discovery_enable"` + DiscoveryPort int `json:"discovery_port"` + DeviceName string `json:"device_name"` + DeviceIDPath string `json:"device_id_path"` + ModelsDir string `json:"models_dir"` + MaxUploadMB int `json:"max_upload_mb"` + ConfigPath string `json:"config_path"` + MediaServerProcess MediaServerProcessConfig `json:"media_server_process"` + MediaServerBaseURL string `json:"media_server_base_url"` + MediaServerTimeout int `json:"media_server_timeout_ms"` + MediaServerRetry RetryConfig `json:"media_server_retry"` +} + +type MediaServerProcessConfig struct { + Enable bool `json:"enable"` + ExecPath string `json:"exec_path"` + WorkDir string `json:"work_dir"` + ConfigsDir string `json:"configs_dir"` + PidFile string `json:"pid_file"` + GracefulTimeoutMS int `json:"graceful_timeout_ms"` } type RetryConfig struct { @@ -46,8 +56,13 @@ func Default() Config { ModelsDir: "/opt/rk3588sys/models", MaxUploadMB: 200, ConfigPath: "/etc/rk3588sys/config.json", - MediaServerBaseURL: "http://127.0.0.1:9000", - MediaServerTimeout: 3000, + MediaServerProcess: MediaServerProcessConfig{ + Enable: false, + PidFile: "/var/run/rk3588sys-media-server.pid", + GracefulTimeoutMS: 5000, + }, + MediaServerBaseURL: "http://127.0.0.1:9000", + MediaServerTimeout: 3000, MediaServerRetry: RetryConfig{ MaxAttempts: 3, BackoffMS: []int{200, 500}, @@ -94,6 +109,24 @@ func (c Config) Validate() error { if strings.TrimSpace(a.ConfigPath) == "" { return errors.New("agent.config_path is required") } + if a.MediaServerProcess.Enable { + p := a.MediaServerProcess + if strings.TrimSpace(p.ExecPath) == "" { + return errors.New("agent.media_server_process.exec_path is required") + } + if strings.TrimSpace(p.WorkDir) == "" { + return errors.New("agent.media_server_process.work_dir is required") + } + if strings.TrimSpace(p.ConfigsDir) == "" { + return errors.New("agent.media_server_process.configs_dir is required") + } + if strings.TrimSpace(p.PidFile) == "" { + return errors.New("agent.media_server_process.pid_file is required") + } + if p.GracefulTimeoutMS <= 0 { + return fmt.Errorf("agent.media_server_process.graceful_timeout_ms invalid: %d", p.GracefulTimeoutMS) + } + } if strings.TrimSpace(a.DeviceIDPath) == "" { return errors.New("agent.device_id_path is required") } diff --git a/agent/internal/httpapi/server.go b/agent/internal/httpapi/server.go index 877a5a5..7911726 100644 --- a/agent/internal/httpapi/server.go +++ b/agent/internal/httpapi/server.go @@ -18,6 +18,7 @@ import ( "rk3588sys/agent/internal/files" "rk3588sys/agent/internal/mediaserver" "rk3588sys/agent/internal/modelstore" + "rk3588sys/agent/internal/procctl" "rk3588sys/agent/internal/sysinfo" ) @@ -25,6 +26,7 @@ type Server struct { agentCfg config.AgentConfig ms *mediaserver.Client store *modelstore.Store + proc *procctl.Controller deviceID string hostname string agentPort int @@ -48,10 +50,15 @@ type InfoResponse struct { } func New(agentCfg config.AgentConfig, ms *mediaserver.Client, store *modelstore.Store, deviceID string, agentPort int, mediaPort int, version, gitSHA string) http.Handler { + var pc *procctl.Controller + if agentCfg.MediaServerProcess.Enable { + pc = procctl.New(agentCfg) + } s := &Server{ agentCfg: agentCfg, ms: ms, store: store, + proc: pc, deviceID: deviceID, hostname: sysinfo.Hostname(), agentPort: agentPort, @@ -67,6 +74,9 @@ func New(agentCfg config.AgentConfig, ms *mediaserver.Client, store *modelstore. mux.HandleFunc("/v1/models/", s.handleModelUpload) mux.HandleFunc("/v1/media-server/reload", s.handleMediaReload) mux.HandleFunc("/v1/media-server/rollback", s.handleMediaRollback) + mux.HandleFunc("/v1/media-server/start", s.handleMediaStart) + mux.HandleFunc("/v1/media-server/restart", s.handleMediaRestart) + mux.HandleFunc("/v1/media-server/stop", s.handleMediaStop) mux.HandleFunc("/v1/graphs", s.handleGraphs) mux.HandleFunc("/v1/graphs/", s.handleGraphDetail) mux.HandleFunc("/v1/logs/recent", s.handleLogsRecent) @@ -269,6 +279,140 @@ func (s *Server) handleMediaRollback(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"ok": true}) } +type mediaProcReq struct { + Config string `json:"config"` +} + +func (s *Server) handleMediaStart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + if !s.authorize(r, true) { + errorJSON(w, http.StatusUnauthorized, "unauthorized") + return + } + if s.proc == nil || !s.proc.Enabled() { + errorJSON(w, http.StatusNotImplemented, "not supported") + return + } + + req, err := readOptionalJSON[mediaProcReq](w, r, 1<<20) + if err != nil { + errorJSON(w, http.StatusBadRequest, err.Error()) + return + } + + st, err := s.proc.Start(req.Config) + if err != nil { + if errors.Is(err, procctl.ErrConflict) { + errorJSON(w, http.StatusConflict, err.Error()) + return + } + if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) { + errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error()) + return + } + if errors.Is(err, procctl.ErrNotSupported) { + errorJSON(w, http.StatusNotImplemented, "not supported") + return + } + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath}) +} + +func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + if !s.authorize(r, true) { + errorJSON(w, http.StatusUnauthorized, "unauthorized") + return + } + if s.proc == nil || !s.proc.Enabled() { + errorJSON(w, http.StatusNotImplemented, "not supported") + return + } + + req, err := readOptionalJSON[mediaProcReq](w, r, 1<<20) + if err != nil { + errorJSON(w, http.StatusBadRequest, err.Error()) + return + } + + st, err := s.proc.Restart(req.Config) + if err != nil { + if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) { + errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error()) + return + } + if errors.Is(err, procctl.ErrNotSupported) { + errorJSON(w, http.StatusNotImplemented, "not supported") + return + } + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath}) +} + +func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + if !s.authorize(r, true) { + errorJSON(w, http.StatusUnauthorized, "unauthorized") + return + } + if s.proc == nil || !s.proc.Enabled() { + errorJSON(w, http.StatusNotImplemented, "not supported") + return + } + st, err := s.proc.Stop() + if err != nil { + if errors.Is(err, procctl.ErrNotSupported) { + errorJSON(w, http.StatusNotImplemented, "not supported") + return + } + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath}) +} + +func readOptionalJSON[T any](w http.ResponseWriter, r *http.Request, maxBytes int64) (T, error) { + var zero T + if r.Body == nil { + return zero, nil + } + + r.Body = http.MaxBytesReader(w, r.Body, maxBytes) + body, err := io.ReadAll(r.Body) + if err != nil { + if strings.Contains(err.Error(), "request body too large") { + return zero, errors.New("payload too large") + } + return zero, fmt.Errorf("invalid json: %v", err) + } + if len(strings.TrimSpace(string(body))) == 0 { + return zero, nil + } + + if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/json" { + return zero, errors.New("validation failed: Content-Type must be application/json") + } + + var v T + if err := json.Unmarshal(body, &v); err != nil { + return zero, fmt.Errorf("invalid json: %v", err) + } + return v, nil +} + func (s *Server) handleGraphs(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { errorJSON(w, http.StatusMethodNotAllowed, "method not allowed") diff --git a/agent/internal/modelstore/modelstore.go b/agent/internal/modelstore/modelstore.go index a358397..96305da 100644 --- a/agent/internal/modelstore/modelstore.go +++ b/agent/internal/modelstore/modelstore.go @@ -105,10 +105,10 @@ func (s *Store) Upload(name string, r io.Reader, contentLength int64, expectedSh } item := Item{ - Name: name, - Sha256: sha, - Path: filepath.ToSlash(finalPath), - Size: stat.Size(), + Name: name, + Sha256: sha, + Path: filepath.ToSlash(finalPath), + Size: stat.Size(), MtimeMS: stat.ModTime().UnixMilli(), } if err := s.upsertManifest(item); err != nil { diff --git a/agent/internal/procctl/procctl.go b/agent/internal/procctl/procctl.go new file mode 100644 index 0000000..8b1e7af --- /dev/null +++ b/agent/internal/procctl/procctl.go @@ -0,0 +1,162 @@ +package procctl + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "rk3588sys/agent/internal/config" + "rk3588sys/agent/internal/files" +) + +var ErrNotSupported = errors.New("not supported") +var ErrConflict = errors.New("conflict") +var ErrInvalidConfig = errors.New("invalid config") +var ErrConfigNotFound = errors.New("config not found") + +type Status struct { + Running bool `json:"running"` + Pid int `json:"pid"` + ConfigPath string `json:"config_path"` +} + +type pidFile struct { + Pid int `json:"pid"` + ConfigPath string `json:"config_path"` + StartedAtMS int64 `json:"started_at_ms"` +} + +type Controller struct { + mu sync.Mutex + proc config.MediaServerProcessConfig + defCfg string +} + +func New(agentCfg config.AgentConfig) *Controller { + return &Controller{ + proc: agentCfg.MediaServerProcess, + defCfg: agentCfg.ConfigPath, + } +} + +func (c *Controller) Enabled() bool { return c != nil && c.proc.Enable } + +func (c *Controller) Start(configName string) (Status, error) { + c.mu.Lock() + defer c.mu.Unlock() + + resolved, err := c.resolveConfigPath(configName) + if err != nil { + return Status{}, err + } + + pf, _ := c.readPidFile() + if pf != nil { + alive, _ := isAlive(pf.Pid) + if alive { + if filepath.Clean(pf.ConfigPath) == filepath.Clean(resolved) { + return Status{Running: true, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil + } + return Status{}, fmt.Errorf("%w: already running with config %s", ErrConflict, pf.ConfigPath) + } + _ = os.Remove(c.proc.PidFile) + } + + pid, err := startProcess(c.proc.ExecPath, c.proc.WorkDir, resolved) + if err != nil { + return Status{}, err + } + + pf2 := pidFile{Pid: pid, ConfigPath: resolved, StartedAtMS: time.Now().UnixMilli()} + b, _ := json.Marshal(pf2) + b = append(b, '\n') + if err := files.WriteFileAtomic(c.proc.PidFile, b, 0o644); err != nil { + return Status{}, fmt.Errorf("write pid file: %w", err) + } + return Status{Running: true, Pid: pid, ConfigPath: resolved}, nil +} + +func (c *Controller) Stop() (Status, error) { + c.mu.Lock() + defer c.mu.Unlock() + + pf, err := c.readPidFile() + if err != nil { + return Status{}, err + } + if pf == nil { + return Status{Running: false}, nil + } + + alive, _ := isAlive(pf.Pid) + if !alive { + _ = os.Remove(c.proc.PidFile) + return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil + } + + if err := stopProcess(pf.Pid, time.Duration(c.proc.GracefulTimeoutMS)*time.Millisecond); err != nil { + return Status{}, err + } + _ = os.Remove(c.proc.PidFile) + return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil +} + +func (c *Controller) Restart(configName string) (Status, error) { + _, _ = c.Stop() + return c.Start(configName) +} + +func (c *Controller) resolveConfigPath(name string) (string, error) { + n := strings.TrimSpace(name) + if n == "" { + if strings.TrimSpace(c.defCfg) == "" { + return "", fmt.Errorf("%w: default config_path is empty", ErrInvalidConfig) + } + return c.defCfg, nil + } + if strings.Contains(n, "..") || strings.ContainsAny(n, "/\\") { + return "", fmt.Errorf("%w: contains invalid characters", ErrInvalidConfig) + } + if !strings.HasSuffix(n, ".json") { + n += ".json" + } + base := strings.TrimSpace(c.proc.ConfigsDir) + if base == "" { + return "", fmt.Errorf("%w: configs_dir is empty", ErrInvalidConfig) + } + p := filepath.Join(base, n) + st, err := os.Stat(p) + if err != nil { + if os.IsNotExist(err) { + return "", fmt.Errorf("%w: %s", ErrConfigNotFound, p) + } + return "", fmt.Errorf("stat config: %w", err) + } + if st.IsDir() { + return "", fmt.Errorf("%w: is a directory", ErrInvalidConfig) + } + return p, nil +} + +func (c *Controller) readPidFile() (*pidFile, error) { + b, err := os.ReadFile(c.proc.PidFile) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("read pid file: %w", err) + } + var pf pidFile + if err := json.Unmarshal(b, &pf); err != nil { + return nil, fmt.Errorf("parse pid file: %w", err) + } + if pf.Pid <= 0 { + return nil, fmt.Errorf("pid file invalid pid: %d", pf.Pid) + } + return &pf, nil +} diff --git a/agent/internal/procctl/procctl_linux.go b/agent/internal/procctl/procctl_linux.go new file mode 100644 index 0000000..a1409c8 --- /dev/null +++ b/agent/internal/procctl/procctl_linux.go @@ -0,0 +1,79 @@ +//go:build linux + +package procctl + +import ( + "fmt" + "os" + "os/exec" + "syscall" + "time" + + "rk3588sys/agent/internal/log" +) + +func isAlive(pid int) (bool, error) { + if pid <= 0 { + return false, nil + } + p, err := os.FindProcess(pid) + if err != nil { + return false, err + } + if err := p.Signal(syscall.Signal(0)); err != nil { + return false, nil + } + return true, nil +} + +func startProcess(execPath, workDir, configPath string) (int, error) { + cmd := exec.Command(execPath, "--config", configPath) + cmd.Dir = workDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if err := cmd.Start(); err != nil { + return 0, fmt.Errorf("start media-server: %w", err) + } + pid := cmd.Process.Pid + go func() { + err := cmd.Wait() + if err != nil { + log.Warn(fmt.Sprintf("media-server exited pid=%d err=%v", pid, err)) + } else { + log.Info(fmt.Sprintf("media-server exited pid=%d", pid)) + } + }() + return pid, nil +} + +func stopProcess(pid int, timeout time.Duration) error { + if pid <= 0 { + return nil + } + p, err := os.FindProcess(pid) + if err != nil { + return err + } + _ = p.Signal(syscall.SIGTERM) + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + alive, _ := isAlive(pid) + if !alive { + return nil + } + time.Sleep(100 * time.Millisecond) + } + + _ = p.Signal(syscall.SIGKILL) + deadline = time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + alive, _ := isAlive(pid) + if !alive { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return fmt.Errorf("failed to stop pid=%d", pid) +} diff --git a/agent/internal/procctl/procctl_other.go b/agent/internal/procctl/procctl_other.go new file mode 100644 index 0000000..317d5dd --- /dev/null +++ b/agent/internal/procctl/procctl_other.go @@ -0,0 +1,11 @@ +//go:build !linux + +package procctl + +import "time" + +func isAlive(pid int) (bool, error) { return false, ErrNotSupported } + +func startProcess(execPath, workDir, configPath string) (int, error) { return 0, ErrNotSupported } + +func stopProcess(pid int, timeout time.Duration) error { return ErrNotSupported }