Skip to content

Commit

Permalink
refactor(frontend): reuse def purification for assembling cdc table d…
Browse files Browse the repository at this point in the history
…ef when schema change

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 7, 2025
1 parent d6eb75b commit afc5463
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 28 deletions.
4 changes: 1 addition & 3 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ pub async fn get_replace_table_plan(
table_name: ObjectName,
new_definition: Statement,
old_catalog: &Arc<TableCatalog>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
Table,
Expand Down Expand Up @@ -141,7 +140,6 @@ pub async fn get_replace_table_plan(
on_conflict,
with_version_column,
cdc_table_info,
new_version_columns,
include_column_options,
engine,
)
Expand Down Expand Up @@ -391,7 +389,7 @@ pub async fn handle_alter_table_column(
};

let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;
get_replace_table_plan(&session, table_name, definition, &original_catalog).await?;

let catalog_writer = session.catalog_writer()?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub async fn handle_refresh_schema(

let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table, None).await;
get_replace_table_plan(&session, table_name, definition, &original_table).await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ pub(crate) async fn reparse_table_for_sink(
on_conflict,
with_version_column,
None,
None,
include_column_options,
engine,
)
Expand Down
26 changes: 4 additions & 22 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1149,13 +1149,13 @@ pub(super) async fn handle_create_table_plan(
}

let (mut columns, pk_names) =
bind_cdc_table_schema(&column_defs, &constraints, None)?;
bind_cdc_table_schema(&column_defs, &constraints)?;
// read default value definition from external db
let (options, secret_refs) = cdc_with_options.clone().into_parts();
let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
.context("failed to extract external table config")?;

let table = ExternalTableImpl::connect(config)
let table: ExternalTableImpl = ExternalTableImpl::connect(config)
.await
.context("failed to auto derive table schema")?;
let external_columns: Vec<_> = table
Expand Down Expand Up @@ -1309,25 +1309,9 @@ async fn bind_cdc_table_schema_externally(
fn bind_cdc_table_schema(
column_defs: &Vec<ColumnDef>,
constraints: &Vec<TableConstraint>,
new_version_columns: Option<Vec<ColumnCatalog>>,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
let mut columns = bind_sql_columns(column_defs)?;
// If new_version_columns is provided, we are in the process of auto schema change.
// update the default value column since the default value column is not set in the
// column sql definition.
if let Some(new_version_columns) = new_version_columns {
for (col, new_version_col) in columns
.iter_mut()
.zip_eq_fast(new_version_columns.into_iter())
{
assert_eq!(col.name(), new_version_col.name());
col.column_desc.generated_or_default_column =
new_version_col.column_desc.generated_or_default_column;
}
}

let columns = bind_sql_columns(column_defs)?;
let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;

Ok((columns, pk_names))
}

Expand Down Expand Up @@ -1805,7 +1789,6 @@ pub async fn generate_stream_graph_for_replace_table(
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
new_version_columns: Option<Vec<ColumnCatalog>>,
include_column_options: IncludeOption,
engine: Engine,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
Expand Down Expand Up @@ -1861,8 +1844,7 @@ pub async fn generate_stream_graph_for_replace_table(
cdc_table.external_table_name.clone(),
)?;

let (columns, pk_names) =
bind_cdc_table_schema(&column_defs, &constraints, new_version_columns)?;
let (columns, pk_names) = bind_cdc_table_schema(&column_defs, &constraints)?;

let context: OptimizerContextRef =
OptimizerContext::new(handler_args, ExplainOptions::default()).into();
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ async fn get_new_table_plan(
table_name,
new_table_definition,
&original_catalog,
Some(new_version_columns),
)
.await?;

Expand Down

0 comments on commit afc5463

Please sign in to comment.