31 changes: 16 additions & 15 deletions mario/hyper_utils.py
@@ -352,6 +352,8 @@ def save_hyper_as_csv(hyper_file: str, file_path: str, **kwargs):
 
     if options.use_pantab:
         # Use pantab to stream hyper to csv
+        logging.info('Stream hyper to csv with pantab.')
+
         mode = 'w'
         header = True
 
@@ -368,28 +370,27 @@ def save_hyper_as_csv(hyper_file: str, file_path: str, **kwargs):
             mode = "a"
     else:
         # Use tableau hyper api to stream data to csv
-        with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
+        logging.info('Stream hyper to csv with tableau hyper api.')
+
+        with HyperProcess(Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU, 'test') as hyper:
             with Connection(endpoint=hyper.endpoint, database=temp_hyper) as connection:
-                # Use an iterator cursor for streaming
-                result = connection.execute_query(sql)
-
                 with open_func(file_path, mode, newline='', encoding="utf-8") as f:
 
                     writer = csv.writer(f)
                     # write header
                     writer.writerow(columns)
 
-                    buffer = []
-                    for row in result:
-                        buffer.append(row)
-                        if len(buffer) >= options.chunk_size:
-                            writer.writerows(buffer)
-                            buffer.clear()
-                            offset += options.chunk_size
-
-                    # write remaining
-                    if buffer:
-                        writer.writerows(buffer)
-                        offset += len(buffer)
+                    while True:
+                        query = f"{sql} LIMIT {options.chunk_size} OFFSET {offset}"
+                        result = connection.execute_query(query)
+
+                        rows = list(result)
+                        if not rows:
+                            break
+
+                        writer.writerows(rows)
+                        offset += options.chunk_size
 
 
 def save_dataframe_as_hyper(df, file_path, **kwargs):
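For reference, here is a minimal, self-contained sketch of the paging pattern the new Hyper-API branch uses: issue one LIMIT/OFFSET query per chunk (rather than streaming a single result set and buffering rows, as the removed loop did) and append each chunk to the CSV. The function name, table name, and chunk size below are illustrative assumptions, not part of the PR; the real code derives sql, columns, open_func, and options elsewhere in hyper_utils.py.

import csv

from tableauhyperapi import Connection, HyperProcess, TableName, Telemetry


def export_hyper_to_csv(hyper_path: str, csv_path: str, chunk_size: int = 100_000):
    # Hypothetical schema/table; the real extract's names may differ.
    table = TableName("Extract", "Extract")

    with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
        with Connection(endpoint=hyper.endpoint, database=hyper_path) as connection:
            # Column names for the CSV header.
            table_def = connection.catalog.get_table_definition(table)
            columns = [col.name.unescaped for col in table_def.columns]

            with open(csv_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(columns)

                offset = 0
                while True:
                    # One query per chunk; the Result is closed by the context manager.
                    with connection.execute_query(
                        f"SELECT * FROM {table} LIMIT {chunk_size} OFFSET {offset}"
                    ) as result:
                        rows = list(result)

                    if not rows:
                        break

                    writer.writerows(rows)
                    offset += chunk_size

One design note that follows directly from the diff: the previous buffered-iterator version made a single pass over one query result, while the new loop (like this sketch) re-executes the query with a growing OFFSET for every chunk.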