OrangePi3588Media/agent/internal/httpapi/extras.go

484 lines
12 KiB
Go

package httpapi
import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"rk3588sys/agent/internal/assets"
	"rk3588sys/agent/internal/audit"
	"rk3588sys/agent/internal/metrics"
	"rk3588sys/agent/internal/sysinfo"
)
// configFileStatus is the JSON description of a single config file on disk,
// as produced by readConfigFileStatus. Fields tagged omitempty are left out of
// the encoded output when they hold their zero value.
type configFileStatus struct {
// Exists is set to true once assets.Info has succeeded for the path.
Exists bool `json:"exists"`
// Path is the inspected path after filepath.ToSlash.
Path string `json:"path"`
// Sha256, Size and MtimeMS are copied from the assets.Info result.
Sha256 string `json:"sha256,omitempty"`
Size int64 `json:"size,omitempty"`
MtimeMS int64 `json:"mtime_ms,omitempty"`
// Metadata is the top-level "metadata" object of the config JSON; it is only
// set when that object is non-empty.
Metadata map[string]any `json:"metadata,omitempty"`
// Error explains why the file could not be inspected, read or parsed. It
// stays empty when the file is simply missing.
Error string `json:"error,omitempty"`
}
// configMetadataSummary is a typed view of selected keys of a config's
// "metadata" object. It is built by metadataSummaryFromMap; values that are
// missing or of an unexpected type are left at their zero value.
type configMetadataSummary struct {
// Scalar fields come from the metadata keys "config_id", "config_version",
// "business_name", "template" and "profile" respectively.
ConfigID string
ConfigVersion string
BusinessName string
Template string
Profile string
// Overlays comes from the "overlays" metadata key.
Overlays []string
// InstanceName is the first entry of the "instance_names" metadata list.
InstanceName string
// InstanceDisplayName is the first entry of the "instance_display_names"
// metadata list.
InstanceDisplayName string
}
// defaultAuditPath returns the location of the agent audit log,
// "logs/agent_audit.jsonl", placed under baseDir. When baseDir is empty or
// contains only whitespace the path is returned relative, without a base.
func defaultAuditPath(baseDir string) string {
	relative := filepath.Join("logs", "agent_audit.jsonl")
	if strings.TrimSpace(baseDir) != "" {
		return filepath.Join(baseDir, relative)
	}
	return relative
}
// handleMetrics answers GET requests with a JSON snapshot of host metrics:
// CPU, memory, load average, disk usage, network and disk I/O counters, plus
// the state of the managed media server process.
//
// Non-GET requests get 405 and requests rejected by s.authorize get 401.
// Each probe is reported independently: a failing probe places an "error"
// string in its own section and the response is still sent with 200.
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	now := time.Now()
	resp := map[string]any{
		"timestamp_ms": now.UnixMilli(),
		"uptime_sec":   sysinfo.UptimeSec(),
	}
	// failed builds the section body used when a probe returns an error.
	failed := func(err error) map[string]any {
		return map[string]any{"error": err.Error()}
	}

	if sample, err := metrics.ReadCPUStat(); err != nil {
		resp["cpu"] = failed(err)
	} else {
		var (
			usagePct float64
			sampleMS int64
		)
		// Usage is the delta against the sample stored by the previous
		// request, so the very first call reports zero for both values.
		s.cpuMu.Lock()
		if !s.lastCPUTS.IsZero() {
			usagePct = metrics.CPUUsage(s.lastCPU, sample)
			sampleMS = now.Sub(s.lastCPUTS).Milliseconds()
		}
		s.lastCPU, s.lastCPUTS = sample, now
		s.cpuMu.Unlock()

		resp["cpu"] = map[string]any{
			"usage_pct":   usagePct,
			"sample_ms":   sampleMS,
			"total_ticks": sample.Total,
			"idle_ticks":  sample.Idle,
		}
	}

	if memInfo, err := metrics.ReadMemInfo(); err != nil {
		resp["memory"] = failed(err)
	} else {
		resp["memory"] = memInfo
	}

	if load, err := metrics.ReadLoadAvg(); err != nil {
		resp["load_avg"] = failed(err)
	} else {
		resp["load_avg"] = load
	}

	// Disk usage is measured at the agent base directory, or at the root
	// filesystem when no base directory is configured.
	diskPath := "/"
	if strings.TrimSpace(s.baseDir) != "" {
		diskPath = s.baseDir
	}
	if usage, err := metrics.ReadDiskUsage(diskPath); err != nil {
		resp["disk"] = map[string]any{"path": diskPath, "error": err.Error()}
	} else {
		resp["disk"] = usage
	}

	if netIO, err := metrics.ReadNetDev(); err != nil {
		resp["net"] = failed(err)
	} else {
		resp["net"] = netIO
	}

	if diskIO, err := metrics.ReadDiskStats(); err != nil {
		resp["disk_io"] = failed(err)
	} else {
		resp["disk_io"] = diskIO
	}

	media := map[string]any{"supported": s.proc != nil && s.proc.Enabled()}
	if s.proc != nil && s.proc.Enabled() {
		if status, err := s.proc.Status(); err != nil {
			media["error"] = err.Error()
		} else {
			media["running"] = status.Running
			media["pid"] = status.Pid
			media["config_path"] = status.ConfigPath
			media["started_at_ms"] = status.StartedAtMS
			if status.Running && status.StartedAtMS > 0 {
				media["uptime_sec"] = (now.UnixMilli() - status.StartedAtMS) / 1000
			}
		}
	}
	resp["media_server"] = media

	writeJSON(w, http.StatusOK, resp)
}
// handleVersions answers GET requests with version information for the agent
// itself, the media server (when one is configured and enabled) and the
// models returned by s.store.List.
//
// Non-GET requests get 405 and requests rejected by s.authorize get 401.
// Lookup failures are reported inside the affected section rather than
// failing the whole request.
func (s *Server) handleVersions(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	agent := map[string]any{
		"version":    s.version,
		"build_id":   s.buildID,
		"build_type": s.buildType,
		"git_sha":    s.gitSHA,
	}
	if exe, err := assets.ExecutableInfo(); err != nil {
		agent["binary_error"] = err.Error()
	} else {
		agent["binary"] = exe
	}

	media := map[string]any{"supported": s.proc != nil && s.proc.Enabled()}
	if s.proc != nil && s.proc.Enabled() {
		if version, err := s.proc.Version(); err != nil {
			media["version_error"] = err.Error()
		} else {
			media["version"] = version
		}
		if exe, err := s.proc.BinaryInfo(); err != nil {
			media["binary_error"] = err.Error()
		} else {
			media["binary"] = exe
		}
	}

	models := map[string]any{}
	if list, err := s.store.List(); err != nil {
		models["error"] = err.Error()
	} else {
		models["count"] = len(list.Items)
		models["items"] = list.Items
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"agent":        agent,
		"media_server": media,
		"models":       models,
	})
}
// handleConfigStatus answers GET requests with the payload built by
// configStatusPayload. Non-GET requests get 405 and requests rejected by
// s.authorize get 401.
func (s *Server) handleConfigStatus(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
	default:
		writeJSON(w, http.StatusOK, s.configStatusPayload())
	}
}
// configCandidatePath returns the path of the candidate config file: the
// configured config path with ".candidate.json" appended.
func (s *Server) configCandidatePath() string {
	const candidateSuffix = ".candidate.json"
	return s.agentCfg.ConfigPath + candidateSuffix
}
// configStatusPayload builds the JSON body describing the active config file,
// its ".last_good.json" and ".candidate.json" siblings, and the media server
// process.
//
// Details of the active config (size, mtime, hash, metadata and the summary
// fields derived from the metadata) are flattened into the top level of the
// map; the two sibling files are nested as configFileStatus values. "ok" is
// always true; a problem with the active config is reported under "error".
func (s *Server) configStatusPayload() map[string]any {
	cfgPath := s.agentCfg.ConfigPath
	prevPath := cfgPath + ".last_good.json"
	candPath := s.configCandidatePath()

	cur := readConfigFileStatus(cfgPath)
	prev := readConfigFileStatus(prevPath)
	cand := readConfigFileStatus(candPath)

	out := map[string]any{
		"ok":                   true,
		"config_path":          filepath.ToSlash(cfgPath),
		"exists":               cur.Exists,
		"previous_config_path": filepath.ToSlash(prevPath),
		"previous_config":      prev,
		"candidate_path":       filepath.ToSlash(candPath),
		"candidate":            cand,
	}

	if cur.Exists {
		out["size"] = cur.Size
		out["mtime_ms"] = cur.MtimeMS
		out["sha256"] = cur.Sha256
	}
	if cur.Error != "" {
		out["error"] = cur.Error
	}

	if len(cur.Metadata) > 0 {
		out["metadata"] = cur.Metadata
		if sum := metadataSummaryFromMap(cur.Metadata); sum != nil {
			// Only non-empty summary values are surfaced.
			scalars := []struct{ key, value string }{
				{"config_id", sum.ConfigID},
				{"config_version", sum.ConfigVersion},
				{"business_name", sum.BusinessName},
				{"template", sum.Template},
				{"profile", sum.Profile},
				{"instance_name", sum.InstanceName},
				{"instance_display_name", sum.InstanceDisplayName},
			}
			for _, field := range scalars {
				if field.value != "" {
					out[field.key] = field.value
				}
			}
			if len(sum.Overlays) > 0 {
				out["overlays"] = copyStringSlice(sum.Overlays)
			}
		}
	}

	media := map[string]any{"supported": s.proc != nil && s.proc.Enabled()}
	if s.proc != nil && s.proc.Enabled() {
		if status, err := s.proc.Status(); err != nil {
			media["error"] = err.Error()
		} else {
			media["running"] = status.Running
			media["pid"] = status.Pid
			media["config_path"] = filepath.ToSlash(status.ConfigPath)
			media["started_at_ms"] = status.StartedAtMS
		}
	}
	out["media_server"] = media

	return out
}
// readConfigMetadataSummary inspects the config file at path and returns the
// summary of its "metadata" object. It returns nil when no metadata could be
// obtained from the file.
func readConfigMetadataSummary(path string) *configMetadataSummary {
	if md := readConfigFileStatus(path).Metadata; len(md) > 0 {
		return metadataSummaryFromMap(md)
	}
	return nil
}
// metadataSummaryFromMap extracts the well-known keys of a config "metadata"
// object into a configMetadataSummary. It returns nil for an empty or nil
// map. InstanceName and InstanceDisplayName take the first entry of the
// "instance_names" and "instance_display_names" lists and stay empty when the
// corresponding list yields no entries.
func metadataSummaryFromMap(metadata map[string]any) *configMetadataSummary {
	if len(metadata) == 0 {
		return nil
	}

	// firstOf returns the leading entry of the string list stored under key.
	firstOf := func(key string) string {
		if values := stringSliceValue(metadata[key]); len(values) > 0 {
			return values[0]
		}
		return ""
	}

	var out configMetadataSummary
	out.ConfigID = stringValue(metadata["config_id"])
	out.ConfigVersion = stringValue(metadata["config_version"])
	out.BusinessName = stringValue(metadata["business_name"])
	out.Template = stringValue(metadata["template"])
	out.Profile = stringValue(metadata["profile"])
	out.Overlays = stringSliceValue(metadata["overlays"])
	out.InstanceName = firstOf("instance_names")
	out.InstanceDisplayName = firstOf("instance_display_names")
	return &out
}
// stringValue returns v with surrounding whitespace removed when v holds a
// string, and "" for any other dynamic type (including nil).
func stringValue(v any) string {
	if str, ok := v.(string); ok {
		return strings.TrimSpace(str)
	}
	return ""
}
// stringSliceValue converts v into a list of non-blank, whitespace-trimmed
// strings.
//
// Accepted inputs are []string and []any (the shape encoding/json produces
// for arrays decoded into `any`); elements of a []any that are not strings
// are skipped. Both input shapes are normalized the same way, so a padded or
// empty entry can never end up in the result. Any other type, including nil,
// yields nil, as does an empty []string. The returned slice never aliases
// the input.
func stringSliceValue(v any) []string {
	switch vv := v.(type) {
	case []string:
		if len(vv) == 0 {
			return nil
		}
		out := make([]string, 0, len(vv))
		for _, item := range vv {
			if s := strings.TrimSpace(item); s != "" {
				out = append(out, s)
			}
		}
		return out
	case []any:
		out := make([]string, 0, len(vv))
		for _, item := range vv {
			s, ok := item.(string)
			if !ok {
				continue
			}
			if s = strings.TrimSpace(s); s != "" {
				out = append(out, s)
			}
		}
		return out
	default:
		return nil
	}
}
// copyStringSlice returns an independent copy of v. Nil and empty inputs both
// produce nil.
func copyStringSlice(v []string) []string {
	if len(v) == 0 {
		return nil
	}
	return append(make([]string, 0, len(v)), v...)
}
// readConfigFileStatus inspects the config file at path and reports what it
// finds. It never returns an error; problems are carried in the Error field.
//
//   - Blank path: Error is "path is empty".
//   - File does not exist: Exists stays false and Error stays empty.
//   - Any other failure of assets.Info: Error holds that error's text.
//   - File present: Exists is true and Sha256, Size and MtimeMS are filled in.
//     The file is then read and decoded as JSON; a read or decode failure is
//     reported in Error while the already-filled fields are kept. A non-empty
//     top-level "metadata" object is exposed as Metadata.
//
// Path always holds the input after filepath.ToSlash.
func readConfigFileStatus(path string) configFileStatus {
	out := configFileStatus{Path: filepath.ToSlash(path)}
	if strings.TrimSpace(path) == "" {
		out.Error = "path is empty"
		return out
	}

	info, err := assets.Info(path)
	if err != nil {
		// errors.Is follows the Unwrap chain, so a missing file is still
		// recognized if assets.Info wraps the underlying os error with extra
		// context; os.IsNotExist would not see through such a wrapper.
		if !errors.Is(err, os.ErrNotExist) {
			out.Error = err.Error()
		}
		return out
	}
	out.Exists = true
	out.Sha256 = info.Sha256
	out.Size = info.Size
	out.MtimeMS = info.MtimeMS

	b, err := os.ReadFile(path)
	if err != nil {
		out.Error = err.Error()
		return out
	}

	// Only the metadata object is of interest; every other key is ignored.
	var root struct {
		Metadata map[string]any `json:"metadata"`
	}
	if err := json.Unmarshal(b, &root); err != nil {
		out.Error = "config is not valid json: " + err.Error()
		return out
	}
	if len(root.Metadata) > 0 {
		out.Metadata = root.Metadata
	}
	return out
}
// handleAssets answers GET requests with file information for the artifacts
// the agent manages: its own binary, the active config, the ".last_good.json"
// config, the media server binary (when a media server is configured and
// enabled) and the model store listing.
//
// Non-GET requests get 405 and requests rejected by s.authorize get 401.
// A config file that cannot be stat'ed or is a directory is left out of the
// response entirely; other failures are reported under a "<name>_error" key.
func (s *Server) handleAssets(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	resp := map[string]any{}

	// addFile records assets.Info for path under okKey, or the failure text
	// under errKey. Paths that fail os.Stat or are directories are skipped.
	addFile := func(path, okKey, errKey string) {
		st, err := os.Stat(path)
		if err != nil || st.IsDir() {
			return
		}
		info, err := assets.Info(path)
		if err != nil {
			resp[errKey] = err.Error()
			return
		}
		resp[okKey] = info
	}

	if exe, err := assets.ExecutableInfo(); err != nil {
		resp["agent_binary_error"] = err.Error()
	} else {
		resp["agent_binary"] = exe
	}

	addFile(s.agentCfg.ConfigPath, "config", "config_error")
	addFile(s.agentCfg.ConfigPath+".last_good.json", "config_last_good", "config_last_good_error")

	if s.proc != nil && s.proc.Enabled() {
		if exe, err := s.proc.BinaryInfo(); err != nil {
			resp["media_server_binary_error"] = err.Error()
		} else {
			resp["media_server_binary"] = exe
		}
	}

	if list, err := s.store.List(); err != nil {
		resp["models_error"] = err.Error()
	} else {
		resp["models"] = list
	}

	writeJSON(w, http.StatusOK, resp)
}
// handleTask answers GET requests for a single task. The task ID is the part
// of the URL path that follows "/v1/tasks/".
//
// Non-GET requests get 405 and requests rejected by s.authorize get 401. A
// blank ID or an ID unknown to s.tasks gets 404; otherwise the task is
// written as JSON with 200.
func (s *Server) handleTask(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodGet:
		errorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	case !s.authorize(r, false):
		errorJSON(w, http.StatusUnauthorized, "unauthorized")
		return
	}

	taskID := strings.TrimPrefix(r.URL.Path, "/v1/tasks/")
	if strings.TrimSpace(taskID) != "" {
		if task, found := s.tasks.Get(taskID); found {
			writeJSON(w, http.StatusOK, task)
			return
		}
	}
	errorJSON(w, http.StatusNotFound, "not found")
}
// recordAudit appends an entry for the given action to the audit log. It is
// a no-op when no audit sink is configured.
//
// The entry carries the current time, the action and its outcome, the client
// IP as resolved by remoteIP, the request's X-Request-Id header, and a hash
// of the X-RK-Token header produced by tokenHash rather than the token
// itself.
func (s *Server) recordAudit(r *http.Request, action string, ok bool, detail string) {
	if s.audit == nil {
		return
	}

	token := strings.TrimSpace(r.Header.Get("X-RK-Token"))
	requestID := strings.TrimSpace(r.Header.Get("X-Request-Id"))

	var entry audit.Entry
	entry.TimeMS = time.Now().UnixMilli()
	entry.Action = action
	entry.Success = ok
	entry.RemoteIP = remoteIP(r)
	entry.TokenHash = tokenHash(token)
	entry.Detail = detail
	entry.RequestID = requestID

	// The result of Record is discarded on purpose: a failed audit write
	// never changes the outcome of the request being audited.
	_ = s.audit.Record(entry)
}
// remoteIP returns the client address to attribute a request to.
//
// The first entry of the X-Forwarded-For header wins when it is non-blank.
// Otherwise, including when the header starts with an empty entry such as
// ", 10.0.0.1", the host part of r.RemoteAddr is used. If RemoteAddr is not
// in host:port form it is returned unchanged.
//
// NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
// proxy overwrites it, so the value can be spoofed when the agent is
// reachable directly; confirm the deployment before relying on it.
func remoteIP(r *http.Request) string {
	if v := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); v != "" {
		first, _, _ := strings.Cut(v, ",")
		// A blank leading entry must not be reported as the client IP;
		// fall through to the peer address instead.
		if ip := strings.TrimSpace(first); ip != "" {
			return ip
		}
	}
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		return r.RemoteAddr
	}
	return host
}
// tokenHash returns the lowercase hex SHA-256 digest of tok, or "" when tok
// is empty.
func tokenHash(tok string) string {
	if tok == "" {
		return ""
	}
	hasher := sha256.New()
	hasher.Write([]byte(tok)) // hash.Hash documents that Write never returns an error
	return hex.EncodeToString(hasher.Sum(nil))
}