feat: alarm collector service - poll devices for alarm records

This commit is contained in:
tian 2026-05-07 10:27:35 +08:00
parent 002b082faa
commit a7bd5e1309
4 changed files with 204 additions and 0 deletions

View File

@ -68,6 +68,8 @@ func main() {
if err := taskSvc.LoadPersistedTasks(); err != nil {
log.Printf("load persisted tasks: %v", err)
}
alarmCollector := service.NewAlarmCollector(store.DB(), agentClient, regSvc)
alarmCollector.Start()
tplSvc := service.NewTemplateService(cfg)
h := api.NewHandler(discoSvc, regSvc, agentClient, taskSvc, tplSvc)
@ -97,6 +99,7 @@ func main() {
ui.SetAuditRepo(auditRepo)
ui.SetDBPath(cfg.DBPathOrDefault())
ui.SetResourcesRepo(resourcesRepo)
ui.SetAlarmCollector(alarmCollector)
uiRouter, err := ui.Routes()
if err != nil {
log.Fatalf("failed to init ui routes: %v", err)

View File

@ -0,0 +1,179 @@
package service
import (
"database/sql"
"encoding/json"
"log"
"sync"
"time"
"3588AdminBackend/internal/models"
)
type AlarmRecord struct {
ID string `json:"id"`
DeviceID string `json:"device_id"`
Channel string `json:"channel"`
Timestamp string `json:"timestamp"`
RuleName string `json:"rule_name"`
RuleType string `json:"rule_type"`
ObjectLabel string `json:"object_label"`
Confidence float64 `json:"confidence"`
SnapshotURL string `json:"snapshot_url"`
ClipURL string `json:"clip_url"`
DurationMs int64 `json:"duration_ms"`
CollectedAt string `json:"collected_at"`
}
type AlarmCollector struct {
db *sql.DB
agent *AgentClient
registry *RegistryService
mu sync.Mutex
lastID string // last alarm ID seen, to avoid duplicates
}
func NewAlarmCollector(db *sql.DB, agent *AgentClient, registry *RegistryService) *AlarmCollector {
return &AlarmCollector{db: db, agent: agent, registry: registry}
}
func (c *AlarmCollector) Start() {
go c.poll()
}
func (c *AlarmCollector) poll() {
// Initial delay to let the system settle
time.Sleep(5 * time.Second)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
c.collectFromDevices()
}
}
func (c *AlarmCollector) collectFromDevices() {
if c.agent == nil || c.registry == nil {
return
}
devices := c.registry.GetDevices()
for _, dev := range devices {
if dev == nil || !dev.Online {
continue
}
alarms, err := c.fetchDeviceAlarms(dev)
if err != nil {
continue
}
for _, alarm := range alarms {
alarm.DeviceID = dev.DeviceID
alarm.CollectedAt = time.Now().Format(time.RFC3339)
c.saveAlarm(alarm)
}
}
}
func (c *AlarmCollector) fetchDeviceAlarms(dev *models.Device) ([]AlarmRecord, error) {
c.mu.Lock()
lastID := c.lastID
c.mu.Unlock()
url := "/v1/alarms/recent?limit=100"
body, status, err := c.agent.Do("GET", dev.IP, dev.AgentPort, url, nil)
if err != nil {
return nil, err
}
if status != 200 {
return nil, nil // agent may not support alarms yet
}
var resp struct {
Alarms []struct {
ID string `json:"id"`
Timestamp string `json:"timestamp"`
Channel string `json:"channel"`
RuleName string `json:"rule_name"`
RuleType string `json:"rule_type"`
ObjectLabel string `json:"object_label"`
Confidence float64 `json:"confidence"`
SnapshotURL string `json:"snapshot_url"`
ClipURL string `json:"clip_url"`
DurationMs int64 `json:"duration_ms"`
} `json:"alarms"`
}
if err := json.Unmarshal(body, &resp); err != nil {
return nil, err
}
newLastID := lastID
alarms := make([]AlarmRecord, 0)
for _, a := range resp.Alarms {
if a.ID == lastID {
break // reached previously seen alarms
}
alarms = append(alarms, AlarmRecord{
ID: a.ID,
Timestamp: a.Timestamp,
Channel: a.Channel,
RuleName: a.RuleName,
RuleType: a.RuleType,
ObjectLabel: a.ObjectLabel,
Confidence: a.Confidence,
SnapshotURL: a.SnapshotURL,
ClipURL: a.ClipURL,
DurationMs: a.DurationMs,
})
if newLastID == "" {
newLastID = a.ID
}
}
if newLastID != "" {
c.mu.Lock()
c.lastID = newLastID
c.mu.Unlock()
}
return alarms, nil
}
func (c *AlarmCollector) saveAlarm(alarm AlarmRecord) {
if c.db == nil {
return
}
_, err := c.db.Exec(`
INSERT INTO alarm_records(id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
`, alarm.ID, alarm.DeviceID, alarm.Channel, alarm.Timestamp, alarm.RuleName, alarm.RuleType, alarm.ObjectLabel, alarm.Confidence, alarm.SnapshotURL, alarm.ClipURL, alarm.DurationMs, alarm.CollectedAt)
if err != nil {
log.Printf("alarm collector: save error: %v", err)
}
}
// GetRecent returns the most recent N alarm records, newest first.
func (c *AlarmCollector) GetRecent(limit int) []AlarmRecord {
if c.db == nil || limit <= 0 {
return nil
}
rows, err := c.db.Query(`
SELECT id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at
FROM alarm_records
ORDER BY timestamp DESC
LIMIT ?
`, limit)
if err != nil {
return nil
}
defer rows.Close()
var alarms []AlarmRecord
for rows.Next() {
var a AlarmRecord
if err := rows.Scan(&a.ID, &a.DeviceID, &a.Channel, &a.Timestamp, &a.RuleName, &a.RuleType, &a.ObjectLabel, &a.Confidence, &a.SnapshotURL, &a.ClipURL, &a.DurationMs, &a.CollectedAt); err != nil {
continue
}
alarms = append(alarms, a)
}
return alarms
}

View File

@ -77,6 +77,20 @@ CREATE TABLE IF NOT EXISTS standard_resources (
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS alarm_records (
id TEXT PRIMARY KEY,
device_id TEXT NOT NULL,
channel TEXT NOT NULL,
timestamp TEXT NOT NULL,
rule_name TEXT NOT NULL,
rule_type TEXT NOT NULL,
object_label TEXT NOT NULL DEFAULT '',
confidence REAL NOT NULL DEFAULT 0,
snapshot_url TEXT NOT NULL DEFAULT '',
clip_url TEXT NOT NULL DEFAULT '',
duration_ms INTEGER NOT NULL DEFAULT 0,
collected_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS scene_templates (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,

View File

@ -33,6 +33,7 @@ type UI struct {
auditRepo *storage.AuditLogsRepo
dbPath string
resourcesRepo *storage.ResourcesRepo
alarmCollector *service.AlarmCollector
tpl *template.Template
}
@ -501,6 +502,13 @@ func (u *UI) SetResourcesRepo(repo *storage.ResourcesRepo) {
u.resourcesRepo = repo
}
func (u *UI) SetAlarmCollector(ac *service.AlarmCollector) {
if u == nil {
return
}
u.alarmCollector = ac
}
func tablerIconSVG(name string) string {
icons := map[string]string{
"devices": `<svg xmlns="http://www.w3.org/2000/svg" class="icon ui-icon" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.75" stroke-linecap="round" stroke-linejoin="round"><path stroke="none" d="M0 0h24v24H0z" fill="none"/><rect x="3" y="4" width="18" height="12" rx="1"/><path d="M7 20h10"/><path d="M9 16v4"/><path d="M15 16v4"/></svg>`,