diff --git a/Cargo.lock b/Cargo.lock index 6fc5d8ffe94..03dcc88b54b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3414,6 +3414,7 @@ dependencies = [ "influxdb3_cache", "influxdb3_catalog", "influxdb3_client", + "influxdb3_id", "influxdb3_internal_api", "influxdb3_py_api", "influxdb3_shutdown", diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml index 8c0256e9ba1..d48a70f9085 100644 --- a/influxdb3_processing_engine/Cargo.toml +++ b/influxdb3_processing_engine/Cargo.toml @@ -45,6 +45,7 @@ object_store.workspace = true parquet_file.workspace = true tempfile.workspace = true test-log.workspace = true +influxdb3_id = { path = "../influxdb3_id" } [lints] workspace = true diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index a362e2245ec..b3d6189255f 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -1077,8 +1077,10 @@ pub(crate) fn run_test_schedule_plugin( mod tests { use super::*; use crate::virtualenv::init_pyo3; + use chrono::{TimeZone, Utc}; use hashbrown::HashMap; - use influxdb3_catalog::catalog::Catalog; + use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; + use influxdb3_id::DbId; use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; use influxdb3_write::Precision; use influxdb3_write::write_buffer::validator::WriteValidator; @@ -1208,6 +1210,7 @@ def process_writes(influxdb3_local, table_batches, args=None): .await .unwrap(), ); + catalog.create_database("foo").await.unwrap(); let namespace = NamespaceName::new("foodb").unwrap(); let validator = WriteValidator::initialize(namespace.clone(), Arc::clone(&catalog)).unwrap(); @@ -1288,4 +1291,40 @@ def process_writes(influxdb3_local, table_batches, args=None): let expected_error = "line protocol parse error on write to db foodb: WriteLineError { original_line: \"cpu,host=A f1=not_an_int\", line_number: 2, error_message: \"invalid column type for column 'f1', expected iox::column_type::field::integer, got iox::column_type::field::string\" }"; assert_eq!(response.errors[0], expected_error); } + + #[tokio::test] + async fn test_schedule_plugin_py_api_surface_area() { + init_pyo3(); + + let code = r#" +def process_scheduled_call(influxdb3_local, call_time, args=None): + allowed = {"info", "warn", "error", "query", "write", "cache", "write_to_db"} + attrs = {name for name in dir(influxdb3_local) if not name.startswith("__")} + extras = attrs - allowed + missing = allowed - attrs + if extras or missing: + raise RuntimeError(f"unexpected attributes: extras={sorted(extras)}, missing={sorted(missing)}") +"#; + + let cache = Arc::new(Mutex::new(CacheStore::new( + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))), + Duration::from_secs(10), + ))); + + let result = influxdb3_py_api::system_py::execute_schedule_trigger( + code, + Utc.timestamp_opt(0, 0).unwrap(), + Arc::new(DatabaseSchema::new(DbId::from(0), Arc::from("test_db"))), + Arc::new(UnimplementedQueryExecutor), + None, + &None::>, + PyCache::new_test_cache(cache, "_shared_test".to_string()), + None, + ); + + assert!( + result.is_ok(), + "PyPluginCallApi exposes unexpected Python methods: {result:?}" + ); + } } diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 5c7e0e8b547..b660784cf9a 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -157,15 +157,6 @@ impl PyPluginCallApi { Ok(()) } - fn log_args_to_string(&self, args: &Bound<'_, PyTuple>) -> PyResult { - let line = args - .try_iter()? - .map(|arg| arg?.str()?.extract::()) - .collect::, _>>()? - .join(" "); - Ok(line) - } - fn write(&self, line_builder: &Bound<'_, PyAny>) -> PyResult<()> { // Get the built line from the LineBuilder object let line = line_builder.getattr("build")?.call0()?; @@ -322,6 +313,15 @@ impl PyPluginCallApi { logger.log(level, log_line); } } + + fn log_args_to_string(&self, args: &Bound<'_, PyTuple>) -> PyResult { + let line = args + .try_iter()? + .map(|arg| arg?.str()?.extract::()) + .collect::, _>>()? + .join(" "); + Ok(line) + } } // constant for the process writes call site string