修改agent功能,加入重启,准备测试
Some checks are pending
CI / host-build (push) Waiting to run
CI / rk3588-cross-build (push) Waiting to run

This commit is contained in:
sladro 2026-01-10 11:06:39 +08:00
parent 5f69321b40
commit 2a562514bc
9 changed files with 526 additions and 24 deletions

View File

@ -236,13 +236,78 @@ Response 200`{"ok":true}`
失败500 + `{"error":"..."}`
## 6. 只读代理接口agent 对外,推荐管理端统一走 agent
## 6. 主程序进程控制agent 对外
### 6.1 `GET /v1/graphs`
> 说明:该能力用于“启动/重启/关闭主程序media-server并选择加载哪个配置文件”。
>
> agent 启动 media-server 时会显式传入:`--config <resolved_config_path>`,因此不依赖 media-server 内部默认配置。
### 6.1 `POST /v1/media-server/start`
用途:启动本机 media-server若已运行则幂等返回 ok若已运行但 config 不同则 409
**Auth**必须401
Body可选JSON
```json
{"config":"cam1"}
```
`config` 解析规则:
- 为空/缺省:使用 `agent.config_path`
- 非空:只允许文件名/配置名(禁止包含 `/`、`\\`、`..`);若不带扩展名自动补 `.json`;最终从 `agent.media_server_process.configs_dir` 下解析为 `<configs_dir>/<config>.json`
Response 200
```json
{"ok":true,"running":true,"pid":1234,"config_path":"/etc/rk3588sys/config.json"}
```
失败:
- 400config 不合法 / config 文件不存在
- 409已运行但 config 不同(提示使用 restart
- 501未启用进程控制agent 配置 `agent.media_server_process.enable=false` 或未配置)
- 500启动失败 / 写 pidfile 失败
### 6.2 `POST /v1/media-server/restart`
用途:重启本机 media-server可切换 config
**Auth**必须401
Body可选JSON
```json
{"config":"cam1"}
```
Response 200
```json
{"ok":true,"running":true,"pid":1234,"config_path":"/home/orangepi/Desktop/OrangePi3588Media/configs/cam1.json"}
```
失败:
- 400config 不合法 / config 文件不存在
- 501未启用进程控制
- 500停止/启动失败
### 6.3 `POST /v1/media-server/stop`
用途:停止本机 media-server未运行也返回 ok
**Auth**必须401
Response 200
```json
{"ok":true,"running":false,"pid":1234,"config_path":"/etc/rk3588sys/config.json"}
```
失败:
- 501未启用进程控制
- 500停止失败
## 7. 只读代理接口agent 对外,推荐管理端统一走 agent
### 7.1 `GET /v1/graphs`
代理 media-server`GET /api/graphs`
### 6.2 `GET /v1/graphs/{name}`
### 7.2 `GET /v1/graphs/{name}`
代理 media-server`GET /api/graphs/{name}`
### 6.3 `GET /v1/logs/recent?limit=200`
### 7.3 `GET /v1/logs/recent?limit=200`
代理 media-server`GET /api/logs/recent?limit=...`

View File

@ -14,6 +14,14 @@
"max_upload_mb": 200,
"config_path": "./test_cam1_strict_minio_alarm_rtsp_server.json",
"media_server_process": {
"enable": true,
"exec_path": "/home/orangepi/Desktop/OrangePi3588Media/build/media-server",
"work_dir": "/home/orangepi/Desktop/OrangePi3588Media",
"configs_dir": "/home/orangepi/Desktop/OrangePi3588Media/configs",
"pid_file": "/var/run/rk3588sys-media-server.pid",
"graceful_timeout_ms": 5000
},
"media_server_base_url": "http://127.0.0.1:9000",
"media_server_timeout_ms": 3000,
"media_server_retry": { "max_attempts": 3, "backoff_ms": [200, 500] }

View File

