Skip to content

Commit

Permalink
improve athena query progress report
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Nov 29, 2024
1 parent 8dbbac8 commit 5b142b4
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion apps/api/src/python/query/athena.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ def briefer_make_athena_query():
print(json.dumps(result, ensure_ascii=False, default=str))
return
statistics = athena_client.get_query_runtime_statistics(QueryExecutionId=query_id)
total_rows = statistics.get("QueryRuntimeStatistics", {}).get("Rows", {}).get("OutputRows", None)
data = athena_client.get_query_results(QueryExecutionId=query_id)
if not os.path.exists(flag_file_path):
result = {
Expand Down Expand Up @@ -302,7 +305,31 @@ def briefer_make_athena_query():
else:
output_location= f"{s3_dir}{query_id}.csv"
print(json.dumps({"type": "log", "message": f"Downloading s3://{s3_bucket}/{output_location}"}))
s3.download_file(s3_bucket, output_location, f"{tmpdir}/{query_id}.csv")
output_file_size = s3.head_object(Bucket=s3_bucket, Key=output_location)["ContentLength"]
last_emitted_at = 0
total_bytes_transferred = 0
def callback(bytes_transferred):
nonlocal last_emitted_at, total_bytes_transferred
if not total_rows:
return
total_bytes_transferred += bytes_transferred
estimated_rows = int(total_rows * total_bytes_transferred / output_file_size)
result["count"] = estimated_rows
now = time.time()
if now - last_emitted_at > 1:
last_emitted_at = now
print(json.dumps(result, ensure_ascii=False, default=str))
s3.download_file(
Bucket=s3_bucket,
Key=output_location,
Filename=f"{tmpdir}/{query_id}.csv",
Callback=callback
)
if not os.path.exists(flag_file_path):
result = {
"type": "abort-error",
Expand Down

0 comments on commit 5b142b4

Please sign in to comment.