package outbox import ( "context" "database/sql" "encoding/json" "errors" "time" ) var ( ErrMarshal = errors.New("error marshaling value") ErrInsert = errors.New("error inserting value") ErrSelectPending = errors.New("error selecting pending objects") ErrUpdate = errors.New("error updating object") ErrCleanup = errors.New("error deleting processed items") ) type Box[T any] interface { Save(ctx context.Context, id string, v *T) error MarkProcessed(ctx context.Context, id string) error GetPending(ctx context.Context, n int) ([]*T, error) } func OpenOrCreate[T any](ctx context.Context, db *sql.DB) (*Outbox[T], error) { if err := initDB(ctx, db); err != nil { return nil, err } return &Outbox[T]{ db: db, }, nil } type Outbox[T any] struct { db *sql.DB } func (o *Outbox[T]) Save(ctx context.Context, id string, v *T) error { now := time.Now() b, err := json.Marshal(v) if err != nil { return errors.Join(ErrMarshal, err) } stmt := "INSERT INTO outbox (id, created_at, is_processed, payload) VALUES ($1, $2, $3, $4)" if _, err := o.db.ExecContext(ctx, stmt, id, now, false, b); err != nil { return errors.Join(ErrInsert, err) } return nil } func (o *Outbox[T]) MarkProcessed(ctx context.Context, id string) error { stmt := "UPDATE outbox SET is_processed = true WHERE id = $1" if _, err := o.db.ExecContext(ctx, stmt, id); err != nil { return errors.Join(ErrUpdate, err) } return nil } func (o *Outbox[T]) GetPending(ctx context.Context, n int) ([]*T, error) { stmt := "SELECT payload FROM outbox WHERE is_processed = false order by created_at desc limit $1" rows, err := o.db.QueryContext(ctx, stmt, n) if err != nil { return nil, errors.Join(ErrSelectPending, err) } defer rows.Close() out := make([]*T, 0) for rows.Next() { b := []byte{} if err := rows.Scan(&b); err != nil { return nil, err } v := new(T) if err := json.Unmarshal(b, v); err != nil { return nil, err } out = append(out, v) } return out, nil } func (o *Outbox[T]) Cleanup(ctx context.Context) error { stmt := "DELETE FROM outbox WHERE is_processed = true" if _, err := o.db.ExecContext(ctx, stmt); err != nil { return errors.Join(ErrCleanup, err) } return nil }