update agent files

This commit is contained in:
sladro 2026-01-18 11:10:52 +08:00
parent 86674140fc
commit f823e91825
4 changed files with 295 additions and 29 deletions

Binary file not shown.

View File

@ -0,0 +1,286 @@
package httpapi
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"mime"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"rk3588sys/agent/internal/files"
)
type agentBinaryUpdateResult struct {
StagingPath string `json:"staging_path"`
Sha256 string `json:"sha256"`
Size int64 `json:"size"`
MtimeMS int64 `json:"mtime_ms"`
ScriptPath string `json:"script_path"`
ExecPath string `json:"exec_path"`
Pid int `json:"pid"`
LogPath string `json:"log_path"`
TaskID string `json:"task_id"`
}
func (s *Server) handleAgentBinaryUpdate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if !s.authorize(r, true) {
errorJSON(w, http.StatusUnauthorized, "unauthorized")
return
}
if mt, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")); err != nil || mt != "application/octet-stream" {
errorJSON(w, http.StatusBadRequest, "validation failed: Content-Type must be application/octet-stream")
return
}
if r.ContentLength <= 0 {
errorJSON(w, http.StatusBadRequest, "validation failed: missing Content-Length")
return
}
if strings.TrimSpace(s.execPath) == "" {
errorJSON(w, http.StatusInternalServerError, "internal error: exec path is empty")
return
}
maxBytes := int64(s.agentCfg.MaxUploadMB) * 1024 * 1024
r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
expected := strings.TrimSpace(r.Header.Get("X-Binary-Sha256"))
if expected != "" {
if len(expected) != 64 {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
return
}
if _, err := hex.DecodeString(expected); err != nil {
errorJSON(w, http.StatusBadRequest, "validation failed: invalid X-Binary-Sha256")
return
}
}
baseDir := s.baseDir
if strings.TrimSpace(baseDir) == "" {
baseDir = filepath.Dir(s.execPath)
}
updateDir := filepath.Join(baseDir, "updates", "agent")
stagingDir := filepath.Join(updateDir, "staging")
backupDir := filepath.Join(updateDir, "backup")
lockPath := filepath.Join(updateDir, "update.lock")
if err := files.EnsureDir(updateDir, 0o755); err != nil {
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
lockFile, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
if err != nil {
if os.IsExist(err) {
errorJSON(w, http.StatusConflict, "update already in progress")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: create lock failed: "+err.Error())
return
}
_, _ = lockFile.WriteString(strconv.Itoa(os.Getpid()) + "\n")
_ = lockFile.Close()
lockOK := false
defer func() {
if !lockOK {
_ = os.Remove(lockPath)
}
}()
task := s.tasks.Start("agent_binary_update")
stagingPath, sha, size, mtimeMS, err := writeStagingBinary(stagingDir, r.Body, r.ContentLength, expected)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
s.recordAudit(r, "agent.binary.update", false, err.Error())
if strings.Contains(err.Error(), "payload too large") || strings.Contains(err.Error(), "request body too large") {
errorJSON(w, http.StatusRequestEntityTooLarge, "payload too large")
return
}
if strings.Contains(err.Error(), "sha256 mismatch") {
errorJSON(w, http.StatusBadRequest, "validation failed: sha256 mismatch")
return
}
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
pid := os.Getpid()
logPath := filepath.Join(updateDir, "update.log")
scriptPath, err := writeAgentUpdateScript(updateDir, pid, s.execPath, s.agentCfg.ConfigPath, stagingPath, backupDir, logPath, lockPath)
if err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
s.recordAudit(r, "agent.binary.update", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error())
return
}
cmd := exec.Command("/bin/sh", scriptPath)
if err := cmd.Start(); err != nil {
_, _ = s.tasks.Finish(task.ID, nil, err)
s.recordAudit(r, "agent.binary.update", false, err.Error())
errorJSON(w, http.StatusInternalServerError, "internal error: start update script failed: "+err.Error())
return
}
res := agentBinaryUpdateResult{
StagingPath: filepath.ToSlash(stagingPath),
Sha256: sha,
Size: size,
MtimeMS: mtimeMS,
ScriptPath: filepath.ToSlash(scriptPath),
ExecPath: filepath.ToSlash(s.execPath),
Pid: pid,
LogPath: filepath.ToSlash(logPath),
TaskID: task.ID,
}
_, _ = s.tasks.Finish(task.ID, res, nil)
lockOK = true
s.recordAudit(r, "agent.binary.update", true, res.StagingPath)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "update": res})
go func() {
time.Sleep(500 * time.Millisecond)
os.Exit(0)
}()
}
func writeStagingBinary(dir string, r io.Reader, contentLength int64, expectedSha256 string) (string, string, int64, int64, error) {
if err := files.EnsureDir(dir, 0o755); err != nil {
return "", "", 0, 0, err
}
f, err := os.CreateTemp(dir, ".tmp-*")
if err != nil {
return "", "", 0, 0, fmt.Errorf("create temp: %w", err)
}
tmp := f.Name()
ok := false
defer func() {
_ = f.Close()
if !ok {
_ = os.Remove(tmp)
}
}()
h := sha256.New()
mw := io.MultiWriter(f, h)
if _, err := io.CopyN(mw, r, contentLength); err != nil {
return "", "", 0, 0, fmt.Errorf("read body: %w", err)
}
if err := f.Chmod(0o755); err != nil {
return "", "", 0, 0, fmt.Errorf("chmod temp: %w", err)
}
if err := f.Sync(); err != nil {
return "", "", 0, 0, fmt.Errorf("fsync temp: %w", err)
}
if err := f.Close(); err != nil {
return "", "", 0, 0, fmt.Errorf("close temp: %w", err)
}
sha := hex.EncodeToString(h.Sum(nil))
if expectedSha256 != "" && !strings.EqualFold(expectedSha256, sha) {
return "", "", 0, 0, errors.New("sha256 mismatch")
}
stagingPath := filepath.Join(dir, "agent_new.bin")
if err := files.ReplaceFile(tmp, stagingPath); err != nil {
return "", "", 0, 0, err
}
st, err := os.Stat(stagingPath)
if err != nil {
return "", "", 0, 0, fmt.Errorf("stat staging: %w", err)
}
ok = true
return stagingPath, sha, st.Size(), st.ModTime().UnixMilli(), nil
}
func writeAgentUpdateScript(updateDir string, pid int, execPath, configPath, stagingPath, backupDir, logPath, lockPath string) (string, error) {
if err := files.EnsureDir(updateDir, 0o755); err != nil {
return "", err
}
if err := files.EnsureDir(backupDir, 0o755); err != nil {
return "", err
}
scriptPath := filepath.Join(updateDir, "apply_update.sh")
content := fmt.Sprintf(`#!/bin/sh
set -u
PID=%d
EXEC=%s
CONFIG=%s
STAGING=%s
BACKUP_DIR=%s
LOG=%s
LOCK=%s
trap 'rm -f "$LOCK"' EXIT
mkdir -p "$BACKUP_DIR"
echo "$(date -Iseconds) update start" >> "$LOG"
if kill -0 "$PID" 2>/dev/null; then
i=0
while kill -0 "$PID" 2>/dev/null; do
i=$((i+1))
if [ "$i" -ge 60 ]; then
break
fi
sleep 0.5
done
fi
if kill -0 "$PID" 2>/dev/null; then
kill -TERM "$PID" 2>/dev/null || true
sleep 2
fi
if kill -0 "$PID" 2>/dev/null; then
kill -KILL "$PID" 2>/dev/null || true
sleep 1
fi
if kill -0 "$PID" 2>/dev/null; then
echo "$(date -Iseconds) agent still running" >> "$LOG"
exit 1
fi
ts=$(date +%%Y%%m%%d-%%H%%M%%S)
backup="$BACKUP_DIR/agent.bak.$ts"
if [ -f "$EXEC" ]; then
mv "$EXEC" "$backup"
fi
if mv "$STAGING" "$EXEC"; then
chmod 755 "$EXEC"
else
if [ -f "$backup" ]; then
mv "$backup" "$EXEC"
fi
echo "$(date -Iseconds) replace failed" >> "$LOG"
exit 1
fi
nohup "$EXEC" --config "$CONFIG" >/dev/null 2>&1 &
echo "$(date -Iseconds) update done" >> "$LOG"
`, pid, shQuote(execPath), shQuote(configPath), shQuote(stagingPath), shQuote(backupDir), shQuote(logPath), shQuote(lockPath))
if err := files.WriteFileAtomic(scriptPath, []byte(content), 0o755); err != nil {
return "", err
}
return scriptPath, nil
}
func shQuote(s string) string {
if s == "" {
return "''"
}
return "'" + strings.ReplaceAll(s, "'", `'"'"'`) + "'"
}

