Skip to content

Commit

Permalink
Added Generators and Async Generators support
Browse files Browse the repository at this point in the history
fixes developit#35

tldr; Mainly wrapped the existing promise api to get it to work
with async generators.
I took atleast two iterations to get to this point. At first I thought
to do the job with a readable Stream implementing async iterable
on the main thread, but then was afraid of inconsistencies that
would arise between the two apis. For example Readable Stream
when finished will only return `{ done: true, value: undefined }`
whereas async iterables can return `{ done: true, value: any }`
when `any` is any value.

So, then I decided to make a async generator that could talk to
the worker for better compatibility. One thing to note is that the worker
data onmessage receives an extra piece for the status to cause the
iterator to use. This is similar to the Promise status, but for
generators.
  • Loading branch information
johnsonjo4531 committed Dec 28, 2019
1 parent 2dd902a commit 6d69e9a
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 42 deletions.
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,43 @@ console.log(await getName('developit'))

[🔄 **Run this example on JSFiddle**](https://jsfiddle.net/developit/mf9fbma5/)

## Generator Example

Greenlet can now work with `Generators` and `AsyncGenerators` and will always return an `AsyncGenerator` in their
place. This means you can fetch small portions of data as you need it.

```js
import greenlet from '../greenlet.js';

let lazyGetRepos = greenlet(async function* (username, returnNumber = 10) {
let url = `https://api.github.com/users/${username}/repos`;
let res = await fetch(url);
let repos = await res.json();
while (repos.length > 0) {
let newReturnNumber = yield repos.splice(0, returnNumber);
if (typeof newReturnNumber !== 'undefined') {
returnNumber = newReturnNumber;
}
}
});

const repoIter = lazyGetRepos('developit', 5);
// you could call these over any amount of time...
console.log(await repoIter.next()); // {value: Array(5), done: false}
console.log(await repoIter.next()); // {value: Array(5), done: false}
console.log(await repoIter.next(10)); // {value: Array(10), done: false}
// when your done clean up the asyncIterator
console.log(await repoIter.return()); // {value: undefined, done: true}

// or use for await of syntax to iterate through all values;
const repoIter2 = lazyGetRepos('developit', 5);

