chore: decide tag column in log api follow table schema if table exists #5138

Merged · 4 commits · Dec 12, 2024
Changes from 1 commit
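Summary: this change makes the built-in identity pipeline of the log ingestion HTTP API schema-aware. Before transforming rows, the handler now looks up the target table through the catalog; any ingested column whose name matches one of the existing table's row-key columns gets the Tag semantic type instead of the default Field. If the table does not exist yet, the lookup yields None and the previous behaviour applies unchanged.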
12 changes: 12 additions & 0 deletions src/frontend/src/instance/log_handler.rs
@@ -27,6 +27,7 @@ use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
 use servers::query_handler::PipelineHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
+use table::Table;
 
 use crate::instance::Instance;
@@ -84,6 +85,17 @@ impl PipelineHandler for Instance {
             .await
             .context(PipelineSnafu)
     }
+
+    async fn get_table(
+        &self,
+        catalog: &str,
+        schema: &str,
+        table: &str,
+    ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
+        self.catalog_manager
+            .table(catalog, schema, table, None)
+            .await
+    }
 }
 
 impl Instance {
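The frontend side is a thin delegation: PipelineHandler::get_table simply forwards to the catalog manager. Since the return type is Result<Option<Arc<Table>>>, a table that does not exist comes back as Ok(None) rather than an error, which is what lets first-time ingestion into a not-yet-created table proceed.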
34 changes: 25 additions & 9 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -15,6 +15,7 @@
 pub mod coerce;
 
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use ahash::HashMap;
 use api::helper::proto_value_type;
@@ -374,13 +375,16 @@ fn json_value_to_row(
 /// 3. The pipeline assumes that the json format is fixed
 /// 4. The pipeline will return an error if the same column datatype is mismatched
 /// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
-pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
+pub fn identity_pipeline(
+    array: Vec<serde_json::Value>,
+    table: Option<Arc<table::Table>>,
+) -> Result<Rows> {
     let mut rows = Vec::with_capacity(array.len());
 
-    let mut schema = SchemaInfo::default();
+    let mut schema_info = SchemaInfo::default();
     for value in array {
         if let serde_json::Value::Object(map) = value {
-            let row = json_value_to_row(&mut schema, map)?;
+            let row = json_value_to_row(&mut schema_info, map)?;
             rows.push(row);
         }
     }
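Rules 3–5 in the doc comment are easiest to see on a concrete input. Below is a minimal, self-contained sketch of the merge behaviour using plain serde_json and a BTreeMap as a stand-in for SchemaInfo; the types and type names are invented for illustration, not the pipeline's real ones.

    use std::collections::BTreeMap;

    use serde_json::{json, Value};

    fn main() {
        // Two records with different key sets: the merged schema is their union.
        let records = vec![
            json!({"host": "h1", "latency_ms": 5}),
            json!({"host": "h2", "status": 500}),
        ];

        let mut merged: BTreeMap<String, &str> = BTreeMap::new();
        for record in &records {
            if let Value::Object(map) = record {
                for (key, value) in map {
                    let kind = match value {
                        Value::Number(_) => "number",
                        Value::String(_) => "string",
                        _ => "other",
                    };
                    // Rule 4: a column may not change type across records.
                    let seen = merged.entry(key.clone()).or_insert(kind);
                    assert_eq!(*seen, kind, "column {key} changed type");
                }
            }
        }

        // Prints host -> string, latency_ms -> number, status -> number; the
        // real pipeline then appends a greptime_timestamp column and null-pads
        // rows that are missing any merged column.
        for (name, kind) in &merged {
            println!("{name} -> {kind}");
        }
    }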
@@ -395,17 +399,29 @@
     let ts = GreptimeValue {
         value_data: Some(ValueData::TimestampNanosecondValue(ns)),
     };
-    let column_count = schema.schema.len();
+    let column_count = schema_info.schema.len();
     for row in rows.iter_mut() {
         let diff = column_count - row.values.len();
         for _ in 0..diff {
             row.values.push(GreptimeValue { value_data: None });
         }
         row.values.push(ts.clone());
     }
-    schema.schema.push(greptime_timestamp_schema);
+    schema_info.schema.push(greptime_timestamp_schema);
+
+    // set the semantic type of the row key column to Tag
+    if let Some(table) = table {
+        let table_schema = &table.table_info().meta;
+        table_schema
+            .row_key_column_names()
+            .for_each(|tag_column_name| {
+                if let Some(index) = schema_info.index.get(tag_column_name) {
+                    schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
+                }
+            });
+    }
     Ok(Rows {
-        schema: schema.schema,
+        schema: schema_info.schema,
         rows,
     })
 }
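The effect of the new block is that any merged column whose name matches one of the table's row-key columns is promoted from the default Field semantic type to Tag, while row-key names with no matching ingested column are skipped. A simplified, runnable sketch with hand-rolled stand-ins for the schema types (illustrative only, not GreptimeDB's real API):

    use std::collections::HashMap;

    #[derive(Debug, PartialEq)]
    enum SemanticType {
        Tag,
        Field,
        Timestamp,
    }

    #[derive(Debug)]
    struct ColumnSchema {
        name: String,
        semantic_type: SemanticType,
    }

    // Mirror of the new identity_pipeline block: promote columns named in the
    // table's row key from Field to Tag, leaving everything else untouched.
    fn promote_row_keys(
        schema: &mut [ColumnSchema],
        index: &HashMap<String, usize>,
        row_keys: &[&str],
    ) {
        for key in row_keys {
            if let Some(&i) = index.get(*key) {
                schema[i].semantic_type = SemanticType::Tag;
            }
        }
    }

    fn main() {
        let mut schema = vec![
            ColumnSchema { name: "host".into(), semantic_type: SemanticType::Field },
            ColumnSchema { name: "status".into(), semantic_type: SemanticType::Field },
            ColumnSchema { name: "greptime_timestamp".into(), semantic_type: SemanticType::Timestamp },
        ];
        let index: HashMap<String, usize> = schema
            .iter()
            .enumerate()
            .map(|(i, c)| (c.name.clone(), i))
            .collect();

        // Pretend the existing table declared `host` as a row-key (tag) column.
        promote_row_keys(&mut schema, &index, &["host"]);

        assert_eq!(schema[0].semantic_type, SemanticType::Tag);
        assert_eq!(schema[1].semantic_type, SemanticType::Field);
        println!("{schema:?}");
    }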
@@ -437,7 +453,7 @@ mod tests {
                 "gaga": "gaga"
             }),
         ];
-        let rows = identity_pipeline(array);
+        let rows = identity_pipeline(array, None);
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
@@ -465,7 +481,7 @@
                 "gaga": "gaga"
             }),
         ];
-        let rows = identity_pipeline(array);
+        let rows = identity_pipeline(array, None);
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
@@ -493,7 +509,7 @@
                 "gaga": "gaga"
             }),
         ];
-        let rows = identity_pipeline(array);
+        let rows = identity_pipeline(array, None);
         assert!(rows.is_ok());
         let rows = rows.unwrap();
         assert_eq!(rows.schema.len(), 8);
15 changes: 11 additions & 4 deletions src/servers/src/http/event.rs
@@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
-    DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
-    PipelineSnafu, Result, UnsupportedContentTypeSnafu,
+    CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
+    ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
 };
 use crate::http::extractor::LogTableName;
 use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
@@ -612,10 +612,17 @@ async fn ingest_logs_inner(
     let mut results = Vec::with_capacity(pipeline_data.len());
     let transformed_data: Rows;
     if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
-        let rows = pipeline::identity_pipeline(pipeline_data)
+        let catalog = query_ctx.current_catalog();
+        let schema = query_ctx.current_schema();
+        let table = state
+            .get_table(catalog, &schema, &table_name)
+            .await
+            .context(CatalogSnafu)?;
+        let rows = pipeline::identity_pipeline(pipeline_data, table)
             .context(PipelineTransformSnafu)
             .context(PipelineSnafu)?;
-        transformed_data = rows;
+
+        transformed_data = rows
     } else {
         let pipeline = state
             .get_pipeline(&pipeline_name, version, query_ctx.clone())
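On the HTTP path this adds exactly one catalog lookup per identity-pipeline request: the target table is resolved from the request's current catalog and schema, and the resulting Option is handed to pipeline::identity_pipeline. The very first write to a new table therefore still takes the pure schema-inference path, while every subsequent write sees the created table and preserves its tag columns.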
7 changes: 7 additions & 0 deletions src/servers/src/query_handler.rs
@@ -164,4 +164,11 @@ pub trait PipelineHandler {
         version: PipelineVersion,
         query_ctx: QueryContextRef,
     ) -> Result<Option<()>>;
+
+    async fn get_table(
+        &self,
+        catalog: &str,
+        schema: &str,
+        table: &str,
+    ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
 }
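Note that the trait method returns catalog::error::Error directly rather than this crate's Result alias (hence the fully qualified std::result::Result), which is why the call site in event.rs maps the error into the servers error type with .context(CatalogSnafu) before propagating it.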