From e145c3b6a4bdd6e8cbeb2e2701c128bc312d7add Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Fri, 7 Mar 2025 19:04:40 +0000 Subject: [PATCH 1/4] fix: allow import to be treated as Promise per the RDF/JS spec --- package-lock.json | 2 + package.json | 1 + src/N3Store.js | 52 +++++++++++++++++- test/N3Store-test.js | 125 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 1 deletion(-) diff --git a/package-lock.json b/package-lock.json index ef7f9039..d0faa2f8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "MIT", "dependencies": { "buffer": "^6.0.3", + "events": "^3.3.0", "queue-microtask": "^1.1.2", "readable-stream": "^4.0.0" }, @@ -5526,6 +5527,7 @@ "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", + "license": "MIT", "engines": { "node": ">=0.8.x" } diff --git a/package.json b/package.json index 09487a6e..1068cfe1 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ ], "dependencies": { "buffer": "^6.0.3", + "events": "^3.3.0", "queue-microtask": "^1.1.2", "readable-stream": "^4.0.0" }, diff --git a/src/N3Store.js b/src/N3Store.js index 8eaac9eb..95f82b0a 100644 --- a/src/N3Store.js +++ b/src/N3Store.js @@ -1,4 +1,5 @@ // **N3Store** objects store N3 quads by graph in memory. +import EventEmitter from 'events'; import { Readable } from 'readable-stream'; import { default as N3DataFactory, termToId, termFromId } from './N3DataFactory'; import namespaces from './IRIs'; @@ -410,8 +411,57 @@ export default class N3Store { // ### `import` adds a stream of quads to the store import(stream) { + // Add quads to the store as they arrive stream.on('data', quad => { this.addQuad(quad); }); - return stream; + + // Create a promise that resolves when the stream ends + const promise = new Promise((resolve, reject) => { + // Create proxy that combines N3Store with EventEmitter capabilities + const storeProxy = new Proxy(this, { + get(target, prop, receiver) { + if (prop in EventEmitter.prototype) { + return Reflect.get(stream, prop, receiver); + } + return Reflect.get(target, prop, receiver); + }, + }); + + // Check if stream is already closed/ended + if (stream.readableEnded || stream.destroyed || stream.readable === false || stream.closed || stream.ended) { + // Resolve immediately if stream is already closed + resolve(storeProxy); + } + else { + // Otherwise, wait for end/error events + // eslint-disable-next-line func-style + const onEnd = () => { + // eslint-disable-next-line no-use-before-define + stream.removeListener('error', onError); + resolve(storeProxy); + }; + + // eslint-disable-next-line func-style + const onError = err => { + stream.removeListener('end', onEnd); + reject(err); + }; + + stream.once('end', onEnd); + stream.once('error', onError); + } + }); + + // Return a proxy that acts as both stream and promise without mutating the stream object + return new Proxy(stream, { + get(target, prop) { + // Forward Promise methods to the promise object + if (prop === 'then' || prop === 'catch' || prop === 'finally') { + return promise[prop].bind(promise); + } + // All other properties and methods are from the stream + return target[prop]; + }, + }); } // ### `removeQuad` removes a quad from the store if it exists diff --git a/test/N3Store-test.js b/test/N3Store-test.js index 4a88aca9..18229ae1 100644 --- a/test/N3Store-test.js +++ b/test/N3Store-test.js @@ -2397,6 +2397,131 @@ describe('Store', () => { it('should have size 2', () => { expect(empty.size).toEqual(2); }); }); + describe('#import promise', () => { + it('should have size 2', async () => { + const stream = new ArrayReader([ + new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')), + new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')), + ]); + + expect((await new Store().import(stream)).size).toEqual(2); + }); + }); + + describe('N3Store import', () => { + it('should not add "end" or "error" listeners until await is called', async () => { + const stream = new ArrayReader([ + new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')), + new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')), + ]); + const store = new Store(); + + const newListners = []; + stream.on('newListener', (event, listener) => { newListners.push({ event, listener }); }); + + const importPromise = store.import(stream); + + expect(stream.listenerCount('end')).toEqual(1); + expect(stream.listenerCount('error')).toEqual(1); + + expect(newListners).toEqual([ + { event: 'data', listener: expect.any(Function) }, + { event: 'end', listener: expect.any(Function) }, + { event: 'error', listener: expect.any(Function) }, + ]); + + await importPromise; + + expect(stream.listenerCount('end')).toEqual(0); + expect(stream.listenerCount('error')).toEqual(0); + + // Checking that 'end' and 'error' listeners were added and then removed in importing + expect(newListners).toEqual([ + { event: 'data', listener: expect.any(Function) }, + { event: 'end', listener: expect.any(Function) }, + { event: 'error', listener: expect.any(Function) }, + ]); + }); + + it('should still be a functional event emitter before and after awaiting', async () => { + const stream = new ArrayReader([ + new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')), + new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')), + ]); + const store = new Store(); + const importPromise = store.import(stream); + + let receivedPreAwait = false; + stream.on('testEvent', data => { + receivedPreAwait = true; + expect(data).toBe('testData'); + }); + stream.emit('testEvent', 'testData'); + expect(receivedPreAwait).toBe(true); + + await importPromise; + + let receivedPostAwait = false; + stream.on('testEvent2', data => { + receivedPostAwait = true; + expect(data).toBe('testDataAfterAwait'); + }); + stream.emit('testEvent2', 'testDataAfterAwait'); + expect(receivedPostAwait).toBe(true); + }); + + it('should not add store attributes to the stream and vice-versa; pre and post import', async () => { + const stream = new ArrayReader([ + new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')), + new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')), + ]); + const store = new Store(); + const importPromise = store.import(stream); + + expect(stream.addQuad).toBeUndefined(); + expect(store.addQuad).not.toBeUndefined(); + + expect(store.on).toBeUndefined(); + expect(stream.on).not.toBeUndefined(); + + const res = await importPromise; + + expect(stream.addQuad).toBeUndefined(); + expect(store.addQuad).not.toBeUndefined(); + expect(res.addQuad).not.toBeUndefined(); + expect(importPromise.addQuad).toBeUndefined(); + + expect(store.on).toBeUndefined(); + expect(stream.on).not.toBeUndefined(); + expect(res.on).not.toBeUndefined(); + }); + + it('should resolve the promise if importing a completed stream', async () => { + const stream = new ArrayReader([]); + const store = new Store(); + + stream.on('data', () => {}); + + await new Promise(resolve => { + stream.on('end', resolve); + stream.on('error', resolve); + }); + + const importResult = await store.import(stream); + + expect(importResult).toBeTruthy(); + }); + + it('should reject if there is an error in the stream', async () => { + const stream = new ArrayReader([]); + const store = new Store(); + const imported = store.import(stream); + stream.emit('error', 'Test error'); + + await expect(imported).rejects.toEqual('Test error'); + }); + }); + describe('#forEach', () => { it('should iterate over quads', () => { let count = 0; From 8626c615d0785f99c79339e415f6dfa112a46eba Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Sun, 16 Mar 2025 14:55:23 +0000 Subject: [PATCH 2/4] chore: code cleanup --- src/N3Store.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/N3Store.js b/src/N3Store.js index 95f82b0a..501e3763 100644 --- a/src/N3Store.js +++ b/src/N3Store.js @@ -419,10 +419,7 @@ export default class N3Store { // Create proxy that combines N3Store with EventEmitter capabilities const storeProxy = new Proxy(this, { get(target, prop, receiver) { - if (prop in EventEmitter.prototype) { - return Reflect.get(stream, prop, receiver); - } - return Reflect.get(target, prop, receiver); + return Reflect.get(prop in EventEmitter.prototype ? stream : target, prop, receiver); }, }); From feb5b322a063c666de69a0505e39028233ecadcd Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Sun, 16 Mar 2025 15:02:20 +0000 Subject: [PATCH 3/4] chore: improve docs --- src/N3Store.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/N3Store.js b/src/N3Store.js index 501e3763..79f1cab0 100644 --- a/src/N3Store.js +++ b/src/N3Store.js @@ -409,7 +409,12 @@ export default class N3Store { return !this.readQuads(subjectOrQuad, predicate, object, graph).next().done; } - // ### `import` adds a stream of quads to the store + /** + * `import` adds a stream of quads to the store + * + * @returns {EventEmitter & Promise} A proxy object that acts as both an EventEmitter + * (for backward compatibility) and a Promise that resolves to the store when the stream is complete. + */ import(stream) { // Add quads to the store as they arrive stream.on('data', quad => { this.addQuad(quad); }); From 9445788cf2d26e1afb9ad476aa79732886467b32 Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Sun, 16 Mar 2025 16:18:06 +0000 Subject: [PATCH 4/4] chore: remove trailing spaces --- src/N3Store.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/N3Store.js b/src/N3Store.js index 79f1cab0..585470ef 100644 --- a/src/N3Store.js +++ b/src/N3Store.js @@ -411,8 +411,8 @@ export default class N3Store { /** * `import` adds a stream of quads to the store - * - * @returns {EventEmitter & Promise} A proxy object that acts as both an EventEmitter + * + * @returns {EventEmitter & Promise} A proxy object that acts as both an EventEmitter * (for backward compatibility) and a Promise that resolves to the store when the stream is complete. */ import(stream) {