init
This commit is contained in:
24
db.go
Normal file
24
db.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMigrate = errors.New("error creating outbox table")
|
||||
)
|
||||
|
||||
//go:embed init_db.sql
|
||||
var initStatement string
|
||||
|
||||
func initDB(ctx context.Context, db *sql.DB) error {
|
||||
_, err := db.ExecContext(ctx, initStatement)
|
||||
if err != nil {
|
||||
return errors.Join(ErrMigrate, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
5
go.mod
Normal file
5
go.mod
Normal file
@@ -0,0 +1,5 @@
|
||||
module code.uint32.ru/tiny/outbox
|
||||
|
||||
go 1.24.1
|
||||
|
||||
require github.com/mattn/go-sqlite3 v1.14.28 // indirect
|
||||
2
go.sum
Normal file
2
go.sum
Normal file
@@ -0,0 +1,2 @@
|
||||
github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A=
|
||||
github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
6
init_db.sql
Normal file
6
init_db.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
CREATE TABLE IF NOT EXISTS outbox (
|
||||
id text PRIMARY KEY,
|
||||
created_at timestamp with time zone default now,
|
||||
is_processed boolean default false,
|
||||
payload jsonb
|
||||
)
|
||||
103
outbox.go
Normal file
103
outbox.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMarshal = errors.New("error marshaling value")
|
||||
ErrInsert = errors.New("error inserting value")
|
||||
ErrSelectPending = errors.New("error selecting pending objects")
|
||||
ErrUpdate = errors.New("error updating object")
|
||||
ErrCleanup = errors.New("error deleting processed items")
|
||||
)
|
||||
|
||||
type Box[T any] interface {
|
||||
Save(ctx context.Context, id string, v *T) error
|
||||
MarkProcessed(ctx context.Context, id string) error
|
||||
GetPending(ctx context.Context, n int) ([]*T, error)
|
||||
}
|
||||
|
||||
func OpenOrCreate[T any](ctx context.Context, db *sql.DB) (*Outbox[T], error) {
|
||||
if err := initDB(ctx, db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Outbox[T]{
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type Outbox[T any] struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func (o *Outbox[T]) Save(ctx context.Context, id string, v *T) error {
|
||||
now := time.Now()
|
||||
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return errors.Join(ErrMarshal, err)
|
||||
}
|
||||
|
||||
stmt := "INSERT INTO outbox (id, created_at, is_processed, payload) VALUES ($1, $2, $3, $4)"
|
||||
|
||||
if _, err := o.db.ExecContext(ctx, stmt, id, now, false, b); err != nil {
|
||||
return errors.Join(ErrInsert, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Outbox[T]) MarkProcessed(ctx context.Context, id string) error {
|
||||
stmt := "UPDATE outbox SET is_processed = true WHERE id = $1"
|
||||
|
||||
if _, err := o.db.ExecContext(ctx, stmt, id); err != nil {
|
||||
return errors.Join(ErrUpdate, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Outbox[T]) GetPending(ctx context.Context, n int) ([]*T, error) {
|
||||
stmt := "SELECT payload FROM outbox WHERE is_processed = false order by created_at desc limit $1"
|
||||
|
||||
rows, err := o.db.QueryContext(ctx, stmt, n)
|
||||
if err != nil {
|
||||
return nil, errors.Join(ErrSelectPending, err)
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]*T, 0)
|
||||
|
||||
for rows.Next() {
|
||||
b := []byte{}
|
||||
if err := rows.Scan(&b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v := new(T)
|
||||
if err := json.Unmarshal(b, v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out = append(out, v)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (o *Outbox[T]) Cleanup(ctx context.Context) error {
|
||||
stmt := "DELETE FROM outbox WHERE is_processed = true"
|
||||
|
||||
if _, err := o.db.ExecContext(ctx, stmt); err != nil {
|
||||
return errors.Join(ErrCleanup, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
69
outbox_test.go
Normal file
69
outbox_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package outbox_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
|
||||
"code.uint32.ru/tiny/outbox"
|
||||
)
|
||||
|
||||
type My struct {
|
||||
A string
|
||||
B int
|
||||
}
|
||||
|
||||
func TestOutboxMethods(t *testing.T) {
|
||||
db, err := sql.Open("sqlite3", "test.db")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer os.Remove("test.db")
|
||||
defer db.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
box, err := outbox.OpenOrCreate[My](ctx, db)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
one := My{
|
||||
A: "hello",
|
||||
B: 42,
|
||||
}
|
||||
|
||||
if err := box.Save(ctx, "test", &one); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lst, err := box.GetPending(ctx, 100)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(lst) != 1 {
|
||||
t.Fatal("incorrect pending len")
|
||||
}
|
||||
|
||||
if err := box.MarkProcessed(ctx, "test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lst, err = box.GetPending(ctx, 100)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(lst) != 0 {
|
||||
t.Fatal("should be zero pending")
|
||||
}
|
||||
|
||||
if err := box.Cleanup(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user