@ -2,8 +2,8 @@ package main
import (
"context"
"fmt"
"flag"
"fmt"
"net"
"net/http"
"net/url"

View File

@ -15,19 +15,29 @@ type Config struct {
}
type AgentConfig struct {
Listen string `json:"listen"`
Token string `json:"token"`
RequireTokenForRead bool `json:"require_token_for_read"`
DiscoveryEnable bool `json:"discovery_enable"`
DiscoveryPort int `json:"discovery_port"`
DeviceName string `json:"device_name"`
DeviceIDPath string `json:"device_id_path"`
ModelsDir string `json:"models_dir"`
MaxUploadMB int `json:"max_upload_mb"`
ConfigPath string `json:"config_path"`
MediaServerBaseURL string `json:"media_server_base_url"`
MediaServerTimeout int `json:"media_server_timeout_ms"`
MediaServerRetry RetryConfig `json:"media_server_retry"`
Listen string `json:"listen"`
Token string `json:"token"`
RequireTokenForRead bool `json:"require_token_for_read"`
DiscoveryEnable bool `json:"discovery_enable"`
DiscoveryPort int `json:"discovery_port"`
DeviceName string `json:"device_name"`
DeviceIDPath string `json:"device_id_path"`
ModelsDir string `json:"models_dir"`
MaxUploadMB int `json:"max_upload_mb"`
ConfigPath string `json:"config_path"`
MediaServerProcess MediaServerProcessConfig `json:"media_server_process"`
MediaServerBaseURL string `json:"media_server_base_url"`
MediaServerTimeout int `json:"media_server_timeout_ms"`
MediaServerRetry RetryConfig `json:"media_server_retry"`
}
type MediaServerProcessConfig struct {
Enable bool `json:"enable"`
ExecPath string `json:"exec_path"`
WorkDir string `json:"work_dir"`
ConfigsDir string `json:"configs_dir"`
PidFile string `json:"pid_file"`
GracefulTimeoutMS int `json:"graceful_timeout_ms"`
}
type RetryConfig struct {
@ -46,8 +56,13 @@ func Default() Config {
ModelsDir: "/opt/rk3588sys/models",
MaxUploadMB: 200,
ConfigPath: "/etc/rk3588sys/config.json",
MediaServerBaseURL: "http://127.0.0.1:9000",
MediaServerTimeout: 3000,
MediaServerProcess: MediaServerProcessConfig{
Enable: false,
PidFile: "/var/run/rk3588sys-media-server.pid",
GracefulTimeoutMS: 5000,
},
MediaServerBaseURL: "http://127.0.0.1:9000",
MediaServerTimeout: 3000,
MediaServerRetry: RetryConfig{
MaxAttempts: 3,
BackoffMS: []int{200, 500},
@ -94,6 +109,24 @@ func (c Config) Validate() error {
if strings.TrimSpace(a.ConfigPath) == "" {
return errors.New("agent.config_path is required")
}
if a.MediaServerProcess.Enable {
p := a.MediaServerProcess
if strings.TrimSpace(p.ExecPath) == "" {
return errors.New("agent.media_server_process.exec_path is required")
}
if strings.TrimSpace(p.WorkDir) == "" {
return errors.New("agent.media_server_process.work_dir is required")
}
if strings.TrimSpace(p.ConfigsDir) == "" {
return errors.New("agent.media_server_process.configs_dir is required")
}
if strings.TrimSpace(p.PidFile) == "" {
return errors.New("agent.media_server_process.pid_file is required")
}
if p.GracefulTimeoutMS <= 0 {
return fmt.Errorf("agent.media_server_process.graceful_timeout_ms invalid: %d", p.GracefulTimeoutMS)
}
}
if strings.TrimSpace(a.DeviceIDPath) == "" {
return errors.New("agent.device_id_path is required")
}

View File

@ -18,6 +18,7 @@ import (
"rk3588sys/agent/internal/files"
"rk3588sys/agent/internal/mediaserver"
"rk3588sys/agent/internal/modelstore"
"rk3588sys/agent/internal/procctl"
"rk3588sys/agent/internal/sysinfo"
)
@ -25,6 +26,7 @@ type Server struct {
agentCfg config.AgentConfig
ms *mediaserver.Client
store *modelstore.Store
proc *procctl.Controller
deviceID string
hostname string
agentPort int
@ -48,10 +50,15 @@ type InfoResponse struct {
}
func New(agentCfg config.AgentConfig, ms *mediaserver.Client, store *modelstore.Store, deviceID string, agentPort int, mediaPort int, version, gitSHA string) http.Handler {
var pc *procctl.Controller
if agentCfg.MediaServerProcess.Enable {
pc = procctl.New(agentCfg)
}
s := &Server{
agentCfg: agentCfg,
ms: ms,
store: store,
proc: pc,
deviceID: deviceID,
hostname: sysinfo.Hostname(),
agentPort: agentPort,
@ -67,6 +74,9 @@ func New(agentCfg config.AgentConfig, ms *mediaserver.Client, store *modelstore.
mux.HandleFunc("/v1/models/", s.handleModelUpload)
mux.HandleFunc("/v1/media-server/reload", s.handleMediaReload)
mux.HandleFunc("/v1/media-server/rollback", s.handleMediaRollback)
mux.HandleFunc("/v1/media-server/start", s.handleMediaStart)
mux.HandleFunc("/v1/media-server/restart", s.handleMediaRestart)
mux.HandleFunc("/v1/media-server/stop", s.handleMediaStop)
mux.HandleFunc("/v1/graphs", s.handleGraphs)
mux.HandleFunc("/v1/graphs/", s.handleGraphDetail)
mux.HandleFunc("/v1/logs/recent", s.handleLogsRecent)
@ -269,6 +279,140 @@ func (s *Server) handleMediaRollback(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
type mediaProcReq struct {
Config string `json:"config"`
}
func (s *Server) handleMediaStart(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
req, err := readOptionalJSON[mediaProcReq](w, r, 1<<20)
if err != nil {
errorJSON(w, http.StatusBadRequest, err.Error())
return
}
st, err := s.proc.Start(req.Config)
if err != nil {
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
}
if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) {
errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error())
return
}
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
}
func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
req, err := readOptionalJSON[mediaProcReq](w, r, 1<<20)
if err != nil {
errorJSON(w, http.StatusBadRequest, err.Error())
return
}
st, err := s.proc.Restart(req.Config)
if err != nil {
if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) {
errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error())
return
}
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
}
func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
st, err := s.proc.Stop()
if err != nil {
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
}
func readOptionalJSON[T any](w http.ResponseWriter, r *http.Request, maxBytes int64) (T, error) {
var zero T
if r.Body == nil {
return zero, nil
}
r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
body, err := io.ReadAll(r.Body)
if err != nil {
if strings.Contains(err.Error(), "request body too large") {
return zero, errors.New("payload too large")
}
return zero, fmt.Errorf("invalid json: %v", err)
}
if len(strings.TrimSpace(string(body))) == 0 {
return zero, nil
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/json" {
return zero, errors.New("validation failed: Content-Type must be application/json")
}
var v T
if err := json.Unmarshal(body, &v); err != nil {
return zero, fmt.Errorf("invalid json: %v", err)
}
return v, nil
}
func (s *Server) handleGraphs(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")

View File

@ -105,10 +105,10 @@ func (s *Store) Upload(name string, r io.Reader, contentLength int64, expectedSh
}
item := Item{
Name: name,
Sha256: sha,
Path: filepath.ToSlash(finalPath),
Size: stat.Size(),
Name: name,
Sha256: sha,
Path: filepath.ToSlash(finalPath),
Size: stat.Size(),
MtimeMS: stat.ModTime().UnixMilli(),
}
if err := s.upsertManifest(item); err != nil {

View File

@ -0,0 +1,162 @@
package procctl
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"rk3588sys/agent/internal/config"
"rk3588sys/agent/internal/files"
)
var ErrNotSupported = errors.New("not supported")
var ErrConflict = errors.New("conflict")
var ErrInvalidConfig = errors.New("invalid config")
var ErrConfigNotFound = errors.New("config not found")
type Status struct {
Running bool `json:"running"`
Pid int `json:"pid"`
ConfigPath string `json:"config_path"`
}
type pidFile struct {
Pid int `json:"pid"`
ConfigPath string `json:"config_path"`
StartedAtMS int64 `json:"started_at_ms"`
}
type Controller struct {
mu sync.Mutex
proc config.MediaServerProcessConfig
defCfg string
}
func New(agentCfg config.AgentConfig) *Controller {
return &Controller{
proc: agentCfg.MediaServerProcess,
defCfg: agentCfg.ConfigPath,
}
}
func (c *Controller) Enabled() bool { return c != nil && c.proc.Enable }
func (c *Controller) Start(configName string) (Status, error) {
c.mu.Lock()
defer c.mu.Unlock()
resolved, err := c.resolveConfigPath(configName)
if err != nil {
return Status{}, err
}
pf, _ := c.readPidFile()
if pf != nil {
alive, _ := isAlive(pf.Pid)
if alive {
if filepath.Clean(pf.ConfigPath) == filepath.Clean(resolved) {
return Status{Running: true, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
}
return Status{}, fmt.Errorf("%w: already running with config %s", ErrConflict, pf.ConfigPath)
}
_ = os.Remove(c.proc.PidFile)
}
pid, err := startProcess(c.proc.ExecPath, c.proc.WorkDir, resolved)
if err != nil {
return Status{}, err
}
pf2 := pidFile{Pid: pid, ConfigPath: resolved, StartedAtMS: time.Now().UnixMilli()}
b, _ := json.Marshal(pf2)
b = append(b, '\n')
if err := files.WriteFileAtomic(c.proc.PidFile, b, 0o644); err != nil {
return Status{}, fmt.Errorf("write pid file: %w", err)
}
return Status{Running: true, Pid: pid, ConfigPath: resolved}, nil
}
func (c *Controller) Stop() (Status, error) {
c.mu.Lock()
defer c.mu.Unlock()
pf, err := c.readPidFile()
if err != nil {
return Status{}, err
}
if pf == nil {
return Status{Running: false}, nil
}
alive, _ := isAlive(pf.Pid)
if !alive {
_ = os.Remove(c.proc.PidFile)
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
}
if err := stopProcess(pf.Pid, time.Duration(c.proc.GracefulTimeoutMS)*time.Millisecond); err != nil {
return Status{}, err
}
_ = os.Remove(c.proc.PidFile)
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
}
func (c *Controller) Restart(configName string) (Status, error) {
_, _ = c.Stop()
return c.Start(configName)
}
func (c *Controller) resolveConfigPath(name string) (string, error) {
n := strings.TrimSpace(name)
if n == "" {
if strings.TrimSpace(c.defCfg) == "" {
return "", fmt.Errorf("%w: default config_path is empty", ErrInvalidConfig)
}
return c.defCfg, nil
}
if strings.Contains(n, "..") || strings.ContainsAny(n, "/\\") {
return "", fmt.Errorf("%w: contains invalid characters", ErrInvalidConfig)
}
if !strings.HasSuffix(n, ".json") {
n += ".json"
}
base := strings.TrimSpace(c.proc.ConfigsDir)
if base == "" {
return "", fmt.Errorf("%w: configs_dir is empty", ErrInvalidConfig)
}
p := filepath.Join(base, n)
st, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
return "", fmt.Errorf("%w: %s", ErrConfigNotFound, p)
}
return "", fmt.Errorf("stat config: %w", err)
}
if st.IsDir() {
return "", fmt.Errorf("%w: is a directory", ErrInvalidConfig)
}
return p, nil
}
func (c *Controller) readPidFile() (*pidFile, error) {
b, err := os.ReadFile(c.proc.PidFile)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("read pid file: %w", err)
}
var pf pidFile
if err := json.Unmarshal(b, &pf); err != nil {
return nil, fmt.Errorf("parse pid file: %w", err)
}
if pf.Pid <= 0 {
return nil, fmt.Errorf("pid file invalid pid: %d", pf.Pid)
}
return &pf, nil
}

View File

@ -0,0 +1,79 @@
//go:build linux
package procctl
import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
"rk3588sys/agent/internal/log"
)
func isAlive(pid int) (bool, error) {
if pid <= 0 {
return false, nil
}
p, err := os.FindProcess(pid)
if err != nil {
return false, err
}
if err := p.Signal(syscall.Signal(0)); err != nil {
return false, nil
}
return true, nil
}
func startProcess(execPath, workDir, configPath string) (int, error) {
cmd := exec.Command(execPath, "--config", configPath)
cmd.Dir = workDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil {
return 0, fmt.Errorf("start media-server: %w", err)
}
pid := cmd.Process.Pid
go func() {
err := cmd.Wait()
if err != nil {
log.Warn(fmt.Sprintf("media-server exited pid=%d err=%v", pid, err))
} else {
log.Info(fmt.Sprintf("media-server exited pid=%d", pid))
}
}()
return pid, nil
}
func stopProcess(pid int, timeout time.Duration) error {
if pid <= 0 {
return nil
}
p, err := os.FindProcess(pid)
if err != nil {
return err
}
_ = p.Signal(syscall.SIGTERM)
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
alive, _ := isAlive(pid)
if !alive {
return nil
}
time.Sleep(100 * time.Millisecond)
}
_ = p.Signal(syscall.SIGKILL)
deadline = time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
alive, _ := isAlive(pid)
if !alive {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("failed to stop pid=%d", pid)
}

View File

@ -0,0 +1,11 @@
//go:build !linux
package procctl
import "time"
func isAlive(pid int) (bool, error) { return false, ErrNotSupported }
func startProcess(execPath, workDir, configPath string) (int, error) { return 0, ErrNotSupported }
func stopProcess(pid int, timeout time.Duration) error { return ErrNotSupported }