Skip to content

Commit afad860

Browse files
committed
Add leader map view with geolocation
1 parent 0368d6b commit afad860

16 files changed

Lines changed: 1038 additions & 66 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ docker run -p 3000:3000 --env-file .env leader-stream
4949
| `TRACK_LOOKAHEAD` | Slots to prefetch per tracked validator | 5000 |
5050
| `STATIC_DIR` | Override static dir | `<repo>/leader-stream/public` |
5151
| `NEXT_PUBLIC_LEADER_STREAM_URL` | Override SSE path injected into HTML | `/api/leader-stream` |
52+
| `MAXMIND_DB_PATH` | Path to the MaxMind MMDB file to use for geolocation | `./GeoLite2-City.mmdb` |
53+
| `MAXMIND_LICENSE_KEY` | Optional MaxMind license key for downloading GeoLite/GeoIP2 | none |
54+
| `MAXMIND_DB_DOWNLOAD_URL` | Override URL for downloading the MMDB (expects raw file or tar.gz) | none |
55+
| `MAXMIND_FALLBACK_URL` | Fallback URL for a free/test MaxMind database when no key is present | MaxMind test DB |
56+
| `MAXMIND_EDITION_ID` | Edition ID when downloading via license key | `GeoLite2-City` |
5257

5358
See `.env.example` and `k8s/secret.env.example` for templates.
5459

@@ -57,6 +62,7 @@ Static docs at `/docs.html` (source: `leader-stream/public/docs.html`). Key endp
5762
- `GET /api/next-leaders?limit=1000`
5863
- `GET /api/current-slot`
5964
- `GET /api/leader-stream?track=<validator>` (SSE)
65+
- `GET /map` globe view of upcoming leaders with geolocation (uses `/api/leader-path`)
6066

6167
## Deployment (Kubernetes)
6268
`k8s/` uses Kustomize. Replace image `ghcr.io/trustless-engineering/leader-stream:${GIT_SHA}` and supply your own overlays/secrets:

leader-stream/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ anyhow = "1"
1414
async-stream = "0.3"
1515
axum = { version = "0.7", features = ["macros"] }
1616
bytes = "1"
17+
flate2 = "1"
1718
futures-util = "0.3"
19+
maxminddb = "0.24"
1820
reqwest = { version = "0.11", features = ["json", "rustls-tls", "stream"] }
1921
rustls = { version = "0.23", default-features = false, features = ["ring"] }
2022
serde_json = "1"
@@ -23,6 +25,7 @@ tokio-tungstenite = { version = "0.23", features = ["rustls-tls-native-roots"] }
2325
tower-http = { version = "0.5", features = ["fs"] }
2426
tracing = "0.1"
2527
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
28+
tar = "0.4"
2629
url = "2"
2730
dotenvy = "0.15"
2831

leader-stream/src/config.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ pub(crate) struct Config {
2222
pub(crate) ws_ping_interval: Duration,
2323
pub(crate) leader_lookahead: usize,
2424
pub(crate) track_lookahead: usize,
25+
pub(crate) maxmind_db_path: String,
26+
pub(crate) maxmind_license_key: Option<String>,
27+
pub(crate) maxmind_edition_id: String,
28+
pub(crate) maxmind_db_download_url: Option<String>,
29+
pub(crate) maxmind_fallback_url: Option<String>,
2530
}
2631