View File

@ -36,6 +36,7 @@ type Server struct {
audit *audit.Recorder
tasks *tasks.Registry
baseDir string
execPath string
deviceID string
hostname string
agentPort int
@ -66,6 +67,12 @@ func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, st
if agentCfg.MediaServerProcess.Enable {
pc = procctl.New(agentCfg, baseDir)
}
execPath, _ := os.Executable()
if execPath != "" {
if resolved, err := filepath.EvalSymlinks(execPath); err == nil {
execPath = resolved
}
}
s := &Server{
agentCfg: agentCfg,
ms: ms,
@ -74,6 +81,7 @@ func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, st
audit: audit.NewRecorder(defaultAuditPath(baseDir)),
tasks: tasks.NewRegistry(),
baseDir: baseDir,
execPath: execPath,
deviceID: deviceID,
hostname: sysinfo.Hostname(),
agentPort: agentPort,
@ -102,6 +110,7 @@ func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, st
mux.HandleFunc("/v1/media-server/configs/", s.handleMediaConfigUpload)
mux.HandleFunc("/v1/media-server/binary", s.handleMediaBinaryUpdate)
mux.HandleFunc("/v1/media-server/binary/rollback", s.handleMediaBinaryRollback)
mux.HandleFunc("/v1/agent/binary", s.handleAgentBinaryUpdate)
mux.HandleFunc("/v1/graphs", s.handleGraphs)
mux.HandleFunc("/v1/graphs/", s.handleGraphDetail)
mux.HandleFunc("/v1/logs/recent", s.handleLogsRecent)

View File

@ -1,29 +0,0 @@
{
"agent": {
"listen": "0.0.0.0:9100",
"token": "CHANGE_ME",
"require_token_for_read": false,
"discovery_enable": true,
"discovery_port": 35688,
"device_name": "cam1_strict_minio_alarm",
"device_id_path": "./device_id",
"models_dir": "./models",
"max_upload_mb": 200,
"config_path": "./configs/test_cam1_strict_minio_alarm_rtsp_server.json",
"media_server_process": {
"enable": true,
"exec_path": "../build/media-server",
"work_dir": "..",
"configs_dir": "../configs",
"pid_file": "./rk3588sys-media-server.pid",
"graceful_timeout_ms": 5000
},
"media_server_base_url": "http://127.0.0.1:9000",
"media_server_timeout_ms": 3000,
"media_server_retry": { "max_attempts": 3, "backoff_ms": [200, 500] }
}
}