Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete performance improvements #1771

Merged
merged 3 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

Expand Down Expand Up @@ -96,6 +97,10 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [
bulkTouch := queryTouchTuple
var bulkTouchCount int64

bulkDelete := queryDeleteTuples
bulkDeleteOr := sq.Or{}
var bulkDeleteCount int64

// Process the actual updates
for _, mutation := range mutations {
rel := mutation.Tuple
Expand Down Expand Up @@ -139,20 +144,27 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [
bulkWriteCount++
case core.RelationTupleUpdate_DELETE:
rwt.relCountChange--
sql, args, err := queryDeleteTuples.Where(exactRelationshipClause(rel)).ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
bulkDeleteOr = append(bulkDeleteOr, exactRelationshipClause(rel))
bulkDeleteCount++

if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
default:
log.Ctx(ctx).Error().Stringer("operation", mutation.Operation).Msg("unknown operation type")
return fmt.Errorf("unknown mutation operation: %s", mutation.Operation)
}
}

if bulkDeleteCount > 0 {
bulkDelete = bulkDelete.Where(bulkDeleteOr)
sql, args, err := bulkDelete.ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}

if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
}

bulkUpdateQueries := make([]sq.InsertBuilder, 0, 2)
if bulkWriteCount > 0 {
bulkUpdateQueries = append(bulkUpdateQueries, bulkWrite)
Expand Down Expand Up @@ -186,7 +198,7 @@ func exactRelationshipClause(r *core.RelationTuple) sq.Eq {
}
}

func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
// Add clauses for the ResourceFilter
query := queryDeleteTuples.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
Expand All @@ -208,19 +220,34 @@ func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1
}
rwt.addOverlapKey(subjectFilter.SubjectType)
}

// Add the limit, if any.
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
}

if delLimit > 0 {
query = query.Limit(delLimit)
}

sql, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

modified, err := rwt.tx.Exec(ctx, sql, args...)
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

rwt.relCountChange -= modified.RowsAffected()
if delLimit > 0 && uint64(modified.RowsAffected()) == delLimit {
return true, nil
}

return nil
return false, nil
}

func (rwt *crdbReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
34 changes: 25 additions & 9 deletions internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)
Expand Down Expand Up @@ -117,38 +118,53 @@ func (rwt *memdbReadWriteTx) toCaveatReference(mutation *core.RelationTupleUpdat
return cr
}

func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter) error {
func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
rwt.mustLock()
defer rwt.Unlock()

tx, err := rwt.txSource()
if err != nil {
return err
return false, err
}

delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
}

return rwt.deleteWithLock(tx, filter)
return rwt.deleteWithLock(tx, filter, delLimit)
}

// caller must already hold the concurrent access lock
func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter) error {
func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter, limit uint64) (bool, error) {
// Create an iterator to find the relevant tuples
bestIter, err := iteratorForFilter(tx, datastore.RelationshipsFilterFromPublicFilter(filter))
if err != nil {
return err
return false, err
}
filteredIter := memdb.NewFilterIterator(bestIter, relationshipFilterFilterFunc(filter))

// Collect the tuples into a slice of mutations for the changelog
var mutations []*core.RelationTupleUpdate
var counter uint64

metLimit := false
for row := filteredIter.Next(); row != nil; row = filteredIter.Next() {
rt, err := row.(*relationship).RelationTuple()
if err != nil {
return err
return false, err
}
mutations = append(mutations, tuple.Delete(rt))
counter++

if limit > 0 && counter == limit {
metLimit = true
break
}
}

return rwt.write(tx, mutations...)
return metLimit, rwt.write(tx, mutations...)
}

func (rwt *memdbReadWriteTx) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down Expand Up @@ -201,9 +217,9 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri
}

// Delete the relationships from the namespace
if err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
if _, err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
ResourceType: nsName,
}); err != nil {
}, 0); err != nil {
return fmt.Errorf("unable to delete relationships from deleted namespace: %w", err)
}
}
Expand Down
32 changes: 27 additions & 5 deletions internal/datastore/mysql/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/authzed/spicedb/internal/datastore/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/tuple"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations
return nil
}

