Compare commits
1 Commits
tidy-up
...
858cae9547
Author | SHA1 | Date | |
---|---|---|---|
858cae9547 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,2 @@
|
|||||||
cmd
|
cmd
|
||||||
coverage.out
|
coverage.out
|
||||||
example
|
|
||||||
|
22
LICENSE
22
LICENSE
@@ -1,22 +0,0 @@
|
|||||||
Copyright 2025 Dmitry Fedotov
|
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person
|
|
||||||
obtaining a copy of this software and associated documentation
|
|
||||||
files (the “Software”), to deal in the Software without
|
|
||||||
restriction, including without limitation the rights to use,
|
|
||||||
copy, modify, merge, publish, distribute, sublicense, and/or
|
|
||||||
sell copies of the Software, and to permit persons to whom
|
|
||||||
the Software is furnished to do so,
|
|
||||||
subject to the following conditions:
|
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall
|
|
||||||
be included in all copies or substantial portions of the Software.
|
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND,
|
|
||||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
|
||||||
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
||||||
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
|
||||||
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
||||||
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
||||||
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
|
|
||||||
OR OTHER DEALINGS IN THE SOFTWARE.
|
|
137
README.md
137
README.md
@@ -1,137 +0,0 @@
|
|||||||
# package script
|
|
||||||
|
|
||||||
This package is intended as a collection of helper functions and tools fow quick construction of scripts that process .csv tables.
|
|
||||||
|
|
||||||
Note that this is a work in progress. API is not guaranteed to be stable.
|
|
||||||
|
|
||||||
## Example usage
|
|
||||||
|
|
||||||
This demonstrates a very basic case: reading from stdin, checking whether array len is 2 and switching array elements.
|
|
||||||
|
|
||||||
```go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"code.uint32.ru/dmitry/script"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
|
|
||||||
defer stop()
|
|
||||||
|
|
||||||
reader := script.NewStdinReader()
|
|
||||||
writer := script.NewStdoutWriter()
|
|
||||||
|
|
||||||
processor := func(_ context.Context, in []string) ([]string, error) {
|
|
||||||
if len(in) != 2 {
|
|
||||||
return nil, errors.New("incorrect input len")
|
|
||||||
}
|
|
||||||
|
|
||||||
return []string{in[1], in[0]}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
conf := script.RunConfig{
|
|
||||||
Input: reader,
|
|
||||||
Output: writer,
|
|
||||||
Processor: processor,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := script.Run(ctx, conf); err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
A more conplicated example featuring a processor which is itself a struct with external dependency.
|
|
||||||
|
|
||||||
```go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"code.uint32.ru/dmitry/script"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ExternalDep represent an external dependency
|
|
||||||
// which your script utilizes. This can be a DB connection etc.
|
|
||||||
type ExternalDep interface {
|
|
||||||
QueryID(string) ([]string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Processor struct {
|
|
||||||
dep ExternalDep
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(e ExternalDep) *Processor {
|
|
||||||
|
|
||||||
return &Processor{
|
|
||||||
dep: e,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Processor) Process(ctx context.Context, in []string) ([]string, error) {
|
|
||||||
var out []string
|
|
||||||
|
|
||||||
result, err := p.dep.QueryID(in[0])
|
|
||||||
if err != nil {
|
|
||||||
// you can as well record the error and continue
|
|
||||||
// processing without yielding the error here
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
out = append(out, result...)
|
|
||||||
out = append(out, "processed")
|
|
||||||
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
|
|
||||||
defer stop()
|
|
||||||
|
|
||||||
r, err := script.NewCSVReader("/tmp/some_file.csv")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if err := r.Close(); err != nil {
|
|
||||||
fmt.Println("err closing reader", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
w, err := script.NewCSVWriter("/tmp/some_output.csv")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if err := w.Close(); err != nil {
|
|
||||||
fmt.Println("err closing writer", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// intializing the processor
|
|
||||||
p := New(ExternalDep(nil))
|
|
||||||
|
|
||||||
conf := script.RunConfig{
|
|
||||||
Input: r,
|
|
||||||
Output: w,
|
|
||||||
Processor: p.Process, // Process implements script.Processor
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := script.Run(ctx, conf); err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
@@ -1,6 +1,7 @@
|
|||||||
package script
|
package script
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -15,7 +16,7 @@ func NewMemReader(records [][]string) *MemReader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MemReader) Read() ([]string, error) {
|
func (m *MemReader) Read(context.Context) ([]string, error) {
|
||||||
if m.curr == len(m.rows) || len(m.rows) == 0 {
|
if m.curr == len(m.rows) || len(m.rows) == 0 {
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package script
|
package script
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
@@ -15,7 +16,7 @@ type CSVReader struct {
|
|||||||
rdr *csv.Reader
|
rdr *csv.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CSVReader) Read() ([]string, error) {
|
func (c *CSVReader) Read(context.Context) ([]string, error) {
|
||||||
return c.rdr.Read()
|
return c.rdr.Read()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -2,6 +2,7 @@ package script
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -22,7 +23,7 @@ func NewStdinReader() *StdinReader {
|
|||||||
// then splits result at "," separator and returns
|
// then splits result at "," separator and returns
|
||||||
// the resulting slice.
|
// the resulting slice.
|
||||||
// It returns EOF when nothing left to read.
|
// It returns EOF when nothing left to read.
|
||||||
func (s *StdinReader) Read() ([]string, error) {
|
func (s *StdinReader) Read(_ context.Context) ([]string, error) {
|
||||||
if !s.scanner.Scan() {
|
if !s.scanner.Scan() {
|
||||||
err := s.scanner.Err()
|
err := s.scanner.Err()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package script
|
package script
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
@@ -19,7 +20,7 @@ type CSVWriter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write writer row to csv. Writes are buffered
|
// Write writer row to csv. Writes are buffered
|
||||||
func (c *CSVWriter) Write(record []string) error {
|
func (c *CSVWriter) Write(_ context.Context, record []string) error {
|
||||||
err := c.wr.Write(record)
|
err := c.wr.Write(record)
|
||||||
c.err = err
|
c.err = err
|
||||||
|
|
||||||
|
@@ -1,11 +1,13 @@
|
|||||||
package script
|
package script
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
type NopWriter struct{}
|
type NopWriter struct{}
|
||||||
|
|
||||||
func NewNopWriter() *NopWriter {
|
func NewNopWriter() *NopWriter {
|
||||||
return new(NopWriter)
|
return new(NopWriter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *NopWriter) Write([]string) error {
|
func (d *NopWriter) Write(context.Context, []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
@@ -1,5 +1,7 @@
|
|||||||
package script
|
package script
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
type MemWriter struct {
|
type MemWriter struct {
|
||||||
rows [][]string
|
rows [][]string
|
||||||
}
|
}
|
||||||
@@ -8,7 +10,7 @@ func NewMemWriter() *MemWriter {
|
|||||||
return new(MemWriter)
|
return new(MemWriter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MemWriter) Write(record []string) error {
|
func (m *MemWriter) Write(_ context.Context, record []string) error {
|
||||||
m.rows = append(m.rows, record)
|
m.rows = append(m.rows, record)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package script
|
package script
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
@@ -11,7 +12,7 @@ func NewStdoutWriter() *StdoutWriter {
|
|||||||
return new(StdoutWriter)
|
return new(StdoutWriter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StdoutWriter) Write(in []string) error {
|
func (s *StdoutWriter) Write(_ context.Context, in []string) error {
|
||||||
_, err := os.Stdout.WriteString(strings.Join(in, ",") + "\n")
|
_, err := os.Stdout.WriteString(strings.Join(in, ",") + "\n")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
12
runner.go
12
runner.go
@@ -28,13 +28,13 @@ type Reader interface {
|
|||||||
// Read is called by Processor to obtain input
|
// Read is called by Processor to obtain input
|
||||||
// for processing. Read must return EOF/io.EOF to indicate
|
// for processing. Read must return EOF/io.EOF to indicate
|
||||||
// that there is no more input.
|
// that there is no more input.
|
||||||
Read() ([]string, error)
|
Read(context.Context) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Writer interface {
|
type Writer interface {
|
||||||
// Write is called by Processor to record
|
// Write is called by Processor to record
|
||||||
// output.
|
// output.
|
||||||
Write([]string) error
|
Write(context.Context, []string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Processor accepts string slice, does what it should
|
// Processor accepts string slice, does what it should
|
||||||
@@ -76,7 +76,7 @@ func Run(ctx context.Context, r RunConfig) error {
|
|||||||
defer close(rdch)
|
defer close(rdch)
|
||||||
|
|
||||||
for range r.Offset {
|
for range r.Offset {
|
||||||
_, err := r.Input.Read()
|
_, err := r.Input.Read(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not advance to required offset: %w", err)
|
return fmt.Errorf("could not advance to required offset: %w", err)
|
||||||
}
|
}
|
||||||
@@ -84,7 +84,7 @@ func Run(ctx context.Context, r RunConfig) error {
|
|||||||
|
|
||||||
count := 0
|
count := 0
|
||||||
for {
|
for {
|
||||||
inp, err := r.Input.Read()
|
inp, err := r.Input.Read(ctx)
|
||||||
if err != nil && errors.Is(err, EOF) {
|
if err != nil && errors.Is(err, EOF) {
|
||||||
return nil
|
return nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@@ -111,10 +111,8 @@ func Run(ctx context.Context, r RunConfig) error {
|
|||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
// not paying attention to context here
|
// not paying attention to context here
|
||||||
// because we must complete writes
|
// because we must complete writes
|
||||||
// this is run within group so that write
|
|
||||||
// error would cancel group context
|
|
||||||
for outp := range wrch {
|
for outp := range wrch {
|
||||||
if err := r.Output.Write(outp); err != nil {
|
if err := r.Output.Write(ctx, outp); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -41,7 +41,7 @@ func TestBasicRun(t *testing.T) {
|
|||||||
|
|
||||||
type infiniteReader struct{}
|
type infiniteReader struct{}
|
||||||
|
|
||||||
func (ir *infiniteReader) Read() ([]string, error) {
|
func (ir *infiniteReader) Read(_ context.Context) ([]string, error) {
|
||||||
return []string{"infinity", "looks", "like", "this"}, nil
|
return []string{"infinity", "looks", "like", "this"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user