167 lines
3.5 KiB
Go
167 lines
3.5 KiB
Go
package script
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// EOF is the sentinel a Reader returns to signal that no more input is
// available. It is the very same value as io.EOF, so either name may be
// used when producing or checking it.
var EOF error = io.EOF

// ErrNoProcessors is the error reported when no processors are provided.
var ErrNoProcessors = errors.New("no processors provided")
|
|
|
|
var (
|
|
_ Reader = &CSVReader{}
|
|
_ Reader = &MemReader{}
|
|
_ Reader = &StdinReader{}
|
|
_ Writer = &CSVWriter{}
|
|
_ Writer = &MemWriter{}
|
|
_ Writer = &StdoutWriter{}
|
|
_ Writer = &NopWriter{}
|
|
)
|
|
|
|
// Reader is the source of input consumed by Run.
type Reader interface {
	// Read is called by Run to fetch the next portion of input to be
	// processed. When the input is exhausted Read must return EOF
	// (identical to io.EOF); Run treats any error matching EOF via
	// errors.Is as the end of input.
	Read(ctx context.Context) ([]string, error)
}
|
|
|
|
// Writer is the sink that receives the output produced during Run.
type Writer interface {
	// Write is called by Run to record the output of one Processor call.
	Write(ctx context.Context, out []string) error
}
|
|
|
|
// Processor turns one portion of input, as returned by a Reader, into the
// output that is handed to a Writer. Returning a non-nil error stops any
// further processing.
type Processor func(ctx context.Context, in []string) ([]string, error)
|
|
|
|
type RunConfig struct {
|
|
Input Reader
|
|
Output Writer
|
|
Processor Processor
|
|
Offset int
|
|
Limit int
|
|
Concurrency int
|
|
}
|
|
|
|
// Run starts the script described by r.
|
|
// First Read is called offset times with output of Read being discarded.
|
|
// Then limit Reads are made and processor is called for each portion
|
|
// of data. If limit is 0 then Run keep processing input until it receives
|
|
// EOF from Reader.
|
|
// Run fails on any error including Reader error, Writer error and Processor error.
|
|
// If an error is encountered the writer operation will be attampted anyway so that
|
|
// the output is left in consistent state, recording what has been actually done
|
|
// by Processor.
|
|
func Run(ctx context.Context, r RunConfig) error {
|
|
if r.Concurrency == 0 {
|
|
r.Concurrency = 1
|
|
}
|
|
|
|
grp, ctx := errgroup.WithContext(ctx)
|
|
|
|
rdch := make(chan []string, r.Concurrency)
|
|
wrch := make(chan []string, r.Concurrency)
|
|
|
|
// read input from Reader and forward to Processor
|
|
grp.Go(func() error {
|
|
// closing chan for processor to complete operations
|
|
defer close(rdch)
|
|
|
|
for range r.Offset {
|
|
_, err := r.Input.Read(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("could not advance to required offset: %w", err)
|
|
}
|
|
}
|
|
|
|
count := 0
|
|
for {
|
|
inp, err := r.Input.Read(ctx)
|
|
if err != nil && errors.Is(err, EOF) {
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case rdch <- inp:
|
|
case <-ctx.Done():
|
|
// will also close channel causing
|
|
// all routines to complete
|
|
return nil
|
|
}
|
|
|
|
count++
|
|
|
|
if count == r.Limit { // will never happen if limit set to 0
|
|
return nil
|
|
}
|
|
}
|
|
})
|
|
|
|
// read output of Processor and write to Writer
|
|
grp.Go(func() error {
|
|
// not paying attention to context here
|
|
// because we must complete writes
|
|
for outp := range wrch {
|
|
if err := r.Output.Write(ctx, outp); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
// run processing routines
|
|
grp.Go(func() error {
|
|
// will close write chan once
|
|
// all workers are done
|
|
defer close(wrch)
|
|
|
|
workergrp, innrctx := errgroup.WithContext(ctx)
|
|
|
|
for range r.Concurrency {
|
|
workergrp.Go(func() error {
|
|
// not paying attention to context here
|
|
// because we must complete writes
|
|
for inp := range rdch {
|
|
result, err := r.Processor(innrctx, inp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wrch <- result
|
|
|
|
// if one of workers died or parent context expired
|
|
// we should die too
|
|
if err := innrctx.Err(); err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if err := workergrp.Wait(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|