diff --git a/src/connector/benches/json_parser_case_insensitive.rs b/src/connector/benches/json_parser_case_insensitive.rs
index 4affeb39472a2..ba3a9ad0bbe63 100644
--- a/src/connector/benches/json_parser_case_insensitive.rs
+++ b/src/connector/benches/json_parser_case_insensitive.rs
@@ -89,7 +89,7 @@ fn create_parser(chunk_size: usize, chunk_num: usize, mode: &str) -> (Parser, Input) {
 
 async fn parse(parser: Parser, input: Input) {
     parser
-        .into_stream(futures::stream::iter(input.into_iter().map(Ok)).boxed())
+        .parse_stream(futures::stream::iter(input.into_iter().map(Ok)).boxed())
         .count() // consume the stream
         .await;
 }
diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs
index f59ae08a9ec14..47b1ab07742d5 100644
--- a/src/connector/benches/nexmark_integration.rs
+++ b/src/connector/benches/nexmark_integration.rs
@@ -30,7 +30,7 @@ use risingwave_connector::parser::{
     ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, SpecificParserConfig,
 };
 use risingwave_connector::source::{
-    BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta,
+    BoxSourceChunkStream, BoxSourceMessageStream, SourceColumnDesc, SourceMessage, SourceMeta,
 };
 use tracing::Level;
 use tracing_subscriber::prelude::*;
@@ -72,7 +72,7 @@ fn make_batch(use_struct: bool) -> Vec<SourceMessage> {
         .collect_vec()
 }
 
-fn make_data_stream(use_struct: bool) -> BoxSourceStream {
+fn make_data_stream(use_struct: bool) -> BoxSourceMessageStream {
     futures::future::ready(Ok(if use_struct {
         STRUCT_BATCH.clone()
     } else {
@@ -118,8 +118,8 @@ fn make_parser(use_struct: bool) -> ByteStreamSourceParserImpl {
 }
 
 fn make_stream_iter(use_struct: bool) -> impl Iterator<Item = StreamChunk> {
-    let mut stream: BoxChunkSourceStream = make_parser(use_struct)
-        .into_stream(make_data_stream(use_struct))
+    let mut stream: BoxSourceChunkStream = make_parser(use_struct)
+        .parse_stream(make_data_stream(use_struct))
         .boxed();
 
     std::iter::from_fn(move || {
diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs
index 27b825ac90ed5..ee2a5f992e800 100644
--- a/src/connector/src/parser/chunk_builder.rs
+++ b/src/connector/src/parser/chunk_builder.rs
@@ -43,6 +43,12 @@ struct Transaction {
 }
 
 /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
+///
+/// The output chunk size is controlled by `source_ctrl_opts.chunk_size` and `source_ctrl_opts.split_txn`.
+/// During the building process, multiple chunks may be built even without any explicit call
+/// to `finish_current_chunk`. This mainly happens when we find more than one record in a
+/// single `SourceMessage` while parsing it. Users of this builder should call
+/// `consume_ready_chunks` from time to time to consume the built chunks, so that the internal buffer does not grow too large.
 pub struct SourceStreamChunkBuilder {
     column_descs: Vec<SourceColumnDesc>,
     source_ctrl_opts: SourceCtrlOpts,
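Reviewer note: a sketch of the drain loop that the new doc comment describes, i.e. what `parse_message_stream` (in `parser/mod.rs`, further down in this patch) runs against the builder. The exact signatures of `parse_one` and `consume_ready_chunks` are assumptions inferred from the names in this diff, not code from this PR:

```rust
// Illustrative only: drive the builder and drain eagerly so its internal
// buffer stays bounded. Assumed: `consume_ready_chunks()` yields the chunks
// that became ready while writing rows.
async fn drive_builder<P: ByteStreamSourceParser>(
    parser: &mut P,
    builder: &mut SourceStreamChunkBuilder,
    messages: Vec<SourceMessage>,
    out: &mut Vec<StreamChunk>,
) -> crate::error::ConnectorResult<()> {
    for msg in messages {
        // One message may contain several records, so more than one chunk
        // can become ready here without `finish_current_chunk` being called.
        parser
            .parse_one(msg.key, msg.payload, builder.row_writer())
            .await?;
        out.extend(builder.consume_ready_chunks());
    }
    builder.finish_current_chunk(); // flush the trailing partial chunk
    out.extend(builder.consume_ready_chunks());
    Ok(())
}
```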
@@ -299,7 +305,7 @@ impl SourceStreamChunkRowWriter<'_> {
                 }
                 (&SourceColumnType::Meta, _)
                     if matches!(
-                        &self.row_meta.map(|ele| ele.meta),
+                        &self.row_meta.map(|ele| ele.source_meta),
                         &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                     ) =>
                 {
@@ -318,7 +324,7 @@ impl SourceStreamChunkRowWriter<'_> {
                 ) => {
                     match self.row_meta {
                         Some(row_meta) => {
-                            if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
+                            if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                 Ok(A::output_for(extract_cdc_meta_column(
                                     cdc_meta,
                                     col,
@@ -334,7 +340,9 @@ impl SourceStreamChunkRowWriter<'_> {
                     }
                 }
                 (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
-                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(row_meta.meta))),
+                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
+                        row_meta.source_meta,
+                    ))),
                     None => parse_field(desc), // parse from payload
                 },
                 (_, &Some(AdditionalColumnType::CollectionName(_))) => {
@@ -344,7 +352,7 @@ impl SourceStreamChunkRowWriter<'_> {
                 (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                     self.row_meta
                         .as_ref()
-                        .and_then(|ele| extract_subject_from_meta(ele.meta))
+                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                         .unwrap_or(None),
                 )),
                 (_, &Some(AdditionalColumnType::Partition(_))) => {
@@ -369,7 +377,7 @@ impl SourceStreamChunkRowWriter<'_> {
                         .as_ref()
                         .and_then(|ele| {
                             extract_header_inner_from_meta(
-                                ele.meta,
+                                ele.source_meta,
                                 header_inner.inner_field.as_ref(),
                                 header_inner.data_type.as_ref(),
                             )
@@ -380,7 +388,7 @@ impl SourceStreamChunkRowWriter<'_> {
                 (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                     self.row_meta
                         .as_ref()
-                        .and_then(|ele| extract_headers_from_meta(ele.meta))
+                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                         .unwrap_or(None),
                 )),
                 (_, &Some(AdditionalColumnType::Filename(_))) => {
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 410a062f6875a..d9438a4d00db4 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -48,7 +48,7 @@ use crate::parser::maxwell::MaxwellParser;
 use crate::schema::schema_registry::SchemaRegistryAuth;
 use crate::source::monitor::GLOBAL_SOURCE_METRICS;
 use crate::source::{
-    BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
+    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
     SourceContextRef, SourceCtrlOpts, SourceMeta,
 };
 
@@ -85,7 +85,7 @@ pub use unified::{AccessError, AccessResult};
 /// Extracted from the `SourceMessage`.
 #[derive(Clone, Copy, Debug)]
 pub struct MessageMeta<'a> {
-    meta: &'a SourceMeta,
+    source_meta: &'a SourceMeta,
     split_id: &'a str,
     offset: &'a str,
 }
@@ -102,7 +102,7 @@ impl<'a> MessageMeta<'a> {
             // Extract the offset from the meta data.
             SourceColumnType::Offset => Some(self.offset.into()),
             // Extract custom meta data per connector.
-            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
+            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                 assert_eq!(
                     desc.name.as_str(),
                     KAFKA_TIMESTAMP_COLUMN_NAME,
@@ -110,7 +110,7 @@ impl<'a> MessageMeta<'a> {
                 );
                 kafka_meta.extract_timestamp()
             }
-            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
+            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                 assert_eq!(
                     desc.name.as_str(),
                     TABLE_NAME_COLUMN_NAME,
@@ -161,7 +161,7 @@ pub enum ParserFormat {
 /// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
 /// It consumes bytes of one individual message and produces parsed records.
 ///
-/// It's used by [`ByteStreamSourceParserImpl::into_stream`]. `pub` is for benchmark only.
+/// It's used by [`ByteStreamSourceParserImpl::parse_stream`]. `pub` is for benchmark only.
 pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     /// The column descriptors of the output chunk.
     fn columns(&self) -> &[SourceColumnDesc];
@@ -202,25 +202,25 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
 
 #[easy_ext::ext(SourceParserIntoStreamExt)]
 impl<P: ByteStreamSourceParser> P {
-    /// Parse a stream of vectors of `SourceMessage` into a stream of [`StreamChunk`].
+    /// Parse a `SourceMessage` stream into a [`StreamChunk`] stream.
     ///
     /// # Arguments
     ///
-    /// - `msg_stream`: A stream of vectors of `SourceMessage`.
+    /// - `msg_stream`: A stream of batches of `SourceMessage`.
     ///
    /// # Returns
     ///
-    /// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks
-    /// are guaranteed to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless
-    /// there's a large transaction and `source_ctrl_opts.split_txn` is false.
-    pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream {
+    /// A [`SourceChunkStream`] of parsed chunks. Each of the parsed chunks is guaranteed
+    /// to have at most `source_ctrl_opts.chunk_size` rows, unless there's a large
+    /// transaction and `source_ctrl_opts.split_txn` is false.
+    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
         let actor_id = self.source_ctx().actor_id;
         let source_id = self.source_ctx().source_id.table_id();
 
         // The stream will be long-lived. We use `instrument_with` here to create
         // a new span for the polling of each chunk.
         let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
-        into_chunk_stream_inner(self, msg_stream, source_ctrl_opts)
+        parse_message_stream(self, msg_stream, source_ctrl_opts)
            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
     }
 }
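Reviewer note: a caller-side sketch of the renamed entry point, mirroring the pattern `test_utils` uses further down in this file. Assumptions: the snippet lives inside `risingwave_connector` with the relevant items in scope, `parser` is any `ByteStreamSourceParser`, and `batch` is one `Vec<SourceMessage>`:

```rust
use futures::StreamExt;
use risingwave_common::array::StreamChunk;

async fn parse_one_batch<P: ByteStreamSourceParser>(
    parser: P,
    batch: Vec<SourceMessage>,
) -> crate::error::ConnectorResult<Vec<StreamChunk>> {
    // A one-shot message stream carrying a single batch.
    let msg_stream = futures::stream::once(async { Ok(batch) }).boxed();

    // Formerly `into_stream`. Each yielded chunk has at most
    // `source_ctrl_opts.chunk_size` rows, unless a large transaction is kept
    // intact because `split_txn` is false.
    let mut chunk_stream = std::pin::pin!(parser.parse_stream(msg_stream));

    let mut chunks = Vec::new();
    while let Some(chunk) = chunk_stream.next().await {
        chunks.push(chunk?);
    }
    Ok(chunks)
}
```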
@@ -228,9 +228,9 @@ impl<P: ByteStreamSourceParser> P {
 // TODO: when upsert is disabled, how to filter those empty payload
 // Currently, an err is returned for non upsert with empty payload
 #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
-async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
+async fn parse_message_stream<P: ByteStreamSourceParser>(
     mut parser: P,
-    msg_stream: BoxSourceStream,
+    msg_stream: BoxSourceMessageStream,
     source_ctrl_opts: SourceCtrlOpts,
 ) {
     let mut chunk_builder =
@@ -266,7 +266,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
             "handling a heartbeat message"
         );
         chunk_builder.heartbeat(MessageMeta {
-            meta: &heartbeat_msg.meta,
+            source_meta: &heartbeat_msg.meta,
             split_id: &heartbeat_msg.split_id,
             offset: &heartbeat_msg.offset,
         });
@@ -312,7 +312,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
                     msg.key,
                     msg.payload,
                     chunk_builder.row_writer().with_meta(MessageMeta {
-                        meta: &msg.meta,
+                        source_meta: &msg.meta,
                         split_id: &msg.split_id,
                         offset: &msg.offset,
                     }),
@@ -428,16 +428,19 @@ pub enum ByteStreamSourceParserImpl {
 
 impl ByteStreamSourceParserImpl {
     /// Converts `SourceMessage` vec stream into [`StreamChunk`] stream.
-    pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin {
+    pub fn parse_stream(
+        self,
+        msg_stream: BoxSourceMessageStream,
+    ) -> impl SourceChunkStream + Unpin {
         #[auto_enum(futures03::Stream)]
         let stream = match self {
-            Self::Csv(parser) => parser.into_stream(msg_stream),
-            Self::Debezium(parser) => parser.into_stream(msg_stream),
-            Self::DebeziumMongoJson(parser) => parser.into_stream(msg_stream),
-            Self::Maxwell(parser) => parser.into_stream(msg_stream),
-            Self::CanalJson(parser) => parser.into_stream(msg_stream),
-            Self::Plain(parser) => parser.into_stream(msg_stream),
-            Self::Upsert(parser) => parser.into_stream(msg_stream),
+            Self::Csv(parser) => parser.parse_stream(msg_stream),
+            Self::Debezium(parser) => parser.parse_stream(msg_stream),
+            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
+            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
+            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
+            Self::Plain(parser) => parser.parse_stream(msg_stream),
+            Self::Upsert(parser) => parser.parse_stream(msg_stream),
         };
         Box::pin(stream)
     }
@@ -513,7 +516,7 @@ pub mod test_utils {
             })
             .collect_vec();
 
-        self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
+        self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
             .next()
             .await
             .unwrap()
@@ -531,7 +534,7 @@ pub mod test_utils {
             })
             .collect_vec();
 
-        self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
+        self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
             .next()
             .await
             .unwrap()
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index fcd8c9f308b6a..7db9c7909cb43 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -95,7 +95,7 @@ impl PlainParser {
         // plain parser also used in the shared cdc source,
         // we need to handle transaction metadata and schema change messages here
         if let Some(msg_meta) = writer.row_meta()
-            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.meta
+            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
             && let Some(data) = payload
         {
             match cdc_meta.msg_type {
@@ -252,7 +252,7 @@ mod tests {
         let mut transactional = false;
         // for untransactional source, we expect emit a chunk for each message batch
         let message_stream = source_message_stream(transactional);
-        let chunk_stream = crate::parser::into_chunk_stream_inner(
+        let chunk_stream = crate::parser::parse_message_stream(
             parser,
             message_stream.boxed(),
             SourceCtrlOpts::for_test(),
@@ -293,7 +293,7 @@ mod tests {
         // for transactional source, we expect emit a single chunk for the transaction
         transactional = true;
         let message_stream = source_message_stream(transactional);
-        let chunk_stream = crate::parser::into_chunk_stream_inner(
+        let chunk_stream = crate::parser::parse_message_stream(
             parser,
             message_stream.boxed(),
             SourceCtrlOpts::for_test(),
@@ -426,7 +426,7 @@ mod tests {
             cdc_message::CdcMessageType::TransactionMeta,
         ));
         let msg_meta = MessageMeta {
-            meta: &cdc_meta,
+            source_meta: &cdc_meta,
             split_id: "1001",
             offset: "",
         };
@@ -500,7 +500,7 @@ mod tests {
             cdc_message::CdcMessageType::SchemaChange,
         ));
         let msg_meta = MessageMeta {
-            meta: &cdc_meta,
+            source_meta: &cdc_meta,
             split_id: "1001",
             offset: "",
         };
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 1742334ee088c..0a8ce55006dbd 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -353,20 +353,22 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
     Ok(SourceStruct::new(format, encode))
 }
 
-/// Stream of [`SourceMessage`].
-pub type BoxSourceStream = BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
+/// Stream of [`SourceMessage`]. Messages flow through the stream in batches.
+pub type BoxSourceMessageStream =
+    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
+/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
+pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
 
 // Manually expand the trait alias to improve IDE experience.
-pub trait ChunkSourceStream:
+pub trait SourceChunkStream:
     Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
 {
 }
-impl<T> ChunkSourceStream for T where
+impl<T> SourceChunkStream for T where
     T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
 {
 }
 
-pub type BoxChunkSourceStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
 pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
 
 /// [`SplitReader`] is a new abstraction of the external connector read interface which is
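Reviewer note: the "manually expanded trait alias" above is the usual stable-Rust workaround for the lack of real trait aliases — an empty trait carrying the bounds, plus a blanket impl. Here is the pattern in a self-contained toy form (the `IntStream` trait and item types are illustrative, not RisingWave code):

```rust
use futures::Stream;

// Declare the "alias" as an empty trait with the desired bounds...
trait IntStream: Stream<Item = Result<u32, String>> + Send + 'static {}

// ...and a blanket impl so every matching stream automatically satisfies it.
impl<T> IntStream for T where T: Stream<Item = Result<u32, String>> + Send + 'static {}

fn consume(_: impl IntStream) {}

fn main() {
    consume(futures::stream::iter([Ok(1), Err("oops".to_owned())]));
}
```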
@@ -385,7 +387,7 @@ pub trait SplitReader: Sized + Send {
         columns: Option<Vec<Column>>,
     ) -> crate::error::ConnectorResult<Self>;
 
-    fn into_stream(self) -> BoxChunkSourceStream;
+    fn into_stream(self) -> BoxSourceChunkStream;
 
     fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
         HashMap::new()
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index 7cef818f1855b..dededcbe0b037 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -33,7 +33,7 @@ use crate::parser::ParserConfig;
 use crate::source::base::SourceMessage;
 use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
+    into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitMetaData,
     SplitReader,
 };
 
@@ -199,7 +199,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
         Ok(instance)
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs
index bbe0a4d06aa96..69597ffd2dd43 100644
--- a/src/connector/src/source/common.rs
+++ b/src/connector/src/source/common.rs
@@ -93,7 +93,7 @@ pub(crate) async fn into_chunk_stream(
     let parser =
         crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
 
     #[for_await]
-    for chunk in parser.into_stream(data_stream) {
+    for chunk in parser.parse_stream(data_stream) {
         yield chunk?;
     }
 }
diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs
index 6771a00f465cd..8955255df2624 100644
--- a/src/connector/src/source/datagen/source/reader.rs
+++ b/src/connector/src/source/datagen/source/reader.rs
@@ -28,7 +28,7 @@ use crate::source::data_gen_util::spawn_data_generation_stream;
 use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
 use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, DataType, SourceContextRef, SourceMessage,
+    into_chunk_stream, BoxSourceChunkStream, Column, DataType, SourceContextRef, SourceMessage,
     SplitId, SplitMetaData, SplitReader,
 };
 
@@ -142,7 +142,7 @@ impl SplitReader for DatagenSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         // Will buffer at most 4 event chunks.
         const BUFFER_SIZE: usize = 4;
         // spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed()
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index b48123caa229c..2f998bd043a0c 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -33,7 +33,7 @@ use crate::source::filesystem::nd_streaming::need_nd_streaming;
 use crate::source::filesystem::OpendalFsSplit;
 use crate::source::iceberg::read_parquet_file;
 use crate::source::{
-    BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
+    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
     SplitReader,
 };
 
@@ -69,7 +69,7 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
         Ok(opendal_reader)
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         self.into_stream_inner()
     }
 }
@@ -109,7 +109,7 @@ impl<Src: OpendalSource> OpendalReader<Src> {
             let parser =
                 ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
                     .await?;
-            chunk_stream = Box::pin(parser.into_stream(line_stream));
+            chunk_stream = Box::pin(parser.parse_stream(line_stream));
         }
 
         #[for_await]
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index a09cf0ef074cd..3917f772cb4ad 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -40,7 +40,7 @@ use crate::source::filesystem::file_common::FsSplit;
 use crate::source::filesystem::nd_streaming::need_nd_streaming;
 use crate::source::filesystem::s3::S3Properties;
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta,
+    into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta,
 };
 
 const STREAM_READER_CAPACITY: usize = 4096;
@@ -201,7 +201,7 @@ impl SplitReader for S3FileReader {
         Ok(s3_file_reader)
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         self.into_stream_inner()
     }
 }
diff --git a/src/connector/src/source/filesystem/s3/source/split_stream.rs b/src/connector/src/source/filesystem/s3/source/split_stream.rs
index 4824bdcdffd12..57aa26cef31af 100644
--- a/src/connector/src/source/filesystem/s3/source/split_stream.rs
+++ b/src/connector/src/source/filesystem/s3/source/split_stream.rs
@@ -18,7 +18,7 @@ use futures::io::Cursor;
 use futures::AsyncBufReadExt;
 use futures_async_stream::try_stream;
 
-use crate::source::{BoxSourceStream, SourceMessage};
+use crate::source::{BoxSourceMessageStream, SourceMessage};
 
 #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
 /// This function splits a byte stream by the newline separator "(\r)\n" into a message stream.
@@ -27,7 +27,7 @@ use crate::source::{BoxSourceStream, SourceMessage};
 /// - When a bytes chunk does not end with "(\r)\n", we should not treat the last segment as a new line
 /// message, but keep it for the next chunk, and prepend it to the first line of the next chunk.
 /// - When a bytes chunk ends with "(\r)\n", there is no additional action required.
-pub(super) async fn split_stream(data_stream: BoxSourceStream) {
+pub(super) async fn split_stream(data_stream: BoxSourceMessageStream) {
     let mut last_message = None;
 
     #[for_await]
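Reviewer note: the carry-over rule described in `split_stream`'s doc comment, reduced to a self-contained sketch (simplified to strings; the real implementation works on `SourceMessage` byte payloads):

```rust
// Split incoming chunks on "(\r)\n"; a trailing segment without a newline is
// carried over and prepended to the next chunk.
fn split_on_newlines(chunks: &[&str]) -> Vec<String> {
    let mut out = Vec::new();
    let mut carry = String::new();
    for chunk in chunks {
        carry.push_str(chunk);
        // Emit every complete line; keep the trailing partial segment.
        while let Some(pos) = carry.find('\n') {
            let mut line: String = carry.drain(..=pos).collect();
            line.pop(); // drop '\n'
            if line.ends_with('\r') {
                line.pop(); // drop the optional '\r'
            }
            out.push(line);
        }
    }
    if !carry.is_empty() {
        out.push(carry); // final segment with no trailing newline
    }
    out
}

fn main() {
    // "b" at the end of the first chunk joins "c" at the start of the second.
    assert_eq!(split_on_newlines(&["a\r\nb", "c\nd"]), ["a", "bc", "d"]);
}
```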
diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs
index abc3ecd7743e3..b0d4f145f7570 100644
--- a/src/connector/src/source/google_pubsub/source/reader.rs
+++ b/src/connector/src/source/google_pubsub/source/reader.rs
@@ -23,7 +23,7 @@ use crate::error::{ConnectorError, ConnectorResult as Result};
 use crate::parser::ParserConfig;
 use crate::source::google_pubsub::{PubsubProperties, PubsubSplit};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId,
+    into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SplitId,
     SplitMetaData, SplitReader,
 };
 
@@ -105,7 +105,7 @@ impl SplitReader for PubsubSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 1cf640ca04e9a..190c0d146230a 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -37,7 +37,7 @@ use crate::connector_common::IcebergCommon;
 use crate::error::{ConnectorError, ConnectorResult};
 use crate::parser::ParserConfig;
 use crate::source::{
-    BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
+    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
     SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
 };
 pub const ICEBERG_CONNECTOR: &str = "iceberg";
@@ -490,7 +490,7 @@ impl SplitReader for IcebergFileReader {
         unimplemented!()
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         unimplemented!()
     }
 }
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index 20fde897ceb4f..4c8a78adbfec4 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -36,7 +36,7 @@ use crate::source::kafka::{
     KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
 };
 use crate::source::{
-    into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
+    into_chunk_stream, BackfillInfo, BoxSourceChunkStream, Column, SourceContextRef, SplitId,
     SplitImpl, SplitMetaData, SplitReader,
 };
 
@@ -178,7 +178,7 @@ impl SplitReader for KafkaSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs
index c033a907f38a4..b1b290b90be9c 100644
--- a/src/connector/src/source/kinesis/source/reader.rs
+++ b/src/connector/src/source/kinesis/source/reader.rs
@@ -31,7 +31,7 @@ use crate::source::kinesis::source::message::from_kinesis_record;
 use crate::source::kinesis::split::{KinesisOffset, KinesisSplit};
 use crate::source::kinesis::KinesisProperties;
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId,
+    into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SplitId,
     SplitMetaData, SplitReader,
 };
 
@@ -115,7 +115,7 @@ impl SplitReader for KinesisSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs
index f4ab79bfffed8..19cf5b5c5e8b5 100644
--- a/src/connector/src/source/mqtt/source/reader.rs
+++ b/src/connector/src/source/mqtt/source/reader.rs
@@ -25,7 +25,7 @@ use crate::error::ConnectorResult as Result;
 use crate::parser::ParserConfig;
 use crate::source::common::into_chunk_stream;
 use crate::source::mqtt::MqttProperties;
-use crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitReader};
+use crate::source::{BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SplitReader};
 
 pub struct MqttSplitReader {
     eventloop: rumqttc::v5::EventLoop,
@@ -77,7 +77,7 @@ impl SplitReader for MqttSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs
index 5bcae105e07be..9562c266a5527 100644
--- a/src/connector/src/source/nats/source/reader.rs
+++ b/src/connector/src/source/nats/source/reader.rs
@@ -25,7 +25,7 @@ use crate::parser::ParserConfig;
 use crate::source::common::into_chunk_stream;
 use crate::source::nats::NatsProperties;
 use crate::source::{
-    BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader,
+    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader,
 };
 
 pub struct NatsSplitReader {
@@ -105,7 +105,7 @@ impl SplitReader for NatsSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs
index bfc97593aa6f9..ba1858ddbe422 100644
--- a/src/connector/src/source/nexmark/source/reader.rs
+++ b/src/connector/src/source/nexmark/source/reader.rs
@@ -35,7 +35,7 @@ use crate::source::nexmark::source::combined_event::{
 };
 use crate::source::nexmark::{NexmarkProperties, NexmarkSplit};
 use crate::source::{
-    BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData, SplitReader,
+    BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitMetaData, SplitReader,
 };
 
 #[derive(Debug)]
@@ -107,7 +107,7 @@ impl SplitReader for NexmarkSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let actor_id = self.source_ctx.actor_id.to_string();
         let fragment_id = self.source_ctx.fragment_id.to_string();
         let source_id = self.source_ctx.source_id.to_string();
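Reviewer note: for orientation, the layering that the renames make explicit, shown as an illustrative helper (`parse_pipeline` is hypothetical; the types and methods are the ones introduced in this diff):

```rust
use futures::StreamExt;
use risingwave_connector::parser::ByteStreamSourceParserImpl;
use risingwave_connector::source::{BoxSourceChunkStream, BoxSourceMessageStream};

/// Raw `SourceMessage` batches from a connector become parsed `StreamChunk`s.
/// This is what the `SplitReader::into_stream` implementations above do via
/// the shared `into_chunk_stream` helper.
fn parse_pipeline(
    messages: BoxSourceMessageStream,
    parser: ByteStreamSourceParserImpl,
) -> BoxSourceChunkStream {
    parser.parse_stream(messages).boxed()
}
```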
diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs
index 62c8e13152138..c03a9ff342e3b 100644
--- a/src/connector/src/source/pulsar/source/reader.rs
+++ b/src/connector/src/source/pulsar/source/reader.rs
@@ -38,7 +38,7 @@ use crate::parser::ParserConfig;
 use crate::source::pulsar::split::PulsarSplit;
 use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId,
+    into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SplitId,
     SplitMetaData, SplitReader,
 };
 
@@ -86,7 +86,7 @@ impl SplitReader for PulsarSplitReader {
         }
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         match self {
             Self::Broker(reader) => {
                 let (parser_config, source_context) =
@@ -240,7 +240,7 @@ impl SplitReader for PulsarBrokerReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)
diff --git a/src/connector/src/source/reader/fs_reader.rs b/src/connector/src/source/reader/fs_reader.rs
index ae05bc64ca1a5..d220793b10e86 100644
--- a/src/connector/src/source/reader/fs_reader.rs
+++ b/src/connector/src/source/reader/fs_reader.rs
@@ -24,7 +24,7 @@ use risingwave_common::catalog::ColumnId;
 use crate::error::ConnectorResult;
 use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
 use crate::source::{
-    create_split_reader, BoxChunkSourceStream, ConnectorProperties, ConnectorState,
+    create_split_reader, BoxSourceChunkStream, ConnectorProperties, ConnectorState,
     SourceColumnDesc, SourceContext, SplitReader,
 };
 use crate::{dispatch_source_prop, WithOptionsSecResolved};
@@ -79,7 +79,7 @@ impl FsSourceReader {
         state: ConnectorState,
         column_ids: Vec<ColumnId>,
         source_ctx: Arc<SourceContext>,
-    ) -> ConnectorResult<BoxChunkSourceStream> {
+    ) -> ConnectorResult<BoxSourceChunkStream> {
         let config = self.config.clone();
         let columns = self.get_target_columns(column_ids)?;
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index f849e7ba21aa3..48c82c1080e1b 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -36,7 +36,7 @@ use crate::source::filesystem::opendal_source::{
 };
 use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
 use crate::source::{
-    create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column,
+    create_split_reader, BackfillInfo, BoxSourceChunkStream, BoxTryStream, Column,
     ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitImpl,
     SplitReader, WaitCheckpointTask,
 };
@@ -149,7 +149,7 @@ impl SourceReader {
         state: ConnectorState,
         column_ids: Vec<ColumnId>,
         source_ctx: Arc<SourceContext>,
-    ) -> ConnectorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
+    ) -> ConnectorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
         let Some(splits) = state else {
             return Ok((pending().boxed(), HashMap::new()));
         };
@@ -210,7 +210,7 @@ impl SourceReader {
         })
     }
 
-    /// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
+    /// Build `SplitReader`s and then `BoxSourceChunkStream` from the given `ConnectorState` (`SplitImpl`s).
     ///
     /// If `seek_to_latest` is true, will also return the latest splits after seek.
     pub async fn build_stream(
@@ -219,7 +219,7 @@ impl SourceReader {
         column_ids: Vec<ColumnId>,
         source_ctx: Arc<SourceContext>,
         seek_to_latest: bool,
-    ) -> ConnectorResult<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> {
+    ) -> ConnectorResult<(BoxSourceChunkStream, Option<Vec<SplitImpl>>)> {
         let Some(splits) = state else {
             return Ok((pending().boxed(), None));
         };
diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs
index 57f659873936b..1ff8d75e4cc2a 100644
--- a/src/connector/src/source/test_source.rs
+++ b/src/connector/src/source/test_source.rs
@@ -25,7 +25,7 @@ use with_options::WithOptions;
 use crate::error::ConnectorResult;
 use crate::parser::ParserConfig;
 use crate::source::{
-    BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
+    BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
     SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromBTreeMap,
 };
 
@@ -45,7 +45,7 @@ pub type BoxIntoSourceStream = Box<
             ParserConfig,
             SourceContextRef,
             Option<Vec<Column>>,
-        ) -> BoxChunkSourceStream
+        ) -> BoxSourceChunkStream
         + Send
         + 'static,
 >;
@@ -69,7 +69,7 @@ impl BoxSource {
             ParserConfig,
             SourceContextRef,
             Option<Vec<Column>>,
-        ) -> BoxChunkSourceStream + Send + 'static,
+        ) -> BoxSourceChunkStream + Send + 'static,
     ) -> BoxSource {
@@ -219,7 +219,7 @@ impl SplitReader for TestSourceSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         (get_registry()
             .box_source
             .lock()
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index 650bd3ece8670..36a710b56bee9 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -29,7 +29,7 @@ use risingwave_connector::source::filesystem::opendal_source::{
 use risingwave_connector::source::filesystem::OpendalFsSplit;
 use risingwave_connector::source::reader::desc::SourceDesc;
 use risingwave_connector::source::{
-    BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
+    BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
 };
 use risingwave_storage::store::PrefetchOptions;
 use thiserror_ext::AsReport;
@@ -159,7 +159,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
         source_desc: &SourceDesc,
         batch: SplitBatch,
         rate_limit_rps: Option<u32>,
-    ) -> StreamExecutorResult<BoxChunkSourceStream> {
+    ) -> StreamExecutorResult<BoxSourceChunkStream> {
         let (stream, _) = source_desc
             .source
             .build_stream(batch, column_ids, Arc::new(source_ctx), false)
diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs
index a1c2fe503c7a3..e128f091ee67b 100644
--- a/src/stream/src/executor/source/fs_source_executor.rs
+++ b/src/stream/src/executor/source/fs_source_executor.rs
@@ -27,7 +27,7 @@ use risingwave_common::util::epoch::EpochPair;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::reader::desc::{FsSourceDesc, SourceDescBuilder};
 use risingwave_connector::source::{
-    BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
+    BoxSourceChunkStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
     SplitMetaData,
 };
 use tokio::sync::mpsc::UnboundedReceiver;
@@ -91,7 +91,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
         &mut self,
         source_desc: &FsSourceDesc,
         state: ConnectorState,
-    ) -> StreamExecutorResult<BoxChunkSourceStream> {
+    ) -> StreamExecutorResult<BoxSourceChunkStream> {
         let column_ids = source_desc
             .columns
             .iter()
diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs
index 7911d99a6c4b0..7c22db7f52233 100644
--- a/src/stream/src/executor/source/mod.rs
+++ b/src/stream/src/executor/source/mod.rs
@@ -24,7 +24,7 @@ use risingwave_common::array::StreamChunk;
 use risingwave_common::bail;
 use risingwave_common::row::Row;
 use risingwave_connector::error::ConnectorError;
-use risingwave_connector::source::{BoxChunkSourceStream, SourceColumnDesc, SplitId};
+use risingwave_connector::source::{BoxSourceChunkStream, SourceColumnDesc, SplitId};
 use risingwave_pb::plan_common::additional_column::ColumnType;
 use risingwave_pb::plan_common::AdditionalColumn;
 pub use state_table_handler::*;
@@ -120,7 +120,7 @@ pub fn prune_additional_cols(
 }
 
 #[try_stream(ok = StreamChunk, error = ConnectorError)]
-pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Option<u32>) {
+pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
     if rate_limit_rps == Some(0) {
         // block the stream until the rate limit is reset
         let future = futures::future::pending::<()>();
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 1cb1dd95f38ad..efd4372d9acc4 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -28,7 +28,7 @@ use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::types::JsonbVal;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
 use risingwave_connector::source::{
-    BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
+    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
     SplitMetaData,
 };
 use risingwave_hummock_sdk::HummockReadEpoch;
@@ -283,7 +283,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
         &self,
         source_desc: &SourceDesc,
         splits: Vec<SplitImpl>,
-    ) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
+    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
         let column_ids = source_desc
             .columns
             .iter()
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 545f281279f23..c62f5722a3cd3 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -32,7 +32,7 @@ use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
 use risingwave_connector::source::reader::reader::SourceReader;
 use risingwave_connector::source::{
-    BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
+    BoxSourceChunkStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
     SplitMetaData, WaitCheckpointTask,
 };
 use risingwave_hummock_sdk::HummockReadEpoch;
@@ -127,7 +127,7 @@ impl<S: StateStore> SourceExecutor<S> {
         source_desc: &SourceDesc,
         state: ConnectorState,
         seek_to_latest: bool,
-    ) -> StreamExecutorResult<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> {
+    ) -> StreamExecutorResult<(BoxSourceChunkStream, Option<Vec<SplitImpl>>)> {
         let (column_ids, source_ctx) = self.prepare_source_stream_build(source_desc);
         let (stream, latest_splits) = source_desc
             .source
@@ -538,7 +538,7 @@ impl<S: StateStore> SourceExecutor<S> {
         // because we can rely on the persisted source states to recover the source stream
         // and can avoid the potential race with "seek to latest"
         // https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
-        let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
+        let mut reader_and_splits: Option<(BoxSourceChunkStream, Option<Vec<SplitImpl>>)> =
             None;
         let source_reader = source_desc.source.clone();
         let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
@@ -868,9 +868,9 @@ impl<S: StateStore> SourceExecutor<S> {
 
     async fn build_source_stream_and_poll_barrier(
         barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Barrier>>,
-        reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
+        reader_and_splits: &mut Option<(BoxSourceChunkStream, Option<Vec<SplitImpl>>)>,
         build_future: &mut Pin<
-            Box<impl Future<Output = StreamExecutorResult<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>>,
+            Box<impl Future<Output = StreamExecutorResult<(BoxSourceChunkStream, Option<Vec<SplitImpl>>)>>>,
         >,
     ) -> StreamExecutorResult<Option<Barrier>> {
         if reader_and_splits.is_some() {
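Reviewer note: summary of the renames in this patch, for migrating downstream branches (derived entirely from the hunks above):

```rust
// Migration cheat-sheet:
//
//   BoxSourceStream                          -> BoxSourceMessageStream
//   BoxChunkSourceStream                     -> BoxSourceChunkStream
//   ChunkSourceStream (trait)                -> SourceChunkStream
//   ByteStreamSourceParserImpl::into_stream  -> ByteStreamSourceParserImpl::parse_stream
//   into_chunk_stream_inner                  -> parse_message_stream
//   MessageMeta::meta                        -> MessageMeta::source_meta
//
// Note: `SplitReader::into_stream` keeps its name; only its return type alias
// is renamed.
```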