diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs
index 9ae782c7d4ab..2da2d6717d3b 100644
--- a/src/frontend/src/instance/log_handler.rs
+++ b/src/frontend/src/instance/log_handler.rs
@@ -19,6 +19,7 @@ use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::Output;
 use common_error::ext::BoxedError;
+use pipeline::pipeline_operator::PipelineOperator;
 use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
 use servers::error::{
     AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
@@ -97,6 +98,10 @@ impl PipelineHandler for Instance {
             .table(catalog, &schema, table, None)
             .await
     }
+
+    fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline<GreptimeTransformer>> {
+        PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
+    }
 }
 
 impl Instance {
diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs
index 09462753d892..8cf221af5b10 100644
--- a/src/pipeline/benches/processor.rs
+++ b/src/pipeline/benches/processor.rs
@@ -223,7 +223,7 @@ transform:
       type: uint32
 "#;
 
-    parse(&Content::Yaml(pipeline_yaml.into())).unwrap()
+    parse(&Content::Yaml(pipeline_yaml)).unwrap()
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index 9bd47a899ec6..45feb4b02ff6 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -37,9 +37,9 @@ const PROCESSORS: &str = "processors";
 const TRANSFORM: &str = "transform";
 const TRANSFORMS: &str = "transforms";
 
-pub enum Content {
-    Json(String),
-    Yaml(String),
+pub enum Content<'a> {
+    Json(&'a str),
+    Yaml(&'a str),
 }
 
 pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
@@ -379,8 +379,7 @@ transform:
   - field: field2
     type: uint32
 "#;
-        let pipeline: Pipeline<GreptimeTransformer> =
-            parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
+        let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
         let mut payload = pipeline.init_intermediate_state();
         pipeline.prepare(input_value, &mut payload).unwrap();
         assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
@@ -432,8 +431,7 @@ transform:
   - field: ts
     type: timestamp, ns
     index: time"#;
-        let pipeline: Pipeline<GreptimeTransformer> =
-            parse(&Content::Yaml(pipeline_str.into())).unwrap();
+        let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_str)).unwrap();
         let mut payload = pipeline.init_intermediate_state();
         pipeline
             .prepare(serde_json::Value::String(message), &mut payload)
@@ -509,8 +507,7 @@ transform:
       type: uint32
 "#;
 
-        let pipeline: Pipeline<GreptimeTransformer> =
-            parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
+        let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
         let mut payload = pipeline.init_intermediate_state();
         pipeline.prepare(input_value, &mut payload).unwrap();
         assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
@@ -554,8 +551,7 @@ transform:
     index: time
 "#;
 
-        let pipeline: Pipeline<GreptimeTransformer> =
-            parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
+        let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
         let schema = pipeline.schemas().clone();
         let mut result = pipeline.init_intermediate_state();
         pipeline.prepare(input_value, &mut result).unwrap();
diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs
index 2e838144a483..4f43b89e2e74 100644
--- a/src/pipeline/src/manager/pipeline_operator.rs
+++ b/src/pipeline/src/manager/pipeline_operator.rs
@@ -243,4 +243,9 @@ impl PipelineOperator {
             })
             .await
     }
+
+    /// Compile a pipeline.
+    pub fn build_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
+        PipelineTable::compile_pipeline(pipeline)
+    }
 }
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index 7b3719b66707..c2a36c63ec6d 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -203,7 +203,7 @@ impl PipelineTable {
 
     /// Compile a pipeline from a string.
     pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
-        let yaml_content = Content::Yaml(pipeline.into());
+        let yaml_content = Content::Yaml(pipeline);
         parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
     }
diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs
index aa96d14d5591..d825c91e4cb3 100644
--- a/src/pipeline/tests/common.rs
+++ b/src/pipeline/tests/common.rs
@@ -19,7 +19,7 @@ use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
 pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
     let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();
 
-    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let yaml_content = Content::Yaml(pipeline_yaml);
     let pipeline: Pipeline<GreptimeTransformer> =
         parse(&yaml_content).expect("failed to parse pipeline");
     let mut result = pipeline.init_intermediate_state();
diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs
index 7577d58080c7..56386d0e860a 100644
--- a/src/pipeline/tests/dissect.rs
+++ b/src/pipeline/tests/dissect.rs
@@ -270,7 +270,7 @@ transform:
 
     let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();
 
-    let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
+    let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
     let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
         pipeline::parse(&yaml_content).expect("failed to parse pipeline");
     let mut result = pipeline.init_intermediate_state();
diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs
index e68c7b9e6a6e..de724e1a27d2 100644
--- a/src/pipeline/tests/pipeline.rs
+++ b/src/pipeline/tests/pipeline.rs
@@ -417,7 +417,7 @@ transform:
         .map(|(_, d)| GreptimeValue { value_data: d })
         .collect::<Vec<GreptimeValue>>();
 
-    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let yaml_content = Content::Yaml(pipeline_yaml);
     let pipeline: Pipeline<GreptimeTransformer> =
         parse(&yaml_content).expect("failed to parse pipeline");
     let mut stats = pipeline.init_intermediate_state();
@@ -487,7 +487,7 @@ transform:
         type: json
 "#;
 
-    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let yaml_content = Content::Yaml(pipeline_yaml);
     let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
     let mut status = pipeline.init_intermediate_state();
@@ -592,7 +592,7 @@ transform:
         type: json
 "#;
 
-    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let yaml_content = Content::Yaml(pipeline_yaml);
     let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
     let mut status = pipeline.init_intermediate_state();
@@ -655,7 +655,7 @@ transform:
         index: timestamp
 "#;
 
-    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let yaml_content = Content::Yaml(pipeline_yaml);
     let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
     let mut status = pipeline.init_intermediate_state();
@@ -691,7 +691,7 @@ transform:
       - message
     type: string
 "#;
-    let yaml_content = Content::Yaml(pipeline_yaml.into());
+    let yaml_content = Content::Yaml(pipeline_yaml);
    let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
     let mut status = pipeline.init_intermediate_state();
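The `Content<'a>` change above removes the per-call `String` allocation: every call site drops the `pipeline_yaml.into()` and lends the YAML source to the parser instead. A minimal sketch of the resulting call-site shape (sketch only; it assumes just the `pipeline` crate exports shown in the imports above):

```rust
use pipeline::{parse, Content, GreptimeTransformer, Pipeline};

fn compile(yaml: &str) -> Pipeline<GreptimeTransformer> {
    // Content<'a> borrows `yaml` for the duration of the call,
    // so no intermediate String is allocated.
    parse::<GreptimeTransformer>(&Content::Yaml(yaml)).expect("invalid pipeline")
}
```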
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 5069db51975d..b6b520627d66 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -38,7 +38,7 @@ use lazy_static::lazy_static;
 use loki_api::prost_types::Timestamp;
 use pipeline::error::PipelineTransformSnafu;
 use pipeline::util::to_pipeline_version;
-use pipeline::PipelineVersion;
+use pipeline::{GreptimeTransformer, PipelineVersion};
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_json::{Deserializer, Map, Value};
@@ -276,39 +276,11 @@ fn transform_ndjson_array_factory(
         })
 }
 
-#[axum_macros::debug_handler]
-pub async fn pipeline_dryrun(
-    State(log_state): State<LogState>,
-    Query(query_params): Query<LogIngesterQueryParams>,
-    Extension(mut query_ctx): Extension<QueryContext>,
-    TypedHeader(content_type): TypedHeader<ContentType>,
-    payload: String,
+/// Dry run a pipeline with the given data.
+fn dryrun_pipeline_inner(
+    value: Vec<Value>,
+    pipeline: &pipeline::Pipeline<GreptimeTransformer>,
 ) -> Result<Response> {
-    let handler = log_state.log_handler;
-    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
-        reason: "pipeline_name is required",
-    })?;
-
-    let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
-
-    let ignore_errors = query_params.ignore_errors.unwrap_or(false);
-
-    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
-
-    ensure!(
-        value.len() <= 10,
-        InvalidParameterSnafu {
-            reason: "too many rows for dryrun",
-        }
-    );
-
-    query_ctx.set_channel(Channel::Http);
-    let query_ctx = Arc::new(query_ctx);
-
-    let pipeline = handler
-        .get_pipeline(&pipeline_name, version, query_ctx.clone())
-        .await?;
-
     let mut intermediate_state = pipeline.init_intermediate_state();
 
     let mut results = Vec::with_capacity(value.len());
@@ -387,6 +359,110 @@ pub async fn pipeline_dryrun(
     Ok(Json(result).into_response())
 }
 
+/// Parameters for dry running a pipeline.
+///
+/// `pipeline_name` and `pipeline_version` specify a pipeline stored in the database,
+/// while `pipeline` carries raw pipeline content directly.
+/// `data` is the input to run through the pipeline; it may be a list of strings
+/// or a list of objects.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct PipelineDryrunParams {
+    pub pipeline_name: Option<String>,
+    pub pipeline_version: Option<String>,
+    pub pipeline: Option<String>,
+    pub data: Vec<Value>,
+}
+
+/// Check that the payload is valid JSON and contains `data` plus either
+/// `pipeline` or `pipeline_name`.
+/// Returns `Some(params)` if valid, `None` otherwise.
+fn check_pipeline_dryrun_params_valid(payload: &str) -> Option<PipelineDryrunParams> {
+    match serde_json::from_str::<PipelineDryrunParams>(payload) {
+        // payload has pipeline or pipeline_name, and data is an array
+        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
+        // either pipeline_name or pipeline is required
+        Ok(_) => None,
+        // invalid JSON
+        Err(_) => None,
+    }
+}
+
+/// Check that `pipeline_name` is present.
+fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
+    pipeline_name.context(InvalidParameterSnafu {
+        reason: "pipeline_name is required",
+    })
+}
+
+/// Check that the data contains no more than 10 rows.
+fn check_data_valid(data_len: usize) -> Result<()> {
+    ensure!(
+        data_len <= 10,
+        InvalidParameterSnafu {
+            reason: "too many rows for dryrun",
+        }
+    );
+    Ok(())
+}
+
+#[axum_macros::debug_handler]
+pub async fn pipeline_dryrun(
+    State(log_state): State<LogState>,
+    Query(query_params): Query<LogIngesterQueryParams>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    TypedHeader(content_type): TypedHeader<ContentType>,
+    payload: String,
+) -> Result<Response> {
+    let handler = log_state.log_handler;
+
+    match check_pipeline_dryrun_params_valid(&payload) {
+        Some(params) => {
+            let data = params.data;
+
+            check_data_valid(data.len())?;
+
+            match params.pipeline {
+                None => {
+                    let version =
+                        to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?;
+                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
+                    let pipeline = handler
+                        .get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
+                        .await?;
+                    dryrun_pipeline_inner(data, &pipeline)
+                }
+                Some(pipeline) => {
+                    let pipeline = handler.build_pipeline(&pipeline)?;
+                    dryrun_pipeline_inner(data, &pipeline)
+                }
+            }
+        }
+        None => {
+            // This path exists for backward compatibility with the previous
+            // dryrun API, where the payload is just data (JSON or plain text)
+            // and the pipeline name is specified via a query param.
+            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
+
+            let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
+
+            let ignore_errors = query_params.ignore_errors.unwrap_or(false);
+
+            let value =
+                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
+
+            check_data_valid(value.len())?;
+
+            query_ctx.set_channel(Channel::Http);
+            let query_ctx = Arc::new(query_ctx);
+
+            let pipeline = handler
+                .get_pipeline(&pipeline_name, version, query_ctx.clone())
+                .await?;
+
+            dryrun_pipeline_inner(value, &pipeline)
+        }
+    }
+}
+
 #[axum_macros::debug_handler]
 pub async fn loki_ingest(
     State(log_state): State<LogState>,
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 96a01593a8f1..ff92d3c5d15b 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -170,4 +170,7 @@ pub trait PipelineHandler {
         table: &str,
         query_ctx: &QueryContext,
     ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
+
+    /// Build a pipeline from a string.
+    fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline<GreptimeTransformer>>;
 }
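The reworked handler accepts three request shapes, dispatched by `check_pipeline_dryrun_params_valid`. A hedged sketch of those bodies (field names come from `PipelineDryrunParams`; the `"message"` payloads are illustrative only):

```rust
use serde_json::json;

// 1. New-style body naming a stored pipeline (version is optional).
let by_name = json!({
    "pipeline_name": "test",
    "data": [{ "message": "hello" }]
});

// 2. New-style body carrying raw pipeline YAML, compiled on the fly
//    through PipelineHandler::build_pipeline.
let inline = json!({
    "pipeline": "processors: []\ntransform:\n  - fields:\n      - message\n    type: string",
    "data": [{ "message": "hello" }]
});

// 3. Legacy body: a bare data array. It does not parse as
//    PipelineDryrunParams, so the handler falls back to the old path and
//    requires `pipeline_name` as a query parameter.
let legacy = json!([{ "message": "hello" }]);
```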
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 5a48fef39e43..ab2ec4ea6777 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1319,7 +1319,7 @@ pub async fn test_test_pipeline_api(store_type: StorageType) {
     // handshake
     let client = TestClient::new(app);
 
-    let body = r#"
+    let pipeline_content = r#"
 processors:
   - date:
       field: time
@@ -1346,7 +1346,7 @@ transform:
     let res = client
         .post("/v1/events/pipelines/test")
         .header("Content-Type", "application/x-yaml")
-        .body(body)
+        .body(pipeline_content)
         .send()
         .await;
@@ -1367,113 +1367,192 @@ transform:
     let pipeline = pipelines.first().unwrap();
     assert_eq!(pipeline.get("name").unwrap(), "test");
 
-    // 2. write data
-    let data_body = r#"
-    [
-        {
-            "id1": "2436",
-            "id2": "2528",
-            "logger": "INTERACT.MANAGER",
-            "type": "I",
-            "time": "2024-05-25 20:16:37.217",
-            "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
-        }
-    ]
-    "#;
-    let res = client
-        .post("/v1/events/pipelines/dryrun?pipeline_name=test")
-        .header("Content-Type", "application/json")
-        .body(data_body)
-        .send()
-        .await;
-    assert_eq!(res.status(), StatusCode::OK);
-    let body: Value = res.json().await;
-    let schema = &body["schema"];
-    let rows = &body["rows"];
-    assert_eq!(
-        schema,
-        &json!([
-            {
-                "colume_type": "FIELD",
-                "data_type": "INT32",
-                "fulltext": false,
-                "name": "id1"
-            },
-            {
-                "colume_type": "FIELD",
-                "data_type": "INT32",
-                "fulltext": false,
-                "name": "id2"
-            },
-            {
-                "colume_type": "FIELD",
-                "data_type": "STRING",
-                "fulltext": false,
-                "name": "type"
-            },
-            {
-                "colume_type": "FIELD",
-                "data_type": "STRING",
-                "fulltext": false,
-                "name": "log"
-            },
-            {
-                "colume_type": "FIELD",
-                "data_type": "STRING",
-                "fulltext": false,
-                "name": "logger"
-            },
-            {
-                "colume_type": "TIMESTAMP",
-                "data_type": "TIMESTAMP_NANOSECOND",
-                "fulltext": false,
-                "name": "time"
-            }
-        ])
-    );
-    assert_eq!(
-        rows,
-        &json!([
-            [
-                {
-                    "data_type": "INT32",
-                    "key": "id1",
-                    "semantic_type": "FIELD",
-                    "value": 2436
-                },
-                {
-                    "data_type": "INT32",
-                    "key": "id2",
-                    "semantic_type": "FIELD",
-                    "value": 2528
-                },
-                {
-                    "data_type": "STRING",
-                    "key": "type",
-                    "semantic_type": "FIELD",
-                    "value": "I"
-                },
-                {
-                    "data_type": "STRING",
-                    "key": "log",
-                    "semantic_type": "FIELD",
-                    "value": "ClusterAdapter:enter sendTextDataToCluster\\n"
-                },
-                {
-                    "data_type": "STRING",
-                    "key": "logger",
-                    "semantic_type": "FIELD",
-                    "value": "INTERACT.MANAGER"
-                },
-                {
-                    "data_type": "TIMESTAMP_NANOSECOND",
-                    "key": "time",
-                    "semantic_type": "TIMESTAMP",
-                    "value": "2024-05-25 20:16:37.217+0000"
-                }
-            ]
-        ])
-    );
+    let dryrun_schema = json!([
+        {
+            "colume_type": "FIELD",
+            "data_type": "INT32",
+            "fulltext": false,
+            "name": "id1"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "INT32",
+            "fulltext": false,
+            "name": "id2"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "STRING",
+            "fulltext": false,
+            "name": "type"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "STRING",
+            "fulltext": false,
+            "name": "log"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "STRING",
+            "fulltext": false,
+            "name": "logger"
+        },
+        {
+            "colume_type": "TIMESTAMP",
+            "data_type": "TIMESTAMP_NANOSECOND",
+            "fulltext": false,
+            "name": "time"
+        }
+    ]);
+    let dryrun_rows = json!([
+        [
+            {
+                "data_type": "INT32",
+                "key": "id1",
+                "semantic_type": "FIELD",
+                "value": 2436
+            },
+            {
+                "data_type": "INT32",
+                "key": "id2",
+                "semantic_type": "FIELD",
+                "value": 2528
+            },
+            {
+                "data_type": "STRING",
+                "key": "type",
+                "semantic_type": "FIELD",
+                "value": "I"
+            },
+            {
+                "data_type": "STRING",
+                "key": "log",
+                "semantic_type": "FIELD",
+                "value": "ClusterAdapter:enter sendTextDataToCluster\\n"
+            },
+            {
+                "data_type": "STRING",
+                "key": "logger",
+                "semantic_type": "FIELD",
+                "value": "INTERACT.MANAGER"
+            },
+            {
+                "data_type": "TIMESTAMP_NANOSECOND",
+                "key": "time",
+                "semantic_type": "TIMESTAMP",
+                "value": "2024-05-25 20:16:37.217+0000"
+            }
+        ]
+    ]);
+    {
+        // test the original api
+        let data_body = r#"
+    [
+        {
+            "id1": "2436",
+            "id2": "2528",
+            "logger": "INTERACT.MANAGER",
+            "type": "I",
+            "time": "2024-05-25 20:16:37.217",
+            "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+        }
+    ]
+    "#;
+        let res = client
+            .post("/v1/events/pipelines/dryrun?pipeline_name=test")
+            .header("Content-Type", "application/json")
+            .body(data_body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body["schema"];
+        let rows = &body["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+    }
+    {
+        // test the new api: specify the pipeline via pipeline_name
+        let body = r#"
+    {
+        "pipeline_name": "test",
+        "data": [
+            {
+                "id1": "2436",
+                "id2": "2528",
+                "logger": "INTERACT.MANAGER",
+                "type": "I",
+                "time": "2024-05-25 20:16:37.217",
+                "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+            }
+        ]
+    }
+    "#;
+        let res = client
+            .post("/v1/events/pipelines/dryrun")
+            .header("Content-Type", "application/json")
+            .body(body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body["schema"];
+        let rows = &body["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+    }
+    {
+        // test the new api: specify the pipeline via raw pipeline content
+        let mut body = json!({
+            "data": [
+                {
+                    "id1": "2436",
+                    "id2": "2528",
+                    "logger": "INTERACT.MANAGER",
+                    "type": "I",
+                    "time": "2024-05-25 20:16:37.217",
+                    "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                }
+            ]
+        });
+        body["pipeline"] = json!(pipeline_content);
+        let res = client
+            .post("/v1/events/pipelines/dryrun")
+            .header("Content-Type", "application/json")
+            .body(body.to_string())
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body["schema"];
+        let rows = &body["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+    }
+    {
+        // fall back to the old version of the api:
+        // neither pipeline nor pipeline_name is in the body, and no
+        // pipeline_name query param is given, so this is a bad request
+        let body = json!({
+            "data": [
+                {
+                    "id1": "2436",
+                    "id2": "2528",
+                    "logger": "INTERACT.MANAGER",
+                    "type": "I",
+                    "time": "2024-05-25 20:16:37.217",
+                    "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                }
+            ]
+        });
+        let res = client
+            .post("/v1/events/pipelines/dryrun")
+            .header("Content-Type", "application/json")
+            .body(body.to_string())
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+    }
 
     guard.remove_all().await;
 }