This commit is contained in:
Dmitry Fedotov
2025-07-12 22:01:23 +03:00
commit 2f50eefbb2
6 changed files with 1309 additions and 0 deletions

245
watchdog.go Normal file
View File

@@ -0,0 +1,245 @@
package watchdog
import (
"context"
"errors"
"slices"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
var (
	// ErrNotConfigured is returned when an operation requires at least one
	// configured check but none are present.
	ErrNotConfigured = errors.New("no checks configured")
	// ErrNotRunning is returned by Stop when no monitoring session is active.
	ErrNotRunning = errors.New("watchdog is not running")
)
// Watchdog runs a configured set of checks and reports their status.
// The zero value is unusable; construct one with NewWatchDog.
type Watchdog struct {
	checks  []Check     // configured checks; guarded by mu
	running *running    // non-nil while a monitoring session is active; guarded by mu
	mu      sync.Mutex  // protects checks and running
}
// NewWatchDog returns a Watchdog configured with the given checks.
// Additional checks can be added later with AddChecks.
func NewWatchDog(checks ...Check) *Watchdog {
	return &Watchdog{checks: checks}
}
// running holds the state of one active monitoring session.
type running struct {
	out  chan CheckResult // results are published here; closed when all check goroutines exit
	stop chan struct{}    // closed by Stop to terminate the session
}
// ListChecks returns a snapshot of the currently configured checks.
// The returned slice is a copy, detached from the Watchdog's internal state.
func (w *Watchdog) ListChecks() ([]Check, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	snapshot := make([]Check, len(w.checks))
	copy(snapshot, w.checks)
	return snapshot, nil
}
// AddChecks appends checks to the configuration. This DOES NOT affect
// monitoring that is already running; use Stop and then Start to pick up
// newly added checks.
// Checks may share a Name, but note that RemoveChecks removes checks by
// their Name fields.
func (w *Watchdog) AddChecks(checks ...Check) error {
	w.mu.Lock()
	w.checks = append(w.checks, checks...)
	w.mu.Unlock()
	return nil
}
// RemoveChecks removes every configured check whose Name matches one of the
// given names. Names with no matching check are ignored.
// This does not affect monitoring that is already running.
func (w *Watchdog) RemoveChecks(names ...string) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	// slices.DeleteFunc filters in place, so no pre-sizing is needed.
	// The previous implementation allocated with capacity
	// len(w.checks)-len(names), which panics (negative capacity) when more
	// names than configured checks are supplied.
	w.checks = slices.DeleteFunc(w.checks, func(c Check) bool {
		return slices.Contains(names, c.Name)
	})
	return nil
}
// Start begins monitoring and returns the channel results are delivered on.
// Calling Start again while monitoring is active returns the SAME channel;
// if more than one reader is needed, fan out on the caller side.
// On start the watchdog runs every configured check and pushes the current
// status of ALL checks to the channel; afterwards only status changes are
// pushed, regardless of the error values returned by the CheckFuncs.
func (w *Watchdog) Start(ctx context.Context) (<-chan CheckResult, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	// An active session already exists: hand back its channel.
	if session := w.running; session != nil {
		return session.out, nil
	}
	if len(w.checks) == 0 {
		return nil, ErrNotConfigured
	}
	// Start a fresh session.
	session := runMonitoringForGroup(ctx, w.checks)
	w.running = session
	return session.out, nil
}
// Stop terminates the active monitoring session by closing its stop channel.
// If no session is active — including a repeated call to Stop — it returns
// ErrNotRunning.
func (w *Watchdog) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	session := w.running
	if session == nil {
		return ErrNotRunning
	}
	w.running = nil
	close(session.stop)
	return nil
}
// RunImmediately runs the configured checks concurrently and returns their
// results, sorted by check name for deterministic output.
// concurrency limits how many checks may run at the same time; any value
// <= 0 means all checks are allowed to run simultaneously. (Previously a
// negative value was passed through unchecked and caused a panic when
// sizing the worker limit.)
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
	w.mu.Lock()
	if len(w.checks) == 0 {
		w.mu.Unlock()
		return nil, ErrNotConfigured
	}
	// Copy the checks so the mutex is not held while they execute.
	cp := make([]Check, len(w.checks))
	copy(cp, w.checks)
	w.mu.Unlock()
	if concurrency <= 0 {
		concurrency = len(cp)
	}
	statuses := runChecksConcurrently(ctx, cp, concurrency)
	slices.SortFunc(statuses, func(a, b CheckResult) int {
		switch {
		case a.Name < b.Name:
			return -1
		case a.Name > b.Name:
			return 1
		default:
			return 0
		}
	})
	return statuses, nil
}
// runMonitoringForGroup starts one goroutine per check. Each goroutine runs
// its check every Interval and publishes a CheckResult whenever the status
// differs from the previous run; the very first result is always published
// so consumers see the initial state of every check. All goroutines exit
// when stop is closed or ctx is canceled, and the output channel is closed
// once the last goroutine has finished.
func runMonitoringForGroup(ctx context.Context, ch []Check) *running {
	events := make(chan CheckResult)
	stop := make(chan struct{})
	grp := errgroup.Group{}
	for _, c := range ch {
		grp.Go(func() error {
			// time.NewTicker (instead of time.Tick) so the ticker is
			// released when the goroutine exits.
			ticker := time.NewTicker(c.Interval)
			defer ticker.Stop()
			var prev CheckResult
			first := true
			for {
				status, err := c.Check(ctx)
				s := CheckResult{
					Name:   c.Name,
					Status: status,
					Error:  err,
				}
				// Publish the initial status unconditionally; previously it
				// was skipped whenever it happened to equal the zero value.
				if first || s.Status != prev.Status {
					// Send under select so this goroutine cannot block
					// forever if the consumer stops reading after Stop or
					// context cancellation.
					select {
					case events <- s:
					case <-stop:
						return nil
					case <-ctx.Done():
						return ctx.Err()
					}
				}
				first = false
				prev = s
				select {
				case <-ticker.C:
					// continue looping
				case <-stop:
					return nil
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		})
	}
	// Separate goroutine closes the output channel once every check
	// goroutine has exited; Wait's error is irrelevant here.
	go func() {
		grp.Wait()
		close(events)
	}()
	return &running{out: events, stop: stop}
}
// runChecksConcurrently executes every check and collects the results in
// completion order. At most concurrency checks run at the same time; a
// non-positive value is treated as "no limit". (The previous hand-rolled
// semaphore panicked on a negative value, deadlocked on zero, contained a
// dead empty defer, and released its slot while still holding the mutex.)
func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
	statuses := make([]CheckResult, 0, len(ch))
	var m sync.Mutex // guards appends to statuses
	var group errgroup.Group
	if concurrency < 1 {
		concurrency = len(ch)
	}
	// SetLimit replaces the manual semaphore channel; the slot is released
	// by errgroup itself, so no per-goroutine bookkeeping is needed.
	group.SetLimit(concurrency)
	for _, e := range ch {
		group.Go(func() error {
			status, err := e.Check(ctx)
			r := CheckResult{
				Name:   e.Name,
				Status: status,
				Error:  err,
			}
			m.Lock()
			statuses = append(statuses, r)
			m.Unlock()
			return nil
		})
	}
	// The goroutines never return an error, so Wait's result is ignored.
	group.Wait()
	return statuses
}