From 7e267f9a46c85554e68cbd9565344e26ef5d13dd Mon Sep 17 00:00:00 2001
From: Lucas Fernandes da Costa
Date: Fri, 29 Nov 2024 21:22:35 -0300
Subject: [PATCH 1/2] fix handling of values out of bounds in pandas
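
pandas' datetime64[ns] dtype can only represent timestamps between
roughly 1677-09-21 and 2262-04-11, while BigQuery accepts dates from
0001-01-01 through 9999-12-31. Values outside the pandas range were
previously coerced to NaT (or crashed the conversion outright), so this
patch iterates over Arrow record batches, converts them with
to_pandas(timestamp_as_object=True), and clamps anything outside the
representable window to a fallback or maximum date.

The sketch below is an illustration only (not part of this patch); it
assumes a pandas/pyarrow combination that casts timestamps to
datetime64[ns] by default:

    import datetime

    import pyarrow as pa

    utc = datetime.timezone.utc
    table = pa.table({"ts": [
        datetime.datetime(1, 1, 1, tzinfo=utc),      # below the datetime64[ns] minimum
        datetime.datetime(2024, 1, 1, tzinfo=utc),   # representable
        datetime.datetime(9999, 12, 31, tzinfo=utc), # above the datetime64[ns] maximum
    ]})

    # A plain table.to_pandas() would cast the timestamp[us] values to
    # datetime64[ns] and fail (or coerce to NaT) on the first and last rows;
    # timestamp_as_object=True keeps each value as a datetime.datetime in an
    # object column instead, so nothing is lost or raised.
    df = table.to_pandas(timestamp_as_object=True)
    print(df["ts"].tolist())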
---
 apps/api/src/python/query/bigquery.ts | 30 ++++++++++++++++++++-------
 apps/api/src/python/query/index.ts    |  7 +++++--
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/apps/api/src/python/query/bigquery.ts b/apps/api/src/python/query/bigquery.ts
index 72f684e4..67aca364 100644
--- a/apps/api/src/python/query/bigquery.ts
+++ b/apps/api/src/python/query/bigquery.ts
@@ -36,6 +36,7 @@ export async function makeBigQueryQuery(
 
   const code = `
 def _briefer_make_bq_query():
+    from datetime import datetime
     from google.cloud import bigquery
     from google.cloud import bigquery_storage
     from google.oauth2 import service_account
@@ -48,6 +49,7 @@ def _briefer_make_bq_query():
     from concurrent.futures import ThreadPoolExecutor
     import threading
     import queue
+    import pytz
 
     def get_columns(df):
         columns = []
@@ -86,8 +88,19 @@ def _briefer_make_bq_query():
             if bq_type in ['DATE', 'DATETIME']:
                 df[col] = pd.to_datetime(df[col], errors='coerce')
             elif bq_type == 'TIMESTAMP':
-                # Convert to timezone-aware datetime, preserving the original time zone
-                df[col] = pd.to_datetime(df[col], errors='coerce', utc=True).dt.tz_convert(None)
+                # Define the fallback and the maximum possible date
+                # Timestamps that are too small or too large for pandas would otherwise be converted to NaT
+                fallback_date = datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=pytz.utc)
+                max_date = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc)
+
+                # Replace values that are too large with the maximum possible date
+                df[col] = df[col].apply(lambda x: max_date if x > max_date else x)
+
+                # Replace values that are too small with the fallback date
+                df[col] = df[col].apply(lambda x: fallback_date if x < fallback_date else x)
+
+                # Make sure the timezone is UTC while preserving timezone info
+                df[col] = df[col].apply(lambda x: x.astimezone(pytz.utc) if x.tzinfo else x)
             elif bq_type == 'TIME':
                 df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
             elif bq_type == 'NUMERIC':
@@ -158,7 +171,7 @@ def _briefer_make_bq_query():
         return df
 
     bq_storage_client = bigquery_storage.BigQueryReadClient(credentials=credentials)
-    df_iter = query_result.to_dataframe_iterable(bqstorage_client=bq_storage_client)
+    arrow_iter = query_result.to_arrow_iterable(bqstorage_client=bq_storage_client)
 
     df = pd.DataFrame()
     initial_rows = []
@@ -166,15 +179,18 @@ def _briefer_make_bq_query():
     last_emitted_at = 0
     chunks = []
     rows_count = 0
-    for chunk in df_iter:
+    for chunk in arrow_iter:
         if not os.path.exists(flag_file_path):
             print(json.dumps({"type": "log", "message": "Query aborted"}))
             aborted = True
             break
 
-        chunk = rename_duplicates(chunk)
-        rows_count += len(chunk)
-        chunks.append(chunk)
+        # Make sure to_pandas does not crash on out-of-bounds dates
+        chunk_df = chunk.to_pandas(timestamp_as_object=True)
+
+        chunk_df = rename_duplicates(chunk_df)
+        rows_count += len(chunk_df)
+        chunks.append(chunk_df)
 
         if len(initial_rows) < 250:
             df = pd.concat(chunks, ignore_index=True)
diff --git a/apps/api/src/python/query/index.ts b/apps/api/src/python/query/index.ts
index 91268fea..e593f583 100644
--- a/apps/api/src/python/query/index.ts
+++ b/apps/api/src/python/query/index.ts
@@ -272,11 +272,13 @@ export async function makeQuery(
 
   const code = `
 def _briefer_read_query():
+    import pyarrow.parquet as pq
     import pandas as pd
     retries = 3
     while retries > 0:
         try:
-            return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+            table = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+            return table.to_pandas(timestamp_as_object=True)
         except:
             retries -= 1
             if retries == 0:
@@ -347,8 +349,9 @@ export async function readDataframePage(
 
 if not ("${dataframeName}" in globals()):
     import pandas as pd
+    import pyarrow.parquet as pq
     try:
-        ${dataframeName} = pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+        ${dataframeName} = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip").to_pandas(timestamp_as_object=True)
     except:
         print(json.dumps({"type": "not-found"}))
 

From c41b18e317a9d3f94ad2e697008c467bf2fda672 Mon Sep 17 00:00:00 2001
From: Lucas Fernandes da Costa
Date: Mon, 2 Dec 2024 10:08:27 -0300
Subject: [PATCH 2/2] handle null dates

---
 apps/api/src/python/query/bigquery.ts | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/apps/api/src/python/query/bigquery.ts b/apps/api/src/python/query/bigquery.ts
index 67aca364..476a4b1b 100644
--- a/apps/api/src/python/query/bigquery.ts
+++ b/apps/api/src/python/query/bigquery.ts
@@ -88,19 +88,14 @@ def _briefer_make_bq_query():
             if bq_type in ['DATE', 'DATETIME']:
                 df[col] = pd.to_datetime(df[col], errors='coerce')
             elif bq_type == 'TIMESTAMP':
-                # Define the fallback and the maximum possible date
-                # Timestamps that are too small or too large for pandas would otherwise be converted to NaT
+                # Handle out-of-bounds dates
                 fallback_date = datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=pytz.utc)
                 max_date = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc)
-
-                # Replace values that are too large with the maximum possible date
-                df[col] = df[col].apply(lambda x: max_date if x > max_date else x)
-
-                # Replace values that are too small with the fallback date
-                df[col] = df[col].apply(lambda x: fallback_date if x < fallback_date else x)
+                df[col] = df[col].apply(lambda x: max_date if not pd.isnull(x) and x > max_date else x)
+                df[col] = df[col].apply(lambda x: fallback_date if not pd.isnull(x) and x < fallback_date else x)
 
                 # Make sure the timezone is UTC while preserving timezone info
-                df[col] = df[col].apply(lambda x: x.astimezone(pytz.utc) if x.tzinfo else x)
+                df[col] = df[col].apply(lambda x: x.astimezone(pytz.utc) if not pd.isnull(x) and x.tzinfo else x)
             elif bq_type == 'TIME':
                 df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
             elif bq_type == 'NUMERIC':
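
A note on the second patch: with timestamp_as_object=True, NULL
timestamps come back as None (or NaT), and Python will not order those
against a datetime, which is why every comparison gains a pd.isnull
guard. A minimal sketch of the crash the guard prevents (illustration
only, not part of the patches):

    import datetime

    import pandas as pd

    utc = datetime.timezone.utc
    max_date = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=utc)
    col = pd.Series([None, datetime.datetime(2024, 1, 1, tzinfo=utc)], dtype=object)

    # Unguarded, this raises on the NULL row:
    # TypeError: '>' not supported between instances of 'NoneType' and 'datetime.datetime'
    # col.apply(lambda x: max_date if x > max_date else x)

    # Guarded as in the patch, NULLs pass through untouched:
    print(col.apply(lambda x: max_date if not pd.isnull(x) and x > max_date else x).tolist())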