diff --git a/cmd/managerd/main.go b/cmd/managerd/main.go index 4fe4833..dfde7d7 100644 --- a/cmd/managerd/main.go +++ b/cmd/managerd/main.go @@ -68,6 +68,8 @@ func main() { if err := taskSvc.LoadPersistedTasks(); err != nil { log.Printf("load persisted tasks: %v", err) } + alarmCollector := service.NewAlarmCollector(store.DB(), agentClient, regSvc) + alarmCollector.Start() tplSvc := service.NewTemplateService(cfg) h := api.NewHandler(discoSvc, regSvc, agentClient, taskSvc, tplSvc) @@ -97,6 +99,7 @@ func main() { ui.SetAuditRepo(auditRepo) ui.SetDBPath(cfg.DBPathOrDefault()) ui.SetResourcesRepo(resourcesRepo) + ui.SetAlarmCollector(alarmCollector) uiRouter, err := ui.Routes() if err != nil { log.Fatalf("failed to init ui routes: %v", err) diff --git a/internal/service/alarm_collector.go b/internal/service/alarm_collector.go new file mode 100644 index 0000000..e40f34e --- /dev/null +++ b/internal/service/alarm_collector.go @@ -0,0 +1,179 @@ +package service + +import ( + "database/sql" + "encoding/json" + "log" + "sync" + "time" + + "3588AdminBackend/internal/models" +) + +type AlarmRecord struct { + ID string `json:"id"` + DeviceID string `json:"device_id"` + Channel string `json:"channel"` + Timestamp string `json:"timestamp"` + RuleName string `json:"rule_name"` + RuleType string `json:"rule_type"` + ObjectLabel string `json:"object_label"` + Confidence float64 `json:"confidence"` + SnapshotURL string `json:"snapshot_url"` + ClipURL string `json:"clip_url"` + DurationMs int64 `json:"duration_ms"` + CollectedAt string `json:"collected_at"` +} + +type AlarmCollector struct { + db *sql.DB + agent *AgentClient + registry *RegistryService + mu sync.Mutex + lastID string // last alarm ID seen, to avoid duplicates +} + +func NewAlarmCollector(db *sql.DB, agent *AgentClient, registry *RegistryService) *AlarmCollector { + return &AlarmCollector{db: db, agent: agent, registry: registry} +} + +func (c *AlarmCollector) Start() { + go c.poll() +} + +func (c *AlarmCollector) poll() { + // Initial delay to let the system settle + time.Sleep(5 * time.Second) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for range ticker.C { + c.collectFromDevices() + } +} + +func (c *AlarmCollector) collectFromDevices() { + if c.agent == nil || c.registry == nil { + return + } + devices := c.registry.GetDevices() + for _, dev := range devices { + if dev == nil || !dev.Online { + continue + } + alarms, err := c.fetchDeviceAlarms(dev) + if err != nil { + continue + } + for _, alarm := range alarms { + alarm.DeviceID = dev.DeviceID + alarm.CollectedAt = time.Now().Format(time.RFC3339) + c.saveAlarm(alarm) + } + } +} + +func (c *AlarmCollector) fetchDeviceAlarms(dev *models.Device) ([]AlarmRecord, error) { + c.mu.Lock() + lastID := c.lastID + c.mu.Unlock() + + url := "/v1/alarms/recent?limit=100" + body, status, err := c.agent.Do("GET", dev.IP, dev.AgentPort, url, nil) + if err != nil { + return nil, err + } + if status != 200 { + return nil, nil // agent may not support alarms yet + } + + var resp struct { + Alarms []struct { + ID string `json:"id"` + Timestamp string `json:"timestamp"` + Channel string `json:"channel"` + RuleName string `json:"rule_name"` + RuleType string `json:"rule_type"` + ObjectLabel string `json:"object_label"` + Confidence float64 `json:"confidence"` + SnapshotURL string `json:"snapshot_url"` + ClipURL string `json:"clip_url"` + DurationMs int64 `json:"duration_ms"` + } `json:"alarms"` + } + if err := json.Unmarshal(body, &resp); err != nil { + return nil, err + } + + newLastID := lastID + alarms := make([]AlarmRecord, 0) + for _, a := range resp.Alarms { + if a.ID == lastID { + break // reached previously seen alarms + } + alarms = append(alarms, AlarmRecord{ + ID: a.ID, + Timestamp: a.Timestamp, + Channel: a.Channel, + RuleName: a.RuleName, + RuleType: a.RuleType, + ObjectLabel: a.ObjectLabel, + Confidence: a.Confidence, + SnapshotURL: a.SnapshotURL, + ClipURL: a.ClipURL, + DurationMs: a.DurationMs, + }) + if newLastID == "" { + newLastID = a.ID + } + } + + if newLastID != "" { + c.mu.Lock() + c.lastID = newLastID + c.mu.Unlock() + } + return alarms, nil +} + +func (c *AlarmCollector) saveAlarm(alarm AlarmRecord) { + if c.db == nil { + return + } + _, err := c.db.Exec(` +INSERT INTO alarm_records(id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at) +VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(id) DO NOTHING +`, alarm.ID, alarm.DeviceID, alarm.Channel, alarm.Timestamp, alarm.RuleName, alarm.RuleType, alarm.ObjectLabel, alarm.Confidence, alarm.SnapshotURL, alarm.ClipURL, alarm.DurationMs, alarm.CollectedAt) + if err != nil { + log.Printf("alarm collector: save error: %v", err) + } +} + +// GetRecent returns the most recent N alarm records, newest first. +func (c *AlarmCollector) GetRecent(limit int) []AlarmRecord { + if c.db == nil || limit <= 0 { + return nil + } + rows, err := c.db.Query(` +SELECT id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at +FROM alarm_records +ORDER BY timestamp DESC +LIMIT ? +`, limit) + if err != nil { + return nil + } + defer rows.Close() + + var alarms []AlarmRecord + for rows.Next() { + var a AlarmRecord + if err := rows.Scan(&a.ID, &a.DeviceID, &a.Channel, &a.Timestamp, &a.RuleName, &a.RuleType, &a.ObjectLabel, &a.Confidence, &a.SnapshotURL, &a.ClipURL, &a.DurationMs, &a.CollectedAt); err != nil { + continue + } + alarms = append(alarms, a) + } + return alarms +} diff --git a/internal/storage/migrate.go b/internal/storage/migrate.go index b1c2fc8..9e1defd 100644 --- a/internal/storage/migrate.go +++ b/internal/storage/migrate.go @@ -77,6 +77,20 @@ CREATE TABLE IF NOT EXISTS standard_resources ( created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); +CREATE TABLE IF NOT EXISTS alarm_records ( + id TEXT PRIMARY KEY, + device_id TEXT NOT NULL, + channel TEXT NOT NULL, + timestamp TEXT NOT NULL, + rule_name TEXT NOT NULL, + rule_type TEXT NOT NULL, + object_label TEXT NOT NULL DEFAULT '', + confidence REAL NOT NULL DEFAULT 0, + snapshot_url TEXT NOT NULL DEFAULT '', + clip_url TEXT NOT NULL DEFAULT '', + duration_ms INTEGER NOT NULL DEFAULT 0, + collected_at TEXT NOT NULL +); CREATE TABLE IF NOT EXISTS scene_templates ( id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, diff --git a/internal/web/ui.go b/internal/web/ui.go index f2f96dd..c59d8f3 100644 --- a/internal/web/ui.go +++ b/internal/web/ui.go @@ -33,6 +33,7 @@ type UI struct { auditRepo *storage.AuditLogsRepo dbPath string resourcesRepo *storage.ResourcesRepo + alarmCollector *service.AlarmCollector tpl *template.Template } @@ -501,6 +502,13 @@ func (u *UI) SetResourcesRepo(repo *storage.ResourcesRepo) { u.resourcesRepo = repo } +func (u *UI) SetAlarmCollector(ac *service.AlarmCollector) { + if u == nil { + return + } + u.alarmCollector = ac +} + func tablerIconSVG(name string) string { icons := map[string]string{ "devices": ``,