Skip to content

Commit

Permalink
improve proxy connection failure error messages and add counters
Browse files Browse the repository at this point in the history
Failure to connect to a proxy server will now include more context
about the proxy server and protocol in the error message, and
will bump a counter.

Failure to directly bind a source address for the outgoing connection
will bump a counter.

refs: #286
  • Loading branch information
wez committed Sep 20, 2024
1 parent aabbada commit 40938a9
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 16 deletions.
82 changes: 71 additions & 11 deletions crates/kumod/src/egress_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use kumo_server_common::config_handle::ConfigHandle;
use lruttl::LruCacheWithTtl;
use mlua::prelude::LuaUserData;
use parking_lot::FairMutex as Mutex;
use prometheus::IntCounter;
use serde::{Deserialize, Serialize};
use socksv5::v5::{
SocksV5AuthMethod, SocksV5Command, SocksV5Host, SocksV5RequestStatus, SocksV5Response,
};
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpSocket, TcpStream};
Expand Down Expand Up @@ -173,19 +174,23 @@ impl EgressSource {
pub async fn connect_to(
&self,
address: SocketAddr,
timeout_duration: Duration,
) -> anyhow::Result<(TcpStream, MaybeProxiedSourceAddress)> {
let source_name = &self.name;

let proxy_proto = self.resolve_proxy_protocol(address)?;
let transport_address = proxy_proto.transport_address(address);

tracing::trace!("will connect {address:?} {transport_address:?} {proxy_proto:?}");
let transport_context = format!("{transport_address:?} {proxy_proto:?}");
let connect_context =
format!("{address:?} transport={transport_address:?} proto={proxy_proto:?}");
tracing::trace!("will connect to {connect_context}");

let socket = match transport_address {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}
.with_context(|| format!("make socket to connect to {transport_address:?}"))?;
.with_context(|| format!("make socket to connect to {connect_context}"))?;

// No need for Nagle with SMTP request/response
socket.set_nodelay(true)?;
Expand All @@ -194,24 +199,72 @@ impl EgressSource {
if let Err(err) = socket.bind(SocketAddr::new(source, 0)) {
let error = format!(
"bind {source:?} for source:{source_name} failed: {err:#} \
while attempting to connect to {transport_address:?}"
while attempting to connect to {connect_context}"
);
static FAILED_BIND: LazyLock<IntCounter> = LazyLock::new(|| {
prometheus::register_int_counter!(
"bind_failures",
"how many times that directly binding a source address has failed"
)
.unwrap()
});
FAILED_BIND.inc();
anyhow::bail!("{error}");
}
}
let mut stream = socket
.connect(transport_address)
.await
.with_context(|| format!("connect to {transport_address:?}"))?;

let source_address = proxy_proto
.perform_handshake(&mut stream, &source_name)
.await?;
let deadline = Instant::now() + timeout_duration;
let is_proxy = proxy_proto.is_proxy();

let mut stream =
match tokio::time::timeout_at(deadline.into(), socket.connect(transport_address)).await
{
Err(_) => {
inc_failed_proxy_connection_attempts(is_proxy);
anyhow::bail!(
"timeout after {timeout_duration:?} \
while connecting to {transport_context}"
);
}
Ok(Err(err)) => {
inc_failed_proxy_connection_attempts(is_proxy);
anyhow::bail!("failed to connect to {transport_context}: {err:#}");
}
Ok(Ok(stream)) => stream,
};

let source_address = tokio::time::timeout_at(
deadline.into(),
proxy_proto.perform_handshake(&mut stream, &source_name),
)
.await
.map_err(|_| {
anyhow::anyhow!(
"timeout after {timeout_duration:?} \
while performing proxy handshake with {transport_context}"
)
})?
.with_context(|| format!("failed to perform proxy handshake with {transport_context}"))?;

Ok((stream, source_address))
}
}

/// Bumps the `proxy_connection_failures` counter for a failed connection
/// attempt, but only when that attempt was made through a proxy.
///
/// Calls with `is_proxy == false` are ignored and leave the counter untouched.
fn inc_failed_proxy_connection_attempts(is_proxy: bool) {
    // Lazily created on first dereference, so the counter is only set up
    // once a proxied attempt has actually failed.
    static PROXY_CONNECT_FAILURES: LazyLock<IntCounter> = LazyLock::new(|| {
        prometheus::register_int_counter!(
            "proxy_connection_failures",
            "how many times a connection attempt to a proxy server has failed"
        )
        .unwrap()
    });

    if is_proxy {
        PROXY_CONNECT_FAILURES.inc();
    }
}

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, mlua::FromLua)]
#[serde(deny_unknown_fields)]
pub struct EgressPoolEntry {
Expand Down Expand Up @@ -568,6 +621,13 @@ impl<'a> ProxyProto<'a> {
}
}

/// Reports whether this protocol sends the connection through a proxy.
///
/// Returns `false` only for `Self::None`; every other variant is
/// treated as a proxy.
fn is_proxy(&self) -> bool {
    !matches!(self, Self::None)
}

/// Setup the proxy connection.
/// Returns the source address used by the connection;
/// this is *probably* the external IP, unless your proxy is also
Expand Down
11 changes: 6 additions & 5 deletions crates/kumod/src/smtp_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,12 @@ impl SmtpDispatcher {
// awaiting the shutdown subscription, causing us to uselessly wait
// for the full connect timeout during shutdown.
tokio::spawn(async move {
let (stream, source_address) = tokio::time::timeout(
timeouts.connect_timeout,
egress_source.connect_to(SocketAddr::new(address.addr, port)),
)
.await??;
let (stream, source_address) = egress_source
.connect_to(
SocketAddr::new(address.addr, port),
timeouts.connect_timeout,
)
.await?;

tracing::debug!(
"connected to {address:?} port {port} via source address {source_address:?}"
Expand Down
6 changes: 6 additions & 0 deletions docs/changelog/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
* The log_hooks helper now supports batching using the new `batch_size` parameter
and `send_batch` method. [See the example](../userguide/operation/webhooks.md#batched-hooks)

* Added `proxy_connection_failures` and `bind_failures` counters to track
the number of times that kumod either failed to connect to an egress proxy
server, or failed to directly bind a source address. Both of these events
typically indicate a severe issue with the local infrastructure: either a
configuration error or a problem with production service availability.

## Fixes

* `kcli trace-smtp-client` and `kcli trace-smtp-server` would always report
Expand Down

0 comments on commit 40938a9

Please sign in to comment.