feat: stable version

Co-authored-by: Dmitry Fedotov <dmitry@uint32.ru>
Co-committed-by: Dmitry Fedotov <dmitry@uint32.ru>
This commit is contained in:
2025-07-25 18:42:16 +03:00
committed by dmitry
parent 82a4641ab0
commit 3aadddbcac
8 changed files with 362 additions and 228 deletions

View File

@@ -6,8 +6,6 @@ import (
"slices"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
var (
@@ -18,20 +16,37 @@ var (
// Watchdog keeps checks to run either periodically
// or on demand.
type Watchdog struct {
checks []Check
running *running
mu sync.Mutex
checks []*wdCheck
mu sync.Mutex
monitoring bool // is monitoring currently in progress
events chan CheckResult // output channel
limiter chan struct{} // TODO: use proper limiter here
timeout time.Duration
running int
}
type running struct {
out chan CheckResult
stop chan struct{}
// wdCheck couples a Check with the channel used to signal
// its monitoring goroutine to stop.
type wdCheck struct {
check Check
// stop is allocated by startMonitoring and closed by stopMonitoring.
stop chan struct{}
}
// New accepts checks to run.
// New creates instance of Watchdog with
// provided checks.
func New(checks ...Check) *Watchdog {
ch := make([]*wdCheck, len(checks))
for i := range checks {
ch[i] = &wdCheck{
check: checks[i],
}
}
w := &Watchdog{
checks: checks,
checks: ch,
}
return w
@@ -42,94 +57,129 @@ func (w *Watchdog) ListChecks() []Check {
defer w.mu.Unlock()
out := make([]Check, len(w.checks))
copy(out, w.checks)
for i := range w.checks {
out[i] = w.checks[i].check
}
return out
}
// AddChecks adds checks to the group. This DOES NOT
// affect already runnning monitoring for group. Use Stop and
// then Start to restart monitoring when a new check is added.
// SetTimeout configures the timeout applied to every check
// that is launched by the Start method. Checks that are already
// running keep the value they were started with.
// Watchdog itself does not enforce the timeout; it only hands
// a context created with context.WithTimeout to the check functions.
// When SetTimeout is never called the default timeout
// of 10 seconds is used.
func (w *Watchdog) SetTimeout(d time.Duration) {
	w.mu.Lock()
	w.timeout = d
	w.mu.Unlock()
}
// AddChecks adds checks to the group.
// If monitoring is in progress then monitoring is started for the newly added
// check as well.
// Checks may have duplicate Name fields but note that RemoveChecks removes checks
// by their Name fields.
func (w *Watchdog) AddChecks(checks ...Check) {
w.mu.Lock()
defer w.mu.Unlock()
w.checks = append(w.checks, checks...)
for i := range checks {
nc := &wdCheck{
check: checks[i],
}
w.checks = append(w.checks, nc)
if w.monitoring {
w.startMonitoring(nc)
}
}
}
// RemoveChecks removes the named checks.
// If monitoring is in progress, monitoring of the removed checks is stopped as well.
func (w *Watchdog) RemoveChecks(names ...string) {
w.mu.Lock()
defer w.mu.Unlock()
remaining := make([]Check, 0, len(w.checks)-len(names))
for _, e := range w.checks {
if slices.Contains(names, e.Name) {
remaining := make([]*wdCheck, 0, len(w.checks)-len(names))
for _, c := range w.checks {
if slices.Contains(names, c.check.Name) {
if w.monitoring {
w.stopMonitoring(c)
}
continue
}
remaining = append(remaining, e)
remaining = append(remaining, c)
}
w.checks = remaining
}
// Start starts monitoring.
// Subsequent calls to start just returns the SAME channel. If you need
// Subsequent calls to start return the SAME channel. If you need
// to have more than one reader from the channel - fan out on your side.
// On start Watchdog runs all provided check and pushes current status of ALL checks of
// the group to the channel.
// Subsequently only changes of status are pushed regardless of return error values of
// CheckFunc provided to Watchdog.
// Concurrency limits the number of checks that can run concurrently. 0 means no
// On start Watchdog runs all provided checks and pushes the current status of each check
// to the channel.
// Subsequently if check func returns different status or if it returns an error the
// result is pushed to the channel.
// Concurrency argument limits the number of checks that can run concurrently. 0 means no
// limit (all checks may run concurrently).
func (w *Watchdog) Start(ctx context.Context, concurrency int) (<-chan CheckResult, error) {
func (w *Watchdog) Start(concurrency int) (<-chan CheckResult, error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.running != nil {
return w.running.out, nil
}
// start new session
if len(w.checks) == 0 {
return nil, ErrNotConfigured
}
cp := w.copyChecks()
if concurrency == 0 {
concurrency = len(cp)
if w.monitoring {
return w.events, nil
}
w.runMonitoringForGroup(ctx, cp, concurrency)
if concurrency == 0 {
concurrency = len(w.checks)
}
return w.running.out, nil
if w.timeout == 0 {
w.timeout = DefaultTimeout
}
w.events = make(chan CheckResult, concurrency)
w.limiter = make(chan struct{}, concurrency)
for i := range w.checks {
w.startMonitoring(w.checks[i])
}
w.monitoring = true
return w.events, nil
}
// Stop stops execution of checks.
// Subsequent calls of Stop for the same groupID
// return ErrNotRunning.
// Subsequent calls return ErrNotRunning.
func (w *Watchdog) Stop() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.running == nil {
if !w.monitoring {
return ErrNotRunning
}
close(w.running.stop)
w.running = nil
for i := range w.checks {
w.stopMonitoring(w.checks[i])
}
return nil
}
// RunImmediately runs configured checks for group concurrently and returns results.
// Concurrency limits number of checks that are allowed run concurrently. Setting
// concurrency to 0 means that all check of the group are allowed to run simultaneously.
// RunImmediately runs configured checks concurrently and returns results.
// Setting concurrency to 0 means that all checks of the group are allowed to run simultaneously.
// Otherwise at most concurrency checks will be allowed to run simultaneously.
func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]CheckResult, error) {
w.mu.Lock()
if len(w.checks) == 0 {
@@ -160,86 +210,94 @@ func (w *Watchdog) RunImmediately(ctx context.Context, concurrency int) ([]Check
func (w *Watchdog) copyChecks() []Check {
cp := make([]Check, len(w.checks))
copy(cp, w.checks)
for i := range w.checks {
cp[i] = w.checks[i].check
}
return cp
}
func (w *Watchdog) runMonitoringForGroup(ctx context.Context, checks []Check, concurrency int) {
events := make(chan CheckResult, len(checks))
stop := make(chan struct{})
func (w *Watchdog) startMonitoring(wdc *wdCheck) {
wdc.stop = make(chan struct{})
w.running = &running{out: events, stop: stop}
state := CheckResult{}
ticker := time.Tick(wdc.check.Interval)
grp := errgroup.Group{}
sema := make(chan struct{}, concurrency)
c := wdc.check
for _, c := range checks {
grp.Go(func() error {
state := CheckResult{}
ticker := time.Tick(c.Interval)
// this method is called only with
// w.mu locked
w.running++
for {
sema <- struct{}{}
status, err := c.Check(ctx)
<-sema
s := CheckResult{
Name: c.Name,
Status: status,
Error: err,
}
if s.Status != state.Status {
events <- s
}
state = s
select {
case <-ticker:
// continue looping
case <-stop:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
})
}
// separate goroutine to close the output chan
// when everyone's dead
go func() {
grp.Wait()
close(events)
defer func() {
w.mu.Lock()
defer w.mu.Unlock()
w.mu.Lock()
defer w.mu.Unlock()
if w.running != nil {
w.running = nil
w.running--
if w.running == 0 {
// last goroutine to exit will also
// close the output chan
close(w.events)
w.monitoring = false
}
}()
for {
w.limiter <- struct{}{}
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()
status, err := c.Check(ctx)
<-w.limiter
s := CheckResult{
Name: c.Name,
Status: status,
Error: err,
}
if s.Status != state.Status || s.Error != nil {
w.events <- s
}
state = s
select {
case <-ticker:
// continue looping
case <-wdc.stop:
// stopping this specific check
return
}
}
}()
}
// stopMonitoring signals the monitoring goroutine of wdc to exit by
// closing its stop channel.
//
// The call is idempotent and nil-safe: a check whose monitoring was never
// started (nil stop channel) or whose stop channel has already been closed
// is left untouched. This matters because the monitoring flag is only
// cleared once the last goroutine has exited, so Stop or RemoveChecks may
// reach the same check again before that happens; closing a nil or
// already-closed channel would panic.
//
// Callers must hold w.mu, which serializes all closes of the channel.
func (w *Watchdog) stopMonitoring(wdc *wdCheck) {
	if wdc.stop == nil {
		// monitoring was never started for this check
		return
	}

	select {
	case <-wdc.stop:
		// Nothing is ever sent on stop, so a successful receive
		// means the channel is already closed.
	default:
		close(wdc.stop)
	}
}
func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []CheckResult {
statuses := make([]CheckResult, 0, len(ch))
m := sync.Mutex{} // for append operations
group := errgroup.Group{}
sema := make(chan struct{}, concurrency) // semaphore to limit concurrency
done := make(chan struct{}, len(ch))
count := len(ch)
for _, e := range ch {
sema <- struct{}{} // acquire
group.Go(func() error {
go func() error {
defer func() {
<-sema
done <- struct{}{}
}()
// relying on fact that CheckFunc obeys context
// relying on assumption that CheckFunc obeys context
// cancellation
status, err := e.Check(ctx)
@@ -253,13 +311,17 @@ func runChecksConcurrently(ctx context.Context, ch []Check, concurrency int) []C
defer m.Unlock()
statuses = append(statuses, r)
<-sema // release
return nil
})
}()
}
group.Wait()
// wait for all to finish
for range done {
count--
if count == 0 {
close(done)
}
}
return statuses
}