diff --git a/Cargo.lock b/Cargo.lock index f14aac1..00b0187 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,150 +44,6 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - -[[package]] -name = "async-channel" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-executor" -version = "1.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c96bf972d85afc50bf5ab8fe2d54d1586b4e0b46c97c50a0c9e71e2f7bcd812a" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "pin-project-lite", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.5.0", - "async-executor", - "async-io", - "async-lock", - "blocking", - "futures-lite", - "once_cell", -] - -[[package]] -name = "async-io" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" -dependencies = [ - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-io", - "futures-lite", - "parking", - "polling", - "rustix", - "slab", - "windows-sys 0.61.2", -] - -[[package]] -name = "async-lock" -version = "3.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" -dependencies = [ - "event-listener 5.4.1", - "event-listener-strategy", - "pin-project-lite", -] - -[[package]] -name = "async-process" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" -dependencies = [ - "async-channel 2.5.0", - "async-io", - "async-lock", - "async-signal", - "async-task", - "blocking", - "cfg-if", - "event-listener 5.4.1", - "futures-lite", - "rustix", -] - -[[package]] -name = "async-signal" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52b5aaafa020cf5053a01f2a60e8ff5dccf550f0f77ec54a4e47285ac2bab485" -dependencies = [ - "async-io", - "async-lock", - "atomic-waker", - "cfg-if", - "futures-core", - "futures-io", - "rustix", - "signal-hook-registry", - "slab", - "windows-sys 0.61.2", -] - -[[package]] -name = "async-std" -version = "1.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c8e079a4ab67ae52b7403632e4618815d6db36d2a010cfe41b02c1b1578f93b" -dependencies = [ - "async-channel 1.9.0", - "async-global-executor", - "async-io", - "async-lock", - "async-process", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - [[package]] name = "async-stream" version = "0.3.6" @@ -210,12 +66,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async-task" -version = "4.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" - [[package]] name = "async-trait" version = "0.1.89" @@ -291,19 +141,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" -dependencies = [ - "async-channel 2.5.0", - "async-task", - "futures-io", - "futures-lite", - "piper", -] - [[package]] name = "bumpalo" version = "3.20.2" @@ -623,12 +460,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "event-listener" version = "5.4.1" @@ -640,16 +471,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "event-listener-strategy" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" -dependencies = [ - "event-listener 5.4.1", - "pin-project-lite", -] - [[package]] name = "fastrand" version = "2.4.1" @@ -759,19 +580,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" -[[package]] -name = "futures-lite" -version = "2.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.32" @@ -862,24 +670,6 @@ dependencies = [ "wasip3", ] -[[package]] -name = "glob" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" - -[[package]] -name = "gloo-timers" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "h2" version = "0.4.14" @@ -1262,15 +1052,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lazy_static" version = "1.5.0" @@ -1326,9 +1107,6 @@ name = "log" version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -dependencies = [ - "value-bag", -] [[package]] name = "lru-slab" @@ -1398,60 +1176,6 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" -[[package]] -name = "opentelemetry" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" -dependencies = [ - "futures-core", - "futures-sink", - "js-sys", - "pin-project-lite", - "thiserror 1.0.69", - "tracing", -] - -[[package]] -name = "opentelemetry-http" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80" -dependencies = [ - "async-trait", - "bytes", - "http", - "opentelemetry", -] - -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52" - -[[package]] -name = "opentelemetry_sdk" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" -dependencies = [ - "async-std", - "async-trait", - "futures-channel", - "futures-executor", - "futures-util", - "glob", - "opentelemetry", - "percent-encoding", - "rand 0.8.6", - "serde_json", - "thiserror 1.0.69", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "parking" version = "2.2.1" @@ -1483,7 +1207,7 @@ dependencies = [ [[package]] name = "partly-proxy-echo" -version = "0.2.0" +version = "0.3.0" dependencies = [ "base64", "bytes", @@ -1500,7 +1224,7 @@ dependencies = [ [[package]] name = "partly-proxy-lib" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-trait", "base64", @@ -1515,10 +1239,6 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", - "opentelemetry", - "opentelemetry-http", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk", "partly-proxy-echo", "partly-proxy-storage-jsonl", "partly-proxy-storage-sqlite", @@ -1537,7 +1257,6 @@ dependencies = [ "tokio-rustls", "tokio-util", "tracing", - "tracing-opentelemetry", "tracing-subscriber", "uuid", "webpki-roots", @@ -1545,7 +1264,7 @@ dependencies = [ [[package]] name = "partly-proxy-storage-jsonl" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-stream", "async-trait", @@ -1561,7 +1280,7 @@ dependencies = [ [[package]] name = "partly-proxy-storage-sqlite" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-stream", "async-trait", @@ -1576,7 +1295,7 @@ dependencies = [ [[package]] name = "partly-proxy-types" -version = "0.2.0" +version = "0.3.0" dependencies = [ "async-trait", "base64", @@ -1614,23 +1333,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - -[[package]] -name = "piper" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" -dependencies = [ - "atomic-waker", - "fastrand", - "futures-io", -] - [[package]] name = "pkg-config" version = "0.3.33" @@ -1665,20 +1367,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "polling" -version = "3.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" -dependencies = [ - "cfg-if", - "concurrent-queue", - "hermit-abi", - "pin-project-lite", - "rustix", - "windows-sys 0.61.2", -] - [[package]] name = "potential_utf" version = "0.1.5" @@ -1751,7 +1439,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand 0.9.4", + "rand", "ring", "rustc-hash", "rustls", @@ -1798,35 +1486,14 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" -[[package]] -name = "rand" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - [[package]] name = "rand" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.5", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", + "rand_chacha", + "rand_core", ] [[package]] @@ -1836,16 +1503,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.5", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom 0.2.17", + "rand_core", ] [[package]] @@ -2224,7 +1882,7 @@ dependencies = [ "crc", "crossbeam-queue", "either", - "event-listener 5.4.1", + "event-listener", "futures-core", "futures-intrusive", "futures-io", @@ -2638,22 +2296,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-opentelemetry" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" -dependencies = [ - "js-sys", - "once_cell", - "opentelemetry", - "opentelemetry_sdk", - "tracing", - "tracing-core", - "tracing-subscriber", - "web-time", -] - [[package]] name = "tracing-subscriber" version = "0.3.23" @@ -2738,12 +2380,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "value-bag" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba6f5989077681266825251a52748b8c1d8a4ad098cc37e440103d0ea717fc0" - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 0c0d5f8..9d161cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,10 +3,10 @@ resolver = "2" members = ["crates/*"] [workspace.package] -version = "0.2.0" +version = "0.3.0" edition = "2024" license = "MIT OR Apache-2.0" -repository = "https://github.com/partly/api-proxy" +repository = "https://github.com/thepartly/api-proxy" authors = ["Partly "] rust-version = "1.85" @@ -16,9 +16,9 @@ rust-version = "1.85" # - `cargo publish` / `cargo release` use `version` from the registry. # Keep these versions in lock-step with `workspace.package.version` above; # `release.toml` (shared-version = true) enforces that on release. -partly-proxy-types = { version = "0.2.0", path = "crates/partly-proxy-types" } -partly-proxy-storage-jsonl = { version = "0.2.0", path = "crates/partly-proxy-storage-jsonl" } -partly-proxy-storage-sqlite = { version = "0.2.0", path = "crates/partly-proxy-storage-sqlite" } +partly-proxy-types = { version = "0.3.0", path = "crates/partly-proxy-types" } +partly-proxy-storage-jsonl = { version = "0.3.0", path = "crates/partly-proxy-storage-jsonl" } +partly-proxy-storage-sqlite = { version = "0.3.0", path = "crates/partly-proxy-storage-sqlite" } # `partly-proxy-echo` is `publish = false`; only consumed as a dev-dep # inside the workspace. Path-only is fine since dev-deps without a # version are stripped from the published manifest. @@ -87,19 +87,6 @@ rcgen = "0.13" # bodies via `to_async(&runtime).iter_custom(...)`. criterion = { version = "0.5", features = ["async_tokio"] } -# OpenTelemetry — version-suffixed renames so multiple minor versions can -# coexist in the workspace as the OTEL Rust crates churn. See the proxy -# lib's `otel_0_*` features. Add a new block per supported minor; do not -# remove old ones until consumers have migrated. -opentelemetry_0_27 = { package = "opentelemetry", version = "0.27" } -opentelemetry-http_0_27 = { package = "opentelemetry-http", version = "0.27" } -opentelemetry-semantic-conventions_0_27 = { package = "opentelemetry-semantic-conventions", version = "0.27", features = ["semconv_experimental"] } -tracing-opentelemetry_0_28 = { package = "tracing-opentelemetry", version = "0.28", default-features = false } - -# Test-only OTEL SDK for the integration tests' in-memory span exporter. -# Not pulled in by the lib itself. -opentelemetry_sdk_0_27 = { package = "opentelemetry_sdk", version = "0.27", features = ["testing"] } - [profile.dev] opt-level = 0 diff --git a/README.md b/README.md index 298c7fd..326fbc4 100644 --- a/README.md +++ b/README.md @@ -129,55 +129,3 @@ to them. | Wait-for `AssertSeen` / `AssertCount` | Overshoot terminates fast | | Hosting example (`examples/host.rs`) | Env-var-driven; ~30 lines | | TypeScript client + vitest | Mock + real-binary e2e suites | -| OpenTelemetry (`otel_0_*` features) | W3C extraction + opt-in upstream injection | - -## OpenTelemetry - -Tracing/instrumentation support is feature-gated and off by default. The -library: - -- Extracts the W3C `traceparent`/`tracestate` from inbound requests and - parents a server span on the incoming context (opt-out per upstream - via `without_otel_extraction`, or per request via `with_otel_filter`). -- Injects the resulting context onto the response so callers can - correlate (equivalent of the older `OtelInResponseLayer`). -- **Does not** inject context onto outbound requests unless explicitly - asked via `with_otel_propagation_to_upstream` on the upstream's - `ProxyConfig`. -- Records HTTP attributes per the current OTEL semantic conventions - (`http.request.method`, `http.response.status_code`, `http.route` = - upstream name, `url.path`, `url.scheme`, etc.) and maps 5xx responses - to `Status::Error`. - -The library does **not** install a tracer provider, exporter, -propagator, or `tracing-subscriber`. That is the host binary's -responsibility — it has full control over which exporter, sampler, and -resource attributes to use. The lib just consults -`opentelemetry::global` for whatever the host has set up. - -### Version pinning - -The OTEL Rust crates ship breaking changes at every 0.x bump and -ecosystem crates can be stuck on different versions for months. One -Cargo feature per OTEL minor lets the lib track multiple minors as the -ecosystem migrates: - -| Feature | `opentelemetry` minor | Sibling crates | -| ------------ | --------------------- | ------------------------------------------------------------------------------ | -| `otel_0_27` | 0.27.x | `opentelemetry-http` 0.27, `opentelemetry-semantic-conventions` 0.27, `tracing-opentelemetry` 0.28 | - -Only one `otel_0_*` feature may be enabled at a time — `lib.rs` carries -a `compile_error!` guard that trips when more than one is selected -(the host's installed propagator must match the lib's compiled-in OTEL -version to round-trip context). Future versions are additive: a new -`otel_0_28` feature, new dep renames, and a new `v0_28.rs` impl module -all sit alongside the existing ones with no renames. - -```toml -[dependencies] -partly-proxy-lib = { version = "0.1", features = ["otel_0_27"] } -``` - -In the host binary, install a tracer provider, propagator, and -`tracing-subscriber` against the matching `opentelemetry` minor; the -lib will see them via `opentelemetry::global`. diff --git a/SPECIFICATION.md b/SPECIFICATION.md index ff1b18e..ffa75bb 100644 --- a/SPECIFICATION.md +++ b/SPECIFICATION.md @@ -639,12 +639,6 @@ The crate does not ship a hosting binary — wiring `ProxyClusterBuilder` into a - **Hop-by-hop headers.** Headers are forwarded as-is; `hyper`'s defaults handle `Content-Length` and `Transfer-Encoding` normalisation, but other hop-by-hop headers are not stripped. - **One-shot replay.** Replay snapshots are reusable — there is no per-snapshot consumption tracking and no "all snapshots consumed" assertion. - **Connection cap.** No semaphore on accepted connections; very high concurrency is bounded only by the OS and the upstream. -- **Telemetry.** Tracing is supported via `tracing`. OpenTelemetry support - is feature-gated (`otel_0_*` features, off by default) and provides - W3C TraceContext extraction, response-side injection, and opt-in - upstream injection — see §21. The library does **not** install a - tracer provider, exporter, propagator, or `tracing-subscriber`; the - host binary owns that. --- @@ -676,72 +670,3 @@ The crate does not ship a hosting binary — wiring `ProxyClusterBuilder` into a 2. Bind the JSON-Lines TCP adapter to a known port. 3. Have a non-Rust test harness open a TCP connection and issue commands as JSON lines. For Playwright suites, use the first-party TypeScript client (see §12.3) instead of hand-rolling the protocol. 4. The harness can stub, pause, query traffic, assert, and shut the proxy down without ever linking the Rust crate. - ---- - -## 21. OpenTelemetry - -Feature-gated, off by default. One Cargo feature per `opentelemetry` -minor version (`otel_0_27`, future `otel_0_28`, …) so the crate can -track several minors side-by-side as the OTEL Rust stack evolves. The -features are mutually exclusive — enabling more than one trips a -`compile_error!` — because `opentelemetry::global::*` is non-reentrant -across minor versions. - -### 21.1 Responsibility split - -The library does **not** install any tracer-side machinery. That is -the host binary's job: it picks the exporter (OTLP/gRPC, OTLP/HTTP, …), -the sampler, the resource attributes, the `TracerProvider`, the -`TextMapPropagator`, and the `tracing-subscriber` composition. The lib -calls `opentelemetry::global::get_text_map_propagator` and -`tracing::Span` methods from `tracing-opentelemetry`; whatever the host -installs is what it gets, including a no-op tracer if the host doesn't -install one. - -### 21.2 Propagation contract - -When any `otel_0_*` feature is enabled: - -- **Inbound extraction (opt-out).** Each inbound request creates a - server span. If the request carried a `traceparent`/`tracestate`, - the span is parented to that context. Disable per upstream via - `ProxyConfig::without_otel_extraction()`; skip individual requests - via `ProxyConfig::with_otel_filter(|method, uri| -> bool)`. -- **Response injection (always, when a span was created).** The proxy - emits a `traceparent` on the response so callers can correlate. This - is the W3C-native equivalent of the older - `axum-tracing-opentelemetry::OtelInResponseLayer`. -- **Outbound injection (opt-in).** Disabled by default. Enable per - upstream via `ProxyConfig::with_otel_propagation_to_upstream()`. When - off, the proxy does not add any tracing headers to forwarded - requests; client-supplied tracing headers still flow through - unchanged because the proxy forwards inbound headers as-is. - -### 21.3 Span shape - -Server span (kind `SERVER`): - -- Name: `"{http.request.method} {http.route}"` where `http.route` is - the upstream's configured name (the closest analogue to a route a - proxy has and bounds attribute cardinality). -- Attributes: `http.request.method`, `http.response.status_code`, - `http.route`, `url.path`, `url.query`, `url.scheme`, - `server.address`/`server.port`, `client.address`/`client.port`, - `user_agent.original`, `network.protocol.version`, plus - `partly.proxy.upstream` as a namespaced custom attribute. -- `Status::Error` is set for 5xx responses; other outcomes leave the - status at `Unset`. - -Client span (kind `CLIENT`, child of the server span): - -- Name: `"{http.request.method}"` per OTEL HTTP-client semconv. -- Attributes: `http.request.method`, `url.full`, `server.address`/ - `server.port` from the outbound URI, `http.response.status_code`, - `partly.proxy.upstream`. - -### 21.4 Control plane - -The TCP JSON-Lines control plane is **not** traced — it isn't -user-facing HTTP traffic and the cardinality wouldn't be useful. Only -the per-upstream listeners are wired to the OTEL helpers. diff --git a/crates/partly-proxy-lib/Cargo.toml b/crates/partly-proxy-lib/Cargo.toml index a715d84..975558b 100644 --- a/crates/partly-proxy-lib/Cargo.toml +++ b/crates/partly-proxy-lib/Cargo.toml @@ -21,27 +21,6 @@ storage-jsonl = ["dep:partly-proxy-storage-jsonl"] storage-sqlite = ["dep:partly-proxy-storage-sqlite"] testing = [] -# OpenTelemetry support. One feature per `opentelemetry` minor version so -# the lib can track multiple minors side-by-side as the OTEL Rust stack -# evolves. Enable at most one at a time — `lib.rs` enforces this with a -# `compile_error!`. -# -# The library does *not* install a tracer provider, propagator, exporter, -# or `tracing-subscriber`. The host binary is responsible for that. The -# feature only wires the lib's extraction / injection / span helpers. -# -# `_otel_any` is an internal alias enabled by every version feature so -# cfg gates inside the crate stay version-agnostic. Do not enable it -# directly. -_otel_any = [] -otel_0_27 = [ - "_otel_any", - "dep:opentelemetry_0_27", - "dep:opentelemetry-http_0_27", - "dep:opentelemetry-semantic-conventions_0_27", - "dep:tracing-opentelemetry_0_28", -] - [lints.rust] unsafe_code = "forbid" @@ -96,14 +75,6 @@ uuid = { workspace = true } chrono = { workspace = true } tracing = { workspace = true } -# OpenTelemetry — optional, version-suffixed. The active set is selected -# by the `otel_0_*` feature flags above. None of these are linked unless -# a version feature is enabled. -opentelemetry_0_27 = { workspace = true, optional = true } -opentelemetry-http_0_27 = { workspace = true, optional = true } -opentelemetry-semantic-conventions_0_27 = { workspace = true, optional = true } -tracing-opentelemetry_0_28 = { workspace = true, optional = true } - [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tempfile = { workspace = true } @@ -120,12 +91,6 @@ partly-proxy-storage-jsonl = { workspace = true } partly-proxy-storage-sqlite = { workspace = true } criterion = { workspace = true } -# Test-only OTEL SDK for the integration tests' in-memory span exporter. -# `opentelemetry_0_27` itself reaches the test crate through the optional -# regular dep when the matching feature is enabled, so it isn't repeated -# here. -opentelemetry_sdk_0_27 = { workspace = true } - [[example]] name = "host" path = "examples/host.rs" diff --git a/crates/partly-proxy-lib/src/config.rs b/crates/partly-proxy-lib/src/config.rs index 340d1d7..013ba0d 100644 --- a/crates/partly-proxy-lib/src/config.rs +++ b/crates/partly-proxy-lib/src/config.rs @@ -6,21 +6,10 @@ //! point where each field is used (when the listener binds, when the outbound //! client is built, etc.). -#[cfg(feature = "_otel_any")] -use std::sync::Arc; use std::{net::SocketAddr, path::PathBuf, time::Duration}; -#[cfg(feature = "_otel_any")] -use http::{Method, Uri}; - -/// Filter applied to incoming requests to decide whether to create an OTEL -/// server span. Returns `true` to trace, `false` to skip. Useful for -/// excluding health probes. -#[cfg(feature = "_otel_any")] -pub type OtelRequestFilter = Arc bool + Send + Sync>; - /// One listener bound to one upstream — see `SPECIFICATION.md` §3.1. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ProxyConfig { /// Address the listener binds to. pub bind_addr: SocketAddr, @@ -28,38 +17,6 @@ pub struct ProxyConfig { pub upstream: UpstreamTarget, /// If set, the listener terminates inbound TLS. pub inbound_tls: Option, - /// When `true` (default), each inbound request creates an OTEL server - /// span parented to any `traceparent`/`tracestate` it carried, and the - /// proxy injects the resulting context into the response headers. - /// Set to `false` to bypass OTEL entirely for this listener. - #[cfg(feature = "_otel_any")] - pub otel_extract: bool, - /// When `Some`, called per request to decide whether to create a span - /// (return `true` to trace, `false` to skip). Applied after - /// `otel_extract`. Default: `None` (trace every request). - #[cfg(feature = "_otel_any")] - pub otel_filter: Option, - /// When `true`, the current span's context is injected into outbound - /// request headers before forwarding to the upstream. Default `false`: - /// the proxy does not modify outbound headers for tracing unless asked. - #[cfg(feature = "_otel_any")] - pub otel_propagate_upstream: bool, -} - -impl std::fmt::Debug for ProxyConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut d = f.debug_struct("ProxyConfig"); - d.field("bind_addr", &self.bind_addr) - .field("upstream", &self.upstream) - .field("inbound_tls", &self.inbound_tls); - #[cfg(feature = "_otel_any")] - { - d.field("otel_extract", &self.otel_extract) - .field("otel_filter", &self.otel_filter.as_ref().map(|_| "")) - .field("otel_propagate_upstream", &self.otel_propagate_upstream); - } - d.finish() - } } impl ProxyConfig { @@ -69,42 +26,8 @@ impl ProxyConfig { bind_addr, upstream, inbound_tls: None, - #[cfg(feature = "_otel_any")] - otel_extract: true, - #[cfg(feature = "_otel_any")] - otel_filter: None, - #[cfg(feature = "_otel_any")] - otel_propagate_upstream: false, } } - - /// Disable OTEL extraction (and the implicit response-header injection) - /// for this listener. No effect when no `otel_0_*` feature is enabled. - #[cfg(feature = "_otel_any")] - pub fn without_otel_extraction(mut self) -> Self { - self.otel_extract = false; - self - } - - /// Install a request-level filter; returning `false` skips tracing for - /// that request. No effect when no `otel_0_*` feature is enabled. - #[cfg(feature = "_otel_any")] - pub fn with_otel_filter(mut self, f: F) -> Self - where - F: Fn(&Method, &Uri) -> bool + Send + Sync + 'static, - { - self.otel_filter = Some(Arc::new(f)); - self - } - - /// Enable injection of the current trace context into outbound requests - /// forwarded to the upstream. No effect when no `otel_0_*` feature is - /// enabled. - #[cfg(feature = "_otel_any")] - pub fn with_otel_propagation_to_upstream(mut self) -> Self { - self.otel_propagate_upstream = true; - self - } } /// Outbound target description — see `SPECIFICATION.md` §3.2. diff --git a/crates/partly-proxy-lib/src/forwarder.rs b/crates/partly-proxy-lib/src/forwarder.rs index 7a5620b..dba593c 100644 --- a/crates/partly-proxy-lib/src/forwarder.rs +++ b/crates/partly-proxy-lib/src/forwarder.rs @@ -35,11 +35,6 @@ pub(crate) struct Forwarder { client: HyperClient, target: UpstreamTarget, base: BaseUri, - /// When `true`, the current trace context is injected into the - /// outbound request headers before sending to the upstream. Default - /// `false`; set via [`Forwarder::with_otel_propagation`]. - #[cfg(feature = "_otel_any")] - propagate_upstream: bool, } #[derive(Debug, Clone)] @@ -75,30 +70,13 @@ impl Forwarder { client, target, base, - #[cfg(feature = "_otel_any")] - propagate_upstream: false, }) } - /// Enable injection of the current OTEL context into outbound request - /// headers. Default is `false`. - #[cfg(feature = "_otel_any")] - pub(crate) fn with_otel_propagation(mut self, enabled: bool) -> Self { - self.propagate_upstream = enabled; - self - } - /// Forward a `ProxyRequest` to the upstream and return a `ProxyResponse`. /// Connect failures map to `UpstreamConnect`; timeouts and post-handshake /// failures map to `UpstreamRequest`. - /// - /// `upstream_name` is used as an attribute on the client-side OTEL span - /// (when an `otel_0_*` feature is enabled). Ignored otherwise. - pub(crate) async fn forward( - &self, - mut req: ProxyRequest, - #[cfg_attr(not(feature = "_otel_any"), allow(unused_variables))] upstream_name: &str, - ) -> Result { + pub(crate) async fn forward(&self, mut req: ProxyRequest) -> Result { let outbound_uri = self.build_outbound_uri(&req.uri)?; // Recompute the Host header. hyper-util will set one based on the @@ -121,14 +99,6 @@ impl Forwarder { .uri(outbound_uri.clone()) .version(req.version); - // Build the OTEL client span (no-op when no `otel_0_*` feature is on) - // and, when explicitly enabled per upstream, inject the current - // context into the outbound headers. The client span is created - // unconditionally under the feature so the proxy's own timing is - // visible even when the upstream isn't on the trace path. - #[cfg(feature = "_otel_any")] - let client_span = crate::otel::make_client_span(&req.method, &outbound_uri, upstream_name); - if let Some(headers) = builder.headers_mut() { *headers = req.headers.clone(); // hyper-util sets Content-Length from the Full body. If @@ -136,37 +106,22 @@ impl Forwarder { // is stale and would clash with hyper's recomputed value. headers.remove(http::header::CONTENT_LENGTH); headers.remove(http::header::TRANSFER_ENCODING); - - #[cfg(feature = "_otel_any")] - if self.propagate_upstream { - crate::otel::inject_into_request_headers(&client_span, headers); - } } let outbound = builder .body(Full::new(req.body)) .map_err(|e| ProxyError::upstream_request_with("request build failed", e))?; - let request_fut = async { - let fut = self.client.request(outbound); - tokio::time::timeout(self.target.request_timeout, fut) - .await - .map_err(|_| { - ProxyError::upstream_request(format!( - "request to {outbound_uri} timed out after {:?}", - self.target.request_timeout - )) - })? - .map_err(|e| classify_legacy_error(&outbound_uri, e)) - }; - - #[cfg(feature = "_otel_any")] - let resp = { - use tracing::Instrument; - request_fut.instrument(client_span.clone()).await? - }; - #[cfg(not(feature = "_otel_any"))] - let resp = request_fut.await?; + let fut = self.client.request(outbound); + let resp = tokio::time::timeout(self.target.request_timeout, fut) + .await + .map_err(|_| { + ProxyError::upstream_request(format!( + "request to {outbound_uri} timed out after {:?}", + self.target.request_timeout + )) + })? + .map_err(|e| classify_legacy_error(&outbound_uri, e))?; let (resp_parts, resp_body) = resp.into_parts(); let collected = resp_body @@ -175,9 +130,6 @@ impl Forwarder { .map_err(|e| ProxyError::upstream_request_with("response body read failed", e))? .to_bytes(); - #[cfg(feature = "_otel_any")] - crate::otel::record_response_status(&client_span, resp_parts.status); - Ok(ProxyResponse { status: resp_parts.status, headers: resp_parts.headers, diff --git a/crates/partly-proxy-lib/src/lib.rs b/crates/partly-proxy-lib/src/lib.rs index 14ec4b5..0dbf085 100644 --- a/crates/partly-proxy-lib/src/lib.rs +++ b/crates/partly-proxy-lib/src/lib.rs @@ -15,7 +15,6 @@ mod control_plane; mod forwarder; mod listener; pub mod middleware; -mod otel; pub mod proxy_io; pub mod recorder; pub mod replay; @@ -24,15 +23,6 @@ mod tls; mod upstream; pub mod wire; -// Guard: enabling more than one `otel_0_*` feature at a time would link two -// `opentelemetry` minors into one build. Their `global::*` registries are -// version-specific, so the propagator a host installs against one would be -// invisible to the other. Extend the predicate when adding new versions. -#[cfg(all(feature = "otel_0_27", /* future: feature = "otel_0_28" */ false))] -compile_error!( - "partly-proxy-lib: enable at most one `otel_0_*` feature; they are mutually exclusive" -); - pub use assertions::TrafficFilter; pub use builder::{ProxyClusterBuilder, ReplayMissHandler}; pub use cluster::ClusterHandle; diff --git a/crates/partly-proxy-lib/src/listener.rs b/crates/partly-proxy-lib/src/listener.rs index 8d03ea5..7206ed2 100644 --- a/crates/partly-proxy-lib/src/listener.rs +++ b/crates/partly-proxy-lib/src/listener.rs @@ -66,30 +66,11 @@ pub(crate) async fn spawn_listener( .map_err(ProxyError::Bind)?; let bound_addr = listener.local_addr().map_err(ProxyError::Bind)?; - // OTEL inputs are read before `spec.config.upstream` is moved into the - // Forwarder. `bind_addr`/`scheme` are also captured here so the - // listener can populate the OTEL runtime once the socket is bound. - #[cfg(feature = "_otel_any")] - let otel_runtime = crate::upstream::OtelRuntime { - bind_addr: bound_addr, - scheme: if spec.config.inbound_tls.is_some() { - "https" - } else { - "http" - }, - extract: spec.config.otel_extract, - filter: spec.config.otel_filter.clone(), - }; - #[cfg(feature = "_otel_any")] - let propagate_upstream = spec.config.otel_propagate_upstream; - let forwarder = Forwarder::new(spec.config.upstream)?; - #[cfg(feature = "_otel_any")] - let forwarder = forwarder.with_otel_propagation(propagate_upstream); let mut middleware = global_middleware; middleware.extend(spec.middleware); - let runtime = UpstreamRuntime::new( + let runtime = Arc::new(UpstreamRuntime::new( spec.name, forwarder, recorder, @@ -97,10 +78,7 @@ pub(crate) async fn spawn_listener( spec.replay, spec.mode, spec.replay_miss_handler, - ); - #[cfg(feature = "_otel_any")] - let runtime = runtime.with_otel(otel_runtime); - let runtime = Arc::new(runtime); + )); let task = tokio::spawn(accept_loop( listener, @@ -276,24 +254,16 @@ fn is_fatal_accept(e: &std::io::Error) -> bool { async fn handle_request( req: Request, runtime: Arc, - peer: SocketAddr, + _peer: SocketAddr, ) -> std::result::Result>, Infallible> { - use tracing::Instrument; - // Lifecycle stage 3: pause gate. pause_gate(&runtime).await; let started = Instant::now(); let (parts, body) = req.into_parts(); - // Build the OTEL server span before the body is read so it spans the - // full inbound processing. Returns `Span::none()` when no `otel_0_*` - // feature is on, when `otel_extract` is false for this listener, or - // when the per-request filter rejected the request. - let span = build_server_span(&parts, peer, &runtime); - let runtime_for_block = runtime.clone(); - let mut response = async move { + let response = async move { use http_body_util::BodyExt; // Lifecycle stage 4: body collection. @@ -364,62 +334,12 @@ async fn handle_request( } } } - .instrument(span.clone()) .await; - // Lifecycle stage 9: emit response. With OTEL on, also inject the trace - // context into the response headers and record the status on the span. - // Both helpers are no-ops when no `otel_0_*` feature is enabled or when - // `span` is `Span::none()`. - crate::otel::inject_into_response_headers(&span, response.headers_mut()); - crate::otel::record_response_status(&span, response.status()); let _ = runtime; // ensure we hold the Arc until response is built Ok(response) } -/// Build the OTEL server span for an inbound request, or `Span::none()` -/// when no OTEL feature is enabled, extraction is disabled for this -/// listener, or the per-request filter rejected the request. -#[allow(unused_variables)] -fn build_server_span( - parts: &http::request::Parts, - peer: SocketAddr, - runtime: &UpstreamRuntime, -) -> tracing::Span { - #[cfg(feature = "_otel_any")] - { - if !runtime.otel.extract { - return tracing::Span::none(); - } - if let Some(filter) = &runtime.otel.filter { - if !filter(&parts.method, &parts.uri) { - return tracing::Span::none(); - } - } - let parent = crate::otel::extract_parent_context(&parts.headers); - let user_agent = parts - .headers - .get(http::header::USER_AGENT) - .and_then(|v| v.to_str().ok()); - let span = crate::otel::make_server_span(&crate::otel::ServerSpanInputs { - method: &parts.method, - uri: &parts.uri, - version: parts.version, - peer, - bind_addr: runtime.otel.bind_addr, - scheme: runtime.otel.scheme, - user_agent, - upstream_name: &runtime.name, - }); - crate::otel::apply_parent(&span, parent); - span - } - #[cfg(not(feature = "_otel_any"))] - { - tracing::Span::none() - } -} - /// Lifecycle stage 3: block while `runtime.pause` is true. async fn pause_gate(runtime: &UpstreamRuntime) { let mut rx = runtime.pause_receiver(); @@ -468,10 +388,7 @@ impl Terminal for LiveTerminal<'_> { // Stamp before awaiting so the marker is present even // if the forward errors. ctx.insert(ResponseSource::Upstream); - self.runtime - .forwarder - .forward(req, &self.runtime.name) - .await + self.runtime.forwarder.forward(req).await } } }) diff --git a/crates/partly-proxy-lib/src/otel/mod.rs b/crates/partly-proxy-lib/src/otel/mod.rs deleted file mode 100644 index 09151b4..0000000 --- a/crates/partly-proxy-lib/src/otel/mod.rs +++ /dev/null @@ -1,96 +0,0 @@ -//! OpenTelemetry helpers — context extraction, span construction, and -//! response/request header injection. -//! -//! Call sites in `listener.rs` and `forwarder.rs` use the functions -//! re-exported here without `cfg` gates of their own; the per-version -//! impl modules (`v0_27`, future `v0_28`, …) provide the active -//! implementation, and a stub module under `#[cfg(not(feature = -//! "_otel_any"))]` provides no-ops so the surface compiles whether or -//! not any OTEL minor is enabled. -//! -//! The crate does not initialise a tracer provider, exporter, -//! propagator, or `tracing-subscriber`. The host binary owns all of -//! that. These helpers consult `opentelemetry::global` for whatever -//! propagator the host has installed. -//! -//! # Adding a new OTEL minor -//! -//! 1. Add `opentelemetry_0_X` / `opentelemetry-http_0_X` / etc. blocks -//! to the workspace `Cargo.toml`. -//! 2. Add an `otel_0_X = ["_otel_any", ...]` feature in -//! `partly-proxy-lib/Cargo.toml`. -//! 3. Copy `v0_27.rs` to `v0_X.rs` and update import paths. -//! 4. Add `#[cfg(feature = "otel_0_X")] #[path = "v0_X.rs"] mod inner;` -//! here, and extend the `compile_error!` guard in `lib.rs`. -//! 5. Add a `tests/otel_v0_X.rs` patterned on the existing one. - -use std::net::SocketAddr; - -use http::{Method, Uri, Version}; - -/// Inputs needed to build the server span for one inbound request. -/// -/// Lives in `mod.rs` so the version-specific impl modules and the -/// no-op stub share the same shape and the call site in `listener.rs` -/// constructs it once. -#[allow(dead_code)] -pub(crate) struct ServerSpanInputs<'a> { - pub method: &'a Method, - pub uri: &'a Uri, - pub version: Version, - pub peer: SocketAddr, - pub bind_addr: SocketAddr, - pub scheme: &'static str, - pub user_agent: Option<&'a str>, - pub upstream_name: &'a str, -} - -#[cfg(feature = "otel_0_27")] -#[path = "v0_27.rs"] -mod inner; - -#[cfg(not(feature = "_otel_any"))] -#[allow(dead_code)] -mod inner { - //! No-op stub. Compiled when no `otel_0_*` feature is on. - //! - //! Most fns aren't called from the no-OTEL request path (the call - //! sites are themselves inside `#[cfg(feature = "_otel_any")]` - //! blocks). They exist so the API surface in `mod.rs` is stable and - //! the future stubs/version impls stay symmetric. - - use http::{HeaderMap, Method, StatusCode, Uri}; - use tracing::Span; - - use super::ServerSpanInputs; - - /// Opaque parent context. Empty when the feature is off. - #[derive(Debug, Default)] - pub(crate) struct ParentContext; - - pub(crate) fn extract_parent_context(_headers: &HeaderMap) -> ParentContext { - ParentContext - } - - pub(crate) fn make_server_span(_inputs: &ServerSpanInputs<'_>) -> Span { - Span::none() - } - - pub(crate) fn apply_parent(_span: &Span, _parent: ParentContext) {} - - pub(crate) fn inject_into_response_headers(_span: &Span, _headers: &mut HeaderMap) {} - - pub(crate) fn record_response_status(_span: &Span, _status: StatusCode) {} - - pub(crate) fn make_client_span(_method: &Method, _uri: &Uri, _upstream_name: &str) -> Span { - Span::none() - } - - pub(crate) fn inject_into_request_headers(_span: &Span, _headers: &mut HeaderMap) {} -} - -#[allow(unused_imports)] -pub(crate) use inner::{ - ParentContext, apply_parent, extract_parent_context, inject_into_request_headers, - inject_into_response_headers, make_client_span, make_server_span, record_response_status, -}; diff --git a/crates/partly-proxy-lib/src/otel/v0_27.rs b/crates/partly-proxy-lib/src/otel/v0_27.rs deleted file mode 100644 index 301dd74..0000000 --- a/crates/partly-proxy-lib/src/otel/v0_27.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! OpenTelemetry 0.27 implementation. Mirrors the API in `mod.rs`. -//! -//! Imports use the version-suffixed crate renames (`opentelemetry_0_27`, -//! `opentelemetry_http_0_27`, …) declared in the workspace `Cargo.toml` -//! so multiple OTEL minors can coexist. - -use http::{HeaderMap, Method, StatusCode, Uri, Version}; -use opentelemetry_0_27::{ - Context, global, - trace::{Status, TraceContextExt}, -}; -use opentelemetry_http_0_27::{HeaderExtractor, HeaderInjector}; -use opentelemetry_semantic_conventions_0_27::attribute::{ - CLIENT_ADDRESS, CLIENT_PORT, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, - NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_FULL, URL_PATH, URL_QUERY, - URL_SCHEME, USER_AGENT_ORIGINAL, -}; -use tracing::Span; -use tracing_opentelemetry_0_28::OpenTelemetrySpanExt; - -use super::ServerSpanInputs; - -const PARTLY_PROXY_UPSTREAM: &str = "partly.proxy.upstream"; - -/// Parent context extracted from inbound headers. Round-tripped through -/// `apply_parent` rather than handed to callers as an `opentelemetry` -/// type to keep the surface in `mod.rs` version-agnostic. -pub(crate) struct ParentContext(Context); - -impl Default for ParentContext { - fn default() -> Self { - Self(Context::new()) - } -} - -pub(crate) fn extract_parent_context(headers: &HeaderMap) -> ParentContext { - let cx = global::get_text_map_propagator(|prop| prop.extract(&HeaderExtractor(headers))); - ParentContext(cx) -} - -pub(crate) fn make_server_span(inputs: &ServerSpanInputs<'_>) -> Span { - let display_name = format!("{} {}", inputs.method.as_str(), inputs.upstream_name); - let span = tracing::info_span!( - "http.server.request", - otel.name = %display_name, - otel.kind = "server", - ); - - span.set_attribute(HTTP_REQUEST_METHOD, inputs.method.as_str().to_owned()); - span.set_attribute(HTTP_ROUTE, inputs.upstream_name.to_owned()); - span.set_attribute(URL_PATH, inputs.uri.path().to_owned()); - if let Some(q) = inputs.uri.query() { - span.set_attribute(URL_QUERY, q.to_owned()); - } - span.set_attribute(URL_SCHEME, inputs.scheme); - span.set_attribute(SERVER_ADDRESS, inputs.bind_addr.ip().to_string()); - span.set_attribute(SERVER_PORT, i64::from(inputs.bind_addr.port())); - span.set_attribute(CLIENT_ADDRESS, inputs.peer.ip().to_string()); - span.set_attribute(CLIENT_PORT, i64::from(inputs.peer.port())); - if let Some(ua) = inputs.user_agent { - span.set_attribute(USER_AGENT_ORIGINAL, ua.to_owned()); - } - if let Some(v) = http_version_str(inputs.version) { - span.set_attribute(NETWORK_PROTOCOL_VERSION, v); - } - span.set_attribute(PARTLY_PROXY_UPSTREAM, inputs.upstream_name.to_owned()); - - span -} - -pub(crate) fn apply_parent(span: &Span, parent: ParentContext) { - if parent.0.span().span_context().is_valid() { - span.set_parent(parent.0); - } -} - -pub(crate) fn inject_into_response_headers(span: &Span, headers: &mut HeaderMap) { - let cx = span.context(); - global::get_text_map_propagator(|prop| { - prop.inject_context(&cx, &mut HeaderInjector(headers)); - }); -} - -pub(crate) fn record_response_status(span: &Span, status: StatusCode) { - span.set_attribute(HTTP_RESPONSE_STATUS_CODE, i64::from(status.as_u16())); - if status.is_server_error() { - span.set_status(Status::error( - status.canonical_reason().unwrap_or("error").to_owned(), - )); - } -} - -pub(crate) fn make_client_span(method: &Method, uri: &Uri, upstream_name: &str) -> Span { - let display_name = method.as_str().to_owned(); - let span = tracing::info_span!( - "http.client.request", - otel.name = %display_name, - otel.kind = "client", - ); - span.set_attribute(HTTP_REQUEST_METHOD, method.as_str().to_owned()); - span.set_attribute(URL_FULL, uri.to_string()); - if let Some(authority) = uri.authority() { - span.set_attribute(SERVER_ADDRESS, authority.host().to_owned()); - if let Some(port) = authority.port_u16() { - span.set_attribute(SERVER_PORT, i64::from(port)); - } - } - span.set_attribute(PARTLY_PROXY_UPSTREAM, upstream_name.to_owned()); - span -} - -pub(crate) fn inject_into_request_headers(span: &Span, headers: &mut HeaderMap) { - let cx = span.context(); - global::get_text_map_propagator(|prop| { - prop.inject_context(&cx, &mut HeaderInjector(headers)); - }); -} - -fn http_version_str(v: Version) -> Option<&'static str> { - match v { - Version::HTTP_09 => Some("0.9"), - Version::HTTP_10 => Some("1.0"), - Version::HTTP_11 => Some("1.1"), - Version::HTTP_2 => Some("2"), - Version::HTTP_3 => Some("3"), - _ => None, - } -} diff --git a/crates/partly-proxy-lib/src/upstream.rs b/crates/partly-proxy-lib/src/upstream.rs index e38bbab..8096c84 100644 --- a/crates/partly-proxy-lib/src/upstream.rs +++ b/crates/partly-proxy-lib/src/upstream.rs @@ -15,30 +15,6 @@ use crate::{ replay::ReplaySource, stub::StubStore, }; -/// OTEL fields cached on the runtime so the request path can build server -/// spans without re-reading `ProxyConfig`. Only present when an -/// `otel_0_*` feature is on. -#[cfg(feature = "_otel_any")] -#[derive(Clone)] -pub(crate) struct OtelRuntime { - pub bind_addr: std::net::SocketAddr, - pub scheme: &'static str, - pub extract: bool, - pub filter: Option, -} - -#[cfg(feature = "_otel_any")] -impl Default for OtelRuntime { - fn default() -> Self { - Self { - bind_addr: "0.0.0.0:0".parse().expect("static addr parses"), - scheme: "http", - extract: true, - filter: None, - } - } -} - /// Per-upstream runtime state shared with every accepted connection and with /// the command processor. pub(crate) struct UpstreamRuntime { @@ -59,9 +35,6 @@ pub(crate) struct UpstreamRuntime { pub mode: Mode, /// Called when a replay miss occurs to produce the response. pub replay_miss_handler: crate::builder::ReplayMissHandler, - /// OTEL-only fields. Populated via [`UpstreamRuntime::with_otel`]. - #[cfg(feature = "_otel_any")] - pub otel: OtelRuntime, } impl UpstreamRuntime { @@ -86,19 +59,9 @@ impl UpstreamRuntime { replay, mode, replay_miss_handler, - #[cfg(feature = "_otel_any")] - otel: OtelRuntime::default(), } } - /// Attach the listener-specific OTEL configuration. Called once per - /// upstream during `spawn_listener` after the bound address is known. - #[cfg(feature = "_otel_any")] - pub(crate) fn with_otel(mut self, otel: OtelRuntime) -> Self { - self.otel = otel; - self - } - /// Borrow a fresh `pause` receiver — every accept-loop task uses one to /// await resume signals on lifecycle stage 3. pub(crate) fn pause_receiver(&self) -> watch::Receiver { diff --git a/crates/partly-proxy-lib/tests/otel_v0_27.rs b/crates/partly-proxy-lib/tests/otel_v0_27.rs deleted file mode 100644 index c448472..0000000 --- a/crates/partly-proxy-lib/tests/otel_v0_27.rs +++ /dev/null @@ -1,398 +0,0 @@ -//! OpenTelemetry contract tests against the 0.27 stack. -//! -//! Builds: only with `--features otel_0_27`. The test installs the -//! `TraceContextPropagator`, a `TracerProvider` with an -//! `InMemorySpanExporter`, and a `tracing-opentelemetry` layer that -//! turns the proxy's `tracing::Span`s into recorded OTEL spans. -//! -//! Globals are set once per process (the OTEL `global::*` registries -//! are non-reentrant), so tests share a single exporter and serialise -//! through a `Mutex`. Each test resets the exporter before running. - -#![cfg(feature = "otel_0_27")] - -use std::{net::SocketAddr, sync::OnceLock, time::Duration}; - -use http::Method; -use opentelemetry_0_27::{ - global, - trace::{SpanKind, Status, TraceContextExt, TracerProvider as _}, -}; -use opentelemetry_sdk_0_27::{ - propagation::TraceContextPropagator, testing::trace::InMemorySpanExporter, - trace::TracerProvider, -}; -use partly_proxy_echo as echo; -use partly_proxy_lib::{ProxyClusterBuilder, ProxyConfig, UpstreamTarget}; -use serde_json::Value; -use tokio::{ - sync::{Mutex, MutexGuard}, - task::JoinHandle, -}; -use tracing_subscriber::layer::SubscriberExt; - -const TRACE_ID: &str = "0af7651916cd43dd8448eb211c80319c"; -const PARENT_SPAN_ID: &str = "b7ad6b7169203331"; - -struct Harness { - exporter: InMemorySpanExporter, - serial: Mutex<()>, -} - -static HARNESS: OnceLock = OnceLock::new(); - -fn init() -> &'static Harness { - HARNESS.get_or_init(|| { - global::set_text_map_propagator(TraceContextPropagator::new()); - - let exporter = InMemorySpanExporter::default(); - let provider = TracerProvider::builder() - .with_simple_exporter(exporter.clone()) - .build(); - let tracer = provider.tracer("partly-proxy-lib-test"); - let _ = global::set_tracer_provider(provider); - - let subscriber = tracing_subscriber::registry() - .with(tracing_opentelemetry_0_28::layer().with_tracer(tracer)); - // Ignore the error — another test binary or earlier installation - // may have already set the global default. The propagator and - // provider above are what we actually rely on. - let _ = tracing::subscriber::set_global_default(subscriber); - - Harness { - exporter, - serial: Mutex::new(()), - } - }) -} - -/// Acquire the serial lock and reset the in-memory exporter so each -/// scenario sees only its own spans. Async because the lock is a -/// `tokio::sync::Mutex` — required so we can hold it across the -/// request awaits without tripping `clippy::await_holding_lock`. -async fn scenario() -> (&'static Harness, MutexGuard<'static, ()>) { - let h = init(); - let guard = h.serial.lock().await; - h.exporter.reset(); - (h, guard) -} - -async fn spawn_echo() -> (SocketAddr, JoinHandle<()>) { - let (addr, listener) = echo::bind("127.0.0.1:0".parse().unwrap()).await.unwrap(); - let task = tokio::spawn(async move { - let _ = echo::serve(listener).await; - }); - (addr, task) -} - -fn http_client() -> reqwest::Client { - reqwest::Client::builder() - .no_proxy() - .timeout(Duration::from_secs(5)) - .build() - .unwrap() -} - -fn base_cfg(echo_addr: SocketAddr) -> ProxyConfig { - ProxyConfig::http( - "127.0.0.1:0".parse().unwrap(), - UpstreamTarget::new(format!("http://{echo_addr}")) - .with_connect_timeout(Duration::from_secs(2)) - .with_request_timeout(Duration::from_secs(5)), - ) -} - -fn server_spans( - spans: &[opentelemetry_sdk_0_27::export::trace::SpanData], -) -> Vec<&opentelemetry_sdk_0_27::export::trace::SpanData> { - spans - .iter() - .filter(|s| s.span_kind == SpanKind::Server) - .collect() -} - -fn attr<'a>( - span: &'a opentelemetry_sdk_0_27::export::trace::SpanData, - key: &str, -) -> Option<&'a opentelemetry_0_27::Value> { - span.attributes - .iter() - .find(|kv| kv.key.as_str() == key) - .map(|kv| &kv.value) -} - -#[tokio::test] -async fn inbound_traceparent_is_extracted() { - let (h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", base_cfg(echo_addr)) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - let traceparent = format!("00-{TRACE_ID}-{PARENT_SPAN_ID}-01"); - let resp = http_client() - .get(format!("http://{proxy}/orders/42")) - .header("traceparent", &traceparent) - .send() - .await - .unwrap(); - assert_eq!(resp.status(), 200); - // Drain the body so the server-side `handle_request` future completes - // and the OTEL span gets exported before we read the exporter. - let _ = resp.bytes().await.unwrap(); - cluster.shutdown().await.unwrap(); - - let spans = h.exporter.get_finished_spans().unwrap(); - let server = server_spans(&spans); - let server = server.first().expect("server span recorded"); - - assert_eq!(server.span_context.trace_id().to_string(), TRACE_ID); - assert_eq!(server.parent_span_id.to_string(), PARENT_SPAN_ID); - assert_eq!( - attr(server, "http.route").and_then(opt_str), - Some("api".to_string()) - ); - assert_eq!( - attr(server, "http.request.method").and_then(opt_str), - Some("GET".to_string()) - ); - assert_eq!( - attr(server, "url.path").and_then(opt_str), - Some("/orders/42".to_string()) - ); -} - -#[tokio::test] -async fn response_carries_traceparent_matching_trace_id() { - let (_h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", base_cfg(echo_addr)) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - let traceparent = format!("00-{TRACE_ID}-{PARENT_SPAN_ID}-01"); - let resp = http_client() - .get(format!("http://{proxy}/")) - .header("traceparent", &traceparent) - .send() - .await - .unwrap(); - - let resp_traceparent = resp - .headers() - .get("traceparent") - .and_then(|v| v.to_str().ok()) - .map(str::to_owned) - .expect("response carries traceparent"); - let _ = resp.bytes().await.unwrap(); - cluster.shutdown().await.unwrap(); - - let segments: Vec<&str> = resp_traceparent.split('-').collect(); - assert_eq!(segments.len(), 4, "well-formed traceparent"); - assert_eq!(segments[0], "00"); - assert_eq!(segments[1], TRACE_ID); - // span_id is the proxy's server span, not the caller's. - assert_ne!(segments[2], PARENT_SPAN_ID); -} - -#[tokio::test] -async fn outbound_injection_off_by_default() { - let (_h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", base_cfg(echo_addr)) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - // No traceparent on the inbound side: if the proxy minted one, the - // upstream would still see it. We expect none, since propagation is - // off by default. - let body: Value = http_client() - .get(format!("http://{proxy}/")) - .send() - .await - .unwrap() - .json() - .await - .unwrap(); - cluster.shutdown().await.unwrap(); - - let echoed = echoed_header(&body, "traceparent"); - assert!( - echoed.is_none(), - "expected no traceparent forwarded upstream, got {echoed:?}" - ); -} - -#[tokio::test] -async fn outbound_injection_when_enabled() { - let (_h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cfg = base_cfg(echo_addr).with_otel_propagation_to_upstream(); - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", cfg) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - let traceparent = format!("00-{TRACE_ID}-{PARENT_SPAN_ID}-01"); - let body: Value = http_client() - .get(format!("http://{proxy}/")) - .header("traceparent", &traceparent) - .send() - .await - .unwrap() - .json() - .await - .unwrap(); - cluster.shutdown().await.unwrap(); - - let echoed = echoed_header(&body, "traceparent").expect("upstream sees traceparent"); - let segments: Vec<&str> = echoed.split('-').collect(); - assert_eq!(segments.len(), 4); - assert_eq!(segments[1], TRACE_ID, "trace_id preserved across the proxy"); -} - -#[tokio::test] -async fn extraction_can_be_disabled() { - let (h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cfg = base_cfg(echo_addr).without_otel_extraction(); - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", cfg) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - let traceparent = format!("00-{TRACE_ID}-{PARENT_SPAN_ID}-01"); - let resp = http_client() - .get(format!("http://{proxy}/")) - .header("traceparent", &traceparent) - .send() - .await - .unwrap(); - assert_eq!(resp.status(), 200); - let has_traceparent = resp.headers().get("traceparent").is_some(); - let _ = resp.bytes().await.unwrap(); - cluster.shutdown().await.unwrap(); - - assert!( - !has_traceparent, - "no traceparent injected when extraction is disabled" - ); - - let spans = h.exporter.get_finished_spans().unwrap(); - assert!( - server_spans(&spans).is_empty(), - "no server span recorded when extraction is disabled" - ); -} - -#[tokio::test] -async fn filter_skips_selected_requests() { - let (h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cfg = base_cfg(echo_addr) - .with_otel_filter(|_m: &Method, uri: &http::Uri| uri.path() != "/healthz"); - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", cfg) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - let traceparent = format!("00-{TRACE_ID}-{PARENT_SPAN_ID}-01"); - let resp = http_client() - .get(format!("http://{proxy}/healthz")) - .header("traceparent", &traceparent) - .send() - .await - .unwrap(); - assert_eq!(resp.status(), 200); - let has_traceparent = resp.headers().get("traceparent").is_some(); - let _ = resp.bytes().await.unwrap(); - cluster.shutdown().await.unwrap(); - - assert!(!has_traceparent); - let spans = h.exporter.get_finished_spans().unwrap(); - assert!(server_spans(&spans).is_empty()); -} - -#[tokio::test] -async fn server_error_maps_to_error_status() { - let (h, _g) = scenario().await; - let (echo_addr, _t) = spawn_echo().await; - let cluster = ProxyClusterBuilder::new() - .add_upstream("api", base_cfg(echo_addr)) - .run() - .await - .unwrap(); - let proxy = cluster.addr("api").unwrap(); - - let resp = http_client() - .get(format!("http://{proxy}/_status/500")) - .send() - .await - .unwrap(); - assert_eq!(resp.status(), 500); - let _ = resp.bytes().await.unwrap(); - cluster.shutdown().await.unwrap(); - - let spans = h.exporter.get_finished_spans().unwrap(); - let server = server_spans(&spans); - let server = server - .first() - .expect("server span recorded for 5xx response"); - assert!( - matches!(server.status, Status::Error { .. }), - "expected Status::Error, got {:?}", - server.status - ); - assert_eq!( - attr(server, "http.response.status_code").and_then(opt_i64), - Some(500) - ); -} - -fn echoed_header(body: &Value, name: &str) -> Option { - body.get("headers")?.as_array()?.iter().find_map(|kv| { - let arr = kv.as_array()?; - let key = arr.first()?.as_str()?; - if key.eq_ignore_ascii_case(name) { - arr.get(1)?.as_str().map(str::to_owned) - } else { - None - } - }) -} - -fn opt_str(v: &opentelemetry_0_27::Value) -> Option { - match v { - opentelemetry_0_27::Value::String(s) => Some(s.to_string()), - _ => None, - } -} - -fn opt_i64(v: &opentelemetry_0_27::Value) -> Option { - match v { - opentelemetry_0_27::Value::I64(i) => Some(*i), - _ => None, - } -} - -// Silence unused-warning from the unused TraceContextExt import; it's -// needed only for trait disambiguation in some configurations. -#[allow(dead_code)] -fn _trace_context_ext_in_scope(cx: &opentelemetry_0_27::Context) -> bool { - cx.span().span_context().is_valid() -} diff --git a/ts-client/package-lock.json b/ts-client/package-lock.json index 449e584..44c1b48 100644 --- a/ts-client/package-lock.json +++ b/ts-client/package-lock.json @@ -1,12 +1,12 @@ { "name": "@partly/proxy-client", - "version": "0.2.0", + "version": "0.3.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@partly/proxy-client", - "version": "0.2.0", + "version": "0.3.0", "license": "MIT OR Apache-2.0", "devDependencies": { "@types/node": "^22", diff --git a/ts-client/package.json b/ts-client/package.json index ef02d51..e761de1 100644 --- a/ts-client/package.json +++ b/ts-client/package.json @@ -1,6 +1,6 @@ { "name": "@partly/proxy-client", - "version": "0.2.0", + "version": "0.3.0", "description": "TypeScript client for the partly-proxy-lib JSON-Lines TCP control plane", "license": "MIT OR Apache-2.0", "type": "module",