Reviewed-on: #1 Co-authored-by: Dmitry Fedotov <dmitry@uint32.ru> Co-committed-by: Dmitry Fedotov <dmitry@uint32.ru>
325 lines
6.1 KiB
Go
325 lines
6.1 KiB
Go
package watchdog
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
ErrNotConfigured = errors.New("no checks configured")
|
|
ErrNotRunning = errors.New("watchdog is not running")
|
|
)
|
|
|
|
// Watchdog keeps checks to run either periodically
|
|
// or on demand.
|
|
type Watchdog struct {
|
|
checks map[string]*wdCheck
|
|
mu sync.Mutex
|
|
|
|
events chan CheckResult // output channel
|
|
limiter chan struct{} // TODO: use proper limiter here
|
|
|
|
timeout time.Duration // timeout for checks to complete
|
|
|
|
monitoring bool // is monitoring currently in progress
|
|
running int // number of active checks monitored
|
|
}
|
|
|
|
type wdCheck struct {
|
|
check Check
|
|
stop chan struct{}
|
|
}
|
|
|
|
// New creates instance of Watchdog with
|
|
// provided checks.
|
|
func New(checks ...Check) *Watchdog {
|
|
w := Watchdog{
|
|
checks: make(map[string]*wdCheck),
|
|
}
|
|
|
|
for _, c := range checks {
|
|
nc := &wdCheck{
|
|
check: c,
|
|
}
|
|
|
|
w.checks[c.Name] = nc
|
|
}
|
|
|
|
return &w
|
|
}
|
|
|
|
func (w *Watchdog) ListChecks() []Check {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
out := w.copyChecks()
|
|
|
|
return out
|
|
}
|
|
|
|
// SetTimeout sets timeout for all checks that
|
|
// get started with Start method. Changing this
|
|
// value does not affect running checks.
|
|
// Watchdog does not enforce this timeout, it
|
|
// just passes context.WithTimemout to check functions.
|
|
// If this method is not called the default timeout
|
|
// of 10 seconds is used.
|
|
func (w *Watchdog) SetTimeout(d time.Duration) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
w.timeout = d
|
|
}
|
|
|
|
// AddChecks adds checks to the group.
|
|
// If monitoring is in progress then monitoring it started for the newly added
|
|
// check as well.
|
|
// Check may have not have duplicate Name fields. New check with the same
|
|
// hame overwrites the previous one.
|
|
func (w *Watchdog) AddChecks(checks ...Check) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.checks == nil {
|
|
w.checks = make(map[string]*wdCheck)
|
|
}
|
|
|
|
for _, c := range checks {
|
|
nc := &wdCheck{
|
|
check: c,
|
|
}
|
|
|
|
old, haveOld := w.checks[c.Name]
|
|
|
|
w.checks[c.Name] = nc
|
|
|
|
if w.monitoring {
|
|
w.startMonitoring(nc)
|
|
|
|
if haveOld {
|
|
w.stopMonitoring(old)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// RemoveChecks removes the named checks.
|
|
func (w *Watchdog) RemoveChecks(names ...string) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
for _, name := range names {
|
|
c, ok := w.checks[name]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if w.monitoring {
|
|
w.stopMonitoring(c)
|
|
}
|
|
|
|
delete(w.checks, name)
|
|
}
|
|
}
|
|
|
|
// Start starts monitoring.
|
|
// Subsequent calls to start return the SAME channel. If you need
|
|
// to have more that one reader from the channel - fan out on your side.
|
|
// On start Watchdog runs all provided check and pushes current status of checks of
|
|
// to the channel.
|
|
// Subsequently if check func returns different status or if it returns an error the
|
|
// result is pushed to the channel.
|
|
// Concurrency argument limits the number of checks that can run concurrently. 0 means no
|
|
// limit (all checks may run concurrently).
|
|
func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if len(w.checks) == 0 {
|
|
return nil, ErrNotConfigured
|
|
}
|
|
|
|
if w.monitoring {
|
|
return w.events, nil
|
|
}
|
|
|
|
if concurrency == 0 {
|
|
concurrency = len(w.checks)
|
|
}
|
|
|
|
if w.timeout == 0 {
|
|
w.timeout = DefaultTimeout
|
|
}
|
|
|
|
w.events = make(chan CheckResult, concurrency)
|
|
w.limiter = make(chan struct{}, concurrency)
|
|
|
|
for _, c := range w.checks {
|
|
w.startMonitoring(c)
|
|
}
|
|
|
|
return w.events, nil
|
|
}
|
|
|
|
// Stop stops execution of checks.
|
|
// Subsequent calls return ErrNotRunning.
|
|
func (w *Watchdog) Stop() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if !w.monitoring {
|
|
return ErrNotRunning
|
|
}
|
|
|
|
for _, c := range w.checks {
|
|
w.stopMonitoring(c)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RunImmediately runs configured checks concurrently and returns results.
|
|
// Setting concurrency to 0 means that all check of the group are allowed to run simultaneously.
|
|
// Otherwise at most concurrency checks will be allowed to run simultaneously.
|
|
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
|
|
w.mu.Lock()
|
|
|
|
if len(w.checks) == 0 {
|
|
w.mu.Unlock()
|
|
return nil, ErrNotConfigured
|
|
}
|
|
|
|
cp := w.copyChecks()
|
|
w.mu.Unlock()
|
|
|
|
if concurrency == 0 {
|
|
concurrency = len(cp)
|
|
}
|
|
|
|
statuses := runChecksConcurrently(ctx, cp, concurrency)
|
|
|
|
slices.SortFunc(statuses, func(a, b CheckResult) int {
|
|
if a.Name < b.Name {
|
|
return -1
|
|
} else if a.Name > b.Name {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
|
|
return statuses, nil
|
|
}
|
|
|
|
func (w *Watchdog) copyChecks() []Check {
|
|
cp := make([]Check, 0, len(w.checks))
|
|
for _, v := range w.checks {
|
|
cp = append(cp, v.check)
|
|
}
|
|
|
|
return cp
|
|
}
|
|
|
|
func (w *Watchdog) startMonitoring(wdc *wdCheck) {
|
|
wdc.stop = make(chan struct{})
|
|
c := wdc.check
|
|
|
|
if !w.monitoring {
|
|
w.monitoring = true
|
|
}
|
|
|
|
w.running++
|
|
|
|
go func() {
|
|
var curr error = nil
|
|
|
|
ticker := time.Tick(wdc.check.Interval)
|
|
|
|
for {
|
|
w.limiter <- struct{}{}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
|
|
defer cancel()
|
|
|
|
err := c.Check(ctx)
|
|
|
|
<-w.limiter
|
|
|
|
r := CheckResult{
|
|
Name: c.Name,
|
|
Error: err,
|
|
}
|
|
|
|
// if status changed or we've got an error
|
|
// then report this
|
|
if !errors.Is(r.Error, curr) {
|
|
w.events <- r
|
|
}
|
|
|
|
curr = r.Error
|
|
|
|
select {
|
|
case <-ticker:
|
|
// continue looping
|
|
case <-wdc.stop:
|
|
// stopping this specific check
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (w *Watchdog) stopMonitoring(wdc *wdCheck) {
|
|
close(wdc.stop)
|
|
w.running--
|
|
|
|
if w.running == 0 {
|
|
w.monitoring = false
|
|
close(w.events)
|
|
}
|
|
}
|
|
|
|
func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
|
|
sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
|
|
done := make(chan CheckResult, len(ch))
|
|
|
|
wg := new(sync.WaitGroup)
|
|
wg.Add(len(ch))
|
|
for _, e := range ch {
|
|
go func() {
|
|
sema <- struct{}{} // acquire
|
|
defer func() {
|
|
<-sema // release
|
|
wg.Done()
|
|
}()
|
|
|
|
// relying on assumption that CheckFunc obeys context
|
|
// cancellation
|
|
err := e.Check(ctx)
|
|
|
|
r := CheckResult{
|
|
Name: e.Name,
|
|
Error: err,
|
|
}
|
|
|
|
done <- r
|
|
}()
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
results := make([]CheckResult, 0, len(ch))
|
|
|
|
// collect results
|
|
for r := range done {
|
|
results = append(results, r)
|
|
}
|
|
|
|
return results
|
|
}
|