更新agent接口

This commit is contained in:
sladro 2026-01-17 16:57:51 +08:00
parent 04973c3474
commit 86674140fc
21 changed files with 1577 additions and 30 deletions

View File

@ -268,6 +268,39 @@ Response 200
- 501`configs_dir` 未配置
- 500写盘失败
### 5.4 `PUT /v1/media-server/binary`
用途:更新 media-server 可执行文件(仅允许在 media-server 未运行时覆盖)。
**Auth**必须401
Headers
- `Content-Type: application/octet-stream`
- `Content-Length: <n>`(必须)
- `X-RK-Token: ...`
- `X-Binary-Sha256: <hex>`(可选;若存在必须匹配实际 sha256否则 400
Body二进制文件media-server 可执行文件)
Response 200
```json
{
"ok": true,
"path": "/opt/rk3588sys/bin/media-server",
"sha256": "...",
"size": 12345678,
"mtime_ms": 1730000000000,
"backup_path": "/opt/rk3588sys/bin/media-server.bak.20260117-123000"
}
```
失败:
- 400缺 Content-Length / Content-Type 非 application/octet-stream / sha256 不匹配
- 401unauthorized
- 409media-server 正在运行(需先 stop
- 413超过 `max_upload_mb`
- 501未启用进程控制agent 配置 `agent.media_server_process.enable=false` 或未配置)
- 500写盘/替换失败
## 6. 主程序进程控制agent 对外)
> 说明:该能力用于“启动/重启/关闭主程序media-server并选择加载哪个配置文件”。
@ -350,7 +383,7 @@ Response 200
Response 200
```json
{"ok":true,"running":true,"pid":1234,"config_path":"/etc/rk3588sys/config.json"}
{"ok":true,"running":true,"pid":1234,"config_path":"/etc/rk3588sys/config.json","version":"RK3588 Media Server v0.1.0 (git abc1234)"}
```
失败:

111
Agent_API_Extensions.md Normal file
View File

@ -0,0 +1,111 @@
# Agent 新增接口说明(供远程运维工具对接)
> 说明:本文件仅覆盖本次新增/扩展的 agent 接口与返回字段,不包含已有接口的完整文档。
## 通用
- 鉴权:需要 `X-RK-Token`(写操作必需;读操作依配置而定)。
- 返回:统一 JSON。
## 1) 指标接口
### GET `/v1/metrics`
**用途**:资源/进程运行指标JSON
**响应示例**(字段可能随平台略有不同):
```json
{
"timestamp_ms": 1710000000000,
"uptime_sec": 12345,
"cpu": {"usage_pct": 12.3, "sample_ms": 1000, "total_ticks": 123, "idle_ticks": 45},
"memory": {"total_kb": 1024000, "available_kb": 512000, "free_kb": 128000, "buffers_kb": 64000, "cached_kb": 256000},
"load_avg": {"one": 0.12, "five": 0.20, "fifteen": 0.30},
"disk": {"path": "/", "total_bytes": 1000000, "free_bytes": 200000, "used_bytes": 800000, "used_pct": 80},
"disk_io": {"read_bytes": 123456, "write_bytes": 654321},
"net": {"rx_bytes": 111, "tx_bytes": 222},
"media_server": {"supported": true, "running": true, "pid": 1234, "config_path": "/path", "started_at_ms": 1710000000000, "uptime_sec": 120}
}
```
## 2) 版本与资产
### GET `/v1/versions`
**用途**:版本信息与二进制/模型清单。
**响应示例**
```json
{
"agent": {
"version": "0.0.0-dev",
"git_sha": "...",
"binary": {"path": "...", "sha256": "...", "size": 123, "mtime_ms": 1710000000000}
},
"media_server": {
"supported": true,
"version": "...",
"binary": {"path": "...", "sha256": "...", "size": 456, "mtime_ms": 1710000000000}
},
"models": {
"count": 2,
"items": [{"name": "yolo", "sha256": "...", "path": "...", "size": 123, "mtime_ms": 1710000000000}]
}
}
```
### GET `/v1/assets`
**用途**:资产详情(配置、二进制、模型清单)。
**响应示例**
```json
{
"agent_binary": {"path": "...", "sha256": "...", "size": 123, "mtime_ms": 1710000000000},
"config": {"path": "...", "sha256": "...", "size": 456, "mtime_ms": 1710000000000},
"config_last_good": {"path": "...", "sha256": "...", "size": 456, "mtime_ms": 1710000000000},
"media_server_binary": {"path": "...", "sha256": "...", "size": 789, "mtime_ms": 1710000000000},
"models": {"items": [...]}
}
```
## 3) 任务状态查询
### GET `/v1/tasks/{id}`
**用途**:查询异步/长耗时任务状态。
**响应示例**
```json
{
"id": "...",
"type": "media_binary_update",
"status": "success",
"started_at_ms": 1710000000000,
"ended_at_ms": 1710000001000,
"error": "",
"result": {"path": "...", "sha256": "..."}
}
```
## 4) media-server 二进制回滚
### POST `/v1/media-server/binary/rollback`
**用途**:使用备份文件回滚 media-server 可执行文件。
**请求体**
```json
{ "backup_path": "/path/to/media-server.bak.20250101-120000" }
```
**响应示例**
```json
{
"ok": true,
"path": "...",
"sha256": "...",
"size": 123,
"mtime_ms": 1710000000000,
"backup_path": "...",
"task_id": "..."
}
```
## 5) 关键接口新增字段
- `/v1/media-server/status`、`/v1/media-server/start`、`/v1/media-server/restart`、`/v1/media-server/stop`
- **新增字段**`started_at_ms`
## 6) 操作审计(服务端日志)
- 审计日志JSONL默认落盘`{baseDir}/logs/agent_audit.jsonl`
- 覆盖操作:配置更新/应用、模型上传、media-server 启停与更新/回滚、face_gallery 更新与 reload 等

49
Agent_Feature_Gaps.md Normal file
View File

@ -0,0 +1,49 @@
# Agent 功能补全清单(建议)
> 目的:明确当前 agent 与“生产级设备管理”之间的差距,作为后续迭代清单。
## 1. 可观测性与性能监控
- 资源监控CPU/内存/磁盘/IO/网络吞吐
- 进程监控media-server 进程存活、重启次数、退出码、启动耗时
- 业务监控:关键 pipeline 延迟、丢帧率、处理速率FPS、队列长度
- 指标导出:/v1/metricsPrometheus 格式)或 JSON 指标接口
## 2. 版本与资产管理
- 设备侧版本查询agent 版本、media-server 版本、插件版本、模型版本
- 二进制/模型清单已安装版本、sha256、时间戳、来源
- 版本对比与漂移检测
## 3. 升级与回滚机制(更强一致性)
- 任务化升级:支持上传、校验、切换、验证、回滚的状态机
- 升级进度反馈:/v1/tasks/{id} 轮询或 SSE/WebSocket
- 多组件升级编排agent、media-server、插件、模型
## 4. 安全与权限
- Token 轮换与过期策略
- 读/写权限分级(至少读、写、运维三类)
- 操作审计:谁在什么时候做了什么(操作、结果、摘要)
## 5. 配置管理增强
- 配置版本历史与差异对比
- 配置发布审批/锁定(避免并发写冲突)
- 配置回滚到指定版本(不仅“上一次成功”)
## 6. 稳定性与自愈
- 崩溃自愈策略:退避重启、异常阈值熔断
- 看门狗与健康检查:/v1/healthz、/v1/readyz
## 7. 设备运维能力
- 日志归档与拉取(按时间范围/级别)
- 远程诊断:关键配置/环境/依赖状态采集
- 时间同步状态NTP/RTC
## 8. 规模化管理支持
- 设备分组/标签
- 批量操作与幂等指令
- 统一任务队列与重试策略
## 9. 最小优先级建议MVP+
1. 性能与进程监控指标接口
2. 版本查询与资产清单
3. 升级任务状态与回滚
4. 操作审计

