From 73f9ba3c5937e5f7c540e5d6271fda5c5f3d13d2 Mon Sep 17 00:00:00 2001 From: Dmitry Fedotov Date: Sat, 3 May 2025 20:27:04 +0300 Subject: [PATCH] init --- db.go | 24 ++++++++++++ go.mod | 5 +++ go.sum | 2 + init_db.sql | 6 +++ outbox.go | 103 +++++++++++++++++++++++++++++++++++++++++++++++++ outbox_test.go | 69 +++++++++++++++++++++++++++++++++ 6 files changed, 209 insertions(+) create mode 100644 db.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 init_db.sql create mode 100644 outbox.go create mode 100644 outbox_test.go diff --git a/db.go b/db.go new file mode 100644 index 0000000..1fb0430 --- /dev/null +++ b/db.go @@ -0,0 +1,24 @@ +package outbox + +import ( + "context" + "database/sql" + _ "embed" + "errors" +) + +var ( + ErrMigrate = errors.New("error creating outbox table") +) + +//go:embed init_db.sql +var initStatement string + +func initDB(ctx context.Context, db *sql.DB) error { + _, err := db.ExecContext(ctx, initStatement) + if err != nil { + return errors.Join(ErrMigrate, err) + } + + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1baa236 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module code.uint32.ru/tiny/outbox + +go 1.24.1 + +require github.com/mattn/go-sqlite3 v1.14.28 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..42e5bac --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= +github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/init_db.sql b/init_db.sql new file mode 100644 index 0000000..4c91cf8 --- /dev/null +++ b/init_db.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS outbox ( + id text PRIMARY KEY, + created_at timestamp with time zone default now, + is_processed boolean default false, + payload jsonb +) diff --git a/outbox.go b/outbox.go new file mode 100644 index 0000000..71f0a70 --- /dev/null +++ b/outbox.go @@ -0,0 +1,103 @@ +package outbox + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "time" +) + +var ( + ErrMarshal = errors.New("error marshaling value") + ErrInsert = errors.New("error inserting value") + ErrSelectPending = errors.New("error selecting pending objects") + ErrUpdate = errors.New("error updating object") + ErrCleanup = errors.New("error deleting processed items") +) + +type Box[T any] interface { + Save(ctx context.Context, id string, v *T) error + MarkProcessed(ctx context.Context, id string) error + GetPending(ctx context.Context, n int) ([]*T, error) +} + +func OpenOrCreate[T any](ctx context.Context, db *sql.DB) (*Outbox[T], error) { + if err := initDB(ctx, db); err != nil { + return nil, err + } + + return &Outbox[T]{ + db: db, + }, nil +} + +type Outbox[T any] struct { + db *sql.DB +} + +func (o *Outbox[T]) Save(ctx context.Context, id string, v *T) error { + now := time.Now() + + b, err := json.Marshal(v) + if err != nil { + return errors.Join(ErrMarshal, err) + } + + stmt := "INSERT INTO outbox (id, created_at, is_processed, payload) VALUES ($1, $2, $3, $4)" + + if _, err := o.db.ExecContext(ctx, stmt, id, now, false, b); err != nil { + return errors.Join(ErrInsert, err) + } + + return nil +} + +func (o *Outbox[T]) MarkProcessed(ctx context.Context, id string) error { + stmt := "UPDATE outbox SET is_processed = true WHERE id = $1" + + if _, err := o.db.ExecContext(ctx, stmt, id); err != nil { + return errors.Join(ErrUpdate, err) + } + + return nil +} + +func (o *Outbox[T]) GetPending(ctx context.Context, n int) ([]*T, error) { + stmt := "SELECT payload FROM outbox WHERE is_processed = false order by created_at desc limit $1" + + rows, err := o.db.QueryContext(ctx, stmt, n) + if err != nil { + return nil, errors.Join(ErrSelectPending, err) + } + + defer rows.Close() + + out := make([]*T, 0) + + for rows.Next() { + b := []byte{} + if err := rows.Scan(&b); err != nil { + return nil, err + } + + v := new(T) + if err := json.Unmarshal(b, v); err != nil { + return nil, err + } + + out = append(out, v) + } + + return out, nil +} + +func (o *Outbox[T]) Cleanup(ctx context.Context) error { + stmt := "DELETE FROM outbox WHERE is_processed = true" + + if _, err := o.db.ExecContext(ctx, stmt); err != nil { + return errors.Join(ErrCleanup, err) + } + + return nil +} diff --git a/outbox_test.go b/outbox_test.go new file mode 100644 index 0000000..0bff76d --- /dev/null +++ b/outbox_test.go @@ -0,0 +1,69 @@ +package outbox_test + +import ( + "context" + "database/sql" + "os" + "testing" + + _ "github.com/mattn/go-sqlite3" + + "code.uint32.ru/tiny/outbox" +) + +type My struct { + A string + B int +} + +func TestOutboxMethods(t *testing.T) { + db, err := sql.Open("sqlite3", "test.db") + if err != nil { + t.Fatal(err) + } + + defer os.Remove("test.db") + defer db.Close() + + ctx := context.Background() + + box, err := outbox.OpenOrCreate[My](ctx, db) + if err != nil { + t.Fatal(err) + } + + one := My{ + A: "hello", + B: 42, + } + + if err := box.Save(ctx, "test", &one); err != nil { + t.Fatal(err) + } + + lst, err := box.GetPending(ctx, 100) + if err != nil { + t.Fatal(err) + } + + if len(lst) != 1 { + t.Fatal("incorrect pending len") + } + + if err := box.MarkProcessed(ctx, "test"); err != nil { + t.Fatal(err) + } + + lst, err = box.GetPending(ctx, 100) + if err != nil { + t.Fatal(err) + } + + if len(lst) != 0 { + t.Fatal("should be zero pending") + } + + if err := box.Cleanup(ctx); err != nil { + t.Fatal(err) + } +}