Convert automatically to arrow strings #86

Merged · 7 commits · Jul 25, 2024
Changes from 5 commits
51 changes: 50 additions & 1 deletion dask_bigquery/core.py
@@ -10,6 +10,8 @@
import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe._compat import PANDAS_GE_220
from dask.dataframe.utils import pyarrow_strings_enabled
from google.api_core import client_info as rest_client_info
from google.api_core import exceptions
from google.api_core.gapic_v1 import client_info as grpc_client_info
@@ -95,6 +97,7 @@ def bigquery_read(
read_kwargs: dict,
arrow_options: dict,
credentials: dict = None,
convert_string: bool = False,
) -> pd.DataFrame:
"""Read a single batch of rows via BQ Storage API, in Arrow binary format.

@@ -114,7 +117,15 @@
BigQuery Storage API Stream "name"
NOTE: Please set if reading from Storage API without any `row_restriction`.
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
convert_string: bool
Whether to convert Arrow string columns directly to pyarrow-backed strings in the DataFrame
"""
arrow_options = arrow_options.copy()
if convert_string:
types_mapper = _get_types_mapper(arrow_options.get("types_mapper", {}.get))
if types_mapper is not None:
arrow_options["types_mapper"] = types_mapper

with bigquery_clients(project_id, credentials=credentials) as (_, bqs_client):
session = bqs_client.create_read_session(make_create_read_session_request())
schema = pyarrow.ipc.read_schema(
@@ -130,6 +141,37 @@
return pd.concat(shards)


def _get_types_mapper(user_mapper):
type_mappers = []

# always use the user-defined mapper first, if available
if user_mapper is not None:
type_mappers.append(user_mapper)

type_mappers.append({pyarrow.string(): pd.StringDtype("pyarrow")}.get)
if PANDAS_GE_220:
type_mappers.append({pyarrow.large_string(): pd.StringDtype("pyarrow")}.get)
type_mappers.append({pyarrow.date32(): pd.ArrowDtype(pyarrow.date32())}.get)
type_mappers.append({pyarrow.date64(): pd.ArrowDtype(pyarrow.date64())}.get)

def _convert_decimal_type(type):
if pyarrow.types.is_decimal(type):
return pd.ArrowDtype(type)
return None

type_mappers.append(_convert_decimal_type)

def default_types_mapper(pyarrow_dtype):
"""Try all type mappers in order, starting from the user type mapper."""
for type_converter in type_mappers:
converted_type = type_converter(pyarrow_dtype)
if converted_type is not None:
return converted_type

if len(type_mappers) > 0:
return default_types_mapper


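Note (not part of the diff): the mapper returned above is eventually handed to `pyarrow.Table.to_pandas(types_mapper=...)`, which calls it once per Arrow type and falls back to the default conversion whenever it returns `None`. A minimal, self-contained sketch of that behaviour; the toy table and column names are illustrative, not from this PR:

```python
import pandas as pd
import pyarrow

# Toy table with one string column and one integer column.
table = pyarrow.table({"name": ["a", "b"], "value": [1, 2]})

# Same shape as the mappers composed above: pyarrow strings map to pandas'
# pyarrow-backed StringDtype, everything else returns None and keeps the
# default conversion.
mapper = {pyarrow.string(): pd.StringDtype("pyarrow")}.get

df = table.to_pandas(types_mapper=mapper)
print(df.dtypes)  # name -> string[pyarrow], value -> int64
```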
def read_gbq(
project_id: str,
dataset_id: str,
@@ -196,13 +238,19 @@ def make_create_read_session_request():
),
)

arrow_options_meta = arrow_options.copy()
if pyarrow_strings_enabled():
types_mapper = _get_types_mapper(arrow_options.get("types_mapper", {}.get))
if types_mapper is not None:
arrow_options_meta["types_mapper"] = types_mapper
Comment on lines +241 to +245

Member: We have this twice, once here and once in bigquery_read. Thoughts on keeping this here, passing arrow_options (with the correct types_mapper) through to dd.from_map below? That way we could drop the convert_string= parameter.

Contributor Author: I thought about this too. I'd like to avoid serialising that stuff in the graph; just passing a flag seems a lot easier.

Member: Ah, I see. Are the types_mapper entries we're adding particularly large?

Contributor Author: It adds a few callables, which doesn't seem like a good idea (didn't do any profiling).
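Editor's note: for anyone curious about the trade-off discussed here, a small illustrative check of the per-task payload sizes is sketched below. It is not part of the PR; it assumes cloudpickle is installed (Dask depends on it) and mirrors the string mapper built in `_get_types_mapper`:

```python
import cloudpickle
import pandas as pd
import pyarrow

# What would ride along with every task if the types_mapper were merged into
# arrow_options, versus the single boolean flag the PR passes instead.
mapper = {pyarrow.string(): pd.StringDtype("pyarrow")}.get
flag = True

print("mapper payload:", len(cloudpickle.dumps(mapper)), "bytes")
print("flag payload:  ", len(cloudpickle.dumps(flag)), "bytes")
```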


# Create a read session in order to detect the schema.
# Read sessions are light weight and will be auto-deleted after 24 hours.
session = bqs_client.create_read_session(make_create_read_session_request())
schema = pyarrow.ipc.read_schema(
pyarrow.py_buffer(session.arrow_schema.serialized_schema)
)
meta = schema.empty_table().to_pandas(**arrow_options)
meta = schema.empty_table().to_pandas(**arrow_options_meta)

return dd.from_map(
partial(
@@ -212,6 +260,7 @@ def make_create_read_session_request():
read_kwargs=read_kwargs,
arrow_options=arrow_options,
credentials=credentials,
convert_string=pyarrow_strings_enabled(),
),
[stream.name for stream in session.streams],
meta=meta,
Empty file added dask_bigquery/tests/__init__.py
16 changes: 16 additions & 0 deletions dask_bigquery/tests/test_core.py
@@ -5,6 +5,7 @@
import uuid
from datetime import datetime, timedelta, timezone

import dask
import dask.dataframe as dd
import gcsfs
import google.auth
@@ -387,6 +388,21 @@ def test_arrow_options(table):
assert ddf.dtypes["name"] == pd.StringDtype(storage="pyarrow")


@pytest.mark.parametrize("convert_string", [True, False])
def test_convert_string(table, convert_string):
Member: Overall this looks nice. It'd be good to include an assert_eq check too, to make sure the values are as expected, not just the name column dtype.

Contributor Author: We have this in a few other places already and I can't run tests locally, so I don't want to mess around with things too much.

Member: No problem. Just pushed a small commit to update this (and one other) test.

Contributor Author: Thx

project_id, dataset_id, table_id = table
with dask.config.set({"dataframe.convert-string": convert_string}):
ddf = read_gbq(
project_id=project_id,
dataset_id=dataset_id,
table_id=table_id,
)
if convert_string:
assert ddf.dtypes["name"] == pd.StringDtype(storage="pyarrow")
else:
assert ddf.dtypes["name"] == object
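
Editor's note: a value check along the lines of the review suggestion above could look roughly like the sketch below; `df` stands in for a pandas fixture holding the expected rows and is an assumption, not part of this PR:

```python
from dask.dataframe.utils import assert_eq

# Hypothetical addition: compare the actual values, not just the dtype of "name".
# `df` is assumed to be the pandas DataFrame the test table was populated from.
assert_eq(ddf, df, check_dtype=False, check_index=False)
```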


@pytest.mark.skipif(sys.platform == "darwin", reason="Segfaults on macOS")
def test_read_required_partition_filter(df, required_partition_filter_table):
project_id, dataset_id, table_id = required_partition_filter_table