package script

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// Sentinel errors exposed by the package.
var (
	// EOF must be returned by a Reader once its input is exhausted.
	// It is the very same value as io.EOF, so either one may be returned.
	EOF = io.EOF

	// ErrNoProcessors signals that no Processor was provided.
	ErrNoProcessors = errors.New("script: no processors provided")
)

// Compile-time checks that the package's readers and writers satisfy the
// Reader and Writer interfaces.
var (
	_ Reader = (*CSVReader)(nil)
	_ Reader = (*MemReader)(nil)
	_ Reader = (*StdinReader)(nil)

	_ Writer = (*CSVWriter)(nil)
	_ Writer = (*MemWriter)(nil)
	_ Writer = (*StdoutWriter)(nil)
	_ Writer = (*NopWriter)(nil)
)

// Reader is the source of the records consumed by Run.
type Reader interface {
	// Read is called by Run to obtain the next input record.
	// When no input is left it must return EOF (equivalently io.EOF).
	Read() ([]string, error)
}

// Writer is the sink for the records produced by a Processor.
type Writer interface {
	// Write is called by Run to record one output record.
	Write([]string) error
}

// Processor turns one input record into one output record.
// Returning a non-nil error makes Run fail with that error.
type Processor func(ctx context.Context, in []string) ([]string, error)

// RunConfig describes everything Run needs to execute a script.
type RunConfig struct {
	Input       Reader    // source of input records
	Output      Writer    // destination for processed records
	Processor   Processor // applied to every record read after Offset
	Offset      int       // leading records read and discarded before processing
	Limit       int       // max records read after Offset; 0 means read until EOF
	Concurrency int       // concurrent Processor calls; values <= 0 are treated as 1
}

// RunResult reports how far a Run progressed.
type RunResult struct {
	Read      int // records read without error, excluding those skipped by Offset
	Processed int // records the Processor handled without error
	Written   int // records the Writer accepted without error
}

// Run executes the script described by r.
//
// It first calls r.Input.Read r.Offset times and discards what is read.
// After that it reads up to r.Limit records (or until EOF when r.Limit is 0),
// hands each one to r.Processor, using up to r.Concurrency concurrent calls,
// and writes every result to r.Output.
//
// Run fails on any error, whether it comes from the Reader, the Processor or
// the Writer, and the returned error explains which stage failed. The returned
// RunResult is always valid and reports the progress actually made, even when
// an error is returned.
func Run(ctx context.Context, r RunConfig) (RunResult, error) { if r.Concurrency <= 0 { r.Concurrency = 1 } grp, ctx := errgroup.WithContext(ctx) rdch := make(chan []string, r.Concurrency) wrch := make(chan []string, r.Concurrency) var read, proc, written uint32 // read input from Reader and forward to Processor grp.Go(func() error { // closing chan to Processor defer close(rdch) for range r.Offset { _, err := r.Input.Read() if err != nil { return fmt.Errorf("script: could not advance to required offset (%d): %s", r.Offset, err) } } count := 0 for { inp, err := r.Input.Read() if errors.Is(err, EOF) { return nil } else if err != nil { return fmt.Errorf("script: read error: %s", err) } // increment read count read++ select { case rdch <- inp: case <-ctx.Done(): // will also close channel causing // all routines to complete return nil } count++ if count == r.Limit { // will never happen if limit has been set to 0 return nil } } }) // read output of Processor and write to Writer grp.Go(func() error { defer func() { for range wrch { // NOP to drain channel } }() // not paying attention to context here // because we must complete writes // this is run within group so that write // error would cancel group context for outp := range wrch { if err := r.Output.Write(outp); err != nil { return fmt.Errorf("script: write error: %s", err) } //increment write count written++ } return nil }) // run processing routines grp.Go(func() error { // closing chan to Writer defer close(wrch) defer func() { for range rdch { // NOP to drain channel } }() workergrp := errgroup.Group{} for range r.Concurrency { workergrp.Go(func() error { for inp := range rdch { result, err := r.Processor(ctx, inp) if err != nil { return fmt.Errorf("script: process error: %s", err) } // increment processed count atomic.AddUint32(&proc, 1) select { case wrch <- result: case <-ctx.Done(): // this case is a must if writer fails // otherwise we'd want to push process result // to wrch return nil } } return nil }) 
} if err := workergrp.Wait(); err != nil { return err } return nil }) err := grp.Wait() // if this is a context expiry then error is nil return RunResult{ Read: int(read), Processed: int(proc), Written: int(written), }, err }