Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres v2: Add support for DeleteWithPrefix #3307

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/infrastructure/docker-compose-cockroachdb.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
version: '2'
services:
cockroachdb:
image: cockroachdb/cockroach:v21.2.3
image: cockroachdb/cockroach:v23.1.13
hostname: cockroachdb
command: start-single-node --cluster-name=single-node --logtostderr=WARNING --log-file-verbosity=WARNING --insecure
command: start-single-node --cluster-name=single-node --insecure
restart: always
ports:
- "26257:26257"
Expand Down
13 changes: 13 additions & 0 deletions .github/infrastructure/docker-compose-yugabytedb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: '3'

services:
yugabyte:
image: yugabytedb/yugabyte:2.19.3.0-b140
command: "bin/yugabyted start --daemon=false"
ports:
- 7000:7000
- 9000:9000
- 15433:15433
- 5433:5433
- 9042:9042

30 changes: 17 additions & 13 deletions state/cockroachdb/cockroachdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ WHERE
func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgresql.MigrateOptions) error {
exists, err := tableExists(ctx, db, opts.StateTableName)
if err != nil {
return err
return fmt.Errorf("failed to check if table '%s' exists: %w", opts.StateTableName, err)
}

if !exists {
opts.Logger.Info("Creating CockroachDB state table")
_, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s (
_, err = db.Exec(ctx, fmt.Sprintf(`
CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value jsonb NOT NULL,
isbinary boolean NOT NULL,
Expand All @@ -86,37 +87,40 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre
updatedate TIMESTAMP WITH TIME ZONE NULL,
expiredate TIMESTAMP WITH TIME ZONE NULL,
INDEX expiredate_idx (expiredate)
);`, opts.StateTableName))
)`,
opts.StateTableName))
if err != nil {
return err
return fmt.Errorf("failed to create state table: %w", err)
}
}

// If table was created before v1.11.
_, err = db.Exec(ctx, fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;`, opts.StateTableName))
if err != nil {
return err
return fmt.Errorf("failed to add expiredate column to state table: %w", err)
}
_, err = db.Exec(ctx, fmt.Sprintf(
`CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);`, opts.StateTableName))
if err != nil {
return err
return fmt.Errorf("failed to create expiredate index on state table: %w", err)
}

exists, err = tableExists(ctx, db, opts.MetadataTableName)
if err != nil {
return err
return fmt.Errorf("failed to check if table '%s' exists: %w", opts.MetadataTableName, err)
}

if !exists {
opts.Logger.Info("Creating CockroachDB metadata table")
_, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value text NOT NULL
);`, opts.MetadataTableName))
_, err = db.Exec(ctx, fmt.Sprintf(`
CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value text NOT NULL
);`,
opts.MetadataTableName))
if err != nil {
return err
return fmt.Errorf("failed to create metadata table: %w", err)
}
}

Expand All @@ -125,6 +129,6 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre

func tableExists(ctx context.Context, db pginterfaces.PGXPoolConn, tableName string) (bool, error) {
exists := false
err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists)
err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables WHERE tablename = $1)", tableName).Scan(&exists)
return exists, err
}
2 changes: 1 addition & 1 deletion state/cockroachdb/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ urls:
- title: Reference
url: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-cockroachdb/
capabilities:
- actorStateStore
- crud
- transactional
- etag
- ttl
- actorStateStore
authenticationProfiles:
- title: "Connection string"
description: "Authenticate using a Connection String"
Expand Down
11 changes: 10 additions & 1 deletion state/postgresql/v2/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ import (
"github.com/dapr/kit/ptr"
)

type pgTable string
type (
pgTable string
pgFunction string
)

const (
pgTableState pgTable = "state"

pgFunctionKeyPrefix pgFunction = "key_prefix"
)

const (
Expand Down Expand Up @@ -85,3 +90,7 @@ func (m *pgMetadata) InitWithMetadata(meta state.Metadata, azureADEnabled bool)
func (m pgMetadata) TableName(table pgTable) string {
return m.TablePrefix + string(table)
}

func (m pgMetadata) FunctionName(function pgFunction) string {
return m.TablePrefix + string(function)
}
1 change: 1 addition & 0 deletions state/postgresql/v2/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ capabilities:
- transactional
- etag
- ttl
- deleteWithPrefix
builtinAuthenticationProfiles:
- name: "azuread"
metadata:
Expand Down
102 changes: 98 additions & 4 deletions state/postgresql/v2/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -47,13 +48,22 @@ type PostgreSQL struct {

gc sqlinternal.GarbageCollector

enableAzureAD bool
enableAzureAD bool
deleteWithPrefix bool
}

// PostgreSQLDeleteWithPrefix extends PostgreSQL and adds support for DeleteWithPrefix.
type PostgreSQLDeleteWithPrefix struct {
*PostgreSQL
}

type Options struct {
// Disables support for authenticating with Azure AD
// This should be set to "false" when targeting different databases than PostgreSQL (such as CockroachDB)
NoAzureAD bool
// Disables support for DeleteWithPrefix
// This should be set to "false" when targeting CockroachDB and other PostgreSQL-compatible database that don't offer the necessary capabilities
NoDeleteWithPrefix bool
}

// NewPostgreSQLStateStore creates a new instance of PostgreSQL state store v2 with the default options.
Expand All @@ -66,11 +76,20 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store {
// NewPostgreSQLStateStoreWithOptions creates a new instance of PostgreSQL state store with options.
func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) state.Store {
s := &PostgreSQL{
logger: logger,
enableAzureAD: !opts.NoAzureAD,
logger: logger,
enableAzureAD: !opts.NoAzureAD,
deleteWithPrefix: !opts.NoDeleteWithPrefix,
}
s.BulkStore = state.NewDefaultBulkStore(s)
return s

// If we want DeleteWithPrefix support, wrap into a PostgreSQLDeleteWithPrefix object
if opts.NoDeleteWithPrefix {
return s
}

return &PostgreSQLDeleteWithPrefix{
PostgreSQL: s,
}
}

// Init sets up Postgres connection and performs migrations
Expand Down Expand Up @@ -151,6 +170,7 @@ func (p *PostgreSQL) performMigrations(ctx context.Context) error {
}

stateTable := p.metadata.TableName(pgTableState)
keyPrefixFunction := p.metadata.FunctionName(pgFunctionKeyPrefix)

return m.Perform(ctx, []sqlinternal.MigrationFn{
// Migration 1: create the table for state
Expand All @@ -175,6 +195,39 @@ CREATE INDEX ON %[1]s (expires_at);
}
return nil
},
// Migration 2: add the "key_prefix" function and "prefix" index to the state table
// If DeleteWithPrefix support is disabled, this is a no-op, but we keep the migration here because we want the migration level to increase, or bad things will happen if another migration is added in the future
func(ctx context.Context) error {
if !p.deleteWithPrefix {
return nil
}

// Create the "key_prefix" function
// Then add the "prefix" index to the state table that can be used by DeleteWithPrefix
p.logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", keyPrefixFunction, stateTable)
_, err := p.db.Exec(
ctx,
fmt.Sprintf(
`
CREATE FUNCTION %[1]s(k text) RETURNS text
LANGUAGE SQL
IMMUTABLE
LEAKPROOF
RETURNS NULL ON NULL INPUT
AS $$
SELECT array_to_string(trim_array(string_to_array(k, '||'),1), '||');
$$;

CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> '';
`,
keyPrefixFunction, stateTable,
),
)
if err != nil {
return err
}
return nil
},
})
}

Expand All @@ -187,6 +240,16 @@ func (p *PostgreSQL) Features() []state.Feature {
}
}

// Features returns the features available in this state store.
func (p *PostgreSQLDeleteWithPrefix) Features() []state.Feature {
return []state.Feature{
state.FeatureETag,
state.FeatureTransactional,
state.FeatureTTL,
state.FeatureDeleteWithPrefix,
}
}

func (p *PostgreSQL) GetDB() *pgxpool.Pool {
// We can safely cast to *pgxpool.Pool because this method is never used in unit tests where we mock the DB
return p.db.(*pgxpool.Pool)
Expand Down Expand Up @@ -536,3 +599,34 @@ func (p *PostgreSQL) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType)
return
}

func (p *PostgreSQLDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
err := req.Validate()
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout)
defer cancel()

// Trim the trailing "||" from the prefix
result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName(pgTableState)+" WHERE "+p.metadata.FunctionName(pgFunctionKeyPrefix)+`("key") = $1`, strings.TrimSuffix(req.Prefix, "||"))
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

return state.DeleteWithPrefixResponse{
Count: result.RowsAffected(),
}, nil
}

// Compile-time interface assertions
var (
_ state.Store = (*PostgreSQL)(nil)
_ state.TransactionalStore = (*PostgreSQL)(nil)
_ state.BulkStore = (*PostgreSQL)(nil)
_ state.Store = (*PostgreSQLDeleteWithPrefix)(nil)
_ state.TransactionalStore = (*PostgreSQLDeleteWithPrefix)(nil)
_ state.BulkStore = (*PostgreSQLDeleteWithPrefix)(nil)
_ state.DeleteWithPrefix = (*PostgreSQLDeleteWithPrefix)(nil)
)
9 changes: 7 additions & 2 deletions tests/config/state/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ components:
# This component requires etags to be numeric
badEtag: "1"
- component: postgresql.v2.docker
operations: [ "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ]
config:
# This component requires etags to be UUIDs
badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70"
- component: postgresql.v2.azure
operations: [ "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ]
config:
# This component requires etags to be UUIDs
badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70"
- component: yugabytedb.v2
operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ]
config:
# This component requires etags to be UUIDs
badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70"
Expand Down
12 changes: 12 additions & 0 deletions tests/config/state/yugabytedb/v2/statestore.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.yugabytedb
version: v2
metadata:
- name: connectionString
value: "host=localhost user=yugabyte password=yugabyte port=5433 connect_timeout=10 database=yugabyte"
- name: tablePrefix
value: "confv2_"
8 changes: 7 additions & 1 deletion tests/conformance/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
return s_postgresql_v2.NewPostgreSQLStateStore(testLogger)
case "postgresql.v2.azure":
return s_postgresql_v2.NewPostgreSQLStateStore(testLogger)
case "yugabytedb.v2":
return s_postgresql_v2.NewPostgreSQLStateStore(testLogger)
case "sqlite":
return s_sqlite.NewSQLiteStateStore(testLogger)
case "mysql.mysql":
Expand All @@ -121,7 +123,11 @@
case "cockroachdb.v2":
// v2 of the component is an alias for the PostgreSQL state store
// We still have a conformance test to validate that the component works with CockroachDB
return s_postgresql_v2.NewPostgreSQLStateStoreWithOptions(testLogger, s_postgresql_v2.Options{NoAzureAD: true})
// CockroachDB does not support Azure AD authentication or the DeleteWithPrefix interface

Check failure on line 126 in tests/conformance/state_test.go

View workflow job for this annotation

GitHub Actions / Build linux_amd64 binaries

Duplicate words (CockroachDB) found (dupword)
return s_postgresql_v2.NewPostgreSQLStateStoreWithOptions(testLogger, s_postgresql_v2.Options{
NoAzureAD: true,
NoDeleteWithPrefix: true,
})
case "memcached":
return s_memcached.NewMemCacheStateStore(testLogger)
case "rethinkdb":
Expand Down
Loading