From bdee34e437987347385ae7cf656f94e5c8d4a997 Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Fri, 3 Jan 2025 12:28:47 +0800
Subject: [PATCH 1/4] fix(meta): fix notification when dropping tables
---
src/meta/service/src/notification_service.rs | 11 +++---
src/meta/src/controller/catalog/drop_op.rs | 23 ++++++++++--
src/meta/src/controller/catalog/list_op.rs | 2 +-
src/meta/src/controller/catalog/mod.rs | 35 +++++++++++++++---
src/meta/src/manager/notification.rs | 34 ------------------
src/meta/src/stream/stream_manager.rs | 38 ++++++++++++++++++--
6 files changed, 92 insertions(+), 51 deletions(-)
diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs
index b766689cc9bf3..35d5eb257fc89 100644
--- a/src/meta/service/src/notification_service.rs
+++ b/src/meta/service/src/notification_service.rs
@@ -194,21 +194,20 @@ impl NotificationServiceImpl {
Ok((nodes, notification_version))
}
- async fn get_tables_and_creating_tables_snapshot(
- &self,
- ) -> MetaResult<(Vec<Table>, NotificationVersion)> {
+ async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
let catalog_guard = self
.metadata_manager
.catalog_controller
.get_inner_read_guard()
.await;
- let tables = catalog_guard.list_all_state_tables().await?;
+ let mut tables = catalog_guard.list_all_state_tables(None).await?;
+ tables.extend(catalog_guard.dropped_tables.values().cloned());
let notification_version = self.env.notification_manager().current_version().await;
Ok((tables, notification_version))
}
async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
- let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
+ let (tables, catalog_version) = self.get_tables_snapshot().await?;
Ok(MetaSnapshot {
tables,
@@ -294,7 +293,7 @@ impl NotificationServiceImpl {
}
async fn hummock_subscribe(&self) -> MetaResult {
- let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
+ let (tables, catalog_version) = self.get_tables_snapshot().await?;
let hummock_version = self
.hummock_manager
.on_current_version(|version| version.into())
diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs
index 46928fcb487cb..453cab1ac2b13 100644
--- a/src/meta/src/controller/catalog/drop_op.rs
+++ b/src/meta/src/controller/catalog/drop_op.rs
@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::*;
+use risingwave_pb::catalog::PbTable;
+use super::*;
impl CatalogController {
pub async fn drop_relation(
&self,
@@ -21,7 +22,7 @@ impl CatalogController {
object_id: ObjectId,
drop_mode: DropMode,
) -> MetaResult<(ReleaseContext, NotificationVersion)> {
- let inner = self.inner.write().await;
+ let mut inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let obj: PartialObject = Object::find_by_id(object_id)
.into_partial_model()
@@ -239,11 +240,21 @@ impl CatalogController {
// notify about them.
self.notify_users_update(user_infos).await;
+ let dropped_tables = inner
+ .list_all_state_tables(Some(to_drop_state_table_ids.iter().copied().collect()))
+ .await?;
+ inner.dropped_tables.extend(
+ dropped_tables
+ .into_iter()
+ .map(|t| (TableId::try_from(t.id).unwrap(), t)),
+ );
let relation_group = build_relation_group_for_delete(to_drop_objects);
let version = self
.notify_frontend(NotificationOperation::Delete, relation_group)
.await;
+ // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
+ // They only need RelationInfo::Table.
let fragment_mappings = fragment_ids
.into_iter()
@@ -576,4 +587,12 @@ impl CatalogController {
.await;
Ok(version)
}
+
+ pub async fn complete_dropped_tables(
+ &self,
+ table_ids: impl Iterator<Item = TableId>,
+ ) -> Vec<PbTable> {
+ let mut inner = self.inner.write().await;
+ inner.complete_dropped_tables(table_ids)
+ }
}
diff --git a/src/meta/src/controller/catalog/list_op.rs b/src/meta/src/controller/catalog/list_op.rs
index 387cc353c60ad..e0f69f3294c7c 100644
--- a/src/meta/src/controller/catalog/list_op.rs
+++ b/src/meta/src/controller/catalog/list_op.rs
@@ -104,7 +104,7 @@ impl CatalogController {
pub async fn list_all_state_tables(&self) -> MetaResult> {
let inner = self.inner.read().await;
- inner.list_all_state_tables().await
+ inner.list_all_state_tables(None).await
}
pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult> {
diff --git a/src/meta/src/controller/catalog/mod.rs b/src/meta/src/controller/catalog/mod.rs
index 9277eda30f0df..4cade80439be5 100644
--- a/src/meta/src/controller/catalog/mod.rs
+++ b/src/meta/src/controller/catalog/mod.rs
@@ -146,6 +146,7 @@ impl CatalogController {
inner: RwLock::new(CatalogControllerInner {
db: meta_store.conn,
creating_table_finish_notifier: HashMap::new(),
+ dropped_tables: HashMap::new(),
}),
};
@@ -172,6 +173,8 @@ pub struct CatalogControllerInner {
/// On notifying, we can remove the entry from this map.
pub creating_table_finish_notifier:
HashMap>>>,
+ /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
+ pub dropped_tables: HashMap<TableId, PbTable>,
}
impl CatalogController {
@@ -616,11 +619,16 @@ impl CatalogControllerInner {
}
/// `list_all_tables` return all tables and internal tables.
- pub async fn list_all_state_tables(&self) -> MetaResult> {
- let table_objs = Table::find()
- .find_also_related(Object)
- .all(&self.db)
- .await?;
+ /// `table_ids_filter` is used for filtering if it's set.
+ pub async fn list_all_state_tables(
+ &self,
+ table_ids_filter: Option>,
+ ) -> MetaResult> {
+ let mut table_objs = Table::find().find_also_related(Object);
+ if let Some(table_ids_filter) = table_ids_filter {
+ table_objs = table_objs.filter(table::Column::TableId.is_in(table_ids_filter));
+ }
+ let table_objs = table_objs.all(&self.db).await?;
Ok(table_objs
.into_iter()
@@ -861,4 +869,21 @@ impl CatalogControllerInner {
.await?;
Ok(table_ids)
}
+
+ /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
+ /// Returns the removed table copies.
+ pub(crate) fn complete_dropped_tables(
+ &mut self,
+ table_ids: impl Iterator<Item = TableId>,
+ ) -> Vec<PbTable> {
+ let mut res = vec![];
+ for table_id in table_ids {
+ if let Some(t) = self.dropped_tables.remove(&table_id) {
+ res.push(t);
+ continue;
+ }
+ tracing::warn!("table {table_id} not found");
+ }
+ res
+ }
}
diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs
index f3548389c09ae..db145ceca3252 100644
--- a/src/meta/src/manager/notification.rs
+++ b/src/meta/src/manager/notification.rs
@@ -197,45 +197,11 @@ impl NotificationManager {
.await
}
- pub async fn notify_hummock_relation_info(
- &self,
- operation: Operation,
- relation_info: RelationInfo,
- ) -> NotificationVersion {
- self.notify_with_version(
- SubscribeType::Hummock.into(),
- operation,
- Info::RelationGroup(RelationGroup {
- relations: vec![Relation {
- relation_info: relation_info.into(),
- }],
- }),
- )
- .await
- }
-
pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
.await
}
- pub async fn notify_compactor_relation_info(
- &self,
- operation: Operation,
- relation_info: RelationInfo,
- ) -> NotificationVersion {
- self.notify_with_version(
- SubscribeType::Compactor.into(),
- operation,
- Info::RelationGroup(RelationGroup {
- relations: vec![Relation {
- relation_info: relation_info.into(),
- }],
- }),
- )
- .await
- }
-
pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
self.notify_with_version(SubscribeType::Compute.into(), operation, info)
.await
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index aca86c1745e73..95e76d9f90c0a 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -21,6 +21,9 @@ use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_pb::catalog::{CreateType, Subscription, Table};
+use risingwave_pb::meta::relation::PbRelationInfo;
+use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
+use risingwave_pb::meta::{PbRelation, PbRelationGroup};
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use thiserror_ext::AsReport;
@@ -517,7 +520,7 @@ impl GlobalStreamManager {
|| !streaming_job_ids.is_empty()
|| !state_table_ids.is_empty()
{
- let _ = self
+ let res = self
.barrier_scheduler
.run_command(
database_id,
@@ -528,8 +531,8 @@ impl GlobalStreamManager {
.collect(),
actors: removed_actors,
unregistered_state_table_ids: state_table_ids
- .into_iter()
- .map(|table_id| TableId::new(table_id as _))
+ .iter()
+ .map(|table_id| TableId::new(*table_id as _))
.collect(),
unregistered_fragment_ids: fragment_ids,
},
@@ -538,9 +541,38 @@ impl GlobalStreamManager {
.inspect_err(|err| {
tracing::error!(error = ?err.as_report(), "failed to run drop command");
});
+ if res.is_ok() {
+ self.post_dropping_streaming_jobs(state_table_ids).await;
+ }
}
}
+ async fn post_dropping_streaming_jobs(
+ &self,
+ state_table_ids: Vec<risingwave_meta_model::TableId>,
+ ) {
+ let tables = self
+ .metadata_manager
+ .catalog_controller
+ .complete_dropped_tables(state_table_ids.into_iter())
+ .await;
+ let relations = tables
+ .into_iter()
+ .map(|t| PbRelation {
+ relation_info: Some(PbRelationInfo::Table(t)),
+ })
+ .collect();
+ let group = PbInfo::RelationGroup(PbRelationGroup { relations });
+ self.env
+ .notification_manager()
+ .notify_hummock(Operation::Delete, group.clone())
+ .await;
+ self.env
+ .notification_manager()
+ .notify_compactor(Operation::Delete, group)
+ .await;
+ }
+
/// Cancel streaming jobs and return the canceled table ids.
/// 1. Send cancel message to stream jobs (via `cancel_jobs`).
/// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
From d12624572d171c4465c2a4fdc3cdc2ac1f3202a6 Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Fri, 3 Jan 2025 18:48:24 +0800
Subject: [PATCH 2/4] bugfix
---
src/meta/src/controller/catalog/drop_op.rs | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs
index 453cab1ac2b13..7c0bfe2660d99 100644
--- a/src/meta/src/controller/catalog/drop_op.rs
+++ b/src/meta/src/controller/catalog/drop_op.rs
@@ -222,7 +222,9 @@ impl CatalogController {
.into_tuple()
.all(&txn)
.await?;
-
+ let dropped_tables = inner
+ .list_all_state_tables(Some(to_drop_state_table_ids.iter().copied().collect()))
+ .await?;
// delete all in to_drop_objects.
let res = Object::delete_many()
.filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
@@ -235,14 +237,10 @@ impl CatalogController {
));
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;
-
txn.commit().await?;
// notify about them.
self.notify_users_update(user_infos).await;
- let dropped_tables = inner
- .list_all_state_tables(Some(to_drop_state_table_ids.iter().copied().collect()))
- .await?;
inner.dropped_tables.extend(
dropped_tables
.into_iter()
From f52773af8dab02d1d735946fbfa29204382e053d Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Fri, 3 Jan 2025 19:12:08 +0800
Subject: [PATCH 3/4] Don't request another db connection while holding a txn.
---
src/meta/service/src/notification_service.rs | 2 +-
src/meta/src/controller/catalog/drop_op.rs | 25 +++++++++++++-------
src/meta/src/controller/catalog/list_op.rs | 2 +-
src/meta/src/controller/catalog/mod.rs | 15 ++++--------
4 files changed, 24 insertions(+), 20 deletions(-)
diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs
index 35d5eb257fc89..60e23f9138367 100644
--- a/src/meta/service/src/notification_service.rs
+++ b/src/meta/service/src/notification_service.rs
@@ -200,7 +200,7 @@ impl NotificationServiceImpl {
.catalog_controller
.get_inner_read_guard()
.await;
- let mut tables = catalog_guard.list_all_state_tables(None).await?;
+ let mut tables = catalog_guard.list_all_state_tables().await?;
tables.extend(catalog_guard.dropped_tables.values().cloned());
let notification_version = self.env.notification_manager().current_version().await;
Ok((tables, notification_version))
diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs
index 7c0bfe2660d99..8b4749cd1b228 100644
--- a/src/meta/src/controller/catalog/drop_op.rs
+++ b/src/meta/src/controller/catalog/drop_op.rs
@@ -222,9 +222,20 @@ impl CatalogController {
.into_tuple()
.all(&txn)
.await?;
- let dropped_tables = inner
- .list_all_state_tables(Some(to_drop_state_table_ids.iter().copied().collect()))
- .await?;
+ let dropped_tables = Table::find()
+ .find_also_related(Object)
+ .filter(
+ table::Column::TableId.is_in(
+ to_drop_state_table_ids
+ .iter()
+ .copied()
+ .collect::>(),
+ ),
+ )
+ .all(&txn)
+ .await?
+ .into_iter()
+ .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
// delete all in to_drop_objects.
let res = Object::delete_many()
.filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
@@ -241,11 +252,9 @@ impl CatalogController {
// notify about them.
self.notify_users_update(user_infos).await;
- inner.dropped_tables.extend(
- dropped_tables
- .into_iter()
- .map(|t| (TableId::try_from(t.id).unwrap(), t)),
- );
+ inner
+ .dropped_tables
+ .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
let relation_group = build_relation_group_for_delete(to_drop_objects);
let version = self
diff --git a/src/meta/src/controller/catalog/list_op.rs b/src/meta/src/controller/catalog/list_op.rs
index e0f69f3294c7c..387cc353c60ad 100644
--- a/src/meta/src/controller/catalog/list_op.rs
+++ b/src/meta/src/controller/catalog/list_op.rs
@@ -104,7 +104,7 @@ impl CatalogController {
pub async fn list_all_state_tables(&self) -> MetaResult> {
let inner = self.inner.read().await;
- inner.list_all_state_tables(None).await
+ inner.list_all_state_tables().await
}
pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult> {
diff --git a/src/meta/src/controller/catalog/mod.rs b/src/meta/src/controller/catalog/mod.rs
index 4cade80439be5..1063e45ec50db 100644
--- a/src/meta/src/controller/catalog/mod.rs
+++ b/src/meta/src/controller/catalog/mod.rs
@@ -619,16 +619,11 @@ impl CatalogControllerInner {
}
/// `list_all_tables` return all tables and internal tables.
- /// `table_ids_filter` is used for filtering if it's set.
- pub async fn list_all_state_tables(
- &self,
- table_ids_filter: Option>,
- ) -> MetaResult> {
- let mut table_objs = Table::find().find_also_related(Object);
- if let Some(table_ids_filter) = table_ids_filter {
- table_objs = table_objs.filter(table::Column::TableId.is_in(table_ids_filter));
- }
- let table_objs = table_objs.all(&self.db).await?;
+ pub async fn list_all_state_tables(&self) -> MetaResult> {
+ let table_objs = Table::find()
+ .find_also_related(Object)
+ .all(&self.db)
+ .await?;
Ok(table_objs
.into_iter()
From fcfdb8fbe875ee2da20b203de14caca1409d8097 Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Fri, 3 Jan 2025 19:37:17 +0800
Subject: [PATCH 4/4] fix incorrect assertion
---
.../src/compactor_observer/observer_manager.rs | 16 +++++++---------
src/storage/src/hummock/observer_manager.rs | 16 +++++++---------
2 files changed, 14 insertions(+), 18 deletions(-)
diff --git a/src/storage/compactor/src/compactor_observer/observer_manager.rs b/src/storage/compactor/src/compactor_observer/observer_manager.rs
index 21a00eabfb5a2..c07c5fb53b86d 100644
--- a/src/storage/compactor/src/compactor_observer/observer_manager.rs
+++ b/src/storage/compactor/src/compactor_observer/observer_manager.rs
@@ -41,20 +41,18 @@ impl ObserverState for CompactorObserverNode {
for relation in relation_group.relations {
match relation.relation_info.unwrap() {
RelationInfo::Table(table_catalog) => {
- assert!(
- resp.version > self.version,
- "resp version={:?}, current version={:?}",
- resp.version,
- self.version
- );
-
self.handle_catalog_notification(resp.operation(), table_catalog);
-
- self.version = resp.version;
}
_ => panic!("error type notification"),
};
}
+ assert!(
+ resp.version > self.version,
+ "resp version={:?}, current version={:?}",
+ resp.version,
+ self.version
+ );
+ self.version = resp.version;
}
Info::HummockVersionDeltas(_) => {}
Info::SystemParams(p) => {
diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs
index 9824eade5cb4f..3b82a9b223d00 100644
--- a/src/storage/src/hummock/observer_manager.rs
+++ b/src/storage/src/hummock/observer_manager.rs
@@ -52,20 +52,18 @@ impl ObserverState for HummockObserverNode {
for relation in relation_group.relations {
match relation.relation_info.unwrap() {
RelationInfo::Table(table_catalog) => {
- assert!(
- resp.version > self.version,
- "resp version={:?}, current version={:?}",
- resp.version,
- self.version
- );
-
self.handle_catalog_notification(resp.operation(), table_catalog);
-
- self.version = resp.version;
}
_ => panic!("error type notification"),
};
}
+ assert!(
+ resp.version > self.version,
+ "resp version={:?}, current version={:?}",
+ resp.version,
+ self.version
+ );
+ self.version = resp.version;
}
Info::HummockVersionDeltas(hummock_version_deltas) => {
let _ = self