Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
690 changes: 394 additions & 296 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ dns-lookup = "1.0.8"
env_logger = "0.9.0"
futures = "0.3.21"
futures-util = "0.3.21"
http-body-util = "0.1"
httparse = "1.7.1"
hyper = { version = "0.14.18", features = ["server", "client", "http1", "http2"] }
hyper-tls = "0.5.0"
hyper = { version = "1", features = ["server", "client", "http1", "http2"] }
hyper-tls = "0.6"
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server"] }
log = "0.4"
parking_lot = "0.12.0"
priority-queue = "1.2.1"
regex = "1.5.5"
serde = "1.0"
serde_derive = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
tokio = { version = ">=1.18.4", features = ["macros", "rt-multi-thread"] }
Expand Down
29 changes: 15 additions & 14 deletions src/http/request.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use crate::priority_map::PriorityMap;
use base64::encode;
use bytes::Bytes;
use http_body_util::{BodyExt, Empty};
use hyper::header::AUTHORIZATION;
use hyper::{Client, HeaderMap};
use hyper::HeaderMap;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use serde_json::Value;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

pub struct Inner {
pub client: Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
pub client: Client<hyper_tls::HttpsConnector<HttpConnector>, Empty<Bytes>>,
pub headers: HeaderMap,
pub cache: RwLock<PriorityMap<String, Arc<Value>>>,
}
Expand All @@ -22,15 +27,15 @@ pub struct HttpRequest {
impl HttpRequest {
pub fn new(id: &str, password: &str, cache_capacity: usize, cache_duration: Duration) -> Self {
let mut headers = HeaderMap::new();
let encoded = encode(format!("{}:{}", id, password));
let encoded = encode(format!("{id}:{password}"));

headers.append(
AUTHORIZATION,
format!("Basic {}", encoded).parse().expect("should be ok"),
format!("Basic {encoded}").parse().expect("should be ok"),
);

let connector = hyper_tls::HttpsConnector::new();
let client = Client::builder().build(connector);
let client = Client::builder(TokioExecutor::new()).build(connector);

HttpRequest {
inner: Arc::new(Inner {
Expand All @@ -46,27 +51,23 @@ impl HttpRequest {
}

pub async fn lookup(&self, addr: &IpAddr) -> Result<Arc<Value>, ()> {
let addr_str = format!("{}", addr);
let addr_str = format!("{addr}");

if !self.inner.cache.read().await.contains_key(&addr_str) {
let body = Empty::new();
let mut req = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(format!(
"https://geoip.maxmind.com/geoip/v2.1/city/{}",
addr_str
"https://geoip.maxmind.com/geoip/v2.1/city/{addr_str}"
))
.body(hyper::Body::empty())
.body(body)
.map_err(|_| ())?;

req.headers_mut()
.extend(self.inner.headers.clone().into_iter());

let res = self.inner.client.request(req).await.map_err(|_| ())?;

let bytes = hyper::body::to_bytes(res.into_body())
.await
.map_err(|_| ())?;

let bytes = res.into_body().collect().await.map_err(|_| ())?.to_bytes();
let json = serde_json::from_slice::<Value>(bytes.as_ref()).map_err(|_| ())?;

self.inner
Expand Down
52 changes: 27 additions & 25 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#[macro_use]
extern crate serde_derive;

use std::env;
use std::io;
use std::net;
Expand All @@ -9,9 +6,11 @@ use std::time::Duration;

use dns_lookup::lookup_host;
use env_logger::Builder;
use hyper::server::conn::Http;
use hyper::{Client, Uri};
use hyper::Uri;
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use log::LevelFilter;
use tokio::net::TcpListener;

Expand Down Expand Up @@ -63,9 +62,7 @@ async fn main() -> io::Result<()> {
TcpListener::bind((net::Ipv4Addr::new(0, 0, 0, 0), config.listener.port)).await?;

let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);

let http = Http::new();
let client = Client::builder(TokioExecutor::new()).build(https);

while let Ok((stream, addr)) = listener.accept().await {
let client_hpr = client.clone();
Expand Down Expand Up @@ -94,24 +91,29 @@ async fn main() -> io::Result<()> {
Vec::new()
};

let http_proxy = http.serve_connection(
stream,
Proxy::new(
server_uri,
Some(source),
resolver,
client_hpr,
ip_inclusions,
maxmind_inclusions,
Some(exclusions),
config.server.forwarded_ip_header.clone(),
config.server.use_forwarded_ip_header_only,
),
);

tokio::spawn(http_proxy);
let forwarded_ip_header = config.server.forwarded_ip_header.clone();
let fut = async move {
let http =
hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());
let _ = http
.serve_connection(
TokioIo::new(stream),
Proxy::new(
server_uri,
Some(source),
resolver,
client_hpr,
ip_inclusions,
maxmind_inclusions,
Some(exclusions),
forwarded_ip_header,
config.server.use_forwarded_ip_header_only,
),
)
.await;
};
tokio::spawn(fut);
}

Ok(())
}

Expand Down
33 changes: 17 additions & 16 deletions src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use std::net::IpAddr;
use std::pin::Pin;

use ::futures;
use futures::task::{Context, Poll};
use bytes::Bytes;
use futures::Future;
use hyper::client::HttpConnector;
use http_body_util::combinators::BoxBody;
use hyper::body::Incoming;
use hyper::service::Service;
use hyper::{Body, Client, Response, Uri};
use hyper::{Request, Response, Uri};
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use log::error;

use crate::proxy::utils::*;
Expand All @@ -17,11 +20,13 @@ use crate::IpResolver;

pub mod utils;

type ResponseBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;

pub struct Proxy {
pub upstream_uri: Uri,
pub source_ip: Option<IpAddr>,
pub resolver: IpResolver,
pub client: Client<HttpsConnector<HttpConnector>>,
pub client: Client<HttpsConnector<HttpConnector>, Incoming>,
pub ip_path_inclusions: Vec<UriPathMatcher>,
pub maxmind_path_inclusions: Vec<UriPathMatcher>,
pub path_exclusions: Option<Vec<UriPathMatcher>>,
Expand All @@ -35,7 +40,7 @@ impl Proxy {
upstream_uri: Uri,
source_ip: Option<IpAddr>,
resolver: IpResolver,
client: Client<HttpsConnector<HttpConnector>>,
client: Client<HttpsConnector<HttpConnector>, Incoming>,
ip_inclusions: Vec<String>,
maxmind_inclusions: Vec<String>,
exclusions: Option<Vec<String>>,
Expand All @@ -50,15 +55,15 @@ impl Proxy {
.iter()
.filter_map(|p| {
UriPathMatcher::new(p)
.map_err(|e| error!("Unable to construct included middleware route: {}", e))
.map_err(|e| error!("Unable to construct included middleware route: {e}"))
.ok()
})
.collect(),
maxmind_path_inclusions: maxmind_inclusions
.iter()
.filter_map(|p| {
UriPathMatcher::new(p)
.map_err(|e| error!("Unable to construct included middleware route: {}", e))
.map_err(|e| error!("Unable to construct included middleware route: {e}"))
.ok()
})
.collect(),
Expand All @@ -67,7 +72,7 @@ impl Proxy {
.filter_map(|p| {
UriPathMatcher::new(p)
.map_err(|e| {
error!("Unable to construct excluded middleware route: {}", e)
error!("Unable to construct excluded middleware route: {e}")
})
.ok()
})
Expand Down Expand Up @@ -120,16 +125,12 @@ impl Proxy {
}
}

impl Service<hyper::Request<hyper::Body>> for Proxy {
type Response = Response<Body>;
impl Service<Request<Incoming>> for Proxy {
type Response = Response<ResponseBody>;
type Error = StringError;
type Future = Pin<Box<dyn Future<Output = Result<Response<Body>, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
type Future = Pin<Box<dyn Future<Output = Result<Response<ResponseBody>, Self::Error>> + Send>>;

fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
fn call(&self, req: Request<Incoming>) -> Self::Future {
let mut upstream_parts = self.upstream_uri.clone().into_parts();
upstream_parts.path_and_query = req.uri().path_and_query().cloned();

Expand Down
42 changes: 24 additions & 18 deletions src/proxy/utils.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use hyper::client::HttpConnector;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::header::{HeaderName, HeaderValue};
use hyper::{Body, Client, HeaderMap, Request, Response, Uri};
use hyper::{HeaderMap, Request, Response, StatusCode, Uri};
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use log::error;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use crate::IpResolver;

use super::ResponseBody;

const PRUX_ADDR: &str = "Prux-Addr";
const PRUX_CITY: &str = "Prux-City";
const PRUX_COUNTRY: &str = "Prux-Country";
Expand Down Expand Up @@ -64,7 +70,7 @@ pub async fn get_location_hdr(
loc.get("latitude").and_then(|l| l.as_f64()),
loc.get("longitude").and_then(|l| l.as_f64()),
) {
hdr_map.insert(PRUX_COORD.to_string(), format!("{},{}", lat, long));
hdr_map.insert(PRUX_COORD.to_string(), format!("{lat},{long}"));
}

if let Some(acc) = loc.get("accuracy_radius").and_then(|acc| acc.as_f64()) {
Expand Down Expand Up @@ -98,29 +104,29 @@ pub async fn get_location_hdr(
}

pub async fn gen_transmit_fut(
client: &Client<HttpsConnector<HttpConnector>>,
req: Request<Body>,
) -> Response<Body> {
client: &Client<HttpsConnector<HttpConnector>, Incoming>,
req: Request<Incoming>,
) -> Response<ResponseBody> {
match client.request(req).await {
Ok(response) => response,
Ok(response) => response.map(|b| b.map_err(Box::from).boxed()),
Err(e) => {
error!("hyper error: {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep the error log here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9f56444.

let mut response =
Response::new(Body::from("Something went wrong, please try again later."));
let (mut parts, body) = response.into_parts();
parts.status = hyper::StatusCode::BAD_GATEWAY;
response = Response::from_parts(parts, body);

response
error!("hyper error: {e}");
let body = Full::new(Bytes::from("Something went wrong, please try again later."))
.map_err(Box::from)
.boxed();
Response::builder()
.status(StatusCode::BAD_GATEWAY)
.body(body)
.unwrap()
}
}
}

pub fn construct_request(
request: Request<Body>,
request: Request<Incoming>,
new_uri: Uri,
headers: Option<HashMap<String, String>>,
) -> Request<Body> {
) -> Request<Incoming> {
let mut request = request;
*request.uri_mut() = new_uri;

Expand Down Expand Up @@ -151,7 +157,7 @@ pub fn ip_is_global(ip: &IpAddr) -> bool {
}

pub fn get_forwarded_ip(
req: &Request<Body>,
req: &Request<Incoming>,
forwarded_ip_header: Option<&str>,
use_forwarded_ip_header_only: bool,
) -> Option<IpAddr> {
Expand Down
5 changes: 3 additions & 2 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use clap::{crate_name, crate_version, Arg, Command};
use config::{Config, ConfigError, Environment, File as ConfigFile};
use log::LevelFilter;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -185,7 +186,7 @@ impl Settings {
}
}
wrong => {
println!("Specified configuration format is invalid {}", wrong);
println!("Specified configuration format is invalid {wrong}");
::std::process::exit(1);
}
}
Expand All @@ -195,7 +196,7 @@ impl Settings {
use toml::to_string_pretty;

if let Ok(pretty) = to_string_pretty(&settings) {
println!("------------------------PRUX CONFIGURATION------------------------\n{}\n---------------------------------------------------------------------", pretty);
println!("------------------------PRUX CONFIGURATION------------------------\n{pretty}\n---------------------------------------------------------------------");
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ pub trait ToRegex {
fn as_str(&self) -> &str;
}

impl<'a> ToRegex for &'a str {
impl ToRegex for &str {
fn to_regex(&self) -> Result<::regex::Regex, ::regex::Error> {
::regex::Regex::new(self)
}
Expand Down