diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1baa236 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module code.uint32.ru/tiny/outbox + +go 1.24.1 + +require github.com/mattn/go-sqlite3 v1.14.28 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..42e5bac --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= +github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/init_db.go b/init_db.go new file mode 100644 index 0000000..ec92d38 --- /dev/null +++ b/init_db.go @@ -0,0 +1,24 @@ +package outbox + +import ( + "context" + "database/sql" + _ "embed" + "errors" +) + +var ( + ErrMigrate = errors.New("error creating outbox table") +) + +//go:embed init_db.sql +var initStatement string + +func initDB(ctx context.Context, db *sql.DB, tablename string) error { + _, err := db.ExecContext(ctx, initStatement, tablename) + if err != nil { + return errors.Join(ErrMigrate, err) + } + + return nil +} diff --git a/init_db.sql b/init_db.sql new file mode 100644 index 0000000..cc5418b --- /dev/null +++ b/init_db.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS $1 ( + id text PRIMARY KEY, + created_at timestamp with time zone default now, + updated_at timestamp with time zone default now, + deleted_at timestamp with time zone, + payload jsonb +) diff --git a/repo.go b/repo.go new file mode 100644 index 0000000..4c6a023 --- /dev/null +++ b/repo.go @@ -0,0 +1,119 @@ +package outbox + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "time" +) + +var ( + ErrSerialize = errors.New("error serialize/deserialize") + ErrInsert = errors.New("error create") + ErrNotFound = errors.New("not found") + ErrRead = errors.New("error read") + ErrUpdate = errors.New("error update") + ErrDelete = errors.New("error delete") +) + +func OpenOrCreate[T any](ctx context.Context, db *sql.DB, tablename string) (*Repo[T], error) { + if err := initDB(ctx, db, tablename); err != nil { + return nil, err + } + + return &Repo[T]{ + db: db, + table: tablename, + }, nil +} + +type Repo[T any] struct { + db *sql.DB + table string +} + +func (r *Repo[T]) CreateOrUpdate(ctx context.Context, id string, v *T) error { + now := time.Now() + + stmt := "INSERT INTO $1 (id, created_at, updated_at, payload) VALUES ($2, $3, $4)" + + b, err := serialize(v) + if err != nil { + return err + } + + if _, err := r.db.ExecContext(ctx, stmt, r.table, id, now, now, b); err != nil { + return errors.Join(ErrInsert, err) + } + + return nil +} + +func (r *Repo[T]) Read(ctx context.Context, id string) (*T, error) { + stmt := "SELECT payload FROM $1 WHERE id = $2 AND deleted_at is NULL" + + row, err := r.db.QueryContext(ctx, stmt, r.table, id) + if err != nil { + // TODO err not found + return nil, errors.Join(ErrRead, err) + } + + b := []byte{} + if err := row.Scan(&b); err != nil { + return nil, err + } + + v, err := deserialize[T](b) + if err != nil { + return nil, err + } + + return v, nil +} + +func (r *Repo[T]) Update(ctx context.Context, id string, v *T) error { + now := time.Now() + stmt := "UPDATE $1 SET updated_at = $3, payload = $4 WHERE id = $2" + + b, err := json.Marshal(v) + if err != nil { + return errors.Join(ErrSerialize, err) + } + + if _, err := r.db.ExecContext(ctx, stmt, r.table, id, now, b); err != nil { + return errors.Join(ErrUpdate, err) + } + + return nil +} + +func (r *Repo[T]) Delete(ctx context.Context, id string) error { + now := time.Now() + + stmt := "UPDATE $1 SET deleted_at = $3 WHERE id = $2" + + if _, err := r.db.ExecContext(ctx, stmt, r.table, now, id); err != nil { + return errors.Join(ErrDelete, err) + } + + return nil +} + +func serialize(v any) ([]byte, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, errors.Join(ErrSerialize, err) + } + + return b, nil +} + +func deserialize[T any](b []byte) (*T, error) { + v := new(T) + if err := json.Unmarshal(b, v); err != nil { + return nil, errors.Join(ErrSerialize, err) + } + + return v, nil +} diff --git a/repo_test.go b/repo_test.go new file mode 100644 index 0000000..0bff76d --- /dev/null +++ b/repo_test.go @@ -0,0 +1,69 @@ +package outbox_test + +import ( + "context" + "database/sql" + "os" + "testing" + + _ "github.com/mattn/go-sqlite3" + + "code.uint32.ru/tiny/outbox" +) + +type My struct { + A string + B int +} + +func TestOutboxMethods(t *testing.T) { + db, err := sql.Open("sqlite3", "test.db") + if err != nil { + t.Fatal(err) + } + + defer os.Remove("test.db") + defer db.Close() + + ctx := context.Background() + + box, err := outbox.OpenOrCreate[My](ctx, db) + if err != nil { + t.Fatal(err) + } + + one := My{ + A: "hello", + B: 42, + } + + if err := box.Save(ctx, "test", &one); err != nil { + t.Fatal(err) + } + + lst, err := box.GetPending(ctx, 100) + if err != nil { + t.Fatal(err) + } + + if len(lst) != 1 { + t.Fatal("incorrect pending len") + } + + if err := box.MarkProcessed(ctx, "test"); err != nil { + t.Fatal(err) + } + + lst, err = box.GetPending(ctx, 100) + if err != nil { + t.Fatal(err) + } + + if len(lst) != 0 { + t.Fatal("should be zero pending") + } + + if err := box.Cleanup(ctx); err != nil { + t.Fatal(err) + } +}