From ec5db88f977619a1cb8c59f8ec863c9842aad60d Mon Sep 17 00:00:00 2001 From: Dmitry Fedotov Date: Sat, 30 Aug 2025 22:00:31 +0300 Subject: [PATCH] init: basic tools --- .gitignore | 2 + Makefile | 8 +++ go.mod | 5 ++ go.sum | 2 + input_mem.go | 29 +++++++++ input_scv.go | 36 +++++++++++ input_stdin.go | 37 +++++++++++ output_csv.go | 71 +++++++++++++++++++++ output_discard.go | 13 ++++ output_mem.go | 21 ++++++ output_stdout.go | 18 ++++++ runner.go | 159 ++++++++++++++++++++++++++++++++++++++++++++++ runner_test.go | 66 +++++++++++++++++++ 13 files changed, 467 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 go.mod create mode 100644 go.sum create mode 100644 input_mem.go create mode 100644 input_scv.go create mode 100644 input_stdin.go create mode 100644 output_csv.go create mode 100644 output_discard.go create mode 100644 output_mem.go create mode 100644 output_stdout.go create mode 100644 runner.go create mode 100644 runner_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e3effdd --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +cmd +coverage.out diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0c28b2a --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +.PHONY: coverage test + +coverage: + go test -coverprofile=coverage.out + go tool cover -html=coverage.out + +test: + go test -v -cover ./... diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a8fc2e7 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module code.uint32.ru/dmitry/script + +go 1.24.5 + +require golang.org/x/sync v0.16.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..878acfe --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= diff --git a/input_mem.go b/input_mem.go new file mode 100644 index 0000000..c4ce808 --- /dev/null +++ b/input_mem.go @@ -0,0 +1,29 @@ +package script + +import ( + "context" + "io" +) + +type MemReader struct { + rows [][]string + curr int +} + +func NewMemReader(records [][]string) *MemReader { + return &MemReader{ + rows: records, + } +} + +func (m *MemReader) Read(context.Context) ([]string, error) { + if m.curr == len(m.rows) || len(m.rows) == 0 { + return nil, io.EOF + } + + defer func() { + m.curr++ + }() + + return m.rows[m.curr], nil +} diff --git a/input_scv.go b/input_scv.go new file mode 100644 index 0000000..9277990 --- /dev/null +++ b/input_scv.go @@ -0,0 +1,36 @@ +package script + +import ( + "context" + "encoding/csv" + "os" +) + +// CSV creates csv.Reader reading from filename. +func NewCSVReader(filename string) (*CSVReader, error) { + return newCSVreader(filename) +} + +type CSVReader struct { + f *os.File + rdr *csv.Reader +} + +func (c *CSVReader) Read(context.Context) ([]string, error) { + return c.rdr.Read() +} + +func (c *CSVReader) Close() error { + return c.f.Close() +} + +func newCSVreader(name string) (*CSVReader, error) { + f, err := os.Open(name) + if err != nil { + return nil, err + } + + csvr := csv.NewReader(f) + + return &CSVReader{f: f, rdr: csvr}, nil +} diff --git a/input_stdin.go b/input_stdin.go new file mode 100644 index 0000000..52523f5 --- /dev/null +++ b/input_stdin.go @@ -0,0 +1,37 @@ +package script + +import ( + "bufio" + "context" + "io" + "os" + "strings" +) + +// StdinReader reads from stdin +type StdinReader struct { + scanner bufio.Scanner +} + +func NewStdinReader() *StdinReader { + return &StdinReader{ + scanner: *bufio.NewScanner(os.Stdin), + } +} + +// Read reads from stdin until \n character +// then splits result at "," separator and returns +// the resulting slice. +// It returns EOF when nothing left to read. +func (s *StdinReader) Read(_ context.Context) ([]string, error) { + if !s.scanner.Scan() { + err := s.scanner.Err() + if err == nil { + return nil, io.EOF + } + + return nil, err + } + + return strings.Split(s.scanner.Text(), ","), nil +} diff --git a/output_csv.go b/output_csv.go new file mode 100644 index 0000000..de57b77 --- /dev/null +++ b/output_csv.go @@ -0,0 +1,71 @@ +package script + +import ( + "context" + "encoding/csv" + "errors" + "os" +) + +// CSV creates csv.Writer writing to underlying file. +// Do not forget to call Close method once you are done. +func NewCSVWriter(filename string) (*CSVWriter, error) { + return newCSVwriter(filename) +} + +type CSVWriter struct { + f *os.File + wr *csv.Writer + err error +} + +// Write writer row to csv. Writes are buffered +func (c *CSVWriter) Write(_ context.Context, record []string) error { + err := c.wr.Write(record) + c.err = err + + return err +} + +// Close flushes underlying csv.Writer and closes +// file. +func (c *CSVWriter) Close() error { + c.wr.Flush() + flushErr := c.wr.Error() + closeErr := c.f.Close() + + err := errors.Join(flushErr, closeErr) + c.err = err + + if err != nil { + return err + } + + return nil +} + +// Error return last encountered error. +func (c *CSVWriter) Error() error { + return c.err +} + +func newCSVwriter(name string) (*CSVWriter, error) { + var ( + f *os.File + err error + ) + + if _, err = os.Stat(name); err == nil { + f, err = os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0664) + } else { + f, err = os.Create(name) + } + + if err != nil { + return nil, err + } + + csvwr := csv.NewWriter(f) + + return &CSVWriter{f: f, wr: csvwr}, nil +} diff --git a/output_discard.go b/output_discard.go new file mode 100644 index 0000000..7891865 --- /dev/null +++ b/output_discard.go @@ -0,0 +1,13 @@ +package script + +import "context" + +type NopWriter struct{} + +func NewNopWriter() *NopWriter { + return new(NopWriter) +} + +func (d *NopWriter) Write(context.Context, []string) error { + return nil +} diff --git a/output_mem.go b/output_mem.go new file mode 100644 index 0000000..c6d36c6 --- /dev/null +++ b/output_mem.go @@ -0,0 +1,21 @@ +package script + +import "context" + +type MemWriter struct { + rows [][]string +} + +func NewMemWriter() *MemWriter { + return new(MemWriter) +} + +func (m *MemWriter) Write(_ context.Context, record []string) error { + m.rows = append(m.rows, record) + + return nil +} + +func (m *MemWriter) Output() [][]string { + return m.rows +} diff --git a/output_stdout.go b/output_stdout.go new file mode 100644 index 0000000..5cc5fc2 --- /dev/null +++ b/output_stdout.go @@ -0,0 +1,18 @@ +package script + +import ( + "context" + "os" + "strings" +) + +type StdoutWriter struct{} + +func NewStdoutWriter() *StdoutWriter { + return new(StdoutWriter) +} + +func (s *StdoutWriter) Write(_ context.Context, in []string) error { + _, err := os.Stdout.WriteString(strings.Join(in, ",") + "\n") + return err +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..6689a73 --- /dev/null +++ b/runner.go @@ -0,0 +1,159 @@ +package script + +import ( + "context" + "errors" + "fmt" + "io" + + "golang.org/x/sync/errgroup" +) + +var ( + EOF error = io.EOF + ErrNoProcessors = errors.New("no processors provided") +) + +var ( + _ Reader = &CSVReader{} + _ Reader = &MemReader{} + _ Reader = &StdinReader{} + _ Writer = &CSVWriter{} + _ Writer = &MemWriter{} + _ Writer = &StdoutWriter{} + _ Writer = &NopWriter{} +) + +type Reader interface { + // Read is called by Processor to obtain input + // for processing. Read must return EOF/io.EOF to indicate + // that there is no more input. + Read(context.Context) ([]string, error) +} + +type Writer interface { + // Write is called by Processor to record + // output. + Write(context.Context, []string) error +} + +// Process accepts string slice, does what it should +// and returns output. Non-nil error returned +// by Processor terminates stops further processing. +type Processor func(context.Context, []string) ([]string, error) + +type RunConfig struct { + Input Reader + Output Writer + Processor Processor + Offset int + Limit int + Concurrency int +} + +// Run starts concurrency threads (goroutines), reads from provided Reader, +// executes each Processor in the order they were provide and records result +// with provided Writer. +// At first Read is called offset times with output of Read being discarded. +// Then limit Reads are made and processor is called for each portion +// of data. If limit is 0 then Runner keep processing input until it receives +// EOF from Reader. +func Run(ctx context.Context, r RunConfig) error { + if r.Concurrency == 0 { + r.Concurrency = 1 + } + + grp, ctx := errgroup.WithContext(ctx) + + rdch := make(chan []string, r.Concurrency) + wrch := make(chan []string, r.Concurrency) + + // read input from Reader and forward to Processor + grp.Go(func() error { + // closing chan for processor to complete operations + defer close(rdch) + + for range r.Offset { + _, err := r.Input.Read(ctx) + if err != nil { + return fmt.Errorf("could not advance to required offset: %w", err) + } + } + + count := 0 + for { + inp, err := r.Input.Read(ctx) + if err != nil && errors.Is(err, EOF) { + return nil + } else if err != nil { + return err + } + + select { + case rdch <- inp: + case <-ctx.Done(): + // will also close channel causing + // all routines to complete + return nil + } + + count++ + + if count == r.Limit { // will never happen if limit set to 0 + return nil + } + } + }) + + // read output of Processor and write to Writer + grp.Go(func() error { + // not paying attention to context here + // because we must complete writes + for outp := range wrch { + if err := r.Output.Write(ctx, outp); err != nil { + return err + } + } + + return nil + }) + + grp.Go(func() error { + // will close write chan once + // all workers are done + defer close(wrch) + + workergrp, innrctx := errgroup.WithContext(ctx) + + for range r.Concurrency { + workergrp.Go(func() error { + // not paying attention to context here + // because we must complete writes + for inp := range rdch { + result, err := r.Processor(innrctx, inp) + if err != nil { + return err + } + + wrch <- result + } + + return nil + + }) + + } + + if err := workergrp.Wait(); err != nil { + return err + } + + return nil + }) + + if err := grp.Wait(); err != nil { + return err + } + + return nil +} diff --git a/runner_test.go b/runner_test.go new file mode 100644 index 0000000..da07711 --- /dev/null +++ b/runner_test.go @@ -0,0 +1,66 @@ +package script_test + +import ( + "context" + "reflect" + "testing" + "time" + + "code.uint32.ru/dmitry/script" +) + +var echoProcessor = func(_ context.Context, in []string) ([]string, error) { + return in, nil +} + +func TestBasicRun(t *testing.T) { + t.Parallel() + input := [][]string{ + {"hello", "world"}, + } + + r := script.NewMemReader(input) + w := script.NewMemWriter() + + conf := script.RunConfig{ + Input: r, + Output: w, + Processor: echoProcessor, + } + + if err := script.Run(t.Context(), conf); err != nil { + t.Fatal(err) + } + + output := w.Output() + + if !reflect.DeepEqual(input, output) { + t.Errorf("incorrect output, want: %v, got: %v", input, output) + } +} + +type infiniteReader struct{} + +func (ir *infiniteReader) Read(_ context.Context) ([]string, error) { + return []string{"infinity", "looks", "like", "this"}, nil +} + +func TestRunnerObeysContext(t *testing.T) { + t.Parallel() + + r := &infiniteReader{} + w := script.NewNopWriter() + + conf := script.RunConfig{ + Input: r, + Output: w, + Processor: echoProcessor, + } + + ctx, cancel := context.WithTimeout(t.Context(), time.Millisecond) + defer cancel() + + if err := script.Run(ctx, conf); err != nil { + t.Fatal(err) + } +}