212 lines
5.4 KiB
Go
212 lines
5.4 KiB
Go
package service
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"3588AdminBackend/internal/models"
|
|
)
|
|
|
|
// AlarmRecord is one alarm event as stored in the alarm_records table and
// as serialized to JSON. Field order is significant: it is the order in
// which encoding/json emits the keys.
type AlarmRecord struct {
	// ID is the alarm identifier reported by the device agent.
	ID string `json:"id"`
	// DeviceID identifies the device the alarm was collected from.
	DeviceID string `json:"device_id"`
	// Channel is the source channel (or node) that raised the alarm.
	Channel string `json:"channel"`
	// Timestamp is the event time as a string.
	Timestamp string `json:"timestamp"`
	// RuleName and RuleType describe the rule that fired.
	RuleName string `json:"rule_name"`
	RuleType string `json:"rule_type"`
	// ObjectLabel is a label for what was detected.
	ObjectLabel string `json:"object_label"`
	// Confidence is the detection score attached to the alarm.
	Confidence float64 `json:"confidence"`
	// SnapshotURL and ClipURL reference the media captured for the alarm.
	SnapshotURL string `json:"snapshot_url"`
	ClipURL     string `json:"clip_url"`
	// DurationMs is the alarm duration in milliseconds.
	DurationMs int64 `json:"duration_ms"`
	// CollectedAt is the time at which the collector picked the alarm up.
	CollectedAt string `json:"collected_at"`
}
|
|
|
|
type AlarmCollector struct {
|
|
db *sql.DB
|
|
agent *AgentClient
|
|
registry *RegistryService
|
|
mu sync.Mutex
|
|
lastID string // last alarm ID seen, to avoid duplicates
|
|
}
|
|
|
|
func NewAlarmCollector(db *sql.DB, agent *AgentClient, registry *RegistryService) *AlarmCollector {
|
|
return &AlarmCollector{db: db, agent: agent, registry: registry}
|
|
}
|
|
|
|
func (c *AlarmCollector) Start() {
|
|
go c.poll()
|
|
}
|
|
|
|
func (c *AlarmCollector) poll() {
|
|
// Initial delay to let the system settle
|
|
time.Sleep(5 * time.Second)
|
|
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
c.collectFromDevices()
|
|
}
|
|
}
|
|
|
|
func (c *AlarmCollector) collectFromDevices() {
|
|
if c.agent == nil || c.registry == nil {
|
|
return
|
|
}
|
|
devices := c.registry.GetDevices()
|
|
for _, dev := range devices {
|
|
if dev == nil || !dev.Online {
|
|
continue
|
|
}
|
|
alarms, err := c.fetchDeviceAlarms(dev)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, alarm := range alarms {
|
|
alarm.DeviceID = dev.DeviceID
|
|
alarm.CollectedAt = time.Now().Format(time.RFC3339)
|
|
c.saveAlarm(alarm)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *AlarmCollector) fetchDeviceAlarms(dev *models.Device) ([]AlarmRecord, error) {
|
|
c.mu.Lock()
|
|
lastID := c.lastID
|
|
c.mu.Unlock()
|
|
|
|
url := "/v1/alarms/recent?limit=100"
|
|
body, status, err := c.agent.Do("GET", dev.IP, dev.AgentPort, url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if status != 200 {
|
|
return nil, nil // agent may not support alarms yet
|
|
}
|
|
|
|
var resp struct {
|
|
Alarms []map[string]any `json:"alarms"`
|
|
}
|
|
if err := json.Unmarshal(body, &resp); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
newLastID := lastID
|
|
alarms := make([]AlarmRecord, 0)
|
|
for _, a := range resp.Alarms {
|
|
id, _ := a["id"].(string)
|
|
if id == "" {
|
|
continue
|
|
}
|
|
if id == lastID {
|
|
break // reached previously seen alarms
|
|
}
|
|
// Extract fields from whatever format media-server sends
|
|
channel, _ := a["channel"].(string)
|
|
if channel == "" {
|
|
channel, _ = a["node_id"].(string) // media-server uses node_id
|
|
}
|
|
ruleName, _ := a["rule_name"].(string)
|
|
ruleType, _ := a["rule_type"].(string)
|
|
objectLabel, _ := a["object_label"].(string)
|
|
snapshotURL, _ := a["snapshot_url"].(string)
|
|
clipURL, _ := a["clip_url"].(string)
|
|
// Timestamp can be string (RFC3339) or number (unix millis)
|
|
var ts string
|
|
switch v := a["timestamp"].(type) {
|
|
case string:
|
|
ts = v
|
|
case float64:
|
|
ts = time.UnixMilli(int64(v)).Format(time.RFC3339)
|
|
}
|
|
confidence, _ := a["confidence"].(float64)
|
|
if confidence == 0 {
|
|
if score, ok := a["score"].(float64); ok {
|
|
confidence = score
|
|
}
|
|
}
|
|
// Extract from nested detections if top-level fields are empty
|
|
if dets, ok := a["detections"].([]any); ok && len(dets) > 0 {
|
|
if objectLabel == "" {
|
|
objectLabel = fmt.Sprintf("%d 个检测目标", len(dets))
|
|
}
|
|
if confidence == 0 {
|
|
if d0, ok := dets[0].(map[string]any); ok {
|
|
if s, ok := d0["score"].(float64); ok {
|
|
confidence = s
|
|
}
|
|
}
|
|
}
|
|
}
|
|
durationMs, _ := a["duration_ms"].(float64)
|
|
alarms = append(alarms, AlarmRecord{
|
|
ID: id,
|
|
Timestamp: ts,
|
|
Channel: channel,
|
|
RuleName: ruleName,
|
|
RuleType: ruleType,
|
|
ObjectLabel: objectLabel,
|
|
Confidence: confidence,
|
|
SnapshotURL: snapshotURL,
|
|
ClipURL: clipURL,
|
|
DurationMs: int64(durationMs),
|
|
})
|
|
if newLastID == "" {
|
|
newLastID = id
|
|
}
|
|
}
|
|
|
|
if newLastID != "" {
|
|
c.mu.Lock()
|
|
c.lastID = newLastID
|
|
c.mu.Unlock()
|
|
}
|
|
return alarms, nil
|
|
}
|
|
|
|
func (c *AlarmCollector) saveAlarm(alarm AlarmRecord) {
|
|
if c.db == nil {
|
|
return
|
|
}
|
|
_, err := c.db.Exec(`
|
|
INSERT INTO alarm_records(id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at)
|
|
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO NOTHING
|
|
`, alarm.ID, alarm.DeviceID, alarm.Channel, alarm.Timestamp, alarm.RuleName, alarm.RuleType, alarm.ObjectLabel, alarm.Confidence, alarm.SnapshotURL, alarm.ClipURL, alarm.DurationMs, alarm.CollectedAt)
|
|
if err != nil {
|
|
log.Printf("alarm collector: save error: %v", err)
|
|
}
|
|
}
|
|
|
|
// GetRecent returns the most recent N alarm records, newest first.
|
|
func (c *AlarmCollector) GetRecent(limit int) []AlarmRecord {
|
|
if c.db == nil || limit <= 0 {
|
|
return nil
|
|
}
|
|
rows, err := c.db.Query(`
|
|
SELECT id, device_id, channel, timestamp, rule_name, rule_type, object_label, confidence, snapshot_url, clip_url, duration_ms, collected_at
|
|
FROM alarm_records
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
`, limit)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
|
|
var alarms []AlarmRecord
|
|
for rows.Next() {
|
|
var a AlarmRecord
|
|
if err := rows.Scan(&a.ID, &a.DeviceID, &a.Channel, &a.Timestamp, &a.RuleName, &a.RuleType, &a.ObjectLabel, &a.Confidence, &a.SnapshotURL, &a.ClipURL, &a.DurationMs, &a.CollectedAt); err != nil {
|
|
continue
|
|
}
|
|
alarms = append(alarms, a)
|
|
}
|
|
return alarms
|
|
}
|