diff --git a/apps/api/src/python/query/bigquery.ts b/apps/api/src/python/query/bigquery.ts
index 72f684e4..67aca364 100644
--- a/apps/api/src/python/query/bigquery.ts
+++ b/apps/api/src/python/query/bigquery.ts
@@ -36,6 +36,7 @@ export async function makeBigQueryQuery(

   const code = `
 def _briefer_make_bq_query():
+    from datetime import datetime
     from google.cloud import bigquery
     from google.cloud import bigquery_storage
     from google.oauth2 import service_account
@@ -48,6 +49,7 @@ def _briefer_make_bq_query():
     from concurrent.futures import ThreadPoolExecutor
     import threading
     import queue
+    import pytz

     def get_columns(df):
         columns = []
@@ -86,8 +88,19 @@ def _briefer_make_bq_query():
             if bq_type in ['DATE', 'DATETIME']:
                 df[col] = pd.to_datetime(df[col], errors='coerce')
             elif bq_type == 'TIMESTAMP':
-                # Convert to timezone-aware datetime, preserving the original time zone
-                df[col] = pd.to_datetime(df[col], errors='coerce', utc=True).dt.tz_convert(None)
+                # Define fallback and maximum possible date
+                # This is useful because timestamps that are too small or large for pandas are converted to NaT
+                fallback_date = datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=pytz.utc)
+                max_date = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc)
+
+                # Replace large values with the maximum possible date
+                df[col] = df[col].apply(lambda x: max_date if x > max_date else x)
+
+                # Replace small values with the fallback date
+                df[col] = df[col].apply(lambda x: fallback_date if x < fallback_date else x)
+
+                # make sure timezone is in utc and preserve timezone info
+                df[col] = df[col].apply(lambda x: x.astimezone(pytz.utc) if x.tzinfo else x)
             elif bq_type == 'TIME':
                 df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
             elif bq_type == 'NUMERIC':
@@ -158,7 +171,7 @@ def _briefer_make_bq_query():
         return df

     bq_storage_client = bigquery_storage.BigQueryReadClient(credentials=credentials)
-    df_iter = query_result.to_dataframe_iterable(bqstorage_client=bq_storage_client)
+    arrow_iter = query_result.to_arrow_iterable(bqstorage_client=bq_storage_client)

     df = pd.DataFrame()
     initial_rows = []
@@ -166,15 +179,18 @@
     last_emitted_at = 0
     chunks = []
     rows_count = 0
-    for chunk in df_iter:
+    for chunk in arrow_iter:
         if not os.path.exists(flag_file_path):
             print(json.dumps({"type": "log", "message": "Query aborted"}))
             aborted = True
             break

-        chunk = rename_duplicates(chunk)
-        rows_count += len(chunk)
-        chunks.append(chunk)
+        # make sure that to_pandas does not crash for dates out of bounds
+        chunk_df = chunk.to_pandas(timestamp_as_object=True)
+
+        chunk_df = rename_duplicates(chunk_df)
+        rows_count += len(chunk_df)
+        chunks.append(chunk_df)

         if len(initial_rows) < 250:
             df = pd.concat(chunks, ignore_index=True)
diff --git a/apps/api/src/python/query/index.ts b/apps/api/src/python/query/index.ts
index 91268fea..e593f583 100644
--- a/apps/api/src/python/query/index.ts
+++ b/apps/api/src/python/query/index.ts
@@ -272,11 +272,13 @@ export async function makeQuery(

   const code = `
 def _briefer_read_query():
+    import pyarrow.parquet as pq
     import pandas as pd

     retries = 3
     while retries > 0:
         try:
-            return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+            table = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+            return table.to_pandas(timestamp_as_object=True)
         except:
             retries -= 1
             if retries == 0:
@@ -347,8 +349,9 @@ export async function readDataframePage(

 if not ("${dataframeName}" in globals()):
     import pandas as pd
+    import pyarrow.parquet as pq

     try:
-        ${dataframeName} = pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+        ${dataframeName} = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip").to_pandas(timestamp_as_object=True)
     except:
         print(json.dumps({"type": "not-found"}))
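Both changes work around the same pandas limitation: `datetime64[ns]` values only cover roughly the years 1677 to 2262, while BigQuery TIMESTAMP values can span years 1 to 9999. Below is a minimal sketch (not part of the PR; the column name and values are invented) of why materializing Arrow data with `timestamp_as_object=True` avoids the out-of-bounds failures that the clamping in `bigquery.ts` also guards against:

```python
from datetime import datetime, timezone

import pandas as pd
import pyarrow as pa

# pandas nanosecond timestamps only cover ~1677-2262.
print(pd.Timestamp.min, pd.Timestamp.max)

# A hypothetical Arrow chunk holding a timestamp outside that range,
# as BigQuery TIMESTAMP columns legitimately can.
chunk = pa.table({
    "ts": pa.array(
        [datetime(9999, 12, 31, tzinfo=timezone.utc)],
        type=pa.timestamp("us", tz="UTC"),
    )
})

# With timestamp_as_object=True the values are kept as Python datetime objects
# instead of being cast to datetime64[ns], so out-of-range values survive.
df = chunk.to_pandas(timestamp_as_object=True)
print(type(df["ts"].iloc[0]))  # <class 'datetime.datetime'>
```

The trade-off is that such columns come back as `object` dtype rather than native pandas timestamps, which is why the BigQuery path still normalizes and clamps the values before they are handed to the rest of the pipeline.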