Skip to content

Commit

Permalink
Add optional limit on DeleteRelationships calls in the datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Feb 29, 2024
1 parent 3387b62 commit 8476881
Show file tree
Hide file tree
Showing 16 changed files with 443 additions and 72 deletions.
24 changes: 20 additions & 4 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

Expand Down Expand Up @@ -197,7 +198,7 @@ func exactRelationshipClause(r *core.RelationTuple) sq.Eq {
}
}

func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
// Add clauses for the ResourceFilter
query := queryDeleteTuples.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
Expand All @@ -219,19 +220,34 @@ func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1
}
rwt.addOverlapKey(subjectFilter.SubjectType)
}

// Add the limit, if any.
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
}

if delLimit > 0 {
query = query.Limit(delLimit)
}

sql, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

modified, err := rwt.tx.Exec(ctx, sql, args...)
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

rwt.relCountChange -= modified.RowsAffected()
if delLimit > 0 && uint64(modified.RowsAffected()) == delLimit {
return true, nil
}

return nil
return false, nil
}

func (rwt *crdbReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
34 changes: 25 additions & 9 deletions internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)
Expand Down Expand Up @@ -117,38 +118,53 @@ func (rwt *memdbReadWriteTx) toCaveatReference(mutation *core.RelationTupleUpdat
return cr
}

func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter) error {
func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
rwt.mustLock()
defer rwt.Unlock()

tx, err := rwt.txSource()
if err != nil {
return err
return false, err
}

delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
}

return rwt.deleteWithLock(tx, filter)
return rwt.deleteWithLock(tx, filter, delLimit)
}

// caller must already hold the concurrent access lock
func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter) error {
func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter, limit uint64) (bool, error) {
// Create an iterator to find the relevant tuples
bestIter, err := iteratorForFilter(tx, datastore.RelationshipsFilterFromPublicFilter(filter))
if err != nil {
return err
return false, err
}
filteredIter := memdb.NewFilterIterator(bestIter, relationshipFilterFilterFunc(filter))

// Collect the tuples into a slice of mutations for the changelog
var mutations []*core.RelationTupleUpdate
var counter uint64

metLimit := false
for row := filteredIter.Next(); row != nil; row = filteredIter.Next() {
rt, err := row.(*relationship).RelationTuple()
if err != nil {
return err
return false, err
}
mutations = append(mutations, tuple.Delete(rt))
counter++

if limit > 0 && counter == limit {
metLimit = true
break
}
}

return rwt.write(tx, mutations...)
return metLimit, rwt.write(tx, mutations...)
}

func (rwt *memdbReadWriteTx) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down Expand Up @@ -201,9 +217,9 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri
}

// Delete the relationships from the namespace
if err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
if _, err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
ResourceType: nsName,
}); err != nil {
}, 0); err != nil {
return fmt.Errorf("unable to delete relationships from deleted namespace: %w", err)
}
}
Expand Down
32 changes: 27 additions & 5 deletions internal/datastore/mysql/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/authzed/spicedb/internal/datastore/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/tuple"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations
return nil
}

func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
// Add clauses for the ResourceFilter
query := rwt.DeleteTupleQuery.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
Expand All @@ -236,16 +237,37 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v

query = query.Set(colDeletedTxn, rwt.newTxnID)

// Add the limit, if any.
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
}

if delLimit > 0 {
query = query.Limit(delLimit)
}

querySQL, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

if _, err := rwt.tx.ExecContext(ctx, querySQL, args...); err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
modified, err := rwt.tx.ExecContext(ctx, querySQL, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return nil
rowsAffected, err := modified.RowsAffected()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

if delLimit > 0 && uint64(rowsAffected) == delLimit {
return true, nil
}

return false, nil
}

func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces ...*core.NamespaceDefinition) error {
Expand Down
80 changes: 76 additions & 4 deletions internal/datastore/postgres/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"errors"
"fmt"

"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/spiceerrors"

"github.com/authzed/spicedb/pkg/tuple"

sq "github.com/Masterminds/squirrel"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/jackc/pgx/v5"
Expand All @@ -18,6 +17,7 @@ import (
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

const (
Expand Down Expand Up @@ -48,7 +48,15 @@ var (
colCaveatContext,
)

deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
selectForDelete = psql.Select(
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
).From(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
)

type pgReadWriteTXN struct {
Expand Down Expand Up @@ -269,7 +277,71 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*
return nil
}

func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
return rwt.deleteRelationshipsWithLimit(ctx, filter, *delOpts.DeleteLimit)
}

return false, rwt.deleteRelationships(ctx, filter)
}

func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, filter *v1.RelationshipFilter, limit uint64) (bool, error) {
// Construct a select query for the relationships to be removed.
query := selectForDelete.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId})
}
if filter.OptionalRelation != "" {
query = query.Where(sq.Eq{colRelation: filter.OptionalRelation})
}

// Add clauses for the SubjectFilter
if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil {
query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType})
if subjectFilter.OptionalSubjectId != "" {
query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId})
}
if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil {
query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)})
}
}

query = query.Limit(limit)

selectSQL, args, err := query.ToSql()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

args = append(args, rwt.newXID)
if len(args) != 3 {
return false, spiceerrors.MustBugf("expected 3 arguments, got %d", len(args))
}

// Construct a CTE to update the relationships as removed.
cteSQL := fmt.Sprintf(
"WITH found_tuples AS (%s)\nUPDATE %s SET %s = $3 WHERE (%s, %s, %s, %s, %s, %s) IN (select * from found_tuples)",
selectSQL,
tableTuple,
colDeletedXid,
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
)

result, err := rwt.tx.Exec(ctx, cteSQL, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return result.RowsAffected() == int64(limit), nil
}

func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
// Add clauses for the ResourceFilter
query := deleteTuple.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (rwt *observableRWT) DeleteNamespaces(ctx context.Context, nsNames ...strin
return rwt.delegate.DeleteNamespaces(ctx, nsNames...)
}

func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
ctx, closer := observe(ctx, "DeleteRelationships", trace.WithAttributes(
filterToAttributes(filter)...,
))
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ func (dm *MockReadWriteTransaction) WriteRelationships(_ context.Context, mutati
return args.Error(0)
}

func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter) error {
func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
args := dm.Called(filter)
return args.Error(0)
return false, args.Error(0)
}

func (dm *MockReadWriteTransaction) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
9 changes: 9 additions & 0 deletions internal/datastore/spanner/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,15 @@ var queryTuples = sql.Select(
colCaveatContext,
).From(tableRelationship)

var queryTuplesForDelete = sql.Select(
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
).From(tableRelationship)

var schema = common.NewSchemaInformation(
colNamespace,
colObjectID,
Expand Down
Loading

0 comments on commit 8476881

Please sign in to comment.