chore: pipeline dryrun api can currently receives pipeline raw content (#5142)

* chore: pipeline dryrun api can currently receives pipeline raw content

* chore: remove dryrun v1 and add test

* chore: change dryrun pipeline api body schema

* chore: remove useless struct PipelineInfo

* chore: update PipelineDryrunParams doc

* chore: increase code readability

* chore: add some comment for pipeline dryrun test

* Apply suggestions from code review

Co-authored-by: shuiyisong <[email protected]>

* chore: format code

---------

Co-authored-by: shuiyisong <[email protected]>
paomian and shuiyisong authored Dec 12, 2024
1 parent fee75a1 commit e8e9526
Showing 11 changed files with 304 additions and 140 deletions.
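For context on the change: the dryrun endpoint now accepts a JSON body (the `PipelineDryrunParams` struct added in src/servers/src/http/event.rs below) that can either reference a pipeline stored in the db by name/version or carry the raw pipeline YAML inline. A minimal sketch of the two body shapes built with serde_json; only the field names come from this diff, the pipeline name, YAML content, and sample rows are illustrative assumptions:

```rust
use serde_json::json;

fn main() {
    // Dryrun against a pipeline already stored in the db, referenced by name
    // (pipeline_version is optional and omitted here).
    let by_name = json!({
        "pipeline_name": "my_pipeline",            // hypothetical stored pipeline
        "data": [{ "message": "hello world" }]     // at most 10 rows are accepted
    });

    // Dryrun with raw pipeline content in the request body (what this PR adds);
    // no pipeline has to be created beforehand.
    let by_content = json!({
        "pipeline": "transform:\n  - field: message\n    type: string\n",
        "data": [{ "message": "hello world" }]
    });

    println!("{by_name}\n{by_content}");
}
```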
5 changes: 5 additions & 0 deletions src/frontend/src/instance/log_handler.rs
@@ -19,6 +19,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
@@ -97,6 +98,10 @@ impl PipelineHandler for Instance {
.table(catalog, &schema, table, None)
.await
}

fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline<GreptimeTransformer>> {
PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
}
}

impl Instance {
2 changes: 1 addition & 1 deletion src/pipeline/benches/processor.rs
@@ -223,7 +223,7 @@
type: uint32
"#;

parse(&Content::Yaml(pipeline_yaml.into())).unwrap()
parse(&Content::Yaml(pipeline_yaml)).unwrap()
}

fn criterion_benchmark(c: &mut Criterion) {
18 changes: 7 additions & 11 deletions src/pipeline/src/etl.rs
@@ -37,9 +37,9 @@ const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";

pub enum Content {
Json(String),
Yaml(String),
pub enum Content<'a> {
Json(&'a str),
Yaml(&'a str),
}

pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
@@ -379,8 +379,7 @@ transform:
- field: field2
type: uint32
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
@@ -432,8 +431,7 @@ transform:
- field: ts
type: timestamp, ns
index: time"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_str.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_str)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline
.prepare(serde_json::Value::String(message), &mut payload)
@@ -509,8 +507,7 @@ transform:
type: uint32
"#;

let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
@@ -554,8 +551,7 @@ transform:
index: time
"#;

let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let schema = pipeline.schemas().clone();
let mut result = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut result).unwrap();
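The etl.rs change above turns `Content` from owning a `String` into borrowing a `&'a str`, which is why every call site in this commit drops the `.into()`. A minimal sketch of the call-site shape after the change, using the public `parse` API exactly as the tests in this diff do (the helper function itself is illustrative):

```rust
use pipeline::{parse, Content, GreptimeTransformer, Pipeline};

fn compile_raw(yaml: &str) -> Pipeline<GreptimeTransformer> {
    // Content::Yaml now borrows the &str directly; no extra String allocation.
    parse::<GreptimeTransformer>(&Content::Yaml(yaml)).expect("failed to parse pipeline")
}
```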
5 changes: 5 additions & 0 deletions src/pipeline/src/manager/pipeline_operator.rs
@@ -243,4 +243,9 @@ impl PipelineOperator {
})
.await
}

/// Compile a pipeline.
pub fn build_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
PipelineTable::compile_pipeline(pipeline)
}
}
2 changes: 1 addition & 1 deletion src/pipeline/src/manager/table.rs
@@ -203,7 +203,7 @@ impl PipelineTable {

/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline.into());
let yaml_content = Content::Yaml(pipeline);
parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
}

2 changes: 1 addition & 1 deletion src/pipeline/tests/common.rs
@@ -19,7 +19,7 @@ use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();
2 changes: 1 addition & 1 deletion src/pipeline/tests/dissect.rs
@@ -270,7 +270,7 @@

let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();

let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();
10 changes: 5 additions & 5 deletions src/pipeline/tests/pipeline.rs
@@ -417,7 +417,7 @@ transform:
.map(|(_, d)| GreptimeValue { value_data: d })
.collect::<Vec<GreptimeValue>>();

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let mut stats = pipeline.init_intermediate_state();
@@ -487,7 +487,7 @@
type: json
"#;

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
@@ -592,7 +592,7 @@
type: json
"#;

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
@@ -655,7 +655,7 @@
index: timestamp
"#;

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
@@ -691,7 +691,7 @@
- message
type: string
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
142 changes: 109 additions & 33 deletions src/servers/src/http/event.rs
@@ -38,7 +38,7 @@ use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineVersion;
use pipeline::{GreptimeTransformer, PipelineVersion};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value};
@@ -276,39 +276,11 @@ fn transform_ndjson_array_factory(
})
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
/// Dryrun pipeline with given data
fn dryrun_pipeline_inner(
value: Vec<Value>,
pipeline: &pipeline::Pipeline<GreptimeTransformer>,
) -> Result<Response> {
let handler = log_state.log_handler;
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;

let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;

let ignore_errors = query_params.ignore_errors.unwrap_or(false);

let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

ensure!(
value.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

let mut intermediate_state = pipeline.init_intermediate_state();

let mut results = Vec::with_capacity(value.len());
@@ -387,6 +359,110 @@ pub async fn pipeline_dryrun(
Ok(Json(result).into_response())
}

/// Dryrun pipeline with given data
/// pipeline_name and pipeline_version specify a pipeline stored in the db
/// pipeline specifies the raw pipeline content
/// data is the sample data to run the pipeline against
/// data may be a list of strings or a list of objects
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
pub pipeline_name: Option<String>,
pub pipeline_version: Option<String>,
pub pipeline: Option<String>,
pub data: Vec<Value>,
}

/// Check if the payload is valid json
/// Check if the payload contains pipeline or pipeline_name and data
/// Return Some if valid, None if invalid
fn check_pipeline_dryrun_params_valid(payload: &str) -> Option<PipelineDryrunParams> {
match serde_json::from_str::<PipelineDryrunParams>(payload) {
// valid payload: pipeline or pipeline_name is present and data is an array
Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
// invalid: either pipeline or pipeline_name is required
Ok(_) => None,
// invalid json
Err(_) => None,
}
}

/// Check that the pipeline_name is provided
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})
}

/// Check that the data contains no more than 10 rows
fn check_data_valid(data_len: usize) -> Result<()> {
ensure!(
data_len <= 10,
InvalidParameterSnafu {
reason: "data is required",
}
);
Ok(())
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<Response> {
let handler = log_state.log_handler;

match check_pipeline_dryrun_params_valid(&payload) {
Some(params) => {
let data = params.data;

check_data_valid(data.len())?;

match params.pipeline {
None => {
let version =
to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?;
let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
let pipeline = handler
.get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
.await?;
dryrun_pipeline_inner(data, &pipeline)
}
Some(pipeline) => {
let pipeline = handler.build_pipeline(&pipeline)?;
dryrun_pipeline_inner(data, &pipeline)
}
}
}
None => {
// This path is kept for backward compatibility with the previous dryrun code
// where the payload is just data (JSON or plain text) and the pipeline name
// is specified using query param.
let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;

let ignore_errors = query_params.ignore_errors.unwrap_or(false);

let value =
extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

check_data_valid(value.len())?;

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

dryrun_pipeline_inner(value, &pipeline)
}
}
}

#[axum_macros::debug_handler]
pub async fn loki_ingest(
State(log_state): State<LogState>,
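To make the dispatch in the new handler concrete, here is a unit-test style sketch of how the helpers added in event.rs above behave, assuming it lives in the same module as `check_pipeline_dryrun_params_valid` and `check_data_valid`; the payload literals are illustrative:

```rust
#[test]
fn dryrun_payload_dispatch_sketch() {
    // New-style body: raw pipeline content plus data -> parsed into PipelineDryrunParams.
    let raw = r#"{"pipeline": "transform:\n  - field: message\n    type: string\n", "data": [{"message": "hello"}]}"#;
    assert!(check_pipeline_dryrun_params_valid(raw).is_some());

    // Neither `pipeline` nor `pipeline_name` present -> None; the handler then falls
    // back to the legacy path where the whole payload is data and the pipeline name
    // comes from the query string.
    let legacy = r#"[{"message": "hello"}]"#;
    assert!(check_pipeline_dryrun_params_valid(legacy).is_none());

    // At most 10 rows are accepted for a dryrun.
    assert!(check_data_valid(10).is_ok());
    assert!(check_data_valid(11).is_err());
}
```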
3 changes: 3 additions & 0 deletions src/servers/src/query_handler.rs
@@ -170,4 +170,7 @@ pub trait PipelineHandler {
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;

/// Build a pipeline from a string.
fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline<GreptimeTransformer>>;
}