package watchdog

import (
	"cmp"
	"context"
	"errors"
	"slices"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
)

var (
	ErrNotConfigured = errors.New("no checks configured")
	ErrNotRunning    = errors.New("watchdog is not running")
)

// Watchdog keeps checks to run either periodically
// or on demand.
type Watchdog struct {
	checks  []Check
	running *running
	mu      sync.Mutex
}

type running struct {
	out  chan CheckResult
	stop chan struct{}
}

// New accepts checks to run.
func New(checks ...Check) *Watchdog {
	w := &Watchdog{
		checks: checks,
	}
	return w
}

// ListChecks returns a copy of the configured checks.
func (w *Watchdog) ListChecks() []Check {
	w.mu.Lock()
	defer w.mu.Unlock()
	out := make([]Check, len(w.checks))
	copy(out, w.checks)
	return out
}

// AddChecks adds checks to the group. It DOES NOT affect monitoring that
// is already running; call Stop and then Start to pick up newly added
// checks. Checks may have duplicate Name fields, but note that
// RemoveChecks removes checks by their Name fields.
func (w *Watchdog) AddChecks(checks ...Check) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.checks = append(w.checks, checks...)
}

// RemoveChecks removes all checks whose Name matches one of the given
// names. It does not affect monitoring that is already running.
func (w *Watchdog) RemoveChecks(names ...string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Keep capacity at len(w.checks): len(w.checks)-len(names) can be
	// negative, and make panics on a negative capacity.
	remaining := make([]Check, 0, len(w.checks))
	for _, e := range w.checks {
		if slices.Contains(names, e.Name) {
			continue
		}
		remaining = append(remaining, e)
	}
	w.checks = remaining
}

// Start starts monitoring.
// Subsequent calls to Start return the SAME channel. If you need more
// than one reader of the channel, fan out on your side.
// On start the Watchdog runs all configured checks and pushes the current
// status of ALL checks in the group to the channel. After that, only
// status changes are pushed, regardless of the error values returned by
// the CheckFuncs.
// concurrency limits the number of checks that can run concurrently;
// 0 means no limit (all checks may run concurrently).
func (w *Watchdog) Start(ctx context.Context, concurrency int) (<-chan CheckResult, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.running != nil {
		return w.running.out, nil
	}
	// Start a new session.
	if len(w.checks) == 0 {
		return nil, ErrNotConfigured
	}
	cp := w.copyChecks()
	if concurrency == 0 {
		concurrency = len(cp)
	}
	w.runMonitoringForGroup(ctx, cp, concurrency)
	return w.running.out, nil
}

// Stop stops execution of checks.
// Calling Stop again before another Start returns ErrNotRunning.
func (w *Watchdog) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.running == nil {
		return ErrNotRunning
	}
	close(w.running.stop)
	w.running = nil
	return nil
}
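// Usage sketch (illustrative only, not part of the package API). It assumes
// Check is a struct with Name, Interval, and Check fields, as inferred from
// their use in this file; pingDB is a hypothetical CheckFunc supplied by the
// caller.
//
//	w := watchdog.New(watchdog.Check{
//		Name:     "db-ping",
//		Interval: 10 * time.Second,
//		Check:    pingDB,
//	})
//	results, err := w.Start(ctx, 2) // at most 2 checks in flight at once
//	if err != nil {
//		log.Fatal(err)
//	}
//	for res := range results {
//		log.Printf("%s -> %v (err: %v)", res.Name, res.Status, res.Error)
//	}
//	// ...later, to end monitoring:
//	_ = w.Stop()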
// RunImmediately runs the configured checks concurrently and returns the
// results sorted by check name. concurrency limits the number of checks
// that are allowed to run concurrently; 0 means all checks may run
// simultaneously.
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
	w.mu.Lock()
	if len(w.checks) == 0 {
		w.mu.Unlock()
		return nil, ErrNotConfigured
	}
	cp := w.copyChecks()
	w.mu.Unlock() // release before running the checks

	if concurrency == 0 {
		concurrency = len(cp)
	}
	statuses := runChecksConcurrently(ctx, cp, concurrency)
	slices.SortFunc(statuses, func(a, b CheckResult) int {
		return cmp.Compare(a.Name, b.Name)
	})
	return statuses, nil
}

func (w *Watchdog) copyChecks() []Check {
	cp := make([]Check, len(w.checks))
	copy(cp, w.checks)
	return cp
}

func (w *Watchdog) runMonitoringForGroup(ctx context.Context, checks []Check, concurrency int) {
	// Buffered so the initial status of every check can be pushed even
	// before the caller starts reading.
	events := make(chan CheckResult, len(checks))
	stop := make(chan struct{})
	r := &running{out: events, stop: stop}
	w.running = r

	var grp errgroup.Group
	sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
	for _, c := range checks {
		c := c // pin the loop variable (needed before Go 1.22)
		grp.Go(func() error {
			ticker := time.NewTicker(c.Interval) // assumes a positive Interval
			defer ticker.Stop()
			var state CheckResult
			first := true
			for {
				sema <- struct{}{} // acquire
				status, err := c.Check(ctx)
				<-sema // release
				s := CheckResult{
					Name:   c.Name,
					Status: status,
					Error:  err,
				}
				// Always push the initial result; afterwards only status
				// changes. The send is wrapped in a select so a stopped
				// or cancelled watchdog never blocks here.
				if first || s.Status != state.Status {
					select {
					case events <- s:
					case <-stop:
						return nil
					case <-ctx.Done():
						return ctx.Err()
					}
				}
				state = s
				first = false
				select {
				case <-ticker.C:
					// continue looping
				case <-stop:
					return nil
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		})
	}
	// A separate goroutine closes the output channel once every check
	// goroutine has exited.
	go func() {
		_ = grp.Wait()
		close(events)
		w.mu.Lock()
		defer w.mu.Unlock()
		// Clear only our own session: Stop followed by Start may already
		// have installed a new one that must not be wiped.
		if w.running == r {
			w.running = nil
		}
	}()
}

func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
	statuses := make([]CheckResult, 0, len(ch))
	m := sync.Mutex{} // guards appends to statuses
	var group errgroup.Group
	sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
	for _, e := range ch {
		e := e             // pin the loop variable (needed before Go 1.22)
		sema <- struct{}{} // acquire
		group.Go(func() error {
			defer func() { <-sema }() // release
			// Relies on the CheckFunc honoring context cancellation.
			status, err := e.Check(ctx)
			r := CheckResult{
				Name:   e.Name,
				Status: status,
				Error:  err,
			}
			m.Lock()
			defer m.Unlock()
			statuses = append(statuses, r)
			return nil
		})
	}
	_ = group.Wait()
	return statuses
}
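// One-shot usage sketch (illustrative only, same assumptions as the Start
// example above): run every configured check once, without starting
// periodic monitoring, and inspect the name-sorted results.
//
//	statuses, err := w.RunImmediately(ctx, 0) // 0 = no concurrency limit
//	if err != nil {
//		log.Fatal(err)
//	}
//	for _, s := range statuses {
//		fmt.Printf("%-12s %v (err: %v)\n", s.Name, s.Status, s.Error)
//	}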