From 6d69e9a109e61efe0ceca864ab7492001cad079c Mon Sep 17 00:00:00 2001 From: John Johnson II Date: Fri, 27 Dec 2019 20:54:15 -0700 Subject: [PATCH 1/5] Added Generators and Async Generators support fixes https://github.com/developit/greenlet/issues/35 tldr; Mainly wrapped the existing promise api to get it to work with async generators. I took atleast two iterations to get to this point. At first I thought to do the job with a readable Stream implementing async iterable on the main thread, but then was afraid of inconsistencies that would arise between the two apis. For example Readable Stream when finished will only return `{ done: true, value: undefined }` whereas async iterables can return `{ done: true, value: any }` when `any` is any value. So, then I decided to make a async generator that could talk to the worker for better compatibility. One thing to note is that the worker data onmessage receives an extra piece for the status to cause the iterator to use. This is similar to the Promise status, but for generators. --- README.md | 37 +++++++++++++ greenlet.js | 137 ++++++++++++++++++++++++++++++++++------------- greenlet.test.js | 132 +++++++++++++++++++++++++++++++++++++++++++-- index.d.ts | 16 ++++-- 4 files changed, 280 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 917a0c2..e8fffa7 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,43 @@ console.log(await getName('developit')) [🔄 **Run this example on JSFiddle**](https://jsfiddle.net/developit/mf9fbma5/) +## Generator Example + +Greenlet can now work with `Generators` and `AsyncGenerators` and will always return an `AsyncGenerator` in their +place. This means you can fetch small portions of data as you need it. + +```js +import greenlet from '../greenlet.js'; + +let lazyGetRepos = greenlet(async function* (username, returnNumber = 10) { + let url = `https://api.github.com/users/${username}/repos`; + let res = await fetch(url); + let repos = await res.json(); + while (repos.length > 0) { + let newReturnNumber = yield repos.splice(0, returnNumber); + if (typeof newReturnNumber !== 'undefined') { + returnNumber = newReturnNumber; + } + } +}); + +const repoIter = lazyGetRepos('developit', 5); +// you could call these over any amount of time... +console.log(await repoIter.next()); // {value: Array(5), done: false} +console.log(await repoIter.next()); // {value: Array(5), done: false} +console.log(await repoIter.next(10)); // {value: Array(10), done: false} +// when your done clean up the asyncIterator +console.log(await repoIter.return()); // {value: undefined, done: true} + +// or use for await of syntax to iterate through all values; +const repoIter2 = lazyGetRepos('developit', 5); + +for await (const repos of repoIter2) { + console.log(items); +} +// no need to clean up if you have exhausted the iterator. +``` + ## Transferable ready diff --git a/greenlet.js b/greenlet.js index bcf3493..2f04389 100644 --- a/greenlet.js +++ b/greenlet.js @@ -1,34 +1,69 @@ /** Move an async function into its own thread. - * @param {Function} asyncFunction An (async) function to run in a Worker. + * @param {Function} asyncFunction An (async) or async generator function to run in a Worker. + * @param {{useTransferables?: boolean}} options + * useTransferables defaults to true. * @public */ -export default function greenlet(asyncFunction) { +export default function greenlet(asyncFunction, options = {}) { + const defaults = { + useTransferables: true + }; + const { useTransferables } = { ...defaults, ...options }; // A simple counter is used to generate worker-global unique ID's for RPC: - let currentId = 0; + let promiseIds = 0; + + // A simple counter is use to generate worker-global generator ID's for RPC: + let genIds = 0; // Outward-facing promises store their "controllers" (`[request, reject]`) here: const promises = {}; // Use a data URI for the worker's src. It inlines the target function and an RPC handler: - const script = '$$='+asyncFunction+';onmessage='+(e => { - /* global $$ */ - - // Invoking within then() captures exceptions in the supplied async function as rejections - Promise.resolve(e.data[1]).then( - v => $$.apply($$, v) - ).then( - // success handler - callback(id, SUCCESS(0), result) - // if `d` is transferable transfer zero-copy - d => { - postMessage([e.data[0], 0, d], [d].filter(x => ( - (x instanceof ArrayBuffer) || - (x instanceof MessagePort) || - (self.ImageBitmap && x instanceof ImageBitmap) - ))); - }, - // error handler - callback(id, ERROR(1), error) - er => { postMessage([e.data[0], 1, '' + er]); } - ); + const script = `$$=${asyncFunction};USET=${useTransferables};GENS={};onmessage=` + (e => { + const getTransferables = d => !USET ? [] : d.filter(x => ( + (x instanceof ArrayBuffer) || + (x instanceof MessagePort) || + (self.ImageBitmap && x instanceof ImageBitmap) + )); + const [promiseID, args, status, genID] = e.data; + /* global $$, GENS, USET */ + if ($$.constructor.name === 'AsyncGeneratorFunction' || $$.constructor.name === 'GeneratorFunction') { + Promise.resolve(args).then( + // either apply the generator function or use it's iterator + v => !GENS[genID] ? $$.apply($$, v) : GENS[genID][status](v[0]) + ).then( + // success handler - callback(id, SUCCESS(0)) + // if `d` is transferable transfer zero-copy + d => { + // setup the generator + if (!GENS[genID]) { + GENS[genID] = [d.next.bind(d), d.return.bind(d), d.throw.bind(d)]; + return postMessage([promiseID, 0, { value: undefined, done: false }]); + } + // yield the value + postMessage([promiseID, 0, d], getTransferables([d.value])); + if (d.done) { + GENS[promiseID] = null; + } + }, + // error handler - callback(id, ERROR(1), error) + er => { postMessage([promiseID, 1, '' + er]); } + ); + } + else { + // Invoking within then() captures exceptions in the supplied async function as rejections + Promise.resolve(args).then( + v => $$.apply($$, v) + ).then( + // success handler - callback(id, SUCCESS(0), result) + // if `d` is transferable transfer zero-copy + d => { + postMessage([promiseID, 0, d], getTransferables([d])); + }, + // error handler - callback(id, ERROR(1), error) + er => { postMessage([e.data[0], 1, '' + er]); } + ); + } }); const workerURL = URL.createObjectURL(new Blob([script])); // Create an "inline" worker (1:1 at definition time) @@ -48,20 +83,50 @@ export default function greenlet(asyncFunction) { promises[e.data[0]] = null; }; - // Return a proxy function that forwards calls to the worker & returns a promise for the result. - return function (args) { - args = [].slice.call(arguments); - return new Promise(function () { - // Add the promise controller to the registry - promises[++currentId] = arguments; + const passMessagePromise = (args, status, genID) => new Promise(function () { + // Add the promise controller to the registry + promises[++promiseIds] = arguments; - // Send an RPC call to the worker - call(id, params) - // The filter is to provide a list of transferables to send zero-copy - worker.postMessage([currentId, args], args.filter(x => ( - (x instanceof ArrayBuffer) || - (x instanceof MessagePort) || - (self.ImageBitmap && x instanceof ImageBitmap) - ))); - }); + // Send an RPC call to the worker - call(id, params) + // The filter is to provide a list of transferables to send zero-copy + worker.postMessage([promiseIds, args, status, genID], !useTransferables ? [] : args.filter(x => ( + (x instanceof ArrayBuffer) || + (x instanceof MessagePort) || + (self.ImageBitmap && x instanceof ImageBitmap) + ))); + }); + // if it's a generator or async generator function return a async generator function. + if (asyncFunction.constructor.name === 'AsyncGeneratorFunction' || asyncFunction.constructor.name === 'GeneratorFunction') { + return async function* workerPassthrough(...args) { + const genID = ++genIds; + try { + let result = await passMessagePromise(args, 0, genID); + let value; + while (!result.done) { + // request next value + result = await passMessagePromise([value], 0, genID); + if (result.done) { + break; + } + value = yield result.value; + } + return result.value; + } + catch (err) { + // send error message + await passMessagePromise(['' + err], 2, genID); + throw err; + } + finally { + // send return message + await passMessagePromise([undefined], 1, genID); + } + }; + } + + // Return a proxy function that forwards calls to the worker & returns a promise for the result. + return function () { + const args = [].slice.call(arguments); + return passMessagePromise(args); }; } diff --git a/greenlet.test.js b/greenlet.test.js index 2528a6c..1027df5 100644 --- a/greenlet.test.js +++ b/greenlet.test.js @@ -1,4 +1,4 @@ -import greenlet from 'greenlet'; +import greenlet from './greenlet'; describe('greenlet', () => { it('should return an async function', () => { @@ -7,6 +7,14 @@ describe('greenlet', () => { expect(g()).toEqual(jasmine.any(Promise)); }); + it('should return an async generator function', () => { + let g = greenlet(function* () { + yield 'one'; + }); + // expect that it has an iterator + expect(!!g()[Symbol.asyncIterator]).toEqual(true); + }); + it('should invoke sync functions', async () => { let foo = greenlet( a => 'foo: '+a ); @@ -28,11 +36,129 @@ describe('greenlet', () => { }); it('should invoke async functions', async () => { - let bar = greenlet( a => new Promise( resolve => { - resolve('bar: '+a); + let bar = greenlet(a => new Promise(resolve => { + resolve('bar: ' + a); })); let ret = await bar('test'); expect(ret).toEqual('bar: test'); }); + + it('should take values from next', async () => { + let g = greenlet(function* () { + const num2 = yield 1; + yield 2 + num2; + }); + + const it = g(); + expect((await it.next()).value).toEqual(1); + expect((await it.next(2)).value).toEqual(4); + }); + + it('should return both done as true and the value', async () => { + // eslint-disable-next-line require-yield + let g = greenlet(function* (num1) { + return num1; + }); + + const it = g(3); + const { done, value } = (await it.next()); + + expect(value).toEqual(3); + expect(done).toEqual(true); + }); + + it('should only iterate yielded values with for await of', async () => { + let g = greenlet(function* () { + yield 3; + yield 1; + yield 4; + return 1; + }); + + const arr = []; + for await (const item of g()) { + arr.push(item); + } + + expect(arr[0]).toEqual(3); + expect(arr[1]).toEqual(1); + expect(arr[2]).toEqual(4); + expect(arr[3]).toEqual(undefined); + }); + + it('should return early with return method of async iterator', async () => { + let g = greenlet(function* () { + yield 1; + yield 2; + yield 3; + return 4; + }); + + + const it = g(); + expect([ + await it.next(), + await it.next(), + await it.return(7), + await it.next(), + await it.next() + ]).toEqual([ + { value: 1, done: false }, + { value: 2, done: false }, + { value: 7, done: true }, + { value: undefined, done: true }, + { value: undefined, done: true } + ]); + }); + + it('should throw early with return method of async iterator', async () => { + let g = greenlet(function* () { + yield 1; + yield 2; + yield 3; + return 4; + }); + + + const it = g(); + // expect this to reject! + await (async () => ([ + await it.next(), + await it.return(), + await it.throw('foo'), + await it.next(), + await it.next() + ]))().then(() => { + throw new Error('Promise should not have resolved'); + }, () => { /** since it should error, we recover and ignore the error */}); + }); + + it('should act like an equivalent async iterator', async () => { + async function* noG () { + const num2 = yield 1; + yield 2 + num2; + yield 3; + return 4; + } + + let g = greenlet(noG); + + + const it = g(); + const it2 = noG(); + expect([ + await it.next(), + await it.next(2), + await it.next(), + await it.next(), + await it.next() + ]).toEqual([ + await it2.next(), + await it2.next(2), + await it2.next(), + await it2.next(), + await it2.next() + ]); + }); }); diff --git a/index.d.ts b/index.d.ts index 98f4cd8..92a5687 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,5 +1,15 @@ -type AsyncFunction = (...args: S) => Promise; +interface Options { + useTransferables: boolean +} -type MaybeAsyncFunction = (...args: S) => (T | Promise); +type AsyncGenFunction = (...args: S) => AsyncGenerator; -export default function greenlet(fn: MaybeAsyncFunction): AsyncFunction; +type AsyncFunction = (...args: S) => Promise; + +type GenFunction = (...args: S) => (Generator | AsyncGenerator); + +type MaybeAsyncFunctionButNotGen = (...args: S) => (Exclude | AsyncGenerator> | Promise); + +type GreenletFnType = MaybeAsyncFunctionButNotGen | GenFunction; + +export default function greenlet = GreenletFnType>(fn: U, options?: Options): U extends GenFunction ? AsyncGenFunction : U extends MaybeAsyncFunctionButNotGen ? AsyncFunction : never; From 70e95f5a9b7148f429a32b01b374582fed0fe3ae Mon Sep 17 00:00:00 2001 From: John Johnson II Date: Fri, 27 Dec 2019 21:31:36 -0700 Subject: [PATCH 2/5] =?UTF-8?q?Shortened=20the=20code=20a=20bit=20?= =?UTF-8?q?=F0=9F=91=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- greenlet.js | 43 +++++++++++++++++-------------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/greenlet.js b/greenlet.js index 2f04389..d2a2891 100644 --- a/greenlet.js +++ b/greenlet.js @@ -27,17 +27,18 @@ export default function greenlet(asyncFunction, options = {}) { )); const [promiseID, args, status, genID] = e.data; /* global $$, GENS, USET */ - if ($$.constructor.name === 'AsyncGeneratorFunction' || $$.constructor.name === 'GeneratorFunction') { - Promise.resolve(args).then( - // either apply the generator function or use it's iterator - v => !GENS[genID] ? $$.apply($$, v) : GENS[genID][status](v[0]) - ).then( - // success handler - callback(id, SUCCESS(0)) - // if `d` is transferable transfer zero-copy - d => { + Promise.resolve(args).then( + // either apply the async/generator/async generator function or use a generator function's iterator + v => !GENS[genID] ? $$.apply($$, v) : GENS[genID][status](v[0]) + ).then( + // success handler - callback(id, SUCCESS(0)) + // if `d` is transferable transfer zero-copy + d => { + if ($$.constructor.name === 'AsyncGeneratorFunction' || $$.constructor.name === 'GeneratorFunction') { // setup the generator if (!GENS[genID]) { GENS[genID] = [d.next.bind(d), d.return.bind(d), d.throw.bind(d)]; + // return an initial message of success. return postMessage([promiseID, 0, { value: undefined, done: false }]); } // yield the value @@ -45,25 +46,15 @@ export default function greenlet(asyncFunction, options = {}) { if (d.done) { GENS[promiseID] = null; } - }, - // error handler - callback(id, ERROR(1), error) - er => { postMessage([promiseID, 1, '' + er]); } - ); - } - else { - // Invoking within then() captures exceptions in the supplied async function as rejections - Promise.resolve(args).then( - v => $$.apply($$, v) - ).then( - // success handler - callback(id, SUCCESS(0), result) - // if `d` is transferable transfer zero-copy - d => { + } + else { + // here we know it's just an async function that needs it's return value. postMessage([promiseID, 0, d], getTransferables([d])); - }, - // error handler - callback(id, ERROR(1), error) - er => { postMessage([e.data[0], 1, '' + er]); } - ); - } + } + }, + // error handler - callback(id, ERROR(1), error) + er => { postMessage([promiseID, 1, '' + er]); } + ); }); const workerURL = URL.createObjectURL(new Blob([script])); // Create an "inline" worker (1:1 at definition time) From c7dcecadbcc772620048f4d6b3c94a7f7a440ef1 Mon Sep 17 00:00:00 2001 From: John Johnson II Date: Fri, 27 Dec 2019 22:17:35 -0700 Subject: [PATCH 3/5] Moved globals up further. --- greenlet.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenlet.js b/greenlet.js index d2a2891..134ffd3 100644 --- a/greenlet.js +++ b/greenlet.js @@ -20,13 +20,13 @@ export default function greenlet(asyncFunction, options = {}) { // Use a data URI for the worker's src. It inlines the target function and an RPC handler: const script = `$$=${asyncFunction};USET=${useTransferables};GENS={};onmessage=` + (e => { + /* global $$, GENS, USET */ const getTransferables = d => !USET ? [] : d.filter(x => ( (x instanceof ArrayBuffer) || (x instanceof MessagePort) || (self.ImageBitmap && x instanceof ImageBitmap) )); const [promiseID, args, status, genID] = e.data; - /* global $$, GENS, USET */ Promise.resolve(args).then( // either apply the async/generator/async generator function or use a generator function's iterator v => !GENS[genID] ? $$.apply($$, v) : GENS[genID][status](v[0]) From a960aba96c8d4ac499cda27d69a439289741972a Mon Sep 17 00:00:00 2001 From: John Johnson II Date: Sun, 29 Dec 2019 23:46:42 -0700 Subject: [PATCH 4/5] fixed cleanup bug --- greenlet.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenlet.js b/greenlet.js index 134ffd3..706ca83 100644 --- a/greenlet.js +++ b/greenlet.js @@ -44,7 +44,7 @@ export default function greenlet(asyncFunction, options = {}) { // yield the value postMessage([promiseID, 0, d], getTransferables([d.value])); if (d.done) { - GENS[promiseID] = null; + GENS[genID] = null; } } else { From 83ff34966e79f85653312b7a841d820a85576a46 Mon Sep 17 00:00:00 2001 From: John Johnson II Date: Tue, 31 Dec 2019 16:22:03 -0700 Subject: [PATCH 5/5] fixed bundling issue Microbundle wouldn't build, because the previous commits used async generator syntax. This is now fixed since we are now using the async iterator protocol instead. --- greenlet.js | 47 +++++++++++++++++---------------- greenlet.test.js | 67 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/greenlet.js b/greenlet.js index 706ca83..ba61e6e 100644 --- a/greenlet.js +++ b/greenlet.js @@ -88,30 +88,33 @@ export default function greenlet(asyncFunction, options = {}) { }); // if it's a generator or async generator function return a async generator function. if (asyncFunction.constructor.name === 'AsyncGeneratorFunction' || asyncFunction.constructor.name === 'GeneratorFunction') { - return async function* workerPassthrough(...args) { + return function workerPassthrough () { const genID = ++genIds; - try { - let result = await passMessagePromise(args, 0, genID); - let value; - while (!result.done) { - // request next value - result = await passMessagePromise([value], 0, genID); - if (result.done) { - break; - } - value = yield result.value; + const init = passMessagePromise([].slice.call(arguments), 0, genID); + return { + done: false, + async next (value) { + await init; + if (this.done) { return { value: undefined, done: true }; } + const result = await passMessagePromise([value], 0, genID); + if (result.done) { return this.return(result.value); } + return result; + }, + async return (value) { + await init; + await passMessagePromise([undefined], 1, genID); + this.done = true; + return { value, done: true }; + }, + async throw (err) { + await init; + await passMessagePromise(['' + err], 2, genID); + throw err; + }, + [Symbol.asyncIterator] () { + return this; } - return result.value; - } - catch (err) { - // send error message - await passMessagePromise(['' + err], 2, genID); - throw err; - } - finally { - // send return message - await passMessagePromise([undefined], 1, genID); - } + }; }; } diff --git a/greenlet.test.js b/greenlet.test.js index 1027df5..3ffb2ec 100644 --- a/greenlet.test.js +++ b/greenlet.test.js @@ -57,15 +57,18 @@ describe('greenlet', () => { it('should return both done as true and the value', async () => { // eslint-disable-next-line require-yield - let g = greenlet(function* (num1) { + function* f (num1) { return num1; - }); + } + let g = greenlet(f); const it = g(3); + const it2 = f(3); const { done, value } = (await it.next()); + const { done: done2, value: value2 } = (await it2.next()); - expect(value).toEqual(3); - expect(done).toEqual(true); + expect(value).toEqual(value2); + expect(done).toEqual(done2); }); it('should only iterate yielded values with for await of', async () => { @@ -111,7 +114,7 @@ describe('greenlet', () => { { value: undefined, done: true } ]); }); - + it('should throw early with return method of async iterator', async () => { let g = greenlet(function* () { yield 1; @@ -161,4 +164,58 @@ describe('greenlet', () => { await it2.next() ]); }); + + it('should throw like an equivalent async iterator', async () => { + async function* noG () { + const num2 = yield 1; + yield 2 + num2; + yield 3; + return 4; + } + + let g = greenlet(noG); + + + const it = g(); + const it2 = noG(); + expect([ + await it.next(), + await it.next(2), + await it.throw().catch(e => 2), + await it.return(), + await it.throw().catch(e => 3) + ]).toEqual([ + await it2.next(), + await it2.next(2), + await it2.throw().catch(e => 2), + await it2.return(), + await it2.throw().catch(e => 3) + ]); + }); + + it('should return like an equivalent async iterator', async () => { + async function* noG () { + const num2 = yield 1; + yield 2 + num2; + yield 3; + return 4; + } + + let g = greenlet(noG); + + + const it = g(); + const it2 = noG(); + expect([ + await it.next(), + await it.next(2), + await it.return(), + await it.return() + ]).toEqual([ + await it2.next(), + await it2.next(2), + await it2.return(), + await it2.return() + ]); + }); });