- Introducing pipes/convert
- About
- Installing
- API Reference
- fromIterable
- fromObservable
- fromPromise
- toObservable
- toPromise
The pipesjs/convert module provides various functions to convert web streams to other similar data structures like highland streams, Observables, Rx streams etc. and vice versa.
For more about Web Streams, refer to the spec.
Here's more about Web Streams from the spec itself:
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
The spec is still evolving but has reached a fairly stable stage with a reference implementation as well. The API has almost been finalized and Streams are coming to the web very soon!
At its core, the API exposes three major components:
- ReadableStream encapsulates a source that produces values and emits them.
- TransformStream is essentially a { readable, writable } pair that takes a function used to transform the values flowing through it.
- WritableStream encapsulates a sink, receiving values and writing them to it.
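To make the three components concrete, here is a minimal sketch that uses only the standard Web Streams globals (assumed to be available, as in modern browsers and Node 18+). It does not use pipes/convert, and the names `numbers`, `double`, `collect` and `collected` are purely illustrative.

```javascript
// Source: a ReadableStream that produces three values and then closes.
const numbers = new ReadableStream({
  start(controller) {
    [1, 2, 3].forEach(n => controller.enqueue(n));
    controller.close();
  }
});

// Transform: a { readable, writable } pair that doubles each chunk.
const double = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk * 2);
  }
});

// Sink: a WritableStream that collects whatever it receives.
const collected = [];
const collect = new WritableStream({
  write(chunk) {
    collected.push(chunk);
  }
});

// Connect source -> transform -> sink; the promise settles when the pipe ends.
const finished = numbers.pipeThrough(double).pipeTo(collect);
finished.then(() => console.log(collected)); // collected holds the doubled values
```

`pipeThrough` returns the readable side of the transform, which is why the calls can be chained.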
Streams are essentially data structures that handle sequential flow of values. You can split streams, merge them and connect them together in various ways. What's amazing is that, in most cases, they can handle backpressure automatically, so you don't have to mess with the underlying details.
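As a rough illustration of automatic backpressure, the sketch below counts how often a source is asked for data. It assumes the standard `ReadableStream` and `CountQueuingStrategy` globals; `pulls`, `slowlyConsumed` and `backpressureDemo` are made-up names, and the 20 ms waits are arbitrary.

```javascript
let pulls = 0;

// The source could produce values forever, but pull() is only invoked
// while the internal queue is below the high water mark.
const slowlyConsumed = new ReadableStream(
  {
    pull(controller) {
      pulls += 1;
      controller.enqueue(pulls);
    }
  },
  new CountQueuingStrategy({ highWaterMark: 2 })
);

const wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function observeBackpressure() {
  await wait(20);
  const before = pulls; // nobody has read yet, so the source was paused early
  await slowlyConsumed.getReader().read(); // free up one slot in the queue
  await wait(20);
  return { before, after: pulls }; // reading let the source be pulled again
}

const backpressureDemo = observeBackpressure();
backpressureDemo.then(counts => console.log(counts));
```

The source never checks the queue itself; the stream decides when to call `pull`, which is the "automatic" part.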
For further information, the spec is quite informative and easy to read. Jake Archibald also wrote a great blog post on them.
Heads up: If you're coming from node land, web streams are quite different from node streams, and the two are incompatible with each other.
The library depends on @pipes/core, so make sure you include it before including the library.
You can use either of the builds from the dist folder:
<script src="path/to/web-streams-polyfill.js"></script>
<script src="path/to/pipes.convert.js"></script>
And in your code, all the functions will be available on the window.Pipes.convert variable.
let { toNodeStream, fromIterable } = window.Pipes.convert;
The library has a peer dependency on @pipes/core, so install both:
npm install @pipes/core @pipes/convert
The library is split up into modules, so you can require either the whole library or only parts of it:
// Require the whole library...
let { fromIterable } = require("@pipes/convert");
// ...or only a single module, using either path:
let fromIterable = require("@pipes/convert/fromIterable");
let fromIterable = require("@pipes/convert/from/iterable");
If you want, you can directly import the ES6 modules like so:
import pipesConvert from "@pipes/convert/src";
import { fromIterable } from "@pipes/convert/src";
import fromIterable from "@pipes/convert/src/fromIterable";
import fromIterable from "@pipes/convert/src/from/iterable";
The convert library consists of the following functions:
Set up code for examples
// Setup
let createTestReadable = data => new ReadableStream({
  start () {
    // Copy the input so the caller's array is not mutated
    this.data = [ ...( data || [1,2,3] ) ];
  },
  pull (controller) {
    if ( !this.data.length )
      return controller.close();
    // Emit values in their original order
    controller.enqueue( this.data.shift() );
  }
}),
// Calls onChunk for every chunk written; logs the chunk by default
createTestWritable = ( onChunk = c => console.log( c ) ) => new WritableStream({
  write (chunk) {
    onChunk( chunk );
  }
});
fromIterable
This function takes an iterable and returns a readable stream that queues the iterated values sequentially.
Parameters
iterable: Iterable<T>
Examples
// Any iterable works: an array, a string, a generator object, ...
let input = [1,2,3,4,5],
  writable, res = [];
// Create test streams
writable = createTestWritable( c => res.push( c ));
// Connect the streams
connect(
  fromIterable( input ),
  writable
); // res == input
Returns ReadableStream
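For intuition, here is a minimal sketch of how an iterable could be turned into a readable stream with the standard `ReadableStream` constructor. This is not the library's implementation; `iterableToReadable`, `letters` and `iterableDemo` are hypothetical names used only for illustration.

```javascript
// Hypothetical helper for illustration; not the library's actual code.
function iterableToReadable(iterable) {
  let iterator;
  return new ReadableStream({
    start() {
      // Works for anything that implements the iteration protocol.
      iterator = iterable[Symbol.iterator]();
    },
    pull(controller) {
      // Advance one step per pull so values are produced on demand.
      const { value, done } = iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    }
  });
}

// Usage: a string is iterable, so each character becomes a chunk.
const letters = [];
const iterableDemo = iterableToReadable("abc").pipeTo(
  new WritableStream({ write(chunk) { letters.push(chunk); } })
);
iterableDemo.then(() => console.log(letters.join(""))); // prints "abc"
```

Stepping the iterator inside `pull` rather than draining it in `start` means a lazy or infinite iterable is only consumed as fast as the stream is read.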
fromObservable
This function takes any Observable and returns a readable stream
that queues the values emitted by the observable.
Parameters
observable: Observable
Examples
let input = [1,2,3],
  output = [],
  writable;
// Create test streams
writable = createTestWritable( i => output.push( i ));
// Pipe the converted stream into the writable
fromObservable( Observable.from( input ) )
  .pipeTo( writable );
Returns ReadableStream
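The conversion can be pictured as subscribing to the observable and forwarding each notification to the stream's controller. The sketch below is an assumption about the general technique, not the library's code. `observableToReadable` is a hypothetical helper, and `tinyObservable` is a hand-rolled stand-in (an object with a `subscribe` method) so that the example runs without an Observable library.

```javascript
// Hypothetical helper for illustration; not the library's actual code.
function observableToReadable(observable) {
  let subscription;
  return new ReadableStream({
    start(controller) {
      subscription = observable.subscribe({
        next: value => controller.enqueue(value),
        error: err => controller.error(err),
        complete: () => controller.close()
      });
    },
    cancel() {
      // Stop listening when the consumer cancels the stream.
      if (subscription) subscription.unsubscribe();
    }
  });
}

// A tiny stand-in observable that emits 1, 2, 3 asynchronously.
const tinyObservable = {
  subscribe(observer) {
    let active = true;
    Promise.resolve().then(() => {
      for (const value of [1, 2, 3]) if (active) observer.next(value);
      if (active) observer.complete();
    });
    return { unsubscribe() { active = false; } };
  }
};

const observed = [];
const observableDemo = observableToReadable(tinyObservable).pipeTo(
  new WritableStream({ write(chunk) { observed.push(chunk); } })
);
observableDemo.then(() => console.log(observed));
```

An observable pushes values regardless of demand, so in this sketch chunks simply accumulate in the stream's queue if the consumer is slow.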
fromPromise
This function takes any promise and returns a readable stream that queues the resolved value, or errors if the promise rejects.
Parameters
promise: Promise<T>
Examples
let
  input = 42,
  promise = new Promise( resolve => resolve( input ) ),
  writable;
// Create test streams
writable = createTestWritable( c => assert.equal( c, input ));
connect(
  fromPromise( promise ),
  writable
); // 42
Returns ReadableStream
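One way to picture this behavior with the standard `ReadableStream` constructor is sketched below. It is an illustration under assumptions rather than the library's implementation, and `promiseToReadable`, `promiseDemo` and `rejectedDemo` are hypothetical names.

```javascript
// Hypothetical helper for illustration; not the library's actual code.
function promiseToReadable(promise) {
  return new ReadableStream({
    start(controller) {
      // Returning the promise makes the stream wait for it to settle;
      // if it rejects, the stream is errored with the rejection reason.
      return promise.then(value => {
        controller.enqueue(value);
        controller.close();
      });
    }
  });
}

// Resolved promise: the stream yields a single chunk.
const promiseDemo = promiseToReadable(Promise.resolve(42)).getReader().read();
promiseDemo.then(({ value }) => console.log(value)); // prints 42

// Rejected promise: reading from the stream fails with the same error.
const rejectedDemo = promiseToReadable(Promise.reject(new Error("boom")))
  .getReader()
  .read()
  .then(() => "resolved", err => err.message);
rejectedDemo.then(message => console.log(message)); // prints "boom"
```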
toObservable
This function takes any ReadableStream and returns an Observable that emits chunks to subscribers when they arrive.
Parameters
stream: ReadableStream
Examples
let input = [1,2,3],
  output = [],
  readable;
// Create test streams
readable = createTestReadable( input );
// Subscribe to the observable
toObservable( readable )
  .subscribe({
    next (val) { output.push( val ); },
    complete () {
      assert.deepEqual( input, output );
    }
  });
Returns Observable
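The reverse direction can be sketched as a read loop that forwards each chunk to the subscriber. The example below returns a plain object with a `subscribe` method rather than a real Observable, so it only approximates what the library returns. `readableToObservableLike`, `seen` and `observableLikeDemo` are hypothetical names.

```javascript
// Hypothetical helper for illustration; not the library's actual code.
function readableToObservableLike(stream) {
  return {
    subscribe(observer) {
      const reader = stream.getReader();
      let active = true;
      (async () => {
        try {
          while (active) {
            const { value, done } = await reader.read();
            if (done) {
              if (active && observer.complete) observer.complete();
              return;
            }
            if (active && observer.next) observer.next(value);
          }
        } catch (err) {
          if (active && observer.error) observer.error(err);
        }
      })();
      return {
        unsubscribe() {
          // Stop notifying and tell the stream we are no longer interested.
          active = false;
          reader.cancel().catch(() => {});
        }
      };
    }
  };
}

// Usage: collect the chunks and resolve once the stream completes.
const seen = [];
const observableLikeDemo = new Promise((resolve, reject) => {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue("a");
      controller.enqueue("b");
      controller.close();
    }
  });
  readableToObservableLike(source).subscribe({
    next: value => seen.push(value),
    error: reject,
    complete: () => resolve(seen)
  });
});
observableLikeDemo.then(values => console.log(values));
```

Because `getReader()` locks the stream, this sketch supports only one subscriber per stream.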
toPromise
This function takes any ReadableStream and returns a promise that resolves with an Array of the stream's contents when the stream closes.
Parameters
stream: ReadableStream
Examples
let input = [1,2,3],
  output = [1,2,3],
  readable;
// Create test streams
readable = createTestReadable( input );
// Test the promise
toPromise( readable )
  .then( result => {
    assert.deepEqual( result, output );
  });
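Collecting a stream into an array comes down to reading until the stream reports that it is done. The sketch below shows that idea with the standard reader API. It is not the library's implementation, and `readableToPromise` and `promiseOfChunks` are hypothetical names.

```javascript
// Hypothetical helper for illustration; not the library's actual code.
async function readableToPromise(stream) {
  const reader = stream.getReader();
  const chunks = [];
  try {
    for (;;) {
      const { value, done } = await reader.read();
      if (done) return chunks; // stream closed: resolve with everything read
      chunks.push(value);
    }
  } finally {
    // Release the lock whether the stream closed or errored.
    reader.releaseLock();
  }
}

// Usage with a small in-memory stream.
const promiseOfChunks = readableToPromise(
  new ReadableStream({
    start(controller) {
      [1, 2, 3].forEach(n => controller.enqueue(n));
      controller.close();
    }
  })
);
promiseOfChunks.then(chunks => console.log(chunks)); // the chunks, in order
```

If the stream errors, `reader.read()` rejects, so the returned promise rejects with the same error.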