[Bug] Parquet reader produces non-nullable columns #852

Closed
JulienPeloton opened this issue May 31, 2024 · 0 comments · Fixed by #853
Labels
apache parquet bug Something isn't working streaming


JulienPeloton commented May 31, 2024

Since the recent change from #793, we get a schema mismatch when polling alerts with the Fink client:

line 425, in _decode_avro_alert
    return fastavro.schemaless_reader(avro_alert, schema)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "fastavro/_read.pyx", line 1141, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 1168, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 747, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 620, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 739, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 527, in fastavro._read.read_union
IndexError: list index out of range
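For reference, this IndexError is consistent with a union/non-union mismatch at the byte level. Avro writes a plain string as a zigzag-varint length followed by the bytes, while a union value is prefixed by a zigzag-varint branch index; a reader expecting the union ['string', 'null'] therefore misreads the length of "Fink" (4) as a branch index and falls off the two-element branch list. A minimal stdlib sketch (the helper names are hypothetical; the real decoding lives in fastavro/_read.pyx):

```python
import io

def zigzag_varint(n: int) -> bytes:
    """Encode a long with Avro's zigzag + variable-length encoding."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def read_zigzag_varint(fo) -> int:
    """Decode a zigzag variable-length long from a byte stream."""
    shift, acc = 0, 0
    while True:
        b = fo.read(1)[0]
        acc |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)

# Writer side: plain (non-nullable) "string" schema, value "Fink"
payload = zigzag_varint(4) + b"Fink"

# Reader side: a union ["string", "null"] schema first reads a branch index
fo = io.BytesIO(payload)
branches = ["string", "null"]
index = read_zigzag_varint(fo)   # -> 4: the string length, misread as an index
try:
    branch = branches[index]
except IndexError as err:
    print(err)  # list index out of range
```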

Inspecting the schemas

import json

from fink_broker.spark_utils import (
  connect_to_raw_database,
  load_parquet_files
)

# schema_converter is assumed to be in scope (its import is not shown here)

scitmpdatapath = 'online/science/20240531'

# Static DataFrame
df_static = load_parquet_files(scitmpdatapath)
schema_static = schema_converter.to_avro(df_static.schema)        
s_static = json.loads(schema_static)

# Streaming DataFrame
df_streaming = connect_to_raw_database(scitmpdatapath, scitmpdatapath, latestfirst=False)                             
schema_streaming = schema_converter.to_avro(df_streaming.schema)
s_streaming = json.loads(schema_streaming)

print([i for i in s_streaming['fields'] if i not in s_static['fields']])
[{'name': 'publisher', 'type': ['string', 'null']},
 {'name': 'fink_broker_version', 'type': ['string', 'null']},
 {'name': 'fink_science_version', 'type': ['string', 'null']}]

print([i for i in s_static['fields'] if i not in s_streaming['fields']])
[{'name': 'publisher', 'type': 'string'},
 {'name': 'fink_broker_version', 'type': 'string'},
 {'name': 'fink_science_version', 'type': 'string'}]
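The two outputs differ only in whether the Avro type is wrapped in a union with 'null', which is how a Spark-to-Avro converter encodes the StructField nullable flag. A minimal sketch of that mapping (field_to_avro is a hypothetical stand-in for what schema_converter.to_avro does per field, not an existing function):

```python
def field_to_avro(name: str, avro_type: str, nullable: bool) -> dict:
    """Map a Spark StructField's nullability onto an Avro field type:
    nullable fields become a [type, "null"] union, others stay plain."""
    return {"name": name,
            "type": [avro_type, "null"] if nullable else avro_type}

# Streaming side: nullable -> union
print(field_to_avro("publisher", "string", nullable=True))
# {'name': 'publisher', 'type': ['string', 'null']}

# Static side: non-nullable -> plain type
print(field_to_avro("publisher", "string", nullable=False))
# {'name': 'publisher', 'type': 'string'}
```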

These three columns have a different nullable property (nullable in the streaming DataFrame, non-nullable in the static one)... Note that this can also be obtained by printing the schema from the fink_consumer (TODO: add a CLI argument to print the schema).

Why is this weird?

First, the Spark documentation states:

When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

So it makes no sense that our static DataFrame contains non-nullable fields. Second, the code producing these fields has not changed; it comes from bin/raw2science.py (added years ago):

# Add library versions
df = df.withColumn("fink_broker_version", F.lit(fbvsn)).withColumn(
    "fink_science_version", F.lit(fsvsn)
)

# Switch publisher
df = df.withColumn("publisher", F.lit("Fink"))

Funnily enough, if I load just one file instead of the entire folder, the fields are nullable as expected:

df_static = spark.read.format('parquet').load('online/science/20240531/<one_file>.parquet')
# nullable

Spark issue

I raised this issue in the Spark bug tracker: https://issues.apache.org/jira/browse/SPARK-48492

Solution

At this point, I see two temporary solutions:

# get the schema from one file
schema = schema_converter.to_avro(spark.read.format('parquet').load(files[0]).schema)

# or get it from the streaming df directly in distribute.py#L219
df_tmp = df_tmp.selectExpr(cnames)
schema = schema_converter.to_avro(df_tmp.schema)
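A third option would be to post-process the static Avro schema so that every plain field type becomes a [type, "null"] union, matching what the streaming DataFrame reports. This is an illustrative sketch only (force_nullable is a hypothetical helper, not an existing broker function):

```python
import json

def force_nullable(schema: dict) -> dict:
    """Return a copy of an Avro record schema where every field type
    is wrapped in a [type, "null"] union, mirroring the streaming side."""
    out = dict(schema)
    out["fields"] = [
        {**f, "type": f["type"] if isinstance(f["type"], list)
                      else [f["type"], "null"]}
        for f in schema["fields"]
    ]
    return out

# Hypothetical minimal static schema for illustration
static = {"type": "record", "name": "alert",
          "fields": [{"name": "publisher", "type": "string"}]}
print(json.dumps(force_nullable(static)["fields"]))
# [{"name": "publisher", "type": ["string", "null"]}]
```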