Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(otlp): replace otlp trace attr type from string to jsonb #4918

Merged
merged 9 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -64,6 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
async fn traces(
&self,
request: ExportTraceServiceRequest,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.plugins
Expand All @@ -77,13 +77,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;

let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (
otlp::trace::TRACE_TABLE_NAME.to_string(),
otlp::trace::parse(request),
),
};
let spans = otlp::trace::parse(request);

let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;

Expand Down
11 changes: 10 additions & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid table name"))]
InvalidTableName {
#[snafu(source)]
error: tonic::metadata::errors::ToStrError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to initialize a watcher for file {}", path))]
FileWatch {
path: String,
Expand Down Expand Up @@ -620,7 +628,8 @@ impl ErrorExt for Error {
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
| UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments,
| UnsupportedJsonDataTypeForTag { .. }
| InvalidTableName { .. } => StatusCode::InvalidArguments,

Catalog { source, .. } => source.status_code(),
RowWriter { source, .. } => source.status_code(),
Expand Down
16 changes: 13 additions & 3 deletions src/servers/src/grpc/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::{Channel, QueryContext};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response, Status};

use crate::error;
use crate::http::header::constants::GREPTIME_TRACE_TABLE_NAME_HEADER_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

pub struct OtlpService {
Expand All @@ -46,7 +48,15 @@ impl TraceService for OtlpService {
&self,
request: Request<ExportTraceServiceRequest>,
) -> StdResult<Response<ExportTraceServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();
let (headers, extensions, req) = request.into_parts();

let table_name = match headers.get(GREPTIME_TRACE_TABLE_NAME_HEADER_NAME) {
Some(table_name) => table_name
.to_str()
.context(error::InvalidTableNameSnafu)?
.to_string(),
None => TRACE_TABLE_NAME.to_string(),
};

let mut ctx = extensions
.get::<QueryContext>()
Expand All @@ -55,7 +65,7 @@ impl TraceService for OtlpService {
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);

let _ = self.handler.traces(req, ctx).await?;
let _ = self.handler.traces(req, table_name, ctx).await?;

Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub mod constants {
pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
}

pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
Expand Down
109 changes: 68 additions & 41 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use axum::response::IntoResponse;
use axum::{async_trait, Extension};
use bytes::Bytes;
use common_telemetry::tracing;
use http::HeaderMap;
use opentelemetry_proto::tonic::collector::logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
};
Expand All @@ -41,11 +42,13 @@ use snafu::prelude::*;

use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, PipelineSnafu, Result};
use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result};
use crate::http::header::constants::{
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
use crate::otlp::logs::LOG_TABLE_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

#[axum_macros::debug_handler]
Expand Down Expand Up @@ -80,10 +83,18 @@ pub async fn metrics(
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
header: HeaderMap,
Extension(mut query_ctx): Extension<QueryContext>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
let db = query_ctx.get_db_string();
let table_name = extract_string_value_from_header(
&header,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
Some(TRACE_TABLE_NAME),
)?
// safety here, we provide default value for table_name
.unwrap();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
Expand All @@ -92,7 +103,7 @@ pub async fn traces(
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
handler
.traces(request, query_ctx)
.traces(request, table_name, query_ctx)
.await
.map(|o| OtlpResponse {
resp_body: ExportTraceServiceResponse {
Expand All @@ -107,17 +118,31 @@ pub struct PipelineInfo {
pub pipeline_version: Option<String>,
}

fn pipeline_header_error(
header: &HeaderValue,
key: &str,
) -> StdResult<String, (http::StatusCode, String)> {
let header_utf8 = str::from_utf8(header.as_bytes());
match header_utf8 {
Ok(s) => Ok(s.to_string()),
Err(_) => Err((
fn parse_header_value_to_string(header: &HeaderValue) -> Result<String> {
String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu)
}

fn extract_string_value_from_header(
headers: &HeaderMap,
header: &str,
default_table_name: Option<&str>,
) -> Result<Option<String>> {
let table_name = headers.get(header);
match table_name {
Some(name) => parse_header_value_to_string(name).map(Some),
None => match default_table_name {
Some(name) => Ok(Some(name.to_string())),
None => Ok(None),
},
}
}

fn utf8_error(header_name: &str) -> impl Fn(error::Error) -> (StatusCode, String) + use<'_> {
move |_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", key),
)),
format!("`{}` header is not valid UTF-8 string type.", header_name),
)
}
}

Expand All @@ -129,28 +154,27 @@ where
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let pipeline_name = parts.headers.get(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME);
let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME);
let headers = &parts.headers;
let pipeline_name =
extract_string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, None)
.map_err(utf8_error(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME))?;
let pipeline_version = extract_string_value_from_header(
headers,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
None,
)
.map_err(utf8_error(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME))?;
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(
name,
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_version: Some(pipeline_header_error(
version,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
)?),
pipeline_name: Some(name),
pipeline_version: Some(version),
}),
(None, _) => Ok(PipelineInfo {
pipeline_name: None,
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(
name,
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_name: Some(name),
pipeline_version: None,
}),
}
Expand All @@ -169,16 +193,16 @@ where
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME);
let table_name = extract_string_value_from_header(
&parts.headers,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
Some(LOG_TABLE_NAME),
)
.map_err(utf8_error(GREPTIME_LOG_TABLE_NAME_HEADER_NAME))?
// safety here, we provide default value for table_name
.unwrap();

