Compare commits
2 Commits
ec5db88f97
...
858cae9547
Author | SHA1 | Date | |
---|---|---|---|
858cae9547 | |||
82143dffc7 |
25
runner.go
25
runner.go
@@ -37,9 +37,9 @@ type Writer interface {
|
|||||||
Write(context.Context, []string) error
|
Write(context.Context, []string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process accepts string slice, does what it should
|
// Processor accepts string slice, does what it should
|
||||||
// and returns output. Non-nil error returned
|
// and returns output. Non-nil error returned
|
||||||
// by Processor terminates stops further processing.
|
// by Processor stops further processing.
|
||||||
type Processor func(context.Context, []string) ([]string, error)
|
type Processor func(context.Context, []string) ([]string, error)
|
||||||
|
|
||||||
type RunConfig struct {
|
type RunConfig struct {
|
||||||
@@ -51,13 +51,15 @@ type RunConfig struct {
|
|||||||
Concurrency int
|
Concurrency int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts concurrency threads (goroutines), reads from provided Reader,
|
// Run starts the script described by r.
|
||||||
// executes each Processor in the order they were provide and records result
|
// First Read is called offset times with output of Read being discarded.
|
||||||
// with provided Writer.
|
|
||||||
// At first Read is called offset times with output of Read being discarded.
|
|
||||||
// Then limit Reads are made and processor is called for each portion
|
// Then limit Reads are made and processor is called for each portion
|
||||||
// of data. If limit is 0 then Runner keep processing input until it receives
|
// of data. If limit is 0 then Run keep processing input until it receives
|
||||||
// EOF from Reader.
|
// EOF from Reader.
|
||||||
|
// Run fails on any error including Reader error, Writer error and Processor error.
|
||||||
|
// If an error is encountered the writer operation will be attampted anyway so that
|
||||||
|
// the output is left in consistent state, recording what has been actually done
|
||||||
|
// by Processor.
|
||||||
func Run(ctx context.Context, r RunConfig) error {
|
func Run(ctx context.Context, r RunConfig) error {
|
||||||
if r.Concurrency == 0 {
|
if r.Concurrency == 0 {
|
||||||
r.Concurrency = 1
|
r.Concurrency = 1
|
||||||
@@ -118,6 +120,7 @@ func Run(ctx context.Context, r RunConfig) error {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// run processing routines
|
||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
// will close write chan once
|
// will close write chan once
|
||||||
// all workers are done
|
// all workers are done
|
||||||
@@ -136,12 +139,16 @@ func Run(ctx context.Context, r RunConfig) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wrch <- result
|
wrch <- result
|
||||||
|
|
||||||
|
// if one of workers died or parent context expired
|
||||||
|
// we should die too
|
||||||
|
if err := innrctx.Err(); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := workergrp.Wait(); err != nil {
|
if err := workergrp.Wait(); err != nil {
|
||||||
|
Reference in New Issue
Block a user