Skip to content

Commit

Permalink
Fix concurrent idb operations with different collections
Browse files Browse the repository at this point in the history
  • Loading branch information
NoelDeMartin committed Jul 13, 2024
1 parent 16ec8f4 commit 2cbd4b1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
19 changes: 16 additions & 3 deletions src/engines/IndexedDBEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,23 @@ describe('IndexedDBEngine', () => {

it('supports concurrent operations', async () => {
// Arrange
const tables = 4;
const concurrency = 100;

closeConnections();

// Act
await Promise.all(range(concurrency).map(() => engine.create(User.collection, { name: faker.random.word() })));
await Promise.all(
range(concurrency)
.map(i => engine.create(`${User.collection}-${i % tables}`, { name: faker.random.word() })),
);

// Assert
const documents = await getDatabaseDocuments(User.collection);
await reopenConnections();

const documents = await Promise.all(range(tables).map(i => getDatabaseDocuments(`${User.collection}-${i}`)));

expect(documents).toHaveLength(concurrency);
expect(documents.flat()).toHaveLength(concurrency);
});

async function resetDatabase(): Promise<void> {
Expand Down Expand Up @@ -257,6 +265,11 @@ describe('IndexedDBEngine', () => {
});
}

async function reopenConnections(): Promise<void> {
collectionsConnection = await openDB(`soukai-${databaseName}`);
metadataConnection = await openDB(`soukai-${databaseName}-meta`);
}

function closeConnections(): void {
if (metadataConnection) {
metadataConnection.close();
Expand Down
14 changes: 5 additions & 9 deletions src/engines/IndexedDBEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ export class IndexedDBEngine implements Engine, ClosesConnections {

private database: string;
private helper: EngineHelper;
private lock: Semaphore;
private _metadataConnection?: DatabaseConnection<MetadataSchema>;
private _documentsConnection?: DatabaseConnection<DocumentsSchema>;

public constructor(database: null | string = null) {
this.database = database ? 'soukai-' + database : 'soukai';
this.helper = new EngineHelper();
this.lock = memo(`idb-${this.database}`, () => new Semaphore());
}

public async getCollections(): Promise<string[]> {
Expand Down Expand Up @@ -86,7 +88,7 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
}

public create(collection: string, document: EngineDocument, id?: string): Promise<string> {
return this.atomicOperation(collection, async () => {
return this.lock.run(async () => {
const documentId = this.helper.obtainDocumentId(id);

if (await this.documentExists(collection, documentId)) {
Expand Down Expand Up @@ -138,7 +140,7 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
}

public async update(collection: string, id: string, updates: EngineUpdates): Promise<void> {
await this.atomicOperation(collection, async () => {
await this.lock.run(async () => {
const document = await this.getDocument(collection, id);

if (!document) {
Expand All @@ -152,7 +154,7 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
}

public async delete(collection: string, id: string): Promise<void> {
await this.atomicOperation(collection, async () => {
await this.lock.run(async () => {
if (!(await this.documentExists(collection, id))) {
throw new DocumentNotFound(id, collection);
}
Expand Down Expand Up @@ -320,10 +322,4 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
);
}

private atomicOperation<T>(collection: string, operation: () => Promise<T>): Promise<T> {
const lock = memo(`idb-${collection}`, () => new Semaphore());

return lock.run(operation);
}

}

0 comments on commit 2cbd4b1

Please sign in to comment.