Skip to content

[ENH]: soft delete databases, add FinishDatabaseDeletion gRPC method to hard delete databases #4627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinator

import (
"context"
"time"

"github.com/chroma-core/chroma/go/pkg/common"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
Expand Down Expand Up @@ -255,3 +256,15 @@ func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, re
func (s *Coordinator) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *coordinatorpb.BatchGetCollectionSoftDeleteStatusRequest) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
return s.catalog.BatchGetCollectionSoftDeleteStatus(ctx, req.CollectionIds)
}

func (s *Coordinator) FinishDatabaseDeletion(ctx context.Context, req *coordinatorpb.FinishDatabaseDeletionRequest) (*coordinatorpb.FinishDatabaseDeletionResponse, error) {
numDeleted, err := s.catalog.FinishDatabaseDeletion(ctx, time.Unix(req.CutoffTime.Seconds, int64(req.CutoffTime.Nanos)))
if err != nil {
return nil, err
}

res := &coordinatorpb.FinishDatabaseDeletionResponse{
NumDeleted: numDeleted,
}
return res, nil
}
43 changes: 28 additions & 15 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,27 +183,36 @@ func (tc *Catalog) DeleteDatabase(ctx context.Context, deleteDatabase *model.Del
if len(databases) == 0 {
return common.ErrDatabaseNotFound
}
err = tc.metaDomain.DatabaseDb(txCtx).Delete(databases[0].ID)
err = tc.metaDomain.DatabaseDb(txCtx).SoftDelete(databases[0].ID)
if err != nil {
return err
}

collections, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(nil, nil, deleteDatabase.Tenant, deleteDatabase.Name, nil, nil)
if err != nil {
return err
}

for _, collection := range collections {
collectionID, err := types.Parse(collection.Collection.ID)
if err != nil {
return err
}

err = tc.softDeleteCollection(txCtx, &model.DeleteCollection{
ID: collectionID,
TenantID: deleteDatabase.Tenant,
DatabaseName: deleteDatabase.Name,
})
if err != nil {
return err
}
}

return nil
})
}

func (tc *Catalog) GetAllDatabases(ctx context.Context, ts types.Timestamp) ([]*model.Database, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method was never used

databases, err := tc.metaDomain.DatabaseDb(ctx).GetAllDatabases()
if err != nil {
log.Error("error getting all databases", zap.Error(err))
return nil, err
}
result := make([]*model.Database, 0, len(databases))
for _, database := range databases {
result = append(result, convertDatabaseToModel(database))
}
return result, nil
}