View File

@ -81,6 +81,20 @@
> 说明:`det_post` 是检测结果后处理节点(可选)。它会对 `frame->det` 做过滤/修正/二次验证,使 OSD 与告警读取到一致的“后处理结果”。
### 2.3 多硬件解耦架构(已完成)
为支持 RK3588 之外的平台Atlas/Jetson 等),已将硬件强绑定模块抽象为统一接口层,并保留 RK3588 默认实现与 DMA-BUF 快路径:
- **推理**`IInferBackend`(默认 `Rk3588InferBackend`
- **图像处理**`IImageProcessor`(默认 `Rk3588ImageProcessor`RGA/Swscale 兜底)
- **编解码**`IDecoder` / `IEncoder`(默认 RK3588 MPP 路径)
- **缓冲区**`FrameBuffer` 统一 DMA/内存同步语义
运行时通过 `HwFactory` 根据配置选择后端实现:
- 配置字段:`platform` / `hw_platform`(不填默认 `rk3588`
- 可扩展实现:`Atlas*` / `Jetson*`(保持现有业务配置不变,仅切换硬件实现)
---
## 三、数据模型Frame & 元数据)

Binary file not shown.

View File

@ -0,0 +1,56 @@
package assets
import (
"crypto/sha256"
"encoding/hex"
"errors"
"io"
"os"
"path/filepath"
)
type FileInfo struct {
Path string `json:"path"`
Sha256 string `json:"sha256"`
Size int64 `json:"size"`
MtimeMS int64 `json:"mtime_ms"`
}
func Info(path string) (FileInfo, error) {
if path == "" {
return FileInfo{}, errors.New("path is empty")
}
st, err := os.Stat(path)
if err != nil {
return FileInfo{}, err
}
if st.IsDir() {
return FileInfo{}, errors.New("path is a directory")
}
f, err := os.Open(path)
if err != nil {
return FileInfo{}, err
}
defer f.Close()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return FileInfo{}, err
}
sha := hex.EncodeToString(h.Sum(nil))
return FileInfo{
Path: filepath.ToSlash(path),
Sha256: sha,
Size: st.Size(),
MtimeMS: st.ModTime().UnixMilli(),
}, nil
}
func ExecutableInfo() (FileInfo, error) {
p, err := os.Executable()
if err != nil {
return FileInfo{}, err
}
return Info(p)
}

View File

@ -0,0 +1,34 @@
package assets
import (
"crypto/sha256"
"encoding/hex"
"os"
"path/filepath"
"testing"
)
func TestInfo(t *testing.T) {
dir := t.TempDir()
p := filepath.Join(dir, "sample.bin")
data := []byte("rk3588-assets")
if err := os.WriteFile(p, data, 0o644); err != nil {
t.Fatalf("write file: %v", err)
}
got, err := Info(p)
if err != nil {
t.Fatalf("info: %v", err)
}
h := sha256.Sum256(data)
wantSha := hex.EncodeToString(h[:])
if got.Sha256 != wantSha {
t.Fatalf("sha256 mismatch: got %s want %s", got.Sha256, wantSha)
}
if got.Size != int64(len(data)) {
t.Fatalf("size mismatch: got %d want %d", got.Size, len(data))
}
if got.MtimeMS == 0 {
t.Fatalf("mtime not set")
}
}

View File

@ -0,0 +1,56 @@
package audit
import (
"encoding/json"
"os"
"path/filepath"
"sync"
"rk3588sys/agent/internal/files"
)
type Entry struct {
TimeMS int64 `json:"time_ms"`
Action string `json:"action"`
Success bool `json:"success"`
RemoteIP string `json:"remote_ip,omitempty"`
TokenHash string `json:"token_hash,omitempty"`
Detail string `json:"detail,omitempty"`
RequestID string `json:"request_id,omitempty"`
}
type Recorder struct {
mu sync.Mutex
path string
}
func NewRecorder(path string) *Recorder {
if path == "" {
return nil
}
return &Recorder{path: path}
}
func (r *Recorder) Record(entry Entry) error {
if r == nil {
return nil
}
b, err := json.Marshal(entry)
if err != nil {
return err
}
b = append(b, '\n')
r.mu.Lock()
defer r.mu.Unlock()
if err := files.EnsureDir(filepath.Dir(r.path), 0o755); err != nil {
return err
}
f, err := os.OpenFile(r.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(b)
return err
}

View File

@ -0,0 +1,37 @@
package audit
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"testing"
)
func TestRecorderRecord(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "audit.jsonl")
rec := NewRecorder(path)
entry := Entry{TimeMS: 123, Action: "config.update", Success: true, RemoteIP: "1.2.3.4"}
if err := rec.Record(entry); err != nil {
t.Fatalf("record: %v", err)
}
f, err := os.Open(path)
if err != nil {
t.Fatalf("open: %v", err)
}
defer f.Close()
scan := bufio.NewScanner(f)
if !scan.Scan() {
t.Fatalf("no log line")
}
var got Entry
if err := json.Unmarshal(scan.Bytes(), &got); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if got.Action != entry.Action || got.Success != entry.Success {
t.Fatalf("entry mismatch: %+v", got)
}
}

View File

