diff --git a/.github/infrastructure/docker-compose-cockroachdb.yml b/.github/infrastructure/docker-compose-cockroachdb.yml index 2fa6bff9e4..75582673bf 100644 --- a/.github/infrastructure/docker-compose-cockroachdb.yml +++ b/.github/infrastructure/docker-compose-cockroachdb.yml @@ -1,9 +1,9 @@ version: '2' services: cockroachdb: - image: cockroachdb/cockroach:v21.2.3 + image: cockroachdb/cockroach:v23.1.13 hostname: cockroachdb - command: start-single-node --cluster-name=single-node --logtostderr=WARNING --log-file-verbosity=WARNING --insecure + command: start-single-node --cluster-name=single-node --insecure restart: always ports: - "26257:26257" diff --git a/.github/infrastructure/docker-compose-yugabytedb.yml b/.github/infrastructure/docker-compose-yugabytedb.yml new file mode 100644 index 0000000000..1ed657533f --- /dev/null +++ b/.github/infrastructure/docker-compose-yugabytedb.yml @@ -0,0 +1,13 @@ +version: '3' + +services: + yugabyte: + image: yugabytedb/yugabyte:2.19.3.0-b140 + command: "bin/yugabyted start --daemon=false" + ports: + - 7000:7000 + - 9000:9000 + - 15433:15433 + - 5433:5433 + - 9042:9042 + diff --git a/state/cockroachdb/cockroachdb.go b/state/cockroachdb/cockroachdb.go index 8e8bbdc910..df9f5d46d1 100644 --- a/state/cockroachdb/cockroachdb.go +++ b/state/cockroachdb/cockroachdb.go @@ -72,12 +72,13 @@ WHERE func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgresql.MigrateOptions) error { exists, err := tableExists(ctx, db, opts.StateTableName) if err != nil { - return err + return fmt.Errorf("failed to check if table '%s' exists: %w", opts.StateTableName, err) } if !exists { opts.Logger.Info("Creating CockroachDB state table") - _, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s ( + _, err = db.Exec(ctx, fmt.Sprintf(` +CREATE TABLE %s ( key text NOT NULL PRIMARY KEY, value jsonb NOT NULL, isbinary boolean NOT NULL, @@ -86,9 +87,10 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre updatedate TIMESTAMP WITH TIME ZONE NULL, expiredate TIMESTAMP WITH TIME ZONE NULL, INDEX expiredate_idx (expiredate) -);`, opts.StateTableName)) +)`, + opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to create state table: %w", err) } } @@ -96,27 +98,29 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre _, err = db.Exec(ctx, fmt.Sprintf( `ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;`, opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to add expiredate column to state table: %w", err) } _, err = db.Exec(ctx, fmt.Sprintf( `CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);`, opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to create expiredate index on state table: %w", err) } exists, err = tableExists(ctx, db, opts.MetadataTableName) if err != nil { - return err + return fmt.Errorf("failed to check if table '%s' exists: %w", opts.MetadataTableName, err) } if !exists { opts.Logger.Info("Creating CockroachDB metadata table") - _, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s ( - key text NOT NULL PRIMARY KEY, - value text NOT NULL -);`, opts.MetadataTableName)) + _, err = db.Exec(ctx, fmt.Sprintf(` +CREATE TABLE %s ( + key text NOT NULL PRIMARY KEY, + value text NOT NULL +);`, + opts.MetadataTableName)) if err != nil { - return err + return fmt.Errorf("failed to create metadata table: %w", err) } } @@ -125,6 +129,6 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre func tableExists(ctx context.Context, db pginterfaces.PGXPoolConn, tableName string) (bool, error) { exists := false - err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists) + err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables WHERE tablename = $1)", tableName).Scan(&exists) return exists, err } diff --git a/state/cockroachdb/metadata.yaml b/state/cockroachdb/metadata.yaml index 6efc793f79..2d24ad79f7 100644 --- a/state/cockroachdb/metadata.yaml +++ b/state/cockroachdb/metadata.yaml @@ -9,11 +9,11 @@ urls: - title: Reference url: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-cockroachdb/ capabilities: + - actorStateStore - crud - transactional - etag - ttl - - actorStateStore authenticationProfiles: - title: "Connection string" description: "Authenticate using a Connection String" diff --git a/state/postgresql/v2/metadata.go b/state/postgresql/v2/metadata.go index a21829807d..e01fa52fcd 100644 --- a/state/postgresql/v2/metadata.go +++ b/state/postgresql/v2/metadata.go @@ -23,10 +23,15 @@ import ( "github.com/dapr/kit/ptr" ) -type pgTable string +type ( + pgTable string + pgFunction string +) const ( pgTableState pgTable = "state" + + pgFunctionKeyPrefix pgFunction = "key_prefix" ) const ( @@ -85,3 +90,7 @@ func (m *pgMetadata) InitWithMetadata(meta state.Metadata, azureADEnabled bool) func (m pgMetadata) TableName(table pgTable) string { return m.TablePrefix + string(table) } + +func (m pgMetadata) FunctionName(function pgFunction) string { + return m.TablePrefix + string(function) +} diff --git a/state/postgresql/v2/metadata.yaml b/state/postgresql/v2/metadata.yaml index de53cdbcbc..bdb91c7c12 100644 --- a/state/postgresql/v2/metadata.yaml +++ b/state/postgresql/v2/metadata.yaml @@ -15,6 +15,7 @@ capabilities: - transactional - etag - ttl + - deleteWithPrefix builtinAuthenticationProfiles: - name: "azuread" metadata: diff --git a/state/postgresql/v2/postgresql.go b/state/postgresql/v2/postgresql.go index 1cbb537d85..1ed3f11e71 100644 --- a/state/postgresql/v2/postgresql.go +++ b/state/postgresql/v2/postgresql.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "strconv" + "strings" "time" "github.com/google/uuid" @@ -47,13 +48,22 @@ type PostgreSQL struct { gc sqlinternal.GarbageCollector - enableAzureAD bool + enableAzureAD bool + deleteWithPrefix bool +} + +// PostgreSQLDeleteWithPrefix extends PostgreSQL and adds support for DeleteWithPrefix. +type PostgreSQLDeleteWithPrefix struct { + *PostgreSQL } type Options struct { // Disables support for authenticating with Azure AD // This should be set to "false" when targeting different databases than PostgreSQL (such as CockroachDB) NoAzureAD bool + // Disables support for DeleteWithPrefix + // This should be set to "false" when targeting CockroachDB and other PostgreSQL-compatible database that don't offer the necessary capabilities + NoDeleteWithPrefix bool } // NewPostgreSQLStateStore creates a new instance of PostgreSQL state store v2 with the default options. @@ -66,11 +76,20 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store { // NewPostgreSQLStateStoreWithOptions creates a new instance of PostgreSQL state store with options. func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) state.Store { s := &PostgreSQL{ - logger: logger, - enableAzureAD: !opts.NoAzureAD, + logger: logger, + enableAzureAD: !opts.NoAzureAD, + deleteWithPrefix: !opts.NoDeleteWithPrefix, } s.BulkStore = state.NewDefaultBulkStore(s) - return s + + // If we want DeleteWithPrefix support, wrap into a PostgreSQLDeleteWithPrefix object + if opts.NoDeleteWithPrefix { + return s + } + + return &PostgreSQLDeleteWithPrefix{ + PostgreSQL: s, + } } // Init sets up Postgres connection and performs migrations @@ -151,6 +170,7 @@ func (p *PostgreSQL) performMigrations(ctx context.Context) error { } stateTable := p.metadata.TableName(pgTableState) + keyPrefixFunction := p.metadata.FunctionName(pgFunctionKeyPrefix) return m.Perform(ctx, []sqlinternal.MigrationFn{ // Migration 1: create the table for state @@ -175,6 +195,39 @@ CREATE INDEX ON %[1]s (expires_at); } return nil }, + // Migration 2: add the "key_prefix" function and "prefix" index to the state table + // If DeleteWithPrefix support is disabled, this is a no-op, but we keep the migration here because we want the migration level to increase, or bad things will happen if another migration is added in the future + func(ctx context.Context) error { + if !p.deleteWithPrefix { + return nil + } + + // Create the "key_prefix" function + // Then add the "prefix" index to the state table that can be used by DeleteWithPrefix + p.logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", keyPrefixFunction, stateTable) + _, err := p.db.Exec( + ctx, + fmt.Sprintf( + ` +CREATE FUNCTION %[1]s(k text) RETURNS text +LANGUAGE SQL +IMMUTABLE +LEAKPROOF +RETURNS NULL ON NULL INPUT +AS $$ + SELECT array_to_string(trim_array(string_to_array(k, '||'),1), '||'); +$$; + +CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; +`, + keyPrefixFunction, stateTable, + ), + ) + if err != nil { + return err + } + return nil + }, }) } @@ -187,6 +240,16 @@ func (p *PostgreSQL) Features() []state.Feature { } } +// Features returns the features available in this state store. +func (p *PostgreSQLDeleteWithPrefix) Features() []state.Feature { + return []state.Feature{ + state.FeatureETag, + state.FeatureTransactional, + state.FeatureTTL, + state.FeatureDeleteWithPrefix, + } +} + func (p *PostgreSQL) GetDB() *pgxpool.Pool { // We can safely cast to *pgxpool.Pool because this method is never used in unit tests where we mock the DB return p.db.(*pgxpool.Pool) @@ -536,3 +599,34 @@ func (p *PostgreSQL) GetComponentMetadata() (metadataInfo metadata.MetadataMap) metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType) return } + +func (p *PostgreSQLDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + err := req.Validate() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) + defer cancel() + + // Trim the trailing "||" from the prefix + result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName(pgTableState)+" WHERE "+p.metadata.FunctionName(pgFunctionKeyPrefix)+`("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + return state.DeleteWithPrefixResponse{ + Count: result.RowsAffected(), + }, nil +} + +// Compile-time interface assertions +var ( + _ state.Store = (*PostgreSQL)(nil) + _ state.TransactionalStore = (*PostgreSQL)(nil) + _ state.BulkStore = (*PostgreSQL)(nil) + _ state.Store = (*PostgreSQLDeleteWithPrefix)(nil) + _ state.TransactionalStore = (*PostgreSQLDeleteWithPrefix)(nil) + _ state.BulkStore = (*PostgreSQLDeleteWithPrefix)(nil) + _ state.DeleteWithPrefix = (*PostgreSQLDeleteWithPrefix)(nil) +) diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 5e5328e098..989f5eb04d 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -45,12 +45,17 @@ components: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v2.docker - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" - component: postgresql.v2.azure - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] + config: + # This component requires etags to be UUIDs + badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" + - component: yugabytedb.v2 + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" diff --git a/tests/config/state/yugabytedb/v2/statestore.yml b/tests/config/state/yugabytedb/v2/statestore.yml new file mode 100644 index 0000000000..a475d10a7f --- /dev/null +++ b/tests/config/state/yugabytedb/v2/statestore.yml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.yugabytedb + version: v2 + metadata: + - name: connectionString + value: "host=localhost user=yugabyte password=yugabyte port=5433 connect_timeout=10 database=yugabyte" + - name: tablePrefix + value: "confv2_" diff --git a/tests/conformance/state_test.go b/tests/conformance/state_test.go index c799508960..8e91e0ad0b 100644 --- a/tests/conformance/state_test.go +++ b/tests/conformance/state_test.go @@ -100,6 +100,8 @@ func loadStateStore(name string) state.Store { return s_postgresql_v2.NewPostgreSQLStateStore(testLogger) case "postgresql.v2.azure": return s_postgresql_v2.NewPostgreSQLStateStore(testLogger) + case "yugabytedb.v2": + return s_postgresql_v2.NewPostgreSQLStateStore(testLogger) case "sqlite": return s_sqlite.NewSQLiteStateStore(testLogger) case "mysql.mysql": @@ -121,7 +123,11 @@ func loadStateStore(name string) state.Store { case "cockroachdb.v2": // v2 of the component is an alias for the PostgreSQL state store // We still have a conformance test to validate that the component works with CockroachDB - return s_postgresql_v2.NewPostgreSQLStateStoreWithOptions(testLogger, s_postgresql_v2.Options{NoAzureAD: true}) + // CockroachDB does not support Azure AD authentication or the DeleteWithPrefix interface + return s_postgresql_v2.NewPostgreSQLStateStoreWithOptions(testLogger, s_postgresql_v2.Options{ + NoAzureAD: true, + NoDeleteWithPrefix: true, + }) case "memcached": return s_memcached.NewMemCacheStateStore(testLogger) case "rethinkdb":