Best practices for memory-efficient deduplication of pre-sorted Parquet files #16776
Replies: 6 comments 6 replies
-
👋 Given your description, I am surprised that this query is using a HashAggregateStream -- the hash aggregate needs to buffer the entire dataset in RAM / spill it, which is likely why it is running out of memory. Given that the data is sorted by col_1 and col_2, I would expect this query to use the streaming aggregate operator (which should not need much memory at all).

What does the plan look like for this:

```sql
EXPLAIN SELECT
    col_1,
    col_2,
    first_value(col_3) AS col_3,
    first_value(col_4) AS col_4
FROM
    example
GROUP BY
    col_1, col_2
ORDER BY
    col_1, col_2
```

Do you get a different operator when you remove the first/last value aggregates?

```sql
EXPLAIN SELECT
    col_1,
    col_2 -- NOTE: the first_value / last_value aggregates are removed
FROM
    example
GROUP BY
    col_1, col_2
ORDER BY
    col_1, col_2
```
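If the plan does not already pick a streaming (sorted) aggregation, one planner knob that may help it exploit the existing file ordering is `datafusion.optimizer.prefer_existing_sort`. A minimal sketch, assuming a DataFusion / datafusion-cli version where this option is available:

```sql
-- Prefer plans that keep the input's existing sort order rather than
-- repartitioning it away; this can let the aggregate run in sorted
-- (streaming) mode at the cost of some parallelism. Defaults to false.
SET datafusion.optimizer.prefer_existing_sort = true;

-- Re-check which aggregate operator the planner now chooses.
EXPLAIN SELECT col_1, col_2
FROM example
GROUP BY col_1, col_2
ORDER BY col_1, col_2;
```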
-
Addressing Question 1. The query plan for the original query:

```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';

EXPLAIN COPY (
    SELECT
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6,
        first_value(col_7) AS col_7,
        first_value(col_8) AS col_8
    FROM
        example
    GROUP BY
        col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY
        col_1 ASC, col_2 ASC
)
TO '/tmp/result.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```

The resulting query plan:
-
Addressing Question 2. It's not possible to remove the `first_value()` aggregates while still returning col_7 and col_8. Instead, I removed those two columns entirely:

```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';

COPY (
    SELECT
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6
    FROM
        example
    GROUP BY
        col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY
        col_1 ASC, col_2 ASC
)
TO '/tmp/result_part2.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```

The resulting query plan:
Executing the query results in:
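Since this second query has no aggregate expressions at all, grouping by every selected column is semantically just row deduplication, so it can equivalently be written with SELECT DISTINCT. A minimal sketch of the equivalent form:

```sql
-- Equivalent to the GROUP BY form above: a GROUP BY over all selected
-- columns with no aggregates simply removes duplicate rows.
SELECT DISTINCT
    col_1, col_2, col_3, col_4, col_5, col_6
FROM example
ORDER BY col_1 ASC, col_2 ASC;
```

Whether the planner produces a cheaper plan for this form is version-dependent and worth checking with EXPLAIN.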
-
The above results were obtained with the following setup:
-
I created a public Gist, which you can find here: https://gist.github.com/zheniasigayev/2e5e471c9070cfa685d938bced47aa7f. I confirmed that the two queries I provided in the discussion above produced the same query plan, and the same memory consumers, when run against the generated Parquet files.
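For anyone comparing memory consumers the same way, DataFusion's `EXPLAIN ANALYZE` can also help: it executes the query (so it is subject to the same memory limits) and annotates each operator in the physical plan with runtime metrics. A minimal sketch:

```sql
-- Runs the query to completion and reports per-operator metrics
-- (e.g. output_rows, elapsed_compute) alongside the physical plan.
EXPLAIN ANALYZE
SELECT col_1, col_2
FROM example
GROUP BY col_1, col_2
ORDER BY col_1, col_2;
```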
-
I created a GitHub issue with the relevant details summarized. See:
-
Hi DataFusion folks!
I'm trying to deduplicate pre-sorted data stored in several Parquet files.
Asks:
Any suggestions or recommendations on changes I can make to the query or configuration that could make deduplicating and merging several pre-sorted Parquet files more performant in a memory-constrained environment?
Context:
- I am consistently running out of memory with the error:
- I currently do this with a `GROUP BY` over the duplicate columns, using the `first_value()` aggregation function to select only the first occurrence.
- I run: `datafusion-cli -m 8G -d 50G --top-memory-consumers 25`
- I have explored the various configuration settings found here: https://datafusion.apache.org/user-guide/configs.html (a few memory-related candidates are sketched after this list).
- I've seen some related GitHub discussions, but this appears to be a unique use case:
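A few memory-related settings from that configuration page that may be worth experimenting with. This is a sketch only; the names come from the documented configuration list, but defaults and effects should be verified against the DataFusion version in use:

```sql
-- Smaller batches lower each operator's memory high-water mark,
-- usually at some throughput cost (default: 8192 rows).
SET datafusion.execution.batch_size = 2048;

-- Fewer partitions means fewer concurrent operators sharing the
-- memory pool (default: number of CPU cores).
SET datafusion.execution.target_partitions = 4;

-- Memory reserved for each spillable sort's merge phase; relevant
-- once the query starts spilling to disk.
SET datafusion.execution.sort_spill_reservation_bytes = 16777216;
```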