diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 4dadffd1240d..d622485a2bf2 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -302,6 +302,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result< connection, with, table_name, + limit, .. } = match stmt { CopyTable::To(arg) => arg, @@ -328,6 +329,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result< pattern, direction, timestamp_range, + limit, }) } diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index fac6d77a6c15..daed3379aedc 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -90,6 +90,7 @@ impl StatementExecutor { pattern: None, direction: CopyDirection::Export, timestamp_range: req.time_range, + limit: None, }, ctx.clone(), ) @@ -155,6 +156,7 @@ impl StatementExecutor { pattern: None, direction: CopyDirection::Import, timestamp_range: None, + limit: None, }; debug!("Copy table, arg: {:?}", req); match self.copy_table_from(req, ctx.clone()).await { diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index aa4e2343a38a..374df8380949 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -379,11 +379,14 @@ impl StatementExecutor { let mut rows_inserted = 0; let mut insert_cost = 0; - let max_insert_rows = req - .with - .get(MAX_INSERT_ROWS) - .and_then(|val| val.parse::().ok()) - .unwrap_or(DEFAULT_MAX_INSERT_ROWS); + let max_insert_rows = if let Some(num) = req.limit { + num as usize + } else { + req.with + .get(MAX_INSERT_ROWS) + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_MAX_INSERT_ROWS) + }; for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { let mut stream = self diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index b9e9be85f732..ef05177a4386 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -56,7 +56,7 @@ impl<'a> ParserContext<'a> { })?; let req = if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, _) = self.parse_copy_parameters()?; let argument = CopyDatabaseArgument { database_name, with: with.into(), @@ -68,7 +68,7 @@ impl<'a> ParserContext<'a> { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, _) = self.parse_copy_parameters()?; let argument = CopyDatabaseArgument { database_name, with: with.into(), @@ -91,28 +91,30 @@ impl<'a> ParserContext<'a> { let table_name = Self::canonicalize_object_name(raw_table_name); if self.parser.parse_keyword(Keyword::TO) { - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, limit) = self.parse_copy_parameters()?; Ok(CopyTable::To(CopyTableArgument { table_name, with: with.into(), connection: connection.into(), location, + limit, })) } else { self.parser .expect_keyword(Keyword::FROM) .context(error::SyntaxSnafu)?; - let (with, connection, location) = self.parse_copy_parameters()?; + let (with, connection, location, limit) = self.parse_copy_parameters()?; Ok(CopyTable::From(CopyTableArgument { table_name, with: with.into(), connection: connection.into(), location, + limit, })) } } - fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> { + fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option)> { let location = self.parser .parse_literal_string() @@ -142,7 +144,17 @@ impl<'a> ParserContext<'a> { .map(parse_option_string) .collect::>()?; - Ok((with, connection, location)) + let limit = if self.parser.parse_keyword(Keyword::LIMIT) { + Some(self.parser.parse_literal_uint().with_context(|_| error::UnexpectedSnafu { + sql: self.sql, + expected: "maximum rows", + actual: self.peek_token_as_string(), + })?) + } else { + None + }; + + Ok((with, connection, location, limit)) } } diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index e99727f89a3f..c68b9d8c0321 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -111,6 +111,7 @@ pub struct CopyTableArgument { pub connection: OptionMap, /// Copy tbl [To|From] 'location'. pub location: String, + pub limit: Option, } #[cfg(test)] diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 4ddea65aea0f..10182baeb463 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -228,6 +228,7 @@ pub struct CopyTableRequest { pub pattern: Option, pub direction: CopyDirection, pub timestamp_range: Option, + pub limit: Option, } #[derive(Debug, Clone, Default)] diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index 3a2eaed6174c..dba6606bb534 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -109,6 +109,22 @@ select count(*) from with_limit_rows; | 2 | +----------+ +CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2; + +Affected Rows: 2 + +select count(*) from with_limit_rows_segment; + ++----------+ +| COUNT(*) | ++----------+ +| 0 | ++----------+ + drop table demo; Affected Rows: 0 @@ -137,3 +153,7 @@ drop table with_limit_rows; Affected Rows: 0 +drop table with_limit_rows_segment; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index d2916e4b9322..5bd87a3f49d7 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -40,6 +40,12 @@ Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROW select count(*) from with_limit_rows; +CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index); + +Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' LIMIT 2; + +select count(*) from with_limit_rows_segment; + drop table demo; drop table demo_2; @@ -53,3 +59,5 @@ drop table with_pattern; drop table without_limit_rows; drop table with_limit_rows; + +drop table with_limit_rows_segment;