package watchdog

import (
	"cmp"
	"context"
	"errors"
	"slices"
	"sync"
	"time"
)

var (
	ErrNotConfigured = errors.New("no checks configured")
	ErrNotRunning    = errors.New("watchdog is not running")
)

// Watchdog keeps checks to run either periodically
// or on demand.
type Watchdog struct {
	checks     []*wdCheck
	mu         sync.Mutex
	monitoring bool             // is monitoring currently in progress
	events     chan CheckResult // output channel
	limiter    chan struct{}    // TODO: use proper limiter here
	timeout    time.Duration
	running    int // number of live check goroutines
}

type wdCheck struct {
	check Check
	stop  chan struct{}
}

// New creates an instance of Watchdog with the
// provided checks.
func New(checks ...Check) *Watchdog {
	ch := make([]*wdCheck, len(checks))
	for i := range checks {
		ch[i] = &wdCheck{
			check: checks[i],
		}
	}
	return &Watchdog{
		checks: ch,
	}
}

// ListChecks returns a snapshot of the currently configured checks.
func (w *Watchdog) ListChecks() []Check {
	w.mu.Lock()
	defer w.mu.Unlock()
	out := make([]Check, len(w.checks))
	for i := range w.checks {
		out[i] = w.checks[i].check
	}
	return out
}

// SetTimeout sets the timeout for all checks that
// get started with the Start method. Changing this
// value does not affect running checks.
// Watchdog does not enforce this timeout; it just
// passes a context created with context.WithTimeout
// to the check functions.
// If this method is not called, the default timeout
// of 10 seconds is used.
func (w *Watchdog) SetTimeout(d time.Duration) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.timeout = d
}

// AddChecks adds checks to the group.
// If monitoring is in progress then monitoring is started for the newly
// added checks as well.
// Checks may have duplicate Name fields, but note that RemoveChecks removes
// checks by their Name fields.
func (w *Watchdog) AddChecks(checks ...Check) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for i := range checks {
		nc := &wdCheck{
			check: checks[i],
		}
		w.checks = append(w.checks, nc)
		if w.monitoring {
			w.startMonitoring(nc)
		}
	}
}

// RemoveChecks removes the named checks.
func (w *Watchdog) RemoveChecks(names ...string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Capacity hint only: names may contain unknown entries or be longer
	// than w.checks, so do not subtract (a negative capacity would panic).
	remaining := make([]*wdCheck, 0, len(w.checks))
	for _, c := range w.checks {
		if slices.Contains(names, c.check.Name) {
			if w.monitoring {
				w.stopMonitoring(c)
			}
			continue
		}
		remaining = append(remaining, c)
	}
	w.checks = remaining
}

// Start starts monitoring.
// Subsequent calls to Start return the SAME channel. If you need
// more than one reader of the channel, fan out on your side.
// On start Watchdog runs all provided checks and pushes the current
// status of each check to the channel.
// Subsequently, a result is pushed to the channel whenever a check func
// returns a different status or a non-nil error.
// The concurrency argument limits the number of checks that can run
// concurrently. 0 means no limit (all checks may run concurrently).
func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.checks) == 0 {
		return nil, ErrNotConfigured
	}
	if w.monitoring {
		return w.events, nil
	}
	if concurrency == 0 {
		concurrency = len(w.checks)
	}
	if w.timeout == 0 {
		w.timeout = DefaultTimeout
	}
	w.events = make(chan CheckResult, concurrency)
	w.limiter = make(chan struct{}, concurrency)
	for i := range w.checks {
		w.startMonitoring(w.checks[i])
	}
	w.monitoring = true
	return w.events, nil
}
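// The sketch below illustrates the intended Start flow. It is a
// non-authoritative example: the Check literal assumes Name, Interval and a
// check func field (that is how the fields are used in this file), and
// StatusOK is a hypothetical status value; adjust both to the actual
// definitions in this package.
//
//	wd := watchdog.New(watchdog.Check{
//		Name:     "db-ping",        // hypothetical check name
//		Interval: 30 * time.Second, // how often the check re-runs
//		Check: func(ctx context.Context) (Status, error) {
//			return StatusOK, nil // hypothetical status value
//		},
//	})
//	wd.SetTimeout(5 * time.Second) // per-run timeout, passed via context
//	events, err := wd.Start(4)     // at most 4 checks run at once
//	if err != nil {
//		// handle ErrNotConfigured
//	}
//	for r := range events { // channel closes after Stop, once all goroutines exit
//		log.Printf("check %q: status=%v err=%v", r.Name, r.Status, r.Error)
//	}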
// Stop stops execution of checks.
// Subsequent calls return ErrNotRunning.
// The channel returned by Start is closed once the last check goroutine
// has exited.
func (w *Watchdog) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.monitoring {
		return ErrNotRunning
	}
	for i := range w.checks {
		w.stopMonitoring(w.checks[i])
	}
	return nil
}

// RunImmediately runs the configured checks concurrently and returns their
// results, sorted by check name.
// Setting concurrency to 0 means that all checks of the group are allowed to
// run simultaneously. Otherwise at most concurrency checks run simultaneously.
// The provided ctx is passed to the check functions as-is; the timeout set
// with SetTimeout is not applied here.
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
	w.mu.Lock()
	if len(w.checks) == 0 {
		w.mu.Unlock()
		return nil, ErrNotConfigured
	}
	cp := w.copyChecks()
	w.mu.Unlock() // release before running the checks
	if concurrency == 0 {
		concurrency = len(cp)
	}
	statuses := runChecksConcurrently(ctx, cp, concurrency)
	slices.SortFunc(statuses, func(a, b CheckResult) int {
		return cmp.Compare(a.Name, b.Name)
	})
	return statuses, nil
}

// copyChecks must be called with w.mu held.
func (w *Watchdog) copyChecks() []Check {
	cp := make([]Check, len(w.checks))
	for i := range w.checks {
		cp[i] = w.checks[i].check
	}
	return cp
}

// startMonitoring spawns the monitoring goroutine for a single check.
// It must be called with w.mu held.
func (w *Watchdog) startMonitoring(wdc *wdCheck) {
	wdc.stop = make(chan struct{})
	// Capture values while the lock is held: the goroutine must not read
	// w.timeout or wdc.stop later, since both may change under the lock.
	stop := wdc.stop
	timeout := w.timeout
	c := wdc.check
	state := CheckResult{}
	first := true
	w.running++
	go func() {
		ticker := time.NewTicker(c.Interval)
		defer ticker.Stop()
		defer func() {
			w.mu.Lock()
			defer w.mu.Unlock()
			w.running--
			if w.running == 0 {
				// last goroutine to exit will also
				// close the output chan
				close(w.events)
				w.monitoring = false
			}
		}()
		for {
			w.limiter <- struct{}{} // acquire a concurrency slot
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			status, err := c.Check(ctx)
			cancel()    // cancel immediately; a deferred cancel would pile up each iteration
			<-w.limiter // release the slot
			s := CheckResult{
				Name:   c.Name,
				Status: status,
				Error:  err,
			}
			// Always publish the first result so readers learn the
			// initial status; afterwards publish only on a status
			// change or an error.
			if first || s.Status != state.Status || s.Error != nil {
				w.events <- s
				first = false
			}
			state = s
			select {
			case <-ticker.C:
				// continue looping
			case <-stop:
				// stopping this specific check
				return
			}
		}
	}()
}

// stopMonitoring signals the check's goroutine to exit. It must be called
// with w.mu held; the nil check makes repeated stops (e.g. Stop called
// twice in a row) safe.
func (w *Watchdog) stopMonitoring(wdc *wdCheck) {
	if wdc.stop == nil {
		return
	}
	close(wdc.stop)
	wdc.stop = nil
}

func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
	statuses := make([]CheckResult, 0, len(ch))
	var (
		m  sync.Mutex     // guards append operations
		wg sync.WaitGroup // waits for all checks to finish
	)
	sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
	for _, e := range ch {
		sema <- struct{}{} // acquire a slot
		wg.Add(1)
		go func(e Check) {
			defer func() {
				<-sema // release the slot
				wg.Done()
			}()
			// relying on assumption that CheckFunc obeys context
			// cancellation
			status, err := e.Check(ctx)
			r := CheckResult{
				Name:   e.Name,
				Status: status,
				Error:  err,
			}
			m.Lock()
			defer m.Unlock()
			statuses = append(statuses, r)
		}(e)
	}
	wg.Wait()
	return statuses
}
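// An on-demand run sketch (illustrative, reusing the hypothetical wd from
// the example above). RunImmediately does not apply the SetTimeout value,
// so the caller bounds the run through ctx:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//	defer cancel()
//	results, err := wd.RunImmediately(ctx, 2) // at most 2 checks at a time
//	if err != nil {
//		// handle ErrNotConfigured
//	}
//	for _, r := range results { // sorted by check name
//		fmt.Printf("%s: %v (err=%v)\n", r.Name, r.Status, r.Error)
//	}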