func (tc *Catalog) CreateTenant(ctx context.Context, createTenant *model.CreateTenant, ts types.Timestamp) (*model.Tenant, error) {
var result *model.Tenant

Expand Down Expand Up @@ -681,7 +690,7 @@ func (tc *Catalog) softDeleteCollection(ctx context.Context, deleteCollection *m

// Generate new name with timestamp and random number
oldName := *collections[0].Collection.Name
newName := fmt.Sprintf("_deleted_%s_%s", oldName, *types.FromUniqueID(deleteCollection.ID))
newName := fmt.Sprintf("_deleted_%s_%s", oldName, deleteCollection.ID.String())

dbCollection := &dbmodel.Collection{
ID: deleteCollection.ID.String(),
Expand Down Expand Up @@ -2211,3 +2220,7 @@ func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantI

return collectionEntry.VersionFileName, nil
}

func (tc *Catalog) FinishDatabaseDeletion(ctx context.Context, cutoffTime time.Time) (uint64, error) {
return tc.metaDomain.DatabaseDb(ctx).FinishDatabaseDeletion(cutoffTime)
}
4 changes: 2 additions & 2 deletions go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (s *Server) GetCollectionWithSegments(ctx context.Context, req *coordinator
collection, segments, err := s.coordinator.GetCollectionWithSegments(ctx, parsedCollectionID)
if err != nil {
log.Error("GetCollectionWithSegments failed. ", zap.Error(err), zap.String("collection_id", collectionID))
if err == common.ErrCollectionNotFound {
if err == common.ErrCollectionNotFound || err == common.ErrCollectionSoftDeleted {
return res, grpcutils.BuildNotFoundGrpcError(err.Error())
}
return res, grpcutils.BuildInternalGrpcError(err.Error())
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *Server) ForkCollection(ctx context.Context, req *coordinatorpb.ForkColl
collection, segments, err := s.coordinator.ForkCollection(ctx, forkCollection)
if err != nil {
log.Error("ForkCollection failed. ", zap.Error(err), zap.String("collection_id", sourceCollectionID))
if err == common.ErrCollectionNotFound {
if err == common.ErrCollectionNotFound || err == common.ErrCollectionSoftDeleted {
return res, grpcutils.BuildNotFoundGrpcError(err.Error())
}
if err == common.ErrCollectionLogPositionStale {
Expand Down
8 changes: 8 additions & 0 deletions go/pkg/sysdb/grpc/tenant_database_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,11 @@ func (s *Server) GetLastCompactionTimeForTenant(ctx context.Context, req *coordi
}
return res, nil
}

func (s *Server) FinishDatabaseDeletion(ctx context.Context, req *coordinatorpb.FinishDatabaseDeletionRequest) (*coordinatorpb.FinishDatabaseDeletionResponse, error) {
res, err := s.coordinator.FinishDatabaseDeletion(ctx, req)
if err != nil {
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}
return res, nil
}
45 changes: 39 additions & 6 deletions go/pkg/sysdb/grpc/tenant_database_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbcore"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/google/uuid"
"github.com/pingcap/log"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (suite *TenantDatabaseServiceTestSuite) TestServer_DeleteDatabase() {
tenantName := "TestDeleteDatabase"
databaseName := "TestDeleteDatabase"
// Generate random uuid for db id
databaseeId := uuid.New().String()
databaseId := uuid.New().String()

_, err := suite.catalog.CreateTenant(context.Background(), &model.CreateTenant{
Name: tenantName,
Expand All @@ -127,29 +128,61 @@ func (suite *TenantDatabaseServiceTestSuite) TestServer_DeleteDatabase() {
_, err = suite.catalog.CreateDatabase(context.Background(), &model.CreateDatabase{
Tenant: tenantName,
Name: databaseName,
ID: databaseeId,
ID: databaseId,
Ts: time.Now().Unix(),
}, time.Now().Unix())
suite.NoError(err)

collectionID := types.NewUniqueID()
_, _, err = suite.catalog.CreateCollection(context.Background(), &model.CreateCollection{
ID: collectionID,
TenantID: tenantName,
DatabaseName: databaseName,
Name: "TestCollection",
}, time.Now().Unix())
suite.NoError(err)

timeBeforeSoftDelete := time.Now()

err = suite.catalog.DeleteDatabase(context.Background(), &model.DeleteDatabase{
Tenant: tenantName,
Name: databaseName,
})
suite.NoError(err)

// Check that associated collection was deleted
var count int64
// Check that associated collection was soft deleted
var collections []*dbmodel.Collection
suite.NoError(suite.db.Find(&collections).Count(&count).Error)
suite.Equal(int64(0), count)
suite.NoError(suite.db.Find(&collections).Error)
suite.Equal(1, len(collections))
suite.Equal(true, collections[0].IsDeleted)

// Database should not be eligible for hard deletion yet because it still has a (soft deleted) collection
numDeleted, err := suite.catalog.FinishDatabaseDeletion(context.Background(), time.Now())
suite.NoError(err)
suite.Equal(uint64(0), numDeleted)

// Hard delete associated collection
suite.NoError(err)
suite.NoError(suite.catalog.DeleteCollection(context.Background(), &model.DeleteCollection{
TenantID: tenantName,
DatabaseName: databaseName,
ID: collectionID,
}, false))

// Database should now be eligible for hard deletion, but first verify that database is not deleted if cutoff time is prior to soft delete
numDeleted, err = suite.catalog.FinishDatabaseDeletion(context.Background(), timeBeforeSoftDelete)
suite.NoError(err)
suite.Equal(uint64(0), numDeleted)

// Hard delete database
numDeleted, err = suite.catalog.FinishDatabaseDeletion(context.Background(), time.Now())
suite.NoError(err)
suite.Equal(uint64(1), numDeleted)

// Verify that database is hard deleted
var databases []*dbmodel.Database
suite.NoError(suite.db.Debug().Where("id = ?", databaseId).Find(&databases).Error)
suite.Equal(0, len(databases))
}

func TestTenantDatabaseServiceTestSuite(t *testing.T) {
Expand Down
62 changes: 44 additions & 18 deletions go/pkg/sysdb/metastore/db/dao/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dao

import (
"errors"
"time"

"github.com/chroma-core/chroma/go/pkg/common"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
Expand All @@ -28,21 +29,12 @@ func (s *databaseDb) DeleteByTenantIdAndName(tenantId string, databaseName strin
return len(databases), err
}

func (s *databaseDb) GetAllDatabases() ([]*dbmodel.Database, error) {
var databases []*dbmodel.Database
query := s.db.Table("databases")

if err := query.Find(&databases).Error; err != nil {
return nil, err
}
return databases, nil
}

func (s *databaseDb) ListDatabases(limit *int32, offset *int32, tenantID string) ([]*dbmodel.Database, error) {
var databases []*dbmodel.Database
query := s.db.Table("databases").
Select("databases.id, databases.name, databases.tenant_id").
Where("databases.tenant_id = ?", tenantID).
Where("databases.is_deleted = ?", false).
Order("databases.created_at ASC")

if limit != nil {
Expand All @@ -65,7 +57,8 @@ func (s *databaseDb) GetDatabases(tenantID string, databaseName string) ([]*dbmo
query := s.db.Table("databases").
Select("databases.id, databases.name, databases.tenant_id").
Where("databases.name = ?", databaseName).
Where("databases.tenant_id = ?", tenantID)
Where("databases.tenant_id = ?", tenantID).
Where("databases.is_deleted = ?", false)

if err := query.Find(&databases).Error; err != nil {
log.Error("GetDatabases", zap.Error(err))
Expand Down Expand Up @@ -95,13 +88,13 @@ func (s *databaseDb) Insert(database *dbmodel.Database) error {
return err
}

func (s *databaseDb) Delete(databaseID string) error {
func (s *databaseDb) SoftDelete(databaseID string) error {
return s.db.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("id = ?", databaseID).Delete(&dbmodel.Database{}).Error; err != nil {
return err
}

if err := tx.Where("database_id = ?", databaseID).Delete(&dbmodel.Collection{}).Error; err != nil {
if err := tx.Table("databases").
Where("id = ?", databaseID).
Update("is_deleted", true).
Update("updated_at", time.Now()).
Error; err != nil {
return err
}

Expand All @@ -113,11 +106,44 @@ func (s *databaseDb) GetDatabasesByTenantID(tenantID string) ([]*dbmodel.Databas
var databases []*dbmodel.Database
query := s.db.Table("databases").
Select("databases.id, databases.name, databases.tenant_id").
Where("databases.tenant_id = ?", tenantID)
Where("databases.tenant_id = ?", tenantID).
Where("databases.is_deleted = ?", false)

if err := query.Find(&databases).Error; err != nil {
log.Error("GetDatabasesByTenantID", zap.Error(err))
return nil, err
}
return databases, nil
}

func (s *databaseDb) FinishDatabaseDeletion(cutoffTime time.Time) (uint64, error) {
numDeleted := uint64(0)

for {
// Only hard delete databases that were soft deleted prior to the cutoff time and have no collections
databasesSubQuery := s.db.
Table("databases d").
Select("d.id").
Joins("LEFT JOIN collections c ON c.database_id = d.id").
Where("d.is_deleted = ?", true).
Where("d.updated_at < ?", cutoffTime).
Group("d.id").
Having("COUNT(c.id) = 0").
Limit(1000)

res := s.db.Table("databases").
Where("id IN (?)", databasesSubQuery).
Delete(&dbmodel.Database{})
if res.Error != nil {
return numDeleted, res.Error
}

numDeleted += uint64(res.RowsAffected)

if res.RowsAffected == 0 {
break
}
}

return numDeleted, nil
}
4 changes: 2 additions & 2 deletions go/pkg/sysdb/metastore/db/dbmodel/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func (v Database) TableName() string {

//go:generate mockery --name=IDatabaseDb
type IDatabaseDb interface {
GetAllDatabases() ([]*Database, error)
GetDatabases(tenantID string, databaseName string) ([]*Database, error)
ListDatabases(limit *int32, offset *int32, tenantID string) ([]*Database, error)
Insert(in *Database) error
DeleteAll() error
Delete(databaseID string) error
SoftDelete(databaseID string) error
FinishDatabaseDeletion(cutoffTime time.Time) (uint64, error)
}
9 changes: 9 additions & 0 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ message DeleteDatabaseRequest {

message DeleteDatabaseResponse {}

message FinishDatabaseDeletionRequest {
google.protobuf.Timestamp cutoff_time = 1;
}

message FinishDatabaseDeletionResponse {
uint64 num_deleted = 1;
}

message CreateTenantRequest {
string name = 2; // Names are globally unique
}
Expand Down Expand Up @@ -496,6 +504,7 @@ service SysDB {
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse) {}
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse) {}
rpc FinishDatabaseDeletion(FinishDatabaseDeletionRequest) returns (FinishDatabaseDeletionResponse) {}
rpc CreateTenant(CreateTenantRequest) returns (CreateTenantResponse) {}
rpc GetTenant(GetTenantRequest) returns (GetTenantResponse) {}
rpc CreateSegment(CreateSegmentRequest) returns (CreateSegmentResponse) {}
Expand Down
Loading
Loading