Skip to content

Commit

Permalink
Introduce thread safety for IndexedDBEngine
Browse files Browse the repository at this point in the history
Yes, I know JavaScript is single-threaded. I'm just referring to concurrent async operations as "threads".
  • Loading branch information
NoelDeMartin committed Feb 23, 2024
1 parent c4e13f4 commit c39e106
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 25 deletions.
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"homepage": "https://soukai.js.org",
"dependencies": {
"@babel/runtime": "^7.12.18",
"@noeldemartin/utils": "^0.4.0",
"@noeldemartin/utils": "0.5.1-next.f797bb85b7a8fd6eec199f7673738a3288fbe5c7",
"@types/webpack-env": "^1.13.6",
"core-js": "^3.9.0",
"idb": "^5.0.2"
Expand Down
15 changes: 15 additions & 0 deletions src/engines/IndexedDBEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'fake-indexeddb/auto';

import { deleteDB, openDB } from 'idb';
import { faker } from '@noeldemartin/faker';
import { range, resetMemo } from '@noeldemartin/utils';
import type { IDBPDatabase, IDBPTransaction } from 'idb';

import { bootModels } from '@/models';
Expand All @@ -22,6 +23,7 @@ describe('IndexedDBEngine', () => {
let collectionsConnection: IDBPDatabase | null = null;

beforeEach(async () => {
resetMemo();
bootModels({ User });

databaseName = faker.random.word();
Expand Down Expand Up @@ -205,6 +207,19 @@ describe('IndexedDBEngine', () => {
.rejects.toThrow(DocumentNotFound);
});

it('supports concurrent operations', async () => {
// Arrange
const concurrency = 100;

// Act
await Promise.all(range(concurrency).map(() => engine.create(User.collection, { name: faker.random.word() })));

// Assert
const documents = await getDatabaseDocuments(User.collection);

expect(documents).toHaveLength(concurrency);
});

async function resetDatabase(): Promise<void> {
await initMetaDatabase();
await initCollectionsDatabase();
Expand Down
47 changes: 30 additions & 17 deletions src/engines/IndexedDBEngine.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { deleteDB, openDB } from 'idb';
import { Semaphore, memo } from '@noeldemartin/utils';
import type { DBSchema, IDBPCursorWithValue, IDBPObjectStore, TypedDOMStringList } from 'idb';

import DocumentAlreadyExists from '@/errors/DocumentAlreadyExists';
Expand Down Expand Up @@ -84,16 +85,18 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
delete this._documentsConnection;
}

public async create(collection: string, document: EngineDocument, id?: string): Promise<string> {
id = this.helper.obtainDocumentId(id);
public create(collection: string, document: EngineDocument, id?: string): Promise<string> {
return this.atomicOperation(collection, async () => {
const documentId = this.helper.obtainDocumentId(id);

if (await this.documentExists(collection, id)) {
throw new DocumentAlreadyExists(id);
}
if (await this.documentExists(collection, documentId)) {
throw new DocumentAlreadyExists(documentId);
}

await this.createDocument(collection, id, document);
await this.createDocument(collection, documentId, document);

return id;
return documentId;
});
}

public async readOne(collection: string, id: string): Promise<EngineDocument> {
Expand Down Expand Up @@ -135,23 +138,27 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
}

public async update(collection: string, id: string, updates: EngineUpdates): Promise<void> {
const document = await this.getDocument(collection, id);
await this.atomicOperation(collection, async () => {
const document = await this.getDocument(collection, id);

if (!document) {
throw new DocumentNotFound(id, collection);
}
if (!document) {
throw new DocumentNotFound(id, collection);
}

this.helper.updateAttributes(document, updates);
this.helper.updateAttributes(document, updates);

await this.updateDocument(collection, id, document);
await this.updateDocument(collection, id, document);
});
}

public async delete(collection: string, id: string): Promise<void> {
if (!(await this.documentExists(collection, id))) {
throw new DocumentNotFound(id, collection);
}
await this.atomicOperation(collection, async () => {
if (!(await this.documentExists(collection, id))) {
throw new DocumentNotFound(id, collection);
}

await this.deleteDocument(collection, id);
await this.deleteDocument(collection, id);
});
}

private async collectionExists(collection: string): Promise<boolean> {
Expand Down Expand Up @@ -313,4 +320,10 @@ export class IndexedDBEngine implements Engine, ClosesConnections {
);
}

private atomicOperation<T>(collection: string, operation: () => Promise<T>): Promise<T> {
const lock = memo(`idb-${collection}`, () => new Semaphore());

return lock.run(operation);
}

}

0 comments on commit c39e106

Please sign in to comment.