func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
// Add clauses for the ResourceFilter
query := rwt.DeleteTupleQuery.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
Expand All @@ -236,16 +237,37 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v

query = query.Set(colDeletedTxn, rwt.newTxnID)

// Add the limit, if any.
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
}

if delLimit > 0 {
query = query.Limit(delLimit)
}

querySQL, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

if _, err := rwt.tx.ExecContext(ctx, querySQL, args...); err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
modified, err := rwt.tx.ExecContext(ctx, querySQL, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return nil
rowsAffected, err := modified.RowsAffected()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

if delLimit > 0 && uint64(rowsAffected) == delLimit {
return true, nil
}

return false, nil
}

func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces ...*core.NamespaceDefinition) error {
Expand Down
80 changes: 76 additions & 4 deletions internal/datastore/postgres/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"errors"
"fmt"

"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/spiceerrors"

"github.com/authzed/spicedb/pkg/tuple"

sq "github.com/Masterminds/squirrel"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/jackc/pgx/v5"
Expand All @@ -18,6 +17,7 @@ import (
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

const (
Expand Down Expand Up @@ -48,7 +48,15 @@ var (
colCaveatContext,
)

deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
selectForDelete = psql.Select(
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
).From(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
)

type pgReadWriteTXN struct {
Expand Down Expand Up @@ -269,7 +277,71 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*
return nil
}

func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
return rwt.deleteRelationshipsWithLimit(ctx, filter, *delOpts.DeleteLimit)
}

return false, rwt.deleteRelationships(ctx, filter)
}

func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, filter *v1.RelationshipFilter, limit uint64) (bool, error) {
// Construct a select query for the relationships to be removed.
query := selectForDelete.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId})
}
if filter.OptionalRelation != "" {
query = query.Where(sq.Eq{colRelation: filter.OptionalRelation})
}

// Add clauses for the SubjectFilter
if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil {
query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType})
if subjectFilter.OptionalSubjectId != "" {
query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId})
}
if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil {
query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)})
}
}

query = query.Limit(limit)

selectSQL, args, err := query.ToSql()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

args = append(args, rwt.newXID)
if len(args) != 3 {
return false, spiceerrors.MustBugf("expected 3 arguments, got %d", len(args))
}

// Construct a CTE to update the relationships as removed.
cteSQL := fmt.Sprintf(
"WITH found_tuples AS (%s)\nUPDATE %s SET %s = $3 WHERE (%s, %s, %s, %s, %s, %s) IN (select * from found_tuples)",
selectSQL,
tableTuple,
colDeletedXid,
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
)

result, err := rwt.tx.Exec(ctx, cteSQL, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return result.RowsAffected() == int64(limit), nil
}

func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
// Add clauses for the ResourceFilter
query := deleteTuple.Where(sq.Eq{colNamespace: filter.ResourceType})
if filter.OptionalResourceId != "" {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (rwt *observableRWT) DeleteNamespaces(ctx context.Context, nsNames ...strin
return rwt.delegate.DeleteNamespaces(ctx, nsNames...)
}

func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
ctx, closer := observe(ctx, "DeleteRelationships", trace.WithAttributes(
filterToAttributes(filter)...,
))
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ func (dm *MockReadWriteTransaction) WriteRelationships(_ context.Context, mutati
return args.Error(0)
}

func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter) error {
func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
args := dm.Called(filter)
return args.Error(0)
return false, args.Error(0)
}

func (dm *MockReadWriteTransaction) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
9 changes: 9 additions & 0 deletions internal/datastore/spanner/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,15 @@ var queryTuples = sql.Select(
colCaveatContext,
).From(tableRelationship)

var queryTuplesForDelete = sql.Select(
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
).From(tableRelationship)

var schema = common.NewSchemaInformation(
colNamespace,
colObjectID,
Expand Down
Loading
Loading