This commit is contained in:
2025-09-21 14:35:34 +03:00
parent 214fda877e
commit 0f2f9144ed

View File

@@ -16,17 +16,57 @@ var (
// Watchdog keeps checks to run either periodically // Watchdog keeps checks to run either periodically
// or on demand. // or on demand.
type Watchdog struct { type Watchdog struct {
checks []*wdCheck checks checksMap
mu sync.Mutex mu sync.Mutex
monitoring bool // is monitoring currently in progress
events chan CheckResult // output channel events chan CheckResult // output channel
limiter chan struct{} // TODO: use proper limiter here limiter chan struct{} // TODO: use proper limiter here
timeout time.Duration timeout time.Duration // timeout for checks to complete
running int monitoring bool // is monitoring currently in progress
running int // number of active checks monitored
}
// checksMap is a thin wrapper around a map of *wdCheck values keyed by
// string (Watchdog stores checks under their Name). Its methods allocate
// the map on demand, so the zero value can be used without initialisation.
type checksMap struct {
	// m is the backing storage; it stays nil until first allocated.
	m map[string]*wdCheck
}
// build makes sure the backing map exists. It allocates an empty map
// when none has been created yet and leaves an existing map untouched.
func (c *checksMap) build() {
	if c.m != nil {
		// already allocated, nothing to do
		return
	}
	c.m = make(map[string]*wdCheck)
}
// Map returns the backing map, allocating it first when necessary, so
// the result is never nil. The returned value is the live internal map,
// not a copy: changes made through it are visible to the checksMap.
func (c *checksMap) Map() map[string]*wdCheck {
	if c.m == nil {
		c.build()
	}
	m := c.m
	return m
}
// Set stores v under key, replacing whatever was stored there before.
// The backing map is allocated on first use, so Set is safe to call on
// a zero-value checksMap.
func (c *checksMap) Set(key string, v *wdCheck) {
	if c.m == nil {
		// writing to a nil map panics, so allocate first
		c.build()
	}
	c.m[key] = v
}
// Lookup returns the check stored under key and reports whether it was
// present. When the key is absent it returns (nil, false).
//
// Lookup is read-only: indexing a nil map is well defined in Go and
// yields the zero value, so the backing map is not allocated here.
func (c *checksMap) Lookup(key string) (*wdCheck, bool) {
	v, ok := c.m[key]
	return v, ok
}
// Delete removes the entry stored under key. Deleting a key that is not
// present is a no-op.
//
// The built-in delete is a no-op on a nil map, so there is no need to
// allocate the backing map just to remove from it.
func (c *checksMap) Delete(key string) {
	delete(c.m, key)
}
func (c *checksMap) Len() int {
c.build()
return len(c.m)
} }
type wdCheck struct { type wdCheck struct {
@@ -37,29 +77,23 @@ type wdCheck struct {
// New creates instance of Watchdog with // New creates instance of Watchdog with
// provided checks. // provided checks.
func New(checks ...Check) *Watchdog { func New(checks ...Check) *Watchdog {
ch := make([]*wdCheck, len(checks)) w := Watchdog{}
for _, c := range checks {
for i := range checks { nc := &wdCheck{
ch[i] = &wdCheck{ check: c,
check: checks[i],
} }
w.checks.Set(c.Name, nc)
} }
w := &Watchdog{ return &w
checks: ch,
}
return w
} }
func (w *Watchdog) ListChecks() []Check { func (w *Watchdog) ListChecks() []Check {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
out := make([]Check, len(w.checks)) out := w.copyChecks()
for i := range w.checks {
out[i] = w.checks[i].check
}
return out return out
} }
@@ -81,20 +115,27 @@ func (w *Watchdog) SetTimeout(d time.Duration) {
// AddChecks adds checks to the group. // AddChecks adds checks to the group.
// If monitoring is in progress then monitoring is started for the newly added // If monitoring is in progress then monitoring is started for the newly added
// check as well. // check as well.
// Check may have duplicate Name fields but note that RemoveChecks removes checks // Checks may not have duplicate Name fields. A new check with the same
// by their Name fields. // name overwrites the previous one.
func (w *Watchdog) AddChecks(checks ...Check) { func (w *Watchdog) AddChecks(checks ...Check) {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
for i := range checks { for _, c := range checks {
nc := &wdCheck{ nc := &wdCheck{
check: checks[i], check: c,
} }
w.checks = append(w.checks, nc)
old, haveOld := w.checks.Lookup(c.Name)
w.checks.Set(c.Name, nc)
if w.monitoring { if w.monitoring {
w.startMonitoring(nc) w.startMonitoring(nc)
if haveOld {
w.stopMonitoring(old)
}
} }
} }
} }
@@ -104,19 +145,18 @@ func (w *Watchdog) RemoveChecks(names ...string) {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
remaining := make([]*wdCheck, 0, len(w.checks)-len(names)) for _, name := range names {
for _, c := range w.checks { c, ok := w.checks.Lookup(name)
if slices.Contains(names, c.check.Name) { if !ok {
if w.monitoring {
w.stopMonitoring(c)
}
continue continue
} }
remaining = append(remaining, c) if w.monitoring {
} w.stopMonitoring(c)
}
w.checks = remaining w.checks.Delete(name)
}
} }
// Start starts monitoring. // Start starts monitoring.
@@ -132,7 +172,7 @@ func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
if len(w.checks) == 0 { if w.checks.Len() == 0 {
return nil, ErrNotConfigured return nil, ErrNotConfigured
} }
@@ -141,7 +181,7 @@ func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
} }
if concurrency == 0 { if concurrency == 0 {
concurrency = len(w.checks) concurrency = w.checks.Len()
} }
if w.timeout == 0 { if w.timeout == 0 {
@@ -151,12 +191,10 @@ func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
w.events = make(chan CheckResult, concurrency) w.events = make(chan CheckResult, concurrency)
w.limiter = make(chan struct{}, concurrency) w.limiter = make(chan struct{}, concurrency)
for i := range w.checks { for _, c := range w.checks.Map() {
w.startMonitoring(w.checks[i]) w.startMonitoring(c)
} }
w.monitoring = true
return w.events, nil return w.events, nil
} }
@@ -170,8 +208,8 @@ func (w *Watchdog) Stop() error {
return ErrNotRunning return ErrNotRunning
} }
for i := range w.checks { for _, c := range w.checks.Map() {
w.stopMonitoring(w.checks[i]) w.stopMonitoring(c)
} }
return nil return nil
@@ -182,7 +220,8 @@ func (w *Watchdog) Stop() error {
// Otherwise at most concurrency checks will be allowed to run simultaneously. // Otherwise at most concurrency checks will be allowed to run simultaneously.
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) { func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
w.mu.Lock() w.mu.Lock()
if len(w.checks) == 0 {
if w.checks.Len() == 0 {
w.mu.Unlock() w.mu.Unlock()
return nil, ErrNotConfigured return nil, ErrNotConfigured
} }
@@ -209,41 +248,28 @@ func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]Check
} }
func (w *Watchdog) copyChecks() []Check { func (w *Watchdog) copyChecks() []Check {
cp := make([]Check, len(w.checks)) cp := make([]Check, 0, w.checks.Len())
for i := range w.checks { for _, v := range w.checks.Map() {
cp[i] = w.checks[i].check cp = append(cp, v.check)
} }
return cp return cp
} }
func (w *Watchdog) startMonitoring(wdc *wdCheck) { func (w *Watchdog) startMonitoring(wdc *wdCheck) {
wdc.stop = make(chan struct{}) wdc.stop = make(chan struct{})
c := wdc.check c := wdc.check
// this method is called only with if !w.monitoring {
// w.mu locked w.monitoring = true
}
w.running++ w.running++
go func() { go func() {
defer func() {
w.mu.Lock()
defer w.mu.Unlock()
w.running--
if w.running == 0 {
// last goroutine to exit will also
// close the output chan
close(w.events)
w.monitoring = false
}
}()
state := CheckResult{ state := CheckResult{
// if first run return anything // on first run return anything
// other than OK, we'll report it // other than OK
// if first run is OK, then we do not need to report
Status: StatusOK, Status: StatusOK,
} }
@@ -259,17 +285,19 @@ func (w *Watchdog) startMonitoring(wdc *wdCheck) {
<-w.limiter <-w.limiter
s := CheckResult{ r := CheckResult{
Name: c.Name, Name: c.Name,
Status: status, Status: status,
Error: err, Error: err,
} }
if s.Status != state.Status || s.Error != nil { // if status changed or we've got an error
w.events <- s // then report this
if r.Status != state.Status || r.Error != nil {
w.events <- r
} }
state = s state = r
select { select {
case <-ticker: case <-ticker:
@@ -284,23 +312,26 @@ func (w *Watchdog) startMonitoring(wdc *wdCheck) {
func (w *Watchdog) stopMonitoring(wdc *wdCheck) { func (w *Watchdog) stopMonitoring(wdc *wdCheck) {
close(wdc.stop) close(wdc.stop)
w.running--
if w.running == 0 {
w.monitoring = false
close(w.events)
}
} }
func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult { func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
statuses := make([]CheckResult, 0, len(ch))
m := sync.Mutex{} // for append operations
sema := make(chan struct{}, concurrency) // semaphore to limit concurrency sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
done := make(chan struct{}, len(ch)) done := make(chan CheckResult, len(ch))
count := len(ch)
wg := new(sync.WaitGroup)
wg.Add(len(ch))
for _, e := range ch { for _, e := range ch {
sema <- struct{}{} // acquire go func() {
go func() error { sema <- struct{}{} // acquire
defer func() { defer func() {
<-sema <-sema // release
done <- struct{}{} wg.Done()
}() }()
// relying on assumption that CheckFunc obeys context // relying on assumption that CheckFunc obeys context
@@ -313,21 +344,21 @@ func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []C
Error: err, Error: err,
} }
m.Lock() done <- r
defer m.Unlock()
statuses = append(statuses, r)
return nil
}() }()
} }
// wait for all to finish go func() {
for range done { wg.Wait()
count-- close(done)
if count == 0 { }()
close(done)
} results := make([]CheckResult, 0, len(ch))
// collect results
for r := range done {
results = append(results, r)
} }
return statuses return results
} }