Skip to content

Commit 40e0450

Browse files
authored
STR-314: add Unix domain socket support for gRPC listener (#671)
* feat: UDS multi-listener support with SpyIncoming and MeteredLayer * feat: add UDS client support with connect_uds and --uds CLI flag
1 parent 79482ae commit 40e0450

8 files changed

Lines changed: 590 additions & 146 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ The minor version will be incremented upon a breaking change and the patch versi
1111
## [Unreleased]
1212

1313
### Features
14+
- geyser: add Unix domain socket support for multiple gRPC listener ([#671](https://github.com/rpcpool/yellowstone-grpc/pull/671))
15+
- client: add `connect_uds` method for Unix domain socket connections ([#671](https://github.com/rpcpool/yellowstone-grpc/pull/671))
16+
- client-simple: add `--uds` flag for Unix domain socket connections ([#671](https://github.com/rpcpool/yellowstone-grpc/pull/671))
1417

1518
- geyser: configurable per-subscriber subscription limit with enforce/observe mode ([#680](https://github.com/rpcpool/yellowstone-grpc/pull/680))
1619

Cargo.lock

Lines changed: 20 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ thiserror = "2.0.16"
6262
tokio = "1.47.1"
6363
tokio-stream = "0.1.17"
6464
tokio-util = "0.7.0"
65+
tower = "0.4"
6566
tower-layer = "0.3.3"
6667
tonic = "0.14.0"
6768
tonic-build = "0.14.0"

examples/rust/src/bin/client.rs

Lines changed: 89 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use {
2121
},
2222
tokio::{fs, sync::Mutex},
2323
tonic::transport::{channel::ClientTlsConfig, Certificate},
24-
yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, Interceptor},
24+
yellowstone_grpc_client::{
25+
GeyserGrpcBuilder, GeyserGrpcClient, GeyserGrpcClientError, Interceptor,
26+
},
2527
yellowstone_grpc_geyser::plugin::{convert_from, filter::message::FilteredUpdate},
2628
yellowstone_grpc_proto::{
2729
geyser::SlotStatus,
@@ -89,6 +91,10 @@ struct Args {
8991
#[clap(long)]
9092
connect_timeout_ms: Option<u64>,
9193

94+
/// Connect via Unix Domain Socket at this path instead of TCP
95+
#[clap(long)]
96+
uds: Option<PathBuf>,
97+
9298
/// Sets the tower service default internal buffer size, default is 1024
9399
#[clap(long)]
94100
buffer_size: Option<usize>,
@@ -150,7 +156,7 @@ impl Args {
150156
Some(self.commitment.unwrap_or_default().into())
151157
}
152158

153-
async fn connect(&self) -> anyhow::Result<GeyserGrpcClient<impl Interceptor + Clone>> {
159+
async fn build(&self) -> anyhow::Result<GeyserGrpcBuilder> {
154160
let mut tls_config = ClientTlsConfig::new().with_native_roots();
155161
if let Some(path) = &self.ca_certificate {
156162
let bytes = fs::read(path).await?;
@@ -206,7 +212,7 @@ impl Args {
206212
builder = builder.timeout(Duration::from_millis(duration));
207213
}
208214

209-
builder.connect().await.map_err(Into::into)
215+
Ok(builder)
210216
}
211217
}
212218

@@ -652,6 +658,71 @@ impl Action {
652658
}
653659
}
654660

661+
async fn run_action(
662+
mut client: GeyserGrpcClient<impl Interceptor>,
663+
action: &Action,
664+
commitment: Option<CommitmentLevel>,
665+
) -> anyhow::Result<()> {
666+
match action {
667+
Action::HealthCheck => client
668+
.health_check()
669+
.await
670+
.map_err(anyhow::Error::new)
671+
.map(|response| info!("response: {response:?}")),
672+
Action::HealthWatch => geyser_health_watch(client).await,
673+
Action::Subscribe(_) => {
674+
let (request, resub, stats, verify_encoding) = action
675+
.get_subscribe_request(commitment)
676+
.await?
677+
.ok_or_else(|| anyhow::anyhow!("expect subscribe action"))?;
678+
679+
geyser_subscribe(client, request, resub, stats, verify_encoding).await
680+
}
681+
Action::SubscribeDeshred(_) => {
682+
let (request, stats) = action
683+
.get_subscribe_deshred_request()
684+
.ok_or_else(|| anyhow::anyhow!("expect subscribe deshred action"))?;
685+
686+
geyser_subscribe_deshred(client, request, stats).await
687+
}
688+
Action::SubscribeReplayInfo => client
689+
.subscribe_replay_info()
690+
.await
691+
.map_err(anyhow::Error::new)
692+
.map(|response| info!("response: {response:?}")),
693+
Action::Ping { count } => client
694+
.ping(*count)
695+
.await
696+
.map_err(anyhow::Error::new)
697+
.map(|response| info!("response: {response:?}")),
698+
Action::GetLatestBlockhash => client
699+
.get_latest_blockhash(commitment)
700+
.await
701+
.map_err(anyhow::Error::new)
702+
.map(|response| info!("response: {response:?}")),
703+
Action::GetBlockHeight => client
704+
.get_block_height(commitment)
705+
.await
706+
.map_err(anyhow::Error::new)
707+
.map(|response| info!("response: {response:?}")),
708+
Action::GetSlot => client
709+
.get_slot(commitment)
710+
.await
711+
.map_err(anyhow::Error::new)
712+
.map(|response| info!("response: {response:?}")),
713+
Action::IsBlockhashValid { blockhash } => client
714+
.is_blockhash_valid(blockhash.clone(), commitment)
715+
.await
716+
.map_err(anyhow::Error::new)
717+
.map(|response| info!("response: {response:?}")),
718+
Action::GetVersion => client
719+
.get_version()
720+
.await
721+
.map_err(anyhow::Error::new)
722+
.map(|response| info!("response: {response:?}")),
723+
}
724+
}
725+
655726
#[tokio::main]
656727
async fn main() -> anyhow::Result<()> {
657728
env::set_var(
@@ -680,74 +751,29 @@ async fn main() -> anyhow::Result<()> {
680751
drop(zero_attempts);
681752

682753
let commitment = args.get_commitment();
683-
let mut client = args.connect().await.map_err(backoff::Error::transient)?;
684-
info!("Connected");
754+
let builder = args.build().await.map_err(backoff::Error::transient)?;
685755

686-
match &args.action {
687-
Action::HealthCheck => client
688-
.health_check()
689-
.await
690-
.map_err(anyhow::Error::new)
691-
.map(|response| info!("response: {response:?}")),
692-
Action::HealthWatch => geyser_health_watch(client).await,
693-
Action::Subscribe(_) => {
694-
let (request, resub, stats, verify_encoding) = args
695-
.action
696-
.get_subscribe_request(commitment)
697-
.await
698-
.map_err(backoff::Error::Permanent)?
699-
.ok_or(backoff::Error::Permanent(anyhow::anyhow!(
700-
"expect subscribe action"
701-
)))?;
702-
703-
geyser_subscribe(client, request, resub, stats, verify_encoding).await
704-
}
705-
Action::SubscribeDeshred(_) => {
706-
let (request, stats) = args.action.get_subscribe_deshred_request().ok_or(
707-
backoff::Error::Permanent(anyhow::anyhow!(
708-
"expect subscribe deshred action"
709-
)),
710-
)?;
711-
712-
geyser_subscribe_deshred(client, request, stats).await
713-
}
714-
Action::SubscribeReplayInfo => client
715-
.subscribe_replay_info()
716-
.await
717-
.map_err(anyhow::Error::new)
718-
.map(|response| info!("response: {response:?}")),
719-
Action::Ping { count } => client
720-
.ping(*count)
721-
.await
722-
.map_err(anyhow::Error::new)
723-
.map(|response| info!("response: {response:?}")),
724-
Action::GetLatestBlockhash => client
725-
.get_latest_blockhash(commitment)
726-
.await
727-
.map_err(anyhow::Error::new)
728-
.map(|response| info!("response: {response:?}")),
729-
Action::GetBlockHeight => client
730-
.get_block_height(commitment)
756+
if let Some(uds_path) = &args.uds {
757+
let client = builder
758+
.connect_uds(uds_path)
731759
.await
732760
.map_err(anyhow::Error::new)
733-
.map(|response| info!("response: {response:?}")),
734-
Action::GetSlot => client
735-
.get_slot(commitment)
761+
.map_err(backoff::Error::transient)?;
762+
info!("Connected");
763+
run_action(client, &args.action, commitment)
736764
.await
737-
.map_err(anyhow::Error::new)
738-
.map(|response| info!("response: {response:?}")),
739-
Action::IsBlockhashValid { blockhash } => client
740-
.is_blockhash_valid(blockhash.clone(), commitment)
765+
.map_err(backoff::Error::transient)?;
766+
} else {
767+
let client = builder
768+
.connect()
741769
.await
742770
.map_err(anyhow::Error::new)
743-
.map(|response| info!("response: {response:?}")),
744-
Action::GetVersion => client
745-
.get_version()
771+
.map_err(backoff::Error::transient)?;
772+
info!("Connected");
773+
run_action(client, &args.action, commitment)
746774
.await
747-
.map_err(anyhow::Error::new)
748-
.map(|response| info!("response: {response:?}")),
775+
.map_err(backoff::Error::transient)?;
749776
}
750-
.map_err(backoff::Error::transient)?;
751777

752778
Ok::<(), backoff::Error<anyhow::Error>>(())
753779
}

yellowstone-grpc-client/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ account-data-as-bytes = [
1818
[dependencies]
1919
bytes = { workspace = true }
2020
futures = { workspace = true }
21+
hyper-util = { workspace = true }
2122
thiserror = { workspace = true }
23+
tokio = { workspace = true }
2224
tonic = { workspace = true, features = ["tls-native-roots"] }
2325
tonic-health = { workspace = true }
26+
tower = { workspace = true, features = ["util"] }
27+
2428

2529
# Yellowstone
2630
yellowstone-grpc-proto = { workspace = true, features = ["tonic", "tonic-compression"] }

yellowstone-grpc-client/src/lib.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@ use {
66
sink::{Sink, SinkExt},
77
stream::Stream,
88
},
9-
std::time::Duration,
9+
std::{path::PathBuf, time::Duration},
10+
tokio::net::UnixStream,
1011
tonic::{
1112
codec::{CompressionEncoding, Streaming},
1213
metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
1314
service::interceptor::InterceptedService,
14-
transport::channel::{Channel, Endpoint},
15+
transport::{
16+
channel::{Channel, Endpoint},
17+
Uri,
18+
},
1519
Request, Response, Status,
1620
},
1721
tonic_health::pb::{health_client::HealthClient, HealthCheckRequest, HealthCheckResponse},
@@ -333,6 +337,32 @@ impl GeyserGrpcBuilder {
333337
self.build(channel)
334338
}
335339

340+
/// Connect to a gRPC server over a Unix Domain Socket.
341+
///
342+
/// The `path` is the filesystem path to the socket (e.g. "/tmp/yellowstone.sock").
343+
/// tonic requires a dummy HTTP URI for the channel, but the actual transport
344+
/// goes through the UDS connector.
345+
pub async fn connect_uds(
346+
self,
347+
path: impl Into<PathBuf>,
348+
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor + Clone>> {
349+
let path = path.into();
350+
351+
// tonic needs an Endpoint to hang config off of, but the URI is ignored
352+
// by the connector — all traffic goes through the UnixStream.
353+
let channel = Endpoint::from_static("http://[::]:0")
354+
.connect_with_connector(tower::service_fn(move |_: Uri| {
355+
let path = path.clone();
356+
async move {
357+
let stream = UnixStream::connect(path).await?;
358+
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream))
359+
}
360+
}))
361+
.await?;
362+
363+
self.build(channel)
364+
}
365+
336366
// Set x-token
337367
pub fn x_token<T>(self, x_token: Option<T>) -> GeyserGrpcBuilderResult<Self>
338368
where

0 commit comments

Comments
 (0)