@ -0,0 +1,268 @@
package httpapi
import (
"crypto/sha256"
"encoding/hex"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"rk3588sys/agent/internal/assets"
"rk3588sys/agent/internal/audit"
"rk3588sys/agent/internal/metrics"
"rk3588sys/agent/internal/sysinfo"
)
func defaultAuditPath(baseDir string) string {
if strings.TrimSpace(baseDir) == "" {
return filepath.Join("logs", "agent_audit.jsonl")
}
return filepath.Join(baseDir, "logs", "agent_audit.jsonl")
}
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
now := time.Now()
resp := map[string]any{
"timestamp_ms": now.UnixMilli(),
"uptime_sec": sysinfo.UptimeSec(),
}
if cpu, err := metrics.ReadCPUStat(); err == nil {
s.cpuMu.Lock()
usage := 0.0
intervalMS := int64(0)
if !s.lastCPUTS.IsZero() {
usage = metrics.CPUUsage(s.lastCPU, cpu)
intervalMS = now.Sub(s.lastCPUTS).Milliseconds()
}
s.lastCPU = cpu
s.lastCPUTS = now
s.cpuMu.Unlock()
resp["cpu"] = map[string]any{
"usage_pct": usage,
"sample_ms": intervalMS,
"total_ticks": cpu.Total,
"idle_ticks": cpu.Idle,
}
} else {
resp["cpu"] = map[string]any{"error": err.Error()}
}
if mem, err := metrics.ReadMemInfo(); err == nil {
resp["memory"] = mem
} else {
resp["memory"] = map[string]any{"error": err.Error()}
}
if la, err := metrics.ReadLoadAvg(); err == nil {
resp["load_avg"] = la
} else {
resp["load_avg"] = map[string]any{"error": err.Error()}
}
diskPath := s.baseDir
if strings.TrimSpace(diskPath) == "" {
diskPath = "/"
}
if du, err := metrics.ReadDiskUsage(diskPath); err == nil {
resp["disk"] = du
} else {
resp["disk"] = map[string]any{"path": diskPath, "error": err.Error()}
}
if nio, err := metrics.ReadNetDev(); err == nil {
resp["net"] = nio
} else {
resp["net"] = map[string]any{"error": err.Error()}
}
if dio, err := metrics.ReadDiskStats(); err == nil {
resp["disk_io"] = dio
} else {
resp["disk_io"] = map[string]any{"error": err.Error()}
}
media := map[string]any{"supported": s.proc != nil && s.proc.Enabled()}
if s.proc == nil || !s.proc.Enabled() {
resp["media_server"] = media
writeJSON(w, http.StatusOK, resp)
return
}
st, err := s.proc.Status()
if err != nil {
media["error"] = err.Error()
} else {
media["running"] = st.Running
media["pid"] = st.Pid
media["config_path"] = st.ConfigPath
media["started_at_ms"] = st.StartedAtMS
if st.Running && st.StartedAtMS > 0 {
media["uptime_sec"] = (now.UnixMilli() - st.StartedAtMS) / 1000
}
}
resp["media_server"] = media
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) handleVersions(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
resp := map[string]any{}
agentInfo := map[string]any{"version": s.version, "git_sha": s.gitSHA}
if bin, err := assets.ExecutableInfo(); err == nil {
agentInfo["binary"] = bin
} else {
agentInfo["binary_error"] = err.Error()
}
resp["agent"] = agentInfo
mediaInfo := map[string]any{"supported": s.proc != nil && s.proc.Enabled()}
if s.proc != nil && s.proc.Enabled() {
if ver, err := s.proc.Version(); err == nil {
mediaInfo["version"] = ver
} else {
mediaInfo["version_error"] = err.Error()
}
if bin, err := s.proc.BinaryInfo(); err == nil {
mediaInfo["binary"] = bin
} else {
mediaInfo["binary_error"] = err.Error()
}
}
resp["media_server"] = mediaInfo
if m, err := s.store.List(); err == nil {
resp["models"] = map[string]any{"count": len(m.Items), "items": m.Items}
} else {
resp["models"] = map[string]any{"error": err.Error()}
}
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) handleAssets(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
resp := map[string]any{}
if bin, err := assets.ExecutableInfo(); err == nil {
resp["agent_binary"] = bin
} else {
resp["agent_binary_error"] = err.Error()
}
if st, err := os.Stat(s.agentCfg.ConfigPath); err == nil && !st.IsDir() {
if info, err := assets.Info(s.agentCfg.ConfigPath); err == nil {
resp["config"] = info
} else {
resp["config_error"] = err.Error()
}
}
lastGood := s.agentCfg.ConfigPath + ".last_good.json"
if st, err := os.Stat(lastGood); err == nil && !st.IsDir() {
if info, err := assets.Info(lastGood); err == nil {
resp["config_last_good"] = info
} else {
resp["config_last_good_error"] = err.Error()
}
}
if s.proc != nil && s.proc.Enabled() {
if bin, err := s.proc.BinaryInfo(); err == nil {
resp["media_server_binary"] = bin
} else {
resp["media_server_binary_error"] = err.Error()
}
}
if m, err := s.store.List(); err == nil {
resp["models"] = m
} else {
resp["models_error"] = err.Error()
}
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) handleTask(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, false) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
id := strings.TrimPrefix(r.URL.Path, "/v1/tasks/")
if strings.TrimSpace(id) == "" {
errorJSON(w, http.StatusNotFound, "not found")
return
}
t, ok := s.tasks.Get(id)
if !ok {
errorJSON(w, http.StatusNotFound, "not found")
return
}
writeJSON(w, http.StatusOK, t)
}
func (s *Server) recordAudit(r *http.Request, action string, ok bool, detail string) {
if s.audit == nil {
return
}
tok := strings.TrimSpace(r.Header.Get("X-RK-Token"))
entry := audit.Entry{
TimeMS: time.Now().UnixMilli(),
Action: action,
Success: ok,
RemoteIP: remoteIP(r),
TokenHash: tokenHash(tok),
Detail: detail,
RequestID: strings.TrimSpace(r.Header.Get("X-Request-Id")),
}
_ = s.audit.Record(entry)
}
func remoteIP(r *http.Request) string {
if v := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); v != "" {
parts := strings.Split(v, ",")
if len(parts) > 0 {
return strings.TrimSpace(parts[0])
}
}
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
return host
}
return r.RemoteAddr
}
func tokenHash(tok string) string {
if tok == "" {
return ""
}
h := sha256.Sum256([]byte(tok))
return hex.EncodeToString(h[:])
}

View File

@ -96,6 +96,7 @@ func (s *Server) handleFaceGallery(w http.ResponseWriter, r *http.Request) {
}
if err := files.ReplaceFile(tmp, dst); err != nil {
s.recordAudit(r, "face_gallery.update", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
@ -103,9 +104,11 @@ func (s *Server) handleFaceGallery(w http.ResponseWriter, r *http.Request) {
st, err := os.Stat(dst)
if err != nil {
s.recordAudit(r, "face_gallery.update", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "face_gallery.update", true, "")
writeJSON(w, http.StatusOK, faceGalleryInfo{Ok: true, Path: dst, Exists: true, Size: st.Size(), MtimeMS: st.ModTime().UnixMilli()})
return
default:
@ -174,11 +177,13 @@ func (s *Server) handleFaceGalleryReload(w http.ResponseWriter, r *http.Request)
}
patch := map[string]any{"gallery": map[string]any{"reload_seq": reloadSeq}}
if err := s.ms.UpdateNodeConfig(ctx, n.ID, g.Name, patch); err != nil {
s.recordAudit(r, "face_gallery.reload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: update node config failed: "+err.Error())
return
}
reloaded++
}
}
s.recordAudit(r, "face_gallery.reload", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "reloaded": reloaded, "reload_seq": reloadSeq})
}

