@@ -284,65 +284,80 @@ private async Task<bool> Cancel(QueryCancellationReason reason = QueryCancellati
284284 /// </summary>
285285 internal async Task < ResponseQueueStatement > Advance ( )
286286 {
287- if ( this . Statement . nextUri . Contains ( "/executing" ) )
287+ try
288288 {
289- if ( this . Statement . nextUri . Contains ( "? " ) )
289+ if ( this . Statement . nextUri . Contains ( "/executing " ) )
290290 {
291- this . Statement . nextUri += $ "&targetResultSize={ Constants . MaxTargetResultSizeMB } MB";
291+ if ( this . Statement . nextUri . Contains ( "?" ) )
292+ {
293+ this . Statement . nextUri += $ "&targetResultSize={ Constants . MaxTargetResultSizeMB } MB";
294+ }
295+ else
296+ {
297+ this . Statement . nextUri += $ "?targetResultSize={ Constants . MaxTargetResultSizeMB } MB";
298+ }
292299 }
293- else
300+
301+ logger ? . LogDebug ( "Trino: request: {1}" , this . Statement . nextUri ) ;
302+
303+ string responseStr = await this . GetAsync ( new Uri ( this . Statement . nextUri ) , OK ) . ConfigureAwait ( false ) ;
304+ logger ? . LogDebug ( "Trino: response: {1}" , responseStr ) ;
305+ QueryResultPage response = JsonConvert . DeserializeObject < QueryResultPage > ( responseStr ) ;
306+ logger ? . LogDebug ( "Trino: response at {0} msec with state {1}" , stopwatch . ElapsedMilliseconds ,
307+ response . stats . state ) ;
308+
309+ // Note, the size is estimated based on the response string size which is not the actual deserialized size.
310+ ResponseQueueStatement responseQueueItem = new ResponseQueueStatement ( response , responseStr . Length ) ;
311+ if ( responseQueueItem . Response . error != null )
294312 {
295- this . Statement . nextUri += $ "?targetResultSize={ Constants . MaxTargetResultSizeMB } MB";
313+ State . StateTransition ( TrinoQueryStates . CLIENT_ERROR , TrinoQueryStates . RUNNING ) ;
314+ throw new TrinoException ( responseQueueItem . Response . error . message ,
315+ responseQueueItem . Response . error ) ;
296316 }
297- }
298- logger ? . LogDebug ( "Trino: request: {1}" , this . Statement . nextUri ) ;
299317
300- string responseStr = await this . GetAsync ( new Uri ( this . Statement . nextUri ) , OK ) . ConfigureAwait ( false ) ;
301- logger ? . LogDebug ( "Trino: response: {1}" , responseStr ) ;
302- QueryResultPage response = JsonConvert . DeserializeObject < QueryResultPage > ( responseStr ) ;
303- logger ? . LogDebug ( "Trino: response at {0} msec with state {1}" , stopwatch . ElapsedMilliseconds , response . stats . state ) ;
318+ // Make status available
319+ this . Statement = responseQueueItem . Response ;
304320
305- // Note, the size is estimated based on the response string size which is not the actual deserialized size.
306- ResponseQueueStatement responseQueueItem = new ResponseQueueStatement ( response , responseStr . Length ) ;
307- if ( responseQueueItem . Response . error != null )
308- {
309- State . StateTransition ( TrinoQueryStates . CLIENT_ERROR , TrinoQueryStates . RUNNING ) ;
310- throw new TrinoException ( responseQueueItem . Response . error . message , responseQueueItem . Response . error ) ;
311- }
312-
313- if ( cancellationToken . IsCancellationRequested )
314- {
315- await this . Cancel ( QueryCancellationReason . USER_CANCEL ) . ConfigureAwait ( false ) ;
316- throw new OperationCanceledException ( "Cancellation requested" ) ;
317- }
321+ // If no next URI, the query is completed.
322+ if ( this . Statement . IsLastPage )
323+ {
324+ this . Finish ( ) ;
325+ }
326+ else if ( this . IsTimeout )
327+ {
328+ logger ? . LogInformation ( "Trino: Query timed out queryId:{0}, run time: {1} s, timeout {2} s." ,
329+ Statement ? . id , this . stopwatch . Elapsed . TotalSeconds ,
330+ Session . Properties . ClientRequestTimeout . Value . TotalSeconds ) ;
331+ await this . Cancel ( QueryCancellationReason . TIMEOUT ) . ConfigureAwait ( false ) ;
332+ throw new TimeoutException (
333+ $ "Trino query ran for { this . stopwatch . Elapsed . TotalSeconds } s, exceeding the timeout of { Session . Properties . Timeout . Value . TotalSeconds } s.") ;
334+ }
318335
319- // Make status available
320- this . Statement = responseQueueItem . Response ;
336+ // Do not wait if the query had data - the next page may be ready immediately.
337+ if ( ! responseQueueItem . Response . HasData && ! State . IsFinished && readCount > 4 )
338+ {
339+ logger ? . LogDebug ( "Trino: No data yet, backoff wait queryId:{0}, delay {1} msec" , Statement ? . id ,
340+ readDelay ) ;
341+ await Task . Delay ( ( int ) readDelay ) . ConfigureAwait ( false ) ;
342+ if ( readDelay < MAX_READ_DELAY_MSEC )
343+ {
344+ readDelay *= BACKOFF_AMOUNT ;
345+ }
346+ }
321347
322- // If no next URI, the query is completed.
323- if ( this . Statement . IsLastPage )
324- {
325- this . Finish ( ) ;
348+ readCount ++ ;
349+ return responseQueueItem ;
326350 }
327- else if ( this . IsTimeout )
351+ catch ( Exception )
328352 {
329- logger ? . LogInformation ( "Trino: Query timed out queryId:{0}, run time: {1} s, timeout {2} s." , Statement ? . id , this . stopwatch . Elapsed . TotalSeconds , Session . Properties . ClientRequestTimeout . Value . TotalSeconds ) ;
330- await this . Cancel ( QueryCancellationReason . TIMEOUT ) . ConfigureAwait ( false ) ;
331- throw new TimeoutException ( $ "Trino query ran for { this . stopwatch . Elapsed . TotalSeconds } s, exceeding the timeout of { Session . Properties . Timeout . Value . TotalSeconds } s.") ;
332- }
333-
334- // Do not wait if the query had data - the next page may be ready immediately.
335- if ( ! responseQueueItem . Response . HasData && ! State . IsFinished && readCount > 4 )
336- {
337- logger ? . LogDebug ( "Trino: No data yet, backoff wait queryId:{0}, delay {1} msec" , Statement ? . id , readDelay ) ;
338- await Task . Delay ( ( int ) readDelay ) . ConfigureAwait ( false ) ;
339- if ( readDelay < MAX_READ_DELAY_MSEC )
353+ if ( cancellationToken . IsCancellationRequested )
340354 {
341- readDelay *= BACKOFF_AMOUNT ;
355+ await this . Cancel ( QueryCancellationReason . USER_CANCEL ) . ConfigureAwait ( false ) ;
356+ throw new OperationCanceledException ( "Cancellation requested" ) ;
342357 }
358+
359+ throw ;
343360 }
344- readCount ++ ;
345- return responseQueueItem ;
346361 }
347362
348363 /// <summary>
0 commit comments