2732
impl Config {
@@ -32,10 +37,7 @@ impl Config {
3237
.clone()
3338
.unwrap_or_else(|| DEFAULT_RPC_URL.to_string());
3439
if using_default_rpc {
35-
warn!(
36-
"SOLANA_RPC_URL not set; defaulting to {}",
37-
DEFAULT_RPC_URL
38-
);
40+
warn!("SOLANA_RPC_URL not set; defaulting to {}", DEFAULT_RPC_URL);
3941
}
4042
let rpc_x_token = read_env_first(&["SOLANA_RPC_X_TOKEN"]);
4143
let ws_override = read_env_first(&["SOLANA_WSS_URL", "SOLANA_WS_URL"]);
@@ -94,6 +96,14 @@ impl Config {
9496
.and_then(|value| value.parse::<usize>().ok())
9597
.unwrap_or(DEFAULT_TRACK_LOOKAHEAD);
9698

99+
let maxmind_db_path =
100+
env::var("MAXMIND_DB_PATH").unwrap_or_else(|_| "./GeoLite2-City.mmdb".to_string());
101+
let maxmind_license_key = read_env_first(&["MAXMIND_LICENSE_KEY", "GEOIP_LICENSE_KEY"]);
102+
let maxmind_edition_id =
103+
env::var("MAXMIND_EDITION_ID").unwrap_or_else(|_| "GeoLite2-City".to_string());
104+
let maxmind_db_download_url = read_env_first(&["MAXMIND_DB_DOWNLOAD_URL"]);
105+
let maxmind_fallback_url = read_env_first(&["MAXMIND_FALLBACK_URL"]);
106+
97107
Ok(Self {
98108
rpc_url,
99109
rpc_x_token,
@@ -106,6 +116,11 @@ impl Config {
106116
ws_ping_interval,
107117
leader_lookahead,
108118
track_lookahead,
119+
maxmind_db_path,
120+
maxmind_license_key,
121+
maxmind_edition_id,
122+
maxmind_db_download_url,
123+
maxmind_fallback_url,
109124
})
110125
}
111126
}

leader-stream/src/docs.html

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ <h1>
4646
<a class="api-link" href="#next-leaders">/api/next-leaders</a>
4747
<a class="api-link" href="#current-slot">/api/current-slot</a>
4848
<a class="api-link" href="#leader-stream">/api/leader-stream</a>
49+
<a class="api-link" href="#leader-path">/api/leader-path</a>
4950
</div>
5051
</aside>
5152
</header>
@@ -158,6 +159,37 @@ <h2>GET /api/leader-stream</h2>
158159
</div>
159160
</div>
160161
</section>
162+
163+
<section class="panel docs-panel" id="leader-path">
164+
<div class="panel-head">
165+
<div>
166+
<h2>GET /api/leader-path</h2>
167+
<p class="sub">Returns upcoming leaders plus MaxMind geolocation data for the map view.</p>
168+
</div>
169+
</div>
170+
<div class="docs-grid">
171+
<div class="docs-block">
172+
<span class="docs-label">Query params</span>
173+
<ul class="docs-list">
174+
<li><span class="docs-key">limit</span> Optional integer. Defaults to 1000, clamps between 1 and 5000. Invalid values fall back to the default.</li>
175+
</ul>
176+
</div>
177+
<div class="docs-block">
178+
<span class="docs-label">Response fields</span>
179+
<ul class="docs-list">
180+
<li><span class="docs-key">currentSlot</span> Latest slot used as the starting point.</li>
181+
<li><span class="docs-key">limit</span> The resolved limit after clamping.</li>
182+
<li><span class="docs-key">slotMs</span> Estimated milliseconds per slot.</li>
183+
<li><span class="docs-key">path</span> Array of rows with <span class="docs-key">slot</span>, <span class="docs-key">leader</span>, <span class="docs-key">ip</span>, <span class="docs-key">port</span>, and geolocation fields <span class="docs-key">latitude</span>, <span class="docs-key">longitude</span>, <span class="docs-key">city</span>, <span class="docs-key">country</span>.</li>
184+
<li><span class="docs-key">ts</span> Server timestamp (ms since epoch).</li>
185+
</ul>
186+
</div>
187+
<div class="docs-block docs-span">
188+
<span class="docs-label">Example</span>
189+
<pre class="docs-code"><code class="docs-example" data-command="curl" data-endpoint="/api/leader-path?limit=250"></code></pre>
190+
</div>
191+
</div>
192+
</section>
161193
</main>
162194
<script>
163195
(function () {

leader-stream/src/geo.rs

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
use std::collections::HashMap;
2+
use std::fs;
3+
use std::io::{Cursor, Read};
4+
use std::net::IpAddr;
5+
use std::path::{Path, PathBuf};
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
9+
use anyhow::{anyhow, Context, Result};
10+
use flate2::read::GzDecoder;
11+
use maxminddb::geoip2::City;
12+
use maxminddb::Reader;
13+
use reqwest::Client;
14+
use tar::Archive;
15+
use tokio::sync::RwLock;
16+
use tracing::{info, warn};
17+
18+
use crate::config::Config;
19+
20+
#[derive(Clone, Debug)]
21+
pub(crate) struct GeoPoint {
22+
pub(crate) latitude: f64,
23+
pub(crate) longitude: f64,
24+
pub(crate) city: Option<String>,
25+
pub(crate) country: Option<String>,
26+
}
27+
28+
#[derive(Clone)]
29+
pub(crate) struct GeoIpService {
30+
reader: Option<Arc<Reader<Vec<u8>>>>,
31+
cache: Arc<RwLock<HashMap<String, Option<GeoPoint>>>>,
32+
}
33+
34+
impl GeoIpService {
35+
pub(crate) fn from_reader(reader: Reader<Vec<u8>>) -> Self {
36+
Self {
37+
reader: Some(Arc::new(reader)),
38+
cache: Arc::new(RwLock::new(HashMap::new())),
39+
}
40+
}
41+
42+
#[cfg(test)]
43+
pub(crate) fn from_static(entries: HashMap<String, Option<GeoPoint>>) -> Self {
44+
Self {
45+
reader: None,
46+
cache: Arc::new(RwLock::new(entries)),
47+
}
48+
}
49+
50+
pub(crate) async fn lookup(&self, ip: &str) -> Option<GeoPoint> {
51+
if ip.is_empty() {
52+
return None;
53+
}
54+
55+
{
56+
let cache = self.cache.read().await;
57+
if let Some(result) = cache.get(ip) {
58+
return result.clone();
59+
}
60+
}
61+
62+
let ip_addr: IpAddr = match ip.parse() {
63+
Ok(addr) => addr,
64+
Err(_) => {
65+
self.cache_write(ip, None).await;
66+
return None;
67+
}
68+
};
69+
70+
let reader = match self.reader.as_ref() {
71+
Some(reader) => reader,
72+
None => {
73+
self.cache_write(ip, None).await;
74+
return None;
75+
}
76+
};
77+
78+
let result = reader
79+
.lookup::<City>(ip_addr)
80+
.ok()
81+
.and_then(|city| extract_point(&city));
82+
self.cache_write(ip, result.clone()).await;
83+
result
84+
}
85+
86+
async fn cache_write(&self, ip: &str, value: Option<GeoPoint>) {
87+
let mut cache = self.cache.write().await;
88+
cache.insert(ip.to_string(), value);
89+
}
90+
}
91+
92+
pub(crate) async fn load_geoip(config: &Config) -> Result<GeoIpService> {
93+
let path = resolve_database_path(config)?;
94+
if !path.exists() {
95+
download_database(config, &path).await?;
96+
}
97+
let reader = Reader::open_readfile(&path)
98+
.with_context(|| format!("failed to open MaxMind database at {}", path.display()))?;
99+
Ok(GeoIpService::from_reader(reader))
100+
}
101+
102+
fn resolve_database_path(config: &Config) -> Result<PathBuf> {
103+
let path = PathBuf::from(config.maxmind_db_path.clone());
104+
if let Some(parent) = path.parent() {
105+
fs::create_dir_all(parent)
106+
.with_context(|| format!("failed to create database directory {}", parent.display()))?;
107+
}
108+
Ok(path)
109+
}
110+
111+
async fn download_database(config: &Config, target: &Path) -> Result<()> {
112+
let timeout = std::cmp::min(config.request_timeout, Duration::from_secs(5));
113+
let client = Client::builder()
114+
.timeout(timeout)
115+
.build()
116+
.context("failed to build HTTP client for database download")?;
117+
118+
if let Some(url) = config.maxmind_db_download_url.as_ref() {
119+
if let Err(err) = fetch_and_write(&client, url, target, true).await {
120+
warn!(
121+
?err,
122+
"failed to download MaxMind database from MAXMIND_DB_DOWNLOAD_URL"
123+
);
124+
} else {
125+
info!("downloaded MaxMind database from custom URL");
126+
return Ok(());
127+
}
128+
}
129+
130+
if let Some(key) = config.maxmind_license_key.as_ref() {
131+
let url = format!("https://download.maxmind.com/app/geoip_download?edition_id={}&license_key={}&suffix=tar.gz", config.maxmind_edition_id, key);
132+
if let Err(err) = fetch_and_write(&client, &url, target, false).await {
133+
warn!(?err, "failed to download MaxMind database with license key");
134+
} else {
135+
info!("downloaded MaxMind database using license key");
136+
return Ok(());
137+
}
138+
}
139+
140+
let url = config
141+
.maxmind_fallback_url
142+
.as_deref()
143+
.unwrap_or("https://raw.githubusercontent.com/maxmind/MaxMind-DB/main/test-data/GeoLite2-City-Test.mmdb");
144+
fetch_and_write(&client, url, target, true)
145+
.await
146+
.context("failed to download fallback MaxMind database")
147+
}
148+
149+
async fn fetch_and_write(client: &Client, url: &str, target: &Path, raw_mmdb: bool) -> Result<()> {
150+
let response = client
151+
.get(url)
152+
.send()
153+
.await
154+
.context("database request failed")?
155+
.error_for_status()
156+
.context("database request returned error status")?;
157+
158+
let bytes = response
159+
.bytes()
160+
.await
161+
.context("failed to read database body")?;
162+
163+
if raw_mmdb {
164+
if url.ends_with(".gz") {
165+
let mut decoder = GzDecoder::new(Cursor::new(bytes));
166+
let mut buf = Vec::new();
167+
decoder
168+
.read_to_end(&mut buf)
169+
.context("failed to decompress database")?;
170+
fs::write(target, &buf).context("failed to write database file")?;
171+
return Ok(());
172+
} else {
173+
fs::write(target, &bytes).context("failed to write database file")?;
174+
return Ok(());
175+
}
176+
}
177+
178+
let decoder = GzDecoder::new(Cursor::new(bytes));
179+
let mut archive = Archive::new(decoder);
180+
for entry in archive
181+
.entries()
182+
.context("failed to iterate archive entries")?
183+
{
184+
let mut entry = entry.context("failed to read archive entry")?;
185+
let path = entry
186+
.path()
187+
.context("failed to read archive path")?
188+
.into_owned();
189+
if path.extension().map(|ext| ext == "mmdb").unwrap_or(false) {
190+
let mut buf = Vec::new();
191+
entry
192+
.read_to_end(&mut buf)
193+
.context("failed to read mmdb entry")?;
194+
fs::write(target, &buf).context("failed to write database file")?;
195+
return Ok(());
196+
}
197+
}
198+
199+
Err(anyhow!("mmdb file not found in archive"))
200+
}
201+
202+
fn extract_point(city: &City) -> Option<GeoPoint> {
203+
let location = city.location.as_ref()?;
204+
let latitude = location.latitude?;
205+
let longitude = location.longitude?;
206+
let city_name = city
207+
.city
208+
.as_ref()
209+
.and_then(|record| record.names.as_ref())
210+
.and_then(|names| names.get("en").cloned());
211+
let country_name = city
212+
.country
213+
.as_ref()
214+
.and_then(|record| record.names.as_ref())
215+
.and_then(|names| names.get("en").cloned());
216+
Some(GeoPoint {
217+
latitude,
218+
longitude,
219+
city: city_name,
220+
country: country_name,
221+
})
222+
}

0 commit comments

Comments
 (0)