[Parquet][Python]: Reading subset by feeding data asynchronously to parquet parser #45352

Open
@MarkusSintonen

Description

There doesn't seem to be any way in the IO interfaces to feed data into the parquet parsers from async code. However, there is a hacky workaround that seems to work: use an anonymous mmap and feed the parser through that:

import mmap
from typing import Protocol

from pyarrow import Table
from pyarrow.dataset import Expression, ParquetFileFormat
from pyarrow.parquet import ColumnChunkMetaData


class MyAsyncReader(Protocol):
    async def parquet_size(self) -> int: ...  # Size stored separately when writing elsewhere
    async def parquet_meta(self) -> bytes: ...  # Metadata stored separately when writing elsewhere
    async def parquet_data(self, start_offset: int, end_offset: int) -> bytes: ...


async def query(reader: MyAsyncReader, filter: Expression) -> Table:
    size = await reader.parquet_size()
    meta = await reader.parquet_meta()

    anon_mmap = mmap.mmap(-1, size, flags=mmap.MAP_ANONYMOUS | mmap.MAP_PRIVATE)
    try:
        anon_mmap.seek(size - len(meta))  # Meta to tail
        anon_mmap.write(meta)

        frag = ParquetFileFormat().make_fragment(anon_mmap).subset(filter)

        first_row_col = frag.row_groups[0].metadata.column(0)
        last_row_col = frag.row_groups[-1].metadata.column(frag.metadata.num_columns - 1)
        start_offset = offset(first_row_col)
        end_offset = offset(last_row_col) + last_row_col.total_compressed_size

        anon_mmap.seek(start_offset)
        anon_mmap.write(await reader.parquet_data(start_offset, end_offset))  # Feed needed data for parser

        return frag.to_table()  # Parse the subset of row groups
    finally:
        anon_mmap.close()


def offset(meta: ColumnChunkMetaData) -> int:
    return (
        min(meta.dictionary_page_offset, meta.data_page_offset)  # Is there a better way to get this?
        if meta.dictionary_page_offset is not None
        else meta.data_page_offset
    )
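As an aside, the sparse-write behaviour the snippet relies on (writing only the footer and the needed byte range into an otherwise untouched anonymous mapping) can be exercised with the standard library alone. A minimal sketch with made-up sizes and placeholder bytes:

```python
import mmap

SIZE = 1 << 20  # pretend 1 MiB parquet file
FOOTER = b"fake-footer-bytes"  # stand-in for the real metadata

m = mmap.mmap(-1, SIZE)  # anonymous mapping, zero-filled pages on demand
m.seek(SIZE - len(FOOTER))
m.write(FOOTER)  # only the pages actually written consume memory

m.seek(SIZE - len(FOOTER))
tail = m.read()  # -> FOOTER
m.close()
```

Passing `-1` as the fileno already gives an anonymous mapping portably; on Linux the explicit `flags=mmap.MAP_ANONYMOUS | mmap.MAP_PRIVATE` used above is equivalent.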

Is there any other way to feed data into the file parser externally? Using an anonymous mmap to feed the parser feels hacky. There are the IO interfaces, but none of them are usable from async code. Also, is there a better way to get the file offsets for a filter expression than the approach above?
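If storing the metadata separately at write time is not an option, it can also be recovered from the file tail: per the Parquet format, a file ends with the serialized metadata, a 4-byte little-endian footer length, and the magic bytes PAR1. A sketch (`split_footer` is a hypothetical helper; `tail` must cover at least the whole footer):

```python
import struct

def split_footer(tail: bytes) -> bytes:
    """Extract the serialized FileMetaData from the last bytes of a parquet file."""
    assert tail[-4:] == b"PAR1", "not a parquet file tail"
    (meta_len,) = struct.unpack("<I", tail[-8:-4])  # footer length, little-endian
    return tail[-8 - meta_len : -8]

# Demonstrate on a fabricated tail with a 3-byte "metadata" blob:
fake_tail = b"xyz" + struct.pack("<I", 3) + b"PAR1"
meta = split_footer(fake_tail)  # -> b"xyz"
```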

We cannot rely on ThreadPoolExecutor (or ProcessPoolExecutor) for the blocking IO. The processing is heavily IO-bound with a very high level of concurrency: most of the time is spent waiting on IO, so we cannot afford to tie up a thread just to wait. With pure async IO the application can sustain a much higher level of concurrency, as throughput is not bound by the parquet parsing itself.
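The concurrency argument can be illustrated with a toy example using simulated IO (the 50 ms sleep stands in for a network or object-store fetch; the numbers are illustrative only):

```python
import asyncio
import time

async def fetch_range(i: int) -> int:
    await asyncio.sleep(0.05)  # simulated IO latency
    return i

async def main() -> float:
    start = time.perf_counter()
    # 200 concurrent waits overlap on a single thread
    await asyncio.gather(*(fetch_range(i) for i in range(200)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Wall time stays near 0.05 s, whereas a thread-per-request design
# would need 200 blocked threads to achieve the same overlap.
```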

Component(s)

Python, Parquet
