diff --git a/Cargo.lock b/Cargo.lock index 66390fd209..6d71615b57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3448,12 +3448,16 @@ name = "iceberg-catalog-rest" version = "0.9.0" dependencies = [ "async-trait", + "bytes", "chrono", "http 1.4.0", "iceberg", "iceberg_test_utils", "itertools 0.13.0", "mockito", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", "reqwest", "serde", "serde_derive", @@ -5478,6 +5482,60 @@ dependencies = [ "tokio", ] +[[package]] +name = "reqsign-aws-v4" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43" +dependencies = [ + "anyhow", + "bytes", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "quick-xml 0.39.2", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", + "serde_urlencoded", + "sha1", +] + +[[package]] +name = "reqsign-core" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b10302cf0a7d7e7352ba211fc92c3c5bebf1286153e49cc5aa87348078a8e102" +dependencies = [ + "anyhow", + "base64", + "bytes", + "form_urlencoded", + "futures", + "hex", + "hmac", + "http 1.4.0", + "jiff", + "log", + "percent-encoding", + "sha1", + "sha2", + "windows-sys 0.61.2", +] + +[[package]] +name = "reqsign-file-read-tokio" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5" +dependencies = [ + "anyhow", + "reqsign-core", + "tokio", +] + [[package]] name = "reqwest" version = "0.12.28" diff --git a/Cargo.toml b/Cargo.toml index 2f5a515ef0..0040061b89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,9 @@ pilota = "0.11.10" pretty_assertions = "1.4" rand = "0.9.3" regex = "1.11.3" +reqsign-aws-v4 = "3.0.0" +reqsign-core = "3.0.0" +reqsign-file-read-tokio = "3.0.0" reqwest = { version = "0.12.12", default-features = false, features = ["json"] } roaring = { version = "0.11" } rstest = "0.26" diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 40dd70a952..23d045a4b0 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -30,10 +30,14 @@ repository = { workspace = true } [dependencies] async-trait = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true } http = { workspace = true } iceberg = { workspace = true } itertools = { workspace = true } +reqsign-aws-v4 = { workspace = true } +reqsign-core = { workspace = true } +reqsign-file-read-tokio = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7d5df24d52..80a6bd100d 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -40,6 +40,7 @@ use typed_builder::TypedBuilder; use crate::client::{ HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, }; +use crate::signing::HttpRequestSigner; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest, CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult, @@ -52,6 +53,12 @@ pub const REST_CATALOG_PROP_URI: &str = "uri"; pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; /// Disable header redaction in error logs (defaults to false for security) pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction"; +/// Enable AWS SigV4 signing for REST catalog requests +pub const REST_CATALOG_PROP_SIGV4_ENABLED: &str = "rest.sigv4-enabled"; +/// The AWS service name to use for SigV4 signing (e.g. "s3", "execute-api") +pub const REST_CATALOG_PROP_SIGNING_NAME: &str = "rest.signing-name"; +/// The AWS region to use for SigV4 signing (e.g. "us-east-1") +pub const REST_CATALOG_PROP_SIGNING_REGION: &str = "rest.signing-region"; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -62,6 +69,7 @@ const PATH_V1: &str = "v1"; pub struct RestCatalogBuilder { config: RestCatalogConfig, storage_factory: Option>, + signer: Option>, } impl Default for RestCatalogBuilder { @@ -75,10 +83,22 @@ impl Default for RestCatalogBuilder { client: None, }, storage_factory: None, + signer: None, } } } +impl RestCatalogBuilder { + /// Set a custom request signer for the REST catalog. + /// + /// This overrides the signer that would otherwise be built from + /// config properties (e.g. `rest.sigv4-enabled`). + pub fn with_signer(mut self, signer: Arc) -> Self { + self.signer = Some(signer); + self + } +} + impl CatalogBuilder for RestCatalogBuilder { type C = RestCatalog; @@ -123,7 +143,11 @@ impl CatalogBuilder for RestCatalogBuilder { "Catalog uri is required", )) } else { - Ok(RestCatalog::new(self.config, self.storage_factory)) + Ok(RestCatalog::new( + self.config, + self.storage_factory, + self.signer, + )) } }; @@ -212,6 +236,11 @@ impl RestCatalogConfig { self.client.clone() } + /// Build a request signer from the config, or `None` if signing is not enabled. + pub(crate) fn signer(&self) -> Result>> { + self.try_into() + } + /// Get the token from the config. /// /// The client can use this token to send requests. @@ -330,6 +359,28 @@ impl RestCatalogConfig { self.props = props; self } + + /// Check if SigV4 signing is enabled. + pub(crate) fn sigv4_enabled(&self) -> bool { + self.props + .get(REST_CATALOG_PROP_SIGV4_ENABLED) + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false) + } + + /// Get the signing region from the config. + pub(crate) fn signing_region(&self) -> Option<&str> { + self.props + .get(REST_CATALOG_PROP_SIGNING_REGION) + .map(|s| s.as_str()) + } + + /// Get the signing name from the config. + pub(crate) fn signing_name(&self) -> Option<&str> { + self.props + .get(REST_CATALOG_PROP_SIGNING_NAME) + .map(|s| s.as_str()) + } } #[derive(Debug)] @@ -351,15 +402,22 @@ pub struct RestCatalog { ctx: OnceCell, /// Storage factory for creating FileIO instances. storage_factory: Option>, + /// Optional custom request signer. + signer: Option>, } impl RestCatalog { /// Creates a `RestCatalog` from a [`RestCatalogConfig`]. - fn new(config: RestCatalogConfig, storage_factory: Option>) -> Self { + fn new( + config: RestCatalogConfig, + storage_factory: Option>, + signer: Option>, + ) -> Self { Self { user_config: config, ctx: OnceCell::new(), storage_factory, + signer, } } @@ -396,7 +454,7 @@ impl RestCatalog { async fn context(&self) -> Result<&RestContext> { self.ctx .get_or_try_init(|| async { - let client = HttpClient::new(&self.user_config)?; + let client = HttpClient::new(&self.user_config, self.signer.clone())?; let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?; let config = self.user_config.clone().merge_with_config(catalog_config); let client = client.update_with(&config)?; @@ -1099,6 +1157,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); assert_eq!( @@ -1173,6 +1232,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1220,6 +1280,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1244,6 +1305,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1275,6 +1337,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1306,6 +1369,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1337,6 +1401,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1450,6 +1515,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1497,6 +1563,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let _namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1527,6 +1594,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1578,6 +1646,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1677,6 +1746,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1730,6 +1800,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let namespaces = catalog @@ -1773,6 +1844,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let namespaces = catalog @@ -1806,6 +1878,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); assert!( @@ -1834,6 +1907,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); catalog @@ -1874,6 +1948,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let tables = catalog @@ -1942,6 +2017,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let tables = catalog @@ -2073,6 +2149,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let tables = catalog @@ -2117,6 +2194,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); catalog @@ -2146,6 +2224,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); assert!( @@ -2177,6 +2256,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); catalog @@ -2211,6 +2291,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table = catalog @@ -2328,6 +2409,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table = catalog @@ -2364,6 +2446,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table_creation = TableCreation::builder() @@ -2513,6 +2596,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table_creation = TableCreation::builder() @@ -2582,6 +2666,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table1 = { @@ -2725,6 +2810,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table1 = { @@ -2789,6 +2875,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table_ident = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); @@ -2840,6 +2927,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + None, ); let table_ident = @@ -2907,4 +2995,29 @@ mod tests { assert_eq!(err.message(), "Catalog uri is required"); } } + + #[test] + fn test_sigv4_config_disabled_by_default() { + let config = RestCatalogConfig::builder() + .uri("https://example.com".to_string()) + .build(); + assert!(!config.sigv4_enabled()); + assert_eq!(config.signing_name(), None); + assert_eq!(config.signing_region(), None); + } + + #[test] + fn test_sigv4_config_enabled() { + let config = RestCatalogConfig::builder() + .uri("https://example.com".to_string()) + .props(HashMap::from([ + ("rest.sigv4-enabled".to_string(), "true".to_string()), + ("rest.signing-name".to_string(), "execute-api".to_string()), + ("rest.signing-region".to_string(), "us-east-1".to_string()), + ])) + .build(); + assert!(config.sigv4_enabled()); + assert_eq!(config.signing_name(), Some("execute-api")); + assert_eq!(config.signing_region(), Some("us-east-1")); + } } diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 07dc0620da..67364d194e 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use std::sync::Arc; use http::StatusCode; use iceberg::{Error, ErrorKind, Result}; @@ -26,6 +27,7 @@ use serde::de::DeserializeOwned; use tokio::sync::Mutex; use crate::RestCatalogConfig; +use crate::signing::HttpRequestSigner; use crate::types::{ErrorResponse, TokenResponse}; pub(crate) struct HttpClient { @@ -45,6 +47,8 @@ pub(crate) struct HttpClient { extra_oauth_params: HashMap, /// Whether to disable header redaction in error logs (defaults to false for security). disable_header_redaction: bool, + /// Optional request signer applied to outgoing requests. + signer: Option>, } impl Debug for HttpClient { @@ -58,7 +62,10 @@ impl Debug for HttpClient { impl HttpClient { /// Create a new http client. - pub fn new(cfg: &RestCatalogConfig) -> Result { + pub fn new( + cfg: &RestCatalogConfig, + signer_override: Option>, + ) -> Result { let extra_headers = cfg.extra_headers()?; Ok(HttpClient { client: cfg.client().unwrap_or_default(), @@ -68,6 +75,7 @@ impl HttpClient { extra_headers, extra_oauth_params: cfg.extra_oauth_params(), disable_header_redaction: cfg.disable_header_redaction(), + signer: signer_override.or(cfg.signer()?), }) } @@ -96,6 +104,7 @@ impl HttpClient { self.extra_oauth_params }, disable_header_redaction: cfg.disable_header_redaction(), + signer: self.signer.or(cfg.signer()?), }) } @@ -253,6 +262,9 @@ impl HttpClient { /// Executes the given `Request` and returns a `Response`. pub async fn execute(&self, mut request: Request) -> Result { request.headers_mut().extend(self.extra_headers.clone()); + if let Some(signer) = &self.signer { + request = signer.sign_request(request).await?; + } Ok(self.client.execute(request).await?) } diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 6bee950970..fd86286a96 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -53,6 +53,8 @@ mod catalog; mod client; +/// HTTP request signing for the REST catalog. +pub mod signing; mod types; pub use catalog::*; diff --git a/crates/catalog/rest/src/signing.rs b/crates/catalog/rest/src/signing.rs new file mode 100644 index 0000000000..e3c03c4286 --- /dev/null +++ b/crates/catalog/rest/src/signing.rs @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! HTTP request signing for the REST catalog. + +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use iceberg::{Error, ErrorKind, Result}; +use reqsign_aws_v4::{Credential, DefaultCredentialProvider, RequestSigner}; +use reqsign_core::{Context, HttpSend, OsEnv, ProvideCredential, Signer}; +use reqsign_file_read_tokio::TokioFileRead; +use reqwest::{Client, Request}; + +use crate::RestCatalogConfig; + +/// A trait for signing HTTP requests. +#[async_trait] +pub trait HttpRequestSigner: Send + Sync + Debug { + /// Sign the request by modifying its headers (and potentially other parts). + async fn sign(&self, parts: &mut http::request::Parts) -> Result<()>; + + /// Sign a full [`reqwest::Request`] by converting it to [`http::request::Parts`] and back. + async fn sign_request(&self, mut request: Request) -> Result { + let (mut parts, _) = http::Request::builder() + .method(request.method().clone()) + .uri(request.url().as_str()) + .body(()) + .expect("request parts derived from a valid request") + .into_parts(); + // Compute the SHA256 hash of the request body for the content hash header. + // Some AWS services (e.g. Glue) reject UNSIGNED-PAYLOAD. + let body_bytes = request.body().and_then(|b| b.as_bytes()).unwrap_or(&[]); + parts.headers.insert( + http::HeaderName::from_static("x-amz-content-sha256"), + http::HeaderValue::from_str(&reqsign_core::hash::hex_sha256(body_bytes)) + .expect("hex string is valid header value"), + ); + self.sign(&mut parts).await?; + // Merge signing headers into the original request, preserving + // application headers (content-type, user-agent, etc.) that + // should not participate in signing. + request.headers_mut().extend(parts.headers); + Ok(request) + } +} + +/// Try to build a request signer from the config, or return `None` if signing is not enabled. +impl TryFrom<&RestCatalogConfig> for Option> { + type Error = Error; + + fn try_from(cfg: &RestCatalogConfig) -> Result { + if !cfg.sigv4_enabled() { + return Ok(None); + } + let signing_region = cfg.signing_region().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "'{}' is required when '{}' is true", + crate::REST_CATALOG_PROP_SIGNING_REGION, + crate::REST_CATALOG_PROP_SIGV4_ENABLED, + ), + ) + })?; + let signing_name = cfg.signing_name().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "'{}' is required when '{}' is true", + crate::REST_CATALOG_PROP_SIGNING_NAME, + crate::REST_CATALOG_PROP_SIGV4_ENABLED, + ), + ) + })?; + Ok(Some(Arc::new(SigV4Signer::new( + cfg.client().unwrap_or_default(), + signing_name, + signing_region, + )))) + } +} + +/// The HttpRequestSigner implementation for AWS SigV4 +#[derive(Debug)] +pub struct SigV4Signer { + /// The inner reqwest signer with an AWS credential loader. + inner: Signer, +} + +impl SigV4Signer { + /// Create a new SigV4 signer using the default AWS credential chain + /// (env vars, profiles, IMDS, etc.). + pub fn new(client: Client, service: &str, region: &str) -> Self { + Self::with_credential_provider(client, service, region, DefaultCredentialProvider::new()) + } + + /// Create a new SigV4 signer with a custom credential provider. + pub fn with_credential_provider( + client: Client, + service: &str, + region: &str, + credential_provider: impl ProvideCredential, + ) -> Self { + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend(client)) + .with_env(OsEnv); + let signer = RequestSigner::new(service, region); + Self { + inner: Signer::new(ctx, credential_provider, signer), + } + } +} + +#[async_trait] +impl HttpRequestSigner for SigV4Signer { + async fn sign(&self, parts: &mut http::request::Parts) -> Result<()> { + // Patch the URI for signing; reqsign-aws-v4 workaround. + let uri = parts.uri.clone(); + parts.uri = patch_uri_for_signing(&uri)?; + // Sign with the patched URI. + self.inner.sign(parts, None).await.map_err(|e| { + Error::new(ErrorKind::Unexpected, "Failed to sign request with SigV4").with_source(e) + })?; + // Restore the original URI in the request; signing should only modify headers. + parts.uri = uri; + Ok(()) + } +} + +/// Pre-encode percent signs in the URI path for correct SigV4 canonical URI computation. +/// +/// Workaround for a bug in `reqsign-aws-v4` where `canonical_request_string` decodes +/// percent-encoded path segments then re-encodes them, losing the double-encoding that +/// AWS SigV4 requires. By replacing `%` with `%25` before signing, reqsign's +/// decode→reencode cycle produces the correct double-encoded form. +/// +/// TODO: remove once fixed upstream in apache/opendal (reqsign-aws-v4). +fn patch_uri_for_signing(uri: &http::Uri) -> Result { + let path = uri.path().replace('%', "%25"); + let paq = if let Some(query) = uri.query() { + format!("{path}?{query}") + } else { + path + }; + let mut parts = uri.clone().into_parts(); + parts.path_and_query = Some(paq.parse().map_err(|e| { + Error::new(ErrorKind::Unexpected, "failed to rebuild URI for signing").with_source(e) + })?); + http::Uri::from_parts(parts).map_err(|e| { + Error::new(ErrorKind::Unexpected, "failed to rebuild URI for signing").with_source(e) + }) +} + +/// Bridges reqwest 0.12 with `reqsign_core::HttpSend`. +/// +/// The published `reqsign-http-send-reqwest` crate requires reqwest >=0.13, +/// which is incompatible with the workspace, so we provide a minimal adapter. +#[derive(Debug)] +struct ReqwestHttpSend(Client); + +/// Implements `HttpSend` for a reqwest 0.12 client. +impl HttpSend for ReqwestHttpSend { + async fn http_send( + &self, + req: http::Request, + ) -> reqsign_core::Result> { + let req = Request::try_from(req).map_err(|e| { + reqsign_core::Error::unexpected("failed to convert request").with_source(e) + })?; + let resp = self.0.execute(req).await.map_err(|e| { + reqsign_core::Error::unexpected("failed to send request").with_source(e) + })?; + let status = resp.status(); + let headers = resp.headers().clone(); + let body = resp.bytes().await.map_err(|e| { + reqsign_core::Error::unexpected("failed to read response body").with_source(e) + })?; + let mut response = http::Response::builder() + .status(status) + .body(body) + .map_err(|e| { + reqsign_core::Error::unexpected("failed to build response").with_source(e) + })?; + *response.headers_mut() = headers; + Ok(response) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_patch_uri_simple_path_unchanged() { + let uri: http::Uri = "https://glue.us-east-1.amazonaws.com/iceberg/v1/namespaces" + .parse() + .unwrap(); + let patched = patch_uri_for_signing(&uri).unwrap(); + assert_eq!(patched.path(), "/iceberg/v1/namespaces"); + } + + #[test] + fn test_patch_uri_arn_path_double_encodes() { + // S3 Tables ARN path with percent-encoded colons + let uri: http::Uri = + "https://s3tables.us-east-1.amazonaws.com/iceberg/v1/arn%3Aaws%3As3tables%3Aus-east-1%3A123456789012%3Abucket/my-table/namespaces" + .parse() + .unwrap(); + let patched = patch_uri_for_signing(&uri).unwrap(); + assert_eq!( + patched.path(), + "/iceberg/v1/arn%253Aaws%253As3tables%253Aus-east-1%253A123456789012%253Abucket/my-table/namespaces" + ); + } + + #[test] + fn test_patch_uri_preserves_query() { + let uri: http::Uri = + "https://example.com/iceberg/v1/arn%3Aaws%3As3tables?pageToken=abc&pageSize=10" + .parse() + .unwrap(); + let patched = patch_uri_for_signing(&uri).unwrap(); + assert_eq!(patched.path(), "/iceberg/v1/arn%253Aaws%253As3tables"); + assert_eq!(patched.query(), Some("pageToken=abc&pageSize=10")); + } + + #[test] + fn test_patch_uri_preserves_authority() { + let uri: http::Uri = "https://s3tables.us-east-1.amazonaws.com/iceberg/v1/arn%3Aaws" + .parse() + .unwrap(); + let patched = patch_uri_for_signing(&uri).unwrap(); + assert_eq!( + patched.authority().unwrap().as_str(), + "s3tables.us-east-1.amazonaws.com" + ); + assert_eq!(patched.scheme_str(), Some("https")); + } + + #[test] + fn test_patch_uri_root_path() { + let uri: http::Uri = "https://example.com/".parse().unwrap(); + let patched = patch_uri_for_signing(&uri).unwrap(); + assert_eq!(patched.path(), "/"); + } +}