
Control ordering of file opened based on statistics #17271

@adriangb


In particular, for a query with ORDER BY col ASC LIMIT 10 we use a TopK operator plus dynamic filter pushdown to prune files. This can make queries more than 20x faster, but its effectiveness depends largely on the order in which files are opened. In the pathological case, where the files with the largest values of col are opened first, we have to open every single file. In the ideal case, we open the files with the smallest col first, find the 10 smallest values of col in the first file, and can then skip all the others based on statistics. A good order also causes less churn in the TopK heap, etc.
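To make the effect of open order concrete, here is a minimal, self-contained simulation. It is not DataFusion's TopK operator: the `File` struct, its single `min` statistic and the `topk_with_pruning` function are hypothetical simplifications, and the dynamic filter is modeled as "skip a file whose minimum is not below the current k-th smallest value".

```rust
use std::collections::BinaryHeap;

/// Hypothetical file: its rows for `col` plus a `min` statistic.
struct File {
    min: i64,
    values: Vec<i64>,
}

impl File {
    fn new(values: Vec<i64>) -> Self {
        let min = *values.iter().min().expect("file must not be empty");
        File { min, values }
    }
}

/// Simulates `ORDER BY col ASC LIMIT k`, opening files in `order`.
/// Returns the number of files opened and the k smallest values (ascending).
fn topk_with_pruning(files: &[File], order: &[usize], k: usize) -> (usize, Vec<i64>) {
    // Max-heap of the k smallest values seen so far; its top is the current
    // threshold that the dynamic filter compares file statistics against.
    let mut heap: BinaryHeap<i64> = BinaryHeap::new();
    let mut opened = 0;
    for &i in order {
        let file = &files[i];
        // Dynamic filter: once the heap is full, a file whose minimum is not
        // below the threshold cannot improve the result, so skip it unopened.
        if heap.len() == k && heap.peek().map_or(false, |&t| file.min >= t) {
            continue;
        }
        opened += 1;
        for &v in &file.values {
            if heap.len() < k {
                heap.push(v);
            } else if heap.peek().map_or(false, |&t| v < t) {
                heap.pop();
                heap.push(v);
            }
        }
    }
    (opened, heap.into_sorted_vec())
}

fn main() {
    // Four non-overlapping files of 10 rows each: [0, 10), [10, 20), ...
    let files: Vec<File> = (0..4)
        .map(|f| File::new((f * 10..f * 10 + 10).collect()))
        .collect();
    // Pathological order: files with the largest values first.
    let worst = [3, 2, 1, 0];
    // Ideal order: ascending by the `min` statistic.
    let mut best: Vec<usize> = (0..files.len()).collect();
    best.sort_by_key(|&i| files[i].min);

    let (opened_worst, top_worst) = topk_with_pruning(&files, &worst, 10);
    let (opened_best, top_best) = topk_with_pruning(&files, &best, 10);
    assert_eq!(top_worst, top_best);
    println!("largest-first opened {opened_worst} files, smallest-first opened {opened_best}");
}
```

With these four files, the largest-first order opens all four, while the ascending-`min` order opens only the first file and prunes the other three from statistics alone; both produce the same top 10.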

Currently ListingTable orders files based on a known sort order if provided (see #4177) or their path:

```rust
// ObjectStore::list does not guarantee any consistent order and for some
// implementations such as LocalFileSystem, it may be inconsistent. Thus
// Sort files by path to ensure consistent plans when run more than once.
self.files.sort_by(|a, b| a.path().cmp(b.path()));
```

I'd like to propose that we instead pass down the query's preferred sort order (rather than a hardcoded known sort order) and use file statistics to sort the files within each partition/group to best match that order. I think that covers both the Influx IOx use cases and more general ones where strict non-overlapping ordering is not required, but ordering file opens to agree with sort operators is still generally beneficial.
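As a rough sketch of what sorting the files within a group by statistics could mean for a single leading sort column: for ASC, open files with the smallest minimum first; for DESC, files with the largest maximum first. Files without statistics go last, and ties fall back to the path so plans stay deterministic, as in the existing code. `FileWithStats` is a hypothetical simplified struct (not a DataFusion type), and `Direction`, `sort_group_by_stats` and `cmp_known_first` are illustrative names.

```rust
use std::cmp::Ordering;

/// Hypothetical file entry: path plus min/max statistics for the leading
/// sort column (None when statistics are unavailable).
#[derive(Debug, Clone)]
struct FileWithStats {
    path: String,
    min: Option<i64>,
    max: Option<i64>,
}

/// Direction of the leading expression in the preferred ordering.
#[derive(Debug, Clone, Copy)]
enum Direction {
    Asc,
    Desc,
}

/// Compares two optional statistics with `cmp`, placing missing ones last.
fn cmp_known_first(a: Option<i64>, b: Option<i64>, cmp: impl Fn(i64, i64) -> Ordering) -> Ordering {
    match (a, b) {
        (Some(x), Some(y)) => cmp(x, y),
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    }
}

/// Orders one file group so the files most likely to hold the first rows of
/// the preferred ordering are opened first.
fn sort_group_by_stats(files: &mut [FileWithStats], dir: Direction) {
    files.sort_by(|a, b| {
        let by_stats = match dir {
            // ASC: smallest minimum first.
            Direction::Asc => cmp_known_first(a.min, b.min, |x, y| x.cmp(&y)),
            // DESC: largest maximum first.
            Direction::Desc => cmp_known_first(a.max, b.max, |x, y| y.cmp(&x)),
        };
        // When statistics tie (or are both missing), order by path so plans
        // stay deterministic across runs.
        by_stats.then_with(|| a.path.cmp(&b.path))
    });
}

fn main() {
    let file = |path: &str, min: Option<i64>, max: Option<i64>| FileWithStats {
        path: path.to_string(),
        min,
        max,
    };
    let mut group = vec![
        file("c.parquet", Some(30), Some(39)),
        file("a.parquet", None, None),
        file("b.parquet", Some(0), Some(9)),
        file("d.parquet", Some(10), Some(19)),
    ];
    for dir in [Direction::Asc, Direction::Desc] {
        sort_group_by_stats(&mut group, dir);
        let order: Vec<&str> = group.iter().map(|f| f.path.as_str()).collect();
        println!("{dir:?}: {order:?}");
    }
}
```

For the sample group this prints b, d, c, a for Asc and c, d, b, a for Desc. Sorting by the leading column's min (or max) is only a heuristic: it tends to tighten the TopK threshold early, and it does not require the files to be non-overlapping.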

I believe the main barrier to this is that sort information is not passed down into TableProvider, so it would have to become an additional argument to scan. Adding an argument would be a breaking change that impacts a lot of users, and scan already has a lot of arguments. Hence I propose the following:

```rust
struct ScanOptions {
    /// Sort orders the query would benefit from (e.g. from an ORDER BY above the scan).
    preferred_ordering: Vec<LexOrdering>,
    /// Filters that may be pushed down into the scan.
    filters: Vec<Expr>,
    /// Maximum number of rows the query needs, if known.
    limit: Option<usize>,
}

struct ScanResult {
    /// The ExecutionPlan to run.
    plan: Arc<dyn ExecutionPlan>,
    /// Remaining filters that were not completely evaluated during `scan_with_options()`.
    filters: Vec<Expr>,
}

trait TableProvider {
    fn scan_with_options(&self, options: ScanOptions) -> Result<ScanResult>;
    #[deprecated]
    fn scan(&self, ...) -> Result<Arc<dyn ExecutionPlan>>;
    #[deprecated]
    fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> { ... }
}
```
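The proposed contract can be prototyped outside DataFusion. The sketch below uses simplified, hypothetical stand-ins (`Expr`, `SortKey`, a `String` in place of `Arc<dyn ExecutionPlan>`, a `String` error type) rather than the real DataFusion types, plus a toy provider that can only evaluate partition-equality filters. Under those assumptions it shows two points: filters the provider cannot fully evaluate come back in `ScanResult::filters` for the caller to apply, and if `ScanOptions` derives `Default`, call sites that build it with `..Default::default()` keep compiling when new fields are added.

```rust
/// Simplified stand-in for a logical filter expression (NOT DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// `column = value` on a partition column; the toy provider can prune on it.
    PartitionEq(String, String),
    /// Any other predicate, which the toy provider cannot evaluate.
    Other(String),
}

/// Simplified stand-in for one sort expression of a `LexOrdering`.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    ascending: bool,
}

#[derive(Debug, Default)]
struct ScanOptions {
    preferred_ordering: Vec<Vec<SortKey>>,
    filters: Vec<Expr>,
    limit: Option<usize>,
}

#[derive(Debug)]
struct ScanResult {
    /// Stand-in for `Arc<dyn ExecutionPlan>`: a description of the plan.
    plan: String,
    /// Filters the caller must still apply on top of `plan`.
    filters: Vec<Expr>,
}

trait TableProvider {
    fn scan_with_options(&self, options: ScanOptions) -> Result<ScanResult, String>;
}

/// Hypothetical provider that prunes on partition filters only.
struct ToyProvider;

impl TableProvider for ToyProvider {
    fn scan_with_options(&self, options: ScanOptions) -> Result<ScanResult, String> {
        // Split filters into those handled by pruning and those handed back.
        let (pushed, remaining): (Vec<Expr>, Vec<Expr>) = options
            .filters
            .into_iter()
            .partition(|f| matches!(f, Expr::PartitionEq(..)));
        // Record which order files would be opened in: the leading key of the
        // first preferred ordering, or path order when none is requested.
        let file_order = options
            .preferred_ordering
            .first()
            .and_then(|ordering| ordering.first())
            .map(|k| format!("{} {}", k.column, if k.ascending { "ASC" } else { "DESC" }))
            .unwrap_or_else(|| "path".to_string());
        let plan = format!(
            "ToyScan: pruned_by={:?}, file_order={}, limit={:?}",
            pushed, file_order, options.limit
        );
        Ok(ScanResult { plan, filters: remaining })
    }
}

fn main() {
    let options = ScanOptions {
        preferred_ordering: vec![vec![SortKey { column: "col".into(), ascending: true }]],
        filters: vec![
            Expr::PartitionEq("year".into(), "2024".into()),
            Expr::Other("col > 5".into()),
        ],
        // Omitted fields (here `limit`, and any added later) take their defaults.
        ..Default::default()
    };
    let result = ToyProvider.scan_with_options(options).expect("toy scan cannot fail");
    println!("{}", result.plan);
    // The caller still has to apply these, e.g. with a filter operator above the plan.
    println!("remaining filters: {:?}", result.filters);
}
```

Note that `..Default::default()` only protects callers that actually use it; a builder, or `#[non_exhaustive]` together with a constructor, would enforce that for downstream crates.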