match table_name {
Some(name) => Ok(TableInfo {
table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?,
}),
None => Ok(TableInfo {
table_name: "opentelemetry_logs".to_string(),
}),
}
Ok(TableInfo { table_name })
}
}

Expand All @@ -192,16 +216,19 @@ where
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME);
let select = extract_string_value_from_header(
&parts.headers,
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME,
None,
)
.map_err(utf8_error(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME))?;

match select {
Some(name) => {
let select_header =
pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
if select_header.is_empty() {
if name.is_empty() {
Ok(SelectInfoWrapper(Default::default()))
} else {
Ok(SelectInfoWrapper(SelectInfo::from(select_header)))
Ok(SelectInfoWrapper(SelectInfo::from(name)))
}
}
None => Ok(SelectInfoWrapper(Default::default())),
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

pub mod logs;
pub mod metrics;
pub mod plugin;
pub mod trace;
mod utils;
41 changes: 3 additions & 38 deletions src/servers/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use pipeline::{Array, Map, PipelineWay, SchemaInfo, SelectInfo, Value as Pipelin
use snafu::{ensure, ResultExt};

use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
};
use crate::otlp::trace::span::bytes_to_hex_string;

pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";

/// Convert OpenTelemetry metrics to GreptimeDB insert requests
///
Expand Down Expand Up @@ -772,43 +774,6 @@ fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<String, PipelineValue
map
}

fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> {
match value {
any_value::Value::StringValue(s) => JsonbValue::String(s.into()),
any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)),
any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)),
any_value::Value::BoolValue(b) => JsonbValue::Bool(b),
any_value::Value::ArrayValue(a) => {
let values = a
.values
.into_iter()
.map(|v| match v.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
})
.collect();
JsonbValue::Array(values)
}
any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values),
any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()),
}
}

fn key_value_to_jsonb(key_values: Vec<KeyValue>) -> JsonbValue<'static> {
let mut map = BTreeMap::new();
for kv in key_values {
let value = match kv.value {
Some(value) => match value.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
},
None => JsonbValue::Null,
};
map.insert(kv.key.clone(), value);
}
JsonbValue::Object(map)
}

fn log_body_to_string(body: &AnyValue) -> String {
let otlp_value = OtlpAnyValue::from(body);
otlp_value.to_string()
Expand Down
28 changes: 0 additions & 28 deletions src/servers/src/otlp/plugin.rs

This file was deleted.

Loading