Skip to content

Commit c3bad94

Browse files
committed
fix: alter shared source fresh schema will make it non-shared
fix #19799. Note: This problem only exists for `REFRESH SCHEMA` (i.e., sources with a schema registry), but not for `ADD COLUMN`, because only the former updates `source_info`. Signed-off-by: xxchan <[email protected]>
1 parent 5a85b55 commit c3bad94

File tree

6 files changed

+134
-54
lines changed

6 files changed

+134
-54
lines changed

e2e_test/source_inline/kafka/alter/add_column_shared.slt

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ alter source s add column v3 varchar;
5050

5151
# New MV will have v3.
5252

53+
# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
54+
query
55+
explain create materialized view mv_after_alter as select * from s;
56+
----
57+
StreamMaterialize { columns: [v1, v2, v3, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
58+
└─StreamProject { exprs: [v1, v2, v3, _row_id] }
59+
└─StreamRowIdGen { row_id_index: 5 }
60+
└─StreamSourceScan { columns: [v1, v2, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, v3] }
61+
62+
5363
statement ok
5464
create materialized view mv_after_alter as select * from s;
5565

@@ -106,16 +116,6 @@ select * from mv_after_alter;
106116
7 g g1
107117
8 h h1
108118

109-
query error
110-
select * from mv_after_alter_2;
111-
----
112-
db error: ERROR: Failed to run the query
113-
114-
Caused by these errors (recent errors listed first):
115-
1: Catalog error
116-
2: table or source not found: mv_after_alter_2
117-
118-
119119

120120
# Batch select from source will have v3.
121121

@@ -146,6 +146,19 @@ select * from mv_before_alter;
146146
8 h
147147

148148

149+
query ?? rowsort
150+
select * from mv_before_alter;
151+
----
152+
1 a
153+
2 b
154+
3 c
155+
4 d
156+
5 e
157+
6 f
158+
7 g
159+
8 h
160+
161+
149162
statement ok
150163
drop source s cascade;
151164

@@ -195,5 +208,3 @@ drop source s cascade;
195208

196209
system ok
197210
rpk topic delete shared_source_alter;
198-
199-
# TODO: test alter source with schema registry

e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,16 @@ sleep 5s
5454
statement ok
5555
ALTER SOURCE src_user REFRESH SCHEMA;
5656

57+
# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
58+
query
59+
EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
60+
----
61+
StreamMaterialize { columns: [id, name, address, city, gender, sc, _rw_kafka_timestamp, age, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
62+
└─StreamProject { exprs: [id, name, address, city, gender, sc, _rw_kafka_timestamp, age, _row_id] }
63+
└─StreamRowIdGen { row_id_index: 9 }
64+
└─StreamSourceScan { columns: [id, name, address, city, gender, sc, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, age] }
65+
66+
5767
statement ok
5868
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
5969

@@ -99,5 +109,5 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
99109
30 104 0 1020
100110

101111

102-
# statement ok
103-
# DROP SOURCE src_user CASCADE;
112+
statement ok
113+
DROP SOURCE src_user CASCADE;

src/connector/src/with_options.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,9 @@ impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
230230
}
231231
}
232232
}
233+
234+
impl Get for WithOptionsSecResolved {
235+
fn get(&self, key: &str) -> Option<&String> {
236+
self.inner.get(key)
237+
}
238+
}

src/frontend/src/handler/alter_source_with_sr.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@ use risingwave_sqlparser::ast::{
3131
use risingwave_sqlparser::parser::Parser;
3232

3333
use super::alter_table_column::schema_has_schema_registry;
34-
use super::create_source::{
35-
bind_columns_from_source, generate_stream_graph_for_source, validate_compatibility,
36-
};
34+
use super::create_source::{generate_stream_graph_for_source, validate_compatibility};
3735
use super::util::SourceSchemaCompatExt;
3836
use super::{HandlerArgs, RwPgResponse};
3937
use crate::catalog::root_catalog::SchemaPath;
4038
use crate::catalog::source_catalog::SourceCatalog;
4139
use crate::catalog::{DatabaseId, SchemaId};
4240
use crate::error::{ErrorCode, Result};
41+
use crate::handler::create_source::{bind_columns_from_source, CreateSourceType};
4342
use crate::session::SessionImpl;
4443
use crate::utils::resolve_secret_ref_in_with_options;
4544
use crate::{Binder, WithOptions};
@@ -164,8 +163,13 @@ pub async fn refresh_sr_and_get_columns_diff(
164163
bail_not_implemented!("altering a cdc source is not supported");
165164
}
166165

167-
let (Some(columns_from_resolve_source), source_info) =
168-
bind_columns_from_source(session, format_encode, Either::Right(&with_properties)).await?
166+
let (Some(columns_from_resolve_source), source_info) = bind_columns_from_source(
167+
session,
168+
format_encode,
169+
Either::Right(&with_properties),
170+
CreateSourceType::from_with_properties(session, &with_properties),
171+
)
172+
.await?
169173
else {
170174
// Source without schema registry is rejected.
171175
unreachable!("source without schema registry is rejected")
@@ -277,7 +281,6 @@ pub async fn handle_alter_source_with_sr(
277281
source.version += 1;
278282

279283
let pb_source = source.to_prost(schema_id, database_id);
280-
281284
let catalog_writer = session.catalog_writer()?;
282285
if source.info.is_shared() {
283286
let graph = generate_stream_graph_for_source(handler_args, source.clone())?;

src/frontend/src/handler/create_source.rs

Lines changed: 73 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,71 @@ fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Opti
302302
}
303303
}
304304

305+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
306+
pub enum CreateSourceType {
307+
SharedCdc,
308+
/// e.g., shared Kafka source
309+
SharedNonCdc,
310+
NonShared,
311+
/// create table with connector
312+
Table,
313+
}
314+
315+
impl CreateSourceType {
316+
pub fn from_with_properties(
317+
session: &SessionImpl,
318+
with_properties: &impl WithPropertiesExt,
319+
) -> Self {
320+
if with_properties.is_shareable_cdc_connector() {
321+
CreateSourceType::SharedCdc
322+
} else if with_properties.is_shareable_non_cdc_connector()
323+
&& session
324+
.env()
325+
.streaming_config()
326+
.developer
327+
.enable_shared_source
328+
&& session.config().streaming_use_shared_source()
329+
{
330+
CreateSourceType::SharedNonCdc
331+
} else {
332+
CreateSourceType::NonShared
333+
}
334+
}
335+
336+
pub fn is_shared(&self) -> bool {
337+
matches!(
338+
self,
339+
CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
340+
)
341+
}
342+
}
343+
305344
/// Resolves the schema of the source from external schema file.
306345
/// See <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
307346
///
308347
/// Note: the returned schema strictly corresponds to the schema.
309348
/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included.
310-
pub(crate) async fn bind_columns_from_source(
349+
pub async fn bind_columns_from_source(
350+
session: &SessionImpl,
351+
format_encode: &FormatEncodeOptions,
352+
with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
353+
create_source_type: CreateSourceType,
354+
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
355+
let (columns_from_resolve_source, mut source_info) =
356+
if create_source_type == CreateSourceType::SharedCdc {
357+
bind_columns_from_source_for_cdc(session, format_encode)?
358+
} else {
359+
bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
360+
};
361+
if create_source_type.is_shared() {
362+
// Note: this field should be called is_shared. Check field doc for more details.
363+
source_info.cdc_source_job = true;
364+
source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
365+
}
366+
Ok((columns_from_resolve_source, source_info))
367+
}
368+
369+
async fn bind_columns_from_source_for_non_cdc(
311370
session: &SessionImpl,
312371
format_encode: &FormatEncodeOptions,
313372
with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
@@ -1542,9 +1601,7 @@ pub async fn bind_create_source_or_table_with_connector(
15421601
source_info: StreamSourceInfo,
15431602
include_column_options: IncludeOption,
15441603
col_id_gen: &mut ColumnIdGenerator,
1545-
// `true` for "create source", `false` for "create table with connector"
1546-
is_create_source: bool,
1547-
is_shared_non_cdc: bool,
1604+
create_source_type: CreateSourceType,
15481605
source_rate_limit: Option<u32>,
15491606
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
15501607
let session = &handler_args.session;
@@ -1553,6 +1610,7 @@ pub async fn bind_create_source_or_table_with_connector(
15531610
let (database_id, schema_id) =
15541611
session.get_database_and_schema_id_for_create(schema_name.clone())?;
15551612

1613+
let is_create_source = create_source_type != CreateSourceType::Table;
15561614
if !is_create_source && with_properties.is_iceberg_connector() {
15571615
return Err(ErrorCode::BindError(
15581616
"can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
@@ -1609,7 +1667,7 @@ pub async fn bind_create_source_or_table_with_connector(
16091667

16101668
// For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
16111669
// For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
1612-
if is_shared_non_cdc {
1670+
if create_source_type == CreateSourceType::SharedNonCdc {
16131671
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
16141672
&columns,
16151673
&with_properties.get_connector().unwrap(),
@@ -1748,26 +1806,14 @@ pub async fn handle_create_source(
17481806
let format_encode = stmt.format_encode.into_v2_with_warning();
17491807
let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;
17501808

1751-
let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
1752-
let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector()
1753-
&& session
1754-
.env()
1755-
.streaming_config()
1756-
.developer
1757-
.enable_shared_source
1758-
&& session.config().streaming_use_shared_source();
1759-
let is_shared = create_cdc_source_job || is_shared_non_cdc;
1760-
1761-
let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
1762-
bind_columns_from_source_for_cdc(&session, &format_encode)?
1763-
} else {
1764-
bind_columns_from_source(&session, &format_encode, Either::Left(&with_properties)).await?
1765-
};
1766-
if is_shared {
1767-
// Note: this field should be called is_shared. Check field doc for more details.
1768-
source_info.cdc_source_job = true;
1769-
source_info.is_distributed = !create_cdc_source_job;
1770-
}
1809+
let create_source_type = CreateSourceType::from_with_properties(&session, &*with_properties);
1810+
let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1811+
&session,
1812+
&format_encode,
1813+
Either::Left(&with_properties),
1814+
create_source_type,
1815+
)
1816+
.await?;
17711817
let mut col_id_gen = ColumnIdGenerator::new_initial();
17721818

17731819
let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
@@ -1783,8 +1829,7 @@ pub async fn handle_create_source(
17831829
source_info,
17841830
stmt.include_column_options,
17851831
&mut col_id_gen,
1786-
true,
1787-
is_shared_non_cdc,
1832+
create_source_type,
17881833
overwrite_options.source_rate_limit,
17891834
)
17901835
.await?;
@@ -1802,7 +1847,7 @@ pub async fn handle_create_source(
18021847

18031848
let catalog_writer = session.catalog_writer()?;
18041849

1805-
if is_shared {
1850+
if create_source_type.is_shared() {
18061851
let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
18071852
catalog_writer.create_source(source, Some(graph)).await?;
18081853
} else {

src/frontend/src/handler/create_table.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use risingwave_sqlparser::ast::{
5959
use risingwave_sqlparser::parser::{IncludeOption, Parser};
6060
use thiserror_ext::AsReport;
6161

62+
use super::create_source::{bind_columns_from_source, CreateSourceType};
6263
use super::{create_sink, create_source, RwPgResponse};
6364
use crate::binder::{bind_data_type, bind_struct_field, Clause, SecureCompareContext};
6465
use crate::catalog::root_catalog::SchemaPath;
@@ -68,8 +69,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId};
6869
use crate::error::{ErrorCode, Result, RwError};
6970
use crate::expr::{Expr, ExprImpl, ExprRewriter};
7071
use crate::handler::create_source::{
71-
bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector,
72-
bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY,
72+
bind_connector_props, bind_create_source_or_table_with_connector, bind_source_watermark,
73+
handle_addition_columns, UPSTREAM_SOURCE_KEY,
7374
};
7475
use crate::handler::HandlerArgs;
7576
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
@@ -497,8 +498,13 @@ pub(crate) async fn gen_create_table_plan_with_source(
497498
let session = &handler_args.session;
498499
let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
499500

500-
let (columns_from_resolve_source, source_info) =
501-
bind_columns_from_source(session, &format_encode, Either::Left(&with_properties)).await?;
501+
let (columns_from_resolve_source, source_info) = bind_columns_from_source(
502+
session,
503+
&format_encode,
504+
Either::Left(&with_properties),
505+
CreateSourceType::Table,
506+
)
507+
.await?;
502508

503509
let overwrite_options = OverwriteOptions::new(&mut handler_args);
504510
let rate_limit = overwrite_options.source_rate_limit;
@@ -515,8 +521,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
515521
source_info,
516522
include_column_options,
517523
&mut col_id_gen,
518-
false,
519-
false,
524+
CreateSourceType::Table,
520525
rate_limit,
521526
)
522527
.await?;

0 commit comments

Comments
 (0)