diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index c1b1c996..bdcb63c2 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,17 +1,26 @@ # Architecture -This document describes the high-level software architecture of `tailscale-rs` down to the level of individual crates. -You shouldn't need this document to build applications with `tailscale-rs`! This information is intended for +This document describes the high-level software architecture of `tailscale-rs` down to the level of +individual crates. +You shouldn't need this document to build applications with `tailscale-rs`! This information is +intended for maintainers and anyone who wants to understand or modify the internals. -We assume basic familiarity with the concepts and terminology behind Tailscale and tailnets, especially: -- The [control plane](https://tailscale.com/docs/concepts/control-data-planes#control-plane) (also called "coordination server" or "control server") and [data plane](https://tailscale.com/docs/concepts/control-data-planes#data-plane) +We assume basic familiarity with the concepts and terminology behind Tailscale and tailnets, +especially: + +- The [control plane](https://tailscale.com/docs/concepts/control-data-planes#control-plane) (also + called "coordination server" or "control server") + and [data plane](https://tailscale.com/docs/concepts/control-data-planes#data-plane) - [Access Control Lists (ACLs)/grants](https://tailscale.com/docs/reference/grants-vs-acls) -- [Overlay networks](https://tailscale.com/docs/reference/glossary#overlay-network) and [underlay networks](https://tailscale.com/docs/reference/glossary#underlay-network) +- [Overlay networks](https://tailscale.com/docs/reference/glossary#overlay-network) + and [underlay networks](https://tailscale.com/docs/reference/glossary#underlay-network) - [Relayed connections and DERP servers](https://tailscale.com/docs/reference/derp-servers) -If a term is unfamiliar, the [Tailscale Glossary](https://tailscale.com/docs/reference/glossary) is a good starting -point. More detailed usage instructions and technical details can be found in each crate's documentation. See +If a term is unfamiliar, the [Tailscale Glossary](https://tailscale.com/docs/reference/glossary) is +a good starting +point. More detailed usage instructions and technical details can be found in each crate's +documentation. See [`CONTRIBUTING.md`](CONTRIBUTING.md) for development environment setup, tool use, and guidelines. ## Overview @@ -27,6 +36,7 @@ block-beta ``` `tailscale-rs` crates can generally be separated into five areas: + - **API or Language Bindings**: how your program interacts with `tailscale-rs` - **Runtime**: coordinates and manages the API, control plane, and data plane components - **Control Plane**: communication with Tailscale's control plane @@ -37,97 +47,156 @@ block-beta Crates/libraries that applications will program against. -- [`tailscale`](src/lib.rs): the Rust API, for Rust language access and for other language bindings to build upon. +- [`tailscale`](src/lib.rs): the Rust API, for Rust language access and for other language bindings + to build upon. - [`ts_elixir`](ts_elixir/README.md): Elixir language bindings built on top of the Rust API. -- [`ts_ffi`:](ts_ffi/README.md) C language bindings built on top of the Rust API. This is also commonly referred to as a Foreign Function Interface (FFI). +- [`ts_ffi`:](ts_ffi/README.md) C language bindings built on top of the Rust API. This is also + commonly referred to as a Foreign Function Interface (FFI). - [`ts_python`](ts_python/README.md): Python language bindings built on top of the Rust API. ### Runtime -Crates that tie all the lower level components together, pass communications between components, and provide higher-level abstractions. +Crates that tie all the lower level components together, pass communications between components, and +provide higher-level abstractions. -- [`ts_runtime`](ts_runtime/src/lib.rs): with apologies to The Big Lebowski: "`ts_runtime` really ties the room together". For each API-level `Device`, the runtime uses an actor architecture to manage the lifecycle of the control client, data plane components, netstack, etc. A message bus passes updates and communications between these top-level actors. -- [`ts_netcheck`](ts_netcheck/src/lib.rs): checks network availability and reports latency to DERP servers in different regions. +- [`ts_runtime`](ts_runtime/src/lib.rs): with apologies to The Big Lebowski: "`ts_runtime` really + ties the room together". For each API-level `Device`, the runtime uses an actor architecture to + manage the lifecycle of the control client, data plane components, netstack, etc. A message bus + passes updates and communications between these top-level actors. +- [`ts_netcheck`](ts_netcheck/src/lib.rs): checks network availability and reports latency to DERP + servers in different regions. #### Netstack (Userspace Network Stack) -Implements the TCP, UDP, and raw socket abstractions for the overlay network (tailnet). These are the same sockets returned to the user from methods like `tcp_connect()` on [`tailscale::Device`](src/lib.rs). +Implements the TCP, UDP, and raw socket abstractions for the overlay network (tailnet). These are +the same sockets returned to the user from methods like `tcp_connect()` on [ +`tailscale::Device`](src/lib.rs). -- [`ts_netstack_smoltcp`](ts_netstack_smoltcp/src/lib.rs): a `smoltcp`-based network stack that processes Layer 3+ packets to/from the overlay network, built on top of the other netstack crates. -- [`ts_netstack_smoltcp_core`](ts_netstack_smoltcp_core/src/lib.rs): channel-based abstractions that wrap [`smoltcp`](https://docs.rs/smoltcp/latest/smoltcp/) and provide features such as `async` integration and polling/accept loops. -- [`ts_netstack_smoltcp_socket`](ts_netstack_smoltcp_socket/src/lib.rs): BSD sockets-style blocking and `async` interfaces built on top of the command channels and features in `ts_netstack_smoltcp_core`. +- [`ts_netstack_smoltcp`](ts_netstack_smoltcp/src/lib.rs): a `smoltcp`-based network stack that + processes Layer 3+ packets to/from the overlay network, built on top of the other netstack crates. +- [`ts_netstack_smoltcp_core`](ts_netstack_smoltcp_core/src/lib.rs): channel-based abstractions that + wrap [`smoltcp`](https://docs.rs/smoltcp/latest/smoltcp/) and provide features such as `async` + integration and polling/accept loops. +- [`ts_netstack_smoltcp_socket`](ts_netstack_smoltcp_socket/src/lib.rs): BSD sockets-style blocking + and `async` interfaces built on top of the command channels and features in + `ts_netstack_smoltcp_core`. ### Control Plane -Crates that communicate with Tailscale's control plane (or Headscale) and provide configuration for the data plane. The control plane handles authentication/authorization, node registration, policy updates, network map distribution, and much more for the nodes in a tailnet. +Crates that communicate with Tailscale's control plane (or Headscale) and provide configuration for +the data plane. The control plane handles authentication/authorization, node registration, policy +updates, network map distribution, and much more for the nodes in a tailnet. -- [`ts_control`](ts_control/src/lib.rs): control plane client that handles registration, authorization/authentication, configuration, and streaming updates. -- [`ts_control_noise`](ts_control_noise/src/lib.rs): abstraction that wraps control plane communications in a Noise IK tunnel, transparently handling cryptography for the client. +- [`ts_control`](ts_control/src/lib.rs): control plane client that handles registration, + authorization/authentication, configuration, and streaming updates. +- [`ts_control_noise`](ts_control_noise/src/lib.rs): abstraction that wraps control plane + communications in a Noise IK tunnel, transparently handling cryptography for the client. #### Control Protocol Wire Types -Types and (de)serialization code for control plane traffic "on the wire". `ts_control_serde` contains the bulk of these types; the other crates contain types that have been broken out for easier sharing with other crates. - -- [`ts_capabilityversion`](ts_capabilityversion/src/lib.rs): defines the features a node implements and the effective version of the control plane protocol it supports. -- [`ts_control_serde`](ts_control_serde/src/lib.rs): types representing the majority of control plane traffic "on the wire", plus (de)serialization code and utilities. -- [`ts_nodecapability`](ts_nodecapability/src/lib.rs): defines the capabilities the "self" node has been granted on the tailnet, according to the control plane. -- [`ts_packetfilter_serde`](ts_packetfilter_serde/src/lib.rs): types representing packet filters "on the wire", plus (de)serialization code and utilities. -- [`ts_peercapability`](ts_peercapability/src/lib.rs): defines the capabilities a peer node (as opposed to the "self" node) has been granted on the tailnet, according to the control plane. +Types and (de)serialization code for control plane traffic "on the wire". `ts_control_serde` +contains the bulk of these types; the other crates contain types that have been broken out for +easier sharing with other crates. + +- [`ts_capabilityversion`](ts_capabilityversion/src/lib.rs): defines the features a node implements + and the effective version of the control plane protocol it supports. +- [`ts_control_serde`](ts_control_serde/src/lib.rs): types representing the majority of control + plane traffic "on the wire", plus (de)serialization code and utilities. +- [`ts_nodecapability`](ts_nodecapability/src/lib.rs): defines the capabilities the "self" node has + been granted on the tailnet, according to the control plane. +- [`ts_packetfilter_serde`](ts_packetfilter_serde/src/lib.rs): types representing packet filters "on + the wire", plus (de)serialization code and utilities. +- [`ts_peercapability`](ts_peercapability/src/lib.rs): defines the capabilities a peer node (as + opposed to the "self" node) has been granted on the tailnet, according to the control plane. ### Data Plane -Crates that communicate with other Tailscale nodes on the tailnet. The data plane is responsible for actually exchanging packets between peers on the tailnet, including transport management (DERP, TUN, etc.), routing, packet filtering, and tunneling. +Crates that communicate with other Tailscale nodes on the tailnet. The data plane is responsible for +actually exchanging packets between peers on the tailnet, including transport management (DERP, TUN, +etc.), routing, packet filtering, and tunneling. -- [`ts_dataplane`](ts_dataplane/src/lib.rs): wires all the individual data plane functions together, flowing inbound and outbound packets through the components in the correct order. The various data plane components are described below. +- [`ts_dataplane`](ts_dataplane/src/lib.rs): wires all the individual data plane functions together, + flowing inbound and outbound packets through the components in the correct order. The various data + plane components are described below. #### Packet Filtering - - [`ts_packetfilter`](ts_packetfilter/src/lib.rs): performs filtering of traffic based on a tailnet's policies (ACLs and/or grants), which each node receives from the control plane. - - [`ts_bart_packetfilter`](ts_bart_packetfilter/src/lib.rs): specialization of `ts_bart` used by `ts_packetfilter` for fast filtering decisions. - - [`ts_packetfilter_state`](ts_packetfilter_state/src/lib.rs): converters and adapters between the control protocol wire types in `ts_packetfilter_serde` and the types used in `ts_packetfilter`. +- [`ts_packetfilter`](ts_packetfilter/src/lib.rs): performs filtering of traffic based on a + tailnet's policies (ACLs and/or grants), which each node receives from the control plane. +- [`ts_bart_packetfilter`](ts_bart_packetfilter/src/lib.rs): specialization of `ts_bart` used by + `ts_packetfilter` for fast filtering decisions. +- [`ts_packetfilter_state`](ts_packetfilter_state/src/lib.rs): converters and adapters between the + control protocol wire types in `ts_packetfilter_serde` and the types used in `ts_packetfilter`. #### Routing - - [`ts_overlay_router`](ts_overlay_router/src/lib.rs): routing table implementation for overlay (tailnet) traffic; determines which peer to send outbound traffic to, and which overlay transport should receive inbound packets. - - [`ts_underlay_router`](ts_underlay_router/src/lib.rs): routing table implementation for underlay traffic; determines which underlay transport an outbound packet should be sent from, if any. +- [`ts_overlay_router`](ts_overlay_router/src/lib.rs): routing table implementation for overlay ( + tailnet) traffic; determines which peer to send outbound traffic to, and which overlay transport + should receive inbound packets. +- [`ts_underlay_router`](ts_underlay_router/src/lib.rs): routing table implementation for underlay + traffic; determines which underlay transport an outbound packet should be sent from, if any. #### Transports - - [`ts_transport`](ts_transport/src/lib.rs): traits that define transports and how they move traffic in and out of the overlay/underlay network. - - [`ts_transport_derp`](ts_transport_derp/src/lib.rs): an underlay transport that exchanges packets between nodes via Designated Encrypted Relay for Packets (DERP) relay servers. - - [`ts_transport_tun`](ts_transport_tun/src/lib.rs): an overlay transport that exposes a TUN device on the local machine to send/receive packets on the overlay network (tailnet). +- [`ts_transport`](ts_transport/src/lib.rs): traits that define transports and how they move traffic + in and out of the overlay/underlay network. +- [`ts_derp`](ts_derp/src/lib.rs): an underlay transport that exchanges packets between nodes via + Designated Encrypted Relay for Packets (DERP) relay servers. +- [`ts_transport_tun`](ts_transport_tun/src/lib.rs): an overlay transport that exposes a TUN device + on the local machine to send/receive packets on the overlay network (tailnet). #### Tunneling - - [`ts_tunnel`](ts_tunnel/src/lib.rs): a partial implementation of the WireGuard specification that protects all data plane traffic, and is interoperable with other WireGuard clients, including Tailscale clients. +- [`ts_tunnel`](ts_tunnel/src/lib.rs): a partial implementation of the WireGuard specification that + protects all data plane traffic, and is interoperable with other WireGuard clients, including + Tailscale clients. ### Utilities -Crates used throughout the codebase that provide generic algorithms, data structures, cross-cutting concerns, or development tooling. +Crates used throughout the codebase that provide generic algorithms, data structures, cross-cutting +concerns, or development tooling. #### Algorithms and Data Structures - - [`ts_array256`](ts_array256/src/lib.rs): sparse array of 256 elements with configurable backing store, used with `ts_bart`. - - [`ts_bart`](ts_bart/README.md): BAlanced Routing Table (BART) data structure for fast IP address/prefix search in routing tables and packet filtering. - - [`ts_bitset`](ts_bitset/src/lib.rs): fixed-width bitset used to track presence of elements in `ts_array256`. - - [`ts_dynbitset`](ts_dynbitset/src/lib.rs): growable bitset built on top of `ts_bitset`, used with `ts_bart_packetfilter`. - - [`ts_keys`](ts_keys/src/lib.rs): data structures representing all of Tailscale's x25519 keys (disco, node, machine, etc.). - - [`ts_packet`](ts_packet/src/lib.rs): base types representing network packets. + +- [`ts_array256`](ts_array256/src/lib.rs): sparse array of 256 elements with configurable backing + store, used with `ts_bart`. +- [`ts_bart`](ts_bart/README.md): BAlanced Routing Table (BART) data structure for fast IP + address/prefix search in routing tables and packet filtering. +- [`ts_bitset`](ts_bitset/src/lib.rs): fixed-width bitset used to track presence of elements in + `ts_array256`. +- [`ts_dynbitset`](ts_dynbitset/src/lib.rs): growable bitset built on top of `ts_bitset`, used with + `ts_bart_packetfilter`. +- [`ts_keys`](ts_keys/src/lib.rs): data structures representing all of Tailscale's x25519 keys ( + disco, node, machine, etc.). +- [`ts_packet`](ts_packet/src/lib.rs): base types representing network packets. #### Developer Tooling - - [`checks`](checks/src/main.rs) (and [`bin/check`](bin/check)): runs all the CI/CD checks (linting, testing, etc.) locally. - - `nix` (directory, not a crate): Nix support modules for the Nix Flake. - - `supply-chain` (directory, not a crate): configuration for `cargo-vet`. - - [`ts_devtools`](ts_devtools/src): debugging and integration tools for internals; not useful for application debugging or as examples. + +- [`checks`](checks/src/main.rs) (and [`bin/check`](bin/check)): runs all the CI/CD checks (linting, + testing, etc.) locally. +- `nix` (directory, not a crate): Nix support modules for the Nix Flake. +- `supply-chain` (directory, not a crate): configuration for `cargo-vet`. +- [`ts_devtools`](ts_devtools/src): debugging and integration tools for internals; not useful for + application debugging or as examples. #### Examples, Debugging, and Testing - - [`ts_cli_util`](ts_cli_util/src/lib.rs): helpers for writing command line tools and initializing logging, used in examples. - - [`ts_test_util`](ts_test_util/src/lib.rs): common code used by our unit and integration tests, such as determining if the network is available. - - [`ts_hexdump`](ts_hexdump/src/lib.rs): traits and functions to generate canonical hexdumps of buffers for debug logging. + +- [`ts_cli_util`](ts_cli_util/src/lib.rs): helpers for writing command line tools and initializing + logging, used in examples. +- [`ts_test_util`](ts_test_util/src/lib.rs): common code used by our unit and integration tests, + such as determining if the network is available. +- [`ts_hexdump`](ts_hexdump/src/lib.rs): traits and functions to generate canonical hexdumps of + buffers for debug logging. #### Protocols - - [`ts_disco_protocol`](ts_disco_protocol/src/lib.rs): incomplete implementation of Tailscale's discovery protocol (disco). - - [`ts_http_util`](ts_http_util/src/lib.rs): HTTP/1 and HTTP/2 client utilities used in `ts_control` and `ts_transport_derp`. - - [`ts_tls_util`](ts_tls_util/src/lib.rs): Transport Layer Sockets (TLS) utilities to manage certificates and establish secure connections over HTTP. + +- [`ts_disco_protocol`](ts_disco_protocol/src/lib.rs): incomplete implementation of Tailscale's + discovery protocol (disco). +- [`ts_http_util`](ts_http_util/src/lib.rs): HTTP/1 and HTTP/2 client utilities used in `ts_control` + and `ts_derp`. +- [`ts_tls_util`](ts_tls_util/src/lib.rs): Transport Layer Sockets (TLS) utilities to manage + certificates and establish secure connections over HTTP. #### Time - - [`ts_time`](ts_time/src/lib.rs): event scheduling and wakeup code used by `ts_tunnel` for timers. + +- [`ts_time`](ts_time/src/lib.rs): event scheduling and wakeup code used by `ts_tunnel` for timers. diff --git a/Cargo.lock b/Cargo.lock index f4b3a4aa..95b4f1ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2879,6 +2879,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "redb" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e925444704b5f17d32bf42f5b6e2df050bceebc3dcd6e71cc73dafe8092e839" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -4151,8 +4160,8 @@ dependencies = [ "tracing-tracy", "tracy-client", "ts_control", + "ts_derp", "ts_netcheck", - "ts_transport_derp", ] [[package]] @@ -4177,6 +4186,7 @@ dependencies = [ "ts_capabilityversion", "ts_control_noise", "ts_control_serde", + "ts_derp", "ts_dynbitset", "ts_http_util", "ts_keys", @@ -4184,7 +4194,6 @@ dependencies = [ "ts_packetfilter", "ts_packetfilter_state", "ts_tls_util", - "ts_transport_derp", "url", "zerocopy", ] @@ -4237,6 +4246,7 @@ name = "ts_dataplane" version = "0.2.0" dependencies = [ "etherparse", + "redb", "tokio", "tracing", "ts_bart", @@ -4248,6 +4258,35 @@ dependencies = [ "ts_transport", "ts_tunnel", "ts_underlay_router", + "zerocopy", +] + +[[package]] +name = "ts_derp" +version = "0.2.0" +dependencies = [ + "bytes", + "crypto_box", + "futures", + "hex", + "reqwest", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "ts_cli_util", + "ts_control_serde", + "ts_hexdump", + "ts_http_util", + "ts_keys", + "ts_packet", + "ts_tls_util", + "ts_transport", + "url", + "yoke", + "zerocopy", ] [[package]] @@ -4260,12 +4299,12 @@ dependencies = [ "tokio", "tracing", "ts_cli_util", + "ts_derp", "ts_keys", "ts_packet", "ts_packetfilter", "ts_packetfilter_state", "ts_transport", - "ts_transport_derp", ] [[package]] @@ -4367,9 +4406,9 @@ dependencies = [ "tracing-test", "ts_control", "ts_control_serde", + "ts_derp", "ts_http_util", "ts_test_util", - "ts_transport_derp", "url", ] @@ -4435,7 +4474,6 @@ dependencies = [ "itertools", "tracing", "ts_bart", - "ts_keys", "ts_packet", "ts_transport", ] @@ -4517,6 +4555,7 @@ dependencies = [ "ts_bart_packetfilter", "ts_control", "ts_dataplane", + "ts_derp", "ts_keys", "ts_netcheck", "ts_netstack_smoltcp", @@ -4525,7 +4564,7 @@ dependencies = [ "ts_packetfilter", "ts_packetfilter_state", "ts_transport", - "ts_transport_derp", + "ts_tunnel", ] [[package]] @@ -4554,35 +4593,7 @@ dependencies = [ name = "ts_transport" version = "0.2.0" dependencies = [ - "ts_keys", - "ts_packet", -] - -[[package]] -name = "ts_transport_derp" -version = "0.2.0" -dependencies = [ - "bytes", - "crypto_box", - "futures", - "hex", - "reqwest", - "serde", - "serde_json", - "thiserror 2.0.18", - "tokio", - "tokio-util", - "tracing", - "ts_cli_util", - "ts_control_serde", - "ts_hexdump", - "ts_http_util", - "ts_keys", "ts_packet", - "ts_tls_util", - "ts_transport", - "url", - "yoke", "zerocopy", ] @@ -4629,7 +4640,6 @@ dependencies = [ name = "ts_underlay_router" version = "0.2.0" dependencies = [ - "ts_keys", "ts_packet", "ts_transport", ] diff --git a/Cargo.toml b/Cargo.toml index e57fde1c..db5fc4a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,9 @@ members = [ "ts_control_noise", "ts_control_serde", "ts_dataplane", + "ts_derp", "ts_devtools", "ts_disco_protocol", - "ts_disco_protocol", "ts_dynbitset", "ts_elixir/native/ts_elixir", "ts_ffi", @@ -38,7 +38,6 @@ members = [ "ts_time", "ts_tls_util", "ts_transport", - "ts_transport_derp", "ts_transport_tun", "ts_underlay_router", "ts_tunnel", @@ -116,6 +115,7 @@ ts_control = { path = "ts_control", version = "0.2.0" } ts_control_noise = { path = "ts_control_noise", version = "0.2.0" } ts_control_serde = { path = "ts_control_serde", version = "0.2.0" } ts_dataplane = { path = "ts_dataplane", version = "0.2.0" } +ts_derp = { path = "ts_derp", version = "0.2.0" } ts_disco_protocol = { path = "ts_disco_protocol", version = "0.2.0" } ts_dynbitset = { path = "ts_dynbitset", version = "0.2.0" } ts_hexdump = { path = "ts_hexdump", version = "0.2.0" } @@ -137,7 +137,6 @@ ts_runtime = { path = "ts_runtime", version = "0.2.0" } ts_test_util = { path = "ts_test_util" } ts_time = { path = "ts_time", version = "0.2.0" } ts_transport = { path = "ts_transport", version = "0.2.0" } -ts_transport_derp = { path = "ts_transport_derp", version = "0.2.0" } ts_transport_tun = { path = "ts_transport_tun", version = "0.2.0" } ts_underlay_router = { path = "ts_underlay_router", version = "0.2.0" } ts_tunnel = { path = "ts_tunnel", version = "0.2.0" } diff --git a/ts_cli_util/Cargo.toml b/ts_cli_util/Cargo.toml index 46a30d74..9185e2b2 100644 --- a/ts_cli_util/Cargo.toml +++ b/ts_cli_util/Cargo.toml @@ -16,7 +16,7 @@ rust-version.workspace = true tailscale.workspace = true ts_control.workspace = true ts_netcheck.workspace = true -ts_transport_derp.workspace = true +ts_derp.workspace = true # Unconditionally required dependencies. cfg-if.workspace = true diff --git a/ts_cli_util/src/lib.rs b/ts_cli_util/src/lib.rs index f126306f..8c55bb7b 100644 --- a/ts_cli_util/src/lib.rs +++ b/ts_cli_util/src/lib.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use futures_util::{Stream, StreamExt}; use tracing::level_filters::LevelFilter; use tracing_subscriber::{Layer, layer::SubscriberExt, util::SubscriberInitExt}; +use ts_derp::{RegionId, ServerConnInfo}; use ts_netcheck::RegionResult; -use ts_transport_derp::{RegionId, ServerConnInfo}; /// Result with a boxed [`core::error::Error`] trait object. pub type Result = core::result::Result>; diff --git a/ts_control/Cargo.toml b/ts_control/Cargo.toml index a99edb54..ebaa114b 100644 --- a/ts_control/Cargo.toml +++ b/ts_control/Cargo.toml @@ -24,7 +24,7 @@ ts_packet.workspace = true ts_packetfilter.workspace = true ts_packetfilter_state.workspace = true ts_tls_util.workspace = true -ts_transport_derp.workspace = true +ts_derp.workspace = true # Unconditionally required dependencies. bytes.workspace = true @@ -51,7 +51,7 @@ default = ["async_tokio"] async_tokio = ["dep:futures-util", "dep:tokio", "dep:tokio-stream"] # Allow derp connections to be made without verifying TLS certs. Only for use in tests. -insecure-derp = ["ts_transport_derp/insecure-for-tests"] +insecure-derp = ["ts_derp/insecure-for-tests"] # Allow control keys to be fetched over plain HTTP1 without TLS. Only for use in tests. insecure-keyfetch = [] diff --git a/ts_control/src/derp.rs b/ts_control/src/derp.rs index 6004d45d..036fcdb8 100644 --- a/ts_control/src/derp.rs +++ b/ts_control/src/derp.rs @@ -1,17 +1,17 @@ use alloc::collections::BTreeMap; -use ts_transport_derp::TlsValidationConfig; +use ts_derp::TlsValidationConfig; -/// The full derp state, a map of [`ts_transport_derp::RegionId`]s to [`Region`]s. -pub type Map = BTreeMap; +/// The full derp state, a map of [`ts_derp::RegionId`]s to [`Region`]s. +pub type Map = BTreeMap; -/// Convert a derp map from the [`ts_control_serde`] representation to the [`ts_transport_derp`] +/// Convert a derp map from the [`ts_control_serde`] representation to the [`ts_derp`] /// representation. pub fn convert_derp_map( derp_map: &ts_control_serde::DerpMap<'_>, -) -> impl Iterator { +) -> impl Iterator { derp_map.regions.iter().map(|(id, region)| { - let id = ts_transport_derp::RegionId((*id).into()); + let id = ts_derp::RegionId((*id).into()); let region: Region = region.into(); (id, region) @@ -22,10 +22,10 @@ pub fn convert_derp_map( #[derive(Debug, Clone, PartialEq, Eq)] pub struct Region { /// The info for this region. - pub info: ts_transport_derp::RegionInfo, + pub info: ts_derp::RegionInfo, /// Servers in this region. - pub servers: Vec, + pub servers: Vec, } impl From<&ts_control_serde::DerpRegion<'_>> for Region { @@ -37,15 +37,15 @@ impl From<&ts_control_serde::DerpRegion<'_>> for Region { } } -fn region_info(region: &ts_control_serde::DerpRegion) -> ts_transport_derp::RegionInfo { - ts_transport_derp::RegionInfo { +fn region_info(region: &ts_control_serde::DerpRegion) -> ts_derp::RegionInfo { + ts_derp::RegionInfo { name: region.name.to_string(), code: region.code.to_string(), no_measure_no_home: region.no_measure_no_home, } } -fn server(server: &ts_control_serde::DerpServer) -> ts_transport_derp::ServerConnInfo { +fn server(server: &ts_control_serde::DerpServer) -> ts_derp::ServerConnInfo { const DEFAULT_TLS_PORT: u16 = 443; let https_port = match server.derp_port { @@ -62,7 +62,7 @@ fn server(server: &ts_control_serde::DerpServer) -> ts_transport_derp::ServerCon tls_config = TlsValidationConfig::InsecureForTests; }; - ts_transport_derp::ServerConnInfo { + ts_derp::ServerConnInfo { hostname: server.hostname.to_string(), https_port, stun_port: server.stun_port.into(), @@ -77,13 +77,13 @@ fn server(server: &ts_control_serde::DerpServer) -> ts_transport_derp::ServerCon } } -fn convert_ip_usage(ip: ts_control_serde::DerpIpUsage) -> ts_transport_derp::IpUsage +fn convert_ip_usage(ip: ts_control_serde::DerpIpUsage) -> ts_derp::IpUsage where T: Copy, { match ip { - ts_control_serde::DerpIpUsage::Disable => ts_transport_derp::IpUsage::Disable, - ts_control_serde::DerpIpUsage::UseDns => ts_transport_derp::IpUsage::UseDns, - ts_control_serde::DerpIpUsage::FixedAddr(ip) => ts_transport_derp::IpUsage::FixedAddr(ip), + ts_control_serde::DerpIpUsage::Disable => ts_derp::IpUsage::Disable, + ts_control_serde::DerpIpUsage::UseDns => ts_derp::IpUsage::UseDns, + ts_control_serde::DerpIpUsage::FixedAddr(ip) => ts_derp::IpUsage::FixedAddr(ip), } } diff --git a/ts_control/src/map_request_builder.rs b/ts_control/src/map_request_builder.rs index f5ca7d6e..7c0ae494 100644 --- a/ts_control/src/map_request_builder.rs +++ b/ts_control/src/map_request_builder.rs @@ -67,7 +67,7 @@ impl<'a> MapRequestBuilder<'a> { /// Set the [`NetInfo::preferred_derp`] field (inside [`MapRequest::host_info`] -> /// [`HostInfo::net_info`]). - pub fn preferred_derp(mut self, value: ts_transport_derp::RegionId) -> Self { + pub fn preferred_derp(mut self, value: ts_derp::RegionId) -> Self { self.net_info_mut().preferred_derp = Some(value.0.into()); self } diff --git a/ts_control/src/node.rs b/ts_control/src/node.rs index 3f67bf5a..99a42fe0 100644 --- a/ts_control/src/node.rs +++ b/ts_control/src/node.rs @@ -46,7 +46,7 @@ pub struct Node { pub underlay_addresses: Vec, /// The DERP region for this node, if known. - pub derp_region: Option, + pub derp_region: Option, } impl Node { @@ -155,7 +155,7 @@ impl From<&ts_control_serde::Node<'_>> for Node { .home_derp .or(value.legacy_derp_string) .or_else(|| value.host_info.net_info.as_ref()?.preferred_derp) - .map(|x| ts_transport_derp::RegionId(x.into())), + .map(|x| ts_derp::RegionId(x.into())), } } } diff --git a/ts_control/src/tokio/client.rs b/ts_control/src/tokio/client.rs index 46144f56..8fa1dc1e 100644 --- a/ts_control/src/tokio/client.rs +++ b/ts_control/src/tokio/client.rs @@ -133,7 +133,7 @@ impl AsyncControlClient { #[tracing::instrument(skip_all, fields(map_url = %self.map_url(), %region_id), err, level = "trace")] pub async fn set_home_region<'c>( &mut self, - region_id: ts_transport_derp::RegionId, + region_id: ts_derp::RegionId, latencies: impl IntoIterator, ) -> Result<(), MapStreamError> { tracing::trace!(region = %region_id, "reporting home derp to control server"); @@ -171,7 +171,7 @@ impl AsyncControlClient { #[derive(Debug)] pub enum Command { SetDerpHomeRegion { - id: ts_transport_derp::RegionId, + id: ts_derp::RegionId, latencies: BTreeMap, }, } diff --git a/ts_dataplane/Cargo.toml b/ts_dataplane/Cargo.toml index 0bf16d15..7a35bb4b 100644 --- a/ts_dataplane/Cargo.toml +++ b/ts_dataplane/Cargo.toml @@ -28,7 +28,9 @@ ts_bart.workspace = true # Unconditionally required dependencies. etherparse = "0.19" +redb = "4.1" tracing.workspace = true +zerocopy.workspace = true # Required dependencies for the async_tokio feature. tokio = { workspace = true, optional = true, features = ["time", "sync"] } diff --git a/ts_dataplane/src/async_tokio.rs b/ts_dataplane/src/async_tokio.rs index 4cbd82ca..9ac08eea 100644 --- a/ts_dataplane/src/async_tokio.rs +++ b/ts_dataplane/src/async_tokio.rs @@ -1,14 +1,18 @@ //! The packet processing dataplane, as a tokio task. -use std::{collections::HashMap, convert::Infallible, ops::DerefMut, sync::atomic::AtomicU32}; +use std::{ + collections::HashMap, + convert::Infallible, + ops::DerefMut, + sync::{Arc, atomic::AtomicU32}, +}; use tokio::sync::{Mutex, mpsc}; -use ts_keys::NodePublicKey; use ts_packet::PacketMut; -use ts_transport::{OverlayTransportId, UnderlayTransportId}; +use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId}; use ts_tunnel::NodeKeyPair; -use crate::{EventResult, InboundResult, OutboundResult}; +use crate::{EventResult, InboundResult, OutboundResult, PeerDb}; /// Queue for packets leaving the data plane "up" into an overlay transport. pub type DataplaneToOverlay = mpsc::UnboundedSender>; @@ -17,10 +21,10 @@ pub type DataplaneToOverlay = mpsc::UnboundedSender>; pub type DataplaneFromOverlay = mpsc::UnboundedReceiver>; /// Queue for packets leaving the data plane "down" into an underlay transport. -pub type DataplaneToUnderlay = mpsc::UnboundedSender<(NodePublicKey, Vec)>; +pub type DataplaneToUnderlay = mpsc::UnboundedSender<(PeerId, Vec)>; /// Queue for packets entering the data plane "up" from an underlay transport. -pub type DataplaneFromUnderlay = mpsc::UnboundedReceiver<(NodePublicKey, Vec)>; +pub type DataplaneFromUnderlay = mpsc::UnboundedReceiver<(PeerId, Vec)>; // TODO: wire in overlay/underlay transport traits @@ -61,11 +65,11 @@ impl DataPlane { /// /// The caller must configure overlay/underlay output queues for the data plane to be useful, /// otherwise all it can do is drop packets. - pub fn new(my_key: NodeKeyPair) -> Self { + pub fn new(my_key: NodeKeyPair, peer_db: Arc) -> Self { let (overlay_up, overlay_down) = mpsc::unbounded_channel(); let (underlay_down, underlay_up) = mpsc::unbounded_channel(); - let sync = crate::DataPlane::new(my_key); + let sync = crate::DataPlane::new(my_key, peer_db); Self { underlay_down, @@ -147,7 +151,7 @@ impl DataPlane { pub async fn step(&self) { enum SelectResult { OverlayDown(Vec), - UnderlayUp(NodePublicKey, Vec), + UnderlayUp(PeerId, Vec), TransportsChanged, Event, } @@ -186,10 +190,10 @@ impl DataPlane { } underlay_pkts = underlay_up.recv() => { - let (node_key, underlay_pkts) = underlay_pkts.unwrap(); - tracing::trace!(%node_key, n_underlay_pkts = underlay_pkts.len()); + let (peer_id, underlay_pkts) = underlay_pkts.unwrap(); + tracing::trace!(%peer_id, n_underlay_pkts = underlay_pkts.len()); - SelectResult::UnderlayUp(node_key, underlay_pkts) + SelectResult::UnderlayUp(peer_id, underlay_pkts) } _ = self.transports_changed.notified() => { @@ -215,14 +219,7 @@ impl DataPlane { (Some(to_peers), Some(loopback)) } - SelectResult::UnderlayUp(node_key, underlay_up) => { - if core.sync.wireguard.peer_id(node_key).is_none() { - core.sync.wireguard.add_peer(ts_tunnel::PeerConfig { - key: node_key, - psk: [0u8; 32].into(), - }); - } - + SelectResult::UnderlayUp(_peer_id, underlay_up) => { let InboundResult { to_local, to_peers } = core.sync.process_inbound(underlay_up); (Some(to_peers), Some(to_local)) @@ -265,13 +262,13 @@ async fn write_to_overlay(slf: &CoreState, packets: HashMap)>, + packets: impl IntoIterator)>, ) { - for ((tid, node_key), packets) in packets { - tracing::trace!(underlay_id = ?tid, %node_key, n_packets = packets.len()); + for ((tid, peer_id), packets) in packets { + tracing::trace!(underlay_id = ?tid, %peer_id, n_packets = packets.len()); if let Some(queue) = slf.underlay_transports.get(&tid) { - queue.send((node_key, packets)).unwrap(); + queue.send((peer_id, packets)).unwrap(); } } } diff --git a/ts_dataplane/src/lib.rs b/ts_dataplane/src/lib.rs index e0b20509..3c245f66 100644 --- a/ts_dataplane/src/lib.rs +++ b/ts_dataplane/src/lib.rs @@ -1,19 +1,21 @@ #![doc = include_str!("../README.md")] -pub mod async_tokio; - use std::{collections::HashMap, sync::Arc, time::Instant}; use ts_bart::RoutingTable; -use ts_keys::NodePublicKey; use ts_overlay_router as or; use ts_packet::PacketMut; use ts_packetfilter::{FilterExt, IpProto}; use ts_time::{Handle, Scheduler}; -use ts_transport::{OverlayTransportId, UnderlayTransportId}; -use ts_tunnel::{Endpoint, NodeKeyPair, PeerConfig}; +use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId}; +use ts_tunnel::{Endpoint, NodeKeyPair}; use ts_underlay_router as ur; +pub mod async_tokio; +mod peer_map; + +pub use peer_map::{PeerDb, PeerInfo}; + /// A data plane subsystem that can be the subject of timer events. pub enum Subsystem { /// The wireguard component. @@ -25,13 +27,16 @@ pub struct DataPlane { /// Wireguard encryption/decryption. pub wireguard: Endpoint, + /// Mapping from [`PeerId`] to public key and wireguard id. + pub peer_db: Arc, + /// Outbound overlay router. pub or_out: or::outbound::Router, /// Outbound underlay router. pub ur_out: ur::outbound::Router, /// Inbound source filter. - pub src_filter_in: Arc>, + pub src_filter_in: Arc>, /// Inbound overlay router. pub or_in: or::inbound::Router, @@ -47,9 +52,10 @@ pub struct DataPlane { impl DataPlane { /// Creates a new data plane for a wireguard node key. - pub fn new(my_key: NodeKeyPair) -> Self { + pub fn new(my_key: NodeKeyPair, peer_db: Arc) -> Self { DataPlane { wireguard: Endpoint::new(my_key), + peer_db, or_out: Default::default(), ur_out: Default::default(), src_filter_in: Default::default(), @@ -70,19 +76,20 @@ impl DataPlane { let to_wireguard = to_wireguard .into_iter() - .map(|(k, v)| { - let id = self + .filter_map(|(k, v)| { + let info = self.peer_db.get_by_id(k)?; + + // unwrap: either we add the peer or it's in the map, no failure case + let wg_id = self .wireguard - .peer_id(k) - .or_else(|| { - self.wireguard.add_peer(PeerConfig { - key: k, - psk: [0u8; 32].into(), - }) + .add_peer(ts_tunnel::PeerConfig { + key: info.node_key, + psk: [0u8; 32].into(), }) + .or_else(|| self.wireguard.peer_id(info.node_key)) .unwrap(); - (id, v) + Some((wg_id, v)) }) .collect::>(); @@ -90,11 +97,12 @@ impl DataPlane { to_peers: encrypted, } = self.wireguard.send(to_wireguard); - let to_peers = self.ur_out.route( - encrypted - .into_iter() - .filter_map(|(k, v)| Some((self.wireguard.peer_key(k)?, v))), - ); + let to_peers = self + .ur_out + .route(encrypted.into_iter().filter_map(|(k, v)| { + let info = self.get_wg(k)?; + Some((info.peer_id, v)) + })); if let Some(next) = self.wireguard.next_event() && let Some(prev) = self @@ -116,23 +124,31 @@ impl DataPlane { let to_local = to_local .into_iter() - .map(|(peer_id, mut packets)| { - let span = tracing::trace_span!("src_filter_inbound", peer_id = ?peer_id, n_packet = packets.len(), peer_key = tracing::field::Empty).entered(); - - let Some(key) = self.wireguard.peer_key(peer_id) else { + .map(|(peer_id, mut packets)| -> Vec { + let span = tracing::trace_span!( + "src_filter_inbound", + wg_peer_id = ?peer_id, + peer_id = tracing::field::Empty, + n_packet = packets.len(), + peer_key = tracing::field::Empty, + ) + .entered(); + + let Some(info) = self.get_wg(peer_id) else { tracing::warn!("no nodekey for peer"); - return (peer_id, vec![]); + return vec![]; }; - span.record("peer_key", tracing::field::display(key)); + span.record("peer_key", tracing::field::display(&info.node_key)); + // TODO: span.record("peer_id", info.peer_id); packets.retain(|packet| { let Some(src) = packet.get_src_addr() else { tracing::trace!("does not look like ip packet"); return false; }; - let verdict = if let Some(allowed_key) = self.src_filter_in.lookup(src) { - *allowed_key == key + let verdict = if let Some(allowed_peer) = self.src_filter_in.lookup(src) { + *allowed_peer == info.peer_id } else { false }; @@ -140,17 +156,11 @@ impl DataPlane { verdict }); - (peer_id, packets) + packets }) - .map(|(k, mut v)| { - let span = tracing::trace_span!("packet_filter_inbound", peer_id = ?k, n_packet = v.len(), peer_key = tracing::field::Empty).entered(); - - let Some(key) = self.wireguard.peer_key(k) else { - tracing::warn!("no nodekey for peer"); - return (k, vec![]); - }; - - span.record("peer_key", tracing::field::display(key)); + .map(|mut v| { + let _span = + tracing::trace_span!("packet_filter_inbound", n_packet = v.len()).entered(); v.retain(|pkt| { let Ok(pkt) = etherparse::SlicedPacket::from_ip(pkt.as_ref()) else { @@ -159,12 +169,16 @@ impl DataPlane { }; let (proto, src, dst) = match pkt.net { - Some(etherparse::NetSlice::Ipv4(ipv4)) => { - (IpProto::new(ipv4.payload().ip_number.0 as _), ipv4.header().source_addr().into(), ipv4.header().destination_addr().into()) - } - Some(etherparse::NetSlice::Ipv6(ipv6)) => { - (IpProto::new(ipv6.payload().ip_number.0 as _), ipv6.header().source_addr().into(), ipv6.header().destination_addr().into()) - } + Some(etherparse::NetSlice::Ipv4(ipv4)) => ( + IpProto::new(ipv4.payload().ip_number.0 as _), + ipv4.header().source_addr().into(), + ipv4.header().destination_addr().into(), + ), + Some(etherparse::NetSlice::Ipv6(ipv6)) => ( + IpProto::new(ipv6.payload().ip_number.0 as _), + ipv6.header().source_addr().into(), + ipv6.header().destination_addr().into(), + ), _ => { unreachable!("unexpected packet kind"); } @@ -189,24 +203,22 @@ impl DataPlane { // TODO(npry): wire in nodecaps let caps = []; - let verdict = self.packet_filter - .can_access(&info, caps); + let verdict = self.packet_filter.can_access(&info, caps); tracing::trace!(?info, ?caps, verdict); verdict }); - (k, v) + v }); - let to_peers = to_peers - .into_iter() - .filter_map(|(k, v)| Some((self.wireguard.peer_key(k)?, v))); + let to_peers = to_peers.into_iter().filter_map(|(k, v)| { + let info = self.get_wg(k)?; + Some((info.peer_id, v)) + }); - let to_local = self - .or_in - .route(to_local.flat_map(|(_id, packets)| packets)); + let to_local = self.or_in.route(to_local.flatten()); let to_peers = self.ur_out.route(to_peers); if let Some(next) = self.wireguard.next_event() @@ -241,11 +253,10 @@ impl DataPlane { match event { Subsystem::Wireguard => { let res = self.wireguard.dispatch_events(now); - to_peers.extend( - res.to_peers - .into_iter() - .filter_map(|(id, pkts)| Some((self.wireguard.peer_key(id)?, pkts))), - ); + to_peers.extend(res.to_peers.into_iter().filter_map(|(id, pkts)| { + let info = self.get_wg(id)?; + Some((info.peer_id, pkts)) + })); } } } @@ -261,12 +272,20 @@ impl DataPlane { EventResult { to_peers } } + + fn get_wg(&self, wg: ts_tunnel::PeerId) -> Option { + // unwrap: the peer must have just been in the map, it still must be + let key = self.wireguard.peer_key(wg)?; + let info = self.peer_db.get_or_insert(&key); + + Some(info) + } } /// The result of processing outbound packets. pub struct OutboundResult { /// Packets to be sent into underlay transports for transmission. - pub to_peers: HashMap<(UnderlayTransportId, NodePublicKey), Vec>, + pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec>, /// Packets to be looped back and delivered to overlay transports. pub loopback: HashMap>, } @@ -276,12 +295,12 @@ pub struct InboundResult { /// Decrypted packets to be delivered to overlay transports. pub to_local: HashMap>, /// Encrypted packets to be sent to wireguard peers by the underlay. - pub to_peers: HashMap<(UnderlayTransportId, NodePublicKey), Vec>, + pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec>, } /// The result of processing an event. #[derive(Default)] pub struct EventResult { /// Encrypted packets to be sent to wireguard peers by the underlay. - pub to_peers: HashMap<(UnderlayTransportId, NodePublicKey), Vec>, + pub to_peers: HashMap<(UnderlayTransportId, PeerId), Vec>, } diff --git a/ts_dataplane/src/peer_map.rs b/ts_dataplane/src/peer_map.rs new file mode 100644 index 00000000..06833c98 --- /dev/null +++ b/ts_dataplane/src/peer_map.rs @@ -0,0 +1,172 @@ +use std::sync::atomic::{AtomicU32, Ordering}; + +use redb::{ReadableDatabase, ReadableTable, TypeName}; +use ts_keys::NodePublicKey; +use zerocopy::{FromBytes, IntoBytes}; + +/// Info about a Tailscale peer. +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + zerocopy::KnownLayout, + zerocopy::FromBytes, + zerocopy::IntoBytes, + zerocopy::Immutable, +)] +pub struct PeerInfo { + /// The peer's id. + pub peer_id: ts_transport::PeerId, + /// The peer's node key. + pub node_key: NodePublicKey, +} + +impl redb::Value for &PeerInfo { + type SelfType<'a> + = &'a PeerInfo + where + Self: 'a; + + type AsBytes<'a> + = &'a [u8] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + PeerInfo::ref_from_bytes(data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'b, + { + value.as_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("PeerInfo") + } +} + +type PeerId = u32; +type Nodekey<'a> = &'a [u8]; + +/// A database that maps [`ts_transport::PeerId`] to [`NodePublicKey`] and vice versa. +pub struct PeerDb { + db: redb::Database, + next_id: AtomicU32, +} + +impl Default for PeerDb { + fn default() -> Self { + let db = redb::Database::builder() + .create_with_backend(redb::backends::InMemoryBackend::new()) + .unwrap(); + + Self { + db, + next_id: AtomicU32::new(0), + } + } +} + +pub const TABLE_PEERS: redb::TableDefinition = + redb::TableDefinition::new("peers"); +pub const TABLE_NODEKEY: redb::TableDefinition = + redb::TableDefinition::new("peers_nodekey"); + +// NOTE(npry): the functions here wrap inner helpers that return Result, while the +// outer function returns T by unwrapping. This is because any error redb may throw while in memory +// storage mode is panic-worthy: it is unrecoverable for our use-case if we have corruption or +// transaction contention in-memory. The inner function just allows a more concise expression of the +// bubble-up error semantics than sprinkling unwraps everywhere (and #![feature(try_blocks)] is +// stuck forever in nightly). + +impl PeerDb { + /// Get a [`PeerInfo`] by its [`NodePublicKey`], allocating a new entry if required. + pub fn get_or_insert(&self, node_key: &NodePublicKey) -> PeerInfo { + fn _get_or_insert(slf: &PeerDb, node_key: &NodePublicKey) -> Result { + let txn = slf.db.begin_write()?; + + let info = { + let mut nodekey = txn.open_table(TABLE_NODEKEY)?; + let mut peers = txn.open_table(TABLE_PEERS)?; + + if let Some(x) = nodekey.get(node_key.as_bytes())? { + // invariant: always an entry in both dbs + let info = peers.get(x.value())?.unwrap(); + return Ok(*info.value()); + } + + let id = slf.next_id.fetch_add(1, Ordering::Relaxed); + + let info = PeerInfo { + node_key: *node_key, + peer_id: ts_transport::PeerId(id), + }; + peers.insert(id, &info)?; + nodekey.insert(node_key.as_bytes(), id)?; + + info + }; + + txn.commit()?; + + Ok(info) + } + + _get_or_insert(self, node_key).unwrap() + } + + /// Remove a peer by [`NodePublicKey`]. + pub fn remove(&self, node_key: &NodePublicKey) { + fn _remove(slf: &PeerDb, node_key: &NodePublicKey) -> Result<(), redb::Error> { + let txn = slf.db.begin_write()?; + + { + let mut nodekey = txn.open_table(TABLE_NODEKEY)?; + let mut peers = txn.open_table(TABLE_PEERS)?; + + let Some(id) = nodekey.remove(node_key.as_bytes())? else { + return Ok(()); + }; + + peers.remove(id.value())?; + } + + txn.commit()?; + + Ok(()) + } + + _remove(self, node_key).unwrap() + } + + /// Get a [`PeerInfo`] by its [`ts_transport::PeerId`]. + pub fn get_by_id(&self, transport_id: ts_transport::PeerId) -> Option { + fn _get_by_id( + slf: &PeerDb, + transport_id: ts_transport::PeerId, + ) -> Result, redb::Error> { + let txn = slf.db.begin_read()?; + + let peers = txn.open_table(TABLE_PEERS)?; + let Some(result) = peers.get(transport_id.0)? else { + return Ok(None); + }; + + Ok(Some(*result.value())) + } + + _get_by_id(self, transport_id).unwrap() + } +} diff --git a/ts_transport_derp/Cargo.toml b/ts_derp/Cargo.toml similarity index 97% rename from ts_transport_derp/Cargo.toml rename to ts_derp/Cargo.toml index 9cead130..fe56268c 100644 --- a/ts_transport_derp/Cargo.toml +++ b/ts_derp/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "ts_transport_derp" +name = "ts_derp" version.workspace = true description = "tailscale derp client" categories = ["network-programming", "encoding", "asynchronous"] diff --git a/ts_transport_derp/README.md b/ts_derp/README.md similarity index 64% rename from ts_transport_derp/README.md rename to ts_derp/README.md index ab18af18..90db7697 100644 --- a/ts_transport_derp/README.md +++ b/ts_derp/README.md @@ -1,3 +1,3 @@ -# ts_transport_derp +# ts_derp Tailscale derp protocol and client. diff --git a/ts_transport_derp/examples/common/mod.rs b/ts_derp/examples/common/mod.rs similarity index 80% rename from ts_transport_derp/examples/common/mod.rs rename to ts_derp/examples/common/mod.rs index dcfa7577..f97a3277 100644 --- a/ts_transport_derp/examples/common/mod.rs +++ b/ts_derp/examples/common/mod.rs @@ -1,8 +1,8 @@ -//! Common code used by multiple `ts_transport_derp` examples. +//! Common code used by multiple `ts_derp` examples. use std::{collections::BTreeMap, num::NonZeroU32}; -use ts_transport_derp::{RegionId, ServerConnInfo, TlsValidationConfig}; +use ts_derp::{RegionId, ServerConnInfo, TlsValidationConfig}; /// ID of DERP Region #1, which is New York City. pub const REGION_1: RegionId = RegionId(NonZeroU32::new(1).unwrap()); @@ -49,14 +49,14 @@ pub async fn load_derp_map() -> BTreeMap> { .collect() } -fn convert_ip_usage(ip: ts_control_serde::DerpIpUsage) -> ts_transport_derp::IpUsage +fn convert_ip_usage(ip: ts_control_serde::DerpIpUsage) -> ts_derp::IpUsage where T: Copy, { match ip { - ts_control_serde::DerpIpUsage::Disable => ts_transport_derp::IpUsage::Disable, - ts_control_serde::DerpIpUsage::UseDns => ts_transport_derp::IpUsage::UseDns, - ts_control_serde::DerpIpUsage::FixedAddr(ip) => ts_transport_derp::IpUsage::FixedAddr(ip), + ts_control_serde::DerpIpUsage::Disable => ts_derp::IpUsage::Disable, + ts_control_serde::DerpIpUsage::UseDns => ts_derp::IpUsage::UseDns, + ts_control_serde::DerpIpUsage::FixedAddr(ip) => ts_derp::IpUsage::FixedAddr(ip), } } diff --git a/ts_transport_derp/examples/listen.rs b/ts_derp/examples/listen.rs similarity index 52% rename from ts_transport_derp/examples/listen.rs rename to ts_derp/examples/listen.rs index 19adc342..f88a0d40 100644 --- a/ts_transport_derp/examples/listen.rs +++ b/ts_derp/examples/listen.rs @@ -3,7 +3,6 @@ //! Intended to test ping/pong/keepalive. use ts_keys::NodeKeyPair; -use ts_transport::UnderlayTransport; mod common; @@ -16,19 +15,16 @@ async fn main() -> ts_cli_util::Result<()> { let keypair = NodeKeyPair::new(); - let client = ts_transport_derp::Client::connect(region, &keypair).await?; + let client = ts_derp::Client::connect(region, &keypair).await?; tracing::info!("derp handshake done"); loop { - for result in client.recv().await { - match result { - Ok((peer, pkts)) => { - let pkts = pkts.into_iter().collect::>(); - tracing::info!(?peer, ?pkts); - } - Err(e) => { - tracing::error!(err = %e, "recv"); - } + match client.recv_one().await { + Ok((peer, pkt)) => { + tracing::info!(?peer, ?pkt); + } + Err(e) => { + tracing::error!(err = %e, "recv"); } } } diff --git a/ts_transport_derp/examples/ping.rs b/ts_derp/examples/ping.rs similarity index 78% rename from ts_transport_derp/examples/ping.rs rename to ts_derp/examples/ping.rs index dbd9165c..a52be949 100644 --- a/ts_transport_derp/examples/ping.rs +++ b/ts_derp/examples/ping.rs @@ -4,7 +4,6 @@ use std::{sync::Arc, time::Duration}; use tokio::task::JoinSet; use ts_keys::NodeKeyPair; -use ts_transport::UnderlayTransport; mod common; @@ -17,7 +16,7 @@ async fn main() -> ts_cli_util::Result<()> { let keypair = NodeKeyPair::new(); - let client = ts_transport_derp::Client::connect(region, &keypair).await?; + let client = ts_derp::Client::connect(region, &keypair).await?; tracing::info!("derp handshake done"); let client = Arc::new(client); @@ -29,7 +28,7 @@ async fn main() -> ts_cli_util::Result<()> { let mut ticker = tokio::time::interval(Duration::from_secs(1)); loop { - if let Err(e) = pinger.send([(keypair.public, vec![vec![1].into()])]).await { + if let Err(e) = pinger.send_one(keypair.public, &[1]).await { tracing::error!(err = %e, "ping"); } else { tracing::info!("ping"); @@ -43,8 +42,8 @@ async fn main() -> ts_cli_util::Result<()> { js.spawn(async move { loop { match recv.recv_one().await { - Ok((pkt, peer)) => { - tracing::info!(?pkt, ?peer, "pong"); + Ok((peer_key, pkt)) => { + tracing::info!(?pkt, %peer_key, "pong"); } Err(e) => { tracing::error!(err = %e, "recv"); diff --git a/ts_transport_derp/src/async_tokio.rs b/ts_derp/src/client.rs similarity index 89% rename from ts_transport_derp/src/async_tokio.rs rename to ts_derp/src/client.rs index f0570271..b561e076 100644 --- a/ts_transport_derp/src/async_tokio.rs +++ b/ts_derp/src/client.rs @@ -10,12 +10,12 @@ use tokio_util::codec::{FramedRead, FramedWrite}; use ts_http_util::Client as _; use ts_keys::{NodeKeyPair, NodePublicKey}; use ts_packet::PacketMut; -use ts_transport::UnderlayTransport; use url::Url; use crate::{ - Error, ServerConnInfo, frame, + Error, ServerConnInfo, Transport, frame, frame::{ClientInfo, FrameType, PeerGone, Ping, RawFrame, ServerInfo, ServerKey}, + transport::NodekeyTransport, }; type DefaultIo = ts_http_util::Upgraded; @@ -23,7 +23,7 @@ type DefaultIo = ts_http_util::Upgraded; /// Type alias for the default derp client over upgraded HTTP on a tokio executor. pub type DefaultClient = Client; -/// Asynchronous DERP transport for a single DERP region. +/// Single-region DERP client. pub struct Client { read_conn: Mutex, frame::Codec>>, write_conn: Mutex, frame::Codec>>, @@ -58,6 +58,12 @@ impl Client where Io: AsyncRead + AsyncWrite, { + /// Convert this derp client into a [`ts_transport::UnderlayTransport`] given the + /// specified `Lookup`. + pub fn into_transport(self, lookup: Lookup) -> Transport { + Transport::new(self, lookup) + } + /// Perform a derp handshake over the given transport and return a [`Client`]. #[tracing::instrument(skip_all)] pub async fn handshake(conn: Io, node_keypair: &NodeKeyPair) -> Result { @@ -117,6 +123,12 @@ where }) } + /// Send a message to a nodekey on the derp server. + pub async fn send_one(&self, node_key: NodePublicKey, msg: &[u8]) -> Result<(), Error> { + self.send_frame_with_extra(&frame::SendPacket { dest: node_key }, msg) + .await + } + /// Send a frame to the derp server. pub async fn send_frame( &self, @@ -191,6 +203,7 @@ where } FrameType::RecvPacket => { let (recv, payload) = frame.as_type::().unwrap(); + return Ok((recv.src, payload.into())); } t => { @@ -274,36 +287,15 @@ fn decrypt_server_info( Ok(sip) } -impl UnderlayTransport for Client +impl NodekeyTransport for Client where Io: AsyncRead + AsyncWrite + Send, { - type Error = Error; - - #[tracing::instrument(fields(%self))] - async fn recv( - &self, - ) -> impl IntoIterator< - Item = Result<(NodePublicKey, impl IntoIterator), Self::Error>, - > { - [self.recv_one().await.map(|(k, pkt)| (k, [pkt]))] + async fn send_one(&self, node_key: NodePublicKey, body: &[u8]) -> Result<(), Error> { + self.send_one(node_key, body).await } - /// Send a batch of packets to a peer via this DERP server. - async fn send(&self, peer_packets: BatchIter) -> Result<(), Self::Error> - where - BatchIter: IntoIterator + Send, - BatchIter::IntoIter: Send, - PacketIter: IntoIterator + Send, - PacketIter::IntoIter: Send, - { - for (peer, packets) in peer_packets { - for packet in packets { - self.send_frame_with_extra(&frame::SendPacket { dest: peer }, packet.as_ref()) - .await?; - } - } - - Ok(()) + async fn recv_one(&self) -> Result<(NodePublicKey, PacketMut), Error> { + self.recv_one().await } } diff --git a/ts_transport_derp/src/dial.rs b/ts_derp/src/dial.rs similarity index 100% rename from ts_transport_derp/src/dial.rs rename to ts_derp/src/dial.rs diff --git a/ts_transport_derp/src/error.rs b/ts_derp/src/error.rs similarity index 100% rename from ts_transport_derp/src/error.rs rename to ts_derp/src/error.rs diff --git a/ts_transport_derp/src/frame/body/client_info.rs b/ts_derp/src/frame/body/client_info.rs similarity index 100% rename from ts_transport_derp/src/frame/body/client_info.rs rename to ts_derp/src/frame/body/client_info.rs diff --git a/ts_transport_derp/src/frame/body/close_peer.rs b/ts_derp/src/frame/body/close_peer.rs similarity index 100% rename from ts_transport_derp/src/frame/body/close_peer.rs rename to ts_derp/src/frame/body/close_peer.rs diff --git a/ts_transport_derp/src/frame/body/forward_packet.rs b/ts_derp/src/frame/body/forward_packet.rs similarity index 100% rename from ts_transport_derp/src/frame/body/forward_packet.rs rename to ts_derp/src/frame/body/forward_packet.rs diff --git a/ts_transport_derp/src/frame/body/health.rs b/ts_derp/src/frame/body/health.rs similarity index 100% rename from ts_transport_derp/src/frame/body/health.rs rename to ts_derp/src/frame/body/health.rs diff --git a/ts_transport_derp/src/frame/body/keep_alive.rs b/ts_derp/src/frame/body/keep_alive.rs similarity index 100% rename from ts_transport_derp/src/frame/body/keep_alive.rs rename to ts_derp/src/frame/body/keep_alive.rs diff --git a/ts_transport_derp/src/frame/body/mod.rs b/ts_derp/src/frame/body/mod.rs similarity index 100% rename from ts_transport_derp/src/frame/body/mod.rs rename to ts_derp/src/frame/body/mod.rs diff --git a/ts_transport_derp/src/frame/body/note_preferred.rs b/ts_derp/src/frame/body/note_preferred.rs similarity index 100% rename from ts_transport_derp/src/frame/body/note_preferred.rs rename to ts_derp/src/frame/body/note_preferred.rs diff --git a/ts_transport_derp/src/frame/body/peer_gone.rs b/ts_derp/src/frame/body/peer_gone.rs similarity index 100% rename from ts_transport_derp/src/frame/body/peer_gone.rs rename to ts_derp/src/frame/body/peer_gone.rs diff --git a/ts_transport_derp/src/frame/body/peer_present.rs b/ts_derp/src/frame/body/peer_present.rs similarity index 100% rename from ts_transport_derp/src/frame/body/peer_present.rs rename to ts_derp/src/frame/body/peer_present.rs diff --git a/ts_transport_derp/src/frame/body/ping.rs b/ts_derp/src/frame/body/ping.rs similarity index 100% rename from ts_transport_derp/src/frame/body/ping.rs rename to ts_derp/src/frame/body/ping.rs diff --git a/ts_transport_derp/src/frame/body/pong.rs b/ts_derp/src/frame/body/pong.rs similarity index 100% rename from ts_transport_derp/src/frame/body/pong.rs rename to ts_derp/src/frame/body/pong.rs diff --git a/ts_transport_derp/src/frame/body/recv_packet.rs b/ts_derp/src/frame/body/recv_packet.rs similarity index 100% rename from ts_transport_derp/src/frame/body/recv_packet.rs rename to ts_derp/src/frame/body/recv_packet.rs diff --git a/ts_transport_derp/src/frame/body/restarting.rs b/ts_derp/src/frame/body/restarting.rs similarity index 100% rename from ts_transport_derp/src/frame/body/restarting.rs rename to ts_derp/src/frame/body/restarting.rs diff --git a/ts_transport_derp/src/frame/body/send_packet.rs b/ts_derp/src/frame/body/send_packet.rs similarity index 100% rename from ts_transport_derp/src/frame/body/send_packet.rs rename to ts_derp/src/frame/body/send_packet.rs diff --git a/ts_transport_derp/src/frame/body/server_info.rs b/ts_derp/src/frame/body/server_info.rs similarity index 100% rename from ts_transport_derp/src/frame/body/server_info.rs rename to ts_derp/src/frame/body/server_info.rs diff --git a/ts_transport_derp/src/frame/body/server_key.rs b/ts_derp/src/frame/body/server_key.rs similarity index 100% rename from ts_transport_derp/src/frame/body/server_key.rs rename to ts_derp/src/frame/body/server_key.rs diff --git a/ts_transport_derp/src/frame/body/watch_conns.rs b/ts_derp/src/frame/body/watch_conns.rs similarity index 100% rename from ts_transport_derp/src/frame/body/watch_conns.rs rename to ts_derp/src/frame/body/watch_conns.rs diff --git a/ts_transport_derp/src/frame/codec.rs b/ts_derp/src/frame/codec.rs similarity index 100% rename from ts_transport_derp/src/frame/codec.rs rename to ts_derp/src/frame/codec.rs diff --git a/ts_transport_derp/src/frame/error.rs b/ts_derp/src/frame/error.rs similarity index 100% rename from ts_transport_derp/src/frame/error.rs rename to ts_derp/src/frame/error.rs diff --git a/ts_transport_derp/src/frame/frame_type.rs b/ts_derp/src/frame/frame_type.rs similarity index 100% rename from ts_transport_derp/src/frame/frame_type.rs rename to ts_derp/src/frame/frame_type.rs diff --git a/ts_transport_derp/src/frame/header.rs b/ts_derp/src/frame/header.rs similarity index 100% rename from ts_transport_derp/src/frame/header.rs rename to ts_derp/src/frame/header.rs diff --git a/ts_transport_derp/src/frame/magic.rs b/ts_derp/src/frame/magic.rs similarity index 100% rename from ts_transport_derp/src/frame/magic.rs rename to ts_derp/src/frame/magic.rs diff --git a/ts_transport_derp/src/frame/mod.rs b/ts_derp/src/frame/mod.rs similarity index 100% rename from ts_transport_derp/src/frame/mod.rs rename to ts_derp/src/frame/mod.rs diff --git a/ts_transport_derp/src/frame/raw.rs b/ts_derp/src/frame/raw.rs similarity index 100% rename from ts_transport_derp/src/frame/raw.rs rename to ts_derp/src/frame/raw.rs diff --git a/ts_transport_derp/src/lib.rs b/ts_derp/src/lib.rs similarity index 97% rename from ts_transport_derp/src/lib.rs rename to ts_derp/src/lib.rs index d58fe004..baa4999d 100644 --- a/ts_transport_derp/src/lib.rs +++ b/ts_derp/src/lib.rs @@ -8,13 +8,17 @@ use core::{ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; -mod async_tokio; +mod client; pub mod dial; mod error; pub mod frame; +mod peer_lookup; +mod transport; -pub use async_tokio::{Client, DefaultClient}; +pub use client::{Client, DefaultClient}; pub use error::Error; +pub use peer_lookup::{DummyStaticLookup, PeerLookup}; +pub use transport::Transport; /// A 24-byte nonce for symmetric encryption with ChaCha20Poly1305. #[repr(C)] diff --git a/ts_derp/src/peer_lookup.rs b/ts_derp/src/peer_lookup.rs new file mode 100644 index 00000000..716deb93 --- /dev/null +++ b/ts_derp/src/peer_lookup.rs @@ -0,0 +1,97 @@ +use std::sync::{Arc, Mutex}; + +/// Trait providing conversion between [`ts_keys::NodePublicKey`] (required to send and +/// receive derp messages) and [`ts_transport::PeerId`] (tailscale-rs internal type). +pub trait PeerLookup: Send + Sync { + /// Convert `key` to a [`ts_transport::PeerId`], allocating a new id if the peer doesn't + /// exist yet. + fn key_to_id(&self, key: &ts_keys::NodePublicKey) -> ts_transport::PeerId; + + /// Convert the `id` to a [`ts_keys::NodePublicKey`]. + /// + /// Returns `None` if the `id` is not stored. + fn id_to_key(&self, id: ts_transport::PeerId) -> Option; +} + +impl PeerLookup for Arc +where + T: PeerLookup, +{ + fn key_to_id(&self, key: &ts_keys::NodePublicKey) -> ts_transport::PeerId { + self.as_ref().key_to_id(key) + } + + fn id_to_key(&self, id: ts_transport::PeerId) -> Option { + self.as_ref().id_to_key(id) + } +} + +impl PeerLookup for Box +where + T: PeerLookup, +{ + fn key_to_id(&self, key: &ts_keys::NodePublicKey) -> ts_transport::PeerId { + self.as_ref().key_to_id(key) + } + + fn id_to_key(&self, id: ts_transport::PeerId) -> Option { + self.as_ref().id_to_key(id) + } +} + +impl PeerLookup for &T +where + T: PeerLookup, +{ + fn key_to_id(&self, key: &ts_keys::NodePublicKey) -> ts_transport::PeerId { + (*self).key_to_id(key) + } + + fn id_to_key(&self, id: ts_transport::PeerId) -> Option { + (*self).id_to_key(id) + } +} + +impl PeerLookup for &mut T +where + T: PeerLookup, +{ + fn key_to_id(&self, key: &ts_keys::NodePublicKey) -> ts_transport::PeerId { + (**self).key_to_id(key) + } + + fn id_to_key(&self, id: ts_transport::PeerId) -> Option { + (**self).id_to_key(id) + } +} + +/// Dummy implementation of [`PeerLookup`] wrapping a [`Vec`], suitable for tests and +/// examples. +/// +/// Peer entries are never removed from the inner `Vec`. +#[doc(hidden)] +#[derive(Default)] +pub struct DummyStaticLookup(Mutex>); + +impl PeerLookup for DummyStaticLookup { + fn key_to_id(&self, key: &ts_keys::NodePublicKey) -> ts_transport::PeerId { + let mut mp = self.0.lock().unwrap(); + + if let Some((id, _)) = mp.iter().enumerate().find(|(_id, k)| *k == key) { + return ts_transport::PeerId(id as _); + } + + let id = mp.len(); + mp.push(*key); + + ts_transport::PeerId(id as _) + } + + fn id_to_key( + &self, + ts_transport::PeerId(id): ts_transport::PeerId, + ) -> Option { + let mp = self.0.lock().unwrap(); + mp.get(id as usize).cloned() + } +} diff --git a/ts_derp/src/transport.rs b/ts_derp/src/transport.rs new file mode 100644 index 00000000..872d4a0c --- /dev/null +++ b/ts_derp/src/transport.rs @@ -0,0 +1,81 @@ +use ts_keys::NodePublicKey; +use ts_packet::PacketMut; +use ts_transport::{PeerId, UnderlayTransport}; + +use crate::{Error, PeerLookup}; + +pub trait NodekeyTransport: Send + Sync { + /// Send a message addressed to the given node key. + fn send_one( + &self, + node_key: NodePublicKey, + body: &[u8], + ) -> impl Future> + Send; + + /// Receive a frame from a particular node key. + fn recv_one(&self) -> impl Future> + Send; +} + +/// An implementation of [`UnderlayTransport`] wrapping a [`NodekeyTransport`] with a +/// [`PeerLookup`]. +pub struct Transport { + inner: NkT, + peer_lookup: Lookup, +} + +impl Transport { + /// Construct a new [`Transport`] with the given [`NodekeyTransport`] and [`PeerLookup`]. + pub fn new(client: NkT, lookup: Lookup) -> Self { + Self { + inner: client, + peer_lookup: lookup, + } + } + + /// Destruct this [`Transport`] into its constituent parts. + pub fn into_parts(self) -> (NkT, Lookup) { + (self.inner, self.peer_lookup) + } +} + +impl UnderlayTransport for Transport +where + NkT: NodekeyTransport, + Lookup: PeerLookup, +{ + type Error = Error; + + async fn recv( + &self, + ) -> impl IntoIterator), Self::Error>> + { + let result = self.inner.recv_one().await.map(|(k, pkt)| { + let id = self.peer_lookup.key_to_id(&k); + (id, [pkt]) + }); + + [result] + } + + /// Send a batch of packets to a peer via this DERP server. + async fn send(&self, peer_packets: BatchIter) -> Result<(), Self::Error> + where + BatchIter: IntoIterator + Send, + BatchIter::IntoIter: Send, + PacketIter: IntoIterator + Send, + PacketIter::IntoIter: Send, + { + for (peer, packets) in peer_packets { + let Some(node_key) = self.peer_lookup.id_to_key(peer) else { + tracing::warn!(peer_id = %peer, "no node key known for peer"); + continue; + }; + + for packet in packets { + self.inner.send_one(node_key, packet.as_ref()).await?; + } + } + + Ok(()) + } +} diff --git a/ts_devtools/Cargo.toml b/ts_devtools/Cargo.toml index 272cb3a1..a49ee220 100644 --- a/ts_devtools/Cargo.toml +++ b/ts_devtools/Cargo.toml @@ -20,11 +20,9 @@ tracing.workspace = true ts_cli_util.workspace = true ts_keys.workspace = true -ts_packet.workspace = true ts_packetfilter.workspace = true ts_packetfilter_state.workspace = true -ts_transport.workspace = true -ts_transport_derp.workspace = true +ts_derp.workspace = true [lints] workspace = true diff --git a/ts_devtools/src/bin/derp_ping.rs b/ts_devtools/src/bin/derp_ping.rs index 218510a2..9c393ee5 100644 --- a/ts_devtools/src/bin/derp_ping.rs +++ b/ts_devtools/src/bin/derp_ping.rs @@ -6,8 +6,6 @@ use std::sync::Arc; use clap::Parser; use tokio::task::JoinSet; use ts_keys::NodePublicKey; -use ts_packet::PacketMut; -use ts_transport::UnderlayTransport; /// Authenticate with control, load the derp map, and attempt to exchange derp pings with /// a selected peer. @@ -38,17 +36,15 @@ async fn main() -> ts_cli_util::Result<()> { let mut tasks = JoinSet::new(); - tracing::info!(?region_id, "starting derp transport"); - - let derp = - ts_transport_derp::Client::connect(&derp_servers, &config.key_state.node_keys).await?; - let derp = Arc::new(derp); - let peer = args .send_to_self .then_some(config.key_state.node_keys.public) .or(args.peer); + tracing::info!(?region_id, "starting derp transport"); + let derp = ts_derp::Client::connect(&derp_servers, &config.key_state.node_keys).await?; + let derp = Arc::new(derp); + if let Some(peer) = peer { tasks.spawn(derp_send_ping(peer, derp.clone())); } else { @@ -64,7 +60,7 @@ async fn main() -> ts_cli_util::Result<()> { static PING_MAX: AtomicU32 = AtomicU32::new(0); -async fn derp_receive_ping(derp: impl Borrow) { +async fn derp_receive_ping(derp: impl Borrow) { use bytes::Buf; let derp = derp.borrow(); @@ -84,19 +80,18 @@ async fn derp_receive_ping(derp: impl Borrow) } #[tracing::instrument(skip(derp), fields(%peer))] -async fn derp_send_ping(peer: NodePublicKey, derp: impl Borrow) { +async fn derp_send_ping(peer: NodePublicKey, derp: impl Borrow) { use bytes::BufMut; let mut ticker = tokio::time::interval(Duration::from_secs(1)); let derp = derp.borrow(); + let mut packet = [0u8; size_of::()]; loop { let val = PING_MAX.fetch_add(1, core::sync::atomic::Ordering::SeqCst); + (&mut packet[..]).put_u32(val); - let mut packet = PacketMut::with_capacity(size_of::()); - packet.put_u32(val); - - derp.send([(peer, [packet])]).await.unwrap(); + derp.send_one(peer, &packet).await.unwrap(); tracing::info!(value = val, "send ping"); ticker.tick().await; diff --git a/ts_netcheck/Cargo.toml b/ts_netcheck/Cargo.toml index 0ba4442c..824f56e5 100644 --- a/ts_netcheck/Cargo.toml +++ b/ts_netcheck/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true # Our crates. ts_control.workspace = true ts_http_util.workspace = true -ts_transport_derp.workspace = true +ts_derp.workspace = true # Unconditionally required depdendencies. bytes.workspace = true diff --git a/ts_netcheck/src/derp_latency.rs b/ts_netcheck/src/derp_latency.rs index 6b7e7e5a..87240908 100644 --- a/ts_netcheck/src/derp_latency.rs +++ b/ts_netcheck/src/derp_latency.rs @@ -3,7 +3,7 @@ use core::{fmt::Debug, net::SocketAddr, time::Duration}; use ts_control::DerpMap; -use ts_transport_derp::RegionId; +use ts_derp::RegionId; /// Configuration for probing derp map latency. #[derive(Debug, Copy, Clone)] diff --git a/ts_netcheck/src/https.rs b/ts_netcheck/src/https.rs index d97f713c..7ad38dd5 100644 --- a/ts_netcheck/src/https.rs +++ b/ts_netcheck/src/https.rs @@ -4,8 +4,8 @@ use core::{net::SocketAddr, time::Duration}; use std::{io, time::Instant}; use tokio::io::{AsyncRead, AsyncWrite}; +use ts_derp::ServerConnInfo; use ts_http_util::{ClientExt, EmptyBody, Http1}; -use ts_transport_derp::ServerConnInfo; use url::Url; /// Errors that may occur while probing derp latency. @@ -39,9 +39,9 @@ impl From for Error { } } -impl From for Error { - fn from(value: ts_transport_derp::dial::Error) -> Self { - use ts_transport_derp::dial; +impl From for Error { + fn from(value: ts_derp::dial::Error) -> Self { + use ts_derp::dial; match value { dial::Error::Io => Error::Io, @@ -96,8 +96,8 @@ impl Default for Config { /// /// Returns `None` iff no servers could be successfully measured, either due to connectivity errors /// or because they were not configured to be reachable. See the notes on -/// [`dial_region_tls`][ts_transport_derp::dial::dial_region_tls] and -/// [`dial_region_tcp`][ts_transport_derp::dial::dial_region_tcp] for more details on when +/// [`dial_region_tls`][ts_derp::dial::dial_region_tls] and +/// [`dial_region_tcp`][ts_derp::dial::dial_region_tcp] for more details on when /// servers are treated as not configured for reachability. pub async fn measure_https_latency<'c>( servers: impl IntoIterator, @@ -115,18 +115,17 @@ pub async fn measure_https_latency<'c>( let mut servers = servers.into_iter(); loop { - let (conn, server, remote) = - match ts_transport_derp::dial::dial_region_tls(&mut servers).await { - Ok(Some(x)) => x, - Ok(None) => { - tracing::warn!("ran out of servers to dial"); - return None; - } - Err(e) => { - tracing::error!(error = %e, "dialing tls"); - continue; - } - }; + let (conn, server, remote) = match ts_derp::dial::dial_region_tls(&mut servers).await { + Ok(Some(x)) => x, + Ok(None) => { + tracing::warn!("ran out of servers to dial"); + return None; + } + Err(e) => { + tracing::error!(error = %e, "dialing tls"); + continue; + } + }; match measure_server_latency(conn, server, &config).await { Ok(dur) => return Some((dur, server, remote)), @@ -226,7 +225,7 @@ mod test { let info = info(); - let (conn, server, remote) = ts_transport_derp::dial::dial_region_tls([&info]) + let (conn, server, remote) = ts_derp::dial::dial_region_tls([&info]) .await .unwrap() .unwrap(); diff --git a/ts_overlay_router/Cargo.toml b/ts_overlay_router/Cargo.toml index 1ebcb89b..5a033e0a 100644 --- a/ts_overlay_router/Cargo.toml +++ b/ts_overlay_router/Cargo.toml @@ -12,7 +12,6 @@ rust-version.workspace = true [dependencies] ts_bart.workspace = true -ts_keys.workspace = true ts_packet.workspace = true ts_transport.workspace = true diff --git a/ts_overlay_router/src/outbound.rs b/ts_overlay_router/src/outbound.rs index fd2fc5a5..09d5f9c3 100644 --- a/ts_overlay_router/src/outbound.rs +++ b/ts_overlay_router/src/outbound.rs @@ -4,9 +4,8 @@ use std::collections::HashMap; use itertools::Itertools; use ts_bart::{RoutingTable, Table}; -use ts_keys::NodePublicKey; use ts_packet::PacketMut; -use ts_transport::OverlayTransportId; +use ts_transport::{OverlayTransportId, PeerId}; /// An outbound routing action. #[derive(Debug, Clone)] @@ -18,7 +17,7 @@ pub enum RouteAction { Drop, /// Send to a wireguard peer. - Wireguard(NodePublicKey), + Wireguard(PeerId), /// Loop the packet back to a local overlay transport. /// @@ -36,7 +35,7 @@ pub struct Router { #[derive(Debug, Default, Eq, PartialEq)] pub struct Result { /// Packets to send through wireguard. - pub to_wireguard: HashMap>, + pub to_wireguard: HashMap>, /// Packets to return to a local transport. pub loopback: HashMap>, } @@ -102,8 +101,8 @@ mod tests { #[test] fn test_outbound_overlay() { - let peer_a = NodePublicKey::from([1u8; 32]); - let peer_b = NodePublicKey::from([2u8; 32]); + let peer_a = PeerId(0); + let peer_b = PeerId(1); let magicdns = 42.into(); let mut routes = Table::default(); diff --git a/ts_runtime/Cargo.toml b/ts_runtime/Cargo.toml index f9929bbf..a2e33889 100644 --- a/ts_runtime/Cargo.toml +++ b/ts_runtime/Cargo.toml @@ -24,7 +24,8 @@ ts_packet.workspace = true ts_packetfilter = { workspace = true, features = ["checking-filter"] } ts_packetfilter_state.workspace = true ts_transport.workspace = true -ts_transport_derp.workspace = true +ts_derp.workspace = true +ts_tunnel.workspace = true # Unconditionally required dependencies. futures.workspace = true diff --git a/ts_runtime/src/dataplane.rs b/ts_runtime/src/dataplane.rs index f36520dc..d5d1f7b8 100644 --- a/ts_runtime/src/dataplane.rs +++ b/ts_runtime/src/dataplane.rs @@ -5,14 +5,14 @@ use kameo::{ message::{Context, Message}, }; use tokio::sync::mpsc; -use ts_keys::NodePublicKey; use ts_packet::PacketMut; -use ts_transport::{OverlayTransportId, UnderlayTransportId}; +use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId}; use crate::{ Error, env::Env, packetfilter::PacketFilterState, + peer_tracker::PeerState, route_updater::{PeerRouteUpdate, SelfRouteUpdate}, src_filter::SourceFilterState, }; @@ -24,10 +24,10 @@ pub type OverlayToDataplane = mpsc::UnboundedSender>; pub type OverlayFromDataplane = mpsc::UnboundedReceiver>; /// Queue for packets leaving the underlay to the dataplane. -pub type UnderlayToDataplane = mpsc::UnboundedSender<(NodePublicKey, Vec)>; +pub type UnderlayToDataplane = mpsc::UnboundedSender<(PeerId, Vec)>; /// Queue for packets entering an underlay from the dataplane. -pub type UnderlayFromDataplane = mpsc::UnboundedReceiver<(NodePublicKey, Vec)>; +pub type UnderlayFromDataplane = mpsc::UnboundedReceiver<(PeerId, Vec)>; pub struct DataplaneActor { dataplane: Arc, @@ -68,12 +68,14 @@ impl kameo::Actor for DataplaneActor { async fn on_start(env: Self::Args, slf: ActorRef) -> Result { let dataplane = Arc::new(ts_dataplane::async_tokio::DataPlane::new( env.keys.node_keys, + env.peer_db.clone(), )); env.subscribe::(&slf).await?; env.subscribe::(&slf).await?; env.subscribe::(&slf).await?; env.subscribe::(&slf).await?; + env.subscribe::(&slf).await?; let task_dataplane = dataplane.clone(); @@ -87,6 +89,31 @@ impl kameo::Actor for DataplaneActor { } } +impl Message for DataplaneActor { + type Reply = (); + + async fn handle(&mut self, msg: PeerState, _ctx: &mut Context) { + let mut dp = self.dataplane.inner().await; + + for key in &msg.upserts { + dp.peer_db.get_or_insert(key); + + dp.wireguard.add_peer(ts_tunnel::PeerConfig { + key: *key, + psk: [0u8; 32].into(), + }); + } + + for key in &msg.deletions { + dp.peer_db.remove(key); + + if let Some(peer_id) = dp.wireguard.peer_id(*key) { + dp.wireguard.remove_peer(peer_id); + } + } + } +} + impl Message for DataplaneActor { type Reply = (); diff --git a/ts_runtime/src/env.rs b/ts_runtime/src/env.rs index 2a343341..04a289d8 100644 --- a/ts_runtime/src/env.rs +++ b/ts_runtime/src/env.rs @@ -13,6 +13,7 @@ use crate::{Error, error::ResultExt}; pub struct Env { pub bus: ActorRef, pub keys: Arc, + pub peer_db: Arc, /// Whether the runtime is shutdown. /// @@ -31,6 +32,7 @@ impl Env { bus: MessageBus::spawn_default(), keys: Arc::new(keys), shutdown, + peer_db: Default::default(), } } diff --git a/ts_runtime/src/multiderp.rs b/ts_runtime/src/multiderp.rs index f7551247..1fe5e053 100644 --- a/ts_runtime/src/multiderp.rs +++ b/ts_runtime/src/multiderp.rs @@ -11,9 +11,9 @@ use kameo::{ }; use tokio::{sync::watch, task::JoinSet}; use ts_control::DerpRegion; -use ts_keys::NodeKeyPair; -use ts_transport::{UnderlayTransport, UnderlayTransportId}; -use ts_transport_derp::RegionId; +use ts_derp::RegionId; +use ts_keys::{NodeKeyPair, NodePublicKey}; +use ts_transport::{PeerId, UnderlayTransport, UnderlayTransportId}; use crate::{ Env, Error, @@ -72,6 +72,8 @@ impl Multiderp { }; let (home_derp_tx, mut home_derp_rx) = watch::channel(false); + let peer_db = self.env.peer_db.clone(); + self.tasks.spawn(async move { while !*shutdown.borrow() { tokio::select! { @@ -85,6 +87,7 @@ impl Multiderp { &down, &mut up, &mut home_derp_rx, + &peer_db, ) => if let Err(e) = ret { tracing::error!(error = %e, region_id = %id, "running derp client"); tokio::time::sleep(Duration::from_millis(500)).await; @@ -121,6 +124,18 @@ impl Multiderp { } } +struct PeerDbLookup<'a>(&'a ts_dataplane::PeerDb); + +impl ts_derp::PeerLookup for PeerDbLookup<'_> { + fn id_to_key(&self, id: PeerId) -> Option { + Some(self.0.get_by_id(id)?.node_key) + } + + fn key_to_id(&self, key: &NodePublicKey) -> PeerId { + self.0.get_or_insert(key).peer_id + } +} + #[tracing::instrument(skip_all, fields(region_id = %id), name = "derp packet transport")] async fn run_derp_once( id: RegionId, @@ -129,7 +144,8 @@ async fn run_derp_once( to_dataplane: &UnderlayToDataplane, from_dataplane: &mut UnderlayFromDataplane, home_derp_rx: &mut watch::Receiver, -) -> Result<(), ts_transport_derp::Error> { + peer_db: &ts_dataplane::PeerDb, +) -> Result<(), ts_derp::Error> { const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(10); loop { @@ -153,11 +169,12 @@ async fn run_derp_once( tracing::trace!("establishing derp connection"); - let client = ts_transport_derp::DefaultClient::connect(®ion.servers, &keys).await?; + let client = ts_derp::DefaultClient::connect(®ion.servers, &keys).await?; + let transport = client.into_transport(PeerDbLookup(peer_db)); if let Some(pending) = pending { tracing::trace!("sending queued packet"); - client.send([pending]).await?; + transport.send([pending]).await?; } let mut last_activity = Instant::now(); @@ -169,16 +186,20 @@ async fn run_derp_once( (!*home_derp_rx.borrow()).then(|| last_activity + INACTIVITY_TIMEOUT); tokio::select! { - from_derp = client.recv_one() => { + from_derp = transport.recv() => { last_activity = Instant::now(); - let (peer, pkt) = from_derp?; - tracing::trace!(parent: &span, %peer, len = pkt.len(), "packet from derp server"); + for ret in from_derp { + let (peer_id, pkts) = ret?; + let pkts = pkts.into_iter().collect::>(); - let Ok(()) = to_dataplane.send((peer, vec![pkt])) else { - tracing::error!(parent: &span, "underlay receive channel closed"); - break; - }; + tracing::trace!(parent: &span, %peer_id, len = pkts.len(), "packet from derp server"); + + let Ok(()) = to_dataplane.send((peer_id, pkts)) else { + tracing::error!(parent: &span, "underlay receive channel closed"); + break; + }; + } }, from_net = from_dataplane.recv() => { @@ -191,7 +212,7 @@ async fn run_derp_once( tracing::trace!(parent: &span, peer = %from_net.0, packets = from_net.1.len(), "packets to derp server"); - client.send([from_net]).await?; + transport.send([from_net]).await?; }, _ = option_timeout(inactivity_timeout) => { diff --git a/ts_runtime/src/route_updater.rs b/ts_runtime/src/route_updater.rs index d85d0a54..bd7d863d 100644 --- a/ts_runtime/src/route_updater.rs +++ b/ts_runtime/src/route_updater.rs @@ -5,11 +5,10 @@ use kameo::{ message::{Context, Message}, }; use ts_bart::RoutingTable; -use ts_keys::NodePublicKey; use ts_overlay_router::{ inbound::RouteAction as InboundRouteAction, outbound::RouteAction as OutboundRouteAction, }; -use ts_transport::{OverlayTransportId, UnderlayTransportId}; +use ts_transport::{OverlayTransportId, PeerId, UnderlayTransportId}; use crate::{Error, env::Env, multiderp, multiderp::Multiderp, peer_tracker::PeerState}; @@ -50,7 +49,7 @@ pub struct PeerRouteUpdate { } pub struct PeerRoutesInner { - pub underlay_routes: HashMap, + pub underlay_routes: HashMap, pub overlay_out_routes: ts_bart::Table, } @@ -69,11 +68,15 @@ impl Message for RouteUpdater { for peer in msg.peers.values() { let span = tracing::trace_span!( "peer_update", - peer = %peer.node_key, + peer_key = %peer.node_key, region = tracing::field::Empty, underlay_transport = tracing::field::Empty, + peer_id = tracing::field::Empty, ); + let info = self.env.peer_db.get_or_insert(&peer.node_key); + span.record("peer_id", tracing::field::debug(info.peer_id)); + let Some(region) = peer.derp_region else { tracing::trace!(parent: &span, "peer has no derp region"); continue; @@ -89,7 +92,7 @@ impl Message for RouteUpdater { { Ok(Some(transport_id)) => { span.record("underlay_transport", tracing::field::debug(transport_id)); - underlay_out.insert(peer.node_key, transport_id); + underlay_out.insert(info.peer_id, transport_id); tracing::trace!(parent: &span, "set underlay route"); } Ok(None) => { @@ -103,7 +106,7 @@ impl Message for RouteUpdater { for route in &peer.accepted_routes { tracing::trace!(parent: &span, %route, "routes"); - overlay_out.insert(*route, OutboundRouteAction::Wireguard(peer.node_key)); + overlay_out.insert(*route, OutboundRouteAction::Wireguard(info.peer_id)); } } diff --git a/ts_runtime/src/src_filter.rs b/ts_runtime/src/src_filter.rs index cfa1aae6..221f959f 100644 --- a/ts_runtime/src/src_filter.rs +++ b/ts_runtime/src/src_filter.rs @@ -5,7 +5,7 @@ use kameo::{ message::{Context, Message}, }; use ts_bart::{RoutingTable, Table}; -use ts_keys::NodePublicKey; +use ts_transport::PeerId; use crate::{Error, env::Env, peer_tracker::PeerState}; @@ -25,16 +25,19 @@ impl kameo::Actor for SourceFilterUpdater { } #[derive(Clone)] -pub struct SourceFilterState(pub Arc>); +pub struct SourceFilterState(pub Arc>); impl Message for SourceFilterUpdater { type Reply = (); async fn handle(&mut self, state_update: PeerState, _ctx: &mut Context) { let mut src_filter = Table::default(); + for (nodekey, node) in state_update.peers.iter() { + let info = self.env.peer_db.get_or_insert(nodekey); + for route in node.accepted_routes.iter() { - src_filter.insert(route.to_owned(), *nodekey); + src_filter.insert(route.to_owned(), info.peer_id); } } diff --git a/ts_transport/Cargo.toml b/ts_transport/Cargo.toml index 358284fa..321a7726 100644 --- a/ts_transport/Cargo.toml +++ b/ts_transport/Cargo.toml @@ -12,8 +12,9 @@ rust-version.workspace = true [dependencies] # Our crates. -ts_keys.workspace = true ts_packet.workspace = true +zerocopy.workspace = true + [lints] workspace = true diff --git a/ts_transport/src/lib.rs b/ts_transport/src/lib.rs index 396367c0..03f2a79b 100644 --- a/ts_transport/src/lib.rs +++ b/ts_transport/src/lib.rs @@ -3,9 +3,11 @@ extern crate alloc; -use core::error::Error; +use core::{ + error::Error, + fmt::{Debug, Display, Formatter}, +}; -use ts_keys::NodePublicKey; use ts_packet::PacketMut; /// The unique id of an overlay transport. @@ -40,6 +42,29 @@ impl From for u32 { } } +/// The unique id of a peer. +#[derive( + Debug, + Copy, + Clone, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + zerocopy::FromBytes, + zerocopy::IntoBytes, + zerocopy::Immutable, + zerocopy::KnownLayout, +)] +pub struct PeerId(pub u32); + +impl Display for PeerId { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + Debug::fmt(self, f) + } +} + /// An abstract transport that can carry packets to configurable destinations. pub trait UnderlayTransport { /// The error type that this transport may produce. @@ -48,16 +73,16 @@ pub trait UnderlayTransport { /// Send packets through the transport. /// /// The return type should be interpreted as meaning essentially - /// `HashMap>`. It is set up this way to enable the caller + /// `HashMap>`. It is set up this way to enable the caller /// to use iterators to transform a collection of a slightly different shape, or e.g. - /// look up `NodePublicKey`s on-the-fly, without having to `.collect()` into an + /// look up `PeerId`s on-the-fly, without having to `.collect()` into an /// intermediary collection. fn send( &self, packet_batch: BatchIter, ) -> impl Future> + Send where - BatchIter: IntoIterator + Send, + BatchIter: IntoIterator + Send, BatchIter::IntoIter: Send, PacketIter: IntoIterator + Send, PacketIter::IntoIter: Send; @@ -65,15 +90,15 @@ pub trait UnderlayTransport { /// Receive packets from the transport. /// /// The return type should be interpreted as meaning essentially - /// `HashMap>`, but allows for the implementation to + /// `HashMap>`, but allows for the implementation to /// use iterators to map a collection of a slightly different shape, or e.g. look up - /// `NodePublicKey`s on-the-fly, without having to `.collect()` into an intermediary + /// `PeerId`s on-the-fly, without having to `.collect()` into an intermediary /// collection. fn recv( &self, ) -> impl Future< Output = impl IntoIterator< - Item = Result<(NodePublicKey, impl IntoIterator), Self::Error>, + Item = Result<(PeerId, impl IntoIterator), Self::Error>, >, > + Send; } diff --git a/ts_underlay_router/Cargo.toml b/ts_underlay_router/Cargo.toml index e176e017..d8d2b05c 100644 --- a/ts_underlay_router/Cargo.toml +++ b/ts_underlay_router/Cargo.toml @@ -11,7 +11,6 @@ license.workspace = true rust-version.workspace = true [dependencies] -ts_keys.workspace = true ts_packet.workspace = true ts_transport.workspace = true diff --git a/ts_underlay_router/src/outbound.rs b/ts_underlay_router/src/outbound.rs index 8519099d..7691a40b 100644 --- a/ts_underlay_router/src/outbound.rs +++ b/ts_underlay_router/src/outbound.rs @@ -2,28 +2,24 @@ use std::collections::HashMap; -use ts_keys::NodePublicKey; use ts_packet::PacketMut; -use ts_transport::UnderlayTransportId; +use ts_transport::{PeerId, UnderlayTransportId}; /// Routes packets that originate from the local device. #[derive(Default)] pub struct Router { /// The transport to use for sending to each wireguard peer. - pub table: HashMap, + pub table: HashMap, } /// The outcome of routing packets. -pub type Result = HashMap<(UnderlayTransportId, NodePublicKey), Vec>; +pub type Result = HashMap<(UnderlayTransportId, PeerId), Vec>; impl Router { /// Assigns a batch of packets to their next hop. /// /// Packets that don't match any routes are dropped. - pub fn route( - &self, - batches: impl IntoIterator)>, - ) -> Result { + pub fn route(&self, batches: impl IntoIterator)>) -> Result { let mut ret = Result::default(); for (peer_id, packets) in batches { @@ -44,11 +40,11 @@ mod tests { #[test] fn test_outbound_underlay() { - let peer_a = NodePublicKey::from([1u8; 32]); - let peer_b = NodePublicKey::from([2u8; 32]); - let peer_c = NodePublicKey::from([3u8; 32]); - let peer_d = NodePublicKey::from([4u8; 32]); - let peer_e = NodePublicKey::from([5u8; 32]); + let peer_a = PeerId(1); + let peer_b = PeerId(2); + let peer_c = PeerId(3); + let peer_d = PeerId(4); + let peer_e = PeerId(5); let transport_a = 5.into(); let transport_b = 6.into(); let transport_c = 7.into();