View File

@ -14,14 +14,18 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
"rk3588sys/agent/internal/audit"
"rk3588sys/agent/internal/config"
"rk3588sys/agent/internal/files"
"rk3588sys/agent/internal/mediaserver"
"rk3588sys/agent/internal/metrics"
"rk3588sys/agent/internal/modelstore"
"rk3588sys/agent/internal/procctl"
"rk3588sys/agent/internal/sysinfo"
"rk3588sys/agent/internal/tasks"
)
type Server struct {
@ -29,6 +33,8 @@ type Server struct {
ms *mediaserver.Client
store *modelstore.Store
proc *procctl.Controller
audit *audit.Recorder
tasks *tasks.Registry
baseDir string
deviceID string
hostname string
@ -36,6 +42,9 @@ type Server struct {
mediaPort int
version string
gitSHA string
cpuMu sync.Mutex
lastCPU metrics.CPUStat
lastCPUTS time.Time
}
type InfoResponse struct {
@ -62,6 +71,8 @@ func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, st
ms: ms,
store: store,
proc: pc,
audit: audit.NewRecorder(defaultAuditPath(baseDir)),
tasks: tasks.NewRegistry(),
baseDir: baseDir,
deviceID: deviceID,
hostname: sysinfo.Hostname(),
@ -89,9 +100,15 @@ func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, st
mux.HandleFunc("/v1/media-server/stop", s.handleMediaStop)
mux.HandleFunc("/v1/media-server/status", s.handleMediaStatus)
mux.HandleFunc("/v1/media-server/configs/", s.handleMediaConfigUpload)
mux.HandleFunc("/v1/media-server/binary", s.handleMediaBinaryUpdate)
mux.HandleFunc("/v1/media-server/binary/rollback", s.handleMediaBinaryRollback)
mux.HandleFunc("/v1/graphs", s.handleGraphs)
mux.HandleFunc("/v1/graphs/", s.handleGraphDetail)
mux.HandleFunc("/v1/logs/recent", s.handleLogsRecent)
mux.HandleFunc("/v1/metrics", s.handleMetrics)
mux.HandleFunc("/v1/versions", s.handleVersions)
mux.HandleFunc("/v1/assets", s.handleAssets)
mux.HandleFunc("/v1/tasks/", s.handleTask)
return mux
}
@ -179,9 +196,11 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
}
if err := s.applyRootConfigBytes(r.Context(), body); err != nil {
s.recordAudit(r, "config.update", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "config.update", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
return
default:
@ -251,8 +270,12 @@ func (s *Server) handleModelUpload(w http.ResponseWriter, r *http.Request) {
}
}
task := s.tasks.Start("model_upload")
item, err := s.store.Upload(name, r.Body, r.ContentLength, expected)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
w.Header().Set("X-Task-Id", task.ID)
s.recordAudit(r, "model.upload", false, err.Error())
if errors.Is(err, modelstore.ErrPayloadTooLarge) {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
@ -264,12 +287,15 @@ func (s *Server) handleModelUpload(w http.ResponseWriter, r *http.Request) {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
_, _ = s.tasks.Finish(task.ID, item, nil)
s.recordAudit(r, "model.upload", true, name)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"name": item.Name,
"sha256": item.Sha256,
"path": item.Path,
"size": item.Size,
"ok": true,
"name": item.Name,
"sha256": item.Sha256,
"path": item.Path,
"size": item.Size,
"task_id": task.ID,
})
}
@ -341,14 +367,17 @@ func (s *Server) handleMediaConfigUpload(w http.ResponseWriter, r *http.Request)
dst := filepath.Join(configsDir, finalName)
if err := files.WriteFileAtomic(dst, append(body, '\n'), 0o644); err != nil {
s.recordAudit(r, "media.config.upload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
st, err := os.Stat(dst)
if err != nil {
s.recordAudit(r, "media.config.upload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.config.upload", true, finalName)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"name": finalName,
@ -368,9 +397,11 @@ func (s *Server) handleMediaReload(w http.ResponseWriter, r *http.Request) {
return
}
if err := s.ms.Reload(r.Context()); err != nil {
s.recordAudit(r, "media.reload", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.reload", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
@ -384,9 +415,11 @@ func (s *Server) handleMediaRollback(w http.ResponseWriter, r *http.Request) {
return
}
if err := s.ms.Rollback(r.Context()); err != nil {
s.recordAudit(r, "media.rollback", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "media.rollback", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
}
@ -416,6 +449,7 @@ func (s *Server) handleMediaStart(w http.ResponseWriter, r *http.Request) {
st, err := s.proc.Start(req.Config)
if err != nil {
s.recordAudit(r, "media.start", false, err.Error())
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
@ -431,7 +465,8 @@ func (s *Server) handleMediaStart(w http.ResponseWriter, r *http.Request) {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
s.recordAudit(r, "media.start", true, req.Config)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath, "started_at_ms": st.StartedAtMS})
}
func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) {
@ -456,6 +491,7 @@ func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) {
st, err := s.proc.Restart(req.Config)
if err != nil {
s.recordAudit(r, "media.restart", false, err.Error())
if errors.Is(err, procctl.ErrInvalidConfig) || errors.Is(err, procctl.ErrConfigNotFound) {
errorJSON(w, http.StatusBadRequest, "validation failed: "+err.Error())
return
@ -467,7 +503,8 @@ func (s *Server) handleMediaRestart(w http.ResponseWriter, r *http.Request) {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
s.recordAudit(r, "media.restart", true, req.Config)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath, "started_at_ms": st.StartedAtMS})
}
func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) {
@ -485,6 +522,7 @@ func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) {
}
st, err := s.proc.Stop()
if err != nil {
s.recordAudit(r, "media.stop", false, err.Error())
if errors.Is(err, procctl.ErrNotSupported) {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
@ -492,7 +530,8 @@ func (s *Server) handleMediaStop(w http.ResponseWriter, r *http.Request) {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
s.recordAudit(r, "media.stop", true, "")
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath, "started_at_ms": st.StartedAtMS})
}
func (s *Server) handleMediaStatus(w http.ResponseWriter, r *http.Request) {
@ -517,7 +556,141 @@ func (s *Server) handleMediaStatus(w http.ResponseWriter, r *http.Request) {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "running": st.Running, "pid": st.Pid, "config_path": st.ConfigPath})
ver, verr := s.proc.Version()
if verr != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+verr.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"running": st.Running,
"pid": st.Pid,
"config_path": st.ConfigPath,
"started_at_ms": st.StartedAtMS,
"version": ver,
})
}
func (s *Server) handleMediaBinaryUpdate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/octet-stream" {
errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/octet-stream")
return
}
if r.ContentLength <= 0 {
errorJSON(w, http.StatusBadRequest, "validation failed: missing Content-Length")
return
}
maxBytes := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
expected := strings.TrimSpace(r.Header.Get("X-Binary-Sha256"))
if expected != "" {
if len(expected) != 64 {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
return
}
if _, err := hex.DecodeString(expected); err != nil {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
return
}
}
task := s.tasks.Start("media_binary_update")
res, err := s.proc.UpdateBinary(r.Body, r.ContentLength, expected)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
w.Header().Set("X-Task-Id", task.ID)
s.recordAudit(r, "media.binary.update", false, err.Error())
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
}
if strings.Contains(err.Error(), "payload too large") || strings.Contains(err.Error(), "request body too large") {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
}
if strings.Contains(err.Error(), "sha256 mismatch") {
errorJSON(w, http.StatusBadRequest, "validation failed: sha256 mismatch")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
_, _ = s.tasks.Finish(task.ID, res, nil)
s.recordAudit(r, "media.binary.update", true, res.Path)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"path": res.Path,
"sha256": res.Sha256,
"size": res.Size,
"mtime_ms": res.MtimeMS,
"backup_path": res.BackupPath,
"task_id": task.ID,
})
}
type mediaBinaryRollbackReq struct {
BackupPath string `json:"backup_path"`
}
func (s *Server) handleMediaBinaryRollback(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if s.proc == nil || !s.proc.Enabled() {
errorJSON(w, http.StatusNotImplemented, "not supported")
return
}
req, err := readRequiredJSON[mediaBinaryRollbackReq](w, r, 1<<20)
if err != nil {
errorJSON(w, http.StatusBadRequest, err.Error())
return
}
task := s.tasks.Start("media_binary_rollback")
res, err := s.proc.RollbackBinary(req.BackupPath)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
w.Header().Set("X-Task-Id", task.ID)
s.recordAudit(r, "media.binary.rollback", false, err.Error())
if errors.Is(err, procctl.ErrConflict) {
errorJSON(w, http.StatusConflict, err.Error())
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
_, _ = s.tasks.Finish(task.ID, res, nil)
s.recordAudit(r, "media.binary.rollback", true, res.Path)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"path": res.Path,
"sha256": res.Sha256,
"size": res.Size,
"mtime_ms": res.MtimeMS,
"backup_path": res.BackupPath,
"task_id": task.ID,
})
}
func normalizeConfigName(name string) (string, error) {

View File

@ -21,11 +21,11 @@ type UIInstance struct {
}
type uiRootConfig struct {
Global map[string]any `json:"global,omitempty"`
Queue map[string]any `json:"queue,omitempty"`
Templates map[string]any `json:"templates,omitempty"`
Instances []UIInstance `json:"instances,omitempty"`
Graphs []map[string]any `json:"graphs,omitempty"`
Global map[string]any `json:"global,omitempty"`
Queue map[string]any `json:"queue,omitempty"`
Templates map[string]any `json:"templates,omitempty"`
Instances []UIInstance `json:"instances,omitempty"`
Graphs []map[string]any `json:"graphs,omitempty"`
}
type uiPlanRequest struct {
@ -41,10 +41,10 @@ type uiDiff struct {
}
type uiPlanResponse struct {
Ok bool `json:"ok"`
Ok bool `json:"ok"`
GeneratedConfig map[string]any `json:"generated_config"`
Diff uiDiff `json:"diff"`
Warnings []string `json:"warnings,omitempty"`
Diff uiDiff `json:"diff"`
Warnings []string `json:"warnings,omitempty"`
}
func (s *Server) handleConfigUISchema(w http.ResponseWriter, r *http.Request) {
@ -565,9 +565,11 @@ func (s *Server) handleConfigUIApply(w http.ResponseWriter, r *http.Request) {
return
}
if err := s.applyRootConfigBytes(r.Context(), b); err != nil {
s.recordAudit(r, "config.ui.apply", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
s.recordAudit(r, "config.ui.apply", true, "")
writeJSON(w, http.StatusOK, resp)
}
@ -931,10 +933,10 @@ func uiTemplates() map[string]any {
"outputs": []any{map[string]any{"proto": "rtsp_server", "port": "${rtsp_port}", "path": "/live/${name}"}}},
map[string]any{"id": "alarm", "type": "alarm", "role": "sink", "enable": true,
"eval_fps": 10,
"labels": []any{},
"labels": []any{},
"rules": []any{
map[string]any{"name": "person_in_view", "class_ids": []any{0},
"roi": map[string]any{"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0},
"roi": map[string]any{"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0},
"min_duration_ms": 0, "cooldown_ms": "${cooldown_ms}"},
},
"actions": map[string]any{
@ -996,7 +998,7 @@ func uiTemplates() map[string]any {
map[string]any{"id": "face_recog", "type": "ai_face_recog", "role": "filter", "enable": true,
"model_path": "${recog_model_path}", "align": true, "emit_embedding": false, "max_faces": 10,
"threshold": map[string]any{"accept": "${thr_accept}", "margin": "${thr_margin}"},
"gallery": map[string]any{"backend": "sqlite", "path": "${gallery_path}", "load_on_start": true, "expected_dim": 512, "dtype": "auto"}},
"gallery": map[string]any{"backend": "sqlite", "path": "${gallery_path}", "load_on_start": true, "expected_dim": 512, "dtype": "auto"}},
map[string]any{"id": "osd", "type": "osd", "role": "filter", "enable": true,
"draw_bbox": true, "draw_text": true, "draw_face_det": false, "line_width": 2, "font_scale": 1, "labels": []any{}},
map[string]any{"id": "post", "type": "preprocess", "role": "filter", "enable": true,

View File

@ -0,0 +1,263 @@
package metrics
import (
"bufio"
"errors"
"os"
"strconv"
"strings"
)
var ErrNotSupported = errors.New("not supported")
type CPUStat struct {
Total uint64 `json:"total"`
Idle uint64 `json:"idle"`
}
type MemInfo struct {
TotalKB uint64 `json:"total_kb"`
AvailableKB uint64 `json:"available_kb"`
FreeKB uint64 `json:"free_kb"`
BuffersKB uint64 `json:"buffers_kb"`
CachedKB uint64 `json:"cached_kb"`
}
type NetIO struct {
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
}
type DiskIO struct {
ReadBytes uint64 `json:"read_bytes"`
WriteBytes uint64 `json:"write_bytes"`
}
type LoadAvg struct {
One float64 `json:"one"`
Five float64 `json:"five"`
Fifteen float64 `json:"fifteen"`
}
type DiskUsage struct {
Path string `json:"path"`
TotalBytes uint64 `json:"total_bytes"`
FreeBytes uint64 `json:"free_bytes"`
UsedBytes uint64 `json:"used_bytes"`
UsedPct float64 `json:"used_pct"`
}
func ReadCPUStat() (CPUStat, error) {
b, err := os.ReadFile("/proc/stat")
if err != nil {
return CPUStat{}, err
}
return ParseProcStat(string(b))
}
func ParseProcStat(data string) (CPUStat, error) {
s := bufio.NewScanner(strings.NewReader(data))
for s.Scan() {
line := strings.TrimSpace(s.Text())
if strings.HasPrefix(line, "cpu ") {
parts := strings.Fields(line)
if len(parts) < 5 {
return CPUStat{}, errors.New("invalid cpu stat")
}
var total uint64
var idle uint64
for i := 1; i < len(parts); i++ {
v, err := strconv.ParseUint(parts[i], 10, 64)
if err != nil {
return CPUStat{}, err
}
total += v
if i == 4 || i == 5 {
idle += v
}
}
return CPUStat{Total: total, Idle: idle}, nil
}
}
if err := s.Err(); err != nil {
return CPUStat{}, err
}
return CPUStat{}, errors.New("cpu line not found")
}
func CPUUsage(prev, cur CPUStat) float64 {
if cur.Total <= prev.Total {
return 0
}
totalDelta := cur.Total - prev.Total
idleDelta := uint64(0)
if cur.Idle > prev.Idle {
idleDelta = cur.Idle - prev.Idle
}
if totalDelta == 0 {
return 0
}
used := totalDelta - idleDelta
return float64(used) / float64(totalDelta) * 100
}
func ReadMemInfo() (MemInfo, error) {
b, err := os.ReadFile("/proc/meminfo")
if err != nil {
return MemInfo{}, err
}
return ParseMemInfo(string(b))
}
func ParseMemInfo(data string) (MemInfo, error) {
out := MemInfo{}
s := bufio.NewScanner(strings.NewReader(data))
for s.Scan() {
line := strings.TrimSpace(s.Text())
if line == "" {
continue
}
parts := strings.Fields(line)
if len(parts) < 2 {
continue
}
key := strings.TrimSuffix(parts[0], ":")
val, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
continue
}
switch key {
case "MemTotal":
out.TotalKB = val
case "MemAvailable":
out.AvailableKB = val
case "MemFree":
out.FreeKB = val
case "Buffers":
out.BuffersKB = val
case "Cached":
out.CachedKB = val
}
}
if err := s.Err(); err != nil {
return MemInfo{}, err
}
if out.TotalKB == 0 {
return MemInfo{}, errors.New("meminfo missing MemTotal")
}
return out, nil
}
func ReadNetDev() (NetIO, error) {
b, err := os.ReadFile("/proc/net/dev")
if err != nil {
return NetIO{}, err
}
return ParseNetDev(string(b))
}
func ParseNetDev(data string) (NetIO, error) {
var out NetIO
s := bufio.NewScanner(strings.NewReader(data))
for s.Scan() {
line := strings.TrimSpace(s.Text())
if line == "" || strings.HasPrefix(line, "Inter-") || strings.HasPrefix(line, "face") {
continue
}
parts := strings.SplitN(line, ":", 2)
if len(parts) != 2 {
continue
}
name := strings.TrimSpace(parts[0])
if name == "lo" {
continue
}
fields := strings.Fields(parts[1])
if len(fields) < 9 {
continue
}
rx, err := strconv.ParseUint(fields[0], 10, 64)
if err != nil {
continue
}
tx, err := strconv.ParseUint(fields[8], 10, 64)
if err != nil {
continue
}
out.RxBytes += rx
out.TxBytes += tx
}
if err := s.Err(); err != nil {
return NetIO{}, err
}
return out, nil
}
func ReadDiskStats() (DiskIO, error) {
b, err := os.ReadFile("/proc/diskstats")
if err != nil {
return DiskIO{}, err
}
return ParseDiskStats(string(b))
}
func ParseDiskStats(data string) (DiskIO, error) {
var out DiskIO
s := bufio.NewScanner(strings.NewReader(data))
for s.Scan() {
line := strings.TrimSpace(s.Text())
if line == "" {
continue
}
fields := strings.Fields(line)
if len(fields) < 11 {
continue
}
name := fields[2]
if strings.HasPrefix(name, "loop") || strings.HasPrefix(name, "ram") {
continue
}
readSectors, err := strconv.ParseUint(fields[5], 10, 64)
if err != nil {
continue
}
writeSectors, err := strconv.ParseUint(fields[9], 10, 64)
if err != nil {
continue
}
out.ReadBytes += readSectors * 512
out.WriteBytes += writeSectors * 512
}
if err := s.Err(); err != nil {
return DiskIO{}, err
}
return out, nil
}
func ReadLoadAvg() (LoadAvg, error) {
b, err := os.ReadFile("/proc/loadavg")
if err != nil {
return LoadAvg{}, err
}
return ParseLoadAvg(string(b))
}
func ParseLoadAvg(data string) (LoadAvg, error) {
fields := strings.Fields(strings.TrimSpace(data))
if len(fields) < 3 {
return LoadAvg{}, errors.New("invalid loadavg")
}
one, err := strconv.ParseFloat(fields[0], 64)
if err != nil {
return LoadAvg{}, err
}
five, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return LoadAvg{}, err
}
fifteen, err := strconv.ParseFloat(fields[2], 64)
if err != nil {
return LoadAvg{}, err
}
return LoadAvg{One: one, Five: five, Fifteen: fifteen}, nil
}

View File

@ -0,0 +1,33 @@
//go:build linux
package metrics
import (
"strings"
"syscall"
)
func ReadDiskUsage(path string) (DiskUsage, error) {
p := strings.TrimSpace(path)
if p == "" {
p = "/"
}
var st syscall.Statfs_t
if err := syscall.Statfs(p, &st); err != nil {
return DiskUsage{Path: p}, err
}
total := st.Blocks * uint64(st.Bsize)
free := st.Bavail * uint64(st.Bsize)
used := total - free
usedPct := 0.0
if total > 0 {
usedPct = float64(used) / float64(total) * 100
}
return DiskUsage{
Path: p,
TotalBytes: total,
FreeBytes: free,
UsedBytes: used,
UsedPct: usedPct,
}, nil
}

View File

@ -0,0 +1,7 @@
//go:build !linux
package metrics
func ReadDiskUsage(path string) (DiskUsage, error) {
return DiskUsage{Path: path}, ErrNotSupported
}

View File

@ -0,0 +1,12 @@
//go:build !linux
package metrics
import "testing"
func TestDiskUsageNotSupported(t *testing.T) {
_, err := ReadDiskUsage("/")
if err != ErrNotSupported {
t.Fatalf("expected ErrNotSupported, got %v", err)
}
}

View File

@ -0,0 +1,64 @@
package metrics
import "testing"
func TestParseProcStat(t *testing.T) {
data := "cpu 10 20 30 40 50 60 70 80 90 100\n"
stat, err := ParseProcStat(data)
if err != nil {
t.Fatalf("parse: %v", err)
}
if stat.Total != 550 {
t.Fatalf("total mismatch: %d", stat.Total)
}
if stat.Idle != 90 {
t.Fatalf("idle mismatch: %d", stat.Idle)
}
}
func TestParseMemInfo(t *testing.T) {
data := "MemTotal: 1000 kB\nMemAvailable: 400 kB\nMemFree: 100 kB\nBuffers: 50 kB\nCached: 200 kB\n"
mem, err := ParseMemInfo(data)
if err != nil {
t.Fatalf("parse: %v", err)
}
if mem.TotalKB != 1000 || mem.AvailableKB != 400 || mem.CachedKB != 200 {
t.Fatalf("mem mismatch: %+v", mem)
}
}
func TestParseNetDev(t *testing.T) {
data := "Inter-| Receive | Transmit\n" +
" face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed\n" +
" lo: 1 0 0 0 0 0 0 0 2 0 0 0 0 0 0 0\n" +
"eth0: 100 0 0 0 0 0 0 0 200 0 0 0 0 0 0 0\n"
netio, err := ParseNetDev(data)
if err != nil {
t.Fatalf("parse: %v", err)
}
if netio.RxBytes != 100 || netio.TxBytes != 200 {
t.Fatalf("net mismatch: %+v", netio)
}
}
func TestParseDiskStats(t *testing.T) {
data := " 8 0 sda 1 0 10 0 2 0 20 0 0 0 0 0 0\n"
io, err := ParseDiskStats(data)
if err != nil {
t.Fatalf("parse: %v", err)
}
if io.ReadBytes != 10*512 || io.WriteBytes != 20*512 {
t.Fatalf("disk io mismatch: %+v", io)
}
}
func TestParseLoadAvg(t *testing.T) {
data := "0.10 0.20 0.30 1/100 1234\n"
la, err := ParseLoadAvg(data)
if err != nil {
t.Fatalf("parse: %v", err)
}
if la.One != 0.10 || la.Five != 0.20 || la.Fifteen != 0.30 {
t.Fatalf("load mismatch: %+v", la)
}
}

View File

@ -1,10 +1,14 @@
package procctl
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
@ -20,9 +24,18 @@ var ErrInvalidConfig = errors.New("invalid config")
var ErrConfigNotFound = errors.New("config not found")
type Status struct {
Running bool `json:"running"`
Pid int `json:"pid"`
ConfigPath string `json:"config_path"`
Running bool `json:"running"`
Pid int `json:"pid"`
ConfigPath string `json:"config_path"`
StartedAtMS int64 `json:"started_at_ms"`
}
type BinaryUpdateResult struct {
Path string `json:"path"`
Sha256 string `json:"sha256"`
Size int64 `json:"size"`
MtimeMS int64 `json:"mtime_ms"`
BackupPath string `json:"backup_path"`
}
type pidFile struct {
@ -76,9 +89,31 @@ func (c *Controller) Status() (Status, error) {
}
if !alive {
_ = os.Remove(c.proc.PidFile)
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath, StartedAtMS: pf.StartedAtMS}, nil
}
return Status{Running: true, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
return Status{Running: true, Pid: pf.Pid, ConfigPath: pf.ConfigPath, StartedAtMS: pf.StartedAtMS}, nil
}
func (c *Controller) Version() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if strings.TrimSpace(c.proc.ExecPath) == "" {
return "", errors.New("exec_path is empty")
}
cmd := exec.Command(c.proc.ExecPath, "--version")
if strings.TrimSpace(c.proc.WorkDir) != "" {
cmd.Dir = c.proc.WorkDir
}
out, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("get version failed: %w", err)
}
ver := strings.TrimSpace(string(out))
if ver == "" {
return "", errors.New("version output empty")
}
return ver, nil
}
func (c *Controller) Start(configName string) (Status, error) {
@ -95,7 +130,7 @@ func (c *Controller) Start(configName string) (Status, error) {
alive, _ := isAlive(pf.Pid)
if alive {
if filepath.Clean(pf.ConfigPath) == filepath.Clean(resolved) {
return Status{Running: true, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
return Status{Running: true, Pid: pf.Pid, ConfigPath: pf.ConfigPath, StartedAtMS: pf.StartedAtMS}, nil
}
return Status{}, fmt.Errorf("%w: already running with config %s", ErrConflict, pf.ConfigPath)
}
@ -114,7 +149,7 @@ func (c *Controller) Start(configName string) (Status, error) {
_ = stopProcess(pid, 1*time.Second)
return Status{}, fmt.Errorf("write pid file: %w", err)
}
return Status{Running: true, Pid: pid, ConfigPath: resolved}, nil
return Status{Running: true, Pid: pid, ConfigPath: resolved, StartedAtMS: pf2.StartedAtMS}, nil
}
func (c *Controller) Stop() (Status, error) {
@ -132,14 +167,14 @@ func (c *Controller) Stop() (Status, error) {
alive, _ := isAlive(pf.Pid)
if !alive {
_ = os.Remove(c.proc.PidFile)
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath, StartedAtMS: pf.StartedAtMS}, nil
}
if err := stopProcess(pf.Pid, time.Duration(c.proc.GracefulTimeoutMS)*time.Millisecond); err != nil {
return Status{}, err
}
_ = os.Remove(c.proc.PidFile)
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath}, nil
return Status{Running: false, Pid: pf.Pid, ConfigPath: pf.ConfigPath, StartedAtMS: pf.StartedAtMS}, nil
}
func (c *Controller) Restart(configName string) (Status, error) {
@ -179,6 +214,180 @@ func (c *Controller) resolveConfigPath(name string) (string, error) {
return p, nil
}
func (c *Controller) UpdateBinary(r io.Reader, contentLength int64, expectedSha256 string) (BinaryUpdateResult, error) {
c.mu.Lock()
defer c.mu.Unlock()
if contentLength <= 0 {
return BinaryUpdateResult{}, errors.New("missing Content-Length")
}
if strings.TrimSpace(c.proc.ExecPath) == "" {
return BinaryUpdateResult{}, errors.New("exec_path is empty")
}
pf, _ := c.readPidFile()
if pf != nil {
alive, _ := isAlive(pf.Pid)
if alive {
return BinaryUpdateResult{}, fmt.Errorf("%w: media-server is running", ErrConflict)
}
_ = os.Remove(c.proc.PidFile)
}
dir := filepath.Dir(c.proc.ExecPath)
if err := files.EnsureDir(dir, 0o755); err != nil {
return BinaryUpdateResult{}, err
}
f, err := os.CreateTemp(dir, ".tmp-*")
if err != nil {
return BinaryUpdateResult{}, fmt.Errorf("create temp: %w", err)
}
tmp := f.Name()
ok := false
defer func() {
_ = f.Close()
if !ok {
_ = os.Remove(tmp)
}
}()
h := sha256.New()
mw := io.MultiWriter(f, h)
if _, err := io.CopyN(mw, r, contentLength); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("read body: %w", err)
}
if err := f.Chmod(0o755); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("chmod temp: %w", err)
}
if err := f.Sync(); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("fsync temp: %w", err)
}
if err := f.Close(); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("close temp: %w", err)
}
sha := hex.EncodeToString(h.Sum(nil))
if expectedSha256 != "" && !strings.EqualFold(expectedSha256, sha) {
return BinaryUpdateResult{}, errors.New("sha256 mismatch")
}
backup := ""
if st, err := os.Stat(c.proc.ExecPath); err == nil && !st.IsDir() {
backup = fmt.Sprintf("%s.bak.%s", c.proc.ExecPath, time.Now().Format("20060102-150405"))
if err := os.Rename(c.proc.ExecPath, backup); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("backup old binary: %w", err)
}
}
if err := files.ReplaceFile(tmp, c.proc.ExecPath); err != nil {
if backup != "" {
_ = os.Rename(backup, c.proc.ExecPath)
}
return BinaryUpdateResult{}, err
}
st, err := os.Stat(c.proc.ExecPath)
if err != nil {
return BinaryUpdateResult{}, fmt.Errorf("stat binary: %w", err)
}
ok = true
return BinaryUpdateResult{
Path: filepath.ToSlash(c.proc.ExecPath),
Sha256: sha,
Size: st.Size(),
MtimeMS: st.ModTime().UnixMilli(),
BackupPath: filepath.ToSlash(backup),
}, nil
}
func (c *Controller) RollbackBinary(backupPath string) (BinaryUpdateResult, error) {
c.mu.Lock()
defer c.mu.Unlock()
if strings.TrimSpace(backupPath) == "" {
return BinaryUpdateResult{}, errors.New("backup_path is empty")
}
if strings.TrimSpace(c.proc.ExecPath) == "" {
return BinaryUpdateResult{}, errors.New("exec_path is empty")
}
pf, _ := c.readPidFile()
if pf != nil {
alive, _ := isAlive(pf.Pid)
if alive {
return BinaryUpdateResult{}, fmt.Errorf("%w: media-server is running", ErrConflict)
}
_ = os.Remove(c.proc.PidFile)
}
if st, err := os.Stat(backupPath); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("stat backup: %w", err)
} else if st.IsDir() {
return BinaryUpdateResult{}, errors.New("backup_path is a directory")
}
oldBackup := ""
if st, err := os.Stat(c.proc.ExecPath); err == nil && !st.IsDir() {
oldBackup = fmt.Sprintf("%s.bak.rollback.%s", c.proc.ExecPath, time.Now().Format("20060102-150405"))
if err := os.Rename(c.proc.ExecPath, oldBackup); err != nil {
return BinaryUpdateResult{}, fmt.Errorf("backup current binary: %w", err)
}
}
if err := files.ReplaceFile(backupPath, c.proc.ExecPath); err != nil {
if oldBackup != "" {
_ = os.Rename(oldBackup, c.proc.ExecPath)
}
return BinaryUpdateResult{}, err
}
st, err := os.Stat(c.proc.ExecPath)
if err != nil {
return BinaryUpdateResult{}, fmt.Errorf("stat binary: %w", err)
}
sha, err := sha256File(c.proc.ExecPath)
if err != nil {
return BinaryUpdateResult{}, err
}
return BinaryUpdateResult{
Path: filepath.ToSlash(c.proc.ExecPath),
Sha256: sha,
Size: st.Size(),
MtimeMS: st.ModTime().UnixMilli(),
BackupPath: filepath.ToSlash(oldBackup),
}, nil
}
func (c *Controller) BinaryInfo() (BinaryUpdateResult, error) {
c.mu.Lock()
defer c.mu.Unlock()
if strings.TrimSpace(c.proc.ExecPath) == "" {
return BinaryUpdateResult{}, errors.New("exec_path is empty")
}
st, err := os.Stat(c.proc.ExecPath)
if err != nil {
return BinaryUpdateResult{}, err
}
if st.IsDir() {
return BinaryUpdateResult{}, errors.New("exec_path is a directory")
}
sha, err := sha256File(c.proc.ExecPath)
if err != nil {
return BinaryUpdateResult{}, err
}
return BinaryUpdateResult{
Path: filepath.ToSlash(c.proc.ExecPath),
Sha256: sha,
Size: st.Size(),
MtimeMS: st.ModTime().UnixMilli(),
}, nil
}
func (c *Controller) readPidFile() (*pidFile, error) {
b, err := os.ReadFile(c.proc.PidFile)
if err != nil {
@ -196,3 +405,16 @@ func (c *Controller) readPidFile() (*pidFile, error) {
}
return &pf, nil
}
func sha256File(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", err
}
defer f.Close()
H := sha256.New()
if _, err := io.Copy(H, f); err != nil {
return "", err
}
return hex.EncodeToString(H.Sum(nil)), nil
}

View File

@ -0,0 +1,83 @@
package tasks
import (
"crypto/rand"
"encoding/hex"
"sync"
"time"
)
type Status string
const (
StatusRunning Status = "running"
StatusSuccess Status = "success"
StatusFailed Status = "failed"
)
type Task struct {
ID string `json:"id"`
Type string `json:"type"`
Status Status `json:"status"`
StartedAtMS int64 `json:"started_at_ms"`
EndedAtMS int64 `json:"ended_at_ms,omitempty"`
Error string `json:"error,omitempty"`
Result any `json:"result,omitempty"`
}
type Registry struct {
mu sync.Mutex
tasks map[string]Task
}
func NewRegistry() *Registry {
return &Registry{tasks: map[string]Task{}}
}
func (r *Registry) Start(typ string) Task {
r.mu.Lock()
defer r.mu.Unlock()
id := newID()
t := Task{
ID: id,
Type: typ,
Status: StatusRunning,
StartedAtMS: time.Now().UnixMilli(),
}
r.tasks[id] = t
return t
}
func (r *Registry) Finish(id string, result any, err error) (Task, bool) {
r.mu.Lock()
defer r.mu.Unlock()
t, ok := r.tasks[id]
if !ok {
return Task{}, false
}
if err != nil {
t.Status = StatusFailed
t.Error = err.Error()
} else {
t.Status = StatusSuccess
t.Result = result
}
t.EndedAtMS = time.Now().UnixMilli()
r.tasks[id] = t
return t, true
}
func (r *Registry) Get(id string) (Task, bool) {
r.mu.Lock()
defer r.mu.Unlock()
t, ok := r.tasks[id]
return t, ok
}
func newID() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
return hex.EncodeToString([]byte(time.Now().Format("20060102150405.000")))
}
return hex.EncodeToString(b)
}

View File

@ -0,0 +1,25 @@
package tasks
import "testing"
func TestRegistryLifecycle(t *testing.T) {
reg := NewRegistry()
task := reg.Start("upgrade")
if task.ID == "" || task.Status != StatusRunning {
t.Fatalf("unexpected start task: %+v", task)
}
got, ok := reg.Get(task.ID)
if !ok || got.ID != task.ID {
t.Fatalf("get failed: %+v", got)
}
res := map[string]any{"ok": true}
finished, ok := reg.Finish(task.ID, res, nil)
if !ok || finished.Status != StatusSuccess {
t.Fatalf("finish failed: %+v", finished)
}
got2, ok := reg.Get(task.ID)
if !ok || got2.Status != StatusSuccess {
t.Fatalf("get after finish failed: %+v", got2)
}
}