Skip to content

Commit

Permalink
feat: transformMapStatsSummary, onDone is awaited
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Apr 18, 2024
1 parent 2707a49 commit 25fff9e
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 66 deletions.
6 changes: 5 additions & 1 deletion src/stream/progressLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ export class ProgressLogger<T> implements Disposable {
)} rows with total RPS of ${yellow(rpsTotal)}`,
)

this.cfg.onProgressDone?.(o)
try {
this.cfg.onProgressDone?.(o)
} catch (err) {
logger.error(err)
}
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/stream/transform/transformMap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
_pipelineToArray,
transformMap,
TransformMapStats,
transformMapStatsSummary,
} from '../../index'

beforeAll(() => {
Expand Down Expand Up @@ -163,6 +164,33 @@ test('transformMap errorMode=THROW_AGGREGATED', async () => {
"ok": false,
"started": 1529539200000,
}
`)

expect(transformMapStatsSummary(stats!)).toMatchInlineSnapshot(`
"### Transform summary
0 ms spent
4 / 3 row(s) in / out
1 error(s)"
`)

expect(
transformMapStatsSummary({
...stats!,
name: 'MyCustomJob',
extra: {
key1: 'value1',
n1: 145,
},
}),
).toMatchInlineSnapshot(`
"### MyCustomJob summary
0 ms spent
4 / 3 row(s) in / out
1 error(s)
key1: value1
n1: 145"
`)
})

Expand Down
113 changes: 72 additions & 41 deletions src/stream/transform/transformMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ import {
_anyToError,
_hc,
_since,
_stringify,
AbortableAsyncMapper,
AsyncPredicate,
CommonLogger,
END,
ErrorMode,
pFilter,
Promisable,
SKIP,
StringMap,
UnixTimestampMillisNumber,
} from '@naturalcycles/js-lib'
import through2Concurrent = require('through2-concurrent')
import { yellow } from '../../colors/colors'
import { appendToGithubSummary } from '../../fs/json2env'
import { AbortableTransform } from '../pipeline/pipeline'
import { TransformTyped } from '../stream.model'
import { pipelineClose } from '../stream.util'
Expand Down Expand Up @@ -62,9 +63,9 @@ export interface TransformMapOptions<IN = any, OUT = IN> {
* Callback is called **before** [possible] Aggregated error is thrown,
* and before [possible] THROW_IMMEDIATELY error.
*
* onDone callback will be called before Error is thrown.
* onDone callback will be awaited before Error is thrown.
*/
onDone?: (stats: TransformMapStats) => any
onDone?: (stats: TransformMapStats) => Promisable<any>

/**
* Progress metric
Expand All @@ -91,6 +92,20 @@ export interface TransformMapStats {
started: UnixTimestampMillisNumber
}

export interface TransformMapStatsSummary extends TransformMapStats {
/**
* Name of the summary, defaults to `Transform`
*/
name?: string

/**
* Allows to pass extra key-value object, which will be rendered as:
* key: value
* key2: value2
*/
extra?: StringMap<any>
}

// doesn't work, cause here we don't construct our Transform instance ourselves
// export class TransformMap extends AbortableTransform {}

Expand Down Expand Up @@ -137,14 +152,18 @@ export function transformMap<IN = any, OUT = IN>(
logErrorStats(true)

if (collectedErrors.length) {
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
try {
await onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
} catch (err) {
logger.error(err)
}

// emit Aggregated error
cb(
Expand All @@ -156,14 +175,18 @@ export function transformMap<IN = any, OUT = IN>(
} else {
// emit no error

onDone?.({
ok: true,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
try {
await onDone?.({
ok: true,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
} catch (err) {
logger.error(err)
}

cb()
}
Expand Down Expand Up @@ -210,14 +233,20 @@ export function transformMap<IN = any, OUT = IN>(

if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
isSettled = true
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})

try {
await onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
} catch (err) {
logger.error(err)
}

return cb(err) // Emit error immediately
}

Expand All @@ -237,18 +266,20 @@ export function transformMap<IN = any, OUT = IN>(
}
}

export function appendTransformMapStatsToGithubSummary(
stats: TransformMapStats & { name?: string; extra?: StringMap<any> },
): void {
const { countIn, countOut, countErrors, started, name = 'Transform', extra = {} } = stats

appendToGithubSummary(
...[
`### ${name} summary\n`,
`${_since(started)} spent`,
`${_hc(countIn)} / ${_hc(countOut)} rows in / out`,
countErrors ? `${countErrors} errors` : '',
...Object.entries(extra).map(([k, v]) => `${k}: ${v}`),
].filter(Boolean),
)
/**
* Renders TransformMapStatsSummary into a friendly string,
* to be used e.g in Github Actions summary or Slack.
*/
export function transformMapStatsSummary(summary: TransformMapStatsSummary): string {
const { countIn, countOut, countErrors, started, name = 'Transform', extra = {} } = summary

return [
`### ${name} summary\n`,
`${_since(started)} spent`,
`${_hc(countIn)} / ${_hc(countOut)} row(s) in / out`,
countErrors ? `${countErrors} error(s)` : '',
...Object.entries(extra).map(([k, v]) => `${k}: ${_stringify(v)}`),
]
.filter(Boolean)
.join('\n')
}
62 changes: 38 additions & 24 deletions src/stream/transform/transformMapSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,20 @@ export function transformMapSync<IN = any, OUT = IN>(

if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
isSettled = true
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})

try {
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
} catch (err) {
logger.error(err)
}

// Emit error immediately
return cb(err as Error)
}
Expand All @@ -164,14 +170,18 @@ export function transformMapSync<IN = any, OUT = IN>(
logErrorStats(true)

if (collectedErrors.length) {
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
try {
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
} catch (err) {
logger.error(err)
}

// emit Aggregated error
cb(
Expand All @@ -183,14 +193,18 @@ export function transformMapSync<IN = any, OUT = IN>(
} else {
// emit no error

onDone?.({
ok: true,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
try {
onDone?.({
ok: true,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
started,
})
} catch (err) {
logger.error(err)
}

cb()
}
Expand Down

0 comments on commit 25fff9e

Please sign in to comment.