Skip to content

Commit

Permalink
feat: limit total rows copied in COPY TABLE FROM with LIMIT segment
Browse files Browse the repository at this point in the history
  • Loading branch information
irenjj committed May 10, 2024
1 parent b8a325d commit 945d564
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
connection,
with,
table_name,
limit,
..
} = match stmt {
CopyTable::To(arg) => arg,
Expand All @@ -328,6 +329,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
pattern,
direction,
timestamp_range,
limit,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/operator/src/statement/copy_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
},
ctx.clone(),
)
Expand Down Expand Up @@ -155,6 +156,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Import,
timestamp_range: None,
limit: None,
};
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, ctx.clone()).await {
Expand Down
13 changes: 8 additions & 5 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,14 @@ impl StatementExecutor {

let mut rows_inserted = 0;
let mut insert_cost = 0;
let max_insert_rows = req
.with
.get(MAX_INSERT_ROWS)
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_INSERT_ROWS);
let max_insert_rows = if let Some(num) = req.limit {
num as usize
} else {
req.with
.get(MAX_INSERT_ROWS)
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_INSERT_ROWS)
};
for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
{
let mut stream = self
Expand Down
24 changes: 18 additions & 6 deletions src/sql/src/parsers/copy_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<'a> ParserContext<'a> {
})?;

let req = if self.parser.parse_keyword(Keyword::TO) {
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, _) = self.parse_copy_parameters()?;
let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
Expand All @@ -68,7 +68,7 @@ impl<'a> ParserContext<'a> {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, _) = self.parse_copy_parameters()?;
let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
Expand All @@ -91,28 +91,30 @@ impl<'a> ParserContext<'a> {
let table_name = Self::canonicalize_object_name(raw_table_name);

if self.parser.parse_keyword(Keyword::TO) {
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::To(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
limit,
}))
} else {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
let (with, connection, location) = self.parse_copy_parameters()?;
let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::From(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
limit,
}))
}
}

fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> {
fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option<u64>)> {
let location =
self.parser
.parse_literal_string()
Expand Down Expand Up @@ -142,7 +144,17 @@ impl<'a> ParserContext<'a> {
.map(parse_option_string)
.collect::<Result<Connection>>()?;

Ok((with, connection, location))
let limit = if self.parser.parse_keyword(Keyword::LIMIT) {
Some(self.parser.parse_literal_uint().with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "maximum rows",
actual: self.peek_token_as_string(),
})?)
} else {
None
};

Ok((with, connection, location, limit))
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sql/src/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct CopyTableArgument {
pub connection: OptionMap,
/// Copy tbl [To|From] 'location'.
pub location: String,
pub limit: Option<u64>,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub struct CopyTableRequest {
pub pattern: Option<String>,
pub direction: CopyDirection,
pub timestamp_range: Option<TimestampRange>,
pub limit: Option<u64>,
}

#[derive(Debug, Clone, Default)]
Expand Down
20 changes: 20 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.result
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ select count(*) from with_limit_rows;
| 2 |
+----------+

CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

Affected Rows: 2

select count(*) from with_limit_rows_segment;

+----------+
| COUNT(*) |
+----------+
| 0 |
+----------+

drop table demo;

Affected Rows: 0
Expand Down Expand Up @@ -137,3 +153,7 @@ drop table with_limit_rows;

Affected Rows: 0

drop table with_limit_rows_segment;

Affected Rows: 0

8 changes: 8 additions & 0 deletions tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROW

select count(*) from with_limit_rows;

CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);

Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2;

select count(*) from with_limit_rows_segment;

drop table demo;

drop table demo_2;
Expand All @@ -53,3 +59,5 @@ drop table with_pattern;
drop table without_limit_rows;

drop table with_limit_rows;

drop table with_limit_rows_segment;

0 comments on commit 945d564

Please sign in to comment.