Files
script/runner.go
Dmitry Fedotov 454c632462 feat: revise API, add README.md, LICENSE (#2)
Co-authored-by: Dmitry Fedotov <dmitry@uint32.ru>
Co-committed-by: Dmitry Fedotov <dmitry@uint32.ru>
2025-08-31 13:38:54 +03:00

169 lines
3.6 KiB
Go

package script
import (
"context"
"errors"
"fmt"
"io"
"golang.org/x/sync/errgroup"
)
// EOF is the error a Reader returns once it has no more input.
// It is the very same value as io.EOF, so either may be returned.
var EOF error = io.EOF

// ErrNoProcessors signals that no Processor was provided.
var ErrNoProcessors = errors.New("no processors provided")
// Compile-time checks that the concrete readers and writers declared in
// this package implement the Reader and Writer interfaces.
var (
	_ Reader = (*CSVReader)(nil)
	_ Reader = (*MemReader)(nil)
	_ Reader = (*StdinReader)(nil)

	_ Writer = (*CSVWriter)(nil)
	_ Writer = (*MemWriter)(nil)
	_ Writer = (*StdoutWriter)(nil)
	_ Writer = (*NopWriter)(nil)
)
// Reader is the source of input items consumed by Run.
type Reader interface {
	// Read returns the next input item. Once the input is exhausted it
	// must return EOF (io.EOF); Run checks this with errors.Is, so a
	// wrapped EOF also ends the input.
	Read() (item []string, err error)
}
// Writer is the sink that receives the results produced by a Processor.
type Writer interface {
	// Write records a single result. Run calls it once per result
	// returned by the Processor. A non-nil error signals that the
	// result could not be recorded.
	Write(result []string) error
}
// Processor turns one input item into one output item.
//
// Run invokes it once per item read from the Reader and passes its result
// to the Writer. The context supplied by Run is cancelled when the run is
// being aborted. Returning a non-nil error stops further processing.
type Processor func(ctx context.Context, in []string) (out []string, err error)
// RunConfig describes a single invocation of Run.
type RunConfig struct {
	// Input supplies the items to process.
	Input Reader
	// Output receives the results returned by Processor.
	Output Writer
	// Processor is applied to every item read from Input after the
	// offset. It must be non-nil.
	Processor Processor
	// Offset is the number of leading items read from Input and
	// discarded before processing starts. Run fails if Input returns an
	// error (including EOF) while skipping them.
	Offset int
	// Limit is the maximum number of items processed after Offset.
	// Zero (or any non-positive value) means no limit: Run reads until EOF.
	Limit int
	// Concurrency is the number of goroutines running Processor.
	// Zero is treated as 1.
	Concurrency int
}
// Run executes the script described by r.
//
// Run first calls r.Input.Read r.Offset times and discards the results. It
// then keeps reading items and hands each one to r.Processor. The Processor
// runs in r.Concurrency goroutines, or 1 when Concurrency is not positive.
// Every result returned by the Processor is passed to r.Output.Write. When
// r.Limit is positive, at most r.Limit items are read after the offset;
// otherwise Run continues until the Reader returns EOF. With more than one
// worker, results may be written in a different order than the items were
// read.
//
// Run returns the first error encountered:
//   - ErrNoProcessors when r.Processor is nil;
//   - a Reader error, including EOF hit while skipping the offset;
//   - a Processor error;
//   - a Writer error;
//   - the context's error when ctx is cancelled while the run is in
//     progress.
//
// After a Reader or Processor error, or after cancellation, results that
// were already produced are still written, so the output reflects the work
// that was actually done.
func Run(ctx context.Context, r RunConfig) error {
	if r.Processor == nil {
		return ErrNoProcessors
	}
	if r.Concurrency <= 0 {
		// make would panic on a negative buffer size.
		r.Concurrency = 1
	}

	grp, ctx := errgroup.WithContext(ctx)

	rdch := make(chan []string, r.Concurrency)
	wrch := make(chan []string, r.Concurrency)
	// wrdone is closed when the writer goroutine exits. Because wrch is
	// only closed after every worker has finished, wrdone being closed
	// while a worker is still running means the writer failed. In that
	// case nothing will ever receive from wrch again.
	wrdone := make(chan struct{})

	// Read input from Reader and forward it to the workers.
	grp.Go(func() error {
		// Closing rdch lets the workers drain it and finish, whatever
		// the reason reading stopped.
		defer close(rdch)

		for range r.Offset {
			if _, err := r.Input.Read(); err != nil {
				return fmt.Errorf("could not advance to required offset: %w", err)
			}
		}

		count := 0
		for {
			inp, err := r.Input.Read()
			switch {
			case errors.Is(err, EOF):
				return nil
			case err != nil:
				return err
			}

			select {
			case rdch <- inp:
			case <-ctx.Done():
				// Either another goroutine failed (errgroup already holds
				// its error, so this one is ignored) or the caller
				// cancelled the run, which must not look like success.
				return ctx.Err()
			}

			count++
			if count == r.Limit { // never true when Limit <= 0
				return nil
			}
		}
	})

	// Write Processor output to Writer. ctx is ignored on purpose: results
	// that were produced must still be written. This runs inside the group
	// so that a write error cancels the group context.
	grp.Go(func() error {
		defer close(wrdone)
		for outp := range wrch {
			if err := r.Output.Write(outp); err != nil {
				return err
			}
		}
		return nil
	})

	// Run the workers. wrch is closed once all of them are done so that
	// the writer goroutine can finish.
	grp.Go(func() error {
		defer close(wrch)

		workers, wctx := errgroup.WithContext(ctx)
		for range r.Concurrency {
			workers.Go(func() error {
				for inp := range rdch {
					result, err := r.Processor(wctx, inp)
					if err != nil {
						return err
					}
					// Not selecting on ctx here: completed work must
					// still reach the writer. Only a dead writer stops
					// delivery, because otherwise this send would block
					// forever and Run would never return.
					select {
					case wrch <- result:
					case <-wrdone:
						// The writer's own error is reported by grp.
						return nil
					}
					// Stop once another worker failed or the run was
					// cancelled. errgroup keeps the first error, so a real
					// failure is not masked by this one.
					if err := wctx.Err(); err != nil {
						return err
					}
				}
				return nil
			})
		}
		return workers.Wait()
	})

	return grp.Wait()
}