199 lines
4.3 KiB
Go
199 lines
4.3 KiB
Go
package service
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"3588AdminBackend/internal/config"
|
|
"3588AdminBackend/internal/models"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// TaskService orchestrates multi-device tasks: it stores tasks, runs them
// against device agents, and fans per-device status updates out to
// subscribers.
type TaskService struct {
	cfg      *config.Config   // global config; Concurrency bounds per-task fan-out (see runTask)
	agent    *AgentClient     // client used to call device agents (see executeOnDevice)
	registry *RegistryService // source of known devices and their online state
	tasks    map[string]*models.Task // all tasks keyed by task ID; guarded by mu
	mu       sync.RWMutex            // protects tasks (per-task fields are guarded by Task.Mu)
	listeners map[string][]chan *models.DeviceTaskStatus // per-task subscriber channels; guarded by lmu
	lmu       sync.RWMutex                               // protects listeners
}
|
|
|
|
func NewTaskService(cfg *config.Config, agent *AgentClient, registry *RegistryService) *TaskService {
|
|
return &TaskService{
|
|
cfg: cfg,
|
|
agent: agent,
|
|
registry: registry,
|
|
tasks: make(map[string]*models.Task),
|
|
listeners: make(map[string][]chan *models.DeviceTaskStatus),
|
|
}
|
|
}
|
|
|
|
func (s *TaskService) ListTasks() []models.Task {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
items := make([]models.Task, 0, len(s.tasks))
|
|
for _, t := range s.tasks {
|
|
t.Mu.RLock()
|
|
|
|
snap := models.Task{
|
|
ID: t.ID,
|
|
Type: t.Type,
|
|
DeviceIDs: append([]string(nil), t.DeviceIDs...),
|
|
Payload: t.Payload,
|
|
Status: t.Status,
|
|
Devices: make(map[string]*models.DeviceTaskStatus, len(t.Devices)),
|
|
}
|
|
for did, ds := range t.Devices {
|
|
snap.Devices[did] = &models.DeviceTaskStatus{
|
|
DeviceID: ds.DeviceID,
|
|
Status: ds.Status,
|
|
Progress: ds.Progress,
|
|
Error: ds.Error,
|
|
}
|
|
}
|
|
|
|
t.Mu.RUnlock()
|
|
items = append(items, snap)
|
|
}
|
|
|
|
return items
|
|
}
|
|
|
|
func (s *TaskService) CreateTask(tType string, deviceIDs []string, payload interface{}) (*models.Task, error) {
|
|
id := uuid.New().String()
|
|
task := models.NewTask(id, tType, deviceIDs, payload)
|
|
|
|
s.mu.Lock()
|
|
s.tasks[id] = task
|
|
s.mu.Unlock()
|
|
|
|
go s.runTask(task)
|
|
return task, nil
|
|
}
|
|
|
|
func (s *TaskService) runTask(task *models.Task) {
|
|
task.Mu.Lock()
|
|
task.Status = models.TaskRunning
|
|
task.Mu.Unlock()
|
|
|
|
// Concurrency control
|
|
sem := make(chan struct{}, s.cfg.Concurrency)
|
|
var wg sync.WaitGroup
|
|
|
|
for _, did := range task.DeviceIDs {
|
|
wg.Add(1)
|
|
go func(did string) {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
s.executeOnDevice(task, did)
|
|
}(did)
|
|
}
|
|
|
|
wg.Wait()
|
|
task.Mu.Lock()
|
|
task.Status = models.TaskSuccess // Simple logic: overall success if finished
|
|
task.Mu.Unlock()
|
|
}
|
|
|
|
func (s *TaskService) executeOnDevice(task *models.Task, did string) {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskRunning, 0, "")
|
|
|
|
// Find device
|
|
devs := s.registry.GetDevices()
|
|
var dev *models.Device
|
|
for _, d := range devs {
|
|
if d.DeviceID == did {
|
|
dev = d
|
|
break
|
|
}
|
|
}
|
|
|
|
if dev == nil {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskFailed, 0, "device not found")
|
|
return
|
|
}
|
|
|
|
if !dev.Online {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskFailed, 0, "device offline")
|
|
return
|
|
}
|
|
|
|
// For now, only config_apply is implemented in PRD
|
|
if task.Type == "config_apply" {
|
|
body, _ := json.Marshal(task.Payload)
|
|
_, code, err := s.agent.Do("PUT", dev.IP, dev.AgentPort, "/v1/config", body)
|
|
if err != nil {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskFailed, 0, err.Error())
|
|
} else if code >= 400 {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskFailed, 0, fmt.Sprintf("agent error: %d", code))
|
|
} else {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskSuccess, 1.0, "")
|
|
}
|
|
} else {
|
|
s.updateDeviceStatus(task.ID, did, models.TaskFailed, 0, "unsupported task type")
|
|
}
|
|
}
|
|
|
|
func (s *TaskService) updateDeviceStatus(taskID, did string, status models.TaskStatus, progress float64, errStr string) {
|
|
s.mu.RLock()
|
|
task, ok := s.tasks[taskID]
|
|
s.mu.RUnlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
task.Mu.Lock()
|
|
ds, ok := task.Devices[did]
|
|
if ok {
|
|
ds.Status = status
|
|
ds.Progress = progress
|
|
ds.Error = errStr
|
|
}
|
|
task.Mu.Unlock()
|
|
|
|
// Notify listeners
|
|
s.lmu.RLock()
|
|
channels := s.listeners[taskID]
|
|
s.lmu.RUnlock()
|
|
|
|
update := &models.DeviceTaskStatus{
|
|
DeviceID: did,
|
|
Status: status,
|
|
Progress: progress,
|
|
Error: errStr,
|
|
}
|
|
|
|
for _, ch := range channels {
|
|
select {
|
|
case ch <- update:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *TaskService) Subscribe(taskID string) (chan *models.DeviceTaskStatus, func()) {
|
|
ch := make(chan *models.DeviceTaskStatus, 10)
|
|
s.lmu.Lock()
|
|
s.listeners[taskID] = append(s.listeners[taskID], ch)
|
|
s.lmu.Unlock()
|
|
|
|
cleanup := func() {
|
|
s.lmu.Lock()
|
|
list := s.listeners[taskID]
|
|
for i, c := range list {
|
|
if c == ch {
|
|
s.listeners[taskID] = append(list[:i], list[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
s.lmu.Unlock()
|
|
}
|
|
|
|
return ch, cleanup
|
|
}
|