@@ -3,8 +3,7 @@ import * as v8 from 'node:v8';
33import * as fs from 'node:fs' ;
44import * as os from 'node:os' ;
55import { Heap } from 'heap-js' ;
6- import { BehaviorSubject , lastValueFrom , of } from 'rxjs' ;
7- import { concatMap , delay , map , repeat , takeWhile } from 'rxjs/operators' ;
6+ import { Subject , firstValueFrom } from 'rxjs' ;
87import * as native from '@temporalio/core-bridge' ;
98import {
109 pollLogs ,
@@ -107,7 +106,7 @@ export function makeTelemetryFilterString(options: MakeTelemetryFilterStringOpti
107106}
108107
109108/** A logger that buffers logs from both Node.js and Rust Core and emits logs in the right order */
110- export class CoreLogger extends DefaultLogger {
109+ class BufferedLogger extends DefaultLogger {
111110 protected buffer = new Heap < LogEntry > ( ( a , b ) => Number ( a . timestampNanos - b . timestampNanos ) ) ;
112111
113112 constructor ( protected readonly next : Logger ) {
@@ -135,7 +134,7 @@ export class Runtime {
135134 protected pendingCreations = 0 ;
136135 /** Track the registered native objects to automatically shutdown when all have been deregistered */
137136 protected readonly backRefs = new Set < TrackedNativeObject > ( ) ;
138- protected readonly shouldPollForLogs = new BehaviorSubject < boolean > ( false ) ;
137+ protected readonly stopPollingForLogs = new Subject < void > ( ) ;
139138 protected readonly logPollPromise : Promise < void > ;
140139 public readonly logger : Logger ;
141140 protected readonly shutdownSignalCallbacks = new Set < ( ) => void > ( ) ;
@@ -152,7 +151,7 @@ export class Runtime {
152151
153152 protected constructor ( public readonly native : native . Runtime , public readonly options : CompiledRuntimeOptions ) {
154153 if ( this . isForwardingLogs ( ) ) {
155- const logger = ( this . logger = new CoreLogger ( this . options . logger ) ) ;
154+ const logger = ( this . logger = new BufferedLogger ( this . options . logger ) ) ;
156155 this . logPollPromise = this . initLogPolling ( logger ) ;
157156 } else {
158157 this . logger = this . options . logger ;
@@ -265,31 +264,37 @@ export class Runtime {
265264 } ;
266265 }
267266
268- protected async initLogPolling ( logger : CoreLogger ) : Promise < void > {
269- this . shouldPollForLogs . next ( true ) ;
270-
267+ protected async initLogPolling ( logger : BufferedLogger ) : Promise < void > {
271268 if ( ! this . isForwardingLogs ( ) ) {
272269 return ;
273270 }
271+
272+ const stopPollingForLogs = firstValueFrom ( this . stopPollingForLogs ) ;
273+
274274 const poll = promisify ( pollLogs ) ;
275+ const doPoll = async ( ) => {
276+ const logs = await poll ( this . native ) ;
277+ for ( const log of logs ) {
278+ const meta : Record < string | symbol , unknown > = {
279+ [ LogTimestamp ] : timeOfDayToBigint ( log . timestamp ) ,
280+ } ;
281+ logger . log ( log . level , log . message , meta ) ;
282+ }
283+ } ;
284+
275285 try {
276- await lastValueFrom (
277- of ( this . shouldPollForLogs ) . pipe (
278- map ( ( subject ) => subject . getValue ( ) ) ,
279- takeWhile ( ( shouldPoll ) => shouldPoll ) ,
280- concatMap ( ( ) => poll ( this . native ) ) ,
281- map ( ( logs ) => {
282- for ( const log of logs ) {
283- logger . log ( log . level , log . message , {
284- [ LogTimestamp ] : timeOfDayToBigint ( log . timestamp ) ,
285- } ) ;
286- }
287- logger . flush ( ) ;
288- } ) ,
289- delay ( 3 ) , // Don't go wild polling as fast as possible
290- repeat ( )
291- )
292- ) ;
286+ for ( ; ; ) {
287+ await doPoll ( ) ;
288+ logger . flush ( ) ;
289+ const stop = await Promise . race ( [
290+ stopPollingForLogs . then ( ( ) => true ) ,
291+ new Promise < boolean > ( ( resolve ) => setTimeout ( ( ) => resolve ( false ) , 3 ) ) ,
292+ ] ) ;
293+ if ( stop ) {
294+ await doPoll ( ) ;
295+ break ;
296+ }
297+ }
293298 } catch ( error ) {
294299 // Log using the original logger instead of buffering
295300 this . options . logger . warn ( 'Error gathering forwarded logs from core' , { error } ) ;
@@ -311,7 +316,7 @@ export class Runtime {
311316 */
312317 flushLogs ( ) : void {
313318 if ( this . isForwardingLogs ( ) ) {
314- const logger = this . logger as CoreLogger ;
319+ const logger = this . logger as BufferedLogger ;
315320 logger . flush ( ) ;
316321 }
317322 }
@@ -474,7 +479,7 @@ export class Runtime {
474479 public async shutdown ( ) : Promise < void > {
475480 delete Runtime . _instance ;
476481 this . teardownShutdownHook ( ) ;
477- this . shouldPollForLogs . next ( false ) ;
482+ this . stopPollingForLogs . next ( ) ;
478483 // This will effectively drain all logs
479484 await this . logPollPromise ;
480485 await promisify ( runtimeShutdown ) ( this . native ) ;
0 commit comments