From 82143dffc7e4893ec4378ca27c13463c8608ca32 Mon Sep 17 00:00:00 2001 From: Dmitry Fedotov Date: Sat, 30 Aug 2025 23:26:56 +0300 Subject: [PATCH] fix: obey main context --- runner.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/runner.go b/runner.go index 6689a73..2609ea2 100644 --- a/runner.go +++ b/runner.go @@ -37,9 +37,9 @@ type Writer interface { Write(context.Context, []string) error } -// Process accepts string slice, does what it should +// Processor accepts string slice, does what it should // and returns output. Non-nil error returned -// by Processor terminates stops further processing. +// by Processor stops further processing. type Processor func(context.Context, []string) ([]string, error) type RunConfig struct { @@ -51,13 +51,15 @@ type RunConfig struct { Concurrency int } -// Run starts concurrency threads (goroutines), reads from provided Reader, -// executes each Processor in the order they were provide and records result -// with provided Writer. -// At first Read is called offset times with output of Read being discarded. +// Run starts the script described by r. +// First Read is called offset times with output of Read being discarded. // Then limit Reads are made and processor is called for each portion -// of data. If limit is 0 then Runner keep processing input until it receives +// of data. If limit is 0 then Run keep processing input until it receives // EOF from Reader. +// Run fails on any error including Reader error, Writer error and Processor error. +// If an error is encountered the writer operation will be attampted anyway so that +// the output is left in consistent state, recording what has been actually done +// by Processor. func Run(ctx context.Context, r RunConfig) error { if r.Concurrency == 0 { r.Concurrency = 1 @@ -118,6 +120,7 @@ func Run(ctx context.Context, r RunConfig) error { return nil }) + // run processing routines grp.Go(func() error { // will close write chan once // all workers are done @@ -136,12 +139,16 @@ func Run(ctx context.Context, r RunConfig) error { } wrch <- result + + // if one of workers died or parent context expired + // we should die too + if err := innrctx.Err(); err != nil { + return nil + } } return nil - }) - } if err := workergrp.Wait(); err != nil { -- 2.49.1