104 lines
2.2 KiB
Go
104 lines
2.2 KiB
Go
package outbox
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"time"
|
|
)
|
|
|
|
// Sentinel errors returned by Outbox methods. Each one is combined with the
// underlying cause through errors.Join, so callers can identify the failing
// operation with errors.Is and still reach the original error.
var (
	// ErrMarshal is returned by Save when the value cannot be encoded as JSON.
	ErrMarshal = errors.New("error marshaling value")
	// ErrInsert is returned by Save when the INSERT statement fails.
	ErrInsert = errors.New("error inserting value")
	// ErrSelectPending is returned by GetPending when the pending entries
	// cannot be queried.
	ErrSelectPending = errors.New("error selecting pending objects")
	// ErrUpdate is returned by MarkProcessed when the UPDATE statement fails.
	ErrUpdate = errors.New("error updating object")
	// ErrCleanup is returned by Cleanup when the DELETE statement fails.
	ErrCleanup = errors.New("error deleting processed items")
)
|
|
|
|
// Box is the storage contract for an outbox holding values of type T.
// Outbox is the database/sql-backed implementation in this package.
type Box[T any] interface {
	// Save stores v under id as an entry that has not been processed yet.
	Save(ctx context.Context, id string, v *T) error
	// MarkProcessed flags the entry identified by id as processed.
	MarkProcessed(ctx context.Context, id string) error
	// GetPending returns at most n entries that are not marked processed.
	GetPending(ctx context.Context, n int) ([]*T, error)
}
|
|
|
|
func OpenOrCreate[T any](ctx context.Context, db *sql.DB) (*Outbox[T], error) {
|
|
if err := initDB(ctx, db); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Outbox[T]{
|
|
db: db,
|
|
}, nil
|
|
}
|
|
|
|
// Outbox stores values of type T as JSON payloads in the "outbox" table of a
// SQL database. Obtain one through OpenOrCreate.
type Outbox[T any] struct {
	// db is the handle every statement is executed against.
	db *sql.DB
}
|
|
|
|
func (o *Outbox[T]) Save(ctx context.Context, id string, v *T) error {
|
|
now := time.Now()
|
|
|
|
b, err := json.Marshal(v)
|
|
if err != nil {
|
|
return errors.Join(ErrMarshal, err)
|
|
}
|
|
|
|
stmt := "INSERT INTO outbox (id, created_at, is_processed, payload) VALUES ($1, $2, $3, $4)"
|
|
|
|
if _, err := o.db.ExecContext(ctx, stmt, id, now, false, b); err != nil {
|
|
return errors.Join(ErrInsert, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *Outbox[T]) MarkProcessed(ctx context.Context, id string) error {
|
|
stmt := "UPDATE outbox SET is_processed = true WHERE id = $1"
|
|
|
|
if _, err := o.db.ExecContext(ctx, stmt, id); err != nil {
|
|
return errors.Join(ErrUpdate, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *Outbox[T]) GetPending(ctx context.Context, n int) ([]*T, error) {
|
|
stmt := "SELECT payload FROM outbox WHERE is_processed = false order by created_at desc limit $1"
|
|
|
|
rows, err := o.db.QueryContext(ctx, stmt, n)
|
|
if err != nil {
|
|
return nil, errors.Join(ErrSelectPending, err)
|
|
}
|
|
|
|
defer rows.Close()
|
|
|
|
out := make([]*T, 0)
|
|
|
|
for rows.Next() {
|
|
b := []byte{}
|
|
if err := rows.Scan(&b); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
v := new(T)
|
|
if err := json.Unmarshal(b, v); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out = append(out, v)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (o *Outbox[T]) Cleanup(ctx context.Context) error {
|
|
stmt := "DELETE FROM outbox WHERE is_processed = true"
|
|
|
|
if _, err := o.db.ExecContext(ctx, stmt); err != nil {
|
|
return errors.Join(ErrCleanup, err)
|
|
}
|
|
|
|
return nil
|
|
}
|