Skip to content

Commit

Permalink
feat(frontend): purification for source def sql (#19965)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jan 6, 2025
1 parent 2601c23 commit 448550d
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 13 deletions.
10 changes: 10 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ FORMAT PLAIN ENCODE PROTOBUF(
message = 'test.User'
);

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>) INCLUDE timestamp

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

Expand Down Expand Up @@ -54,6 +59,11 @@ sleep 5s
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT) INCLUDE timestamp

statement ok
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

Expand Down
24 changes: 16 additions & 8 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,34 @@ use risingwave_sqlparser::ast::*;
use crate::error::Result;
use crate::utils::data_type::DataTypeToAst as _;

/// Try to restore missing column definitions and constraints in the persisted table definition,
/// if the schema of the table is derived from external systems (like schema registry) or it's
/// Try to restore missing column definitions and constraints in the persisted table (or source)
/// definition, if the schema is derived from external systems (like schema registry) or it's
/// created by `CREATE TABLE AS`.
///
/// Returns error if restoring failed, or called on non-`TableType::Table`, or the persisted
/// definition is invalid.
pub fn try_purify_table_create_sql_ast(
/// Returns error if restoring failed, or the persisted definition is invalid.
pub fn try_purify_table_source_create_sql_ast(
mut base: Statement,
columns: &[ColumnCatalog],
row_id_index: Option<usize>,
pk_column_ids: &[ColumnId],
) -> Result<Statement> {
let Statement::CreateTable {
let (Statement::CreateTable {
columns: column_defs,
constraints,
wildcard_idx,
..
} = &mut base
}
| Statement::CreateSource {
stmt:
CreateSourceStatement {
columns: column_defs,
constraints,
wildcard_idx,
..
},
}) = &mut base
else {
bail!("expect `CREATE TABLE` statement, found: `{:?}`", base);
bail!("expect `CREATE TABLE` or `CREATE SOURCE` statement, found: `{base:?}`");
};

// Filter out columns that are not defined by users in SQL.
Expand Down
52 changes: 51 additions & 1 deletion src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context as _;
use itertools::Itertools as _;
use risingwave_common::catalog::{ColumnCatalog, SourceVersionId};
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_source_create_sql_ast;
use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
use crate::catalog::TableId;
use crate::error::Result;
use crate::session::current::notice_to_user;
use crate::user::UserId;

/// This struct `SourceCatalog` is used in frontend.
Expand Down Expand Up @@ -48,11 +56,22 @@ pub struct SourceCatalog {
}

impl SourceCatalog {
/// Returns the SQL statement that can be used to create this source.
/// Returns the SQL definition when the source was created.
pub fn create_sql(&self) -> String {
self.definition.clone()
}

/// Returns the parsed SQL definition when the source was created.
///
/// Returns error if it's invalid.
pub fn create_sql_ast(&self) -> Result<ast::Statement> {
Ok(Parser::parse_sql(&self.definition)
.context("unable to parse definition sql")?
.into_iter()
.exactly_one()
.context("expecting exactly one statement in definition")?)
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbSource {
let (with_properties, secret_refs) = self.with_properties.clone().into_parts();
PbSource {
Expand Down Expand Up @@ -94,6 +113,37 @@ impl SourceCatalog {
}
}

impl SourceCatalog {
/// Returns the SQL definition when the source was created, purified with best effort.
pub fn create_sql_purified(&self) -> String {
self.create_sql_ast_purified()
.map(|stmt| stmt.to_string())
.unwrap_or_else(|_| self.create_sql())
}

/// Returns the parsed SQL definition when the source was created, purified with best effort.
///
/// Returns error if it's invalid.
pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
match try_purify_table_source_create_sql_ast(
self.create_sql_ast()?,
&self.columns,
self.row_id_index,
&self.pk_col_ids,
) {
Ok(stmt) => return Ok(stmt),
Err(e) => notice_to_user(format!(
"error occurred while purifying definition for source \"{}\", \
results may be inaccurate: {}",
self.name,
e.as_report()
)),
}

self.create_sql_ast()
}
}

impl From<&PbSource> for SourceCatalog {
fn from(prost: &PbSource) -> Self {
let id = prost.id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>>
append_only: source.append_only,
associated_table_id: source.associated_table_id.map(|id| id.table_id as i32),
connection_id: source.connection_id.map(|id| id as i32),
definition: source.create_sql(),
definition: source.create_sql_purified(),
acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_create_sql_ast;
use super::purify::try_purify_table_source_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
Expand Down Expand Up @@ -300,7 +300,7 @@ impl TableCatalog {
self.create_sql_ast()?
};

match try_purify_table_create_sql_ast(
match try_purify_table_source_create_sql_ast(
base,
self.columns(),
self.row_id_index,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ pub fn handle_show_create_object(
.get_source_by_name(&object_name)
.filter(|s| s.associated_table_id.is_none())
.ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
source.create_sql()
source.create_sql_purified()
}
ShowCreateType::Index => {
let index = schema
Expand Down

0 comments on commit 448550d

Please sign in to comment.