From fe249dad577699ea98448b82d4ac37b3fd45a535 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 5 Jan 2024 21:11:39 +0000 Subject: [PATCH 1/5] Add DeleteWithPrefix to postgres v2 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- state/postgresql/v2/metadata.go | 11 ++++++- state/postgresql/v2/metadata.yaml | 1 + state/postgresql/v2/postgresql.go | 52 ++++++++++++++++++++++++++++++- tests/config/state/tests.yml | 6 ++-- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/state/postgresql/v2/metadata.go b/state/postgresql/v2/metadata.go index a21829807d..e01fa52fcd 100644 --- a/state/postgresql/v2/metadata.go +++ b/state/postgresql/v2/metadata.go @@ -23,10 +23,15 @@ import ( "github.com/dapr/kit/ptr" ) -type pgTable string +type ( + pgTable string + pgFunction string +) const ( pgTableState pgTable = "state" + + pgFunctionKeyPrefix pgFunction = "key_prefix" ) const ( @@ -85,3 +90,7 @@ func (m *pgMetadata) InitWithMetadata(meta state.Metadata, azureADEnabled bool) func (m pgMetadata) TableName(table pgTable) string { return m.TablePrefix + string(table) } + +func (m pgMetadata) FunctionName(function pgFunction) string { + return m.TablePrefix + string(function) +} diff --git a/state/postgresql/v2/metadata.yaml b/state/postgresql/v2/metadata.yaml index de53cdbcbc..bdb91c7c12 100644 --- a/state/postgresql/v2/metadata.yaml +++ b/state/postgresql/v2/metadata.yaml @@ -15,6 +15,7 @@ capabilities: - transactional - etag - ttl + - deleteWithPrefix builtinAuthenticationProfiles: - name: "azuread" metadata: diff --git a/state/postgresql/v2/postgresql.go b/state/postgresql/v2/postgresql.go index 1cbb537d85..d4060722a3 100644 --- a/state/postgresql/v2/postgresql.go +++ b/state/postgresql/v2/postgresql.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "strconv" + "strings" "time" "github.com/google/uuid" @@ -151,9 +152,10 @@ func (p *PostgreSQL) performMigrations(ctx context.Context) error { } stateTable := p.metadata.TableName(pgTableState) + keyPrefixFunction := p.metadata.FunctionName(pgFunctionKeyPrefix) return m.Perform(ctx, []sqlinternal.MigrationFn{ - // Migration 1: create the table for state + // Migration 0: create the table for state func(ctx context.Context) error { p.logger.Infof("Creating state table: '%s'", stateTable) _, err := p.db.Exec(ctx, @@ -175,6 +177,33 @@ CREATE INDEX ON %[1]s (expires_at); } return nil }, + // Migration 1: add the "key_prefix" function and "prefix" index to the state table + func(ctx context.Context) error { + // Create the "key_prefix" function + // Then add the "prefix" index to the state table that can be used by DeleteWithPrefix + p.logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", keyPrefixFunction, stateTable) + _, err := p.db.Exec( + ctx, + fmt.Sprintf( + ` +CREATE FUNCTION %[1]s(k text) RETURNS text +LANGUAGE SQL +IMMUTABLE +LEAKPROOF +RETURNS NULL ON NULL INPUT +RETURN + array_to_string(trim_array(string_to_array(k, '||'),1), '||'); + +CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; +`, + keyPrefixFunction, stateTable, + ), + ) + if err != nil { + return fmt.Errorf("failed to create virtual column: %w", err) + } + return nil + }, }) } @@ -184,6 +213,7 @@ func (p *PostgreSQL) Features() []state.Feature { state.FeatureETag, state.FeatureTransactional, state.FeatureTTL, + state.FeatureDeleteWithPrefix, } } @@ -504,6 +534,26 @@ func (p *PostgreSQL) execMultiOperation(ctx context.Context, op state.Transactio } } +func (p *PostgreSQL) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + err := req.Validate() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) + defer cancel() + + // Trim the trailing "||" from the prefix + result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName(pgTableState)+" WHERE "+p.metadata.FunctionName(pgFunctionKeyPrefix)+`("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + return state.DeleteWithPrefixResponse{ + Count: result.RowsAffected(), + }, nil +} + func (p *PostgreSQL) CleanupExpired() error { if p.gc != nil { return p.gc.CleanupExpired() diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index e281780aa0..8e141eacfe 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -45,12 +45,12 @@ components: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v2.docker - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" - component: postgresql.v2.azure - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" @@ -83,7 +83,7 @@ components: # This component requires etags to be numeric badEtag: "9999999" - component: cockroachdb.v2 - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] config: # This component requires etags to be UUIDs badEtag: "7b104dbd-1ae2-4772-bfa0-e29c7b89bc9b" From e4b3fbdf26eab9409a4d3479006751536cefa87f Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Sat, 6 Jan 2024 00:17:44 +0000 Subject: [PATCH 2/5] WIP: We need to find a solution because CockroachDB doesn't support UDFs Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- .../docker-compose-cockroachdb.yml | 4 +- common/component/postgresql/v1/postgresql.go | 2 + .../postgresql/v1/postgresql_query.go | 1 + state/cockroachdb/cockroachdb.go | 30 +++++++------ state/cockroachdb/metadata.yaml | 2 +- state/postgresql/v1/metadata.yaml | 1 + state/postgresql/v1/migrations.go | 45 ++++++++++++++----- state/postgresql/v1/postgresql.go | 33 ++++++++++++++ tests/config/state/tests.yml | 6 +-- 9 files changed, 95 insertions(+), 29 deletions(-) diff --git a/.github/infrastructure/docker-compose-cockroachdb.yml b/.github/infrastructure/docker-compose-cockroachdb.yml index 2fa6bff9e4..75582673bf 100644 --- a/.github/infrastructure/docker-compose-cockroachdb.yml +++ b/.github/infrastructure/docker-compose-cockroachdb.yml @@ -1,9 +1,9 @@ version: '2' services: cockroachdb: - image: cockroachdb/cockroach:v21.2.3 + image: cockroachdb/cockroach:v23.1.13 hostname: cockroachdb - command: start-single-node --cluster-name=single-node --logtostderr=WARNING --log-file-verbosity=WARNING --insecure + command: start-single-node --cluster-name=single-node --insecure restart: always ports: - "26257:26257" diff --git a/common/component/postgresql/v1/postgresql.go b/common/component/postgresql/v1/postgresql.go index 141242c604..7c55b43f4f 100644 --- a/common/component/postgresql/v1/postgresql.go +++ b/common/component/postgresql/v1/postgresql.go @@ -64,6 +64,7 @@ type Options struct { type MigrateOptions struct { Logger logger.Logger StateTableName string + KeyPrefixFuncName string MetadataTableName string } @@ -120,6 +121,7 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error { err = p.migrateFn(ctx, p.db, MigrateOptions{ Logger: p.logger, StateTableName: p.metadata.TableName, + KeyPrefixFuncName: p.metadata.TableName + "_key_prefix", MetadataTableName: p.metadata.MetadataTableName, }) if err != nil { diff --git a/common/component/postgresql/v1/postgresql_query.go b/common/component/postgresql/v1/postgresql_query.go index bd3c63e358..3b551e6fed 100644 --- a/common/component/postgresql/v1/postgresql_query.go +++ b/common/component/postgresql/v1/postgresql_query.go @@ -54,6 +54,7 @@ func (p *PostgreSQLQuery) Features() []state.Feature { state.FeatureTransactional, state.FeatureQueryAPI, state.FeatureTTL, + state.FeatureDeleteWithPrefix, } } diff --git a/state/cockroachdb/cockroachdb.go b/state/cockroachdb/cockroachdb.go index 8e8bbdc910..df9f5d46d1 100644 --- a/state/cockroachdb/cockroachdb.go +++ b/state/cockroachdb/cockroachdb.go @@ -72,12 +72,13 @@ WHERE func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgresql.MigrateOptions) error { exists, err := tableExists(ctx, db, opts.StateTableName) if err != nil { - return err + return fmt.Errorf("failed to check if table '%s' exists: %w", opts.StateTableName, err) } if !exists { opts.Logger.Info("Creating CockroachDB state table") - _, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s ( + _, err = db.Exec(ctx, fmt.Sprintf(` +CREATE TABLE %s ( key text NOT NULL PRIMARY KEY, value jsonb NOT NULL, isbinary boolean NOT NULL, @@ -86,9 +87,10 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre updatedate TIMESTAMP WITH TIME ZONE NULL, expiredate TIMESTAMP WITH TIME ZONE NULL, INDEX expiredate_idx (expiredate) -);`, opts.StateTableName)) +)`, + opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to create state table: %w", err) } } @@ -96,27 +98,29 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre _, err = db.Exec(ctx, fmt.Sprintf( `ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;`, opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to add expiredate column to state table: %w", err) } _, err = db.Exec(ctx, fmt.Sprintf( `CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);`, opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to create expiredate index on state table: %w", err) } exists, err = tableExists(ctx, db, opts.MetadataTableName) if err != nil { - return err + return fmt.Errorf("failed to check if table '%s' exists: %w", opts.MetadataTableName, err) } if !exists { opts.Logger.Info("Creating CockroachDB metadata table") - _, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s ( - key text NOT NULL PRIMARY KEY, - value text NOT NULL -);`, opts.MetadataTableName)) + _, err = db.Exec(ctx, fmt.Sprintf(` +CREATE TABLE %s ( + key text NOT NULL PRIMARY KEY, + value text NOT NULL +);`, + opts.MetadataTableName)) if err != nil { - return err + return fmt.Errorf("failed to create metadata table: %w", err) } } @@ -125,6 +129,6 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre func tableExists(ctx context.Context, db pginterfaces.PGXPoolConn, tableName string) (bool, error) { exists := false - err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists) + err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables WHERE tablename = $1)", tableName).Scan(&exists) return exists, err } diff --git a/state/cockroachdb/metadata.yaml b/state/cockroachdb/metadata.yaml index 6efc793f79..2d24ad79f7 100644 --- a/state/cockroachdb/metadata.yaml +++ b/state/cockroachdb/metadata.yaml @@ -9,11 +9,11 @@ urls: - title: Reference url: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-cockroachdb/ capabilities: + - actorStateStore - crud - transactional - etag - ttl - - actorStateStore authenticationProfiles: - title: "Connection string" description: "Authenticate using a Connection String" diff --git a/state/postgresql/v1/metadata.yaml b/state/postgresql/v1/metadata.yaml index fa3199e5c7..b7dc5acdae 100644 --- a/state/postgresql/v1/metadata.yaml +++ b/state/postgresql/v1/metadata.yaml @@ -16,6 +16,7 @@ capabilities: - etag - query - ttl + - deleteWithPrefix builtinAuthenticationProfiles: - name: "azuread" metadata: diff --git a/state/postgresql/v1/migrations.go b/state/postgresql/v1/migrations.go index b7034ab75b..3f0631c85a 100644 --- a/state/postgresql/v1/migrations.go +++ b/state/postgresql/v1/migrations.go @@ -39,14 +39,14 @@ func performMigrations(ctx context.Context, db pginterfaces.PGXPoolConn, opts po opts.Logger.Infof("Creating state table '%s'", opts.StateTableName) _, err := db.Exec( ctx, - fmt.Sprintf( - `CREATE TABLE IF NOT EXISTS %s ( - key text NOT NULL PRIMARY KEY, - value jsonb NOT NULL, - isbinary boolean NOT NULL, - insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updatedate TIMESTAMP WITH TIME ZONE NULL - )`, + fmt.Sprintf(` +CREATE TABLE IF NOT EXISTS %s ( + key text NOT NULL PRIMARY KEY, + value jsonb NOT NULL, + isbinary boolean NOT NULL, + insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updatedate TIMESTAMP WITH TIME ZONE NULL +)`, opts.StateTableName, ), ) @@ -68,6 +68,31 @@ func performMigrations(ctx context.Context, db pginterfaces.PGXPoolConn, opts po } return nil }, - }, - ) + // Migration 2: add the "key_prefix" function and "prefix" index to the state table + func(ctx context.Context) error { + // Create the "key_prefix" function + // Then add the "prefix" index to the state table that can be used by DeleteWithPrefix + opts.Logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", opts.KeyPrefixFuncName, opts.StateTableName) + _, err := db.Exec( + ctx, + fmt.Sprintf(` +CREATE FUNCTION %[1]s(k text) RETURNS text +LANGUAGE SQL +IMMUTABLE +LEAKPROOF +RETURNS NULL ON NULL INPUT +RETURN + array_to_string(trim_array(string_to_array(k, '||'),1), '||'); + +CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; +`, + opts.KeyPrefixFuncName, opts.StateTableName, + ), + ) + if err != nil { + return fmt.Errorf("failed to create virtual column: %w", err) + } + return nil + }, + }) } diff --git a/state/postgresql/v1/postgresql.go b/state/postgresql/v1/postgresql.go index d8d2be92da..2917924d0e 100644 --- a/state/postgresql/v1/postgresql.go +++ b/state/postgresql/v1/postgresql.go @@ -14,6 +14,9 @@ limitations under the License. package postgresql import ( + "context" + "strings" + postgresql "github.com/dapr/components-contrib/common/component/postgresql/v1" "github.com/dapr/components-contrib/state" "github.com/dapr/kit/logger" @@ -61,3 +64,33 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store { }, }) } + +// PostgreSQLStoreWithDeleteWithPrefix is a state store for PostgreSQL that implements the DeleteWithPrefix method +type PostgreSQLStoreWithDeleteWithPrefix struct { + state.Store +} + +// Features returns the features available in this state store. +func (p *PostgreSQLStoreWithDeleteWithPrefix) Features() []state.Feature { + return append(p.Store.Features(), state.FeatureDeleteWithPrefix) +} + +func (p *PostgreSQLStoreWithDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + err := req.Validate() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) + defer cancel() + + // Trim the trailing "||" from the prefix + result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName+" WHERE "+p.metadata.TableName+`_key_prefix("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + return state.DeleteWithPrefixResponse{ + Count: result.RowsAffected(), + }, nil +} diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 8e141eacfe..75848f5458 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -35,12 +35,12 @@ components: # This component requires etags to be hex-encoded numbers badEtag: "FFFF" - component: postgresql.v1.docker - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "delete-with-prefix" ] config: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v1.azure - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "delete-with-prefix" ] config: # This component requires etags to be numeric badEtag: "1" @@ -83,7 +83,7 @@ components: # This component requires etags to be numeric badEtag: "9999999" - component: cockroachdb.v2 - operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] + operations: [ "transaction", "etag", "first-write", "ttl" ] config: # This component requires etags to be UUIDs badEtag: "7b104dbd-1ae2-4772-bfa0-e29c7b89bc9b" From c5e9ae0a980b4c39b01c4a918a8dc3ffb74c3fb9 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Sat, 6 Jan 2024 00:20:00 +0000 Subject: [PATCH 3/5] Revert "WIP: We need to find a solution because CockroachDB doesn't support UDFs" This reverts commit e4b3fbdf26eab9409a4d3479006751536cefa87f. Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- common/component/postgresql/v1/postgresql.go | 2 - .../postgresql/v1/postgresql_query.go | 1 - state/postgresql/v1/metadata.yaml | 1 - state/postgresql/v1/migrations.go | 45 +++++-------------- state/postgresql/v1/postgresql.go | 33 -------------- tests/config/state/tests.yml | 4 +- 6 files changed, 12 insertions(+), 74 deletions(-) diff --git a/common/component/postgresql/v1/postgresql.go b/common/component/postgresql/v1/postgresql.go index 7c55b43f4f..141242c604 100644 --- a/common/component/postgresql/v1/postgresql.go +++ b/common/component/postgresql/v1/postgresql.go @@ -64,7 +64,6 @@ type Options struct { type MigrateOptions struct { Logger logger.Logger StateTableName string - KeyPrefixFuncName string MetadataTableName string } @@ -121,7 +120,6 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error { err = p.migrateFn(ctx, p.db, MigrateOptions{ Logger: p.logger, StateTableName: p.metadata.TableName, - KeyPrefixFuncName: p.metadata.TableName + "_key_prefix", MetadataTableName: p.metadata.MetadataTableName, }) if err != nil { diff --git a/common/component/postgresql/v1/postgresql_query.go b/common/component/postgresql/v1/postgresql_query.go index 3b551e6fed..bd3c63e358 100644 --- a/common/component/postgresql/v1/postgresql_query.go +++ b/common/component/postgresql/v1/postgresql_query.go @@ -54,7 +54,6 @@ func (p *PostgreSQLQuery) Features() []state.Feature { state.FeatureTransactional, state.FeatureQueryAPI, state.FeatureTTL, - state.FeatureDeleteWithPrefix, } } diff --git a/state/postgresql/v1/metadata.yaml b/state/postgresql/v1/metadata.yaml index b7dc5acdae..fa3199e5c7 100644 --- a/state/postgresql/v1/metadata.yaml +++ b/state/postgresql/v1/metadata.yaml @@ -16,7 +16,6 @@ capabilities: - etag - query - ttl - - deleteWithPrefix builtinAuthenticationProfiles: - name: "azuread" metadata: diff --git a/state/postgresql/v1/migrations.go b/state/postgresql/v1/migrations.go index 3f0631c85a..b7034ab75b 100644 --- a/state/postgresql/v1/migrations.go +++ b/state/postgresql/v1/migrations.go @@ -39,14 +39,14 @@ func performMigrations(ctx context.Context, db pginterfaces.PGXPoolConn, opts po opts.Logger.Infof("Creating state table '%s'", opts.StateTableName) _, err := db.Exec( ctx, - fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s ( - key text NOT NULL PRIMARY KEY, - value jsonb NOT NULL, - isbinary boolean NOT NULL, - insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updatedate TIMESTAMP WITH TIME ZONE NULL -)`, + fmt.Sprintf( + `CREATE TABLE IF NOT EXISTS %s ( + key text NOT NULL PRIMARY KEY, + value jsonb NOT NULL, + isbinary boolean NOT NULL, + insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updatedate TIMESTAMP WITH TIME ZONE NULL + )`, opts.StateTableName, ), ) @@ -68,31 +68,6 @@ CREATE TABLE IF NOT EXISTS %s ( } return nil }, - // Migration 2: add the "key_prefix" function and "prefix" index to the state table - func(ctx context.Context) error { - // Create the "key_prefix" function - // Then add the "prefix" index to the state table that can be used by DeleteWithPrefix - opts.Logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", opts.KeyPrefixFuncName, opts.StateTableName) - _, err := db.Exec( - ctx, - fmt.Sprintf(` -CREATE FUNCTION %[1]s(k text) RETURNS text -LANGUAGE SQL -IMMUTABLE -LEAKPROOF -RETURNS NULL ON NULL INPUT -RETURN - array_to_string(trim_array(string_to_array(k, '||'),1), '||'); - -CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; -`, - opts.KeyPrefixFuncName, opts.StateTableName, - ), - ) - if err != nil { - return fmt.Errorf("failed to create virtual column: %w", err) - } - return nil - }, - }) + }, + ) } diff --git a/state/postgresql/v1/postgresql.go b/state/postgresql/v1/postgresql.go index 2917924d0e..d8d2be92da 100644 --- a/state/postgresql/v1/postgresql.go +++ b/state/postgresql/v1/postgresql.go @@ -14,9 +14,6 @@ limitations under the License. package postgresql import ( - "context" - "strings" - postgresql "github.com/dapr/components-contrib/common/component/postgresql/v1" "github.com/dapr/components-contrib/state" "github.com/dapr/kit/logger" @@ -64,33 +61,3 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store { }, }) } - -// PostgreSQLStoreWithDeleteWithPrefix is a state store for PostgreSQL that implements the DeleteWithPrefix method -type PostgreSQLStoreWithDeleteWithPrefix struct { - state.Store -} - -// Features returns the features available in this state store. -func (p *PostgreSQLStoreWithDeleteWithPrefix) Features() []state.Feature { - return append(p.Store.Features(), state.FeatureDeleteWithPrefix) -} - -func (p *PostgreSQLStoreWithDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { - err := req.Validate() - if err != nil { - return state.DeleteWithPrefixResponse{}, err - } - - ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) - defer cancel() - - // Trim the trailing "||" from the prefix - result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName+" WHERE "+p.metadata.TableName+`_key_prefix("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) - if err != nil { - return state.DeleteWithPrefixResponse{}, err - } - - return state.DeleteWithPrefixResponse{ - Count: result.RowsAffected(), - }, nil -} diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 75848f5458..b04ba6a14e 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -35,12 +35,12 @@ components: # This component requires etags to be hex-encoded numbers badEtag: "FFFF" - component: postgresql.v1.docker - operations: [ "transaction", "etag", "first-write", "query", "ttl", "delete-with-prefix" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl" ] config: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v1.azure - operations: [ "transaction", "etag", "first-write", "query", "ttl", "delete-with-prefix" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl" ] config: # This component requires etags to be numeric badEtag: "1" From df402953255a349a7efdefca28a3e7a6057151cd Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Sat, 6 Jan 2024 00:49:21 +0000 Subject: [PATCH 4/5] Add option to disable DeleteWithPrefix, to support CockroachDB Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- state/postgresql/v2/postgresql.go | 95 ++++++++++++++++++++++--------- tests/conformance/state_test.go | 6 +- 2 files changed, 74 insertions(+), 27 deletions(-) diff --git a/state/postgresql/v2/postgresql.go b/state/postgresql/v2/postgresql.go index d4060722a3..ac546935fb 100644 --- a/state/postgresql/v2/postgresql.go +++ b/state/postgresql/v2/postgresql.go @@ -48,13 +48,22 @@ type PostgreSQL struct { gc sqlinternal.GarbageCollector - enableAzureAD bool + enableAzureAD bool + deleteWithPrefix bool +} + +// PostgreSQLDeleteWithPrefix extends PostgreSQL and adds support for DeleteWithPrefix. +type PostgreSQLDeleteWithPrefix struct { + *PostgreSQL } type Options struct { // Disables support for authenticating with Azure AD // This should be set to "false" when targeting different databases than PostgreSQL (such as CockroachDB) NoAzureAD bool + // Disables support for DeleteWithPrefix + // This should be set to "false" when targeting CockroachDB and other PostgreSQL-compatible database that don't offer the necessary capabilities + NoDeleteWithPrefix bool } // NewPostgreSQLStateStore creates a new instance of PostgreSQL state store v2 with the default options. @@ -67,11 +76,20 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store { // NewPostgreSQLStateStoreWithOptions creates a new instance of PostgreSQL state store with options. func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) state.Store { s := &PostgreSQL{ - logger: logger, - enableAzureAD: !opts.NoAzureAD, + logger: logger, + enableAzureAD: !opts.NoAzureAD, + deleteWithPrefix: !opts.NoDeleteWithPrefix, } s.BulkStore = state.NewDefaultBulkStore(s) - return s + + // If we want DeleteWithPrefix support, wrap into a PostgreSQLDeleteWithPrefix object + if opts.NoDeleteWithPrefix { + return s + } + + return &PostgreSQLDeleteWithPrefix{ + PostgreSQL: s, + } } // Init sets up Postgres connection and performs migrations @@ -155,7 +173,7 @@ func (p *PostgreSQL) performMigrations(ctx context.Context) error { keyPrefixFunction := p.metadata.FunctionName(pgFunctionKeyPrefix) return m.Perform(ctx, []sqlinternal.MigrationFn{ - // Migration 0: create the table for state + // Migration 1: create the table for state func(ctx context.Context) error { p.logger.Infof("Creating state table: '%s'", stateTable) _, err := p.db.Exec(ctx, @@ -177,8 +195,13 @@ CREATE INDEX ON %[1]s (expires_at); } return nil }, - // Migration 1: add the "key_prefix" function and "prefix" index to the state table + // Migration 2: add the "key_prefix" function and "prefix" index to the state table + // If DeleteWithPrefix support is disabled, this is a no-op, but we keep the migration here because we want the migration level to increase, or bad things will happen if another migration is added in the future func(ctx context.Context) error { + if !p.deleteWithPrefix { + return nil + } + // Create the "key_prefix" function // Then add the "prefix" index to the state table that can be used by DeleteWithPrefix p.logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", keyPrefixFunction, stateTable) @@ -209,6 +232,15 @@ CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; // Features returns the features available in this state store. func (p *PostgreSQL) Features() []state.Feature { + return []state.Feature{ + state.FeatureETag, + state.FeatureTransactional, + state.FeatureTTL, + } +} + +// Features returns the features available in this state store. +func (p *PostgreSQLDeleteWithPrefix) Features() []state.Feature { return []state.Feature{ state.FeatureETag, state.FeatureTransactional, @@ -534,26 +566,6 @@ func (p *PostgreSQL) execMultiOperation(ctx context.Context, op state.Transactio } } -func (p *PostgreSQL) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { - err := req.Validate() - if err != nil { - return state.DeleteWithPrefixResponse{}, err - } - - ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) - defer cancel() - - // Trim the trailing "||" from the prefix - result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName(pgTableState)+" WHERE "+p.metadata.FunctionName(pgFunctionKeyPrefix)+`("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) - if err != nil { - return state.DeleteWithPrefixResponse{}, err - } - - return state.DeleteWithPrefixResponse{ - Count: result.RowsAffected(), - }, nil -} - func (p *PostgreSQL) CleanupExpired() error { if p.gc != nil { return p.gc.CleanupExpired() @@ -586,3 +598,34 @@ func (p *PostgreSQL) GetComponentMetadata() (metadataInfo metadata.MetadataMap) metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType) return } + +func (p *PostgreSQLDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + err := req.Validate() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) + defer cancel() + + // Trim the trailing "||" from the prefix + result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName(pgTableState)+" WHERE "+p.metadata.FunctionName(pgFunctionKeyPrefix)+`("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + return state.DeleteWithPrefixResponse{ + Count: result.RowsAffected(), + }, nil +} + +// Compile-time interface assertions +var ( + _ state.Store = (*PostgreSQL)(nil) + _ state.TransactionalStore = (*PostgreSQL)(nil) + _ state.BulkStore = (*PostgreSQL)(nil) + _ state.Store = (*PostgreSQLDeleteWithPrefix)(nil) + _ state.TransactionalStore = (*PostgreSQLDeleteWithPrefix)(nil) + _ state.BulkStore = (*PostgreSQLDeleteWithPrefix)(nil) + _ state.DeleteWithPrefix = (*PostgreSQLDeleteWithPrefix)(nil) +) diff --git a/tests/conformance/state_test.go b/tests/conformance/state_test.go index c799508960..502f746ad0 100644 --- a/tests/conformance/state_test.go +++ b/tests/conformance/state_test.go @@ -121,7 +121,11 @@ func loadStateStore(name string) state.Store { case "cockroachdb.v2": // v2 of the component is an alias for the PostgreSQL state store // We still have a conformance test to validate that the component works with CockroachDB - return s_postgresql_v2.NewPostgreSQLStateStoreWithOptions(testLogger, s_postgresql_v2.Options{NoAzureAD: true}) + // CockroachDB does not support Azure AD authentication or the DeleteWithPrefix interface + return s_postgresql_v2.NewPostgreSQLStateStoreWithOptions(testLogger, s_postgresql_v2.Options{ + NoAzureAD: true, + NoDeleteWithPrefix: true, + }) case "memcached": return s_memcached.NewMemCacheStateStore(testLogger) case "rethinkdb": From 9c415f2bd9c388bd0116ec4cbd57dac33e7739da Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Sat, 6 Jan 2024 00:59:59 +0000 Subject: [PATCH 5/5] WIP: YugabyteDB support Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- .../infrastructure/docker-compose-yugabytedb.yml | 13 +++++++++++++ state/postgresql/v2/postgresql.go | 7 ++++--- tests/config/state/tests.yml | 5 +++++ tests/config/state/yugabytedb/v2/statestore.yml | 12 ++++++++++++ tests/conformance/state_test.go | 2 ++ 5 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 .github/infrastructure/docker-compose-yugabytedb.yml create mode 100644 tests/config/state/yugabytedb/v2/statestore.yml diff --git a/.github/infrastructure/docker-compose-yugabytedb.yml b/.github/infrastructure/docker-compose-yugabytedb.yml new file mode 100644 index 0000000000..1ed657533f --- /dev/null +++ b/.github/infrastructure/docker-compose-yugabytedb.yml @@ -0,0 +1,13 @@ +version: '3' + +services: + yugabyte: + image: yugabytedb/yugabyte:2.19.3.0-b140 + command: "bin/yugabyted start --daemon=false" + ports: + - 7000:7000 + - 9000:9000 + - 15433:15433 + - 5433:5433 + - 9042:9042 + diff --git a/state/postgresql/v2/postgresql.go b/state/postgresql/v2/postgresql.go index ac546935fb..1ed3f11e71 100644 --- a/state/postgresql/v2/postgresql.go +++ b/state/postgresql/v2/postgresql.go @@ -214,8 +214,9 @@ LANGUAGE SQL IMMUTABLE LEAKPROOF RETURNS NULL ON NULL INPUT -RETURN - array_to_string(trim_array(string_to_array(k, '||'),1), '||'); +AS $$ + SELECT array_to_string(trim_array(string_to_array(k, '||'),1), '||'); +$$; CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; `, @@ -223,7 +224,7 @@ CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; ), ) if err != nil { - return fmt.Errorf("failed to create virtual column: %w", err) + return err } return nil }, diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index b04ba6a14e..8af3bbe4d3 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -54,6 +54,11 @@ components: config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" + - component: yugabytedb.v2 + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] + config: + # This component requires etags to be UUIDs + badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" - component: sqlite operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] - component: mysql.mysql diff --git a/tests/config/state/yugabytedb/v2/statestore.yml b/tests/config/state/yugabytedb/v2/statestore.yml new file mode 100644 index 0000000000..a475d10a7f --- /dev/null +++ b/tests/config/state/yugabytedb/v2/statestore.yml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.yugabytedb + version: v2 + metadata: + - name: connectionString + value: "host=localhost user=yugabyte password=yugabyte port=5433 connect_timeout=10 database=yugabyte" + - name: tablePrefix + value: "confv2_" diff --git a/tests/conformance/state_test.go b/tests/conformance/state_test.go index 502f746ad0..8e91e0ad0b 100644 --- a/tests/conformance/state_test.go +++ b/tests/conformance/state_test.go @@ -100,6 +100,8 @@ func loadStateStore(name string) state.Store { return s_postgresql_v2.NewPostgreSQLStateStore(testLogger) case "postgresql.v2.azure": return s_postgresql_v2.NewPostgreSQLStateStore(testLogger) + case "yugabytedb.v2": + return s_postgresql_v2.NewPostgreSQLStateStore(testLogger) case "sqlite": return s_sqlite.NewSQLiteStateStore(testLogger) case "mysql.mysql":