feat(connector): add minio file scan type and enhance test (#19950)
wcy-fdu authored Jan 7, 2025
1 parent 5084d92 commit f530c10
Showing 10 changed files with 102 additions and 22 deletions.
55 changes: 55 additions & 0 deletions e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
def _table():
return 's3_test_parquet'

print("test table function file scan")
cur.execute(f'''
SELECT
id,
name,
sex,
mark,
test_int,
test_int8,
test_uint8,
test_uint16,
test_uint32,
test_uint64,
test_float_16,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
FROM file_scan(
'parquet',
's3',
'http://127.0.0.1:9301',
'hummockadmin',
'hummockadmin',
's3://hummock001/test_file_scan/test_file_scan.parquet'
);''')
result = cur.fetchone()
assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.'

print("file scan test pass")
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id bigint primary key,
@@ -491,6 +531,21 @@ def _assert_greater(field, got, expect):
_s3(idx),
_local(idx)
)
# put parquet file to test table function file scan
if data:
first_file_data = data[0]
first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))

first_file_name = f"test_file_scan.parquet"
first_file_path = f"test_file_scan/{first_file_name}"

pq.write_table(first_table, "data_0.parquet")

client.fput_object(
"hummock001",
first_file_path,
"data_0.parquet"
)

# do test
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)
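Note on the new test: it exercises the MinIO path end to end by uploading a Parquet file to MinIO under hummock001/test_file_scan/ and reading it back through the file_scan table function with positional arguments (format, backend, region or endpoint, access key, secret key, file location). When the third argument starts with "http", the frontend treats it as a custom (MinIO) endpoint rather than an AWS region, as the frontend changes below show.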
1 change: 1 addition & 0 deletions proto/batch_plan.proto
@@ -99,6 +99,7 @@ message FileScanNode {
string s3_access_key = 5;
string s3_secret_key = 6;
repeated string file_location = 7;
string s3_endpoint = 8;
}

message GcsFileScanNode {
8 changes: 5 additions & 3 deletions src/batch/executors/src/executor/s3_file_scan.rs
@@ -20,7 +20,6 @@ use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, new_s3_operator, read_parquet_file, FileScanBackend,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::file_scan_node::StorageType;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
@@ -38,6 +37,7 @@ pub struct S3FileScanExecutor {
s3_region: String,
s3_access_key: String,
s3_secret_key: String,
s3_endpoint: String,
batch_size: usize,
schema: Schema,
identity: String,
@@ -67,13 +67,15 @@ impl S3FileScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
s3_endpoint: String,
) -> Self {
Self {
file_format,
file_location,
s3_region,
s3_access_key,
s3_secret_key,
s3_endpoint,
batch_size,
schema,
identity,
@@ -90,6 +92,7 @@
self.s3_access_key.clone(),
self.s3_secret_key.clone(),
bucket.clone(),
self.s3_endpoint.clone(),
)?;
let chunk_stream =
read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
@@ -115,8 +118,6 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
NodeBody::FileScan
)?;

assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);

Ok(Box::new(S3FileScanExecutor::new(
match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
@@ -129,6 +130,7 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
source.context().get_config().developer.chunk_size,
Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
source.plan_node().get_identity().clone(),
file_scan_node.s3_endpoint.clone(),
)))
}
}
18 changes: 9 additions & 9 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -109,16 +109,16 @@ pub fn new_s3_operator(
s3_access_key: String,
s3_secret_key: String,
bucket: String,
s3_endpoint: String,
) -> ConnectorResult<Operator> {
// Create s3 builder.
let mut builder = S3::default().bucket(&bucket).region(&s3_region);
builder = builder.secret_access_key(&s3_access_key);
builder = builder.secret_access_key(&s3_secret_key);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));

let mut builder = S3::default();
builder = builder
.region(&s3_region)
.endpoint(&s3_endpoint)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(&bucket)
.disable_config_load();
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
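For illustration only, a minimal sketch of how the reworked new_s3_operator is called with an explicit endpoint — once against MinIO and once against AWS S3. The credentials, bucket, and endpoints below are placeholders (the MinIO values mirror the e2e test), not part of this diff:

    // Hypothetical call sites for the new signature; values are illustrative only.
    // MinIO: pass the HTTP endpoint straight through. The region is unused but the
    // S3 builder still requires one, so callers hard-code it.
    let minio_op = new_s3_operator(
        "us-east-1".to_owned(),             // s3_region (placeholder, unused by MinIO)
        "hummockadmin".to_owned(),          // s3_access_key
        "hummockadmin".to_owned(),          // s3_secret_key
        "hummock001".to_owned(),            // bucket
        "http://127.0.0.1:9301".to_owned(), // s3_endpoint
    )?;

    // AWS S3: the frontend derives a virtual-hosted-style endpoint from bucket and region.
    let aws_op = new_s3_operator(
        "us-east-1".to_owned(),
        access_key.clone(),
        secret_key.clone(),
        "my-bucket".to_owned(),
        "https://my-bucket.s3.us-east-1.amazonaws.com".to_owned(),
    )?;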
19 changes: 14 additions & 5 deletions src/frontend/src/expr/table_function.rs
@@ -159,14 +159,24 @@ impl TableFunction {
};
let op = match file_scan_backend {
FileScanBackend::S3 => {
let (bucket, _) =
extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;

let (bucket, _) = extract_bucket_and_file_name(
&eval_args[5].clone(),
&file_scan_backend,
)?;

let (s3_region, s3_endpoint) = match eval_args[2].starts_with("http") {
true => ("us-east-1".to_owned(), eval_args[2].clone()), /* for minio, hard code region as not used but needed. */
false => (
eval_args[2].clone(),
format!("https://{}.s3.{}.amazonaws.com", bucket, eval_args[2],),
),
};
new_s3_operator(
eval_args[2].clone(),
s3_region.clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
s3_endpoint.clone(),
)?
}
FileScanBackend::Gcs => {
@@ -189,7 +199,6 @@
Ok::<Vec<String>, anyhow::Error>(files)
})
})?;

if files.is_empty() {
return Err(BindError(
"file_scan function only accepts non-empty directory".to_owned(),
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -93,6 +93,7 @@ impl ToBatchPb for BatchFileScan {
s3_access_key: file_scan.s3_access_key.clone(),
s3_secret_key: file_scan.s3_secret_key.clone(),
file_location: file_scan.file_location.clone(),
s3_endpoint: file_scan.s3_endpoint.clone(),
}),
generic::FileScanBackend::GcsFileScan(gcs_file_scan) => {
NodeBody::GcsFileScan(GcsFileScanNode {
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -92,6 +92,7 @@ pub struct FileScan {
pub s3_access_key: String,
pub s3_secret_key: String,
pub file_location: Vec<String>,
pub s3_endpoint: String,

#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -48,19 +48,22 @@ impl LogicalFileScan {
s3_access_key: String,
s3_secret_key: String,
file_location: Vec<String>,
s3_endpoint: String,
) -> Self {
assert!("parquet".eq_ignore_ascii_case(&file_format));
assert!("s3".eq_ignore_ascii_case(&storage_type));
let storage_type = generic::StorageType::S3;

let core = generic::FileScanBackend::FileScan(generic::FileScan {
schema,
file_format: generic::FileFormat::Parquet,
storage_type: generic::StorageType::S3,
storage_type,
s3_region,
s3_access_key,
s3_secret_key,
file_location,
ctx,
s3_endpoint,
});

let base = PlanBase::new_logical_with_core(&core);
@@ -89,7 +92,6 @@
});

let base = PlanBase::new_logical_with_core(&core);

LogicalFileScan { base, core }
}
}
14 changes: 12 additions & 2 deletions src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -16,6 +16,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::source::iceberg::{extract_bucket_and_file_name, FileScanBackend};

use super::{BoxedRule, Rule};
use crate::expr::{Expr, TableFunctionType};
@@ -63,11 +64,19 @@ impl Rule for TableFunctionToFileScanRule {
);

if "s3".eq_ignore_ascii_case(&eval_args[1]) {
let s3_region = eval_args[2].clone();
let s3_access_key = eval_args[3].clone();
let s3_secret_key = eval_args[4].clone();
// The rest of the arguments are file locations
let file_location = eval_args[5..].iter().cloned().collect_vec();

let (bucket, _) =
extract_bucket_and_file_name(&file_location[0], &FileScanBackend::S3).ok()?;
let (s3_region, s3_endpoint) = match eval_args[2].starts_with("http") {
true => ("us-east-1".to_owned(), eval_args[2].clone()), /* for minio, hard code region as not used but needed. */
false => (
eval_args[2].clone(),
format!("https://{}.s3.{}.amazonaws.com", bucket, eval_args[2],),
),
};
Some(
LogicalFileScan::new_s3_logical_file_scan(
logical_table_function.ctx(),
@@ -78,6 +87,7 @@
s3_access_key,
s3_secret_key,
file_location,
s3_endpoint,
)
.into(),
)
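The region/endpoint selection added here duplicates the logic in table_function.rs above; conceptually it is a small pure function. A sketch under that assumption (the helper name is hypothetical and does not exist in this commit):

    // Hypothetical helper mirroring the duplicated match on starts_with("http").
    fn derive_region_and_endpoint(region_or_endpoint: &str, bucket: &str) -> (String, String) {
        if region_or_endpoint.starts_with("http") {
            // MinIO or any custom endpoint: the region is unused but still required,
            // so hard-code one and pass the endpoint through unchanged.
            ("us-east-1".to_owned(), region_or_endpoint.to_owned())
        } else {
            // Plain AWS region: build the virtual-hosted-style endpoint for the bucket.
            (
                region_or_endpoint.to_owned(),
                format!("https://{}.s3.{}.amazonaws.com", bucket, region_or_endpoint),
            )
        }
    }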
1 change: 0 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -78,7 +78,6 @@ impl OpendalObjectStore {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let builder = S3::default()
.bucket(bucket)
.region("custom")
