From 7c815ef0f346881cedb26abc50cfc138403faacf Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 20 Feb 2023 19:23:43 +0800 Subject: [PATCH 1/7] feat: don't report metric/label not found as error Signed-off-by: Ruihang Xia --- src/promql/src/error.rs | 10 ++++++---- src/promql/src/planner.rs | 2 +- src/servers/src/promql.rs | 18 +++++++++++++++++- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 0f422a46295e..fb7fc36c7da4 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -107,15 +107,17 @@ impl ErrorExt for Error { | UnsupportedExpr { .. } | UnexpectedToken { .. } | MultipleVector { .. } - | LabelNotFound { .. } | ExpectExpr { .. } => StatusCode::InvalidArguments, + UnknownTable { .. } - | TableNotFound { .. } | DataFusionPlanning { .. } | UnexpectedPlanExpr { .. } | IllegalRange { .. } - | EmptyRange { .. } - | TableNameNotFound { .. } => StatusCode::Internal, + | EmptyRange { .. } => StatusCode::Internal, + + TableNotFound { .. } | TableNameNotFound { .. } => StatusCode::TableNotFound, + + LabelNotFound { .. } => StatusCode::TableColumnNotFound, } } fn backtrace_opt(&self) -> Option<&Backtrace> { diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 2f6fec3eebd6..6d9962bda189 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -495,7 +495,7 @@ impl PromPlanner { let table = self .schema_provider .get_table_provider(TableReference::Bare { table: &table_name }) - .context(DataFusionPlanningSnafu)? + .context(TableNotFoundSnafu { table: &table_name })? .as_any() .downcast_ref::() .context(UnknownTableSnafu)? diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 7c2379192afb..44d52c4af101 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -21,6 +21,7 @@ use axum::body::BoxBody; use axum::extract::{Query, State}; use axum::{routing, Form, Json, Router}; use common_error::prelude::ErrorExt; +use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::info; @@ -216,7 +217,22 @@ impl PromqlJsonResponse { json }; - response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string())) + match response { + Ok(resp) => resp, + Err(err) => { + // Prometheus won't report error if querying nonexist label and metric + if err.status_code() == StatusCode::TableNotFound + || err.status_code() == StatusCode::TableColumnNotFound + { + Self::success(PromqlData { + result_type: "matrix".to_string(), + ..Default::default() + }) + } else { + Self::error(err.status_code().to_string(), err.to_string()) + } + } + } } fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result { From 351b095a3f95942511091a2d8ffbf2091ecb45ca Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 20 Feb 2023 19:45:34 +0800 Subject: [PATCH 2/7] feat: impl unary expr Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 6d9962bda189..1472144c49f4 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -129,10 +129,13 @@ impl PromPlanner { .build() .context(DataFusionPlanningSnafu)? } - PromExpr::Unary(UnaryExpr { .. }) => UnsupportedExprSnafu { - name: "Prom Unary Expr", + PromExpr::Unary(UnaryExpr { expr }) => { + // Unary Expr in PromQL implys the `-` operator + let input = self.prom_expr_to_plan(*expr.clone())?; + self.projection_for_each_value_column(input, |col| { + Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into())))) + })? } - .fail()?, PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => { match ( Self::try_build_literal_expr(lhs), From 6cbcc99029d8c8471ced654f77930463d40202c6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 20 Feb 2023 19:53:00 +0800 Subject: [PATCH 3/7] feat: impl paren expr Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 1472144c49f4..8a7e010e31da 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -202,10 +202,7 @@ impl PromPlanner { } } } - PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu { - name: "Prom Paren Expr", - } - .fail()?, + PromExpr::Paren(ParenExpr { expr }) => self.prom_expr_to_plan(*expr.clone())?, PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu { name: "Prom Subquery", } From 987a0036deff672d3dffaae08a3caab8f66dd17e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 20 Feb 2023 20:43:20 +0800 Subject: [PATCH 4/7] feat: support bool keyword Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 68 ++++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 8a7e010e31da..560f08615060 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -22,7 +22,7 @@ use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::expr::AggregateFunction; use datafusion::logical_expr::expr_rewriter::normalize_cols; use datafusion::logical_expr::{ - AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension, + AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator, }; use datafusion::optimizer::utils; @@ -30,6 +30,7 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType}; use datafusion::scalar::ScalarValue; use datafusion::sql::planner::ContextProvider; use datafusion::sql::TableReference; +use datatypes::arrow::datatypes::DataType as ArrowDataType; use promql_parser::label::{MatchOp, Matchers, METRIC_NAME}; use promql_parser::parser::{ token, AggModifier, AggregateExpr, BinaryExpr as PromBinaryExpr, Call, EvalStmt, @@ -136,7 +137,18 @@ impl PromPlanner { Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into())))) })? } - PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => { + PromExpr::Binary(PromBinaryExpr { + lhs, + rhs, + op, + modifier, + }) => { + let should_cast_to_bool = if let Some(modifier) = modifier { + modifier.return_bool && Self::is_token_a_comparison_op(*op) + } else { + false + }; + match ( Self::try_build_literal_expr(lhs), Self::try_build_literal_expr(rhs), @@ -150,22 +162,36 @@ impl PromPlanner { (Some(expr), None) => { let input = self.prom_expr_to_plan(*rhs.clone())?; self.projection_for_each_value_column(input, |col| { - Ok(DfExpr::BinaryExpr(BinaryExpr { + let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(expr.clone()), op: Self::prom_token_to_binary_op(*op)?, right: Box::new(DfExpr::Column(col.into())), - })) + }); + if should_cast_to_bool { + binary_expr = DfExpr::Cast(Cast { + expr: Box::new(binary_expr), + data_type: ArrowDataType::Float64, + }); + } + Ok(binary_expr) })? } // lhs is a column, rhs is a literal (None, Some(expr)) => { let input = self.prom_expr_to_plan(*lhs.clone())?; self.projection_for_each_value_column(input, |col| { - Ok(DfExpr::BinaryExpr(BinaryExpr { + let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(DfExpr::Column(col.into())), op: Self::prom_token_to_binary_op(*op)?, right: Box::new(expr.clone()), - })) + }); + if should_cast_to_bool { + binary_expr = DfExpr::Cast(Cast { + expr: Box::new(binary_expr), + data_type: ArrowDataType::Float64, + }); + } + Ok(binary_expr) })? } // both are columns. join them on time index @@ -193,11 +219,18 @@ impl PromPlanner { .context(DataFusionPlanningSnafu)? .qualified_column(); - Ok(DfExpr::BinaryExpr(BinaryExpr { + let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(DfExpr::Column(left_col)), op: Self::prom_token_to_binary_op(*op)?, right: Box::new(DfExpr::Column(right_col)), - })) + }); + if should_cast_to_bool { + binary_expr = DfExpr::Cast(Cast { + expr: Box::new(binary_expr), + data_type: ArrowDataType::Float64, + }); + } + Ok(binary_expr) })? } } @@ -750,6 +783,19 @@ impl PromPlanner { } } + /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators). + fn is_token_a_comparison_op(token: TokenType) -> bool { + match token.id() { + token::T_EQLC + | token::T_NEQ + | token::T_GTR + | token::T_LSS + | token::T_GTE + | token::T_LTE => true, + _ => false, + } + } + /// Build a inner join on time index column and tag columns to concat two logical plans. /// The left plan will be alised as [`LEFT_PLAN_JOIN_ALIAS`]. fn join_on_non_value_columns( @@ -1364,4 +1410,10 @@ mod test { let plan_result = PromPlanner::stmt_to_plan(eval_stmt, context_provider); assert!(plan_result.is_err()); } + + #[test] + fn bool_grammar() { + let prom_expr = parser::parse("demo_num_cpus + (1 != bool 2)").unwrap(); + println!("prom_expr: {:#?}", prom_expr); + } } From bf4bdc50a364fdcde331623a8faed4e1ffd9db4b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 21 Feb 2023 11:42:07 +0800 Subject: [PATCH 5/7] add some tests Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 76 +++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 560f08615060..7e2fe290b17d 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -1363,9 +1363,8 @@ mod test { assert_eq!(plan.display_indent_schema().to_string(), expected); } - #[tokio::test] - async fn binary_op_literal_column() { - let prom_expr = parser::parse(r#"1 + some_metric{tag_0="bar"}"#).unwrap(); + async fn indie_query_plan_compare(query: &str, expected: String) { + let prom_expr = parser::parse(query).unwrap(); let eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, @@ -1379,7 +1378,13 @@ mod test { let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await; let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); - let expected = String::from( + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn binary_op_literal_column() { + let query = r#"1 + some_metric{tag_0="bar"}"#; + let expected = String::from( "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ @@ -1389,31 +1394,56 @@ mod test { \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); - assert_eq!(plan.display_indent_schema().to_string(), expected); + indie_query_plan_compare(query, expected).await; } - // TODO(ruihang): pure literal arithmetic is not supported yet. #[tokio::test] + #[ignore = "pure literal arithmetic is not supported yet"] async fn binary_op_literal_literal() { - let prom_expr = parser::parse(r#"1 + 1"#).unwrap(); - let eval_stmt = EvalStmt { - expr: prom_expr, - start: UNIX_EPOCH, - end: UNIX_EPOCH - .checked_add(Duration::from_secs(100_000)) - .unwrap(), - interval: Duration::from_secs(5), - lookback_delta: Duration::from_secs(1), - }; + let query = r#"1 + 1"#; + let expected = String::from(""); - let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await; - let plan_result = PromPlanner::stmt_to_plan(eval_stmt, context_provider); - assert!(plan_result.is_err()); + indie_query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn simple_bool_grammar() { + let query = "some_metric != bool 1.2345"; + let expected = String::from( + "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); + + indie_query_plan_compare(query, expected).await; } - #[test] - fn bool_grammar() { - let prom_expr = parser::parse("demo_num_cpus + (1 != bool 2)").unwrap(); - println!("prom_expr: {:#?}", prom_expr); + #[tokio::test] + #[ignore = "pure literal arithmetic is not supported yet"] + async fn bool_with_additional_arithmetic() { + let query = "some_metric + (1 == bool 2)"; + let expected = String::from(""); + + indie_query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn simple_unary() { + let query = "-some_metric"; + let expected = String::from( + "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); + + indie_query_plan_compare(query, expected).await; } } From 9b693b2c9bf6a14deb4cb5d26de9328a6a654db7 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 21 Feb 2023 14:52:28 +0800 Subject: [PATCH 6/7] ignore nonexistence labels during planning Signed-off-by: Ruihang Xia --- src/promql/src/error.rs | 10 ---------- src/promql/src/planner.rs | 42 +++++++++++---------------------------- 2 files changed, 12 insertions(+), 40 deletions(-) diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index fb7fc36c7da4..b9483cb02e55 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -16,7 +16,6 @@ use std::any::Any; use common_error::prelude::*; use datafusion::error::DataFusionError; -use promql_parser::label::Label; use promql_parser::parser::{Expr as PromExpr, TokenType}; #[derive(Debug, Snafu)] @@ -49,13 +48,6 @@ pub enum Error { #[snafu(display("Cannot find value columns in table {}", table))] ValueNotFound { table: String, backtrace: Backtrace }, - #[snafu(display("Cannot find label {} in table {}", label, table,))] - LabelNotFound { - table: String, - label: Label, - backtrace: Backtrace, - }, - #[snafu(display("Cannot find the table {}", table))] TableNotFound { table: String, @@ -116,8 +108,6 @@ impl ErrorExt for Error { | EmptyRange { .. } => StatusCode::Internal, TableNotFound { .. } | TableNameNotFound { .. } => StatusCode::TableNotFound, - - LabelNotFound { .. } => StatusCode::TableColumnNotFound, } } fn backtrace_opt(&self) -> Option<&Backtrace> { diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 7e2fe290b17d..1f8b68506dd3 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -37,13 +37,13 @@ use promql_parser::parser::{ Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, TokenType, UnaryExpr, VectorSelector, }; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; use crate::error::{ - DataFusionPlanningSnafu, ExpectExprSnafu, LabelNotFoundSnafu, MultipleVectorSnafu, Result, - TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, - UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu, + DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu, + TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, + UnsupportedExprSnafu, ValueNotFoundSnafu, }; use crate::extension_plan::{ InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, @@ -409,20 +409,10 @@ impl PromPlanner { AggModifier::By(labels) => { let mut exprs = Vec::with_capacity(labels.len()); for label in labels { - let field = input_schema - .field_with_unqualified_name(label) - .map_err(|_| { - LabelNotFoundSnafu { - table: self - .ctx - .table_name - .clone() - .unwrap_or("no_table_name".to_string()), - label: label.clone(), - } - .build() - })?; - exprs.push(DfExpr::Column(Column::from(field.name()))); + // nonexistence label will be ignored + if let Ok(field) = input_schema.field_with_unqualified_name(label) { + exprs.push(DfExpr::Column(Column::from(field.name()))); + } } // change the tag columns in context @@ -439,21 +429,13 @@ impl PromPlanner { .iter() .map(|f| f.name()) .collect::>(); + // remove "without"-ed fields + // nonexistence label will be ignored for label in labels { - ensure!( - // ensure this field was existed - all_fields.remove(label), - LabelNotFoundSnafu { - table: self - .ctx - .table_name - .clone() - .unwrap_or("no_table_name".to_string()), - label: label.clone(), - } - ); + all_fields.remove(label); } + // remove time index and value fields if let Some(time_index) = &self.ctx.time_index_column { all_fields.remove(time_index); From 8d5d12fe22e506dee11df5c9ee7aeac9970d320b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 21 Feb 2023 14:55:02 +0800 Subject: [PATCH 7/7] fix clippy Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 1f8b68506dd3..2bed42609469 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -767,15 +767,15 @@ impl PromPlanner { /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators). fn is_token_a_comparison_op(token: TokenType) -> bool { - match token.id() { + matches!( + token.id(), token::T_EQLC - | token::T_NEQ - | token::T_GTR - | token::T_LSS - | token::T_GTE - | token::T_LTE => true, - _ => false, - } + | token::T_NEQ + | token::T_GTR + | token::T_LSS + | token::T_GTE + | token::T_LTE + ) } /// Build a inner join on time index column and tag columns to concat two logical plans.