for await (const repos of repoIter2) {
console.log(items);
}
// no need to clean up if you have exhausted the iterator.
```


## Transferable ready

Expand Down
137 changes: 101 additions & 36 deletions greenlet.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,69 @@
/** Move an async function into its own thread.
* @param {Function} asyncFunction An (async) function to run in a Worker.
* @param {Function} asyncFunction An (async) or async generator function to run in a Worker.
* @param {{useTransferables?: boolean}} options
* useTransferables defaults to true.
* @public
*/
export default function greenlet(asyncFunction) {
export default function greenlet(asyncFunction, options = {}) {
const defaults = {
useTransferables: true
};
const { useTransferables } = { ...defaults, ...options };
// A simple counter is used to generate worker-global unique ID's for RPC:
let currentId = 0;
let promiseIds = 0;

// A simple counter is use to generate worker-global generator ID's for RPC:
let genIds = 0;

// Outward-facing promises store their "controllers" (`[request, reject]`) here:
const promises = {};

// Use a data URI for the worker's src. It inlines the target function and an RPC handler:
const script = '$$='+asyncFunction+';onmessage='+(e => {
/* global $$ */

// Invoking within then() captures exceptions in the supplied async function as rejections
Promise.resolve(e.data[1]).then(
v => $$.apply($$, v)
).then(
// success handler - callback(id, SUCCESS(0), result)
// if `d` is transferable transfer zero-copy
d => {
postMessage([e.data[0], 0, d], [d].filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
)));
},
// error handler - callback(id, ERROR(1), error)
er => { postMessage([e.data[0], 1, '' + er]); }
);
const script = `$$=${asyncFunction};USET=${useTransferables};GENS={};onmessage=` + (e => {
const getTransferables = d => !USET ? [] : d.filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
));
const [promiseID, args, status, genID] = e.data;
/* global $$, GENS, USET */
if ($$.constructor.name === 'AsyncGeneratorFunction' || $$.constructor.name === 'GeneratorFunction') {
Promise.resolve(args).then(
// either apply the generator function or use it's iterator
v => !GENS[genID] ? $$.apply($$, v) : GENS[genID][status](v[0])
).then(
// success handler - callback(id, SUCCESS(0))
// if `d` is transferable transfer zero-copy
d => {
// setup the generator
if (!GENS[genID]) {
GENS[genID] = [d.next.bind(d), d.return.bind(d), d.throw.bind(d)];
return postMessage([promiseID, 0, { value: undefined, done: false }]);
}
// yield the value
postMessage([promiseID, 0, d], getTransferables([d.value]));
if (d.done) {
GENS[promiseID] = null;
}
},
// error handler - callback(id, ERROR(1), error)
er => { postMessage([promiseID, 1, '' + er]); }
);
}
else {
// Invoking within then() captures exceptions in the supplied async function as rejections
Promise.resolve(args).then(
v => $$.apply($$, v)
).then(
// success handler - callback(id, SUCCESS(0), result)
// if `d` is transferable transfer zero-copy
d => {
postMessage([promiseID, 0, d], getTransferables([d]));
},
// error handler - callback(id, ERROR(1), error)
er => { postMessage([e.data[0], 1, '' + er]); }
);
}
});
const workerURL = URL.createObjectURL(new Blob([script]));
// Create an "inline" worker (1:1 at definition time)
Expand All @@ -48,20 +83,50 @@ export default function greenlet(asyncFunction) {
promises[e.data[0]] = null;
};

// Return a proxy function that forwards calls to the worker & returns a promise for the result.
return function (args) {
args = [].slice.call(arguments);
return new Promise(function () {
// Add the promise controller to the registry
promises[++currentId] = arguments;
const passMessagePromise = (args, status, genID) => new Promise(function () {
// Add the promise controller to the registry
promises[++promiseIds] = arguments;

// Send an RPC call to the worker - call(id, params)
// The filter is to provide a list of transferables to send zero-copy
worker.postMessage([currentId, args], args.filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
)));
});
// Send an RPC call to the worker - call(id, params)
// The filter is to provide a list of transferables to send zero-copy
worker.postMessage([promiseIds, args, status, genID], !useTransferables ? [] : args.filter(x => (
(x instanceof ArrayBuffer) ||
(x instanceof MessagePort) ||
(self.ImageBitmap && x instanceof ImageBitmap)
)));
});
// if it's a generator or async generator function return a async generator function.
if (asyncFunction.constructor.name === 'AsyncGeneratorFunction' || asyncFunction.constructor.name === 'GeneratorFunction') {
return async function* workerPassthrough(...args) {
const genID = ++genIds;
try {
let result = await passMessagePromise(args, 0, genID);
let value;
while (!result.done) {
// request next value
result = await passMessagePromise([value], 0, genID);
if (result.done) {
break;
}
value = yield result.value;
}
return result.value;
}
catch (err) {
// send error message
await passMessagePromise(['' + err], 2, genID);
throw err;
}
finally {
// send return message
await passMessagePromise([undefined], 1, genID);
}
};
}

// Return a proxy function that forwards calls to the worker & returns a promise for the result.
return function () {
const args = [].slice.call(arguments);
return passMessagePromise(args);
};
}
132 changes: 129 additions & 3 deletions greenlet.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import greenlet from 'greenlet';
import greenlet from './greenlet';

describe('greenlet', () => {
it('should return an async function', () => {
Expand All @@ -7,6 +7,14 @@ describe('greenlet', () => {
expect(g()).toEqual(jasmine.any(Promise));
});

it('should return an async generator function', () => {
let g = greenlet(function* () {
yield 'one';
});
// expect that it has an iterator
expect(!!g()[Symbol.asyncIterator]).toEqual(true);
});

it('should invoke sync functions', async () => {
let foo = greenlet( a => 'foo: '+a );

Expand All @@ -28,11 +36,129 @@ describe('greenlet', () => {
});

it('should invoke async functions', async () => {
let bar = greenlet( a => new Promise( resolve => {
resolve('bar: '+a);
let bar = greenlet(a => new Promise(resolve => {
resolve('bar: ' + a);
}));

let ret = await bar('test');
expect(ret).toEqual('bar: test');
});

it('should take values from next', async () => {
let g = greenlet(function* () {
const num2 = yield 1;
yield 2 + num2;
});

const it = g();
expect((await it.next()).value).toEqual(1);
expect((await it.next(2)).value).toEqual(4);
});

it('should return both done as true and the value', async () => {
// eslint-disable-next-line require-yield
let g = greenlet(function* (num1) {
return num1;
});

const it = g(3);
const { done, value } = (await it.next());

expect(value).toEqual(3);
expect(done).toEqual(true);
});

it('should only iterate yielded values with for await of', async () => {
let g = greenlet(function* () {
yield 3;
yield 1;
yield 4;
return 1;
});

const arr = [];
for await (const item of g()) {
arr.push(item);
}

expect(arr[0]).toEqual(3);
expect(arr[1]).toEqual(1);
expect(arr[2]).toEqual(4);
expect(arr[3]).toEqual(undefined);
});

it('should return early with return method of async iterator', async () => {
let g = greenlet(function* () {
yield 1;
yield 2;
yield 3;
return 4;
});


const it = g();
expect([
await it.next(),
await it.next(),
await it.return(7),
await it.next(),
await it.next()
]).toEqual([
{ value: 1, done: false },
{ value: 2, done: false },
{ value: 7, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true }
]);
});

it('should throw early with return method of async iterator', async () => {
let g = greenlet(function* () {
yield 1;
yield 2;
yield 3;
return 4;
});


const it = g();
// expect this to reject!
await (async () => ([
await it.next(),
await it.return(),
await it.throw('foo'),
await it.next(),
await it.next()
]))().then(() => {
throw new Error('Promise should not have resolved');
}, () => { /** since it should error, we recover and ignore the error */});
});

it('should act like an equivalent async iterator', async () => {
async function* noG () {
const num2 = yield 1;
yield 2 + num2;
yield 3;
return 4;
}

let g = greenlet(noG);


const it = g();
const it2 = noG();
expect([
await it.next(),
await it.next(2),
await it.next(),
await it.next(),
await it.next()
]).toEqual([
await it2.next(),
await it2.next(2),
await it2.next(),
await it2.next(),
await it2.next()
]);
});
});
16 changes: 13 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
type AsyncFunction<S extends any[], T> = (...args: S) => Promise<T>;
interface Options {
useTransferables: boolean
}

type MaybeAsyncFunction<S extends any[], T> = (...args: S) => (T | Promise<T>);
type AsyncGenFunction<S extends any[], T=unknown, TReturn=any, TNext=unknown> = (...args: S) => AsyncGenerator<T, TReturn, TNext>;

export default function greenlet<S extends any[], T>(fn: MaybeAsyncFunction<S, T>): AsyncFunction<S, T>;
type AsyncFunction<S extends any[], T=unknown> = (...args: S) => Promise<T>;

type GenFunction<S extends any[], T=unknown, TReturn=any, TNext=unknown> = (...args: S) => (Generator<T, TReturn, TNext> | AsyncGenerator<T, TReturn, TNext>);

type MaybeAsyncFunctionButNotGen<S extends any[], T=unknown, TReturn=any,TNext=unknown> = (...args: S) => (Exclude<T, Generator<T, TReturn, TNext> | AsyncGenerator<T, TReturn, TNext>> | Promise<T>);

type GreenletFnType<S extends any[], T, TReturn = any, TNext = unknown> = MaybeAsyncFunctionButNotGen<S, T, TReturn, TNext> | GenFunction<S, T, TReturn, TNext>;

export default function greenlet<S extends any[], T=unknown, TReturn=any, TNext=unknown, U extends GreenletFnType<S,T,TReturn,TNext> = GreenletFnType<S, T, TReturn, TNext>>(fn: U, options?: Options): U extends GenFunction<infer S,infer T,infer TReturn,infer TNext> ? AsyncGenFunction<S, T, TReturn, TNext> : U extends MaybeAsyncFunctionButNotGen<infer S,infer T> ? AsyncFunction<S, T> : never;

0 comments on commit 6d69e9a

Please sign in to comment.