diff --git a/Cargo.lock b/Cargo.lock index 9b286ee4..a210364f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4164,6 +4164,7 @@ dependencies = [ "gethostname", "ipnet", "lazy_static", + "pin-project-lite", "serde", "serde_json", "thiserror 2.0.18", @@ -4176,7 +4177,6 @@ dependencies = [ "ts_control_noise", "ts_control_serde", "ts_dynbitset", - "ts_hexdump", "ts_http_util", "ts_keys", "ts_packet", diff --git a/Cargo.toml b/Cargo.toml index f65306e8..e57fde1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -216,7 +216,7 @@ url.workspace = true [dev-dependencies] # Dependencies for examples -clap = { workspace = true, features = ["derive"] } +clap = { workspace = true, features = ["derive", "env"] } include_dir = "0.7" mime_guess = "2.0" tokio = { workspace = true, features = ["full"] } diff --git a/examples/axum/main.rs b/examples/axum/main.rs index 1b628d4d..1e8a14cc 100644 --- a/examples/axum/main.rs +++ b/examples/axum/main.rs @@ -56,13 +56,19 @@ struct Args { /// The auth key to connect with. /// /// Can be omitted if the key file is already authenticated. - #[arg(short = 'k', long)] + #[arg(short = 'k', long, env = "TS_AUTH_KEY")] auth_key: Option, /// The hostname this node will request. #[arg(short = 'H', long, default_value = "axum-example")] hostname: Option, + /// The URL of the control URL to connect to. + /// + /// Uses the Tailscale control server by default if unspecified. + #[arg(long, env = "TS_CONTROL_URL")] + control_url: Option, + /// Port to bind to. #[arg(short, long, default_value_t = 80)] port: u16, @@ -79,9 +85,15 @@ async fn main() -> Result<(), Box> { .init(); let args = Args::parse(); + let mut config = Config::default_with_key_file(&args.key_file).await?; config.requested_hostname = args.hostname; - let dev = Device::new(&config, args.auth_key.clone()).await?; + + if let Some(url) = args.control_url { + config.control_server_url = url; + } + + let dev = Device::new(&config, args.auth_key).await?; let listener = dev .tcp_listen((dev.ipv4_addr().await?, args.port).into()) diff --git a/examples/peer_ping/main.rs b/examples/peer_ping/main.rs index 84bf8493..7b33bdea 100644 --- a/examples/peer_ping/main.rs +++ b/examples/peer_ping/main.rs @@ -16,13 +16,19 @@ struct Args { /// The auth key to connect with. /// /// Can be omitted if the key file is already authenticated. - #[arg(short = 'k', long)] + #[arg(short = 'k', long, env = "TS_AUTH_KEY")] auth_key: Option, /// The hostname this node will request. #[arg(short = 'H', long, default_value = "peer_ping_example")] hostname: Option, + /// The URL of the control URL to connect to. + /// + /// Uses the Tailscale control server by default if unspecified. + #[arg(long, env = "TS_CONTROL_URL")] + control_url: Option, + /// Peer to send messages to. #[clap(short, long)] peer: SocketAddr, @@ -46,7 +52,12 @@ async fn main() -> Result<(), Box> { let mut config = Config::default_with_key_file(&args.key_file).await?; config.requested_hostname = args.hostname; - let dev = Device::new(&config, args.auth_key.clone()).await?; + + if let Some(url) = args.control_url { + config.control_server_url = url; + } + + let dev = Device::new(&config, args.auth_key).await?; let sock = dev.udp_bind((dev.ipv4_addr().await?, 1234).into()).await?; let mut ticker = tokio::time::interval(Duration::from_secs_f64(args.ping_interval_secs)); diff --git a/examples/tcp_echo/main.rs b/examples/tcp_echo/main.rs index 9577ae40..6a3b9177 100644 --- a/examples/tcp_echo/main.rs +++ b/examples/tcp_echo/main.rs @@ -24,13 +24,19 @@ struct Args { /// The auth key to connect with. /// /// Can be omitted if the key file is already authenticated. - #[arg(short = 'k', long)] + #[arg(short = 'k', long, env = "TS_AUTH_KEY")] auth_key: Option, /// The hostname this node will request. #[arg(short = 'H', long, default_value = "tcp_echo_example")] hostname: Option, + /// The URL of the control URL to connect to. + /// + /// Uses the Tailscale control server by default if unspecified. + #[arg(long, env = "TS_CONTROL_URL")] + control_url: Option, + /// Port to listen on (on tailnet IPv4). #[clap(short, long, default_value_t = 1234)] listen_port: u16, @@ -47,8 +53,14 @@ async fn main() -> Result<(), Box> { .init(); let args = Args::parse(); + let mut config = Config::default_with_key_file(&args.key_file).await?; config.requested_hostname = args.hostname; + + if let Some(url) = args.control_url { + config.control_server_url = url; + } + let dev = Device::new(&config, args.auth_key).await?; let sockaddr = (dev.ipv4_addr().await?, args.listen_port).into(); diff --git a/ts_control/Cargo.toml b/ts_control/Cargo.toml index 65007818..a99edb54 100644 --- a/ts_control/Cargo.toml +++ b/ts_control/Cargo.toml @@ -18,7 +18,6 @@ ts_capabilityversion = { workspace = true, features = ["serde"] } ts_control_noise.workspace = true ts_control_serde.workspace = true ts_dynbitset.workspace = true -ts_hexdump.workspace = true ts_http_util.workspace = true ts_keys.workspace = true ts_packet.workspace = true @@ -33,6 +32,7 @@ chrono = { workspace = true, features = ["serde"] } gethostname.workspace = true ipnet = { workspace = true, features = ["serde"] } lazy_static.workspace = true +pin-project-lite.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true @@ -52,6 +52,8 @@ async_tokio = ["dep:futures-util", "dep:tokio", "dep:tokio-stream"] # Allow derp connections to be made without verifying TLS certs. Only for use in tests. insecure-derp = ["ts_transport_derp/insecure-for-tests"] +# Allow control keys to be fetched over plain HTTP1 without TLS. Only for use in tests. +insecure-keyfetch = [] [lints] workspace = true diff --git a/ts_control/src/control_dialer.rs b/ts_control/src/control_dialer.rs index f48ab594..fadc755a 100644 --- a/ts_control/src/control_dialer.rs +++ b/ts_control/src/control_dialer.rs @@ -255,8 +255,8 @@ where CapabilityVersion::CURRENT, ); - let mut conn = crate::tokio::upgrade_ts2021(url, &init_msg, handshake, h1_client).await?; - let _challenge_packet = crate::tokio::read_challenge_packet(&mut conn).await?; + let conn = crate::tokio::upgrade_ts2021(url, &init_msg, handshake, h1_client).await?; + let conn = crate::tokio::read_challenge_packet(conn).await?; let h2_conn = ts_http_util::http2::connect(conn).await?; tracing::debug!("http2 connection to control established"); diff --git a/ts_control/src/tokio/connect.rs b/ts_control/src/tokio/connect.rs index 13e6ae14..9ad984ff 100644 --- a/ts_control/src/tokio/connect.rs +++ b/ts_control/src/tokio/connect.rs @@ -1,16 +1,15 @@ use alloc::string::String; use core::{fmt, str::FromStr}; +use bytes::Bytes; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use ts_capabilityversion::CapabilityVersion; -use ts_hexdump::{AsHexExt, Case}; use ts_http_util::{BytesBody, ClientExt, EmptyBody, HeaderName, HeaderValue, Http2, ResponseExt}; -use ts_keys::{ChallengePublicKey, MachineKeyPair, MachinePublicKey}; -use ts_packet::PacketMut; +use ts_keys::{MachineKeyPair, MachinePublicKey}; use url::Url; use zerocopy::network_endian::U32; -use crate::tokio::{MapStreamError, RegistrationError}; +use crate::tokio::{MapStreamError, RegistrationError, prefixed_reader::PrefixedReader}; const CHALLENGE_MAGIC: [u8; 5] = [0xFF, 0xFF, 0xFF, b'T', b'S']; const HANDSHAKE_HEADER_KEY: &str = "X-Tailscale-Handshake"; @@ -72,7 +71,7 @@ pub async fn connect( control_url: &Url, machine_keys: &MachineKeyPair, ) -> Result, ConnectionError> { - let h1_client = ts_http_util::http1::connect_tls(control_url).await?; + let h1_client = connect_h1(control_url).await?; let control_public_key = fetch_control_key(control_url).await?; @@ -83,23 +82,44 @@ pub async fn connect( CapabilityVersion::CURRENT, ); - let mut conn = upgrade_ts2021(control_url, &init_msg, handshake, h1_client).await?; - let _challenge_packet = read_challenge_packet(&mut conn).await?; + let conn = upgrade_ts2021(control_url, &init_msg, handshake, h1_client).await?; + + // The early payload (challenge packet) is optional. The server may send + // the magic prefix [FF FF FF 'T' 'S'] followed by a JSON challenge, or it + // may go straight to HTTP/2 (whose first frame starts with different bytes). + // Read the first 9 bytes (same size as an HTTP/2 frame header) and check. + let conn = read_challenge_packet(conn).await?; let h2_conn = ts_http_util::http2::connect(conn).await?; Ok(h2_conn) } +/// Connect an HTTP/1.1 client to the control server, using TLS for https:// +/// URLs and plain TCP for http:// URLs. +async fn connect_h1(url: &Url) -> Result, ConnectionError> { + if url.scheme() == "http" { + Ok(ts_http_util::http1::connect_tcp(url).await?) + } else { + Ok(ts_http_util::http1::connect_tls(url).await?) + } +} + #[tracing::instrument(skip_all, fields(%control_url), ret, err, level = "trace")] pub async fn fetch_control_key(control_url: &Url) -> Result { let mut key_url = control_url.join("/key")?; + + #[cfg(not(feature = "insecure-keyfetch"))] key_url.set_scheme("https").unwrap(); + if key_url.scheme() == "http" { + tracing::warn!("fetching control key over insecure http"); + } + key_url .query_pairs_mut() .extend_pairs([("v", CapabilityVersion::CURRENT.to_string())]); - let client = ts_http_util::http1::connect_tls::(&key_url).await?; + let client = connect_h1(&key_url).await?; let response = client.get(&key_url, None).await?; if !response.status().is_success() { let status = response.status(); @@ -153,21 +173,32 @@ pub async fn upgrade_ts2021( Ok(conn) } -#[tracing::instrument(skip_all, ret, err, level = "trace")] -pub async fn read_challenge_packet( - conn: &mut (impl AsyncRead + Unpin), -) -> Result { +/// Read the optional early payload (challenge packet) from the server. +/// +/// The server may send a challenge packet with magic prefix [FF FF FF 'T' 'S'] followed +/// by a JSON payload, or it may go straight to HTTP/2. This function checks for the magic header +/// and consumes the payload if present, otherwise chaining the bytes back for consumption by the +/// HTTP/2 parser. +#[tracing::instrument(skip_all, err, level = "trace")] +pub async fn read_challenge_packet( + mut conn: Conn, +) -> Result, ConnectionError> +where + Conn: AsyncRead + Unpin, +{ let mut magic = [0u8; CHALLENGE_MAGIC.len()]; conn.read_exact(&mut magic) .await .map_err(|err| ConnectionError::Io { - field: Some("magic"), - stage: "challenge", + field: Some("header"), + stage: "early_payload", err, })?; + + // This isn't an early challenge payload, it's the start of the HTTP/2 header -- chain it back if magic != CHALLENGE_MAGIC { - return Err(ConnectionError::InvalidChallengeMagic(magic)); + return Ok(PrefixedReader::new(conn, Bytes::copy_from_slice(&magic))); } let mut challenge_len: U32 = 0.into(); @@ -178,13 +209,15 @@ pub async fn read_challenge_packet( stage: "challenge", err, })?; + let challenge_len = challenge_len.get() as usize; if challenge_len > MAX_CHALLENGE_LENGTH { return Err(ConnectionError::InvalidChallengeLength(challenge_len)); } - let mut json = PacketMut::new(challenge_len); - conn.read_exact(json.as_mut()) + // Read and discard the challenge JSON. + let mut limited = conn.take(challenge_len as _); + tokio::io::copy(&mut limited, &mut tokio::io::sink()) .await .map_err(|err| ConnectionError::Io { field: Some("body"), @@ -192,20 +225,69 @@ pub async fn read_challenge_packet( err, })?; - #[derive(serde::Deserialize)] - #[serde(rename_all = "camelCase")] - struct ChallengePacket { - node_key_challenge: ChallengePublicKey, - } - tracing::trace!( - "challenge packet:\n{}", - json.iter() - .hexdump(Case::Lower) - .flatten() - .collect::() + n_bytes = challenge_len, + "read and discarded early challenge payload" ); - let packet = serde_json::from_slice::(&json[..])?; - Ok(packet.node_key_challenge) + Ok(PrefixedReader::new( + limited.into_inner(), + Default::default(), + )) +} + +#[cfg(test)] +mod tests { + use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; + + use super::*; + + /// Build a challenge packet: magic + big-endian length + JSON body. + fn make_challenge(json: &[u8]) -> Vec { + let mut buf = Vec::new(); + buf.extend_from_slice(&CHALLENGE_MAGIC); + buf.extend_from_slice(&(json.len() as u32).to_be_bytes()); + buf.extend_from_slice(json); + buf + } + + /// Test that when the server sends an early challenge packet (production control + /// server behavior), the magic+length+JSON is consumed and subsequent HTTP/2 data + /// is passed through unmodified. + #[tokio::test] + async fn challenge_present() { + let json = b"{\"nodeKeyChallenge\":\"test\"}"; + let payload = b"HTTP/2 data after challenge"; + + let mut data = make_challenge(json); + data.extend_from_slice(payload); + + let (mut writer, reader) = duplex(1024); + writer.write_all(&data).await.unwrap(); + drop(writer); + + let mut conn = read_challenge_packet(reader).await.unwrap(); + + let mut out = Vec::new(); + conn.read_to_end(&mut out).await.unwrap(); + assert_eq!(out, payload); + } + + /// Test that when the server skips the early challenge and goes straight to HTTP/2 + /// (testcontrol behavior), all bytes are preserved -- the 9-byte peek that didn't + /// match the magic is chained back so the HTTP/2 parser sees the full stream. + #[tokio::test] + async fn challenge_absent() { + let payload = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + + let (mut writer, reader) = duplex(1024); + writer.write_all(payload).await.unwrap(); + drop(writer); + + let mut conn = read_challenge_packet(reader).await.unwrap(); + + let mut out = Vec::new(); + conn.read_to_end(&mut out).await.unwrap(); + assert_eq!(out, payload); + } } diff --git a/ts_control/src/tokio/mod.rs b/ts_control/src/tokio/mod.rs index 7092d5a5..7baf0d18 100644 --- a/ts_control/src/tokio/mod.rs +++ b/ts_control/src/tokio/mod.rs @@ -13,6 +13,7 @@ pub use map_stream::{FilterUpdate, MapStreamError, PeerUpdate, StateUpdate}; mod ping; pub use ping::PingError; +mod prefixed_reader; mod register; pub use register::{AuthResult, RegistrationError, register}; diff --git a/ts_control/src/tokio/prefixed_reader.rs b/ts_control/src/tokio/prefixed_reader.rs new file mode 100644 index 00000000..046cc5cf --- /dev/null +++ b/ts_control/src/tokio/prefixed_reader.rs @@ -0,0 +1,58 @@ +use core::task::Context; +use std::{io, pin::Pin, task::Poll}; + +use bytes::{BufMut, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +pin_project_lite::pin_project! { + /// I/O wrapper that has data prefixed before the inner reader. + /// + /// This can't use [`tokio::io::Chain`] because it doesn't impl `AsyncWrite` for the inner type, + /// and we need both. + pub struct PrefixedReader { + prefix: Bytes, + #[pin] + inner: T, + } +} + +impl PrefixedReader { + pub fn new(inner: T, prefix: Bytes) -> PrefixedReader { + PrefixedReader { prefix, inner } + } +} + +impl AsyncRead for PrefixedReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let slf = self.project(); + + if slf.prefix.is_empty() { + return slf.inner.poll_read(cx, buf); + } + + buf.put(slf.prefix); + Poll::Ready(Ok(())) + } +} + +impl AsyncWrite for PrefixedReader { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_shutdown(cx) + } +} diff --git a/ts_http_util/src/lib.rs b/ts_http_util/src/lib.rs index e8f0be35..a72be838 100644 --- a/ts_http_util/src/lib.rs +++ b/ts_http_util/src/lib.rs @@ -97,7 +97,10 @@ pub fn make_upgrade_req( protocol: &str, extra_headers: impl IntoIterator, ) -> Result, Error> { - let mut req = Request::get(u.as_str()) + // Use POST for the upgrade request. Some server implementations accept both + // GET and POST, but others (e.g. Go's testcontrol) only accept POST. POST + // is what Go's controlhttp client sends, so use it for widest compatibility. + let mut req = Request::post(u.as_str()) .header(HOST, u.host_str().ok_or(Error::InvalidParam)?) .header(UPGRADE, protocol) .header(CONNECTION, "Upgrade")