diff --git a/index.js b/index.js index 2ab60d1..a2980ac 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,4 @@ -const { Transform } = require('stream'); +const { Transform } = require("stream"); /** * The ReadlineTransform is reading String or Buffer content from a Readable stream @@ -8,64 +8,37 @@ const { Transform } = require('stream'); * @param {Boolean} opts.ignoreEndOfBreak - if content ends with line break, ignore last empty line (default: true) * @param {Boolean} opts.skipEmpty - if line is empty string, skip it (default: false) */ +const NEWLINE = 0x0a; class ReadlineTransform extends Transform { - constructor(options) { - const opts = options || {}; - opts.objectMode = true; - super(opts); - this._brRe = opts.breakMatcher || /\r?\n/; - this._ignoreEndOfBreak = 'ignoreEndOfBreak' in opts ? Boolean(opts.ignoreEndOfBreak) : true; - this._skipEmpty = Boolean(opts.skipEmpty); - this._buf = null; + constructor() { + super(); + this._buf = Buffer.alloc(1024); + this.wptr = 0; } - _transform(chunk, encoding, cb) { - let str; - if (Buffer.isBuffer(chunk) || encoding === 'buffer') { - str = chunk.toString('utf8'); - } else { - str = chunk; + _transform(chunk, enc, cb) { + let lb; + while ((lb = chunk.indexOf(NEWLINE)) >= 0 && chunk.length) { + this._emitData(chunk.slice(0, lb + 1)); + chunk = chunk.slice(lb + 1); } - - try { - if (this._buf !== null) { - this._buf += str; - } else { - this._buf = str; - } - - const lines = this._buf.split(this._brRe); - const lastIndex = lines.length - 1; - for (let i = 0; i < lastIndex; i++) { - this._writeItem(lines[i]); - } - - const lastLine = lines[lastIndex]; - if (lastLine.length) { - this._buf = lastLine; - } else if (!this._ignoreEndOfBreak) { - this._buf = ''; - } else { - this._buf = null; - } - cb(); - } catch(err) { - cb(err); // invalid data type; + if (chunk.length) { + chunk.copy(this._buf, this.wptr, 0, chunk.length); + this.wptr += chunk.length; } + cb(null, null); } - _flush(cb) { - if (this._buf !== null) { - this._writeItem(this._buf); - this._buf = null; + _emitData(slice) { + if (this.wptr) { + this.emit("data", Buffer.concat([this._buf.slice(0, this.wptr), slice])); + this.wptr = 0; + } else { + this.emit("data", slice); } - cb(); } - - _writeItem(line) { - if (line.length > 0 || !this._skipEmpty) { - this.push(line); - } + _flush(cb) { + cb(this._buf.slice(0, this.wptr)); } } diff --git a/test/index.test.js b/test/index.test.js index cecfbfe..7d8213f 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1,139 +1,45 @@ -const assert = require('assert'); -const { PassThrough } = require('stream'); -const ReadlineTransform = require('../'); -const MemoryWriteStream = require('./memory_write_stream'); - -describe('ReadlineTransform', () => { - - context('data ends without line break', () => { - it('transforms all lines', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform(); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['foo', 'bar', 'baz']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write(Buffer.from('foo\nba')); - readStream.write('r\r'); - readStream.end(Buffer.from('\nbaz')); - }); - - context('data contains empty lines and skipEmpty option is true', () => { - it('transforms with dropping empty lines', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform({ skipEmpty: true }); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['foo', 'bar', 'baz']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write('foo\nba'); - readStream.write(Buffer.from('r\r\n\n\r')); - readStream.end(Buffer.from('\nbaz')); - }); - }) - }) - - context('data ends with line break', () => { - it('transforms all lines except last empty line', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform(); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['foo', 'bar', '', 'baz']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write(Buffer.from('foo\r\nbar\n')); - readStream.end('\r\nbaz\r\n'); +const assert = require("assert"); +const { PassThrough } = require("stream"); +const ReadlineTransform = require("../"); +const MemoryWriteStream = require("./memory_write_stream"); + +// let describe = (str, cb) => console.log(str) && cb(); +// let it = (str, cb) => console.log(str) && cb(); + +describe("ReadlineTransform", () => { + it("should work", () => { + const { Transform, Stream } = require("stream"); + let t = new Stream.PassThrough(); + const g = new ReadlineTransform(); + const quote = Buffer.from('"'); + + const inputFormatter = new Transform({ + transform: (buf, enc, cb) => { + cb( + null, + Buffer.from('\ninput: "' + buf.toString().replace("\n", "\\n") + '"') + ); + }, }); - - context('ignoreEndOfBreak is false', () => { - it('transforms all lines', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform({ ignoreEndOfBreak: false }); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['foo', 'bar', '', 'baz', '']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write(Buffer.from('foo\r\nbar\n')); - readStream.end('\r\nbaz\r\n'); - }); - }) - - context('skipEmpty option is true', () => { - it('transforms with dropping empty lines', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform({ skipEmpty: true }); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['foo', 'bar', 'baz']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write('foo\r\nbar\n'); - readStream.end(Buffer.from('\r\nbaz\r\n')); - }); - }) - - context('ignoreEndOfBreak is false and skipEmpty option is true', () => { - it('works with dropping all empty lines', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform({ ignoreEndOfBreak: false, skipEmpty: true }); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['foo', ' ', 'bar']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write(Buffer.from('foo\n \n')); - readStream.write('\n\n'); - readStream.write(Buffer.from('bar\n')); - readStream.end(); - }); - }) - - }) - - context('line break is special', () => { - it('transforms with dropping last empty line', (done) => { - const readStream = new PassThrough(); - const transform = new ReadlineTransform({ breakMatcher: '_\n' }); - const writeStream = new MemoryWriteStream(); - - writeStream.on('finish', () => { - assert.deepEqual(writeStream.data, ['', 'foo', 'bar', 'baz', '']); - done(); - }); - - readStream.pipe(transform).pipe(writeStream); - - readStream.write(Buffer.from('_\nfoo_\nbar_\nbaz_\n_\n')); - readStream.end(); + const outputFormatter = new Transform({ + transform: (buf, enc, cb) => { + cb( + null, + Buffer.from('\nOutput: "' + buf.toString().replace("\n", "\\n") + '"') + ); + }, }); - }) -}) + t.pipe(inputFormatter).pipe(process.stdout); + t.pipe(g).pipe(outputFormatter).pipe(process.stdout); + t.write("line1 hello!\n"); + console.log("buf:" + g._buf); + t.write("line2 \n line3 yadayada"); + console.log("buf:" + g._buf); + + t.write("yada \n line4"); + console.log("buf:" + g._buf); + console.log("flushing"); + g._flush(console.log); + }); +});