diff --git a/watchdog.go b/watchdog.go index d3da178..ed26ad7 100644 --- a/watchdog.go +++ b/watchdog.go @@ -16,17 +16,57 @@ var ( // Watchdog keeps checks to run either periodically // or on demand. type Watchdog struct { - checks []*wdCheck + checks checksMap mu sync.Mutex - monitoring bool // is monitoring currently in progress - events chan CheckResult // output channel limiter chan struct{} // TODO: use proper limiter here - timeout time.Duration + timeout time.Duration // timeout for checks to complete - running int + monitoring bool // is monitoring currently in progress + running int // number of active checks monitored +} + +type checksMap struct { + m map[string]*wdCheck +} + +func (c *checksMap) build() { + if c.m == nil { + c.m = make(map[string]*wdCheck) + } +} + +func (c *checksMap) Map() map[string]*wdCheck { + c.build() + + return c.m +} + +func (c *checksMap) Set(key string, v *wdCheck) { + c.build() + + c.m[key] = v +} + +func (c *checksMap) Lookup(key string) (*wdCheck, bool) { + c.build() + + v, ok := c.m[key] + return v, ok +} + +func (c *checksMap) Delete(key string) { + c.build() + + delete(c.m, key) +} + +func (c *checksMap) Len() int { + c.build() + + return len(c.m) } type wdCheck struct { @@ -37,29 +77,23 @@ type wdCheck struct { // New creates instance of Watchdog with // provided checks. func New(checks ...Check) *Watchdog { - ch := make([]*wdCheck, len(checks)) - - for i := range checks { - ch[i] = &wdCheck{ - check: checks[i], + w := Watchdog{} + for _, c := range checks { + nc := &wdCheck{ + check: c, } + + w.checks.Set(c.Name, nc) } - w := &Watchdog{ - checks: ch, - } - - return w + return &w } func (w *Watchdog) ListChecks() []Check { w.mu.Lock() defer w.mu.Unlock() - out := make([]Check, len(w.checks)) - for i := range w.checks { - out[i] = w.checks[i].check - } + out := w.copyChecks() return out } @@ -81,20 +115,27 @@ func (w *Watchdog) SetTimeout(d time.Duration) { // AddChecks adds checks to the group. 
// If monitoring is in progress then monitoring it started for the newly added // check as well. -// Check may have duplicate Name fields but note that RemoveChecks removes checks -// by their Name fields. +// Check may not have duplicate Name fields. New check with the same +// name overwrites the previous one. func (w *Watchdog) AddChecks(checks ...Check) { w.mu.Lock() defer w.mu.Unlock() - for i := range checks { + for _, c := range checks { nc := &wdCheck{ - check: checks[i], + check: c, } - w.checks = append(w.checks, nc) + + old, haveOld := w.checks.Lookup(c.Name) + + w.checks.Set(c.Name, nc) if w.monitoring { w.startMonitoring(nc) + + if haveOld { + w.stopMonitoring(old) + } } } } @@ -104,19 +145,18 @@ func (w *Watchdog) RemoveChecks(names ...string) { w.mu.Lock() defer w.mu.Unlock() - remaining := make([]*wdCheck, 0, len(w.checks)-len(names)) - for _, c := range w.checks { - if slices.Contains(names, c.check.Name) { - if w.monitoring { - w.stopMonitoring(c) - } + for _, name := range names { + c, ok := w.checks.Lookup(name) + if !ok { continue } - remaining = append(remaining, c) - } + if w.monitoring { + w.stopMonitoring(c) + } - w.checks = remaining + w.checks.Delete(name) + } } // Start starts monitoring. 
@@ -132,7 +172,7 @@ func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) { w.mu.Lock() defer w.mu.Unlock() - if len(w.checks) == 0 { + if w.checks.Len() == 0 { return nil, ErrNotConfigured } @@ -141,7 +181,7 @@ func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) { } if concurrency == 0 { - concurrency = len(w.checks) + concurrency = w.checks.Len() } if w.timeout == 0 { @@ -151,12 +191,10 @@ func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) { w.events = make(chan CheckResult, concurrency) w.limiter = make(chan struct{}, concurrency) - for i := range w.checks { - w.startMonitoring(w.checks[i]) + for _, c := range w.checks.Map() { + w.startMonitoring(c) } - w.monitoring = true - return w.events, nil } @@ -170,8 +208,8 @@ func (w *Watchdog) Stop() error { return ErrNotRunning } - for i := range w.checks { - w.stopMonitoring(w.checks[i]) + for _, c := range w.checks.Map() { + w.stopMonitoring(c) } return nil @@ -182,7 +220,8 @@ func (w *Watchdog) Stop() error { // Otherwise at most concurrency checks will be allowed to run simultaneously. 
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) { w.mu.Lock() - if len(w.checks) == 0 { + + if w.checks.Len() == 0 { w.mu.Unlock() return nil, ErrNotConfigured } @@ -209,41 +248,28 @@ func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]Check } func (w *Watchdog) copyChecks() []Check { - cp := make([]Check, len(w.checks)) - for i := range w.checks { - cp[i] = w.checks[i].check + cp := make([]Check, 0, w.checks.Len()) + for _, v := range w.checks.Map() { + cp = append(cp, v.check) } return cp } func (w *Watchdog) startMonitoring(wdc *wdCheck) { - wdc.stop = make(chan struct{}) c := wdc.check - // this method is called only with - // w.mu locked + if !w.monitoring { + w.monitoring = true + } + w.running++ go func() { - defer func() { - w.mu.Lock() - defer w.mu.Unlock() - - w.running-- - if w.running == 0 { - // last goroutine to exit will also - // close the output chan - close(w.events) - w.monitoring = false - } - }() - state := CheckResult{ - // if first run return anything - // other that OK, we'll report it - // if first run is OK, then we do not need to report + // on first run return anything + // other than OK Status: StatusOK, } @@ -259,17 +285,19 @@ func (w *Watchdog) startMonitoring(wdc *wdCheck) { <-w.limiter - s := CheckResult{ + r := CheckResult{ Name: c.Name, Status: status, Error: err, } - if s.Status != state.Status || s.Error != nil { - w.events <- s + // if status changed or we've got an error + // then report this + if r.Status != state.Status || r.Error != nil { + w.events <- r } - state = s + state = r select { case <-ticker: @@ -284,23 +312,26 @@ func (w *Watchdog) startMonitoring(wdc *wdCheck) { func (w *Watchdog) stopMonitoring(wdc *wdCheck) { close(wdc.stop) + w.running-- + + if w.running == 0 { + w.monitoring = false + close(w.events) + } } func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult { - statuses := make([]CheckResult, 0, 
len(ch)) - m := sync.Mutex{} // for append operations - sema := make(chan struct{}, concurrency) // semaphore to limit concurrency - done := make(chan struct{}, len(ch)) - - count := len(ch) + done := make(chan CheckResult, len(ch)) + wg := new(sync.WaitGroup) + wg.Add(len(ch)) for _, e := range ch { - sema <- struct{}{} // acquire - go func() error { + go func() { + sema <- struct{}{} // acquire defer func() { - <-sema - done <- struct{}{} + <-sema // release + wg.Done() }() // relying on assumption that CheckFunc obeys context @@ -313,21 +344,21 @@ func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []C Error: err, } - m.Lock() - defer m.Unlock() - statuses = append(statuses, r) - - return nil + done <- r }() } - // wait for all to finish - for range done { - count-- - if count == 0 { - close(done) - } + go func() { + wg.Wait() + close(done) + }() + + results := make([]CheckResult, 0, len(ch)) + + // collect results + for r := range done { + results = append(results, r) } - return statuses + return results }