Do not return an event with OK status on the first run of a check. A check result is only returned if something went wrong.
package watchdog

import (
	"context"
	"errors"
	"slices"
	"sync"
	"time"
)

var (
	ErrNotConfigured = errors.New("no checks configured")
	ErrNotRunning    = errors.New("watchdog is not running")
)

// Watchdog keeps checks to run either periodically
// or on demand.
type Watchdog struct {
	checks []*wdCheck
	mu     sync.Mutex

	monitoring bool // is monitoring currently in progress

	events  chan CheckResult // output channel
	limiter chan struct{}    // TODO: use proper limiter here

	timeout time.Duration

	running int
}

type wdCheck struct {
	check Check
	stop  chan struct{}
}

// New creates an instance of Watchdog with
// the provided checks.
func New(checks ...Check) *Watchdog {
	ch := make([]*wdCheck, len(checks))

	for i := range checks {
		ch[i] = &wdCheck{
			check: checks[i],
		}
	}

	w := &Watchdog{
		checks: ch,
	}

	return w
}

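// exampleNewUsage is a hypothetical sketch, not part of the original file: it
// shows how New and SetTimeout are expected to be combined, assuming the
// checks themselves are built elsewhere and passed in.
func exampleNewUsage(checks []Check) *Watchdog {
	wd := New(checks...)

	// Override the 10 second default timeout that Start would otherwise use.
	wd.SetTimeout(5 * time.Second)

	return wd
}
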
func (w *Watchdog) ListChecks() []Check {
	w.mu.Lock()
	defer w.mu.Unlock()

	out := make([]Check, len(w.checks))
	for i := range w.checks {
		out[i] = w.checks[i].check
	}

	return out
}

// SetTimeout sets the timeout for all checks that
// get started with the Start method. Changing this
// value does not affect running checks.
// Watchdog does not enforce this timeout, it
// just passes a context created with context.WithTimeout to the check functions.
// If this method is not called, the default timeout
// of 10 seconds is used.
func (w *Watchdog) SetTimeout(d time.Duration) {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.timeout = d
}

// AddChecks adds checks to the group.
// If monitoring is in progress, monitoring is started for the newly added
// checks as well.
// Checks may have duplicate Name fields, but note that RemoveChecks removes checks
// by their Name fields.
func (w *Watchdog) AddChecks(checks ...Check) {
	w.mu.Lock()
	defer w.mu.Unlock()

	for i := range checks {
		nc := &wdCheck{
			check: checks[i],
		}
		w.checks = append(w.checks, nc)

		if w.monitoring {
			w.startMonitoring(nc)
		}
	}
}

// RemoveChecks removes the named checks.
func (w *Watchdog) RemoveChecks(names ...string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Capacity is len(w.checks) rather than len(w.checks)-len(names):
	// names may contain duplicates or unknown entries, and a negative
	// capacity would make make panic.
	remaining := make([]*wdCheck, 0, len(w.checks))
	for _, c := range w.checks {
		if slices.Contains(names, c.check.Name) {
			if w.monitoring {
				w.stopMonitoring(c)
			}
			continue
		}

		remaining = append(remaining, c)
	}

	w.checks = remaining
}

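// exampleReconfigureUsage is a hypothetical sketch, not part of the original
// file: it illustrates that checks are added as values but removed by their
// Name field, so every check registered under that name is dropped together.
func exampleReconfigureUsage(wd *Watchdog, extra Check) {
	wd.AddChecks(extra)

	// ... later, remove everything registered under the same name.
	wd.RemoveChecks(extra.Name)
}
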
// Start starts monitoring.
// Subsequent calls to Start return the SAME channel. If you need
// to have more than one reader from the channel, fan out on your side.
// On start Watchdog runs all configured checks; the result of the first run is
// pushed to the channel only if the status is not OK or an error is returned.
// Subsequently, if a check func returns a different status or returns an error, the
// result is pushed to the channel.
// The concurrency argument limits the number of checks that can run concurrently. 0 means no
// limit (all checks may run concurrently).
func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if len(w.checks) == 0 {
		return nil, ErrNotConfigured
	}

	if w.monitoring {
		return w.events, nil
	}

	if concurrency == 0 {
		concurrency = len(w.checks)
	}

	if w.timeout == 0 {
		w.timeout = DefaultTimeout
	}

	w.events = make(chan CheckResult, concurrency)
	w.limiter = make(chan struct{}, concurrency)

	for i := range w.checks {
		w.startMonitoring(w.checks[i])
	}

	w.monitoring = true

	return w.events, nil
}

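// exampleStartUsage is a hypothetical sketch, not part of the original file:
// it shows how the channel returned by Start is expected to be consumed. Only
// results whose status changed (or that carry an error) arrive on it, and the
// channel is closed once Stop has been called and every check goroutine has
// exited.
func exampleStartUsage(wd *Watchdog) error {
	events, err := wd.Start(2) // at most two checks run at the same time
	if err != nil {
		return err
	}

	go func() {
		for ev := range events {
			// React to a degraded check here, e.g. log it or raise an alert.
			_ = ev
		}
	}()

	// ... later, during shutdown:
	return wd.Stop()
}
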
// Stop stops execution of checks.
// Subsequent calls return ErrNotRunning.
func (w *Watchdog) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if !w.monitoring {
		return ErrNotRunning
	}

	for i := range w.checks {
		w.stopMonitoring(w.checks[i])
	}

	return nil
}

// RunImmediately runs the configured checks concurrently and returns the results.
// Setting concurrency to 0 means that all checks of the group are allowed to run simultaneously.
// Otherwise at most concurrency checks will be allowed to run simultaneously.
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
	w.mu.Lock()
	if len(w.checks) == 0 {
		w.mu.Unlock()
		return nil, ErrNotConfigured
	}

	cp := w.copyChecks()
	w.mu.Unlock() // release

	if concurrency == 0 {
		concurrency = len(cp)
	}

	statuses := runChecksConcurrently(ctx, cp, concurrency)

	slices.SortFunc(statuses, func(a, b CheckResult) int {
		if a.Name < b.Name {
			return -1
		} else if a.Name > b.Name {
			return 1
		}
		return 0
	})

	return statuses, nil
}

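// exampleRunImmediatelyUsage is a hypothetical sketch, not part of the
// original file: it shows a one-shot run of all configured checks with an
// overall deadline supplied by the caller. RunImmediately does not apply the
// watchdog timeout, so the context is the only bound here.
func exampleRunImmediatelyUsage(ctx context.Context, wd *Watchdog) ([]CheckResult, error) {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// 0 allows every check of the group to run at the same time.
	return wd.RunImmediately(ctx, 0)
}
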
func (w *Watchdog) copyChecks() []Check {
	cp := make([]Check, len(w.checks))
	for i := range w.checks {
		cp[i] = w.checks[i].check
	}

	return cp
}

func (w *Watchdog) startMonitoring(wdc *wdCheck) {
	wdc.stop = make(chan struct{})
	c := wdc.check

	// this method is called only with
	// w.mu locked
	w.running++

	go func() {
		defer func() {
			w.mu.Lock()
			defer w.mu.Unlock()

			w.running--
			if w.running == 0 {
				// last goroutine to exit will also
				// close the output chan
				close(w.events)
				w.monitoring = false
			}
		}()

		state := CheckResult{
			// if the first run returns anything
			// other than OK, we'll report it;
			// if the first run is OK, then we do not need to report
			Status: StatusOK,
		}

		ticker := time.Tick(wdc.check.Interval)

		for {
			w.limiter <- struct{}{}

			ctx, cancel := context.WithTimeout(context.Background(), w.timeout)

			status, err := c.Check(ctx)

			// cancel right away instead of deferring: a defer inside this
			// loop would pile up until the goroutine exits
			cancel()

			<-w.limiter

			s := CheckResult{
				Name:   c.Name,
				Status: status,
				Error:  err,
			}

			if s.Status != state.Status || s.Error != nil {
				w.events <- s
			}

			state = s

			select {
			case <-ticker:
				// continue looping
			case <-wdc.stop:
				// stopping this specific check
				return
			}
		}
	}()
}

func (w *Watchdog) stopMonitoring(wdc *wdCheck) {
	close(wdc.stop)
}

func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
	statuses := make([]CheckResult, 0, len(ch))
	m := sync.Mutex{} // for append operations

	sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
	done := make(chan struct{}, len(ch))

	count := len(ch)

	for _, e := range ch {
		sema <- struct{}{} // acquire
		// the check is passed as an argument so the goroutine does not
		// share the loop variable
		go func(e Check) {
			defer func() {
				<-sema
				done <- struct{}{}
			}()

			// relying on assumption that CheckFunc obeys context
			// cancellation
			status, err := e.Check(ctx)

			r := CheckResult{
				Name:   e.Name,
				Status: status,
				Error:  err,
			}

			m.Lock()
			defer m.Unlock()
			statuses = append(statuses, r)
		}(e)
	}

	// wait for all to finish
	for range done {
		count--
		if count == 0 {
			close(done)
		}
	}

	return statuses
}