Merged
10 changes: 5 additions & 5 deletions Cargo.lock

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -3,7 +3,7 @@ resolver = "2"
members = ["crates/*"]

[workspace.package]
version = "0.1.0"
version = "0.2.0"
edition = "2024"
license = "MIT OR Apache-2.0"
repository = "https://github.com/partly/api-proxy"
@@ -16,9 +16,9 @@ rust-version = "1.85"
# - `cargo publish` / `cargo release` use `version` from the registry.
# Keep these versions in lock-step with `workspace.package.version` above;
# `release.toml` (shared-version = true) enforces that on release.
partly-proxy-types = { version = "0.1.0", path = "crates/partly-proxy-types" }
partly-proxy-storage-jsonl = { version = "0.1.0", path = "crates/partly-proxy-storage-jsonl" }
partly-proxy-storage-sqlite = { version = "0.1.0", path = "crates/partly-proxy-storage-sqlite" }
partly-proxy-types = { version = "0.2.0", path = "crates/partly-proxy-types" }
partly-proxy-storage-jsonl = { version = "0.2.0", path = "crates/partly-proxy-storage-jsonl" }
partly-proxy-storage-sqlite = { version = "0.2.0", path = "crates/partly-proxy-storage-sqlite" }
# `partly-proxy-echo` is `publish = false`; only consumed as a dev-dep
# inside the workspace. Path-only is fine since dev-deps without a
# version are stripped from the published manifest.
2 changes: 1 addition & 1 deletion README.md
@@ -122,7 +122,7 @@ to them.
| Plain HTTP listener + forwarder (hyper 1.x) | HTTP/1.1 + HTTP/2 auto-negotiation |
| Inbound + outbound TLS (rustls) | Custom CAs and `accept_invalid_certs` for testing |
| Recorder + pluggable snapshot storage | NDJSON / SQLite |
| Replay (`MethodPathAndBodyHash` + `Custom`) | O(1) indexed lookup; goes through redaction hooks |
| Replay (`MethodUriAndBodyHash` + `Custom`) | O(1) indexed lookup; goes through redaction hooks |
| Middleware chain with `Next<'_>` | Body rewrites, short-circuit, error recovery |
| Stubs + in-process command plane | Fire-count, artificial delay, pause/resume |
| TCP JSON-Lines control plane | Same command set, cross-language harnesses |
14 changes: 7 additions & 7 deletions SPECIFICATION.md
@@ -269,7 +269,7 @@ pub trait ProxyMiddleware: Send + Sync + 'static {
/// 1. Just before an exchange is persisted (recording).
/// 2. Just before a live request is looked up in a replay source.
/// Because the same transform runs on both sides, the body hash matches
/// and `MethodPathAndBodyHash` lookups still succeed.
/// and `MethodUriAndBodyHash` lookups still succeed.
/// Default: no-op.
fn redact_request_for_snapshot(&self, _req: &mut ProxyRequest) {}

@@ -320,7 +320,7 @@ Both calls are infallible by design — they are pure rewrites, not policy decis

#### Invariants the caller must preserve

For `MethodPathAndBodyHash` to keep matching, redaction must be **deterministic**: the same input bytes must produce the same output bytes every time, regardless of clock, randomness, or process. Two practical rules follow:
For `MethodUriAndBodyHash` to keep matching, redaction must be **deterministic**: the same input bytes must produce the same output bytes every time, regardless of clock, randomness, or process. Two practical rules follow:

- Don't introduce non-determinism (random tokens, timestamps, generated IDs) — replace them with a fixed placeholder.
- Apply the redaction transform consistently across versions; changing the transform is equivalent to invalidating all prior snapshots indexed by body hash.
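
A minimal sketch of the first rule, assuming a JSON body that carries a volatile `token` field. The `redact_token` helper is hypothetical and not part of this crate; a real implementation would live inside `redact_request_for_snapshot` and would likely use a proper JSON parser. It swaps the token value for a fixed placeholder, so two requests that differ only in the token produce identical bytes and therefore the same body hash.

```rust
/// Hypothetical helper (not part of the crate): replaces the value of every
/// `"token":"..."` pair with a fixed placeholder.
///
/// Assumption: values contain no escaped quotes. This is a plain substring
/// scan, not a JSON parser.
fn redact_token(body: &str) -> String {
    const KEY: &str = "\"token\":\"";
    const PLACEHOLDER: &str = "<redacted>";

    let mut out = String::with_capacity(body.len());
    let mut rest = body;
    while let Some(start) = rest.find(KEY) {
        let value_start = start + KEY.len();
        // Copy everything up to and including the opening quote of the value.
        out.push_str(&rest[..value_start]);
        let after = &rest[value_start..];
        match after.find('"') {
            Some(end) => {
                // Fixed placeholder: no clock, randomness, or generated IDs.
                out.push_str(PLACEHOLDER);
                rest = &after[end..];
            }
            None => {
                // Unterminated value: leave the remainder untouched.
                rest = after;
                break;
            }
        }
    }
    out.push_str(rest);
    out
}
```

Because the output depends only on the input bytes, running the same transform at record time and at lookup time keeps the two hashes aligned.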
@@ -386,26 +386,26 @@ A stub matches a request when **all** of the following hold (any unset field is
A `ReplaySource` is an immutable snapshot of recorded exchanges with a chosen match strategy:

```rust
let replay = ReplaySource::new(exchanges, MatchStrategy::MethodPathAndBodyHash);
let replay = ReplaySource::new(exchanges, MatchStrategy::MethodUriAndBodyHash);
// or
let replay = ReplaySource::from_jsonl(path, MatchStrategy::MethodPathAndBodyHash)?;
let replay = ReplaySource::from_jsonl(path, MatchStrategy::MethodUriAndBodyHash)?;
```

### 8.1 Match strategies

| Strategy | Key | Notes |
|----------|-----|-------|
| `MethodPathAndBodyHash` (default) | (method, path, SHA-256 hex of body) | Distinguishes identical endpoints called with different payloads |
| `MethodUriAndBodyHash` (default) | (method, origin-form URI [path + query string], SHA-256 hex of body) | Distinguishes identical endpoints called with different query parameters or payloads. Query-string bytes are compared verbatim; reordered parameters are treated as a different request. |
| `Custom(closure)` | arbitrary | `Fn(&RecordedRequest, &Request<Bytes>) -> bool` — falls back to linear scan |

`MethodPathAndBodyHash` builds an index at construction time for O(1) lookup. `Custom` is the only other supported strategy; coarser keys (method-only, method+path, method+URI) are intentionally not provided — callers who want those semantics express them as a `Custom` closure.
`MethodUriAndBodyHash` builds an index at construction time for O(1) lookup. `Custom` is the only other supported strategy; coarser keys (method-only, method+path) and normalised variants (query-parameter canonicalisation, method+URI ignoring query) are intentionally not provided — callers who want those semantics express them as a `Custom` closure.
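
As a hedged illustration of the query-parameter canonicalisation that a `Custom` closure could apply, the sketch below sorts the raw `key=value` pairs of an origin-form URI. The `canonical_path_and_query` helper is hypothetical and works on plain strings; wiring it into a closure that compares the method and the canonical forms of the recorded and live URIs is left out because the crate's request types are not reproduced here. It does no percent-decoding, so `a=%41` and `a=A` remain distinct.

```rust
/// Hypothetical helper (not part of the crate): returns the path followed by
/// the query parameters sorted bytewise, so that parameter order no longer
/// affects equality.
fn canonical_path_and_query(uri: &str) -> String {
    let (path, query) = match uri.split_once('?') {
        Some((path, query)) => (path, query),
        // No query string: the path is already canonical.
        None => return uri.to_owned(),
    };

    // Keep each `key=value` pair verbatim; only the order changes.
    let mut params: Vec<&str> = query.split('&').filter(|p| !p.is_empty()).collect();
    params.sort_unstable();

    if params.is_empty() {
        // A bare trailing `?` is treated the same as no query at all.
        return path.to_owned();
    }
    format!("{}?{}", path, params.join("&"))
}
```

Under this sketch, `/vehicle?b=2&a=1` and `/vehicle?a=1&b=2` compare equal, which the default strategy deliberately does not do. Keep in mind that `Custom` is a linear scan, so the helper runs once per recorded exchange on every lookup.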

### 8.1.1 Scale target

Replay must remain usable with snapshot files containing **10,000 to 100,000 exchanges** — these are realistic sizes for a recorded end-to-end suite, not a worst case to be discouraged. Concretely:

- `ReplaySource::from_jsonl(...)` parses a 100k-line file in a single pass; it does not hold the whole file in a `String` and must stream line-by-line (e.g. `BufReader::lines`) to keep peak memory bounded by the largest single exchange, not the file size.
- Index construction for `MethodPathAndBodyHash` is O(n) in the number of exchanges; lookup remains O(1) per request regardless of snapshot size. Hash-map capacity should be preallocated from the exchange count to avoid repeated rehashing during load.
- Index construction for `MethodUriAndBodyHash` is O(n) in the number of exchanges; lookup remains O(1) per request regardless of snapshot size. Hash-map capacity should be preallocated from the exchange count to avoid repeated rehashing during load.
- `Custom` matchers fall back to a linear scan, which is O(n) per request. With a 100k-exchange snapshot this is the slow path; use it sparingly or pre-filter via the upstream/path before invoking the custom predicate.
- Memory budget at 100k exchanges with typical JSON payloads (~1–4 KiB body each) is on the order of hundreds of MiB. The proxy keeps decoded `Bytes` bodies in the source verbatim — there is no per-exchange duplication into the recorder unless `Replay + recording` is enabled.
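
The loading and indexing points above can be sketched with the standard library alone. This is an illustration under simplifying assumptions, not the crate's loader: the `Record` type and the whitespace-separated `METHOD URI BODY_SHA256` line format are hypothetical stand-ins for the real JSON-Lines exchanges. What it does show is the streaming read via `BufRead::lines`, the preallocated map, and the first-entry-wins rule on key collisions.

```rust
use std::collections::HashMap;
use std::io::{self, BufRead};

/// Hypothetical stand-in for a recorded exchange.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    method: String,
    uri: String,
    body_sha256: String,
}

/// Streams the input one line at a time; the whole file is never held in a
/// single `String`.
fn load_records<R: BufRead>(reader: R) -> io::Result<Vec<Record>> {
    let mut records = Vec::new();
    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue; // tolerate blank lines
        }
        let mut fields = line.split_whitespace();
        match (fields.next(), fields.next(), fields.next()) {
            (Some(method), Some(uri), Some(hash)) => records.push(Record {
                method: method.to_owned(),
                uri: uri.to_owned(),
                body_sha256: hash.to_owned(),
            }),
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "expected: METHOD URI BODY_SHA256",
                ))
            }
        }
    }
    Ok(records)
}

/// O(n) index construction. Capacity is preallocated from the record count,
/// and the first record seen for a key wins.
fn build_index(records: &[Record]) -> HashMap<(String, String, String), usize> {
    let mut index = HashMap::with_capacity(records.len());
    for (i, r) in records.iter().enumerate() {
        let key = (r.method.clone(), r.uri.clone(), r.body_sha256.clone());
        index.entry(key).or_insert(i);
    }
    index
}
```

Lookups against the resulting map are a single hash probe, which is what keeps the default strategy usable at the 100k-exchange scale.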

129 changes: 99 additions & 30 deletions crates/partly-proxy-lib/src/replay.rs
@@ -3,8 +3,9 @@
//! A `ReplaySource` is an immutable bundle of recorded exchanges plus a
//! match strategy. The two supported strategies (per §8.1) are:
//!
//! - `MethodPathAndBodyHash` — O(1) hash-indexed lookup, built once at
//! construction. The default.
//! - `MethodUriAndBodyHash` — O(1) hash-indexed lookup, built once at
//! construction. Keys on method, origin-form URI (path + query string),
//! and the body SHA-256. The default.
//! - `Custom(closure)` — linear scan with a user predicate. Use sparingly
//! on large snapshots (§8.1.1).
//!
@@ -42,9 +43,17 @@ pub type CustomMatcher = Arc<dyn Fn(&RecordedRequest, &ProxyRequest) -> bool + S
/// Match strategy for a [`ReplaySource`].
#[derive(Clone, Default)]
pub enum MatchStrategy {
/// `(method, uri.path(), sha256_hex(body))`. Hash-indexed; O(1) lookup.
/// `(method, uri.path_and_query(), sha256_hex(body))`. Hash-indexed;
/// O(1) lookup.
///
/// Includes the query string so APIs that pass their data in query
/// parameters with an empty body (e.g. `GET /vehicle?plate=ABC123`)
/// match per-request rather than collapsing to the first snapshot at
/// that path. Query-string bytes are compared verbatim — parameter
/// reordering is treated as a different request; callers needing
/// canonicalisation should use [`MatchStrategy::Custom`].
#[default]
MethodPathAndBodyHash,
MethodUriAndBodyHash,
/// User-supplied predicate; falls back to a linear scan over every
/// exchange. The closure is invoked with the on-disk
/// [`RecordedRequest`] and the live (already-redacted) [`ProxyRequest`].
@@ -54,13 +63,13 @@ pub enum MatchStrategy {
impl std::fmt::Debug for MatchStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MethodPathAndBodyHash => f.write_str("MethodPathAndBodyHash"),
Self::MethodUriAndBodyHash => f.write_str("MethodUriAndBodyHash"),
Self::Custom(_) => f.write_str("Custom(<closure>)"),
}
}
}

/// Key used by `MethodPathAndBodyHash`: (method, path, body sha-256 hex).
/// Key used by `MethodUriAndBodyHash`: (method, path+query, body sha-256 hex).
type IndexKey = (String, String, String);

/// Cheap-to-clone replay source. Behind an `Arc`, so several listeners can
@@ -73,7 +82,7 @@ pub struct ReplaySource {
struct ReplaySourceInner {
strategy: MatchStrategy,
exchanges: Vec<RecordedExchange>,
/// Populated only for `MethodPathAndBodyHash`. Maps the lookup key to an
/// Populated only for `MethodUriAndBodyHash`. Maps the lookup key to an
/// index into `exchanges`. The *first* exchange written for a given key
/// wins on collision — that way replay is deterministic across reloads.
index: HashMap<IndexKey, usize>,
@@ -136,7 +145,7 @@ impl ReplaySource {
/// over any storage backend. Peak memory during construction is
/// bounded by the largest single exchange — the stream is consumed
/// one item at a time, then the assembled `Vec` feeds the existing
/// `build_index` for O(1) `MethodPathAndBodyHash` lookups.
/// `build_index` for O(1) `MethodUriAndBodyHash` lookups.
pub async fn from_storage(
storage: &dyn SnapshotStorage,
strategy: MatchStrategy,
@@ -171,10 +180,10 @@ impl ReplaySource {
let mut redacted = req.clone();
middleware::redact_request(chain, &mut redacted);
let matched = match &self.inner.strategy {
MatchStrategy::MethodPathAndBodyHash => {
MatchStrategy::MethodUriAndBodyHash => {
let key = (
redacted.method.as_str().to_owned(),
redacted.uri.path().to_owned(),
path_and_query_of_uri(&redacted.uri),
sha256_hex(&redacted.body),
);
self.inner
@@ -206,40 +215,48 @@ fn build_index(
exchanges: &[RecordedExchange],
strategy: &MatchStrategy,
) -> HashMap<IndexKey, usize> {
if !matches!(strategy, MatchStrategy::MethodPathAndBodyHash) {
if !matches!(strategy, MatchStrategy::MethodUriAndBodyHash) {
return HashMap::new();
}
let mut index = HashMap::with_capacity(exchanges.len());
for (i, e) in exchanges.iter().enumerate() {
if !matches!(e.outcome, ExchangeOutcome::Response(_)) {
continue;
}
let path = path_of(&e.request.uri);
let path_and_query = path_and_query_of_str(&e.request.uri);
let key = (
e.request.method.clone(),
path,
path_and_query,
e.request.body_sha256.clone(),
);
index.entry(key).or_insert(i);
}
index
}

/// Extract the path component of a recorded URI string. Tolerant of both
/// origin-form (`/orders`) and absolute-form (`http://host/orders`).
fn path_of(uri: &str) -> String {
/// Extract path+query from a live `http::Uri`. Falls back to the path
/// alone when the URI has no `PathAndQuery` (e.g. asterisk-form `*`).
fn path_and_query_of_uri(uri: &http::Uri) -> String {
uri.path_and_query()
.map_or_else(|| uri.path().to_owned(), |pq| pq.as_str().to_owned())
}

/// Extract path+query from a recorded URI string. Tolerant of both
/// origin-form (`/orders?n=1`) and absolute-form
/// (`http://host/orders?n=1`).
fn path_and_query_of_str(uri: &str) -> String {
if let Ok(u) = uri.parse::<http::Uri>() {
return u.path().to_owned();
return path_and_query_of_uri(&u);
}
// Strip query, then strip scheme+authority if present.
let no_query = uri.split('?').next().unwrap_or(uri);
if let Some(idx) = no_query.find("://") {
if let Some(slash) = no_query[idx + 3..].find('/') {
return no_query[idx + 3 + slash..].to_owned();
// Tolerant fallback for non-spec-compliant recorded URIs: drop the
// scheme+authority prefix if present, keep path and query verbatim.
if let Some(idx) = uri.find("://") {
if let Some(slash) = uri[idx + 3..].find('/') {
return uri[idx + 3 + slash..].to_owned();
}
return "/".to_owned();
}
no_query.to_owned()
uri.to_owned()
}

fn build_header_map(pairs: &[(String, String)]) -> http::HeaderMap {
@@ -301,7 +318,7 @@ mod tests {
make_exchange(Method::GET, "/health", b"", 200),
make_exchange(Method::POST, "/orders", b"{\"n\":1}", 201),
],
MatchStrategy::MethodPathAndBodyHash,
MatchStrategy::MethodUriAndBodyHash,
);
let resp = src.lookup(&live(Method::GET, "/health", b""), &[]).unwrap();
assert_eq!(resp.status, StatusCode::OK);
@@ -319,7 +336,7 @@ mod tests {
make_exchange(Method::POST, "/orders", b"{\"n\":1}", 201),
make_exchange(Method::POST, "/orders", b"{\"n\":2}", 202),
],
MatchStrategy::MethodPathAndBodyHash,
MatchStrategy::MethodUriAndBodyHash,
);
let r1 = src
.lookup(&live(Method::POST, "/orders", b"{\"n\":1}"), &[])
@@ -335,7 +352,7 @@ mod tests {
fn hash_strategy_misses_when_method_or_path_or_body_differs() {
let src = ReplaySource::new(
vec![make_exchange(Method::GET, "/health", b"", 200)],
MatchStrategy::MethodPathAndBodyHash,
MatchStrategy::MethodUriAndBodyHash,
);
assert!(
src.lookup(&live(Method::POST, "/health", b""), &[])
@@ -348,6 +365,58 @@ mod tests {
);
}

#[test]
fn hash_strategy_distinguishes_by_query_string() {
// Regression: query-string-driven APIs (data in the query, empty
// body) used to collapse to the first snapshot at a given path
// because the lookup key dropped the query. The default strategy
// now includes path+query, so each distinct query string is its
// own entry.
let src = ReplaySource::new(
vec![
make_exchange(Method::GET, "/vehicle?plate=ABC123", b"", 200),
make_exchange(Method::GET, "/vehicle?plate=XYZ999", b"", 201),
],
MatchStrategy::MethodUriAndBodyHash,
);
let abc = src
.lookup(&live(Method::GET, "/vehicle?plate=ABC123", b""), &[])
.unwrap();
assert_eq!(abc.status, StatusCode::OK);
let xyz = src
.lookup(&live(Method::GET, "/vehicle?plate=XYZ999", b""), &[])
.unwrap();
assert_eq!(xyz.status, StatusCode::CREATED);
// Same path with no matching query string must miss, not fall
// back to either recorded entry.
assert!(
src.lookup(&live(Method::GET, "/vehicle?plate=OTHER", b""), &[])
.is_none()
);
assert!(
src.lookup(&live(Method::GET, "/vehicle", b""), &[])
.is_none()
);
}

#[test]
fn hash_strategy_tolerates_absolute_form_recorded_uri() {
// Live requests arrive in origin-form; recorder may store an
// absolute-form URI (`http://host/path?query`). The index must
// strip scheme+authority but keep the query.
let recorded = make_exchange(
Method::GET,
"http://api.example.com/vehicle?plate=ABC123",
b"",
200,
);
let src = ReplaySource::new(vec![recorded], MatchStrategy::MethodUriAndBodyHash);
let hit = src
.lookup(&live(Method::GET, "/vehicle?plate=ABC123", b""), &[])
.unwrap();
assert_eq!(hit.status, StatusCode::OK);
}

#[test]
fn replay_skips_error_outcomes() {
let req = RecordedRequest::from_parts(
@@ -364,7 +433,7 @@ mod tests {
},
Duration::from_millis(1),
);
let src = ReplaySource::new(vec![ex], MatchStrategy::MethodPathAndBodyHash);
let src = ReplaySource::new(vec![ex], MatchStrategy::MethodUriAndBodyHash);
assert!(src.lookup(&live(Method::GET, "/oops", b""), &[]).is_none());
}

@@ -426,7 +495,7 @@ mod tests {
.unwrap();
}

let src = ReplaySource::from_jsonl(&path, MatchStrategy::MethodPathAndBodyHash).unwrap();
let src = ReplaySource::from_jsonl(&path, MatchStrategy::MethodUriAndBodyHash).unwrap();
assert_eq!(src.len(), 3);
let resp = src.lookup(&live(Method::GET, "/n/1", b""), &[]).unwrap();
assert_eq!(resp.body, Bytes::from_static(b"body-1"));
@@ -466,7 +535,7 @@ mod tests {
let storage = MockStorage {
exchanges: exchanges.clone(),
};
let src = ReplaySource::from_storage(&storage, MatchStrategy::MethodPathAndBodyHash)
let src = ReplaySource::from_storage(&storage, MatchStrategy::MethodUriAndBodyHash)
.await
.unwrap();
assert_eq!(src.len(), 2);
@@ -496,7 +565,7 @@ mod tests {
))]))
}
}
let err = ReplaySource::from_storage(&BadStorage, MatchStrategy::MethodPathAndBodyHash)
let err = ReplaySource::from_storage(&BadStorage, MatchStrategy::MethodUriAndBodyHash)
.await
.unwrap_err();
assert!(matches!(err, ProxyError::Recording(_)));
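
The string fallback inside `path_and_query_of_str` can be tried in isolation. The sketch below mirrors only that fallback branch, the one taken when `http::Uri` parsing fails, using the standard library so it runs without the `http` crate. The function name `strip_scheme_and_authority` is hypothetical.

```rust
/// Hypothetical standalone copy of the fallback branch: drops a
/// `scheme://authority` prefix if present and keeps path and query verbatim.
fn strip_scheme_and_authority(uri: &str) -> String {
    if let Some(idx) = uri.find("://") {
        let after_scheme = &uri[idx + 3..];
        return match after_scheme.find('/') {
            // Everything from the first slash after the authority onward.
            Some(slash) => after_scheme[slash..].to_owned(),
            // Authority with no path: treat as the root path.
            None => "/".to_owned(),
        };
    }
    // Already origin-form (or something else entirely): keep it as-is.
    uri.to_owned()
}
```

One caveat of this sketch: a `://` that appears inside a query string is also treated as a scheme separator, so inputs that embed a URL in a parameter are handled correctly only when they go through the parsed path rather than this fallback.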