Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow import to be treated as Promise<Store> per the RDF/JS spec #503

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
],
"dependencies": {
"buffer": "^6.0.3",
"events": "^3.3.0",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already a transitive dependency via readable-stream

"queue-microtask": "^1.1.2",
"readable-stream": "^4.0.0"
},
Expand Down
56 changes: 54 additions & 2 deletions src/N3Store.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// **N3Store** objects store N3 quads by graph in memory.
import EventEmitter from 'events';
import { Readable } from 'readable-stream';
import { default as N3DataFactory, termToId, termFromId } from './N3DataFactory';
import namespaces from './IRIs';
Expand Down Expand Up @@ -408,10 +409,61 @@ export default class N3Store {
return !this.readQuads(subjectOrQuad, predicate, object, graph).next().done;
}

// ### `import` adds a stream of quads to the store
/**
* `import` adds a stream of quads to the store
*
* @returns {EventEmitter & Promise<Store & EventEmitter>} A proxy object that acts as both an EventEmitter
* (for backward compatibility) and a Promise that resolves to the store when the stream is complete.
*/
import(stream) {
// Add quads to the store as they arrive
stream.on('data', quad => { this.addQuad(quad); });
return stream;

// Create a promise that resolves when the stream ends
const promise = new Promise((resolve, reject) => {
// Create proxy that combines N3Store with EventEmitter capabilities
const storeProxy = new Proxy(this, {
get(target, prop, receiver) {
return Reflect.get(prop in EventEmitter.prototype ? stream : target, prop, receiver);
},
});

// Check if stream is already closed/ended
if (stream.readableEnded || stream.destroyed || stream.readable === false || stream.closed || stream.ended) {
// Resolve immediately if stream is already closed
resolve(storeProxy);
}
else {
// Otherwise, wait for end/error events
// eslint-disable-next-line func-style
const onEnd = () => {
// eslint-disable-next-line no-use-before-define
stream.removeListener('error', onError);
resolve(storeProxy);
};

// eslint-disable-next-line func-style
const onError = err => {
stream.removeListener('end', onEnd);
reject(err);
};

stream.once('end', onEnd);
stream.once('error', onError);
Comment on lines +451 to +452
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to only create this promise when then, catch, or finally were used in the below proxy - and thus only start listening to the stream at that point in time.

Unfortunately, that turned out to not be an option; as EventEmitters error if the error event is called and there are no error listeners - which causes the last added test case to break.

}
});

// Return a proxy that acts as both stream and promise without mutating the stream object
return new Proxy(stream, {
get(target, prop) {
// Forward Promise methods to the promise object
if (prop === 'then' || prop === 'catch' || prop === 'finally') {
return promise[prop].bind(promise);
}
// All other properties and methods are from the stream
return target[prop];
},
});
}

// ### `removeQuad` removes a quad from the store if it exists
Expand Down
125 changes: 125 additions & 0 deletions test/N3Store-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,131 @@ describe('Store', () => {
it('should have size 2', () => { expect(empty.size).toEqual(2); });
});

describe('#import promise', () => {
it('should have size 2', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);

expect((await new Store().import(stream)).size).toEqual(2);
});
});

describe('N3Store import', () => {
it('should not add "end" or "error" listeners until await is called', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);
const store = new Store();

const newListners = [];
stream.on('newListener', (event, listener) => { newListners.push({ event, listener }); });

const importPromise = store.import(stream);

expect(stream.listenerCount('end')).toEqual(1);
expect(stream.listenerCount('error')).toEqual(1);

expect(newListners).toEqual([
{ event: 'data', listener: expect.any(Function) },
{ event: 'end', listener: expect.any(Function) },
{ event: 'error', listener: expect.any(Function) },
]);

await importPromise;

expect(stream.listenerCount('end')).toEqual(0);
expect(stream.listenerCount('error')).toEqual(0);

// Checking that 'end' and 'error' listeners were added and then removed in importing
expect(newListners).toEqual([
{ event: 'data', listener: expect.any(Function) },
{ event: 'end', listener: expect.any(Function) },
{ event: 'error', listener: expect.any(Function) },
]);
});

it('should still be a functional event emitter before and after awaiting', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);
const store = new Store();
const importPromise = store.import(stream);

let receivedPreAwait = false;
stream.on('testEvent', data => {
receivedPreAwait = true;
expect(data).toBe('testData');
});
stream.emit('testEvent', 'testData');
expect(receivedPreAwait).toBe(true);

await importPromise;

let receivedPostAwait = false;
stream.on('testEvent2', data => {
receivedPostAwait = true;
expect(data).toBe('testDataAfterAwait');
});
stream.emit('testEvent2', 'testDataAfterAwait');
expect(receivedPostAwait).toBe(true);
});

it('should not add store attributes to the stream and vice-versa; pre and post import', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);
const store = new Store();
const importPromise = store.import(stream);

expect(stream.addQuad).toBeUndefined();
expect(store.addQuad).not.toBeUndefined();

expect(store.on).toBeUndefined();
expect(stream.on).not.toBeUndefined();

const res = await importPromise;

expect(stream.addQuad).toBeUndefined();
expect(store.addQuad).not.toBeUndefined();
expect(res.addQuad).not.toBeUndefined();
expect(importPromise.addQuad).toBeUndefined();

expect(store.on).toBeUndefined();
expect(stream.on).not.toBeUndefined();
expect(res.on).not.toBeUndefined();
});

it('should resolve the promise if importing a completed stream', async () => {
const stream = new ArrayReader([]);
const store = new Store();

stream.on('data', () => {});

await new Promise(resolve => {
stream.on('end', resolve);
stream.on('error', resolve);
});

const importResult = await store.import(stream);

expect(importResult).toBeTruthy();
});

it('should reject if there is an error in the stream', async () => {
const stream = new ArrayReader([]);
const store = new Store();
const imported = store.import(stream);
stream.emit('error', 'Test error');

await expect(imported).rejects.toEqual('Test error');
});
});

describe('#forEach', () => {
it('should iterate over quads', () => {
let count = 0;
Expand Down