Skip to content

Commit

Permalink
refactor(source): rename some source parser related types and methods (
Browse files Browse the repository at this point in the history
…#19863)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Dec 19, 2024
1 parent f3d8e0d commit adb8651
Show file tree
Hide file tree
Showing 28 changed files with 113 additions and 100 deletions.
2 changes: 1 addition & 1 deletion src/connector/benches/json_parser_case_insensitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn create_parser(chunk_size: usize, chunk_num: usize, mode: &str) -> (Parser, In

async fn parse(parser: Parser, input: Input) {
parser
.into_stream(futures::stream::iter(input.into_iter().map(Ok)).boxed())
.parse_stream(futures::stream::iter(input.into_iter().map(Ok)).boxed())
.count() // consume the stream
.await;
}
Expand Down
8 changes: 4 additions & 4 deletions src/connector/benches/nexmark_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_connector::parser::{
ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, SpecificParserConfig,
};
use risingwave_connector::source::{
BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta,
BoxSourceChunkStream, BoxSourceMessageStream, SourceColumnDesc, SourceMessage, SourceMeta,
};
use tracing::Level;
use tracing_subscriber::prelude::*;
Expand Down Expand Up @@ -72,7 +72,7 @@ fn make_batch(use_struct: bool) -> Vec<SourceMessage> {
.collect_vec()
}

fn make_data_stream(use_struct: bool) -> BoxSourceStream {
fn make_data_stream(use_struct: bool) -> BoxSourceMessageStream {
futures::future::ready(Ok(if use_struct {
STRUCT_BATCH.clone()
} else {
Expand Down Expand Up @@ -118,8 +118,8 @@ fn make_parser(use_struct: bool) -> ByteStreamSourceParserImpl {
}

fn make_stream_iter(use_struct: bool) -> impl Iterator<Item = StreamChunk> {
let mut stream: BoxChunkSourceStream = make_parser(use_struct)
.into_stream(make_data_stream(use_struct))
let mut stream: BoxSourceChunkStream = make_parser(use_struct)
.parse_stream(make_data_stream(use_struct))
.boxed();

std::iter::from_fn(move || {
Expand Down
20 changes: 14 additions & 6 deletions src/connector/src/parser/chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ struct Transaction {
}

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
///
/// Output chunk size is controlled by `source_ctrl_opts.chunk_size` and `source_ctrl_opts.split_txn`.
/// During building process, it's possible that multiple chunks are built even without any explicit
/// call to `finish_current_chunk`. This mainly happens when we find more than one records in one
/// `SourceMessage` when parsing it. User of this builder should call `consume_ready_chunks` to consume
/// the built chunks from time to time, to avoid the buffer from growing too large.
pub struct SourceStreamChunkBuilder {
column_descs: Vec<SourceColumnDesc>,
source_ctrl_opts: SourceCtrlOpts,
Expand Down Expand Up @@ -299,7 +305,7 @@ impl SourceStreamChunkRowWriter<'_> {
}
(&SourceColumnType::Meta, _)
if matches!(
&self.row_meta.map(|ele| ele.meta),
&self.row_meta.map(|ele| ele.source_meta),
&Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
) =>
{
Expand All @@ -318,7 +324,7 @@ impl SourceStreamChunkRowWriter<'_> {
) => {
match self.row_meta {
Some(row_meta) => {
if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
Ok(A::output_for(extract_cdc_meta_column(
cdc_meta,
col,
Expand All @@ -334,7 +340,9 @@ impl SourceStreamChunkRowWriter<'_> {
}
}
(_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(row_meta.meta))),
Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
row_meta.source_meta,
))),
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
Expand All @@ -344,7 +352,7 @@ impl SourceStreamChunkRowWriter<'_> {
(_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|ele| extract_subject_from_meta(ele.meta))
.and_then(|ele| extract_subject_from_meta(ele.source_meta))
.unwrap_or(None),
)),
(_, &Some(AdditionalColumnType::Partition(_))) => {
Expand All @@ -369,7 +377,7 @@ impl SourceStreamChunkRowWriter<'_> {
.as_ref()
.and_then(|ele| {
extract_header_inner_from_meta(
ele.meta,
ele.source_meta,
header_inner.inner_field.as_ref(),
header_inner.data_type.as_ref(),
)
Expand All @@ -380,7 +388,7 @@ impl SourceStreamChunkRowWriter<'_> {
(_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
self.row_meta
.as_ref()
.and_then(|ele| extract_headers_from_meta(ele.meta))
.and_then(|ele| extract_headers_from_meta(ele.source_meta))
.unwrap_or(None),
)),
(_, &Some(AdditionalColumnType::Filename(_))) => {
Expand Down
55 changes: 29 additions & 26 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceCtrlOpts, SourceMeta,
};

Expand Down Expand Up @@ -85,7 +85,7 @@ pub use unified::{AccessError, AccessResult};
/// Extracted from the `SourceMessage`.
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
meta: &'a SourceMeta,
source_meta: &'a SourceMeta,
split_id: &'a str,
offset: &'a str,
}
Expand All @@ -102,15 +102,15 @@ impl<'a> MessageMeta<'a> {
// Extract the offset from the meta data.
SourceColumnType::Offset => Some(self.offset.into()),
// Extract custom meta data per connector.
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
assert_eq!(
desc.name.as_str(),
KAFKA_TIMESTAMP_COLUMN_NAME,
"unexpected kafka meta column name"
);
kafka_meta.extract_timestamp()
}
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
assert_eq!(
desc.name.as_str(),
TABLE_NAME_COLUMN_NAME,
Expand Down Expand Up @@ -161,7 +161,7 @@ pub enum ParserFormat {
/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
/// It consumes bytes of one individual message and produces parsed records.
///
/// It's used by [`ByteStreamSourceParserImpl::into_stream`]. `pub` is for benchmark only.
/// It's used by [`ByteStreamSourceParserImpl::parse_stream`]. `pub` is for benchmark only.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
/// The column descriptors of the output chunk.
fn columns(&self) -> &[SourceColumnDesc];
Expand Down Expand Up @@ -202,35 +202,35 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
/// Parse a stream of vectors of `SourceMessage` into a stream of [`StreamChunk`].
/// Parse a `SourceMessage` stream into a [`StreamChunk`] stream.
///
/// # Arguments
///
/// - `msg_stream`: A stream of vectors of `SourceMessage`.
/// - `msg_stream`: A stream of batches of `SourceMessage`.
///
/// # Returns
///
/// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks
/// are guaranteed to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless
/// there's a large transaction and `source_ctrl_opts.split_txn` is false.
pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream {
/// A [`SourceChunkStream`] of parsed chunks. Each of the parsed chunks are guaranteed
/// to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless there's a
/// large transaction and `source_ctrl_opts.split_txn` is false.
pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
let actor_id = self.source_ctx().actor_id;
let source_id = self.source_ctx().source_id.table_id();

// The stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
into_chunk_stream_inner(self, msg_stream, source_ctrl_opts)
parse_message_stream(self, msg_stream, source_ctrl_opts)
.instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
}
}

// TODO: when upsert is disabled, how to filter those empty payload
// Currently, an err is returned for non upsert with empty payload
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
async fn parse_message_stream<P: ByteStreamSourceParser>(
mut parser: P,
msg_stream: BoxSourceStream,
msg_stream: BoxSourceMessageStream,
source_ctrl_opts: SourceCtrlOpts,
) {
let mut chunk_builder =
Expand Down Expand Up @@ -266,7 +266,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
"handling a heartbeat message"
);
chunk_builder.heartbeat(MessageMeta {
meta: &heartbeat_msg.meta,
source_meta: &heartbeat_msg.meta,
split_id: &heartbeat_msg.split_id,
offset: &heartbeat_msg.offset,
});
Expand Down Expand Up @@ -312,7 +312,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
msg.key,
msg.payload,
chunk_builder.row_writer().with_meta(MessageMeta {
meta: &msg.meta,
source_meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
}),
Expand Down Expand Up @@ -428,16 +428,19 @@ pub enum ByteStreamSourceParserImpl {

impl ByteStreamSourceParserImpl {
/// Converts `SourceMessage` vec stream into [`StreamChunk`] stream.
pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin {
pub fn parse_stream(
self,
msg_stream: BoxSourceMessageStream,
) -> impl SourceChunkStream + Unpin {
#[auto_enum(futures03::Stream)]
let stream = match self {
Self::Csv(parser) => parser.into_stream(msg_stream),
Self::Debezium(parser) => parser.into_stream(msg_stream),
Self::DebeziumMongoJson(parser) => parser.into_stream(msg_stream),
Self::Maxwell(parser) => parser.into_stream(msg_stream),
Self::CanalJson(parser) => parser.into_stream(msg_stream),
Self::Plain(parser) => parser.into_stream(msg_stream),
Self::Upsert(parser) => parser.into_stream(msg_stream),
Self::Csv(parser) => parser.parse_stream(msg_stream),
Self::Debezium(parser) => parser.parse_stream(msg_stream),
Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
Self::Maxwell(parser) => parser.parse_stream(msg_stream),
Self::CanalJson(parser) => parser.parse_stream(msg_stream),
Self::Plain(parser) => parser.parse_stream(msg_stream),
Self::Upsert(parser) => parser.parse_stream(msg_stream),
};
Box::pin(stream)
}
Expand Down Expand Up @@ -513,7 +516,7 @@ pub mod test_utils {
})
.collect_vec();

self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
.next()
.await
.unwrap()
Expand All @@ -531,7 +534,7 @@ pub mod test_utils {
})
.collect_vec();

self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
.next()
.await
.unwrap()
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl PlainParser {
// plain parser also used in the shared cdc source,
// we need to handle transaction metadata and schema change messages here
if let Some(msg_meta) = writer.row_meta()
&& let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.meta
&& let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
&& let Some(data) = payload
{
match cdc_meta.msg_type {
Expand Down Expand Up @@ -252,7 +252,7 @@ mod tests {
let mut transactional = false;
// for untransactional source, we expect emit a chunk for each message batch
let message_stream = source_message_stream(transactional);
let chunk_stream = crate::parser::into_chunk_stream_inner(
let chunk_stream = crate::parser::parse_message_stream(
parser,
message_stream.boxed(),
SourceCtrlOpts::for_test(),
Expand Down Expand Up @@ -293,7 +293,7 @@ mod tests {
// for transactional source, we expect emit a single chunk for the transaction
transactional = true;
let message_stream = source_message_stream(transactional);
let chunk_stream = crate::parser::into_chunk_stream_inner(
let chunk_stream = crate::parser::parse_message_stream(
parser,
message_stream.boxed(),
SourceCtrlOpts::for_test(),
Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {
cdc_message::CdcMessageType::TransactionMeta,
));
let msg_meta = MessageMeta {
meta: &cdc_meta,
source_meta: &cdc_meta,
split_id: "1001",
offset: "",
};
Expand Down Expand Up @@ -500,7 +500,7 @@ mod tests {
cdc_message::CdcMessageType::SchemaChange,
));
let msg_meta = MessageMeta {
meta: &cdc_meta,
source_meta: &cdc_meta,
split_id: "1001",
offset: "",
};
Expand Down
14 changes: 8 additions & 6 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,20 +353,22 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
Ok(SourceStruct::new(format, encode))
}

/// Stream of [`SourceMessage`].
pub type BoxSourceStream = BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`SourceMessage`]. Messages flow through the stream in the unit of a batch.
pub type BoxSourceMessageStream =
BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;

// Manually expand the trait alias to improve IDE experience.
pub trait ChunkSourceStream:
pub trait SourceChunkStream:
Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
impl<T> ChunkSourceStream for T where
impl<T> SourceChunkStream for T where
T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

pub type BoxChunkSourceStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;

/// [`SplitReader`] is a new abstraction of the external connector read interface which is
Expand All @@ -385,7 +387,7 @@ pub trait SplitReader: Sized + Send {
columns: Option<Vec<Column>>,
) -> crate::error::ConnectorResult<Self>;

fn into_stream(self) -> BoxChunkSourceStream;
fn into_stream(self) -> BoxSourceChunkStream;

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitMetaData,
SplitReader,
};

Expand Down Expand Up @@ -199,7 +199,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
Ok(instance)
}

fn into_stream(self) -> BoxChunkSourceStream {
fn into_stream(self) -> BoxSourceChunkStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub(crate) async fn into_chunk_stream(
let parser =
crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
#[for_await]
for chunk in parser.into_stream(data_stream) {
for chunk in parser.parse_stream(data_stream) {
yield chunk?;
}
}
4 changes: 2 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::source::data_gen_util::spawn_data_generation_stream;
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, DataType, SourceContextRef, SourceMessage,
into_chunk_stream, BoxSourceChunkStream, Column, DataType, SourceContextRef, SourceMessage,
SplitId, SplitMetaData, SplitReader,
};

Expand Down Expand Up @@ -142,7 +142,7 @@ impl SplitReader for DatagenSplitReader {
})
}

fn into_stream(self) -> BoxChunkSourceStream {
fn into_stream(self) -> BoxSourceChunkStream {
// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
// spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed()
Expand Down
Loading

0 comments on commit adb8651

Please sign in to comment.