feat: delete Kafka consumer group on drop (#20065)
Signed-off-by: xxchan <[email protected]>
xxchan authored Jan 17, 2025
1 parent 439956e commit bb90e0f
Showing 18 changed files with 481 additions and 206 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -140,7 +140,7 @@ axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
axum-extra = "0.9"
futures-async-stream = "0.2.9"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
rdkafka = { package = "madsim-rdkafka", version = "0.4.3", features = [
"cmake-build",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
40 changes: 35 additions & 5 deletions e2e_test/source_inline/kafka/consumer_group.mjs
@@ -25,18 +25,46 @@ async function get_fragment_id_of_mv(mv_name) {
}

async function list_consumer_groups(fragment_id) {
return (await $`rpk group list`)
let all_groups = (await $`rpk group list`)
.toString()
.trim()
.split("\n")
.slice(1)
.map((line) => {
const [_broker_id, group_name] = line.split(/\s+/);
return group_name;
})
.filter((group_name) => {
return group_name.startsWith(`rw-consumer-${fragment_id}`);
});
if (fragment_id) {
return all_groups.filter((group_name) => {
return group_name.endsWith(`-${fragment_id}`);
});
} else {
return all_groups;
}
}

async function count_consumer_groups() {
let map = (await $`rpk group list`)
.toString()
.trim()
.split("\n")
.slice(1)
.map((line) => {
const [_broker_id, group_name] = line.split(/\s+/);
console.error(group_name);
return group_name.split("-").slice(0, -1).join("-");
})
.reduce((acc, group_name_prefix) => {
acc.set(group_name_prefix, (acc.get(group_name_prefix) || 0) + 1);
return acc;
}, new Map());
let mapAsc = new Map([...map.entries()].sort());

let res = "";
for (const [group_name_prefix, count] of mapAsc) {
res += `${group_name_prefix}: ${count}\n`;
}
return res;
}

async function describe_consumer_group(group_name) {
@@ -73,9 +101,11 @@ async function list_consumer_group_lags(fragment_id) {
);
}

const fragment_id = await get_fragment_id_of_mv(mv);
const fragment_id = mv ? await get_fragment_id_of_mv(mv) : undefined;
if (command == "list-groups") {
echo`${await list_consumer_groups(fragment_id)}`;
} else if (command == "count-groups") {
echo`${await count_consumer_groups()}`;
} else if (command == "list-members") {
echo`${await list_consumer_group_members(fragment_id)}`;
} else if (command == "list-lags") {
88 changes: 24 additions & 64 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
@@ -12,17 +12,9 @@ cat <<EOF | rpk topic produce test_consumer_group -f "%p %v\\n" -p 0
2 {"x":"c"}
EOF

statement ok
CREATE SOURCE s(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_consumer_group',
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON;

# custom group id prefix
statement ok
CREATE SOURCE s2(x varchar)
CREATE SOURCE s(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_consumer_group',
@@ -34,9 +26,6 @@ WITH(
statement ok
CREATE MATERIALIZED VIEW mv AS SELECT * from s;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT * from s2;

query ?
SELECT * FROM s order by x;
----
@@ -54,9 +43,18 @@ b
c


# There are 2 consumer groups: 1 for the batch query (not listed below), 1 for the MV.
# A shared source starts from latest, so we produce more data to ensure the consumer group is created.
system ok
cat <<EOF | rpk topic produce test_consumer_group -f "%p %v\\n" -p 0
0 {"x":"d"}
1 {"x":"e"}
2 {"x":"f"}
EOF


# There are 2 consumer groups: 1 for the batch query (not listed below), 1 for the Source.
# All of them are in the "Empty" state with 0 members, because we manually `assign` partitions to them.
# At the beginning, the MV's consumer group will not appear; it is created only after offsets are committed to Kafka.
# At the beginning, the Source's consumer group will not appear; it is created only after offsets are committed to Kafka.
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

@@ -73,65 +71,27 @@
0


# We try to interfere by creating consumers that subscribe to the topic with RW's group id.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -I {} sh -c "rpk topic consume test_consumer_group -g {}" &

# Wait a while for them to subscribe to the topic.
sleep 15s

# Test delete consumer group on drop

system ok
cat <<EOF | rpk topic produce test_consumer_group -f "%p %v\\n" -p 0
0 {"x":"d"}
1 {"x":"e"}
2 {"x":"f"}
EOF

sleep 2s

# Verify that RisingWave's Kafka consumer works independently from the console consumers subscribing to the same group.
query ?
SELECT * FROM mv order by x;
# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
# TODO: drop backfill fragment on backfill finish
# We only check my_group to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
a
b
c
d
e
f


query ?
SELECT * FROM mv2 order by x;
----
a
b
c
d
e
f
my_group: 3


statement ok
DROP SOURCE s CASCADE;

statement ok
DROP SOURCE s2 CASCADE;

## fragment id is not deterministic, so this check is commented out
# system ok
# rpk group list
# ---
# BROKER GROUP STATE
# 0 my_group-8 Empty
# 0 rw-consumer-3 Empty
# 0 rw-consumer-4294967295 Empty
# 0 rw-consumer-7 Empty
sleep 1s

system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 2

system ok
pkill rpk

system ok
rpk topic delete test_consumer_group
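
For reference, the group ids checked in these tests follow the convention `{group.id.prefix}-{fragment_id}`, with `rw-consumer` as the default prefix (hence groups like `rw-consumer-3` and `my_group-8` in the commented-out listing above). A minimal sketch of the convention; the helper function is hypothetical, not part of this diff:

```rust
/// Hypothetical helper illustrating the consumer-group naming convention
/// the tests above rely on: `{group.id.prefix}-{fragment_id}`.
fn consumer_group_id(prefix: Option<&str>, fragment_id: u32) -> String {
    // "rw-consumer" is the default when no `group.id.prefix` is configured.
    format!("{}-{}", prefix.unwrap_or("rw-consumer"), fragment_id)
}

fn main() {
    assert_eq!(consumer_group_id(None, 3), "rw-consumer-3");
    assert_eq!(consumer_group_id(Some("my_group"), 8), "my_group-8");
}
```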
126 changes: 126 additions & 0 deletions e2e_test/source_inline/kafka/consumer_group_non_shared.slt.serial
@@ -0,0 +1,126 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

# Note: either `./risedev rpk` or `rpk` works here.
# risedev-env contains env var RPK_BROKERS, which is read by rpk
system ok
rpk topic create test_consumer_group_non_shared -p 3

system ok
cat <<EOF | rpk topic produce test_consumer_group_non_shared -f "%p %v\\n" -p 0
0 {"x":"a"}
1 {"x":"b"}
2 {"x":"c"}
EOF

# custom group id prefix
statement ok
CREATE SOURCE s(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_consumer_group_non_shared',
scan.startup.mode = 'earliest',
group.id.prefix = 'my_group_non_shared'
) FORMAT PLAIN ENCODE JSON;


statement ok
CREATE MATERIALIZED VIEW mv AS SELECT * from s;

query ?
SELECT * FROM s order by x;
----
a
b
c

sleep 2s

query ?
SELECT * FROM mv order by x;
----
a
b
c


# There are 2 consumer groups: 1 for the batch query (not listed below), 1 for the MV.
# All of them are in the "Empty" state with 0 members, because we manually `assign` partitions to them.
# At the beginning, the MV's consumer group will not appear; it is created only after offsets are committed to Kafka.
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0


# The lag for MV's group is 0.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
0


# We try to interfere by creating consumers that subscribe to the topic with RW's group id.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -I {} sh -c "rpk topic consume test_consumer_group_non_shared -g {}" &

# Wait a while for them to subscribe to the topic.
sleep 15s


system ok
cat <<EOF | rpk topic produce test_consumer_group_non_shared -f "%p %v\\n" -p 0
0 {"x":"d"}
1 {"x":"e"}
2 {"x":"f"}
EOF

sleep 2s

# Verify that RisingWave's Kafka consumer works independently from the console consumers subscribing to the same group.
query ?
SELECT * FROM mv order by x;
----
a
b
c
d
e
f


system ok
pkill rpk

# Test delete consumer group on drop

# my_group_non_shared: 1 source fragment, 1 batch query
# We only check my_group_non_shared to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group_non_shared
----
my_group_non_shared: 2


statement ok
DROP MATERIALIZED VIEW mv;

sleep 1s

system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group_non_shared
----
my_group_non_shared: 1



statement ok
DROP SOURCE s CASCADE;

system ok
rpk topic delete test_consumer_group_non_shared
15 changes: 15 additions & 0 deletions src/common/src/lib.rs
@@ -133,3 +133,18 @@ pub fn current_cluster_version() -> String {
PG_VERSION, RW_VERSION, GIT_SHA
)
}

/// Panics if `debug_assertions` is set, otherwise logs a warning.
///
/// Note: unlike `panic!`, which returns `!`, this macro returns `()`,
/// so it cannot be used like `result.unwrap_or_else(|| panic_if_debug!(...))`.
#[macro_export]
macro_rules! panic_if_debug {
($($arg:tt)*) => {
if cfg!(debug_assertions) {
panic!($($arg)*)
} else {
tracing::warn!($($arg)*)
}
};
}
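
A brief usage sketch for the macro above (the surrounding function is a hypothetical example, assuming the macro is in scope). In statement position it works as intended; because it expands to `()` rather than `!`, a fallback value must still be supplied on the error path:

```rust
fn parse_offset(raw: &str) -> i64 {
    match raw.parse::<i64>() {
        Ok(v) => v,
        Err(e) => {
            // Panics under debug_assertions; logs a warning in release builds.
            panic_if_debug!("invalid offset {:?}: {}", raw, e);
            // The macro evaluates to `()`, so we still need to return a value.
            0
        }
    }
}
```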
9 changes: 9 additions & 0 deletions src/connector/src/source/base.rs
@@ -192,6 +192,10 @@ pub trait SplitEnumerator: Sized + Send {
async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
-> Result<Self>;
async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
/// Do some cleanup work when a fragment is dropped, e.g., drop the Kafka consumer group.
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
Ok(())
}
}

pub type SourceContextRef = Arc<SourceContext>;
@@ -201,6 +205,7 @@ pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
#[async_trait]
pub trait AnySplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()>;
}

#[async_trait]
Expand All @@ -210,6 +215,10 @@ impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
.await
.map(|s| s.into_iter().map(|s| s.into()).collect())
}

async fn on_drop_fragments(&mut self, _fragment_ids: Vec<u32>) -> Result<()> {
SplitEnumerator::on_drop_fragments(self, _fragment_ids).await
}
}

/// The max size of a chunk yielded by source stream.
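
The Kafka-side override of `on_drop_fragments` lives in the remaining changed files, which are not rendered above. As a rough sketch of what such an implementation could look like, assuming the enumerator knows its group-id prefix and uses rdkafka's admin API (the struct and its fields here are illustrative, not the actual `KafkaSplitEnumerator` definition):

```rust
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;

struct KafkaEnumeratorSketch {
    admin: AdminClient<DefaultClientContext>,
    group_id_prefix: String, // e.g. "rw-consumer" or a custom `group.id.prefix`
}

impl KafkaEnumeratorSketch {
    /// Sketch of the cleanup that `on_drop_fragments` could perform:
    /// delete the consumer group belonging to each dropped fragment.
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> anyhow::Result<()> {
        let group_ids: Vec<String> = fragment_ids
            .iter()
            .map(|id| format!("{}-{}", self.group_id_prefix, id))
            .collect();
        let group_refs: Vec<&str> = group_ids.iter().map(String::as_str).collect();
        // `delete_groups` returns one result per group; a group that no longer
        // exists is arguably not worth failing the whole drop for.
        for result in self
            .admin
            .delete_groups(&group_refs, &AdminOptions::default())
            .await?
        {
            if let Err((group, err)) = result {
                tracing::warn!("failed to delete consumer group {}: {}", group, err);
            }
        }
        Ok(())
    }
}
```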