From 5b142b42225e84b768236d7ee5d46f90f2925a0a Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 28 Nov 2024 17:27:21 -0300 Subject: [PATCH] improve athena query progress report --- apps/api/src/python/query/athena.ts | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/apps/api/src/python/query/athena.ts b/apps/api/src/python/query/athena.ts index f9978f2..12a5906 100644 --- a/apps/api/src/python/query/athena.ts +++ b/apps/api/src/python/query/athena.ts @@ -264,6 +264,9 @@ def briefer_make_athena_query(): print(json.dumps(result, ensure_ascii=False, default=str)) return + statistics = athena_client.get_query_runtime_statistics(QueryExecutionId=query_id) + total_rows = statistics.get("QueryRuntimeStatistics", {}).get("Rows", {}).get("OutputRows", None) + data = athena_client.get_query_results(QueryExecutionId=query_id) if not os.path.exists(flag_file_path): result = { @@ -302,7 +305,31 @@ def briefer_make_athena_query(): else: output_location= f"{s3_dir}{query_id}.csv" print(json.dumps({"type": "log", "message": f"Downloading s3://{s3_bucket}/{output_location}"})) - s3.download_file(s3_bucket, output_location, f"{tmpdir}/{query_id}.csv") + + output_file_size = s3.head_object(Bucket=s3_bucket, Key=output_location)["ContentLength"] + + last_emitted_at = 0 + total_bytes_transferred = 0 + def callback(bytes_transferred): + nonlocal last_emitted_at, total_bytes_transferred + + if not total_rows: + return + + total_bytes_transferred += bytes_transferred + estimated_rows = int(total_rows * total_bytes_transferred / output_file_size) + result["count"] = estimated_rows + now = time.time() + if now - last_emitted_at > 1: + last_emitted_at = now + print(json.dumps(result, ensure_ascii=False, default=str)) + + s3.download_file( + Bucket=s3_bucket, + Key=output_location, + Filename=f"{tmpdir}/{query_id}.csv", + Callback=callback + ) if not os.path.exists(flag_file_path): result = { "type": "abort-error",