From b6dc6d3fce1af5f529fd9f725240f89281f57d2b Mon Sep 17 00:00:00 2001 From: Dmitry Fedotov Date: Sun, 4 May 2025 21:13:19 +0300 Subject: [PATCH] working version --- LICENSE | 2 +- go.mod | 6 +- go.sum | 2 - init_db.go | 31 ++++++--- init_db.sql | 7 -- repo.go | 122 +++++++++++++++++++++++------------ repo_test.go | 69 -------------------- test/Makefile | 7 ++ test/go.mod | 3 + test/repo_test.go | 159 ++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 277 insertions(+), 131 deletions(-) delete mode 100644 init_db.sql delete mode 100644 repo_test.go create mode 100644 test/Makefile create mode 100644 test/go.mod create mode 100644 test/repo_test.go diff --git a/LICENSE b/LICENSE index ec98758..85a8e0f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2025 tiny +Copyright (c) 2025 Dmitry Fedotov Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/go.mod b/go.mod index 1baa236..aa3680f 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,3 @@ -module code.uint32.ru/tiny/outbox +module code.uint32.ru/tiny/repo -go 1.24.1 - -require github.com/mattn/go-sqlite3 v1.14.28 // indirect +go 1.24 diff --git a/go.sum b/go.sum index 42e5bac..e69de29 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +0,0 @@ -github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= -github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/init_db.go b/init_db.go index ec92d38..f808fa5 100644 --- a/init_db.go +++ b/init_db.go @@ -1,23 +1,36 @@ -package outbox +package repo import ( "context" "database/sql" _ "embed" "errors" + "strings" ) -var ( - ErrMigrate = errors.New("error creating outbox table") -) - -//go:embed init_db.sql -var initStatement string +var initStatement = ` +CREATE TABLE IF NOT EXISTS -- ( + id text PRIMARY KEY, + created_at timestamp with time zone, + updated_at timestamp with time zone, + deleted_at timestamp with time zone, + payload jsonb +)` func initDB(ctx context.Context, db *sql.DB, tablename string) error { - _, err := db.ExecContext(ctx, initStatement, tablename) + if tablename == "" { + return errors.Join(ErrInitRepo, errors.New("tablename may not be empty")) + } + + if db == nil { + return errors.Join(ErrInitRepo, errors.New("db instance is nil")) + } + + query := strings.Replace(initStatement, "--", tablename, 1) + + _, err := db.ExecContext(ctx, query) if err != nil { - return errors.Join(ErrMigrate, err) + return errors.Join(ErrInitRepo, err) } return nil diff --git a/init_db.sql b/init_db.sql deleted file mode 100644 index cc5418b..0000000 --- a/init_db.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE TABLE IF NOT EXISTS $1 ( - id text PRIMARY KEY, - created_at timestamp with time zone default now, - updated_at timestamp with time zone default now, - deleted_at timestamp with time zone, - payload jsonb -) diff --git a/repo.go b/repo.go index 4c6a023..e2ec89a 100644 --- a/repo.go +++ b/repo.go @@ -1,4 +1,4 @@ -package outbox +package repo import ( "context" @@ -9,62 +9,76 @@ import ( ) var ( - ErrSerialize = errors.New("error serialize/deserialize") - ErrInsert = errors.New("error create") + ErrInitRepo = errors.New("error creating table") + ErrMarshal = errors.New("error marshal") + ErrExecQuery = errors.New("error executing DB query") ErrNotFound = errors.New("not found") - ErrRead = errors.New("error read") - ErrUpdate = errors.New("error update") - ErrDelete = errors.New("error delete") ) -func OpenOrCreate[T any](ctx context.Context, db *sql.DB, tablename string) (*Repo[T], error) { +type Repo[T any] interface { + // Create inserts object into repository table. If id already exists + // in the database it is an error. + Create(ctx context.Context, id string, v *T) error + // Read returns object with specified id or ErrNotFound + Read(ctx context.Context, id string) (*T, error) + // Update updates object with id. + Update(ctx context.Context, id string, v *T) error + // Delete performs soft-delete (actually marks object as unavailable). + Delete(ctx context.Context, id string) error + // Purge actually deletes database record with id. + Purge(ctx context.Context, id string) error +} + +// OpenOrCreate accepts *sql.DB and tablename and tries to create the named table +// in the database if it does not exist. The DB instance will also be used by the repository. +func OpenOrCreate[T any](ctx context.Context, db *sql.DB, tablename string) (Repo[T], error) { if err := initDB(ctx, db, tablename); err != nil { return nil, err } - return &Repo[T]{ + return &repo[T]{ db: db, table: tablename, }, nil } -type Repo[T any] struct { +type repo[T any] struct { db *sql.DB table string } -func (r *Repo[T]) CreateOrUpdate(ctx context.Context, id string, v *T) error { +func (r *repo[T]) Create(ctx context.Context, id string, v *T) error { now := time.Now() - stmt := "INSERT INTO $1 (id, created_at, updated_at, payload) VALUES ($2, $3, $4)" - - b, err := serialize(v) + b, err := marshal(v) if err != nil { return err } - if _, err := r.db.ExecContext(ctx, stmt, r.table, id, now, now, b); err != nil { - return errors.Join(ErrInsert, err) + query := "INSERT INTO " + r.table + " (id, created_at, updated_at, payload) VALUES ($1, $2, $3, $4)" + + if err := r.execContext(ctx, query, id, now, now, string(b)); err != nil { + return err } return nil } -func (r *Repo[T]) Read(ctx context.Context, id string) (*T, error) { - stmt := "SELECT payload FROM $1 WHERE id = $2 AND deleted_at is NULL" +func (r *repo[T]) Read(ctx context.Context, id string) (*T, error) { + query := "SELECT payload FROM " + r.table + " WHERE id = $1 AND deleted_at is NULL" - row, err := r.db.QueryContext(ctx, stmt, r.table, id) - if err != nil { - // TODO err not found - return nil, errors.Join(ErrRead, err) - } + row := r.db.QueryRowContext(ctx, query, id) - b := []byte{} - if err := row.Scan(&b); err != nil { + var s string + + err := row.Scan(&s) + if err != nil && errors.Is(err, sql.ErrNoRows) { + return nil, errors.Join(ErrNotFound, err) + } else if err != nil { return nil, err } - v, err := deserialize[T](b) + v, err := unmarshal[T]([]byte(s)) if err != nil { return nil, err } @@ -72,47 +86,77 @@ func (r *Repo[T]) Read(ctx context.Context, id string) (*T, error) { return v, nil } -func (r *Repo[T]) Update(ctx context.Context, id string, v *T) error { +func (r *repo[T]) Update(ctx context.Context, id string, v *T) error { now := time.Now() - stmt := "UPDATE $1 SET updated_at = $3, payload = $4 WHERE id = $2" - b, err := json.Marshal(v) + b, err := marshal(v) if err != nil { - return errors.Join(ErrSerialize, err) + return err } - if _, err := r.db.ExecContext(ctx, stmt, r.table, id, now, b); err != nil { - return errors.Join(ErrUpdate, err) + query := "UPDATE " + r.table + " SET updated_at = $1, payload = $2 WHERE id = $3 AND deleted_at IS NULL" + + if err := r.execContext(ctx, query, now, string(b), id); err != nil { + return err } return nil } -func (r *Repo[T]) Delete(ctx context.Context, id string) error { +// Delete performs soft-delete. It just marks the record as deleted and it will +// no longer be available to Read and Update methods. +func (r *repo[T]) Delete(ctx context.Context, id string) error { now := time.Now() - stmt := "UPDATE $1 SET deleted_at = $3 WHERE id = $2" + query := "UPDATE " + r.table + " SET deleted_at = $1 WHERE id = $2 AND deleted_at IS NULL" - if _, err := r.db.ExecContext(ctx, stmt, r.table, now, id); err != nil { - return errors.Join(ErrDelete, err) + if err := r.execContext(ctx, query, now, id); err != nil { + return err } return nil } -func serialize(v any) ([]byte, error) { +func (r *repo[T]) Purge(ctx context.Context, id string) error { + query := "DELETE FROM " + r.table + " WHERE id = $1" + + if err := r.execContext(ctx, query, id); err != nil { + return err + } + + return nil +} + +func (r *repo[T]) execContext(ctx context.Context, query string, args ...any) error { + res, err := r.db.ExecContext(ctx, query, args...) + if err != nil { + return errors.Join(ErrExecQuery, err) + } + affected, err := res.RowsAffected() + if err != nil { + return errors.Join(ErrExecQuery, err) + } + + if affected == 0 { + return ErrNotFound + } + + return nil +} + +func marshal(v any) ([]byte, error) { b, err := json.Marshal(v) if err != nil { - return nil, errors.Join(ErrSerialize, err) + return nil, errors.Join(ErrMarshal, err) } return b, nil } -func deserialize[T any](b []byte) (*T, error) { +func unmarshal[T any](b []byte) (*T, error) { v := new(T) if err := json.Unmarshal(b, v); err != nil { - return nil, errors.Join(ErrSerialize, err) + return nil, errors.Join(ErrMarshal, err) } return v, nil diff --git a/repo_test.go b/repo_test.go deleted file mode 100644 index 0bff76d..0000000 --- a/repo_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package outbox_test - -import ( - "context" - "database/sql" - "os" - "testing" - - _ "github.com/mattn/go-sqlite3" - - "code.uint32.ru/tiny/outbox" -) - -type My struct { - A string - B int -} - -func TestOutboxMethods(t *testing.T) { - db, err := sql.Open("sqlite3", "test.db") - if err != nil { - t.Fatal(err) - } - - defer os.Remove("test.db") - defer db.Close() - - ctx := context.Background() - - box, err := outbox.OpenOrCreate[My](ctx, db) - if err != nil { - t.Fatal(err) - } - - one := My{ - A: "hello", - B: 42, - } - - if err := box.Save(ctx, "test", &one); err != nil { - t.Fatal(err) - } - - lst, err := box.GetPending(ctx, 100) - if err != nil { - t.Fatal(err) - } - - if len(lst) != 1 { - t.Fatal("incorrect pending len") - } - - if err := box.MarkProcessed(ctx, "test"); err != nil { - t.Fatal(err) - } - - lst, err = box.GetPending(ctx, 100) - if err != nil { - t.Fatal(err) - } - - if len(lst) != 0 { - t.Fatal("should be zero pending") - } - - if err := box.Cleanup(ctx); err != nil { - t.Fatal(err) - } -} diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 0000000..fb5947d --- /dev/null +++ b/test/Makefile @@ -0,0 +1,7 @@ +.PHONY: test + +test: + docker run --rm -d -e POSTGRES_PASSWORD=postgres --name test-postgres -p 5432:5432 postgres:15.6-bookworm + sleep 10 + PGX_DSN=postgres://postgres:postgres@127.0.0.1:5432 go test -v -cover + docker stop test-postgres diff --git a/test/go.mod b/test/go.mod new file mode 100644 index 0000000..9d957e9 --- /dev/null +++ b/test/go.mod @@ -0,0 +1,3 @@ +module pkgtest + +go 1.24.2 diff --git a/test/repo_test.go b/test/repo_test.go new file mode 100644 index 0000000..3bbc4da --- /dev/null +++ b/test/repo_test.go @@ -0,0 +1,159 @@ +package repo_test + +import ( + "context" + "database/sql" + "errors" + "os" + "reflect" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/stdlib" + _ "github.com/mattn/go-sqlite3" + + "code.uint32.ru/tiny/repo" +) + +type My struct { + A string + B int +} + +func TestSqlite3(t *testing.T) { + db, err := sql.Open("sqlite3", "test.db") + if err != nil { + t.Fatal(err) + } + + defer os.Remove("test.db") + defer db.Close() + + ctx := context.Background() + + dotest(ctx, db, t) +} + +func TestPGX(t *testing.T) { + dsn, ok := os.LookupEnv("PGX_DSN") + if !ok { + t.SkipNow() + } + + conf, err := pgx.ParseConfig(dsn) + if err != nil { + t.Fatal(err) + } + conf.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol + conf.DescriptionCacheCapacity = 0 + conf.StatementCacheCapacity = 0 + + db, err := sql.Open("pgx", stdlib.RegisterConnConfig(conf)) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + ctx := context.Background() + + dotest(ctx, db, t) +} + +func TestOpenOrCreateFails(t *testing.T) { + ctx := context.Background() + + if _, err := repo.OpenOrCreate[My](ctx, nil, ""); err == nil { + t.Errorf("must fail on empty tablename") + } + + if _, err := repo.OpenOrCreate[My](ctx, nil, "test"); err == nil { + t.Errorf("must fail on nil DB instance") + } +} + +func dotest(ctx context.Context, db *sql.DB, t *testing.T) { + r, err := repo.OpenOrCreate[My](ctx, db, "my_test") + if err != nil { + t.Fatal(err) + } + + testID := "watermelon" + incorrectID := "surelynonexistant" + + one := &My{ + A: "hello", + B: 42, + } + + if err := r.Create(ctx, testID, one); err != nil { + t.Fatal(err) + } + + if err := r.Create(ctx, testID, one); err == nil { + // can not insert with same id + t.Errorf("insert of the same id dows not fail") + } + + two, err := r.Read(ctx, testID) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(one, two) { + t.Errorf("instances are not equal") + t.Logf("%+v", one) + t.Logf("%+v", two) + } + + one.B = 13 + + if err := r.Update(ctx, testID, one); err != nil { + t.Error(err) + } + + two, err = r.Read(ctx, testID) + if err != nil { + t.Error(err) + } + + if !reflect.DeepEqual(one, two) { + t.Errorf("instances are not equal") + t.Logf("%+v", one) + t.Logf("%+v", two) + } + + if err := r.Delete(ctx, testID); err != nil { + t.Error(err) + } + + if err := r.Delete(ctx, testID); err == nil || !errors.Is(err, repo.ErrNotFound) { + // can only delete once + t.Errorf("incorrect error on delete for non-existing id, want: %v, have: %v", repo.ErrNotFound, err) + } + + if err := r.Purge(ctx, testID); err != nil { + t.Error(err) + } + + if err := r.Purge(ctx, testID); err == nil || !errors.Is(err, repo.ErrNotFound) { + // can only purge once + t.Errorf("incorrect error on delete for non-existing id, want: %v, have: %v", repo.ErrNotFound, err) + } + + if _, err := r.Read(ctx, incorrectID); err == nil || !errors.Is(err, repo.ErrNotFound) { + t.Errorf("incorrect error on read for non-existing id, want: %v, have: %v", repo.ErrNotFound, err) + } + + if err := r.Update(ctx, incorrectID, one); err == nil || !errors.Is(err, repo.ErrNotFound) { + t.Errorf("incorrect error on update for non-existing id, want: %v, have: %v", repo.ErrNotFound, err) + } + + if err := r.Delete(ctx, incorrectID); err == nil || !errors.Is(err, repo.ErrNotFound) { + t.Errorf("incorrect error on delete for non-existing id, want: %v, have: %v", repo.ErrNotFound, err) + } + + if err := r.Purge(ctx, incorrectID); err == nil || !errors.Is(err, repo.ErrNotFound) { + t.Errorf("incorrect error on purge for non-existing id, want: %v, have: %v", repo.ErrNotFound, err) + } +}