Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support unary, paren, bool keyword and nonexistent metric/label in PromQL #1049

Merged
merged 7 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions src/promql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::any::Any;

use common_error::prelude::*;
use datafusion::error::DataFusionError;
use promql_parser::label::Label;
use promql_parser::parser::{Expr as PromExpr, TokenType};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -49,13 +48,6 @@ pub enum Error {
#[snafu(display("Cannot find value columns in table {}", table))]
ValueNotFound { table: String, backtrace: Backtrace },

#[snafu(display("Cannot find label {} in table {}", label, table,))]
LabelNotFound {
table: String,
label: Label,
backtrace: Backtrace,
},

#[snafu(display("Cannot find the table {}", table))]
TableNotFound {
table: String,
Expand Down Expand Up @@ -107,15 +99,15 @@ impl ErrorExt for Error {
| UnsupportedExpr { .. }
| UnexpectedToken { .. }
| MultipleVector { .. }
| LabelNotFound { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,

UnknownTable { .. }
| TableNotFound { .. }
| DataFusionPlanning { .. }
| UnexpectedPlanExpr { .. }
| IllegalRange { .. }
| EmptyRange { .. }
| TableNameNotFound { .. } => StatusCode::Internal,
| EmptyRange { .. } => StatusCode::Internal,

TableNotFound { .. } | TableNameNotFound { .. } => StatusCode::TableNotFound,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
Expand Down
194 changes: 129 additions & 65 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::expr::AggregateFunction;
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension,
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension,
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use promql_parser::label::{MatchOp, Matchers, METRIC_NAME};
use promql_parser::parser::{
token, AggModifier, AggregateExpr, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
Expr as PromExpr, Function, MatrixSelector, NumberLiteral, Offset, ParenExpr, StringLiteral,
SubqueryExpr, TokenType, UnaryExpr, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
DataFusionPlanningSnafu, ExpectExprSnafu, LabelNotFoundSnafu, MultipleVectorSnafu, Result,
TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu,
UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu,
DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, ValueNotFoundSnafu,
};
use crate::extension_plan::{
InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
Expand Down Expand Up @@ -129,11 +130,25 @@ impl<S: ContextProvider> PromPlanner<S> {
.build()
.context(DataFusionPlanningSnafu)?
}
PromExpr::Unary(UnaryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Unary Expr",
PromExpr::Unary(UnaryExpr { expr }) => {
// A unary expression in PromQL implies the `-` (negation) operator
let input = self.prom_expr_to_plan(*expr.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
})?
}
.fail()?,
PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
PromExpr::Binary(PromBinaryExpr {
lhs,
rhs,
op,
modifier,
}) => {
let should_cast_to_bool = if let Some(modifier) = modifier {
modifier.return_bool && Self::is_token_a_comparison_op(*op)
} else {
false
};

match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
Expand All @@ -147,22 +162,36 @@ impl<S: ContextProvider> PromPlanner<S> {
(Some(expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr.clone()),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(col.into())),
}))
});
if should_cast_to_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
}
// lhs is a column, rhs is a literal
(None, Some(expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(col.into())),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(expr.clone()),
}))
});
if should_cast_to_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
}
// both are columns. join them on time index
Expand Down Expand Up @@ -190,19 +219,23 @@ impl<S: ContextProvider> PromPlanner<S> {
.context(DataFusionPlanningSnafu)?
.qualified_column();

Ok(DfExpr::BinaryExpr(BinaryExpr {
let mut binary_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(left_col)),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(right_col)),
}))
});
if should_cast_to_bool {
binary_expr = DfExpr::Cast(Cast {
expr: Box::new(binary_expr),
data_type: ArrowDataType::Float64,
});
}
Ok(binary_expr)
})?
}
}
}
PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Paren Expr",
}
.fail()?,
PromExpr::Paren(ParenExpr { expr }) => self.prom_expr_to_plan(*expr.clone())?,
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
}
Expand Down Expand Up @@ -376,20 +409,10 @@ impl<S: ContextProvider> PromPlanner<S> {
AggModifier::By(labels) => {
let mut exprs = Vec::with_capacity(labels.len());
for label in labels {
let field = input_schema
.field_with_unqualified_name(label)
.map_err(|_| {
LabelNotFoundSnafu {
table: self
.ctx
.table_name
.clone()
.unwrap_or("no_table_name".to_string()),
label: label.clone(),
}
.build()
})?;
exprs.push(DfExpr::Column(Column::from(field.name())));
// nonexistent labels are ignored
if let Ok(field) = input_schema.field_with_unqualified_name(label) {
exprs.push(DfExpr::Column(Column::from(field.name())));
}
}

// change the tag columns in context
Expand All @@ -406,21 +429,13 @@ impl<S: ContextProvider> PromPlanner<S> {
.iter()
.map(|f| f.name())
.collect::<BTreeSet<_>>();

// remove "without"-ed fields
// nonexistent labels are ignored
for label in labels {
ensure!(
// ensure this field was existed
all_fields.remove(label),
LabelNotFoundSnafu {
table: self
.ctx
.table_name
.clone()
.unwrap_or("no_table_name".to_string()),
label: label.clone(),
}
);
all_fields.remove(label);
}

// remove time index and value fields
if let Some(time_index) = &self.ctx.time_index_column {
all_fields.remove(time_index);
Expand Down Expand Up @@ -495,7 +510,7 @@ impl<S: ContextProvider> PromPlanner<S> {
let table = self
.schema_provider
.get_table_provider(TableReference::Bare { table: &table_name })
.context(DataFusionPlanningSnafu)?
.context(TableNotFoundSnafu { table: &table_name })?
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
Expand Down Expand Up @@ -750,6 +765,19 @@ impl<S: ContextProvider> PromPlanner<S> {
}
}

/// Tells whether `token` is a PromQL
/// [comparison binary operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
///
/// Returns `true` only when the token's id is one of the six ids listed in
/// the body, and `false` for every other token.
fn is_token_a_comparison_op(token: TokenType) -> bool {
    // `token::` below names the token-id module, not the `token` parameter;
    // module paths and local values live in different namespaces.
    let comparison_ids = [
        token::T_EQLC,
        token::T_NEQ,
        token::T_GTR,
        token::T_LSS,
        token::T_GTE,
        token::T_LTE,
    ];
    comparison_ids.contains(&token.id())
}

/// Build an inner join on time index column and tag columns to concat two logical plans.
/// The left plan will be aliased as [`LEFT_PLAN_JOIN_ALIAS`].
fn join_on_non_value_columns(
Expand Down Expand Up @@ -1317,9 +1345,8 @@ mod test {
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn binary_op_literal_column() {
let prom_expr = parser::parse(r#"1 + some_metric{tag_0="bar"}"#).unwrap();
async fn indie_query_plan_compare(query: &str, expected: String) {
let prom_expr = parser::parse(query).unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
Expand All @@ -1333,7 +1360,13 @@ mod test {
let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();

let expected = String::from(
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn binary_op_literal_column() {
let query = r#"1 + some_metric{tag_0="bar"}"#;
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
Expand All @@ -1343,25 +1376,56 @@ mod test {
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

assert_eq!(plan.display_indent_schema().to_string(), expected);
indie_query_plan_compare(query, expected).await;
}

// TODO(ruihang): pure literal arithmetic is not supported yet.
#[tokio::test]
#[ignore = "pure literal arithmetic is not supported yet"]
async fn binary_op_literal_literal() {
let prom_expr = parser::parse(r#"1 + 1"#).unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let query = r#"1 + 1"#;
let expected = String::from("");

let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
let plan_result = PromPlanner::stmt_to_plan(eval_stmt, context_provider);
assert!(plan_result.is_err());
indie_query_plan_compare(query, expected).await;
}

/// Plans a comparison between a metric and a literal that uses the `bool`
/// modifier, and checks the rendered logical plan.
///
/// The expected plan wraps the `!=` comparison on the value column in
/// `CAST(... AS Float64)` inside the top-level projection.
#[tokio::test]
async fn simple_bool_grammar() {
let query = "some_metric != bool 1.2345";
// Expected output of `display_indent_schema()`; must match byte-for-byte.
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

indie_query_plan_compare(query, expected).await;
}

/// Plans a `bool` comparison between two literals nested inside an
/// arithmetic expression on a metric.
///
/// Currently ignored (see the `#[ignore]` reason), so the expected plan is
/// left as an empty placeholder string.
#[tokio::test]
#[ignore = "pure literal arithmetic is not supported yet"]
async fn bool_with_additional_arithmetic() {
    // Placeholder expectation until this query can be planned.
    let expected_plan = String::new();
    indie_query_plan_compare("some_metric + (1 == bool 2)", expected_plan).await;
}

/// Plans a unary-negated metric selector and checks the rendered logical plan.
///
/// The expected plan projects the value column as `(- some_metric.field_0)`
/// on top of the usual instant-selector plan.
#[tokio::test]
async fn simple_unary() {
let query = "-some_metric";
// Expected output of `display_indent_schema()`; must match byte-for-byte.
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

indie_query_plan_compare(query, expected).await;
}
}
18 changes: 17 additions & 1 deletion src/servers/src/promql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use axum::body::BoxBody;
use axum::extract::{Query, State};
use axum::{routing, Form, Json, Router};
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
Expand Down Expand Up @@ -216,7 +217,22 @@ impl PromqlJsonResponse {
json
};

response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string()))
match response {
Ok(resp) => resp,
Err(err) => {
// Prometheus doesn't report an error when querying a nonexistent label or metric
if err.status_code() == StatusCode::TableNotFound
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PromqlData {
result_type: "matrix".to_string(),
..Default::default()
})
} else {
Self::error(err.status_code().to_string(), err.to_string())
}
}
}
}

fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromqlData> {
Expand Down