Skip to content

Commit

Permalink
feat: limit total rows copied in COPY TABLE FROM stmt (#3819)
Browse files Browse the repository at this point in the history
* feat: limit total rows copied in `COPY TABLE FROM` stmt

* fix: break outer loop

* fmt

* fixup

* test: add limit rows test

* fix test

* fix test: add drop

* fix test

* fix test

* fix test

* fix: change to const
  • Loading branch information
irenjj authored May 7, 2024
1 parent 6e1cc1d commit 5274806
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 14 deletions.
23 changes: 19 additions & 4 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ use crate::statement::StatementExecutor;

// NOTE(review): the use sites of the next two constants are outside this view; presumably
// the row batch size and the read buffer size in bytes (256 KiB) used when reading the
// source files — confirm against the readers.
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
/// Key looked up in the copy request's `with` options to obtain the row-count threshold
/// at which `COPY TABLE FROM` stops inserting and returns.
const MAX_INSERT_ROWS: &str = "max_insert_rows";
/// Threshold used when the `max_insert_rows` option is absent or does not parse as `usize`.
const DEFAULT_MAX_INSERT_ROWS: usize = 1000;

enum FileMetadata {
Parquet {
Expand Down Expand Up @@ -377,6 +379,11 @@ impl StatementExecutor {

let mut rows_inserted = 0;
let mut insert_cost = 0;
let max_insert_rows = req
.with
.get(MAX_INSERT_ROWS)
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_INSERT_ROWS);
for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
{
let mut stream = self
Expand Down Expand Up @@ -427,6 +434,10 @@ impl StatementExecutor {
rows_inserted += rows;
insert_cost += cost;
}

if rows_inserted >= max_insert_rows {
return Ok(gen_insert_output(rows_inserted, insert_cost));
}
}

if !pending.is_empty() {
Expand All @@ -436,13 +447,17 @@ impl StatementExecutor {
}
}

Ok(Output::new(
OutputData::AffectedRows(rows_inserted),
OutputMeta::new_with_cost(insert_cost),
))
Ok(gen_insert_output(rows_inserted, insert_cost))
}
}

/// Builds the [`Output`] returned by a copy-from statement.
///
/// `rows_inserted` is reported as the affected-row count and `insert_cost` is attached
/// as the cost in the output metadata.
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    let affected = OutputData::AffectedRows(rows_inserted);
    let meta = OutputMeta::new_with_cost(insert_cost);
    Output::new(affected, meta)
}

/// Executes all pending inserts at once, then drains the pending requests and resets the pending bytes.
async fn batch_insert(
pending: &mut Vec<impl Future<Output = Result<Output>>>,
Expand Down
72 changes: 66 additions & 6 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.result
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@ insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570

Affected Rows: 2

Copy demo TO '/tmp/demo/export/parquet/demo.parquet';
Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet';

Affected Rows: 2

CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);

Affected Rows: 0

insert into demo_2(host, cpu, memory, ts) values ('host3', 77.7, 1111, 1655276555000), ('host4', 99.9, 444.4, 1655276556000);

Affected Rows: 2

Copy demo_2 TO '/tmp/demo/export/parquet_files/demo_2.parquet';

Affected Rows: 2

CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet';
Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet';

Affected Rows: 2

Expand All @@ -31,15 +43,17 @@ CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time

Affected Rows: 0

Copy with_path FROM '/tmp/demo/export/parquet/';
Copy with_path FROM '/tmp/demo/export/parquet_files/';

Affected Rows: 2
Affected Rows: 4

select * from with_path order by ts;

+-------+------+--------+---------------------+
| host | cpu | memory | ts |
+-------+------+--------+---------------------+
| host3 | 77.7 | 1111.0 | 2022-06-15T07:02:35 |
| host4 | 99.9 | 444.4 | 2022-06-15T07:02:36 |
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+
Expand All @@ -48,23 +62,61 @@ CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp t

Affected Rows: 0

Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*');
Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*');

Affected Rows: 2
Affected Rows: 4

select * from with_pattern order by ts;

+-------+------+--------+---------------------+
| host | cpu | memory | ts |
+-------+------+--------+---------------------+
| host3 | 77.7 | 1111.0 | 2022-06-15T07:02:35 |
| host4 | 99.9 | 444.4 | 2022-06-15T07:02:36 |
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

CREATE TABLE without_limit_rows(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/';

Affected Rows: 4

select count(*) from without_limit_rows;

+----------+
| COUNT(*) |
+----------+
| 4 |
+----------+

CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);

Affected Rows: 2

select count(*) from with_limit_rows;

+----------+
| COUNT(*) |
+----------+
| 2 |
+----------+

drop table demo;

Affected Rows: 0

drop table demo_2;

Affected Rows: 0

drop table with_filename;

Affected Rows: 0
Expand All @@ -77,3 +129,11 @@ drop table with_pattern;

Affected Rows: 0

drop table without_limit_rows;

Affected Rows: 0

drop table with_limit_rows;

Affected Rows: 0

32 changes: 28 additions & 4 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,54 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde

insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);

Copy demo TO '/tmp/demo/export/parquet/demo.parquet';
Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet';

CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);

insert into demo_2(host, cpu, memory, ts) values ('host3', 77.7, 1111, 1655276555000), ('host4', 99.9, 444.4, 1655276556000);

Copy demo_2 TO '/tmp/demo/export/parquet_files/demo_2.parquet';

CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet';
Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet';

select * from with_filename order by ts;

CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);

Copy with_path FROM '/tmp/demo/export/parquet/';
Copy with_path FROM '/tmp/demo/export/parquet_files/';

select * from with_path order by ts;

CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);

Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*');
Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*');

select * from with_pattern order by ts;

CREATE TABLE without_limit_rows(host string, cpu double, memory double, ts timestamp time index);

Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/';

select count(*) from without_limit_rows;

CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);

Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);

select count(*) from with_limit_rows;

drop table demo;

drop table demo_2;

drop table with_filename;

drop table with_path;

drop table with_pattern;

drop table without_limit_rows;

drop table with_limit_rows;

0 comments on commit 5274806

Please sign in to comment.