3588AdminBackend/internal/service/alarm_collector.go

212 lines
5.4 KiB
Go

package service
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"3588AdminBackend/internal/models"
)
type AlarmRecord struct {
ID string `json:"id"`
DeviceID string `json:"device_id"`
Channel string `json:"channel"`
Timestamp string `json:"timestamp"`
RuleName string `json:"rule_name"`
RuleType string `json:"rule_type"`
ObjectLabel string `json:"object_label"`
Confidence float64 `json:"confidence"`
SnapshotURL string `json:"snapshot_url"`
ClipURL string `json:"clip_url"`
DurationMs int64 `json:"duration_ms"`
CollectedAt string `json:"collected_at"`
}
type AlarmCollector struct {
db *sql.DB
agent *AgentClient
registry *RegistryService
mu sync.Mutex
lastID string // last alarm ID seen, to avoid duplicates
}
func NewAlarmCollector(db *sql.DB, agent *AgentClient, registry *RegistryService) *AlarmCollector {
return &AlarmCollector{db: db, agent: agent, registry: registry}
}
func (c *AlarmCollector) Start() {
go c.poll()
}
func (c *AlarmCollector) poll() {
// Initial delay to let the system settle
time.Sleep(5 * time.Second)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
c.collectFromDevices()
}
}
func (c *AlarmCollector) collectFromDevices() {
if c.agent == nil || c.registry == nil {
return
}
devices := c.registry.GetDevices()
for _, dev := range devices {
if dev == nil || !dev.Online {
continue
}
alarms, err := c.fetchDeviceAlarms(dev)
if err != nil {
continue
}
for _, alarm := range alarms {
alarm.DeviceID = dev.DeviceID
alarm.CollectedAt = time.Now().Format(time.RFC3339)
c.saveAlarm(alarm)
}
}
}
func (c *AlarmCollector) fetchDeviceAlarms(dev *models.Device) ([]AlarmRecord, error) {
c.mu.Lock()
lastID := c.lastID
c.mu.Unlock()
url := "/v1/alarms/recent?limit=100"
body, status, err := c.agent.Do("GET", dev.IP, dev.AgentPort, url, nil)
if err != nil {
return nil, err
}
if status != 200 {
return nil, nil // agent may not support alarms yet
}
var resp struct {
Alarms []map[string]any `json:"alarms"`
}
if err := json.Unmarshal(body, &resp); err != nil {
return nil, err
}
newLastID := lastID
alarms := make([]AlarmRecord, 0)
for _, a := range resp.Alarms {
id, _ := a["id"].(string)
if id == "" {
continue
}
if id == lastID {
break // reached previously seen alarms
}
// Extract fields from whatever format media-server sends
channel, _ := a["channel"].(string)
if channel == "" {
channel, _ = a["node_id"].(string) // media-server uses node_id
}
ruleName, _ := a["rule_name"].(string)
ruleType, _ := a["rule_type"].(string)
objectLabel, _ := a["object_label"].(string)
snapshotURL, _ := a["snapshot_url"].(string)
clipURL, _ := a["clip_url"].(string)
// Timestamp can be string (RFC3339) or number (unix millis)
var ts string
switch v := a["timestamp"].(type) {
case string:
ts = v
case float64:
ts = time.UnixMilli(int64(v)).Format(time.RFC3339)
}
confidence, _ := a["confidence"].(float64)
if confidence == 0 {
if score, ok := a["score"].(float64); ok {
confidence = score
}
}
// Extract from nested detections if top-level fields are empty
if dets, ok := a["detections"].([]any); ok && len(dets) > 0 {
if objectLabel == "" {
objectLabel = fmt.Sprintf("%d 个检测目标", len(dets))
}
if confidence == 0 {
if d0, ok := dets[0].(map[string]any); ok {
if s, ok := d0["score"].(float64); ok {
confidence = s
}
}
}
}
durationMs, _ := a["duration_ms"].(float64)
alarms = append(alarms, AlarmRecord{
ID: id,
Timestamp: ts,
Channel: channel,
RuleName: ruleName,
RuleType: ruleType,
ObjectLabel: objectLabel,
Confidence: confidence,
SnapshotURL: snapshotURL,
ClipURL: clipURL,
DurationMs: int64(durationMs),
})
if newLastID == "" {
newLastID = id
}
}
if newLastID != "" {
c.mu.Lock()
c.lastID = newLastID
c.mu.Unlock()
}
return alarms, nil
}
func (c *AlarmCollector) saveAlarm(alarm AlarmRecord) {
if c.db == nil {
return
}
_, err := c.db.Exec(`
INSERT INTO alarm_records(id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
`, alarm.ID, alarm.DeviceID, alarm.Channel, alarm.Timestamp, alarm.RuleName, alarm.RuleType, alarm.ObjectLabel, alarm.Confidence, alarm.SnapshotURL, alarm.ClipURL, alarm.DurationMs, alarm.CollectedAt)
if err != nil {
log.Printf("alarm collector: save error: %v", err)
}
}
// GetRecent returns the most recent N alarm records, newest first.
func (c *AlarmCollector) GetRecent(limit int) []AlarmRecord {
if c.db == nil || limit <= 0 {
return nil
}
rows, err := c.db.Query(`
SELECT id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at
FROM alarm_records
ORDER BY timestamp DESC
LIMIT ?
`, limit)
if err != nil {
return nil
}
defer rows.Close()
var alarms []AlarmRecord
for rows.Next() {
var a AlarmRecord
if err := rows.Scan(&a.ID, &a.DeviceID, &a.Channel, &a.Timestamp, &a.RuleName, &a.RuleType, &a.ObjectLabel, &a.Confidence, &a.SnapshotURL, &a.ClipURL, &a.DurationMs, &a.CollectedAt); err != nil {
continue
}
alarms = append(alarms, a)
}
return alarms
}