package watchdog import ( "context" "errors" "slices" "sync" "time" ) var ( ErrNotConfigured = errors.New("no checks configured") ErrNotRunning = errors.New("watchdog is not running") ) // Watchdog keeps checks to run either periodically // or on demand. type Watchdog struct { checks map[string]*wdCheck mu sync.Mutex events chan CheckResult // output channel limiter chan struct{} // TODO: use proper limiter here timeout time.Duration // timeout for checks to complete monitoring bool // is monitoring currently in progress running int // number of active checks monitored } type wdCheck struct { check Check stop chan struct{} } // New creates instance of Watchdog with // provided checks. func New(checks ...Check) *Watchdog { w := Watchdog{ checks: make(map[string]*wdCheck), } for _, c := range checks { nc := &wdCheck{ check: c, } w.checks[c.Name] = nc } return &w } func (w *Watchdog) ListChecks() []Check { w.mu.Lock() defer w.mu.Unlock() out := w.copyChecks() return out } // SetTimeout sets timeout for all checks that // get started with Start method. Changing this // value does not affect running checks. // Watchdog does not enforce this timeout, it // just passes context.WithTimemout to check functions. // If this method is not called the default timeout // of 10 seconds is used. func (w *Watchdog) SetTimeout(d time.Duration) { w.mu.Lock() defer w.mu.Unlock() w.timeout = d } // AddChecks adds checks to the group. // If monitoring is in progress then monitoring it started for the newly added // check as well. // Check may have not have duplicate Name fields. New check with the same // hame overwrites the previous one. func (w *Watchdog) AddChecks(checks ...Check) { w.mu.Lock() defer w.mu.Unlock() if w.checks == nil { w.checks = make(map[string]*wdCheck) } for _, c := range checks { nc := &wdCheck{ check: c, } old, haveOld := w.checks[c.Name] w.checks[c.Name] = nc if w.monitoring { w.startMonitoring(nc) if haveOld { w.stopMonitoring(old) } } } } // RemoveChecks removes the named checks. func (w *Watchdog) RemoveChecks(names ...string) { w.mu.Lock() defer w.mu.Unlock() for _, name := range names { c, ok := w.checks[name] if !ok { continue } if w.monitoring { w.stopMonitoring(c) } delete(w.checks, name) } } // Start starts monitoring. // Subsequent calls to start return the SAME channel. If you need // to have more that one reader from the channel - fan out on your side. // On start Watchdog runs all provided check and pushes current status of checks of // to the channel. // Subsequently if check func returns different status or if it returns an error the // result is pushed to the channel. // Concurrency argument limits the number of checks that can run concurrently. 0 means no // limit (all checks may run concurrently). func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) { w.mu.Lock() defer w.mu.Unlock() if len(w.checks) == 0 { return nil, ErrNotConfigured } if w.monitoring { return w.events, nil } if concurrency == 0 { concurrency = len(w.checks) } if w.timeout == 0 { w.timeout = DefaultTimeout } w.events = make(chan CheckResult, concurrency) w.limiter = make(chan struct{}, concurrency) for _, c := range w.checks { w.startMonitoring(c) } return w.events, nil } // Stop stops execution of checks. // Subsequent calls return ErrNotRunning. func (w *Watchdog) Stop() error { w.mu.Lock() defer w.mu.Unlock() if !w.monitoring { return ErrNotRunning } for _, c := range w.checks { w.stopMonitoring(c) } return nil } // RunImmediately runs configured checks concurrently and returns results. // Setting concurrency to 0 means that all check of the group are allowed to run simultaneously. // Otherwise at most concurrency checks will be allowed to run simultaneously. func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) { w.mu.Lock() if len(w.checks) == 0 { w.mu.Unlock() return nil, ErrNotConfigured } cp := w.copyChecks() w.mu.Unlock() if concurrency == 0 { concurrency = len(cp) } statuses := runChecksConcurrently(ctx, cp, concurrency) slices.SortFunc(statuses, func(a, b CheckResult) int { if a.Name < b.Name { return -1 } else if a.Name > b.Name { return 1 } return 0 }) return statuses, nil } func (w *Watchdog) copyChecks() []Check { cp := make([]Check, 0, len(w.checks)) for _, v := range w.checks { cp = append(cp, v.check) } return cp } func (w *Watchdog) startMonitoring(wdc *wdCheck) { wdc.stop = make(chan struct{}) c := wdc.check if !w.monitoring { w.monitoring = true } w.running++ go func() { var curr error = nil ticker := time.Tick(wdc.check.Interval) for { w.limiter <- struct{}{} ctx, cancel := context.WithTimeout(context.Background(), w.timeout) defer cancel() err := c.Check(ctx) <-w.limiter r := CheckResult{ Name: c.Name, Error: err, } // if status changed or we've got an error // then report this if !errors.Is(r.Error, curr) { w.events <- r } curr = r.Error select { case <-ticker: // continue looping case <-wdc.stop: // stopping this specific check return } } }() } func (w *Watchdog) stopMonitoring(wdc *wdCheck) { close(wdc.stop) w.running-- if w.running == 0 { w.monitoring = false close(w.events) } } func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult { sema := make(chan struct{}, concurrency) // semaphore to limit concurrency done := make(chan CheckResult, len(ch)) wg := new(sync.WaitGroup) wg.Add(len(ch)) for _, e := range ch { go func() { sema <- struct{}{} // acquire defer func() { <-sema // release wg.Done() }() // relying on assumption that CheckFunc obeys context // cancellation err := e.Check(ctx) r := CheckResult{ Name: e.Name, Error: err, } done <- r }() } go func() { wg.Wait() close(done) }() results := make([]CheckResult, 0, len(ch)) // collect results for r := range done { results = append(results, r) } return results }