chore: decide tag column in log api follow table schema if table exists #5138

Merged 4 commits on Dec 12, 2024

15 changes: 14 additions & 1 deletion src/frontend/src/instance/log_handler.rs
@@ -25,8 +25,9 @@ use servers::error::{
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use table::Table;

use crate::instance::Instance;

@@ -84,6 +85,18 @@ impl PipelineHandler for Instance {
.await
.context(PipelineSnafu)
}

async fn get_table(
&self,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
self.catalog_manager
.table(catalog, &schema, table, None)
.await
}
}

impl Instance {
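
For context, a hedged sketch (not part of the diff) of how a caller might use the new method: resolve the table through the handler, then read its row key column names, which are exactly what the pipeline change below promotes to Tag. The helper name `tag_candidates` and the call site are illustrative.

// Sketch only: list the row key column names of the target table, if it exists.
async fn tag_candidates(
    instance: &Instance,
    table_name: &str,
    ctx: &QueryContext,
) -> std::result::Result<Vec<String>, catalog::error::Error> {
    let table = instance.get_table(table_name, ctx).await?;
    Ok(table
        .map(|t| t.table_info().meta.row_key_column_names().cloned().collect())
        .unwrap_or_default())
}
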
117 changes: 100 additions & 17 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -15,6 +15,7 @@
pub mod coerce;

use std::collections::HashSet;
use std::sync::Arc;

use ahash::HashMap;
use api::helper::proto_value_type;
@@ -367,20 +368,15 @@ fn json_value_to_row(
Ok(Row { values: row })
}

/// Identity pipeline for Greptime
/// This pipeline will convert the input JSON array to Greptime Rows
/// 1. The pipeline will add a default timestamp column to the schema
/// 2. The pipeline does not resolve NULL values
/// 3. The pipeline assumes that the JSON format is fixed
/// 4. The pipeline will return an error if the same column has mismatched datatypes
/// 5. The pipeline will analyze the schema of each JSON record and merge them to get the final schema.
pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
fn identity_pipeline_inner<'a>(
array: Vec<serde_json::Value>,
tag_column_names: Option<impl Iterator<Item = &'a String>>,
) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());

let mut schema = SchemaInfo::default();
let mut schema_info = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema, map)?;
let row = json_value_to_row(&mut schema_info, map)?;
rows.push(row);
}
}
@@ -395,23 +391,57 @@ pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
let ts = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(ns)),
};
let column_count = schema.schema.len();
let column_count = schema_info.schema.len();
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
row.values.push(ts.clone());
}
schema.schema.push(greptime_timestamp_schema);
schema_info.schema.push(greptime_timestamp_schema);

// set the semantic type of the row key column to Tag
if let Some(tag_column_names) = tag_column_names {
tag_column_names.for_each(|tag_column_name| {
if let Some(index) = schema_info.index.get(tag_column_name) {
schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
}
});
}
Ok(Rows {
schema: schema.schema,
schema: schema_info.schema,
rows,
})
}

/// Identity pipeline for Greptime
/// This pipeline will convert the input JSON array to Greptime Rows
/// The `table` parameter is used to set the semantic type of the row key columns to Tag.
/// 1. The pipeline will add a default timestamp column to the schema
/// 2. The pipeline does not resolve NULL values
/// 3. The pipeline assumes that the JSON format is fixed
/// 4. The pipeline will return an error if the same column has mismatched datatypes
/// 5. The pipeline will analyze the schema of each JSON record and merge them to get the final schema.
pub fn identity_pipeline(
array: Vec<serde_json::Value>,
table: Option<Arc<table::Table>>,
) -> Result<Rows> {
match table {
Some(table) => {
let table_info = table.table_info();
let tag_column_names = table_info.meta.row_key_column_names();
identity_pipeline_inner(array, Some(tag_column_names))
}
None => identity_pipeline_inner(array, None::<std::iter::Empty<&String>>),
}
}
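
To make the contract concrete, a hedged usage sketch: the payload is made up, and `table` stands for a hypothetical Arc<table::Table> with row key ["host"], as would be obtained via PipelineHandler::get_table. `SemanticType` comes from `api::v1`, as in the tests below.

// Without a table handle, the schema is inferred from the JSON alone and
// no column is promoted to Tag.
let payload = vec![
    serde_json::json!({"host": "h1", "latency_ms": 12.5}),
    serde_json::json!({"host": "h2", "latency_ms": 7.0}),
];
let rows = identity_pipeline(payload.clone(), None)?;
assert!(rows
    .schema
    .iter()
    .all(|c| c.semantic_type != SemanticType::Tag as i32));

// With the table handle, the row key column "host" becomes a Tag.
let rows = identity_pipeline(payload, Some(table))?;
let host = rows.schema.iter().find(|c| c.column_name == "host").unwrap();
assert_eq!(host.semantic_type, SemanticType::Tag as i32);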

#[cfg(test)]
mod tests {
use api::v1::SemanticType;

use crate::etl::transform::transformer::greptime::identity_pipeline_inner;
use crate::identity_pipeline;

#[test]
@@ -437,7 +467,7 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
@@ -465,7 +495,7 @@
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
@@ -493,13 +523,66 @@
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
}
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88.5,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let tag_column_names = ["name".to_string(), "address".to_string()];
let rows = identity_pipeline_inner(array, Some(tag_column_names.iter()));
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
assert_eq!(
rows.schema
.iter()
.find(|x| x.column_name == "name")
.unwrap()
.semantic_type,
SemanticType::Tag as i32
);
assert_eq!(
rows.schema
.iter()
.find(|x| x.column_name == "address")
.unwrap()
.semantic_type,
SemanticType::Tag as i32
);
assert_eq!(
rows.schema
.iter()
.filter(|x| x.semantic_type == SemanticType::Tag as i32)
.count(),
2
);
}
}
}
13 changes: 9 additions & 4 deletions src/servers/src/http/event.rs
@@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
@@ -612,10 +612,15 @@ async fn ingest_logs_inner(
let mut results = Vec::with_capacity(pipeline_data.len());
let transformed_data: Rows;
if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
let rows = pipeline::identity_pipeline(pipeline_data)
let table = state
.get_table(&table_name, &query_ctx)
.await
.context(CatalogSnafu)?;
let rows = pipeline::identity_pipeline(pipeline_data, table)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
transformed_data = rows;

transformed_data = rows
} else {
let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
8 changes: 7 additions & 1 deletion src/servers/src/query_handler.rs
@@ -39,7 +39,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
use serde_json::Value;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};

use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
@@ -164,4 +164,10 @@ pub trait PipelineHandler {
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>>;

async fn get_table(
&self,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
}
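
Tying the pieces together, a hedged sketch of the call path that `ingest_logs_inner` now follows for the identity pipeline; the wrapper function and its name are illustrative, while the error plumbing mirrors the event.rs hunk above.

// Sketch only: fetch the optional table, then let the identity pipeline
// derive Tag columns from the table's row key schema when the table exists.
async fn identity_transform(
    handler: &impl PipelineHandler,
    table_name: &str,
    query_ctx: &QueryContext,
    payload: Vec<serde_json::Value>,
) -> Result<api::v1::Rows> {
    let table = handler
        .get_table(table_name, query_ctx)
        .await
        .context(CatalogSnafu)?;
    pipeline::identity_pipeline(payload, table)
        .context(PipelineTransformSnafu)
        .context(PipelineSnafu)
}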