chore: decide tag column in log api follow table schema if table exists
paomian committed Dec 11, 2024
1 parent 3d1b8c4 commit 5b0d152
Showing 4 changed files with 53 additions and 17 deletions.
12 changes: 12 additions & 0 deletions src/frontend/src/instance/log_handler.rs
@@ -27,6 +27,7 @@ use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
 use servers::query_handler::PipelineHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
+use table::Table;
 
 use crate::instance::Instance;
 
@@ -84,6 +85,17 @@ impl PipelineHandler for Instance {
             .await
             .context(PipelineSnafu)
     }
+
+    async fn get_table(
+        &self,
+        catalog: &str,
+        schema: &str,
+        table: &str,
+    ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
+        self.catalog_manager
+            .table(catalog, schema, table, None)
+            .await
+    }
 }
 
 impl Instance {
23 changes: 10 additions & 13 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -374,13 +374,13 @@ fn json_value_to_row(
 /// 3. The pipeline assumes that the json format is fixed
 /// 4. The pipeline will return an error if the same column datatype is mismatched
 /// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
-pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
+pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<(SchemaInfo, Vec<Row>)> {
     let mut rows = Vec::with_capacity(array.len());
 
-    let mut schema = SchemaInfo::default();
+    let mut schema_info = SchemaInfo::default();
     for value in array {
         if let serde_json::Value::Object(map) = value {
-            let row = json_value_to_row(&mut schema, map)?;
+            let row = json_value_to_row(&mut schema_info, map)?;
             rows.push(row);
         }
     }
@@ -395,19 +395,16 @@ pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
     let ts = GreptimeValue {
         value_data: Some(ValueData::TimestampNanosecondValue(ns)),
     };
-    let column_count = schema.schema.len();
+    let column_count = schema_info.schema.len();
     for row in rows.iter_mut() {
         let diff = column_count - row.values.len();
         for _ in 0..diff {
             row.values.push(GreptimeValue { value_data: None });
         }
         row.values.push(ts.clone());
     }
-    schema.schema.push(greptime_timestamp_schema);
-    Ok(Rows {
-        schema: schema.schema,
-        rows,
-    })
+    schema_info.schema.push(greptime_timestamp_schema);
+    Ok((schema_info, rows))
 }
 
 #[cfg(test)]
@@ -496,10 +493,10 @@ mod tests {
         let rows = identity_pipeline(array);
         assert!(rows.is_ok());
         let rows = rows.unwrap();
-        assert_eq!(rows.schema.len(), 8);
-        assert_eq!(rows.rows.len(), 2);
-        assert_eq!(8, rows.rows[0].values.len());
-        assert_eq!(8, rows.rows[1].values.len());
+        assert_eq!(rows.0.schema.len(), 8);
+        assert_eq!(rows.1.len(), 2);
+        assert_eq!(8, rows.1[0].values.len());
+        assert_eq!(8, rows.1[1].values.len());
         }
     }
 }
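
The null-padding loop above is the subtle part of the identity pipeline: the schema grows as new JSON keys are encountered, so rows ingested before a column first appeared are shorter than the final schema and must be back-filled with nulls before the timestamp column is appended. Below is a minimal standalone sketch of that merge-then-pad behavior (illustrative only: `Cell` and `identity_like` are made-up stand-ins, not the pipeline crate's API, and the timestamp handling is elided):

// Sketch only: merge per-record schemas, then pad earlier rows with nulls.
// `Cell` stands in for GreptimeValue; this is not the pipeline crate's API.
use std::collections::HashMap;

type Cell = Option<String>;

fn identity_like(records: Vec<HashMap<String, String>>) -> (Vec<String>, Vec<Vec<Cell>>) {
    let mut schema: Vec<String> = Vec::new();
    let mut index: HashMap<String, usize> = HashMap::new();
    let mut rows: Vec<Vec<Cell>> = Vec::new();

    for record in records {
        // A new row starts as wide as the schema known so far.
        let mut row: Vec<Cell> = vec![None; schema.len()];
        for (key, value) in record {
            let idx = *index.entry(key.clone()).or_insert_with(|| {
                schema.push(key.clone());
                schema.len() - 1
            });
            if idx < row.len() {
                row[idx] = Some(value);
            } else {
                row.push(Some(value)); // first occurrence of a brand-new column
            }
        }
        rows.push(row);
    }

    // Back-fill: rows created before later columns appeared are shorter than the schema.
    let column_count = schema.len();
    for row in rows.iter_mut() {
        row.resize(column_count, None);
    }
    (schema, rows)
}

fn main() {
    let records = vec![
        HashMap::from([("host".to_string(), "h1".to_string())]),
        HashMap::from([
            ("host".to_string(), "h2".to_string()),
            ("level".to_string(), "warn".to_string()),
        ]),
    ];
    let (schema, rows) = identity_like(records);
    assert_eq!(schema.len(), 2);
    assert!(rows.iter().all(|r| r.len() == schema.len()));
}

Returning the `SchemaInfo` alongside the rows, rather than a finished `Rows`, is what lets the caller in event.rs (next file) patch the semantic types before assembling the final payload.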
28 changes: 24 additions & 4 deletions src/servers/src/http/event.rs
@@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
-    DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
-    PipelineSnafu, Result, UnsupportedContentTypeSnafu,
+    CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
+    ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
 };
 use crate::http::extractor::LogTableName;
 use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
@@ -612,10 +612,30 @@ async fn ingest_logs_inner(
     let mut results = Vec::with_capacity(pipeline_data.len());
     let transformed_data: Rows;
     if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
-        let rows = pipeline::identity_pipeline(pipeline_data)
+        let (mut schema_info, rows) = pipeline::identity_pipeline(pipeline_data)
             .context(PipelineTransformSnafu)
             .context(PipelineSnafu)?;
-        transformed_data = rows;
+        let catalog = query_ctx.current_catalog();
+        let schema = query_ctx.current_schema();
+        let table = state
+            .get_table(catalog, &schema, &table_name)
+            // FIXME: handle error
+            .await
+            .context(CatalogSnafu)?;
+
+        if let Some(table) = table {
+            let schema = &table.table_info().meta;
+            schema.row_key_column_names().for_each(|column_name| {
+                if let Some(index) = schema_info.index.get(column_name) {
+                    schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
+                }
+            });
+        }
+
+        transformed_data = Rows {
+            rows,
+            schema: schema_info.schema,
+        };
     } else {
         let pipeline = state
             .get_pipeline(&pipeline_name, version, query_ctx.clone())
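
This hunk is the behavior change named in the commit title: when the target table already exists, any auto-inferred column whose name matches one of the table's row-key (primary key) columns is re-labeled as a Tag instead of being left as a Field. A self-contained sketch of that remapping, using simplified stand-ins for `ColumnSchema` and `SemanticType` (the real types come from the GreptimeDB protobuf definitions; the names below are illustrative):

// Sketch only: re-label inferred columns that match the table's row-key columns.
// `ColumnSchema` and `SemanticType` are simplified stand-ins for the proto types.
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum SemanticType {
    Field,
    Tag,
}

#[derive(Debug)]
struct ColumnSchema {
    name: String,
    semantic_type: SemanticType,
}

fn align_tags_with_table(
    schema: &mut [ColumnSchema],
    index: &HashMap<String, usize>,  // column name -> position, like SchemaInfo.index
    row_key_column_names: &[String], // from the existing table's metadata
) {
    for column_name in row_key_column_names {
        if let Some(&i) = index.get(column_name) {
            schema[i].semantic_type = SemanticType::Tag;
        }
    }
}

fn main() {
    let mut schema = vec![
        ColumnSchema { name: "host".into(), semantic_type: SemanticType::Field },
        ColumnSchema { name: "message".into(), semantic_type: SemanticType::Field },
    ];
    let index: HashMap<String, usize> = schema
        .iter()
        .enumerate()
        .map(|(i, c)| (c.name.clone(), i))
        .collect();
    // Pretend the existing table declares `host` as a row-key column.
    align_tags_with_table(&mut schema, &index, &["host".to_string()]);
    assert_eq!(schema[0].semantic_type, SemanticType::Tag);
    assert_eq!(schema[1].semantic_type, SemanticType::Field);
}

If the table does not exist, the lookup returns None and the inferred schema is used unchanged, as before this commit.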
7 changes: 7 additions & 0 deletions src/servers/src/query_handler.rs
@@ -164,4 +164,11 @@ pub trait PipelineHandler {
         version: PipelineVersion,
         query_ctx: QueryContextRef,
     ) -> Result<Option<()>>;
+
+    async fn get_table(
+        &self,
+        catalog: &str,
+        schema: &str,
+        table: &str,
+    ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
 }
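
The trait-level `get_table` is what decouples the HTTP ingest path in `servers` from the frontend's catalog manager: the trait exposes table lookup, and `Instance` (first file above) supplies the catalog-backed implementation. A toy, synchronous illustration of that shape (`PipelineHandlerLike`, `TableMeta`, and `MockHandler` are invented for this sketch; the real trait method is async and returns `Result<Option<Arc<Table>>, _>` as shown above):

// Toy illustration: the HTTP layer programs against the trait, so any handler
// that can resolve a table name works. All names here are invented for the sketch.
struct TableMeta {
    row_key_column_names: Vec<String>,
}

trait PipelineHandlerLike {
    fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Option<TableMeta>;
}

struct MockHandler;

impl PipelineHandlerLike for MockHandler {
    fn get_table(&self, _catalog: &str, _schema: &str, table: &str) -> Option<TableMeta> {
        // Pretend only a "logs" table exists, keyed by `host`.
        (table == "logs").then(|| TableMeta {
            row_key_column_names: vec!["host".to_string()],
        })
    }
}

fn main() {
    let handler = MockHandler;
    assert!(handler.get_table("greptime", "public", "logs").is_some());
    assert!(handler.get_table("greptime", "public", "missing").is_none());
}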
