diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 251ae8835..d8c232303 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -54,6 +54,41 @@ export interface ListBucketOptions { search?: string } +type ObjectWriteData = Pick & { + bucket_id: string + name: string + version: string +} + +type ObjectMoveData = Pick & { version: string } + +type DeletedObject = Pick + +type LockedDeletedObject = Pick + +type UploadObject = Pick< + Obj, + | 'id' + | 'name' + | 'bucket_id' + | 'owner' + | 'owner_id' + | 'metadata' + | 'user_metadata' + | 'version' + | 'created_at' + | 'updated_at' + | 'last_accessed_at' +> + +type PreviousUploadObject = Pick + +type ReplacedObject = Pick + +type MovedObject = Pick + +type SourceMoveObject = Pick + export interface Database { tenantHost: string tenantId: string @@ -66,12 +101,61 @@ export interface Database { asSuperUser(): Database - withTransaction( - fn: (db: Database) => Promise, + testObjectPermission(data: ObjectWriteData, isUpsert?: boolean): Promise + + testMoveObjectPermission( + sourceBucketId: string, + sourceObjectName: string, + data: Pick + ): Promise + + testDeleteObjectPermission(bucketId: string, objectName: string): Promise + + createAnalyticsBucketTransaction(data: Pick): Promise + + deleteEmptyBucket(bucketId: string): Promise + + deleteObjectWithLock(bucketId: string, objectName: string): Promise + + deleteObjectsTransaction( + bucketId: string, + objectNames: string[], + by: keyof Obj + ): Promise + + completeUploadTransaction( + data: ObjectWriteData + ): Promise<{ obj: UploadObject; isNew: boolean; previousObject?: PreviousUploadObject }> + + upsertCopyDestination( + data: ObjectWriteData + ): Promise<{ destinationObject: UploadObject; replacedObject?: ReplacedObject }> + + moveObjectDestination( + sourceBucketId: string, + sourceObjectName: string, + destinationBucketId: string, + destinationObjectName: string, + data: ObjectMoveData + ): Promise<{ + destObject: MovedObject + sourceObject: SourceMoveObject + }> + + acquireObjectLockForTransaction( + bucketId: string, + objectName: string, + version: string | undefined, transactionOptions?: TransactionOptions - ): Promise + ): Promise - testPermission(fn: (db: Database) => T | Promise): Promise> + adjustMultipartUploadProgress(uploadId: string, diff: number): Promise + + prepareMultipartUploadPart( + uploadId: string, + contentLength: number, + maxFileSize: number + ): Promise createBucket( data: Pick< @@ -194,7 +278,6 @@ export interface Database { bucketId: string, objectName: string, version: string, - signature: string, owner?: string, userMetadata?: Record, metadata?: Partial diff --git a/src/storage/database/knex.test.ts b/src/storage/database/knex.test.ts index 04cd3ada3..dfdbfed57 100644 --- a/src/storage/database/knex.test.ts +++ b/src/storage/database/knex.test.ts @@ -34,8 +34,9 @@ function createStorageKnexTestHarness() { describe('StorageKnexDB.testPermission', () => { it('returns the callback result after rolling back the transaction', async () => { const { db, connection, transaction } = createStorageKnexTestHarness() + const testPermission = db['testPermission'].bind(db) - const result = await db.testPermission(async (txDb) => { + const result = await testPermission(async (txDb) => { expect(txDb).toBeInstanceOf(StorageKnexDB) expect(txDb).not.toBe(db) return 'allowed' @@ -52,8 +53,10 @@ describe('StorageKnexDB.testPermission', () => { const { db, transaction } = createStorageKnexTestHarness() const error = new Error('permission denied') + const testPermission = db['testPermission'].bind(db) + await expect( - db.testPermission(async () => { + testPermission(async () => { throw error }) ).rejects.toBe(error) diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 697355759..91cb438a1 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -1,3 +1,4 @@ +import { decrypt, encrypt } from '@internal/auth' import { TenantConnection } from '@internal/database' import { DBMigration, tenantHasMigrations } from '@internal/database/migrations' import { @@ -64,10 +65,10 @@ export class StorageKnexDB implements Database { this.latestMigration = options.latestMigration } - async withTransaction( - fn: (db: Database) => Promise, + private async withTransaction Promise>( + fn: T, opts?: TransactionOptions - ): Promise { + ): Promise>> { const tnx = await this.connection.transactionProvider(this.options.tnx, opts)() try { @@ -80,7 +81,7 @@ export class StorageKnexDB implements Database { const opts = { ...this.options, tnx } const storageWithTnx = new StorageKnexDB(this.connection, opts) - const result = await fn(storageWithTnx) + const result = (await fn(storageWithTnx)) as Awaited> await tnx.commit() return result } catch (e) { @@ -104,7 +105,7 @@ export class StorageKnexDB implements Database { }) } - async testPermission(fn: (db: Database) => T | Promise): Promise> { + private async testPermission(fn: (db: Database) => T | Promise): Promise> { return this.withTransaction(async (db) => { const result = await fn(db) throw new TestPermissionRollbackError(result) @@ -117,6 +118,215 @@ export class StorageKnexDB implements Database { }) } + async testObjectPermission( + data: Pick & { + bucket_id: string + name: string + version: string + }, + isUpsert?: boolean + ) { + await this.testPermission((db) => { + return isUpsert ? db.upsertObject(data) : db.createObject(data) + }) + } + + async testMoveObjectPermission( + sourceBucketId: string, + sourceObjectName: string, + data: Pick + ) { + await this.testPermission((db) => { + return Promise.all([ + db.findObject(sourceBucketId, sourceObjectName, 'id'), + db.updateObject(sourceBucketId, sourceObjectName, data), + ]) + }) + } + + async testDeleteObjectPermission(bucketId: string, objectName: string) { + await this.testPermission(async (db) => { + const deleted = await db.deleteObject(bucketId, objectName) + + if (!deleted) { + throw ERRORS.NoSuchKey(objectName) + } + }) + } + + createAnalyticsBucketTransaction(data: Pick) { + return this.withTransaction(async (db) => { + return db.createAnalyticsBucket(data) + }) + } + + deleteEmptyBucket(bucketId: string) { + return this.withTransaction(async (db) => { + await db.asSuperUser().findBucketById(bucketId, 'id', { forUpdate: true }) + const countObjects = await db.asSuperUser().countObjectsInBucket(bucketId, 1) + + if (countObjects && countObjects > 0) { + throw ERRORS.BucketNotEmpty(bucketId) + } + + const deleted = await db.deleteBucket(bucketId) + + if (!deleted) { + throw ERRORS.NoSuchBucket(bucketId) + } + + return deleted + }) + } + + deleteObjectWithLock(bucketId: string, objectName: string) { + return this.withTransaction(async (db) => { + const obj = await db.asSuperUser().findObject(bucketId, objectName, 'id,version,metadata', { + forUpdate: true, + }) + + const deleted = await db.deleteObject(bucketId, objectName) + + if (!deleted) { + throw ERRORS.AccessDenied('Access denied') + } + + return obj + }) + } + + deleteObjectsTransaction(bucketId: string, objectNames: string[], by: keyof Obj) { + return this.withTransaction(async (db) => { + return db.deleteObjects(bucketId, objectNames, by) + }) + } + + completeUploadTransaction( + data: Pick & { + bucket_id: string + name: string + version: string + } + ) { + return this.withTransaction(async (db) => { + await db.waitObjectLock(data.bucket_id, data.name, undefined, { timeout: 5000 }) + const currentObj = await db.findObject(data.bucket_id, data.name, 'id, version, metadata', { + forUpdate: true, + dontErrorOnEmpty: true, + }) + const isNew = !Boolean(currentObj) + const newObject = await db.upsertObject(data) + return { obj: newObject, isNew, previousObject: currentObj } + }) + } + + upsertCopyDestination( + data: Pick & { + bucket_id: string + name: string + version: string + } + ) { + return this.withTransaction(async (db) => { + await db.waitObjectLock(data.bucket_id, data.name, undefined, { timeout: 3000 }) + const existingDestObject = await db.findObject( + data.bucket_id, + data.name, + 'id,name,metadata,version,bucket_id', + { dontErrorOnEmpty: true, forUpdate: true } + ) + const destinationObject = await db.upsertObject(data) + return { destinationObject, replacedObject: existingDestObject } + }) + } + + moveObjectDestination( + sourceBucketId: string, + sourceObjectName: string, + destinationBucketId: string, + destinationObjectName: string, + data: Pick & { version: string } + ) { + return this.withTransaction(async (db) => { + await db.waitObjectLock(sourceBucketId, destinationObjectName, undefined, { timeout: 5000 }) + const sourceObject = await db.findObject( + sourceBucketId, + sourceObjectName, + 'id,version,metadata,user_metadata', + { forUpdate: true, dontErrorOnEmpty: false } + ) + await db.updateObject(sourceBucketId, sourceObjectName, { + name: destinationObjectName, + bucket_id: destinationBucketId, + version: data.version, + owner: data.owner, + metadata: data.metadata, + user_metadata: data.user_metadata, + }) + return { + destObject: { + id: sourceObject.id, + name: destinationObjectName, + bucket_id: destinationBucketId, + version: data.version, + owner: data.owner, + metadata: data.metadata, + }, + sourceObject, + } + }) + } + + acquireObjectLockForTransaction( + bucketId: string, + objectName: string, + version: string | undefined, + transactionOptions?: TransactionOptions + ) { + return this.withTransaction(async (db) => { + await db.mustLockObject(bucketId, objectName, version) + }, transactionOptions) + } + + adjustMultipartUploadProgress(uploadId: string, diff: number) { + return this.withTransaction(async (db) => { + const multipart = await db.findMultipartUpload(uploadId, 'in_progress_size', { + forUpdate: true, + }) + const progress = multipart.in_progress_size + diff + await db.updateMultipartUploadProgress( + uploadId, + progress, + multipartUploadProgressSignature(progress) + ) + }) + } + + prepareMultipartUploadPart(uploadId: string, contentLength: number, maxFileSize: number) { + return this.withTransaction(async (db) => { + const multipart = await db.findMultipartUpload( + uploadId, + 'in_progress_size,version,upload_signature,user_metadata,metadata', + { forUpdate: true } + ) + + if (multipartUploadProgress(multipart.upload_signature) !== multipart.in_progress_size) { + throw ERRORS.InvalidUploadSignature() + } + + const currentProgress = multipart.in_progress_size + contentLength + if (currentProgress > maxFileSize) { + throw ERRORS.EntityTooLarge() + } + await db.updateMultipartUploadProgress( + uploadId, + currentProgress, + multipartUploadProgressSignature(currentProgress) + ) + return multipart + }) + } + deleteAnalyticsBucket(id: string, opts?: { soft: boolean }): Promise { return this.runQuery('DeleteAnalyticsBucket', async (knex, signal) => { if (opts?.soft) { @@ -918,7 +1128,6 @@ export class StorageKnexDB implements Database { bucketId: string, objectName: string, version: string, - signature: string, owner?: string, userMetadata?: Record, metadata?: Partial @@ -929,7 +1138,7 @@ export class StorageKnexDB implements Database { bucket_id: bucketId, key: objectName, version, - upload_signature: signature, + upload_signature: multipartUploadProgressSignature(0), owner_id: owner, user_metadata: userMetadata, } @@ -1135,6 +1344,16 @@ export class StorageKnexDB implements Database { } } +function multipartUploadProgressSignature(progress: number) { + return `${encrypt('progress:' + progress.toString())}` +} + +function multipartUploadProgress(signature: string) { + const originalSignature = decrypt(signature) + const [, value] = originalSignature.split(':') + return parseInt(value, 10) +} + export class DBError extends StorageBackendError implements RenderableError { constructor(options: StorageErrorOptions) { super(options) diff --git a/src/storage/events/objects/object-admin-delete-all-before.ts b/src/storage/events/objects/object-admin-delete-all-before.ts index c8c941cd8..9d9574463 100644 --- a/src/storage/events/objects/object-admin-delete-all-before.ts +++ b/src/storage/events/objects/object-admin-delete-all-before.ts @@ -72,39 +72,37 @@ export class ObjectAdminDeleteAllBefore extends BaseEvent { - const deleted = await trx.deleteObjects( - bucketId, - objects.map(({ id }) => id!), - 'id' - ) + const deleted = await storage.db.deleteObjectsTransaction( + bucketId, + objects.map(({ id }) => id!), + 'id' + ) + + if (deleted.length > 0) { + const prefixes: string[] = [] + + for (const { name, version } of deleted) { + const fileName = withOptionalVersion(`${tenantId}/${bucketId}/${name}`, version) + prefixes.push(fileName) + prefixes.push(fileName + '.info') + } - if (deleted && deleted.length > 0) { - const prefixes: string[] = [] - - for (const { name, version } of deleted) { - const fileName = withOptionalVersion(`${tenantId}/${bucketId}/${name}`, version) - prefixes.push(fileName) - prefixes.push(fileName + '.info') - } - - await backend.deleteObjects(storageS3Bucket, prefixes) - - await Promise.allSettled( - deleted.map((object) => - ObjectRemoved.sendWebhook({ - tenant: job.data.tenant, - name: object.name, - bucketId, - reqId: job.data.reqId, - sbReqId: job.data.sbReqId, - version: object.version, - metadata: object.metadata, - }) - ) + await backend.deleteObjects(storageS3Bucket, prefixes) + + await Promise.allSettled( + deleted.map((object) => + ObjectRemoved.sendWebhook({ + tenant: job.data.tenant, + name: object.name, + bucketId, + reqId: job.data.reqId, + sbReqId: job.data.sbReqId, + version: object.version, + metadata: object.metadata, + }) ) - } - }) + ) + } } if (!moreObjectsToDelete) { diff --git a/src/storage/object-delete.test.ts b/src/storage/object-delete.test.ts index b0acec7c5..7c74912df 100644 --- a/src/storage/object-delete.test.ts +++ b/src/storage/object-delete.test.ts @@ -6,31 +6,18 @@ import { StorageObjectLocator } from './locator' import { ObjectStorage } from './object' function createObjectStorage({ - findObject = vi.fn().mockResolvedValue({ - id: 'object-id', - version: 'version-1', - }), - deleteObject = vi.fn().mockResolvedValue({ - name: 'private/file.txt', + deleteObjectWithLock = vi.fn().mockResolvedValue({ version: 'version-1', }), }: { - findObject?: ReturnType - deleteObject?: ReturnType + deleteObjectWithLock?: ReturnType } = {}) { const backend = { deleteObject: vi.fn(), } as unknown as StorageBackendAdapter - const superUserDb = { - findObject, - } - const scopedDb = { - asSuperUser: vi.fn(() => superUserDb), - deleteObject, - } const db = { tenantId: 'tenant-id', - withTransaction: vi.fn((fn) => fn(scopedDb)), + deleteObjectWithLock, } as unknown as Database const location = { getRootLocation: vi.fn(() => 'root-bucket'), @@ -40,8 +27,7 @@ function createObjectStorage({ return { backend, - deleteObject, - findObject, + deleteObjectWithLock, location, storage, } @@ -49,8 +35,8 @@ function createObjectStorage({ describe('ObjectStorage.deleteObject', () => { it('throws AccessDenied when the object exists but scoped delete is blocked by RLS', async () => { - const { backend, deleteObject, findObject, storage } = createObjectStorage({ - deleteObject: vi.fn().mockResolvedValue(undefined), + const { backend, deleteObjectWithLock, storage } = createObjectStorage({ + deleteObjectWithLock: vi.fn().mockRejectedValue(ERRORS.AccessDenied('Access denied')), }) await expect(storage.deleteObject('private/file.txt')).rejects.toMatchObject({ @@ -59,16 +45,13 @@ describe('ObjectStorage.deleteObject', () => { message: 'Access denied', }) - expect(findObject).toHaveBeenCalledWith('bucket', 'private/file.txt', 'id,version', { - forUpdate: true, - }) - expect(deleteObject).toHaveBeenCalledWith('bucket', 'private/file.txt') + expect(deleteObjectWithLock).toHaveBeenCalledWith('bucket', 'private/file.txt') expect(backend.deleteObject).not.toHaveBeenCalled() }) it('keeps true missing objects as NoSuchKey before attempting scoped delete', async () => { - const { backend, deleteObject, storage } = createObjectStorage({ - findObject: vi.fn().mockRejectedValue(ERRORS.NoSuchKey('missing.txt')), + const { backend, deleteObjectWithLock, storage } = createObjectStorage({ + deleteObjectWithLock: vi.fn().mockRejectedValue(ERRORS.NoSuchKey('missing.txt')), }) await expect(storage.deleteObject('missing.txt')).rejects.toMatchObject({ @@ -76,7 +59,7 @@ describe('ObjectStorage.deleteObject', () => { httpStatusCode: 404, }) - expect(deleteObject).not.toHaveBeenCalled() + expect(deleteObjectWithLock).toHaveBeenCalledWith('bucket', 'missing.txt') expect(backend.deleteObject).not.toHaveBeenCalled() }) }) diff --git a/src/storage/object.ts b/src/storage/object.ts index d06183d84..f1b69a456 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -126,29 +126,17 @@ export class ObjectStorage { * @param objectName */ async deleteObject(objectName: string) { - const obj = await this.db.withTransaction(async (db) => { - const obj = await db.asSuperUser().findObject(this.bucketId, objectName, 'id,version', { - forUpdate: true, - }) - - const deleted = await db.deleteObject(this.bucketId, objectName) - - if (!deleted) { - throw ERRORS.AccessDenied('Access denied') - } - - await this.backend.deleteObject( - this.location.getRootLocation(), - this.location.getKeyLocation({ - tenantId: this.db.tenantId, - bucketId: this.bucketId, - objectName, - }), - obj.version - ) - - return obj - }) + const obj = await this.db.deleteObjectWithLock(this.bucketId, objectName) + + await this.backend.deleteObject( + this.location.getRootLocation(), + this.location.getKeyLocation({ + tenantId: this.db.tenantId, + bucketId: this.bucketId, + objectName, + }), + obj.version + ) await ObjectRemoved.sendWebhook({ tenant: this.db.tenant(), @@ -179,54 +167,52 @@ export class ObjectStorage { urlParamLength += encodeURIComponent(prefix).length + 9 // length of '%22%2C%22' } - await this.db.withTransaction(async (db) => { - const data = await db.deleteObjects(this.bucketId, prefixesSubset, 'name') + const data = await this.db.deleteObjectsTransaction(this.bucketId, prefixesSubset, 'name') - if (data.length > 0) { - results = results.concat(data) + if (data.length > 0) { + // if successfully deleted, delete from s3 too + // todo: consider moving this to a queue + const prefixesToDelete = data.reduce((all, { name, version }) => { + all.push( + this.location.getKeyLocation({ + tenantId: this.db.tenantId, + bucketId: this.bucketId, + objectName: name, + version, + }) + ) - // if successfully deleted, delete from s3 too - // todo: consider moving this to a queue - const prefixesToDelete = data.reduce((all, { name, version }) => { + if (version) { all.push( this.location.getKeyLocation({ - tenantId: db.tenantId, + tenantId: this.db.tenantId, bucketId: this.bucketId, objectName: name, version, - }) + }) + '.info' ) + } + return all + }, [] as string[]) - if (version) { - all.push( - this.location.getKeyLocation({ - tenantId: db.tenantId, - bucketId: this.bucketId, - objectName: name, - version, - }) + '.info' - ) - } - return all - }, [] as string[]) - - await this.backend.deleteObjects(this.location.getRootLocation(), prefixesToDelete) - - await Promise.allSettled( - data.map((object) => - ObjectRemoved.sendWebhook({ - tenant: db.tenant(), - name: object.name, - bucketId: this.bucketId, - reqId: this.db.reqId, - sbReqId: this.db.sbReqId, - version: object.version, - metadata: object.metadata, - }) - ) + await this.backend.deleteObjects(this.location.getRootLocation(), prefixesToDelete) + + await Promise.allSettled( + data.map((object) => + ObjectRemoved.sendWebhook({ + tenant: this.db.tenant(), + name: object.name, + bucketId: this.bucketId, + reqId: this.db.reqId, + sbReqId: this.db.sbReqId, + version: object.version, + metadata: object.metadata, + }) ) - } - }) + ) + } + + results = results.concat(data) } return results @@ -366,22 +352,9 @@ export class ObjectStorage { newVersion ) - const destinationObject = await this.db.asSuperUser().withTransaction(async (db) => { - await db.waitObjectLock(destinationBucket, destinationKey, undefined, { - timeout: 3000, - }) - - const existingDestObject = await db.findObject( - destinationBucket, - destinationKey, - 'id,name,metadata,version,bucket_id', - { - dontErrorOnEmpty: true, - forUpdate: true, - } - ) - - const destinationObject = await db.upsertObject({ + const { destinationObject, replacedObject } = await this.db + .asSuperUser() + .upsertCopyDestination({ ...originObject, bucket_id: destinationBucket, name: destinationKey, @@ -395,19 +368,16 @@ export class ObjectStorage { version: newVersion, }) - if (existingDestObject) { - await ObjectAdminDelete.send({ - name: existingDestObject.name, - bucketId: existingDestObject.bucket_id ?? destinationBucket, - tenant: this.db.tenant(), - version: existingDestObject.version, - reqId: this.db.reqId, - sbReqId: this.db.sbReqId, - }) - } - - return destinationObject - }) + if (replacedObject) { + await ObjectAdminDelete.send({ + name: replacedObject.name, + bucketId: replacedObject.bucket_id ?? destinationBucket, + tenant: this.db.tenant(), + version: replacedObject.version, + reqId: this.db.reqId, + sbReqId: this.db.sbReqId, + }) + } await ObjectCreatedCopyEvent.sendWebhook({ tenant: this.db.tenant(), @@ -468,16 +438,11 @@ export class ObjectStorage { objectName: destinationObjectName, }) - await this.db.testPermission((db) => { - return Promise.all([ - db.findObject(this.bucketId, sourceObjectName, 'id'), - db.updateObject(this.bucketId, sourceObjectName, { - name: destinationObjectName, - version: newVersion, - bucket_id: destinationBucket, - owner, - }), - ]) + await this.db.testMoveObjectPermission(this.bucketId, sourceObjectName, { + name: destinationObjectName, + version: newVersion, + bucket_id: destinationBucket, + owner, }) const sourceObj = await this.db @@ -505,78 +470,59 @@ export class ObjectStorage { newVersion ) - return this.db.asSuperUser().withTransaction(async (db) => { - await db.waitObjectLock(this.bucketId, destinationObjectName, undefined, { - timeout: 5000, - }) - - const sourceObject = await db.findObject( + const { destObject, sourceObject } = await this.db + .asSuperUser() + .moveObjectDestination( this.bucketId, sourceObjectName, - 'id,version,metadata,user_metadata', + destinationBucket, + destinationObjectName, { - forUpdate: true, - dontErrorOnEmpty: false, + version: newVersion, + owner, + metadata, + user_metadata: sourceObj.user_metadata, } ) - await db.updateObject(this.bucketId, sourceObjectName, { - name: destinationObjectName, - bucket_id: destinationBucket, - version: newVersion, - owner, - metadata, - user_metadata: sourceObj.user_metadata, - }) + await ObjectAdminDelete.send({ + name: sourceObjectName, + bucketId: this.bucketId, + tenant: this.db.tenant(), + version: sourceObj.version, + reqId: this.db.reqId, + sbReqId: this.db.sbReqId, + }) - await ObjectAdminDelete.send({ + await Promise.allSettled([ + ObjectRemovedMove.sendWebhook({ + tenant: this.db.tenant(), name: sourceObjectName, bucketId: this.bucketId, - tenant: this.db.tenant(), - version: sourceObj.version, reqId: this.db.reqId, sbReqId: this.db.sbReqId, - }) - - await Promise.allSettled([ - ObjectRemovedMove.sendWebhook({ - tenant: this.db.tenant(), + version: sourceObject.version, + metadata: sourceObject.metadata, + }), + ObjectCreatedMove.sendWebhook({ + tenant: this.db.tenant(), + name: destinationObjectName, + version: newVersion, + bucketId: destinationBucket, + metadata, + uploadType, + oldObject: { name: sourceObjectName, bucketId: this.bucketId, reqId: this.db.reqId, - sbReqId: this.db.sbReqId, version: sourceObject.version, - metadata: sourceObject.metadata, - }), - ObjectCreatedMove.sendWebhook({ - tenant: this.db.tenant(), - name: destinationObjectName, - version: newVersion, - bucketId: destinationBucket, - metadata, - uploadType, - oldObject: { - name: sourceObjectName, - bucketId: this.bucketId, - reqId: this.db.reqId, - version: sourceObject.version, - }, - reqId: this.db.reqId, - sbReqId: this.db.sbReqId, - }), - ]) - - return { - destObject: { - id: sourceObject.id, - name: destinationObjectName, - bucket_id: destinationBucket, - version: newVersion, - owner, - metadata, }, - } - }) + reqId: this.db.reqId, + sbReqId: this.db.sbReqId, + }), + ]) + + return { destObject } } catch (e) { await ObjectAdminDelete.send({ name: destinationObjectName, diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index bf1848d27..ce3378cd4 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -17,7 +17,6 @@ import { UploadPartCommandInput, UploadPartCopyCommandInput, } from '@aws-sdk/client-s3' -import { decrypt, encrypt } from '@internal/auth' import { ERRORS, ErrorCode, isStorageError } from '@internal/errors' import { logger, logSchema } from '@internal/monitoring' import { PassThrough, Readable } from 'stream' @@ -445,19 +444,11 @@ export class S3ProtocolHandler { throw ERRORS.InvalidUploadId(uploadId) } - const signature = this.uploadSignature({ in_progress_size: 0 }) await this.storage.db .asSuperUser() - .createMultipartUpload( - uploadId, - Bucket, - Key, - version, - signature, - this.owner, - command.Metadata, - { mimetype: command.ContentType } - ) + .createMultipartUpload(uploadId, Bucket, Key, version, this.owner, command.Metadata, { + mimetype: command.ContentType, + }) return { responseBody: { @@ -671,15 +662,7 @@ export class S3ProtocolHandler { } } catch (e) { try { - await this.storage.db.asSuperUser().withTransaction(async (db) => { - const multipart = await db.findMultipartUpload(UploadId, 'in_progress_size', { - forUpdate: true, - }) - - const diff = multipart.in_progress_size - ContentLength - const signature = this.uploadSignature({ in_progress_size: diff }) - await db.updateMultipartUploadProgress(UploadId, diff, signature) - }) + await this.storage.db.asSuperUser().adjustMultipartUploadProgress(UploadId, -ContentLength) } catch (e) { logSchema.error(logger, 'Failed to update multipart upload progress', { type: 's3', @@ -1283,12 +1266,10 @@ export class S3ProtocolHandler { const uploader = new Uploader(this.storage.backend, this.storage.db, this.storage.location) - const [destinationBucket] = await this.storage.db.asSuperUser().withTransaction(async (db) => { - return Promise.all([ - db.findBucketById(Bucket, 'file_size_limit'), - db.findBucketById(sourceBucketName, 'id'), - ]) - }) + const [destinationBucket] = await Promise.all([ + this.storage.db.asSuperUser().findBucketById(Bucket, 'file_size_limit'), + this.storage.db.asSuperUser().findBucketById(sourceBucketName, 'id'), + ]) const maxFileSize = await getFileSizeLimit( this.storage.db.tenantId, destinationBucket?.file_size_limit @@ -1374,49 +1355,14 @@ export class S3ProtocolHandler { return metadata } - protected uploadSignature({ in_progress_size }: { in_progress_size: number }) { - return `${encrypt('progress:' + in_progress_size.toString())}` - } - - protected decryptUploadSignature(signature: string) { - const originalSignature = decrypt(signature) - const [, value] = originalSignature.split(':') - - return { - progress: parseInt(value, 10), - } - } - protected async shouldAllowPartUpload( uploadId: string, contentLength: number, maxFileSize: number ) { - return this.storage.db.asSuperUser().withTransaction(async (db) => { - const multipart = await db.findMultipartUpload( - uploadId, - 'in_progress_size,version,upload_signature,user_metadata,metadata', - { - forUpdate: true, - } - ) - - const { progress } = this.decryptUploadSignature(multipart.upload_signature) - - if (progress !== multipart.in_progress_size) { - throw ERRORS.InvalidUploadSignature() - } - - const currentProgress = multipart.in_progress_size + contentLength - - if (currentProgress > maxFileSize) { - throw ERRORS.EntityTooLarge() - } - - const signature = this.uploadSignature({ in_progress_size: currentProgress }) - await db.updateMultipartUploadProgress(uploadId, currentProgress, signature) - return multipart - }) + return this.storage.db + .asSuperUser() + .prepareMultipartUploadPart(uploadId, contentLength, maxFileSize) } } diff --git a/src/storage/protocols/tus/postgres-locker.ts b/src/storage/protocols/tus/postgres-locker.ts index 3ffa5d04c..7862a30ca 100644 --- a/src/storage/protocols/tus/postgres-locker.ts +++ b/src/storage/protocols/tus/postgres-locker.ts @@ -73,42 +73,7 @@ export class PgLock implements Lock { async lock(stopSignal: AbortSignal, cancelReq: RequestRelease): Promise { await new Promise((resolve, reject) => { - this.db - .withTransaction( - async (db) => { - const abortController = new AbortController() - let onAbort: (() => void) | undefined - - try { - onAbort = () => { - abortController.abort() - } - stopSignal.addEventListener('abort', onAbort) - - const acquired = await Promise.race([ - this.waitTimeout(5000, abortController.signal), - this.acquireLock(db, this.id, abortController.signal), - ]) - - if (!acquired) { - throw ERRORS.LockTimeout() - } - - this.isLocked = true - - await new Promise((innerResolve) => { - this.tnxResolver = innerResolve - resolve() - }) - } finally { - abortController.abort() - } - }, - { - timeout: 5 * 60 * 1000, // 5 minutes - } - ) - .catch(reject) + this.holdLockTransaction(stopSignal, resolve).catch(reject) }) this.notifier.onRelease(this.id, () => cancelReq()) @@ -129,31 +94,57 @@ export class PgLock implements Lock { } } - protected async acquireLock(db: Database, id: string, signal: AbortSignal) { - const uploadId = UploadId.fromString(id) - - while (!signal.aborted) { - try { - await db.mustLockObject(uploadId.bucket, uploadId.objectName, uploadId.version) - return true - } catch (e) { - if (e instanceof StorageBackendError && e.code === ErrorCode.ResourceLocked) { - await this.notifier.release(id) - await new Promise((resolve) => { - const timeoutId = setTimeout(resolve, 500) - const cleanup = () => { - clearTimeout(timeoutId) - signal.removeEventListener('abort', cleanup) + protected async holdLockTransaction(stopSignal: AbortSignal, resolve: () => void) { + const abortController = new AbortController() + const uploadId = UploadId.fromString(this.id) + const onAbort = () => abortController.abort() + + try { + stopSignal.addEventListener('abort', onAbort) + + const acquired = await Promise.race([ + this.waitTimeout(5000, abortController.signal), + (async () => { + while (!abortController.signal.aborted) { + try { + await this.db.acquireObjectLockForTransaction( + uploadId.bucket, + uploadId.objectName, + uploadId.version, + { + timeout: 5 * 60 * 1000, + } + ) + this.isLocked = true + resolve() + return true + } catch (e) { + if (e instanceof StorageBackendError && e.code === ErrorCode.ResourceLocked) { + await this.notifier.release(this.id) + await new Promise((resolve) => { + const timeoutId = setTimeout(resolve, 500) + const cleanup = () => { + clearTimeout(timeoutId) + abortController.signal.removeEventListener('abort', cleanup) + } + abortController.signal.addEventListener('abort', cleanup, { once: true }) + }) + continue + } + throw e } - signal.addEventListener('abort', cleanup, { once: true }) - }) - continue - } - throw e + } + return false + })(), + ]) + + if (!acquired) { + throw ERRORS.LockTimeout() } + } finally { + abortController.abort() + stopSignal.removeEventListener('abort', onAbort) } - - return false } protected waitTimeout(timeout: number, signal: AbortSignal) { diff --git a/src/storage/storage.ts b/src/storage/storage.ts index 6805437e5..d5930f66c 100644 --- a/src/storage/storage.ts +++ b/src/storage/storage.ts @@ -145,39 +145,35 @@ export class Storage { } async createIcebergBucket(data: Parameters[0]) { - return this.db.withTransaction(async (db) => { - const result = await db.createAnalyticsBucket(data) - - await BucketCreatedEvent.invokeOrSend( - { - bucketId: result.id, - bucketName: result.name, - type: 'ANALYTICS', - tenant: { - ref: db.tenantId, - host: db.tenantHost, - }, - sbReqId: db.sbReqId, + const result = await this.db.createAnalyticsBucketTransaction(data) + const tenant = this.db.tenant() + + await BucketCreatedEvent.invokeOrSend( + { + bucketId: result.id, + bucketName: result.name, + type: 'ANALYTICS', + tenant, + sbReqId: this.db.sbReqId, + }, + { + sendWhenError: (error) => { + if (error instanceof StorageBackendError) { + return false + } + + logSchema.error(logger, 'Failed to invoke BucketCreatedEvent handler', { + project: tenant.ref, + type: 'event', + error, + sbReqId: this.db.sbReqId, + }) + return true }, - { - sendWhenError: (error) => { - if (error instanceof StorageBackendError) { - return false - } - - logSchema.error(logger, 'Failed to invoke BucketCreatedEvent handler', { - project: db.tenantId, - type: 'event', - error, - sbReqId: db.sbReqId, - }) - return true - }, - } - ) + } + ) - return result - }) + return result } /** @@ -223,25 +219,7 @@ export class Storage { * @param id */ async deleteBucket(id: string) { - return this.db.withTransaction(async (db) => { - await db.asSuperUser().findBucketById(id, 'id', { - forUpdate: true, - }) - - const countObjects = await db.asSuperUser().countObjectsInBucket(id, 1) - - if (countObjects && countObjects > 0) { - throw ERRORS.BucketNotEmpty(id) - } - - const deleted = await db.deleteBucket(id) - - if (!deleted) { - throw ERRORS.NoSuchBucket(id) - } - - return deleted - }) + return this.db.deleteEmptyBucket(id) } async deleteIcebergBucket(name: string) { @@ -291,13 +269,7 @@ export class Storage { } // ensure delete permissions - await this.db.testPermission(async (db) => { - const deleted = await db.deleteObject(bucketId, objects[0].name) - - if (!deleted) { - throw ERRORS.NoSuchKey(objects[0].name) - } - }) + await this.db.testDeleteObjectPermission(bucketId, objects[0].name) // use queue to recursively delete all objects created before the specified time await ObjectAdminDeleteAllBefore.send({ diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index a7b4d24a8..7710c6521 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -72,29 +72,17 @@ export class Uploader { async canUpload(options: CanUploadOptions) { const shouldCreateObject = !options.isUpsert - if (shouldCreateObject) { - await this.db.testPermission((db) => { - return db.createObject({ - bucket_id: options.bucketId, - name: options.objectName, - version: '1', - owner: options.owner, - metadata: options.metadata, - user_metadata: options.userMetadata, - }) - }) - } else { - await this.db.testPermission((db) => { - return db.upsertObject({ - bucket_id: options.bucketId, - name: options.objectName, - version: '1', - owner: options.owner, - metadata: options.metadata, - user_metadata: options.userMetadata, - }) - }) - } + await this.db.testObjectPermission( + { + bucket_id: options.bucketId, + name: options.objectName, + version: '1', + owner: options.owner, + metadata: options.metadata, + user_metadata: options.userMetadata, + }, + !shouldCreateObject + ) } /** @@ -211,84 +199,70 @@ export class Uploader { const abController = new AbortController() db.connection.setAbortSignal(abController.signal) - return await db.withTransaction(async (db) => { - await db.waitObjectLock(bucketId, objectName, undefined, { - timeout: 5000, - }) - - const currentObj = await db.findObject(bucketId, objectName, 'id, version, metadata', { - forUpdate: true, - dontErrorOnEmpty: true, - }) - - const isNew = !Boolean(currentObj) - - // update object - const newObject = await db.upsertObject({ - bucket_id: bucketId, - name: objectName, - metadata: objectMetadata, - user_metadata: userMetadata, - version, - owner, - }) - - const events: Promise[] = [] - - // schedule the deletion of the previous file - if (currentObj && currentObj.version !== version) { - events.push( - ObjectAdminDelete.send({ - name: objectName, - bucketId, - tenant: this.db.tenant(), - version: currentObj.version, - reqId: this.db.reqId, - sbReqId: this.db.sbReqId, - }) - ) - } + const result = await db.completeUploadTransaction({ + bucket_id: bucketId, + name: objectName, + metadata: objectMetadata, + user_metadata: userMetadata, + version, + owner, + }) - const event = isUpsert && !isNew ? ObjectCreatedPutEvent : ObjectCreatedPostEvent + const events: Promise[] = [] + // schedule the deletion of the previous file + if (result.previousObject && result.previousObject.version !== version) { events.push( - event - .sendWebhook({ - tenant: this.db.tenant(), - name: objectName, - version, - bucketId, - metadata: objectMetadata, - reqId: this.db.reqId, - sbReqId: this.db.sbReqId, - uploadType, - }) - .catch((e) => { - logSchema.error(logger, 'Failed to send webhook', { - type: 'event', - error: e, - project: this.db.tenantId, - sbReqId: this.db.sbReqId, - metadata: JSON.stringify({ - name: objectName, - bucketId, - metadata: objectMetadata, - reqId: this.db.reqId, - uploadType, - }), - }) - }) + ObjectAdminDelete.send({ + name: objectName, + bucketId, + tenant: this.db.tenant(), + version: result.previousObject.version, + reqId: this.db.reqId, + sbReqId: this.db.sbReqId, + }) ) + } - await Promise.all(events) + const event = isUpsert && !result.isNew ? ObjectCreatedPutEvent : ObjectCreatedPostEvent + + events.push( + event + .sendWebhook({ + tenant: this.db.tenant(), + name: objectName, + version, + bucketId, + metadata: objectMetadata, + reqId: this.db.reqId, + sbReqId: this.db.sbReqId, + uploadType, + }) + .catch((e) => { + logSchema.error(logger, 'Failed to send webhook', { + type: 'event', + error: e, + project: this.db.tenantId, + sbReqId: this.db.sbReqId, + metadata: JSON.stringify({ + name: objectName, + bucketId, + metadata: objectMetadata, + reqId: this.db.reqId, + uploadType, + }), + }) + }) + ) - fileUploadedSuccess.add(1, { - uploadType, - tenantId: this.db.tenantId, - }) + await Promise.all(events) - return { obj: newObject, isNew, metadata: objectMetadata } + fileUploadedSuccess.add(1, { + uploadType, + tenantId: this.db.tenantId, }) + + return { ...result, metadata: objectMetadata } } catch (e) { await ObjectAdminDelete.send({ name: objectName, diff --git a/src/test/s3-protocol.test.ts b/src/test/s3-protocol.test.ts index 783420afb..eb052ea07 100644 --- a/src/test/s3-protocol.test.ts +++ b/src/test/s3-protocol.test.ts @@ -2948,7 +2948,6 @@ describe('Migration compatibility', () => { bucketId, 'test-pre-migration.txt', randomUUID(), - 'sig', undefined, undefined, { @@ -2981,7 +2980,6 @@ describe('Migration compatibility', () => { bucketId, 'test-post-migration.txt', randomUUID(), - 'sig', undefined, undefined, metadata @@ -3004,7 +3002,6 @@ describe('Migration compatibility', () => { bucketId, 'test-find.txt', randomUUID(), - 'sig', undefined, undefined, { diff --git a/src/test/uploader.test.ts b/src/test/uploader.test.ts index eab05ef39..119250937 100644 --- a/src/test/uploader.test.ts +++ b/src/test/uploader.test.ts @@ -13,7 +13,7 @@ type CompleteUploadResult = Awaited> function createUploader( backend: Partial & Pick, db: Partial & - Pick + Pick ) { return new Uploader( backend as UploaderBackend, @@ -305,7 +305,7 @@ describe('fileUploadFromRequest', () => { tenantId: 'stub-tenant', reqId: 'req-1', tenant: () => ({ ref: 'stub-tenant', host: 'stub-tenant.local' }), - testPermission: vi.fn().mockResolvedValue(undefined), + testObjectPermission: vi.fn().mockResolvedValue(undefined), } ) @@ -344,16 +344,9 @@ describe('fileUploadFromRequest', () => { tenantId: 'stub-tenant', reqId: 'req-1', tenant: () => ({ ref: 'stub-tenant', host: 'stub-tenant.local' }), - testPermission: vi.fn(async (fn) => - fn({ - createObject: vi.fn(async (payload: { metadata?: { contentLength?: number } }) => { - capturedWrites.push(payload) - }), - upsertObject: vi.fn(async (payload: { metadata?: { contentLength?: number } }) => { - capturedWrites.push(payload) - }), - }) - ), + testObjectPermission: vi.fn(async (payload) => { + capturedWrites.push(payload as { metadata?: { contentLength?: number } }) + }), }) const completeUploadSpy = vi.spyOn(uploader, 'completeUpload').mockResolvedValue({ metadata: { eTag: '"etag"' },