Files
script/runner.go

198 lines
4.3 KiB
Go
Raw Permalink Normal View History

2025-08-30 22:00:31 +03:00
package script
import (
"context"
"errors"
"fmt"
"io"
2025-10-14 22:23:06 +03:00
"sync/atomic"
2025-08-30 22:00:31 +03:00
"golang.org/x/sync/errgroup"
)
// Sentinel error values exported by this package.
var (
	// EOF is what a Reader returns from Read once its input is exhausted.
	// It is io.EOF itself, so a Reader may return either value.
	EOF error = io.EOF

	// ErrNoProcessors reports that a script was configured without a
	// Processor.
	ErrNoProcessors = errors.New("script: no processors provided")
)
// Compile-time assertions that the concrete reader and writer types
// implement Reader and Writer. They produce no runtime values.
var (
	_ Reader = (*CSVReader)(nil)
	_ Reader = (*MemReader)(nil)
	_ Reader = (*StdinReader)(nil)

	_ Writer = (*CSVWriter)(nil)
	_ Writer = (*MemWriter)(nil)
	_ Writer = (*StdoutWriter)(nil)
	_ Writer = (*NopWriter)(nil)
)
// Reader is the source of input records for Run. Run calls Read from a
// single goroutine.
type Reader interface {
	// Read returns the next input record. When no input is left it must
	// return EOF (or io.EOF). Run treats any other non-nil error as a
	// failure. While skipping RunConfig.Offset records, any error,
	// including EOF, fails the run.
	Read() ([]string, error)
}
// Writer is the destination for records produced by a Processor. Run calls
// Write from a single goroutine.
type Writer interface {
	// Write records one processed record. A non-nil error makes Run fail.
	Write([]string) error
}
// Processor turns one input record into one output record.
//
// Run may call a Processor concurrently from RunConfig.Concurrency
// goroutines. The context it receives is cancelled when the run is stopped,
// so long-running processors should watch it. A non-nil error makes Run fail
// with that error.
type Processor func(context.Context, []string) ([]string, error)
// RunConfig describes a script for Run: where records come from, how each
// one is transformed, and where the results go.
type RunConfig struct {
	// Input supplies the records.
	Input Reader

	// Output receives the records returned by Processor.
	Output Writer

	// Processor is applied to every record read after Offset.
	Processor Processor

	// Offset is the number of leading records that are read and discarded
	// before processing starts.
	Offset int

	// Limit caps how many records (after Offset) are read and processed.
	// Zero or a negative value means "until EOF".
	Limit int

	// Concurrency is the number of goroutines running Processor. Values
	// below one are treated as one.
	Concurrency int
}
// RunResult reports how far Run progressed. It is meaningful even when Run
// returns an error.
type RunResult struct {
	// Read counts records read without error, not including the records
	// skipped because of RunConfig.Offset.
	Read int

	// Processed counts records for which Processor returned no error.
	Processed int

	// Written counts records passed to Writer.Write without error.
	Written int
}
2025-08-30 23:26:56 +03:00
// Run starts the script described by r.
// First Read is called offset times with output of Read being discarded.
2025-08-30 22:00:31 +03:00
// Then limit Reads are made and processor is called for each portion
2025-10-14 22:23:06 +03:00
// of data. If limit is 0 then Run keeps processing input until it receives
2025-08-30 22:00:31 +03:00
// EOF from Reader.
2025-08-30 23:26:56 +03:00
// Run fails on any error including Reader error, Writer error and Processor error.
2025-10-14 22:23:06 +03:00
// The returned RunResult is AWAYS VALID and indicates the actual progress of script.
// Returned error explains why Run failed. It may be either read, process or write error.
func Run(ctx context.Context, r RunConfig) (RunResult, error) {
if r.Concurrency <= 0 {
2025-08-30 22:00:31 +03:00
r.Concurrency = 1
}
grp, ctx := errgroup.WithContext(ctx)
rdch := make(chan []string, r.Concurrency)
wrch := make(chan []string, r.Concurrency)
2025-10-14 22:23:06 +03:00
var read, proc, written uint32
2025-08-30 22:00:31 +03:00
// read input from Reader and forward to Processor
grp.Go(func() error {
2025-10-14 22:23:06 +03:00
// closing chan to Processor
2025-08-30 22:00:31 +03:00
defer close(rdch)
for range r.Offset {
_, err := r.Input.Read()
2025-08-30 22:00:31 +03:00
if err != nil {
2025-10-14 22:23:06 +03:00
return fmt.Errorf("script: could not advance to required offset (%d): %s", r.Offset, err)
2025-08-30 22:00:31 +03:00
}
}
count := 0
2025-10-14 22:23:06 +03:00
2025-08-30 22:00:31 +03:00
for {
inp, err := r.Input.Read()
2025-10-14 22:23:06 +03:00
if errors.Is(err, EOF) {
2025-08-30 22:00:31 +03:00
return nil
} else if err != nil {
2025-10-14 22:23:06 +03:00
return fmt.Errorf("script: read error: %s", err)
2025-08-30 22:00:31 +03:00
}
2025-10-14 22:23:06 +03:00
// increment read count
read++
2025-08-30 22:00:31 +03:00
select {
case rdch <- inp:
case <-ctx.Done():
// will also close channel causing
// all routines to complete
return nil
}
count++
2025-10-14 22:23:06 +03:00
if count == r.Limit { // will never happen if limit has been set to 0
2025-08-30 22:00:31 +03:00
return nil
}
}
})
// read output of Processor and write to Writer
grp.Go(func() error {
2025-10-14 22:23:06 +03:00
defer func() {
for range wrch {
// NOP to drain channel
}
}()
2025-08-30 22:00:31 +03:00
// not paying attention to context here
// because we must complete writes
// this is run within group so that write
// error would cancel group context
2025-08-30 22:00:31 +03:00
for outp := range wrch {
if err := r.Output.Write(outp); err != nil {
2025-10-14 22:23:06 +03:00
return fmt.Errorf("script: write error: %s", err)
2025-08-30 22:00:31 +03:00
}
2025-10-14 22:23:06 +03:00
//increment write count
written++
2025-08-30 22:00:31 +03:00
}
return nil
})
2025-08-30 23:26:56 +03:00
// run processing routines
2025-08-30 22:00:31 +03:00
grp.Go(func() error {
2025-10-14 22:23:06 +03:00
// closing chan to Writer
2025-08-30 22:00:31 +03:00
defer close(wrch)
2025-10-14 22:23:06 +03:00
defer func() {
for range rdch {
// NOP to drain channel
}
}()
2025-08-30 22:00:31 +03:00
2025-10-14 22:23:06 +03:00
workergrp := errgroup.Group{}
2025-08-30 22:00:31 +03:00
for range r.Concurrency {
workergrp.Go(func() error {
for inp := range rdch {
2025-10-14 22:23:06 +03:00
result, err := r.Processor(ctx, inp)
2025-08-30 22:00:31 +03:00
if err != nil {
2025-10-14 22:23:06 +03:00
return fmt.Errorf("script: process error: %s", err)
2025-08-30 22:00:31 +03:00
}
2025-10-14 22:23:06 +03:00
// increment processed count
atomic.AddUint32(&proc, 1)
2025-08-30 23:26:56 +03:00
2025-10-14 22:23:06 +03:00
select {
case wrch <- result:
case <-ctx.Done():
// this case is a must if writer fails
// otherwise we'd want to push process result
// to wrch
2025-08-30 23:26:56 +03:00
return nil
}
2025-08-30 22:00:31 +03:00
}
return nil
})
}
if err := workergrp.Wait(); err != nil {
return err
}
return nil
})
2025-10-14 22:23:06 +03:00
err := grp.Wait() // if this is a context expiry then error is nil
2025-08-30 22:00:31 +03:00
2025-10-14 22:23:06 +03:00
return RunResult{
Read: int(read),
Processed: int(proc),
Written: int(written),
}, err
2025-08-30 22:00:31 +03:00
}