From 0704eb6d232297cecc46c597cdb3440cb70aa9be Mon Sep 17 00:00:00 2001 From: paomian Date: Wed, 11 Dec 2024 11:52:28 +0800 Subject: [PATCH 1/4] chore: decide tag column in log api follow table schema if table exists --- src/frontend/src/instance/log_handler.rs | 12 +++++++ .../src/etl/transform/transformer/greptime.rs | 34 ++++++++++++++----- src/servers/src/http/event.rs | 15 +++++--- src/servers/src/query_handler.rs | 7 ++++ 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index c3422066a387..6346efdab035 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -27,6 +27,7 @@ use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::PipelineHandler; use session::context::QueryContextRef; use snafu::ResultExt; +use table::Table; use crate::instance::Instance; @@ -84,6 +85,17 @@ impl PipelineHandler for Instance { .await .context(PipelineSnafu) } + + async fn get_table( + &self, + catalog: &str, + schema: &str, + table: &str, + ) -> std::result::Result>, catalog::error::Error> { + self.catalog_manager + .table(catalog, schema, table, None) + .await + } } impl Instance { diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 3b43696b5ab7..99c750dc3ef8 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -15,6 +15,7 @@ pub mod coerce; use std::collections::HashSet; +use std::sync::Arc; use ahash::HashMap; use api::helper::proto_value_type; @@ -374,13 +375,16 @@ fn json_value_to_row( /// 3. The pipeline assumes that the json format is fixed /// 4. The pipeline will return an error if the same column datatype is mismatched /// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. -pub fn identity_pipeline(array: Vec) -> Result { +pub fn identity_pipeline( + array: Vec, + table: Option>, +) -> Result { let mut rows = Vec::with_capacity(array.len()); - let mut schema = SchemaInfo::default(); + let mut schema_info = SchemaInfo::default(); for value in array { if let serde_json::Value::Object(map) = value { - let row = json_value_to_row(&mut schema, map)?; + let row = json_value_to_row(&mut schema_info, map)?; rows.push(row); } } @@ -395,7 +399,7 @@ pub fn identity_pipeline(array: Vec) -> Result { let ts = GreptimeValue { value_data: Some(ValueData::TimestampNanosecondValue(ns)), }; - let column_count = schema.schema.len(); + let column_count = schema_info.schema.len(); for row in rows.iter_mut() { let diff = column_count - row.values.len(); for _ in 0..diff { @@ -403,9 +407,21 @@ pub fn identity_pipeline(array: Vec) -> Result { } row.values.push(ts.clone()); } - schema.schema.push(greptime_timestamp_schema); + schema_info.schema.push(greptime_timestamp_schema); + + // set the semantic type of the row key column to Tag + if let Some(table) = table { + let table_schema = &table.table_info().meta; + table_schema + .row_key_column_names() + .for_each(|tag_column_name| { + if let Some(index) = schema_info.index.get(tag_column_name) { + schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; + } + }); + } Ok(Rows { - schema: schema.schema, + schema: schema_info.schema, rows, }) } @@ -437,7 +453,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline(array); + let rows = identity_pipeline(array, None); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -465,7 +481,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline(array); + let rows = identity_pipeline(array, None); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -493,7 +509,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline(array); + let rows = identity_pipeline(array, None); assert!(rows.is_ok()); let rows = rows.unwrap(); assert_eq!(rows.schema.len(), 8); diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 69498c209ab4..7e8a0e6e2981 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu, - PipelineSnafu, Result, UnsupportedContentTypeSnafu, + CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, + ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; use crate::http::extractor::LogTableName; use crate::http::header::CONTENT_TYPE_PROTOBUF_STR; @@ -612,10 +612,17 @@ async fn ingest_logs_inner( let mut results = Vec::with_capacity(pipeline_data.len()); let transformed_data: Rows; if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - let rows = pipeline::identity_pipeline(pipeline_data) + let catalog = query_ctx.current_catalog(); + let schema = query_ctx.current_schema(); + let table = state + .get_table(catalog, &schema, &table_name) + .await + .context(CatalogSnafu)?; + let rows = pipeline::identity_pipeline(pipeline_data, table) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; - transformed_data = rows; + + transformed_data = rows } else { let pipeline = state .get_pipeline(&pipeline_name, version, query_ctx.clone()) diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 58812e9350bc..8e841fa8af27 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -164,4 +164,11 @@ pub trait PipelineHandler { version: PipelineVersion, query_ctx: QueryContextRef, ) -> Result>; + + async fn get_table( + &self, + catalog: &str, + schema: &str, + table: &str, + ) -> std::result::Result>, catalog::error::Error>; } From 074079f6a7e63f1ea32b22160551205d4a0c5d93 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 12 Dec 2024 15:14:48 +0800 Subject: [PATCH 2/4] chore: add more test for greptime_identity pipeline --- .../src/etl/transform/transformer/greptime.rs | 105 ++++++++++++++---- 1 file changed, 86 insertions(+), 19 deletions(-) diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 99c750dc3ef8..309f19cac321 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -368,19 +368,11 @@ fn json_value_to_row( Ok(Row { values: row }) } -/// Identity pipeline for Greptime -/// This pipeline will convert the input JSON array to Greptime Rows -/// 1. The pipeline will add a default timestamp column to the schema -/// 2. The pipeline not resolve NULL value -/// 3. The pipeline assumes that the json format is fixed -/// 4. The pipeline will return an error if the same column datatype is mismatched -/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. -pub fn identity_pipeline( +fn identity_pipeline_inner( array: Vec, - table: Option>, + tag_column_names: Option>, ) -> Result { let mut rows = Vec::with_capacity(array.len()); - let mut schema_info = SchemaInfo::default(); for value in array { if let serde_json::Value::Object(map) = value { @@ -410,15 +402,12 @@ pub fn identity_pipeline( schema_info.schema.push(greptime_timestamp_schema); // set the semantic type of the row key column to Tag - if let Some(table) = table { - let table_schema = &table.table_info().meta; - table_schema - .row_key_column_names() - .for_each(|tag_column_name| { - if let Some(index) = schema_info.index.get(tag_column_name) { - schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; - } - }); + if let Some(tag_column_names) = tag_column_names { + tag_column_names.iter().for_each(|tag_column_name| { + if let Some(index) = schema_info.index.get(*tag_column_name) { + schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; + } + }); } Ok(Rows { schema: schema_info.schema, @@ -426,8 +415,33 @@ pub fn identity_pipeline( }) } +/// Identity pipeline for Greptime +/// This pipeline will convert the input JSON array to Greptime Rows +/// params table is used to set the semantic type of the row key column to Tag +/// 1. The pipeline will add a default timestamp column to the schema +/// 2. The pipeline not resolve NULL value +/// 3. The pipeline assumes that the json format is fixed +/// 4. The pipeline will return an error if the same column datatype is mismatched +/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. +pub fn identity_pipeline( + array: Vec, + table: Option>, +) -> Result { + match table { + Some(table) => { + let table_info = table.table_info(); + let tag_column_names = table_info.meta.row_key_column_names().collect(); + identity_pipeline_inner(array, Some(tag_column_names)) + } + None => identity_pipeline_inner(array, None), + } +} + #[cfg(test)] mod tests { + use api::v1::SemanticType; + + use crate::etl::transform::transformer::greptime::identity_pipeline_inner; use crate::identity_pipeline; #[test] @@ -517,5 +531,58 @@ mod tests { assert_eq!(8, rows.rows[0].values.len()); assert_eq!(8, rows.rows[1].values.len()); } + { + let array = vec![ + serde_json::json!({ + "woshinull": null, + "name": "Alice", + "age": 20, + "is_student": true, + "score": 99.5, + "hobbies": "reading", + "address": "Beijing", + }), + serde_json::json!({ + "name": "Bob", + "age": 21, + "is_student": false, + "score": 88.5, + "hobbies": "swimming", + "address": "Shanghai", + "gaga": "gaga" + }), + ]; + let tag_column_names = ["name".to_string(), "address".to_string()]; + let rows = identity_pipeline_inner(array, Some(tag_column_names.iter().collect())); + assert!(rows.is_ok()); + let rows = rows.unwrap(); + assert_eq!(rows.schema.len(), 8); + assert_eq!(rows.rows.len(), 2); + assert_eq!(8, rows.rows[0].values.len()); + assert_eq!(8, rows.rows[1].values.len()); + assert_eq!( + rows.schema + .iter() + .find(|x| x.column_name == "name") + .unwrap() + .semantic_type, + SemanticType::Tag as i32 + ); + assert_eq!( + rows.schema + .iter() + .find(|x| x.column_name == "address") + .unwrap() + .semantic_type, + SemanticType::Tag as i32 + ); + assert_eq!( + rows.schema + .iter() + .filter(|x| x.semantic_type == SemanticType::Tag as i32) + .count(), + 2 + ); + } } } From 58a9a15950e0b8b3023a94ceef74463c6312934e Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 12 Dec 2024 15:36:57 +0800 Subject: [PATCH 3/4] chore: change pipeline get_table function signature --- src/frontend/src/instance/log_handler.rs | 9 +++++---- src/servers/src/http/event.rs | 4 +--- src/servers/src/query_handler.rs | 5 ++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 6346efdab035..9ae782c7d4ab 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -25,7 +25,7 @@ use servers::error::{ }; use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::PipelineHandler; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use snafu::ResultExt; use table::Table; @@ -88,12 +88,13 @@ impl PipelineHandler for Instance { async fn get_table( &self, - catalog: &str, - schema: &str, table: &str, + query_ctx: &QueryContext, ) -> std::result::Result>, catalog::error::Error> { + let catalog = query_ctx.current_catalog(); + let schema = query_ctx.current_schema(); self.catalog_manager - .table(catalog, schema, table, None) + .table(catalog, &schema, table, None) .await } } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 7e8a0e6e2981..5069db51975d 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -612,10 +612,8 @@ async fn ingest_logs_inner( let mut results = Vec::with_capacity(pipeline_data.len()); let transformed_data: Rows; if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - let catalog = query_ctx.current_catalog(); - let schema = query_ctx.current_schema(); let table = state - .get_table(catalog, &schema, &table_name) + .get_table(&table_name, &query_ctx) .await .context(CatalogSnafu)?; let rows = pipeline::identity_pipeline(pipeline_data, table) diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 8e841fa8af27..96a01593a8f1 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -39,7 +39,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay}; use serde_json::Value; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use crate::error::Result; use crate::influxdb::InfluxdbRequest; @@ -167,8 +167,7 @@ pub trait PipelineHandler { async fn get_table( &self, - catalog: &str, - schema: &str, table: &str, + query_ctx: &QueryContext, ) -> std::result::Result>, catalog::error::Error>; } From 904433b8174f4e6c008fb91de3f9b5010ad6e73f Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 12 Dec 2024 16:45:30 +0800 Subject: [PATCH 4/4] chore: change identity_pipeline_inner tag_column_names type --- .../src/etl/transform/transformer/greptime.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 309f19cac321..5d69a03ea23e 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -368,9 +368,9 @@ fn json_value_to_row( Ok(Row { values: row }) } -fn identity_pipeline_inner( +fn identity_pipeline_inner<'a>( array: Vec, - tag_column_names: Option>, + tag_column_names: Option>, ) -> Result { let mut rows = Vec::with_capacity(array.len()); let mut schema_info = SchemaInfo::default(); @@ -403,8 +403,8 @@ fn identity_pipeline_inner( // set the semantic type of the row key column to Tag if let Some(tag_column_names) = tag_column_names { - tag_column_names.iter().for_each(|tag_column_name| { - if let Some(index) = schema_info.index.get(*tag_column_name) { + tag_column_names.for_each(|tag_column_name| { + if let Some(index) = schema_info.index.get(tag_column_name) { schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; } }); @@ -430,10 +430,10 @@ pub fn identity_pipeline( match table { Some(table) => { let table_info = table.table_info(); - let tag_column_names = table_info.meta.row_key_column_names().collect(); + let tag_column_names = table_info.meta.row_key_column_names(); identity_pipeline_inner(array, Some(tag_column_names)) } - None => identity_pipeline_inner(array, None), + None => identity_pipeline_inner(array, None::>), } } @@ -553,7 +553,7 @@ mod tests { }), ]; let tag_column_names = ["name".to_string(), "address".to_string()]; - let rows = identity_pipeline_inner(array, Some(tag_column_names.iter().collect())); + let rows = identity_pipeline_inner(array, Some(tag_column_names.iter())); assert!(rows.is_ok()); let rows = rows.unwrap(); assert_eq!(rows.schema.len(), 8);