-
-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use sort ordering on timestamp array #443
Conversation
server/src/storage/localfs.rs
Outdated
infinite_source: false, | ||
format: Arc::new(file_format), | ||
table_partition_cols: vec![], | ||
collect_stat: true, | ||
target_partitions: 1, | ||
target_partitions: 32, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What the significance of changing this field target_partitions
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roughly the partition here is number of parallel streams that is generated by datafusion during execution. Having this 1 was causing all files to be grouped in one partition and datafusion is unable to use external sort information for files in a group as it cannot infer order between grouped files and if they are overlapping in time range or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So at max datafusion will hold 32 streams to calculate the output and merge them back using SortPreservingMerge
server/src/storage/s3.rs
Outdated
infinite_source: false, | ||
format: Arc::new(file_format), | ||
table_partition_cols: vec![], | ||
collect_stat: true, | ||
target_partitions: 1, | ||
target_partitions: 32, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section is repeated for local and s3 mode. Can we move it to the common abstraction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this needs refactoring
Fixes #430.
Description
Write timestamp sortedness metadata to parquet and provide external sort information to datafusion. This way the SortExec can be avoided in execution plan with most queries which use
order by p_timestamp
.Example
explain select p_timestamp from {{stream_name}} order by p_timestamp asc
In physical plan it is visible that SortExec is eliminated as output_ordering is pushed to ParquetExec node
Note:
This is still not the most optimized version of this query as SortPreservingExec is not really needed here. The issue here is that the Datafusion is not aware that the partitions / files are non overlapping when considering timestamp
Also if the target partition limit is crossed then datafusion again adds SortExec to physical plan.
This PR has: