From 8b2921a62c8aefc65358373fbe5762695bfd2299 Mon Sep 17 00:00:00 2001
From: Lokesh Kumar
Date: Sat, 22 Mar 2025 16:42:42 +0100
Subject: [PATCH] feat(processing_engine): Store triggers by DbId and
 TriggerId instead of name

---
 Cargo.lock                                 |   1 +
 influxdb3/tests/cli/mod.rs                 |   4 +-
 influxdb3_catalog/src/catalog.rs           |   7 +-
 influxdb3_catalog/src/catalog/update.rs    |   1 +
 influxdb3_catalog/src/log.rs               |   1 +
 influxdb3_catalog/src/snapshot.rs          |   3 +
 influxdb3_processing_engine/Cargo.toml     |   1 +
 influxdb3_processing_engine/src/lib.rs     | 212 +++++++++------------
 influxdb3_processing_engine/src/manager.rs |  12 +-
 influxdb3_processing_engine/src/plugins.rs |  44 +++--
 10 files changed, 133 insertions(+), 153 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 69c7d2f8f64..1be4c1bce72 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3019,6 +3019,7 @@ dependencies = [
  "influxdb3_cache",
  "influxdb3_catalog",
  "influxdb3_client",
+ "influxdb3_id",
  "influxdb3_internal_api",
  "influxdb3_py_api",
  "influxdb3_sys_events",
diff --git a/influxdb3/tests/cli/mod.rs b/influxdb3/tests/cli/mod.rs
index d92a39fdb47..31ea7146271 100644
--- a/influxdb3/tests/cli/mod.rs
+++ b/influxdb3/tests/cli/mod.rs
@@ -2516,7 +2516,7 @@ def process_scheduled_call(influxdb3_local, schedule_time, args=None):
         .unwrap();
 
     // Wait for trigger to run several times
-    tokio::time::sleep(std::time::Duration::from_millis(3100)).await;
+    tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
 
     // Query to see what values were written before disabling
     let first_query_result = server
@@ -2535,7 +2535,7 @@ def process_scheduled_call(influxdb3_local, schedule_time, args=None):
     server.enable_trigger(db_name, trigger_name).run().unwrap();
 
     // Wait for trigger to run again
-    tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
+    tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
 
     // Query results after re-enabling
     let second_query_result = server
diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index fa0567a01b1..1709764a825 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -300,9 +300,8 @@ impl Catalog {
         self.inner.read().db_exists(db_id)
     }
 
-    /// Get active triggers by database and trigger name
-    // NOTE: this could be id-based in future
-    pub fn active_triggers(&self) -> Vec<(Arc<str>, Arc<str>)> {
+    /// Get active triggers by database ID and trigger ID
+    pub fn active_triggers(&self) -> Vec<(DbId, TriggerId)> {
         let inner = self.inner.read();
         let result = inner
             .databases
@@ -314,7 +313,7 @@ impl Catalog {
                     if trigger.disabled {
                         None
                     } else {
-                        Some((Arc::clone(&db.name), Arc::clone(&trigger.trigger_name)))
+                        Some((db.id, trigger.trigger_id))
                     }
                 })
             })
diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs
index e95e8bd4dd6..ec3d192c5ed 100644
--- a/influxdb3_catalog/src/catalog/update.rs
+++ b/influxdb3_catalog/src/catalog/update.rs
@@ -539,6 +539,7 @@ impl Catalog {
             db.id,
             db.name(),
             vec![DatabaseCatalogOp::CreateTrigger(TriggerDefinition {
+                database_id: db.id,
                 trigger_id,
                 trigger_name: trigger_name.into(),
                 plugin_filename: plugin_filename.to_string(),
diff --git a/influxdb3_catalog/src/log.rs b/influxdb3_catalog/src/log.rs
index e94aa23cde9..90ee316fa10 100644
--- a/influxdb3_catalog/src/log.rs
+++ b/influxdb3_catalog/src/log.rs
@@ -642,6 +642,7 @@ pub struct TriggerDefinition {
     pub trigger_id: TriggerId,
     pub trigger_name: Arc<str>,
     pub plugin_filename: String,
+    pub database_id: DbId,
     pub database_name: Arc<str>,
     pub node_id: Arc<str>,
     pub trigger: TriggerSpecificationDefinition,
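The catalog and log changes above replace name-based trigger identity with copyable IDs, while keeping database_name next to the new database_id for display and write-back paths. A minimal sketch of the flattened, ID-keyed inventory walk that the new active_triggers performs (stand-in newtypes and schema structs, not the actual influxdb3_catalog types):

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct DbId(u32);
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct TriggerId(u32);

    struct TriggerDef {
        trigger_id: TriggerId,
        disabled: bool,
    }

    struct DbSchema {
        id: DbId,
        triggers: Vec<TriggerDef>,
    }

    // Collect copyable (DbId, TriggerId) pairs for every enabled trigger,
    // instead of cloning Arc<str> database and trigger names.
    fn active_triggers(databases: &HashMap<DbId, DbSchema>) -> Vec<(DbId, TriggerId)> {
        databases
            .values()
            .flat_map(|db| {
                let db_id = db.id;
                db.triggers
                    .iter()
                    .filter(|t| !t.disabled)
                    .map(move |t| (db_id, t.trigger_id))
            })
            .collect()
    }

Because the pairs are Copy, callers such as start_triggers can iterate them without holding any catalog lock or reference-counted strings.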
diff --git a/influxdb3_catalog/src/snapshot.rs b/influxdb3_catalog/src/snapshot.rs
index cdde4a25352..7145e45da6d 100644
--- a/influxdb3_catalog/src/snapshot.rs
+++ b/influxdb3_catalog/src/snapshot.rs
@@ -211,6 +211,7 @@ impl Snapshot for TableDefinition {
 
 #[derive(Debug, Serialize, Deserialize)]
 pub(crate) struct ProcessingEngineTriggerSnapshot {
+    pub database_id: DbId,
     pub trigger_id: TriggerId,
     pub trigger_name: Arc<str>,
     pub node_id: Arc<str>,
@@ -227,6 +228,7 @@ impl Snapshot for TriggerDefinition {
 
     fn snapshot(&self) -> Self::Serialized {
         Self::Serialized {
+            database_id: self.database_id,
             trigger_id: self.trigger_id,
             trigger_name: Arc::clone(&self.trigger_name),
             node_id: Arc::clone(&self.node_id),
@@ -241,6 +243,7 @@ impl Snapshot for TriggerDefinition {
 
     fn from_snapshot(snap: Self::Serialized) -> Self {
         Self {
+            database_id: snap.database_id,
             trigger_id: snap.trigger_id,
             trigger_name: snap.trigger_name,
             node_id: snap.node_id,
diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml
index cd3e56f7cc3..d41c9108051 100644
--- a/influxdb3_processing_engine/Cargo.toml
+++ b/influxdb3_processing_engine/Cargo.toml
@@ -19,6 +19,7 @@ hyper.workspace = true
 iox_time.workspace = true
 influxdb3_catalog = { path = "../influxdb3_catalog" }
 influxdb3_client = { path = "../influxdb3_client" }
+influxdb3_id = { path = "../influxdb3_id" }
 influxdb3_internal_api = { path = "../influxdb3_internal_api" }
 influxdb3_py_api = { path = "../influxdb3_py_api" }
 influxdb3_types = { path = "../influxdb3_types" }
diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs
index 81d36ea884a..783f71e743b 100644
--- a/influxdb3_processing_engine/src/lib.rs
+++ b/influxdb3_processing_engine/src/lib.rs
@@ -7,13 +7,13 @@ use anyhow::Context;
 use bytes::Bytes;
 use hashbrown::HashMap;
 use hyper::{Body, Response};
-use influxdb3_catalog::CatalogError;
 use influxdb3_catalog::catalog::Catalog;
 use influxdb3_catalog::channel::CatalogUpdateReceiver;
 use influxdb3_catalog::log::{
     CatalogBatch, DatabaseCatalogOp, DeleteTriggerLog, PluginType, TriggerDefinition,
     TriggerIdentifier, TriggerSpecificationDefinition, ValidPluginFilename,
 };
+use influxdb3_id::{DbId, TriggerId};
 use influxdb3_internal_api::query_executor::QueryExecutor;
 use influxdb3_py_api::system_py::CacheStore;
 use influxdb3_sys_events::SysEventStore;
@@ -24,7 +24,7 @@ use influxdb3_types::http::{
 use influxdb3_wal::{SnapshotDetails, WalContents, WalFileNotifier};
 use influxdb3_write::WriteBuffer;
 use iox_time::TimeProvider;
-use observability_deps::tracing::{debug, error, warn};
+use observability_deps::tracing::{error, warn};
 use parking_lot::Mutex;
 use std::any::Any;
 use std::path::PathBuf;
@@ -39,7 +39,7 @@ pub mod plugins;
 pub mod virtualenv;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ProcessingEngineManagerImpl {
     environment_manager: ProcessingEngineEnvironmentManager,
     catalog: Arc<Catalog>,
     node_id: Arc<str>,
@@ -49,15 +49,15 @@ pub struct ProcessingEngineManagerImpl {
     time_provider: Arc<dyn TimeProvider>,
     sys_event_store: Arc<SysEventStore>,
     cache: Arc<Mutex<CacheStore>>,
-    plugin_event_tx: RwLock<PluginChannels>,
+    plugin_event_tx: Arc<RwLock<PluginChannels>>,
 }
 
-#[derive(Debug, Default)]
+#[derive(Default, Debug, Clone)]
 struct PluginChannels {
-    /// Map of database to wal trigger name to handler
-    wal_triggers: HashMap<String, HashMap<String, mpsc::Sender<WalEvent>>>,
-    /// Map of database to schedule trigger name to handler
-    schedule_triggers: HashMap<String, HashMap<String, mpsc::Sender<ScheduleEvent>>>,
+    /// Map of database to wal trigger id to handler
+    wal_triggers: HashMap<(DbId, TriggerId), mpsc::Sender<WalEvent>>,
+    /// Map of database to schedule trigger id to handler
+    schedule_triggers: HashMap<(DbId, TriggerId), mpsc::Sender<ScheduleEvent>>,
     /// Map of request path to the request trigger handler
     request_triggers: HashMap<String, mpsc::Sender<RequestEvent>>,
 }
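With PluginChannels flattened, a single map keyed by a Copy (DbId, TriggerId) tuple replaces the nested name-to-name maps, turning two lookups plus an entry().or_default() into a single insert or get. A sketch of the before/after shapes (placeholder WalEvent enum; tokio assumed as in the crate):

    use std::collections::HashMap;
    use tokio::sync::mpsc;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct DbId(u32);
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct TriggerId(u32);

    enum WalEvent {
        Shutdown,
    }

    // Before: nested maps keyed by owned strings.
    type NamedTriggers = HashMap<String, HashMap<String, mpsc::Sender<WalEvent>>>;

    // After: one flat map keyed by a cheap Copy tuple.
    type IdTriggers = HashMap<(DbId, TriggerId), mpsc::Sender<WalEvent>>;

    // One insert on the composite key replaces entry(db).or_default().insert(..).
    fn add_wal_trigger(
        map: &mut IdTriggers,
        db: DbId,
        trigger: TriggerId,
    ) -> mpsc::Receiver<WalEvent> {
        let (tx, rx) = mpsc::channel(16);
        map.insert((db, trigger), tx);
        rx
    }

The flat key also makes removal a single HashMap::remove, which is exactly what the remove_trigger hunk below does.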
@@ -68,51 +68,47 @@ impl PluginChannels {
     // returns Ok(Some(receiver)) if there was a sender to the named trigger.
     async fn send_shutdown(
         &self,
-        db: String,
-        trigger: String,
+        db: &DbId,
+        trigger: &TriggerId,
         trigger_spec: &TriggerSpecificationDefinition,
     ) -> Result<Option<oneshot::Receiver<()>>, ProcessingEngineError> {
         match trigger_spec {
             TriggerSpecificationDefinition::SingleTableWalWrite { .. }
             | TriggerSpecificationDefinition::AllTablesWalWrite => {
-                if let Some(trigger_map) = self.wal_triggers.get(&db) {
-                    if let Some(sender) = trigger_map.get(&trigger) {
-                        // create a one shot to wait for the shutdown to complete
-                        let (tx, rx) = oneshot::channel();
-                        if sender.send(WalEvent::Shutdown(tx)).await.is_err() {
-                            return Err(ProcessingEngineError::TriggerShutdownError {
-                                database: db,
-                                trigger_name: trigger,
-                            });
-                        }
-                        return Ok(Some(rx));
+                if let Some(sender) = self.wal_triggers.get(&(*db, *trigger)) {
+                    // create a one shot to wait for the shutdown to complete
+                    let (tx, rx) = oneshot::channel();
+                    if sender.send(WalEvent::Shutdown(tx)).await.is_err() {
+                        return Err(ProcessingEngineError::TriggerShutdownError {
+                            database_id: *db,
+                            trigger_id: *trigger,
+                        });
                     }
+                    return Ok(Some(rx));
                 }
             }
             TriggerSpecificationDefinition::Schedule { .. }
             | TriggerSpecificationDefinition::Every { .. } => {
-                if let Some(trigger_map) = self.schedule_triggers.get(&db) {
-                    if let Some(sender) = trigger_map.get(&trigger) {
-                        // create a one shot to wait for the shutdown to complete
-                        let (tx, rx) = oneshot::channel();
-                        if sender.send(ScheduleEvent::Shutdown(tx)).await.is_err() {
-                            return Err(ProcessingEngineError::TriggerShutdownError {
-                                database: db,
-                                trigger_name: trigger,
-                            });
-                        }
-                        return Ok(Some(rx));
+                if let Some(sender) = self.schedule_triggers.get(&(*db, *trigger)) {
+                    // create a one shot to wait for the shutdown to complete
+                    let (tx, rx) = oneshot::channel();
+                    if sender.send(ScheduleEvent::Shutdown(tx)).await.is_err() {
+                        return Err(ProcessingEngineError::TriggerShutdownError {
+                            database_id: *db,
+                            trigger_id: *trigger,
+                        });
                     }
+                    return Ok(Some(rx));
                 }
             }
-            TriggerSpecificationDefinition::RequestPath { .. } => {
-                if let Some(sender) = self.request_triggers.get(&trigger) {
+            TriggerSpecificationDefinition::RequestPath { path } => {
+                if let Some(sender) = self.request_triggers.get(path) {
                     // create a one shot to wait for the shutdown to complete
                     let (tx, rx) = oneshot::channel();
                     if sender.send(RequestEvent::Shutdown(tx)).await.is_err() {
                         return Err(ProcessingEngineError::TriggerShutdownError {
-                            database: db,
-                            trigger_name: trigger,
+                            database_id: *db,
+                            trigger_id: *trigger,
                         });
                     }
                     return Ok(Some(rx));
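send_shutdown keeps its acknowledgement handshake: a oneshot sender rides the trigger's mpsc channel, and the returned receiver resolves only once the plugin task acknowledges. A self-contained sketch of that pattern (illustrative names, not the crate's API; assumes tokio with the "full" feature):

    use tokio::sync::{mpsc, oneshot};

    enum Event {
        Shutdown(oneshot::Sender<()>),
    }

    // Ask the plugin task to stop; return a receiver that resolves once
    // the task has acknowledged the shutdown.
    async fn send_shutdown(tx: &mpsc::Sender<Event>) -> Option<oneshot::Receiver<()>> {
        let (ack_tx, ack_rx) = oneshot::channel();
        tx.send(Event::Shutdown(ack_tx)).await.ok()?;
        Some(ack_rx)
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(8);
        // Plugin-side loop: acknowledge the shutdown and exit.
        tokio::spawn(async move {
            while let Some(Event::Shutdown(ack)) = rx.recv().await {
                let _ = ack.send(());
                break;
            }
        });
        let ack_rx = send_shutdown(&tx).await.expect("plugin channel closed");
        ack_rx.await.expect("plugin dropped the ack sender");
    }

Only the map lookup changed in this patch; the handshake itself is untouched, which keeps the stop_trigger call sites below simple.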
@@ -125,45 +121,38 @@ impl PluginChannels {
     fn remove_trigger(
         &mut self,
-        db: String,
-        trigger: String,
+        db_id: &DbId,
+        trigger_id: &TriggerId,
         trigger_spec: &TriggerSpecificationDefinition,
     ) {
         match trigger_spec {
             TriggerSpecificationDefinition::SingleTableWalWrite { .. }
             | TriggerSpecificationDefinition::AllTablesWalWrite => {
-                if let Some(trigger_map) = self.wal_triggers.get_mut(&db) {
-                    trigger_map.remove(&trigger);
-                }
+                self.wal_triggers.remove(&(*db_id, *trigger_id));
             }
             TriggerSpecificationDefinition::Schedule { .. }
             | TriggerSpecificationDefinition::Every { .. } => {
-                if let Some(trigger_map) = self.schedule_triggers.get_mut(&db) {
-                    trigger_map.remove(&trigger);
-                }
+                self.schedule_triggers.remove(&(*db_id, *trigger_id));
             }
-            TriggerSpecificationDefinition::RequestPath { .. } => {
-                self.request_triggers.remove(&trigger);
+            TriggerSpecificationDefinition::RequestPath { path } => {
+                self.request_triggers.remove(path);
             }
         }
     }
 
-    fn add_wal_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver<WalEvent> {
+    fn add_wal_trigger(&mut self, db_id: DbId, trigger_id: TriggerId) -> mpsc::Receiver<WalEvent> {
         let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
-        self.wal_triggers.entry(db).or_default().insert(trigger, tx);
+        self.wal_triggers.insert((db_id, trigger_id), tx);
         rx
     }
 
     fn add_schedule_trigger(
         &mut self,
-        db: String,
-        trigger: String,
+        db_id: DbId,
+        trigger_id: TriggerId,
     ) -> mpsc::Receiver<ScheduleEvent> {
         let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
-        self.schedule_triggers
-            .entry(db)
-            .or_default()
-            .insert(trigger, tx);
+        self.schedule_triggers.insert((db_id, trigger_id), tx);
         rx
     }
@@ -174,14 +163,12 @@ impl PluginChannels {
     }
 
     async fn send_wal_contents(&self, wal_contents: Arc<WalContents>) {
-        for (db, trigger_map) in &self.wal_triggers {
-            for (trigger, sender) in trigger_map {
-                if let Err(e) = sender
-                    .send(WalEvent::WriteWalContents(Arc::clone(&wal_contents)))
-                    .await
-                {
-                    warn!(%e, %db, ?trigger, "error sending wal contents to plugin");
-                }
+        for ((db_id, trigger_id), sender) in &self.wal_triggers {
+            if let Err(e) = sender
+                .send(WalEvent::WriteWalContents(Arc::clone(&wal_contents)))
+                .await
+            {
+                warn!(%e, %db_id, ?trigger_id, "error sending wal contents to plugin");
             }
         }
     }
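Note the tracing sigils in the warn! above: % records db_id via Display while ? records trigger_id via Debug, which is why the ID types need those trait implementations. A small illustration (assumes the tracing and tracing-subscriber crates plus a hypothetical DbId newtype):

    use std::fmt;
    use tracing::warn;

    #[derive(Clone, Copy, Debug)]
    struct DbId(u32);

    impl fmt::Display for DbId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    fn main() {
        tracing_subscriber::fmt::init();
        let db_id = DbId(7);
        let trigger_id = 42u32;
        // %db_id captures via Display; ?trigger_id captures via Debug.
        warn!(%db_id, ?trigger_id, "error sending wal contents to plugin");
    }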
@@ -358,32 +345,25 @@ impl LocalPlugin {
 }
 
 impl ProcessingEngineManagerImpl {
-    // TODO(trevor): should this be id-based and not use names?
     async fn run_trigger(
         self: Arc<Self>,
-        db_name: &str,
-        trigger_name: &str,
+        db_id: &DbId,
+        trigger_id: &TriggerId,
     ) -> Result<(), ProcessingEngineError> {
-        debug!(db_name, trigger_name, "starting trigger");
-
         {
             let db_schema = self
                 .catalog
-                .db_schema(db_name)
-                .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?;
+                .db_schema_by_id(db_id)
+                .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(*db_id))?;
             let trigger = db_schema
                 .processing_engine_triggers
-                .get_by_name(trigger_name)
-                .clone()
-                .ok_or_else(|| CatalogError::ProcessingEngineTriggerNotFound {
-                    database_name: db_name.to_string(),
-                    trigger_name: trigger_name.to_string(),
-                })?;
+                .get_by_id(trigger_id)
+                .ok_or_else(|| ProcessingEngineError::TriggerNotFound(*trigger_id))?;
 
             if trigger.node_id != self.node_id {
                 error!(
                     "Not running trigger {}, as it is configured for node id {}. Multi-node not supported in core, so this shouldn't happen.",
-                    trigger_name, trigger.node_id
+                    trigger.trigger_name, trigger.node_id
                 );
                 return Ok(());
             }
@@ -401,10 +381,10 @@ impl ProcessingEngineManagerImpl {
                         .plugin_event_tx
                         .write()
                         .await
-                        .add_wal_trigger(db_name.to_string(), trigger_name.to_string());
+                        .add_wal_trigger(*db_id, *trigger_id);
 
                     plugins::run_wal_contents_plugin(
-                        db_name.to_string(),
+                        *db_id,
                         plugin_code,
                         trigger,
                         plugin_context,
@@ -416,10 +396,10 @@ impl ProcessingEngineManagerImpl {
                         .plugin_event_tx
                         .write()
                         .await
-                        .add_schedule_trigger(db_name.to_string(), trigger_name.to_string());
+                        .add_schedule_trigger(*db_id, *trigger_id);
 
                     plugins::run_schedule_plugin(
-                        db_name.to_string(),
+                        *db_id,
                         plugin_code,
                         trigger,
                         Arc::clone(&self.time_provider),
@@ -438,13 +418,7 @@ impl ProcessingEngineManagerImpl {
                         .await
                         .add_request_trigger(path.to_string());
 
-                    plugins::run_request_plugin(
-                        db_name.to_string(),
-                        plugin_code,
-                        trigger,
-                        plugin_context,
-                        rec,
-                    )
+                    plugins::run_request_plugin(*db_id, plugin_code, trigger, plugin_context, rec)
                 }
             }
         }
@@ -452,33 +426,25 @@ impl ProcessingEngineManagerImpl {
         Ok(())
     }
 
-    // TODO(trevor): should this be id-based and not use names?
     async fn stop_trigger(
         &self,
-        db_name: &str,
-        trigger_name: &str,
+        db_id: &DbId,
+        trigger_id: &TriggerId,
     ) -> Result<(), ProcessingEngineError> {
         let db_schema = self
             .catalog
-            .db_schema(db_name)
-            .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db_name.to_string()))?;
+            .db_schema_by_id(db_id)
+            .ok_or_else(|| ProcessingEngineError::DatabaseNotFound(*db_id))?;
         let trigger = db_schema
             .processing_engine_triggers
-            .get_by_name(trigger_name)
-            .ok_or_else(|| CatalogError::ProcessingEngineTriggerNotFound {
-                database_name: db_name.to_string(),
-                trigger_name: trigger_name.to_string(),
-            })?;
+            .get_by_id(trigger_id)
+            .ok_or_else(|| ProcessingEngineError::TriggerNotFound(*trigger_id))?;
         let Some(shutdown_rx) = self
             .plugin_event_tx
             .write()
             .await
-            .send_shutdown(
-                db_name.to_string(),
-                trigger_name.to_string(),
-                &trigger.trigger,
-            )
+            .send_shutdown(db_id, trigger_id, &trigger.trigger)
             .await?
         else {
            return Ok(());
         };
@@ -489,25 +455,22 @@ impl ProcessingEngineManagerImpl {
                 "shutdown trigger receiver dropped, may have received multiple shutdown requests"
             );
         } else {
-            self.plugin_event_tx.write().await.remove_trigger(
-                db_name.to_string(),
-                trigger_name.to_string(),
-                &trigger.trigger,
-            );
+            self.plugin_event_tx
+                .write()
+                .await
+                .remove_trigger(db_id, trigger_id, &trigger.trigger);
         }
 
         self.cache
             .lock()
-            .drop_trigger_cache(db_name.to_string(), trigger_name.to_string());
+            .drop_trigger_cache(db_id.to_string(), trigger_id.to_string());
 
         Ok(())
     }
 
     pub async fn start_triggers(self: Arc<Self>) -> Result<(), ProcessingEngineError> {
         let triggers = self.catalog.active_triggers();
-        for (db_name, trigger_name) in triggers {
-            Arc::clone(&self)
-                .run_trigger(&db_name, &trigger_name)
-                .await?;
+        for (db_id, trigger_id) in triggers {
+            Arc::clone(&self).run_trigger(&db_id, &trigger_id).await?;
         }
         Ok(())
     }
@@ -596,7 +559,6 @@ impl ProcessingEngineManagerImpl {
             body: request_body,
             response_tx: tx,
         };
-
         self.plugin_event_tx
             .write()
             .await
@@ -680,14 +642,14 @@ fn background_catalog_update(
                 let processing_engine_manager = Arc::clone(&processing_engine_manager);
                 match op {
                     DatabaseCatalogOp::CreateTrigger(TriggerDefinition {
-                        trigger_name,
-                        database_name,
+                        trigger_id,
+                        database_id,
                         disabled,
                         ..
                     }) => {
                         if !disabled {
                             if let Err(error) = processing_engine_manager
-                                .run_trigger(database_name, trigger_name)
+                                .run_trigger(database_id, trigger_id)
                                 .await
                             {
                                 error!(?error, "failed to run the created trigger");
                             }
                         }
                     }
                     DatabaseCatalogOp::EnableTrigger(TriggerIdentifier {
-                        db_name,
-                        trigger_name,
+                        db_id,
+                        trigger_id,
                         ..
                     }) => {
                         if let Err(error) = processing_engine_manager
-                            .run_trigger(db_name, trigger_name)
+                            .run_trigger(db_id, trigger_id)
                             .await
                         {
                             error!(?error, "failed to run the trigger");
                         }
                     }
                     DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog {
-                        trigger_name,
+                        trigger_id,
                         force: true,
                         ..
                     }) => {
                         if let Err(error) = processing_engine_manager
-                            .stop_trigger(&batch.database_name, trigger_name)
+                            .stop_trigger(&batch.database_id, trigger_id)
                             .await
                         {
                             error!(?error, "failed to disable the trigger");
                         }
                     }
                     DatabaseCatalogOp::DisableTrigger(TriggerIdentifier {
-                        db_name,
-                        trigger_name,
+                        db_id,
+                        trigger_id,
                         ..
                     }) => {
                         if let Err(error) = processing_engine_manager
-                            .stop_trigger(db_name, trigger_name)
+                            .stop_trigger(db_id, trigger_id)
                             .await
                         {
                             error!(?error, "failed to disable the trigger");
diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs
index 75e1a1e3ce0..22edf0c95fc 100644
--- a/influxdb3_processing_engine/src/manager.rs
+++ b/influxdb3_processing_engine/src/manager.rs
@@ -1,12 +1,13 @@
 use crate::environment::PluginEnvironmentError;
 use influxdb3_catalog::CatalogError;
+use influxdb3_id::{DbId, TriggerId};
 use std::fmt::Debug;
 use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum ProcessingEngineError {
     #[error("database not found: {0}")]
-    DatabaseNotFound(String),
+    DatabaseNotFound(DbId),
 
     #[error("catalog update error: {0}")]
     CatalogUpdateError(#[from] CatalogError),
@@ -23,10 +24,10 @@ pub enum ProcessingEngineError {
     #[error("plugin error: {0}")]
     PluginError(#[from] crate::plugins::PluginError),
 
-    #[error("failed to shutdown trigger {trigger_name} in database {database}")]
+    #[error("failed to shutdown trigger {trigger_id} in database {database_id}")]
     TriggerShutdownError {
-        database: String,
-        trigger_name: String,
+        database_id: DbId,
+        trigger_id: TriggerId,
     },
 
     #[error("request trigger not found")]
@@ -37,4 +38,7 @@ pub enum ProcessingEngineError {
 
     #[error("error installing python packages: {0}")]
     PythonPackageError(#[from] PluginEnvironmentError),
+
+    #[error("trigger not found: {0}")]
+    TriggerNotFound(TriggerId),
 }
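The manager.rs hunk shows why the typed IDs must implement Display: thiserror interpolates {trigger_id} and {database_id} straight into the error messages. A reduced, compilable sketch with stand-in newtypes (the real ones live in influxdb3_id):

    use std::fmt;
    use thiserror::Error;

    #[derive(Clone, Copy, Debug)]
    pub struct DbId(u32);
    #[derive(Clone, Copy, Debug)]
    pub struct TriggerId(u32);

    impl fmt::Display for DbId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}", self.0)
        }
    }
    impl fmt::Display for TriggerId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    #[derive(Debug, Error)]
    pub enum ProcessingEngineError {
        #[error("database not found: {0}")]
        DatabaseNotFound(DbId),

        // Named fields are interpolated by name into the message.
        #[error("failed to shutdown trigger {trigger_id} in database {database_id}")]
        TriggerShutdownError { database_id: DbId, trigger_id: TriggerId },

        #[error("trigger not found: {0}")]
        TriggerNotFound(TriggerId),
    }

    fn main() {
        let e = ProcessingEngineError::TriggerShutdownError {
            database_id: DbId(1),
            trigger_id: TriggerId(9),
        };
        assert_eq!(e.to_string(), "failed to shutdown trigger 9 in database 1");
    }

The new TriggerNotFound variant also lets the processing engine drop its dependency on CatalogError::ProcessingEngineTriggerNotFound for this path.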
diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs
index 3ee17c507c7..7aed5d90205 100644
--- a/influxdb3_processing_engine/src/plugins.rs
+++ b/influxdb3_processing_engine/src/plugins.rs
@@ -31,6 +31,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use thiserror::Error;
 
+use influxdb3_id::DbId;
 use tokio::sync::mpsc;
 
 #[derive(Debug, Error)]
@@ -87,13 +88,13 @@ pub enum PluginError {
 }
 
 pub(crate) fn run_wal_contents_plugin(
-    db_name: String,
+    db_id: DbId,
     plugin_code: Arc<PluginCode>,
     trigger_definition: Arc<TriggerDefinition>,
     context: PluginContext,
     plugin_receiver: mpsc::Receiver<WalEvent>,
 ) {
-    let trigger_plugin = TriggerPlugin::new(db_name, plugin_code, trigger_definition, context);
+    let trigger_plugin = TriggerPlugin::new(db_id, plugin_code, trigger_definition, context);
 
     tokio::task::spawn(async move {
         trigger_plugin
@@ -111,7 +112,7 @@ pub struct ProcessingEngineEnvironmentManager {
 }
 
 pub(crate) fn run_schedule_plugin(
-    db_name: String,
+    db_id: DbId,
     plugin_code: Arc<PluginCode>,
     trigger_definition: Arc<TriggerDefinition>,
     time_provider: Arc<dyn TimeProvider>,
@@ -127,7 +128,7 @@ pub(crate) fn run_schedule_plugin(
         )));
     }
 
-    let trigger_plugin = TriggerPlugin::new(db_name, plugin_code, trigger_definition, context);
+    let trigger_plugin = TriggerPlugin::new(db_id, plugin_code, trigger_definition, context);
 
     let runner = python_plugin::ScheduleTriggerRunner::try_new(
         &trigger_plugin.trigger_definition.trigger,
@@ -144,13 +145,13 @@ pub(crate) fn run_schedule_plugin(
 }
 
 pub(crate) fn run_request_plugin(
-    db_name: String,
+    db_id: DbId,
     plugin_code: Arc<PluginCode>,
     trigger_definition: Arc<TriggerDefinition>,
     context: PluginContext,
     plugin_receiver: mpsc::Receiver<RequestEvent>,
 ) {
-    let trigger_plugin = TriggerPlugin::new(db_name, plugin_code, trigger_definition, context);
+    let trigger_plugin = TriggerPlugin::new(db_id, plugin_code, trigger_definition, context);
     tokio::task::spawn(async move {
         trigger_plugin
             .run_request_plugin(plugin_receiver)
@@ -174,7 +175,7 @@ pub(crate) struct PluginContext {
 struct TriggerPlugin {
     trigger_definition: Arc<TriggerDefinition>,
     plugin_code: Arc<PluginCode>,
-    db_name: String,
+    db_id: DbId,
     write_buffer: Arc<dyn WriteBuffer>,
     query_executor: Arc<dyn QueryExecutor>,
     manager: Arc<ProcessingEngineManagerImpl>,
@@ -209,7 +210,7 @@ mod python_plugin {
 
     impl TriggerPlugin {
         pub(crate) fn new(
-            db_name: String,
+            db_id: DbId,
             plugin_code: Arc<PluginCode>,
             trigger_definition: Arc<TriggerDefinition>,
             context: PluginContext,
@@ -221,7 +222,7 @@ mod python_plugin {
             Self {
                 trigger_definition,
                 plugin_code,
-                db_name,
+                db_id,
                 write_buffer: Arc::clone(&context.write_buffer),
                 query_executor: Arc::clone(&context.query_executor),
                 manager: Arc::clone(&context.manager),
@@ -320,9 +321,9 @@ mod python_plugin {
         /// it is done in a separate task so that the caller can send back shutdown.
         pub(crate) fn send_disable_trigger(&self) {
             let manager = Arc::clone(&self.manager);
-            let db_name = Arc::clone(&self.trigger_definition.database_name);
-            let trigger_name = Arc::clone(&self.trigger_definition.trigger_name);
-            let fut = async move { manager.stop_trigger(&db_name, &trigger_name).await };
+            let db_id = self.db_id;
+            let trigger_id = self.trigger_definition.trigger_id;
+            let fut = async move { manager.stop_trigger(&db_id, &trigger_id).await };
             // start the disable call, then look for the shutdown message
             tokio::spawn(fut);
         }
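send_disable_trigger copies the two Copy IDs into a detached future because stop_trigger sends a shutdown event back to the very plugin whose event loop invoked it; awaiting it inline would keep that loop from ever seeing the event. A shape sketch of the fire-and-forget pattern (hypothetical Manager type and plain u32 IDs; tokio assumed):

    use std::sync::Arc;

    struct Manager;

    impl Manager {
        // Stand-in for the real stop_trigger: sends a Shutdown event and
        // waits for the plugin task to acknowledge it.
        async fn stop_trigger(&self, _db_id: u32, _trigger_id: u32) {}
    }

    fn send_disable_trigger(manager: Arc<Manager>, db_id: u32, trigger_id: u32) {
        // The IDs are Copy, so the future is 'static with no Arc::clone of names.
        let fut = async move { manager.stop_trigger(db_id, trigger_id).await };
        // Start the disable call, then keep draining events until the
        // Shutdown message arrives; awaiting here would deadlock the loop.
        tokio::spawn(fut);
    }

Compared to the old code, only the captured values changed: two Copy IDs instead of two cloned Arc<str> names.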
@@ -341,7 +342,7 @@ mod python_plugin {
                 tokio::select! {
                     _ = time_provider.sleep_until(next_run_instant) => {
-                        let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
+                        let Some(schema) = self.write_buffer.catalog().db_schema_by_id(&self.trigger_definition.database_id) else {
                             return Err(PluginError::MissingDb);
                         };
@@ -448,8 +449,10 @@ mod python_plugin {
                         break;
                     }
                     Some(RequestEvent::Request(request)) => {
-                        let Some(schema) =
-                            self.write_buffer.catalog().db_schema(self.db_name.as_str())
+                        let Some(schema) = self
+                            .write_buffer
+                            .catalog()
+                            .db_schema_by_id(&self.trigger_definition.database_id)
                         else {
                             error!(?self.trigger_definition, "missing db schema");
                             return Err(PluginError::MissingDb);
@@ -546,7 +549,11 @@ mod python_plugin {
             &self,
             wal_contents: Arc<WalContents>,
         ) -> Result<(), PluginError> {
-            let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
+            let Some(schema) = self
+                .write_buffer
+                .catalog()
+                .db_schema_by_id(&self.trigger_definition.database_id)
+            else {
                 return Err(PluginError::MissingDb);
             };
@@ -554,7 +561,7 @@ mod python_plugin {
             match wal_op {
                 WalOp::Write(write_batch) => {
                     // determine if this write batch is for this database
-                    if write_batch.database_name != self.trigger_definition.database_name {
+                    if write_batch.database_id != self.trigger_definition.database_id {
                         continue;
                     }
                     let table_filter = match &self.trigger_definition.trigger {
@@ -675,7 +682,8 @@ mod python_plugin {
                     if let Err(e) = self
                         .write_buffer
                         .write_lp(
-                            NamespaceName::new(self.db_name.clone()).unwrap(),
+                            NamespaceName::new(self.trigger_definition.database_name.to_string())
+                                .unwrap(),
                             plugin_return_state.write_back_lines.join("\n").as_str(),
                             Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
                            false,
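The final hunks swap the per-batch ownership check from comparing database names to comparing Copy IDs, while the write-back path still resolves the human-readable name for NamespaceName. A condensed sketch of the ID-based filter (stand-in structs, not the influxdb3_wal types):

    #[derive(Clone, Copy, PartialEq, Eq)]
    struct DbId(u32);

    struct WriteBatch {
        database_id: DbId,
        // ... lines, table ids, etc.
    }

    struct TriggerDefinition {
        database_id: DbId,
        database_name: String,
    }

    // Mirrors the new check: a Copy equality test instead of comparing
    // Arc<str> database names.
    fn batches_for_trigger<'a>(
        batches: &'a [WriteBatch],
        trigger: &'a TriggerDefinition,
    ) -> impl Iterator<Item = &'a WriteBatch> {
        batches
            .iter()
            .filter(move |b| b.database_id == trigger.database_id)
    }

Keeping database_name on TriggerDefinition alongside database_id is what lets the write-back call construct its namespace without an extra catalog lookup.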