package script

import (
	"context"
	"errors"
	"fmt"
	"io"

	"golang.org/x/sync/errgroup"
)

var (
	// EOF is the value a Reader returns once it has no more input.
	// It is the same value as io.EOF, so either may be returned.
	EOF error = io.EOF
	// ErrNoProcessors signals that no Processor was provided.
	ErrNoProcessors = errors.New("no processors provided")
)

// Compile-time checks that the package's concrete readers and writers
// implement the Reader and Writer interfaces.
var (
	_ Reader = &CSVReader{}
	_ Reader = &MemReader{}
	_ Reader = &StdinReader{}
	_ Writer = &CSVWriter{}
	_ Writer = &MemWriter{}
	_ Writer = &StdoutWriter{}
	_ Writer = &NopWriter{}
)

// Reader is the input side of a Run pipeline.
type Reader interface {
	// Read is called by Run to obtain the next record for processing.
	// Run calls it from a single goroutine only. Read must return
	// EOF (io.EOF) to indicate that there is no more input; any other
	// error is returned by Run.
	Read(context.Context) ([]string, error)
}

// Writer is the output side of a Run pipeline.
type Writer interface {
	// Write is called by Run to record one Processor result.
	// Run calls it from a single goroutine only.
	Write(context.Context, []string) error
}

// Processor transforms one input record into one output record.
// Run may call it concurrently from up to RunConfig.Concurrency
// goroutines. A non-nil error makes Run fail with that error.
type Processor func(context.Context, []string) ([]string, error)

// RunConfig describes a single Run: where records come from, how each
// one is transformed, where results go, and which slice of the input
// is processed.
type RunConfig struct {
	// Input supplies the records.
	Input Reader
	// Output receives every Processor result.
	Output Writer
	// Processor is applied to each record read from Input.
	Processor Processor
	// Offset is the number of leading records read from Input and
	// discarded before processing starts.
	Offset int
	// Limit is the maximum number of records processed after Offset;
	// 0 means "until Input returns EOF".
	Limit int
	// Concurrency is the number of goroutines running Processor;
	// 0 means one.
	Concurrency int
}

// Run reads records from r.Input, passes each one to r.Processor and hands
// the results to r.Output.
//
// The first r.Offset records are read and discarded; if Read fails while
// skipping them (EOF included) Run fails with a wrapped error. After that,
// Read is called until it returns EOF, or at most r.Limit times when
// r.Limit is positive. Records are processed by r.Concurrency goroutines
// (one when it is 0), so results may reach r.Output in a different order
// than they were read.
//
// Run returns the first error reported by the Reader (other than EOF), the
// Processor or the Writer. Run does not itself turn cancellation of ctx
// into an error: cancelling ctx stops reading, while records that were
// already read are still passed to the Processor (with the cancelled
// context) and their results written.
func Run(ctx context.Context, r RunConfig) error { if r.Concurrency == 0 { r.Concurrency = 1 } grp, ctx := errgroup.WithContext(ctx) rdch := make(chan []string, r.Concurrency) wrch := make(chan []string, r.Concurrency) // read input from Reader and forward to Processor grp.Go(func() error { // closing chan for processor to complete operations defer close(rdch) for range r.Offset { _, err := r.Input.Read(ctx) if err != nil { return fmt.Errorf("could not advance to required offset: %w", err) } } count := 0 for { inp, err := r.Input.Read(ctx) if err != nil && errors.Is(err, EOF) { return nil } else if err != nil { return err } select { case rdch <- inp: case <-ctx.Done(): // will also close channel causing // all routines to complete return nil } count++ if count == r.Limit { // will never happen if limit set to 0 return nil } } }) // read output of Processor and write to Writer grp.Go(func() error { // not paying attention to context here // because we must complete writes for outp := range wrch { if err := r.Output.Write(ctx, outp); err != nil { return err } } return nil }) grp.Go(func() error { // will close write chan once // all workers are done defer close(wrch) workergrp, innrctx := errgroup.WithContext(ctx) for range r.Concurrency { workergrp.Go(func() error { // not paying attention to context here // because we must complete writes for inp := range rdch { result, err := r.Processor(innrctx, inp) if err != nil { return err } wrch <- result } return nil }) } if err := workergrp.Wait(); err != nil { return err } return nil }) if err := grp.Wait(); err != nil { return err } return nil }