This commit is contained in:
Dmitry Fedotov
2025-05-03 21:35:32 +03:00
parent efc5bea070
commit ff386892e4
6 changed files with 226 additions and 0 deletions

5
go.mod Normal file
View File

@@ -0,0 +1,5 @@
module code.uint32.ru/tiny/outbox
go 1.24.1
require github.com/mattn/go-sqlite3 v1.14.28 // indirect

2
go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A=
github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

24
init_db.go Normal file
View File

@@ -0,0 +1,24 @@
package outbox
import (
"context"
"database/sql"
_ "embed"
"errors"
)
var (
ErrMigrate = errors.New("error creating outbox table")
)
//go:embed init_db.sql
var initStatement string
func initDB(ctx context.Context, db *sql.DB, tablename string) error {
_, err := db.ExecContext(ctx, initStatement, tablename)
if err != nil {
return errors.Join(ErrMigrate, err)
}
return nil
}

7
init_db.sql Normal file
View File

@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS $1 (
id text PRIMARY KEY,
created_at timestamp with time zone default now,
updated_at timestamp with time zone default now,
deleted_at timestamp with time zone,
payload jsonb
)

119
repo.go Normal file
View File

@@ -0,0 +1,119 @@
package outbox
import (
"context"
"database/sql"
"encoding/json"
"errors"
"time"
)
var (
ErrSerialize = errors.New("error serialize/deserialize")
ErrInsert = errors.New("error create")
ErrNotFound = errors.New("not found")
ErrRead = errors.New("error read")
ErrUpdate = errors.New("error update")
ErrDelete = errors.New("error delete")
)
func OpenOrCreate[T any](ctx context.Context, db *sql.DB, tablename string) (*Repo[T], error) {
if err := initDB(ctx, db, tablename); err != nil {
return nil, err
}
return &Repo[T]{
db: db,
table: tablename,
}, nil
}
type Repo[T any] struct {
db *sql.DB
table string
}
func (r *Repo[T]) CreateOrUpdate(ctx context.Context, id string, v *T) error {
now := time.Now()
stmt := "INSERT INTO $1 (id, created_at, updated_at, payload) VALUES ($2, $3, $4)"
b, err := serialize(v)
if err != nil {
return err
}
if _, err := r.db.ExecContext(ctx, stmt, r.table, id, now, now, b); err != nil {
return errors.Join(ErrInsert, err)
}
return nil
}
func (r *Repo[T]) Read(ctx context.Context, id string) (*T, error) {
stmt := "SELECT payload FROM $1 WHERE id = $2 AND deleted_at is NULL"
row, err := r.db.QueryContext(ctx, stmt, r.table, id)
if err != nil {
// TODO err not found
return nil, errors.Join(ErrRead, err)
}
b := []byte{}
if err := row.Scan(&b); err != nil {
return nil, err
}
v, err := deserialize[T](b)
if err != nil {
return nil, err
}
return v, nil
}
func (r *Repo[T]) Update(ctx context.Context, id string, v *T) error {
now := time.Now()
stmt := "UPDATE $1 SET updated_at = $3, payload = $4 WHERE id = $2"
b, err := json.Marshal(v)
if err != nil {
return errors.Join(ErrSerialize, err)
}
if _, err := r.db.ExecContext(ctx, stmt, r.table, id, now, b); err != nil {
return errors.Join(ErrUpdate, err)
}
return nil
}
func (r *Repo[T]) Delete(ctx context.Context, id string) error {
now := time.Now()
stmt := "UPDATE $1 SET deleted_at = $3 WHERE id = $2"
if _, err := r.db.ExecContext(ctx, stmt, r.table, now, id); err != nil {
return errors.Join(ErrDelete, err)
}
return nil
}
func serialize(v any) ([]byte, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, errors.Join(ErrSerialize, err)
}
return b, nil
}
func deserialize[T any](b []byte) (*T, error) {
v := new(T)
if err := json.Unmarshal(b, v); err != nil {
return nil, errors.Join(ErrSerialize, err)
}
return v, nil
}

69
repo_test.go Normal file
View File

@@ -0,0 +1,69 @@
package outbox_test
import (
"context"
"database/sql"
"os"
"testing"
_ "github.com/mattn/go-sqlite3"
"code.uint32.ru/tiny/outbox"
)
type My struct {
A string
B int
}
func TestOutboxMethods(t *testing.T) {
db, err := sql.Open("sqlite3", "test.db")
if err != nil {
t.Fatal(err)
}
defer os.Remove("test.db")
defer db.Close()
ctx := context.Background()
box, err := outbox.OpenOrCreate[My](ctx, db)
if err != nil {
t.Fatal(err)
}
one := My{
A: "hello",
B: 42,
}
if err := box.Save(ctx, "test", &one); err != nil {
t.Fatal(err)
}
lst, err := box.GetPending(ctx, 100)
if err != nil {
t.Fatal(err)
}
if len(lst) != 1 {
t.Fatal("incorrect pending len")
}
if err := box.MarkProcessed(ctx, "test"); err != nil {
t.Fatal(err)
}
lst, err = box.GetPending(ctx, 100)
if err != nil {
t.Fatal(err)
}
if len(lst) != 0 {
t.Fatal("should be zero pending")
}
if err := box.Cleanup(ctx); err != nil {
t.Fatal(err)
}
}