Skip to content

Upgrade opentelemetry deps from 0.26 -> 0.29 #907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ derive_more = { version = "2.0", features = ["constructor", "display", "from", "
thiserror = "2"
tonic = "0.12"
tonic-build = "0.12"
opentelemetry = { version = "0.26", features = ["metrics"] }
opentelemetry = { version = "0.29", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"

Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ itertools = "0.14"
lru = "0.13"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", ], optional = true }
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true }
opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic" ], optional = true }
opentelemetry-prometheus = { version = "0.29", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
pin-project = "1.0"
prometheus = "0.13"
prometheus = "0.14"
prost = { workspace = true }
prost-types = { version = "0.6", package = "prost-wkt-types" }
rand = "0.9"
Expand Down
105 changes: 51 additions & 54 deletions core/src/telemetry/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ use super::{
};
use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS};
use opentelemetry::{
self, Key, KeyValue, Value, global,
self, Key, KeyValue, Value,
metrics::{Meter, MeterProvider as MeterProviderT},
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
use opentelemetry_sdk::{
Resource,
metrics::{
Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader,
SdkMeterProvider, View, data::Temporality, new_view, reader::TemporalitySelector,
Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, MetricError, PeriodicReader,
SdkMeterProvider, Temporality, View, new_view,
},
runtime,
};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use temporal_sdk_core_api::telemetry::{
Expand All @@ -35,10 +34,10 @@ use temporal_sdk_core_api::telemetry::{
use tokio::task::AbortHandle;
use tonic::{metadata::MetadataMap, transport::ClientTlsConfig};

fn histo_view(
metric_name: &'static str,
use_seconds: bool,
) -> opentelemetry::metrics::Result<Box<dyn View>> {
/// A specialized `Result` type for metric operations.
type Result<T> = std::result::Result<T, MetricError>;

fn histo_view(metric_name: &'static str, use_seconds: bool) -> Result<Box<dyn View>> {
let buckets = default_buckets_for(metric_name, use_seconds);
new_view(
Instrument::new().name(format!("*{metric_name}")),
Expand All @@ -56,7 +55,7 @@ pub(super) fn augment_meter_provider_with_defaults(
global_tags: &HashMap<String, String>,
use_seconds: bool,
bucket_overrides: HistogramBucketOverrides,
) -> opentelemetry::metrics::Result<MeterProviderBuilder> {
) -> Result<MeterProviderBuilder> {
for (name, buckets) in bucket_overrides.overrides {
mpb = mpb.with_view(new_view(
Instrument::new().name(format!("*{name}")),
Expand Down Expand Up @@ -117,31 +116,28 @@ pub(super) fn augment_meter_provider_with_defaults(
/// Create an OTel meter that can be used as a [CoreMeter] to export metrics over OTLP.
pub fn build_otlp_metric_exporter(
opts: OtelCollectorOptions,
) -> Result<CoreOtelMeter, anyhow::Error> {
global::set_error_handler(|err| {
tracing::error!("{}", err);
})?;
Comment on lines -121 to -123
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

) -> std::result::Result<CoreOtelMeter, anyhow::Error> {
let exporter = match opts.protocol {
OtlpProtocol::Grpc => {
let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default()
let mut exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(opts.url.to_string());
if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" {
exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots());
}
exporter
.with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?))
.build_metrics_exporter(Box::new(metric_temporality_to_selector(
opts.metric_temporality,
)))?
.with_temporality(metric_temporality_to_temporality(opts.metric_temporality))
.build()?
}
OtlpProtocol::Http => opentelemetry_otlp::HttpExporterBuilder::default()
OtlpProtocol::Http => opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(opts.url.to_string())
.with_headers(opts.headers)
.build_metrics_exporter(Box::new(metric_temporality_to_selector(
opts.metric_temporality,
)))?,
.with_temporality(metric_temporality_to_temporality(opts.metric_temporality))
.build()?,
};
let reader = PeriodicReader::builder(exporter, runtime::Tokio)
let reader = PeriodicReader::builder(exporter)
.with_interval(opts.metric_periodicity)
.build();
let mp = augment_meter_provider_with_defaults(
Expand Down Expand Up @@ -170,7 +166,7 @@ pub struct StartedPromServer {
/// Requires a Tokio runtime to exist, and will block briefly while binding the server endpoint.
pub fn start_prometheus_metric_exporter(
opts: PrometheusExporterOptions,
) -> Result<StartedPromServer, anyhow::Error> {
) -> std::result::Result<StartedPromServer, anyhow::Error> {
let (srv, exporter) = PromServer::new(&opts)?;
let meter_provider = augment_meter_provider_with_defaults(
MeterProviderBuilder::default().with_reader(exporter),
Expand Down Expand Up @@ -228,7 +224,7 @@ impl CoreMeter for CoreOtelMeter {
.u64_counter(params.name)
.with_unit(params.unit)
.with_description(params.description)
.init(),
.build(),
)
}

Expand All @@ -238,7 +234,7 @@ impl CoreMeter for CoreOtelMeter {
.u64_histogram(params.name)
.with_unit(params.unit)
.with_description(params.description)
.init(),
.build(),
)
}

Expand All @@ -248,7 +244,7 @@ impl CoreMeter for CoreOtelMeter {
.f64_histogram(params.name)
.with_unit(params.unit)
.with_description(params.description)
.init(),
.build(),
)
}

Expand All @@ -268,7 +264,7 @@ impl CoreMeter for CoreOtelMeter {
.u64_gauge(params.name)
.with_unit(params.unit)
.with_description(params.description)
.init(),
.build(),
)
}

Expand All @@ -278,7 +274,7 @@ impl CoreMeter for CoreOtelMeter {
.f64_gauge(params.name)
.with_unit(params.unit)
.with_description(params.description)
.init(),
.build(),
)
}
}
Expand All @@ -303,41 +299,42 @@ fn default_resource_instance() -> &'static Resource {

static INSTANCE: OnceLock<Resource> = OnceLock::new();
INSTANCE.get_or_init(|| {
let resource = Resource::default();
if resource.get(Key::from("service.name")) == Some(Value::from("unknown_service")) {
let resource = Resource::builder().build();
if resource.get(&Key::from("service.name")) == Some(Value::from("unknown_service")) {
// otel spec recommends to leave service.name as unknown_service but we want to
// maintain backwards compatibility with existing library behaviour
return resource.merge(&Resource::new([KeyValue::new(
"service.name",
TELEM_SERVICE_NAME,
)]));
return Resource::builder_empty()
.with_attributes(
resource
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
)
.with_attribute(KeyValue::new("service.name", TELEM_SERVICE_NAME))
.build();
}
resource
})
}

fn default_resource(override_values: &HashMap<String, String>) -> Resource {
let override_kvs = override_values
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()));
default_resource_instance()
.clone()
.merge(&Resource::new(override_kvs))
}

#[derive(Clone)]
struct ConstantTemporality(Temporality);

impl TemporalitySelector for ConstantTemporality {
fn temporality(&self, _: InstrumentKind) -> Temporality {
self.0
}
Resource::builder_empty()
.with_attributes(
default_resource_instance()
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
)
.with_attributes(
override_values
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
)
.build()
}

fn metric_temporality_to_selector(t: MetricTemporality) -> impl TemporalitySelector + Clone {
fn metric_temporality_to_temporality(t: MetricTemporality) -> Temporality {
match t {
MetricTemporality::Cumulative => ConstantTemporality(Temporality::Cumulative),
MetricTemporality::Delta => ConstantTemporality(Temporality::Delta),
MetricTemporality::Cumulative => Temporality::Cumulative,
MetricTemporality::Delta => Temporality::Delta,
}
}

Expand All @@ -349,7 +346,7 @@ pub(crate) mod tests {
#[test]
pub(crate) fn default_resource_instance_service_name_default() {
let resource = default_resource_instance();
let service_name = resource.get(Key::from("service.name"));
let service_name = resource.get(&Key::from("service.name"));
assert_eq!(service_name, Some(Value::from(TELEM_SERVICE_NAME)));
}
}
76 changes: 1 addition & 75 deletions tests/integ_tests/metrics_tests.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
use crate::integ_tests::mk_nexus_endpoint;
use anyhow::anyhow;
use assert_matches::assert_matches;
use std::{
collections::HashMap,
env,
string::ToString,
sync::{Arc, Mutex},
time::Duration,
};
use std::{collections::HashMap, env, string::ToString, sync::Arc, time::Duration};
use temporal_client::{
REQUEST_LATENCY_HISTOGRAM_NAME, WorkflowClientTrait, WorkflowOptions, WorkflowService,
};
Expand Down Expand Up @@ -60,7 +54,6 @@ use temporal_sdk_core_test_utils::{
get_integ_server_options, get_integ_telem_options, prom_metrics,
};
use tokio::{join, sync::Barrier};
use tracing_subscriber::fmt::MakeWriter;
use url::Url;

pub(crate) async fn get_text(endpoint: String) -> String {
Expand Down Expand Up @@ -1111,70 +1104,3 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() {
// Metric shouldn't show up at all, since it's zero the whole time.
assert!(!body.contains("temporal_sticky_cache_total_forced_eviction"));
}

/// `MakeWriter` implementation whose writers all append to one shared byte buffer.
struct CapturingWriter {
    /// Buffer shared with every handle produced by `make_writer`.
    buf: Arc<Mutex<Vec<u8>>>,
}

impl MakeWriter<'_> for CapturingWriter {
    type Writer = CapturingHandle;

    /// Hands out a new handle that points at the same underlying buffer.
    fn make_writer(&self) -> Self::Writer {
        let shared = Arc::clone(&self.buf);
        CapturingHandle(shared)
    }
}

/// Write handle that appends everything it receives to a shared in-memory buffer.
struct CapturingHandle(Arc<Mutex<Vec<u8>>>);

impl std::io::Write for CapturingHandle {
    /// Appends all of `buf` to the shared buffer and reports every byte as written.
    ///
    /// Panics if the mutex guarding the buffer has been poisoned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let written = buf.len();
        let mut captured = self.0.lock().unwrap();
        captured.extend_from_slice(buf);
        Ok(written)
    }

    /// Nothing is held back locally, so flushing has no work to do.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Checks that a failing OTLP metric export shows up in `tracing` output at ERROR level.
///
/// The exporter is pointed at a local URL with nothing listening, a worker is obtained on a
/// runtime that uses that exporter, and the captured log text is inspected after a fixed wait.
#[tokio::test]
async fn otel_errors_logged_as_errors() {
// Set up tracing subscriber to capture ERROR logs
let logs = Arc::new(Mutex::new(Vec::new()));
let writer = CapturingWriter { buf: logs.clone() };
let subscriber = tracing_subscriber::fmt().with_writer(writer).finish();
// Bound to `_guard` (rather than `_`) so the value is not dropped until the test ends.
let _guard = tracing::subscriber::set_default(subscriber);

let opts = OtelCollectorOptionsBuilder::default()
.url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port
.build()
.unwrap();
let exporter = build_otlp_metric_exporter(opts).unwrap();

// Install the OTLP exporter as the metrics backend for the runtime under test.
let telemopts = TelemetryOptionsBuilder::default()
.metrics(Arc::new(exporter) as Arc<dyn CoreMeter>)
.build()
.unwrap();

let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt);
// The worker is never used directly; it is only kept alive for the duration of the wait.
let _worker = starter.get_worker().await;

// Wait to allow exporter to attempt sending metrics and fail.
// Windows takes a while to fail the network attempt for some reason so 5s.
tokio::time::sleep(Duration::from_secs(5)).await;

// Read back everything the capturing writer collected; lossy conversion tolerates any
// non-UTF-8 bytes in the buffer.
let logs = logs.lock().unwrap();
let log_str = String::from_utf8_lossy(&logs);

// First the level, then the specific exporter failure text.
assert!(
log_str.contains("ERROR"),
"Expected ERROR log not found in logs: {}",
log_str
);
assert!(
log_str.contains("Metrics exporter otlp failed with the grpc server returns error"),
"Expected an OTel exporter error message in logs: {}",
log_str
);
}
Loading