diff --git a/mario/hyper_utils.py b/mario/hyper_utils.py index 3dc6ad6..9b24f47 100644 --- a/mario/hyper_utils.py +++ b/mario/hyper_utils.py @@ -352,6 +352,8 @@ def save_hyper_as_csv(hyper_file: str, file_path: str, **kwargs): if options.use_pantab: # Use pantab to stream hyper to csv + logging.info('Stream hyper to csv with pantab.') + mode = 'w' header = True @@ -368,28 +370,27 @@ def save_hyper_as_csv(hyper_file: str, file_path: str, **kwargs): mode = "a" else: # Use tableau hyper api to stream data to csv - with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper: + logging.info('Stream hyper to csv with tableau hyper api.') + + with HyperProcess(Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU, 'test') as hyper: with Connection(endpoint=hyper.endpoint, database=temp_hyper) as connection: - # Use an iterator cursor for streaming - result = connection.execute_query(sql) with open_func(file_path, mode, newline='', encoding="utf-8") as f: + writer = csv.writer(f) # write header writer.writerow(columns) - buffer = [] - for row in result: - buffer.append(row) - if len(buffer) >= options.chunk_size: - writer.writerows(buffer) - buffer.clear() - offset += options.chunk_size - - # write remaining - if buffer: - writer.writerows(buffer) - offset += len(buffer) + while True: + query = f"{sql} LIMIT {options.chunk_size} OFFSET {offset}" + result = connection.execute_query(query) + + rows = list(result) + if not rows: + break + + writer.writerows(rows) + offset += options.chunk_size def save_dataframe_as_hyper(df, file_path, **kwargs):