Skip to content

Commit

Permalink
fix: createWriteStreamAsNDJSON returns an array of transforms
Browse files Browse the repository at this point in the history
This ensures that the file is fully written before _pipeline resolves.
  • Loading branch information
kirillgroshkov committed Apr 28, 2024
1 parent bb0c4d2 commit 36355b8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 18 deletions.
2 changes: 1 addition & 1 deletion scripts/ndjsonParseSpeed.script.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ runScript(async () => {
}),
transformLogProgress({ logEvery: 1000, extra: () => ({ keys }) }),
// writableVoid(),
fs2.createWriteStreamAsNDJSON(outputFilePath),
...fs2.createWriteStreamAsNDJSON(outputFilePath),
])
})
29 changes: 13 additions & 16 deletions src/fs/fs2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import fs from 'node:fs'
import fsp from 'node:fs/promises'
import path from 'node:path'
import { createGzip, createUnzip } from 'node:zlib'
import { _jsonParse } from '@naturalcycles/js-lib'
import { _isTruthy, _jsonParse } from '@naturalcycles/js-lib'
import yaml, { DumpOptions } from 'js-yaml'
import { transformToNDJson } from '../stream/ndjson/transformToNDJson'
import { ReadableTyped, WritableTyped } from '../stream/stream.model'
import { ReadableTyped, TransformTyped } from '../stream/stream.model'
import { transformSplitOnNewline } from '../stream/transform/transformSplit'
import { requireFileToExist } from '../util/env.util'

Expand Down Expand Up @@ -353,31 +353,28 @@ class FS2 {
}

/*
Returns a Writable.
Returns an array of Transforms, so that you can ...spread them at
the end of the _pipeline.
Replaces a list of operations:
- transformToNDJson
- createGzip (only if path ends with '.gz')
- fs.createWriteStream
*/
createWriteStreamAsNDJSON(outputPath: string): WritableTyped<any> {
createWriteStreamAsNDJSON(outputPath: string): TransformTyped<any, any>[] {
this.ensureFile(outputPath)

const transform1 = transformToNDJson()
let transform = transform1
if (outputPath.endsWith('.gz')) {
transform = transform.pipe(
createGzip({
// chunkSize: 64 * 1024, // no observed speedup
}),
)
}
transform.pipe(
return [
transformToNDJson(),
outputPath.endsWith('.gz')
? createGzip({
// chunkSize: 64 * 1024, // no observed speedup
})
: undefined,
fs.createWriteStream(outputPath, {
// highWaterMark: 64 * 1024, // no observed speedup
}),
)
return transform1
].filter(_isTruthy) as TransformTyped<any, any>[]
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/stream/ndjson/ndjsonMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ export async function ndjsonMap<IN = any, OUT = any>(
}),
transformLimit({ limit: limitOutput, sourceReadable: readable }),
transformLogProgress({ metric: 'saved', logEvery: logEveryOutput }),
fs2.createWriteStreamAsNDJSON(outputFilePath),
...fs2.createWriteStreamAsNDJSON(outputFilePath),
])
}

0 comments on commit 36355b8

Please sign in to comment.