From 8dd26cf49a969961f299e027970e545ba840a8a6 Mon Sep 17 00:00:00 2001 From: tian <11429339@qq.com> Date: Thu, 7 May 2026 10:25:15 +0800 Subject: [PATCH] feat: add /v1/alarms/report and /v1/alarms/recent endpoints --- agent/internal/httpapi/alarms.go | 183 +++++++++++++++++++++++++++++++ agent/internal/httpapi/server.go | 2 + 2 files changed, 185 insertions(+) create mode 100644 agent/internal/httpapi/alarms.go diff --git a/agent/internal/httpapi/alarms.go b/agent/internal/httpapi/alarms.go new file mode 100644 index 0000000..46486b8 --- /dev/null +++ b/agent/internal/httpapi/alarms.go @@ -0,0 +1,183 @@ +package httpapi + +import ( + "bufio" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "rk3588sys/agent/internal/files" +) + +type alarmRecord struct { + ID string `json:"id"` + Timestamp string `json:"timestamp"` + Channel string `json:"channel"` + RuleName string `json:"rule_name"` + RuleType string `json:"rule_type"` + ObjectLabel string `json:"object_label,omitempty"` + Confidence float64 `json:"confidence,omitempty"` + SnapshotURL string `json:"snapshot_url,omitempty"` + ClipURL string `json:"clip_url,omitempty"` + DurationMs int64 `json:"duration_ms,omitempty"` +} + +func (s *Server) alarmsPath() string { + return filepath.Join(s.baseDir, "logs", "alarms.jsonl") +} + +// handleAlarmReport receives alarm notifications from media-server. +// POST /v1/alarms/report +func (s *Server) handleAlarmReport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + if !s.authorize(r, true) { + errorJSON(w, http.StatusUnauthorized, "unauthorized") + return + } + + maxBytes := int64(1 << 20) // 1MB max + r.Body = http.MaxBytesReader(w, r.Body, maxBytes) + + var alarm alarmRecord + if err := json.NewDecoder(r.Body).Decode(&alarm); err != nil { + errorJSON(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if strings.TrimSpace(alarm.Channel) == "" { + errorJSON(w, http.StatusBadRequest, "channel is required") + return + } + if strings.TrimSpace(alarm.RuleName) == "" { + errorJSON(w, http.StatusBadRequest, "rule_name is required") + return + } + if alarm.ID == "" { + alarm.ID = fmt.Sprintf("alarm_%s_%s", time.Now().Format("20060102_150405"), randomHex(6)) + } + if alarm.Timestamp == "" { + alarm.Timestamp = time.Now().Format(time.RFC3339) + } + + path := s.alarmsPath() + dir := filepath.Dir(path) + if err := files.EnsureDir(dir, 0o755); err != nil { + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + + line, err := json.Marshal(alarm) + if err != nil { + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + defer f.Close() + + if _, err := fmt.Fprintf(f, "%s\n", string(line)); err != nil { + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + + s.recordAudit(r, "alarm.report", true, alarm.ID) + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "id": alarm.ID}) +} + +// handleAlarmsRecent returns recent alarm records. +// GET /v1/alarms/recent?limit=50&since=2026-01-01T00:00:00Z +func (s *Server) handleAlarmsRecent(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + errorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + if !s.authorize(r, false) { + errorJSON(w, http.StatusUnauthorized, "unauthorized") + return + } + + limit := 50 + if v := strings.TrimSpace(r.URL.Query().Get("limit")); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 500 { + limit = n + } + } + + alarms, err := s.readRecentAlarms(limit) + if err != nil { + errorJSON(w, http.StatusInternalServerError, "internal error: "+err.Error()) + return + } + if alarms == nil { + alarms = make([]alarmRecord, 0) + } + + writeJSON(w, http.StatusOK, map[string]any{"alarms": alarms}) +} + +func (s *Server) readRecentAlarms(limit int) ([]alarmRecord, error) { + path := s.alarmsPath() + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + defer f.Close() + + // Read all lines and keep last N + var lines []string + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 1<<20), 1<<20) // 1MB max line + for scanner.Scan() { + line := scanner.Text() + if strings.TrimSpace(line) == "" { + continue + } + lines = append(lines, line) + // Keep only last 2*limit lines in memory + if len(lines) > 2*limit { + lines = lines[limit:] + } + } + if err := scanner.Err(); err != nil && err != io.EOF { + return nil, err + } + + // Take last `limit` lines + start := 0 + if len(lines) > limit { + start = len(lines) - limit + } + + alarms := make([]alarmRecord, 0, limit) + for i := start; i < len(lines); i++ { + var alarm alarmRecord + if err := json.Unmarshal([]byte(lines[i]), &alarm); err != nil { + continue // skip malformed lines + } + alarms = append(alarms, alarm) + } + return alarms, nil +} + +func randomHex(n int) string { + b := make([]byte, n) + rand.Read(b) + return hex.EncodeToString(b)[:n] +} diff --git a/agent/internal/httpapi/server.go b/agent/internal/httpapi/server.go index 9df3a3d..6976a2a 100644 --- a/agent/internal/httpapi/server.go +++ b/agent/internal/httpapi/server.go @@ -132,6 +132,8 @@ func New(agentCfg config.AgentConfig, baseDir string, ms *mediaserver.Client, st mux.HandleFunc("/v1/config/ui/apply", s.handleConfigUIApply) mux.HandleFunc("/v1/face-gallery", s.handleFaceGallery) mux.HandleFunc("/v1/face-gallery/reload", s.handleFaceGalleryReload) + mux.HandleFunc("/v1/alarms/report", s.handleAlarmReport) + mux.HandleFunc("/v1/alarms/recent", s.handleAlarmsRecent) mux.HandleFunc("/v1/resources/status", s.handleResourcesStatus) mux.HandleFunc("/v1/resources/", s.handleResourceUpload) mux.HandleFunc("/v1/models", s.handleModelsList)