@@ -27,9 +27,9 @@ use tokio::fs::File;
2727use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
2828use tokio_stream:: Stream ;
2929
30- use databend_client:: APIClient ;
3130use databend_client:: PresignedResponse ;
3231use databend_client:: QueryResponse ;
32+ use databend_client:: { APIClient , SchemaField } ;
3333use databend_driver_core:: error:: { Error , Result } ;
3434use databend_driver_core:: rows:: { Row , RowIterator , RowStatsIterator , RowWithStats , ServerStats } ;
3535use databend_driver_core:: schema:: { Schema , SchemaRef } ;
@@ -82,7 +82,7 @@ impl Connection for RestAPIConnection {
8282 async fn query_iter_ext ( & self , sql : & str ) -> Result < RowStatsIterator > {
8383 info ! ( "query iter ext: {}" , sql) ;
8484 let resp = self . client . start_query ( sql) . await ?;
85- let resp = self . wait_for_schema ( resp) . await ?;
85+ let resp = self . wait_for_schema ( resp, true ) . await ?;
8686 let ( schema, rows) = RestAPIRows :: from_response ( self . client . clone ( ) , resp) ?;
8787 Ok ( RowStatsIterator :: new ( Arc :: new ( schema) , Box :: pin ( rows) ) )
8888 }
@@ -209,8 +209,14 @@ impl<'o> RestAPIConnection {
209209 } )
210210 }
211211
212- async fn wait_for_schema ( & self , resp : QueryResponse ) -> Result < QueryResponse > {
213- if !resp. data . is_empty ( ) || !resp. schema . is_empty ( ) || resp. stats . progresses . has_progress ( )
212+ async fn wait_for_schema (
213+ & self ,
214+ resp : QueryResponse ,
215+ return_on_progress : bool ,
216+ ) -> Result < QueryResponse > {
217+ if !resp. data . is_empty ( )
218+ || !resp. schema . is_empty ( )
219+ || ( return_on_progress && resp. stats . progresses . has_progress ( ) )
214220 {
215221 return Ok ( resp) ;
216222 }
@@ -228,7 +234,7 @@ impl<'o> RestAPIConnection {
228234
229235 if !result. data . is_empty ( )
230236 || !result. schema . is_empty ( )
231- || result. stats . progresses . has_progress ( )
237+ || ( return_on_progress && result. stats . progresses . has_progress ( ) )
232238 {
233239 break ;
234240 }
@@ -250,6 +256,12 @@ impl<'o> RestAPIConnection {
250256 fn default_copy_options ( ) -> BTreeMap < & ' o str , & ' o str > {
251257 vec ! [ ( "purge" , "true" ) ] . into_iter ( ) . collect ( )
252258 }
259+
260+ pub async fn query_row_batch ( & self , sql : & str ) -> Result < RowBatch > {
261+ let resp = self . client . start_query ( sql) . await ?;
262+ let resp = self . wait_for_schema ( resp, false ) . await ?;
263+ Ok ( RowBatch :: from_response ( self . client . clone ( ) , resp) ?)
264+ }
253265}
254266
255267type PageFut = Pin < Box < dyn Future < Output = Result < QueryResponse > > + Send > > ;
@@ -341,3 +353,48 @@ impl Stream for RestAPIRows {
341353 }
342354 }
343355}
356+
357+ pub struct RowBatch {
358+ schema : Vec < SchemaField > ,
359+ client : Arc < APIClient > ,
360+ query_id : String ,
361+ node_id : Option < String > ,
362+
363+ next_uri : Option < String > ,
364+ data : Vec < Vec < Option < String > > > ,
365+ }
366+
367+ impl RowBatch {
368+ pub fn schema ( & self ) -> Vec < SchemaField > {
369+ self . schema . clone ( )
370+ }
371+
372+ fn from_response ( client : Arc < APIClient > , mut resp : QueryResponse ) -> Result < Self > {
373+ Ok ( Self {
374+ schema : std:: mem:: take ( & mut resp. schema ) ,
375+ client,
376+ query_id : resp. id ,
377+ node_id : resp. node_id ,
378+ next_uri : resp. next_uri ,
379+ data : resp. data ,
380+ } )
381+ }
382+
383+ pub async fn fetch_next_page ( & mut self ) -> Result < Vec < Vec < Option < String > > > > {
384+ if !self . data . is_empty ( ) {
385+ return Ok ( std:: mem:: take ( & mut self . data ) ) ;
386+ }
387+ while let Some ( next_uri) = & self . next_uri {
388+ let resp = self
389+ . client
390+ . query_page ( & self . query_id , & next_uri, & self . node_id )
391+ . await ?;
392+
393+ self . next_uri = resp. next_uri ;
394+ if !resp. data . is_empty ( ) {
395+ return Ok ( resp. data ) ;
396+ }
397+ }
398+ Ok ( vec ! [ ] )
399+ }
400+ }
0 commit comments