
Commit 900ee65

Support Arrow IPC Stream Files (#18457)
## Which issue does this PR close?

- Closes #16688.

## Rationale for this change

Currently DataFusion can only read Arrow files if they're in the File format, not the Stream format. I work with a bunch of Stream-format files and wanted native support.

## What changes are included in this PR?

To accomplish the above, this PR splits the Arrow datasource into two separate implementations (`ArrowStream*` and `ArrowFile*`) with a facade on top to differentiate between the formats at query planning time.

## Are these changes tested?

Yes, there are end-to-end sqllogictests along with tests for the changes within datasource-arrow.

## Are there any user-facing changes?

Technically yes, in that we support a new format now. I'm not sure which documentation would need to be updated?

---------

Co-authored-by: Martin Grigorov <[email protected]>
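For intuition about how a facade can tell the two formats apart at planning time: an Arrow IPC File-format file begins with the 6-byte magic "ARROW1" (plus padding, and a footer at the end), while a Stream-format file begins directly with an encapsulated message. A minimal sniffing helper along those lines might look like the sketch below; the function name and path are hypothetical, and this is not the detection code from this PR.

use std::fs::File;
use std::io::{self, Read};

/// Returns true if `path` looks like an Arrow IPC File-format file.
/// File-format files open with the magic bytes "ARROW1"; Stream-format
/// files do not, so peeking at the first six bytes is enough to choose
/// a reader. (Hypothetical helper, not this PR's implementation.)
fn is_arrow_file_format(path: &str) -> io::Result<bool> {
    let mut f = File::open(path)?;
    let mut magic = [0u8; 6];
    match f.read_exact(&mut magic) {
        Ok(()) => Ok(&magic == b"ARROW1"),
        // Anything shorter than six bytes cannot be File format.
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    // "example.arrow" is a placeholder path for illustration.
    let format = if is_arrow_file_format("example.arrow")? {
        "File"
    } else {
        "Stream"
    };
    println!("detected Arrow IPC {format} format");
    Ok(())
}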
1 parent d8845a6 · commit 900ee65

12 files changed: +1206 −171 lines

2 binary files changed (contents not shown).

datafusion/core/tests/execution/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -18,3 +18,4 @@
 mod coop;
 mod datasource_split;
 mod logical_plan;
+mod register_arrow;
datafusion/core/tests/execution/register_arrow.rs

Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration tests for register_arrow API

use datafusion::{execution::options::ArrowReadOptions, prelude::*};
use datafusion_common::Result;

#[tokio::test]
async fn test_register_arrow_auto_detects_format() -> Result<()> {
    let ctx = SessionContext::new();

    ctx.register_arrow(
        "file_format",
        "../../datafusion/datasource-arrow/tests/data/example.arrow",
        ArrowReadOptions::default(),
    )
    .await?;

    ctx.register_arrow(
        "stream_format",
        "../../datafusion/datasource-arrow/tests/data/example_stream.arrow",
        ArrowReadOptions::default(),
    )
    .await?;

    let file_result = ctx.sql("SELECT * FROM file_format ORDER BY f0").await?;
    let stream_result = ctx.sql("SELECT * FROM stream_format ORDER BY f0").await?;

    let file_batches = file_result.collect().await?;
    let stream_batches = stream_result.collect().await?;

    assert_eq!(file_batches.len(), stream_batches.len());
    assert_eq!(file_batches[0].schema(), stream_batches[0].schema());

    let file_rows: usize = file_batches.iter().map(|b| b.num_rows()).sum();
    let stream_rows: usize = stream_batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(file_rows, stream_rows);

    Ok(())
}

#[tokio::test]
async fn test_register_arrow_join_file_and_stream() -> Result<()> {
    let ctx = SessionContext::new();

    ctx.register_arrow(
        "file_table",
        "../../datafusion/datasource-arrow/tests/data/example.arrow",
        ArrowReadOptions::default(),
    )
    .await?;

    ctx.register_arrow(
        "stream_table",
        "../../datafusion/datasource-arrow/tests/data/example_stream.arrow",
        ArrowReadOptions::default(),
    )
    .await?;

    let result = ctx
        .sql(
            "SELECT a.f0, a.f1, b.f0, b.f1
             FROM file_table a
             JOIN stream_table b ON a.f0 = b.f0
             WHERE a.f0 <= 2
             ORDER BY a.f0",
        )
        .await?;
    let batches = result.collect().await?;

    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2);

    Ok(())
}
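For reference, a Stream-format fixture such as example_stream.arrow can be generated with the arrow crate's StreamWriter (its File-format counterpart is FileWriter). The sketch below uses a stand-in single-column schema rather than the actual fixture's schema, and the output path is illustrative.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in schema; the real test fixtures also carry an f1 column.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "f0",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // StreamWriter emits the Stream format: no "ARROW1" magic or footer,
    // just a schema message followed by record batches.
    let file = File::create("example_stream.arrow")?;
    let mut writer = StreamWriter::try_new(file, &schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    Ok(())
}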

datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs

Lines changed: 2 additions & 2 deletions
@@ -284,12 +284,12 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
     // Create a test factory
     let factory = Arc::new(UppercaseAdapterFactory {});

-    // Test ArrowSource
+    // Test ArrowFileSource
     {
         let schema =
             Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
         let table_schema = TableSchema::new(schema, vec![]);
-        let source = ArrowSource::new(table_schema);
+        let source = ArrowSource::new_file_source(table_schema);
         let source_with_adapter = source
             .clone()
             .with_schema_adapter_factory(factory.clone())
