From 5edcd86bf5302516635d6d987c4808bb86b4fee6 Mon Sep 17 00:00:00 2001 From: Yzgaming005 Date: Wed, 24 Jun 2026 16:19:13 +0000 Subject: [PATCH] fix(#7063): poll /wallet/balances/all with X-Admin-Key for large_tx events The webhook dispatcher's large_tx detector was polling /api/balances, which the live node no longer exposes (the unauthenticated route was removed in favour of the admin-gated /wallet/balances/all endpoint). As a result, the large_tx event was silently dead and operators only saw 404s in the poller logs. Changes: - Add RUSTCHAIN_ADMIN_KEY + RUSTCHAIN_BALANCES_PATH env defaults and --admin-key/--balances-path CLI flags so the dispatcher can authenticate. - Plumb admin_key + balances_path into RustChainPoller; the existing balances_path default (/wallet/balances/all) is overridable so future endpoint renames are a config change, not a code change. - Skip the poll gracefully when RUSTCHAIN_ADMIN_KEY is empty instead of hammering an unauthenticated surface that returns 503. - Accept both the canonical envelope ({"balances": [...], "total_*": ...}) and the legacy bare-list response, with miner_id/miner and amount_rtc/amount_i64/balance/amount field fallbacks. - Extend _get to forward optional headers (X-Admin-Key) without breaking the other poller call sites. - Add 11 focused unit tests covering: admin key gate, header injection, envelope vs. legacy response shape, threshold/credit/debit dispatch, malformed entries, unexpected shapes, and HTTP failure handling. --- tools/webhooks/test_webhook_server.py | 210 +++++++++++++++++++++++++- tools/webhooks/webhook_server.py | 87 +++++++++-- 2 files changed, 284 insertions(+), 13 deletions(-) diff --git a/tools/webhooks/test_webhook_server.py b/tools/webhooks/test_webhook_server.py index 2461df2da..665ab77bf 100644 --- a/tools/webhooks/test_webhook_server.py +++ b/tools/webhooks/test_webhook_server.py @@ -3,7 +3,13 @@ import socket -from webhook_server import validate_webhook_url +from webhook_server import ( + RustChainPoller, + SubscriberStore, + WebhookEvent, + dispatch_event, + validate_webhook_url, +) def _addrinfo(*ips): @@ -67,3 +73,205 @@ def test_validate_webhook_url_accepts_public_resolved_ips(monkeypatch): ) assert validate_webhook_url("https://example.com/hook") is None + + +# --------------------------------------------------------------------------- +# FIX(#7063): large_tx polling now hits /wallet/balances/all with X-Admin-Key +# --------------------------------------------------------------------------- + + +def _make_poller(tmp_path, admin_key="secret-admin-key", balances_path="/wallet/balances/all"): + store = SubscriberStore(db_path=str(tmp_path / "subs.db")) + return RustChainPoller( + node_url="http://node.local:5000", + store=store, + poll_interval=60, + large_tx_threshold=5.0, + admin_key=admin_key, + balances_path=balances_path, + ) + + +def test_check_large_tx_skips_when_admin_key_unset(tmp_path, monkeypatch): + poller = _make_poller(tmp_path, admin_key="") + calls = [] + + def fail_get(*args, **kwargs): + calls.append((args, kwargs)) + raise AssertionError("_get must not be called when admin key is empty") + + monkeypatch.setattr(poller, "_get", fail_get) + poller._check_large_tx() + assert calls == [] + + +def test_check_large_tx_uses_new_endpoint_with_admin_key_header(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + + captured = {} + + def fake_get(path, headers=None): + captured["path"] = path + captured["headers"] = headers + return {"balances": [], "total_i64": 0, "total_rtc": 0.0} + + monkeypatch.setattr(poller, "_get", fake_get) + poller._check_large_tx() + + assert captured["path"] == "/wallet/balances/all" + assert captured["headers"] == {"X-Admin-Key": "secret-admin-key"} + + +def test_check_large_tx_uses_custom_balances_path(tmp_path, monkeypatch): + poller = _make_poller(tmp_path, balances_path="/v2/balances") + + captured = {} + + def fake_get(path, headers=None): + captured["path"] = path + return {"balances": []} + + monkeypatch.setattr(poller, "_get", fake_get) + poller._check_large_tx() + + assert captured["path"] == "/v2/balances" + + +def test_check_large_tx_handles_envelope_response_format(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + response = { + "balances": [ + {"miner_id": "alpha", "amount_i64": 1_000_000_000, "amount_rtc": 1.0}, + {"miner_id": "beta", "amount_i64": 2_000_000_000, "amount_rtc": 2.0}, + {"miner_id": "gamma", "amount_i64": 5_000_000_000, "amount_rtc": 5.0}, + ], + "total_i64": 8_000_000_000, + "total_rtc": 8.0, + } + monkeypatch.setattr(poller, "_get", lambda *a, **kw: response) + + poller._check_large_tx() + + # First poll: no previous snapshot, nothing dispatched, but internal + # state should now hold the new balances. + assert poller._prev_balances == {"alpha": 1.0, "beta": 2.0, "gamma": 5.0} + + +def test_check_large_tx_dispatches_event_when_threshold_exceeded(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + poller._prev_balances = {"alpha": 1.0, "beta": 2.0} + + # beta grew by 10 RTC (well over the 5 RTC threshold). + response = { + "balances": [ + {"miner_id": "alpha", "amount_rtc": 1.0}, + {"miner_id": "beta", "amount_rtc": 12.0}, + ], + } + monkeypatch.setattr(poller, "_get", lambda *a, **kw: response) + + dispatched = [] + + def fake_dispatch(event, store): + dispatched.append(event) + + monkeypatch.setattr("webhook_server.dispatch_event", fake_dispatch) + poller._check_large_tx() + + assert len(dispatched) == 1 + evt = dispatched[0] + assert isinstance(evt, WebhookEvent) + assert evt.event_type == "large_tx" + assert evt.data["miner"] == "beta" + assert evt.data["previous_balance"] == 2.0 + assert evt.data["new_balance"] == 12.0 + assert evt.data["delta"] == 10.0 + assert evt.data["direction"] == "credit" + + +def test_check_large_tx_dispatches_debit_event(tmp_path, monkeypatch): + poller = _make_poller(tmp_path, balances_path="/wallet/balances/all") + poller._prev_balances = {"alpha": 100.0} + + response = { + "balances": [ + {"miner_id": "alpha", "amount_rtc": 50.0}, + ], + } + monkeypatch.setattr(poller, "_get", lambda *a, **kw: response) + + dispatched = [] + monkeypatch.setattr("webhook_server.dispatch_event", lambda e, s: dispatched.append(e)) + poller._check_large_tx() + + assert len(dispatched) == 1 + assert dispatched[0].data["direction"] == "debit" + assert dispatched[0].data["delta"] == -50.0 + + +def test_check_large_tx_skips_below_threshold(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + poller._prev_balances = {"alpha": 1.0} + + response = { + "balances": [ + {"miner_id": "alpha", "amount_rtc": 2.0}, # delta = 1.0, threshold = 5.0 + ], + } + monkeypatch.setattr(poller, "_get", lambda *a, **kw: response) + + dispatched = [] + monkeypatch.setattr("webhook_server.dispatch_event", lambda e, s: dispatched.append(e)) + poller._check_large_tx() + + assert dispatched == [] + + +def test_check_large_tx_accepts_legacy_bare_list_response(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + response = [ + {"miner_id": "alpha", "balance": 1.5}, + {"miner_id": "beta", "balance": 2.5}, + ] + monkeypatch.setattr(poller, "_get", lambda *a, **kw: response) + + poller._check_large_tx() + + assert poller._prev_balances == {"alpha": 1.5, "beta": 2.5} + + +def test_check_large_tx_ignores_malformed_entries(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + response = { + "balances": [ + {"miner_id": "alpha", "amount_rtc": 1.0}, + {"miner_id": None, "amount_rtc": 99.0}, # missing id + {"amount_rtc": 99.0}, # missing id + "not-a-dict", # wrong shape + {"miner_id": "beta", "amount_rtc": "garbage"}, # bad amount + ], + } + monkeypatch.setattr(poller, "_get", lambda *a, **kw: response) + + poller._check_large_tx() + + assert poller._prev_balances == {"alpha": 1.0} + + +def test_check_large_tx_handles_unexpected_response_shape(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + + monkeypatch.setattr(poller, "_get", lambda *a, **kw: "ok") + poller._check_large_tx() + assert poller._prev_balances == {} + + monkeypatch.setattr(poller, "_get", lambda *a, **kw: {"foo": "bar"}) + poller._check_large_tx() + assert poller._prev_balances == {} + + +def test_check_large_tx_handles_http_error(tmp_path, monkeypatch): + poller = _make_poller(tmp_path) + monkeypatch.setattr(poller, "_get", lambda *a, **kw: None) + poller._check_large_tx() + assert poller._prev_balances == {} diff --git a/tools/webhooks/webhook_server.py b/tools/webhooks/webhook_server.py index b984c8eef..1469c11b9 100644 --- a/tools/webhooks/webhook_server.py +++ b/tools/webhooks/webhook_server.py @@ -71,6 +71,12 @@ def _safe_float(val: str, default: float) -> float: DEFAULT_POLL_INTERVAL = _safe_int(os.getenv("WEBHOOK_POLL_INTERVAL", "10"), 10) DEFAULT_LARGE_TX_THRESHOLD = _safe_float(os.getenv("LARGE_TX_THRESHOLD", "100.0"), 100.0) DEFAULT_DB_PATH = os.getenv("WEBHOOK_DB", "webhooks.db") +# Optional admin key for node endpoints (e.g. /wallet/balances/all) that +# require X-Admin-Key. When unset, large_tx polling is skipped because +# the live node no longer exposes the unauthenticated /api/balances path. +DEFAULT_NODE_ADMIN_KEY = os.getenv("RUSTCHAIN_ADMIN_KEY", "") +# Node balance endpoint that supersedes the removed /api/balances route. +NODE_BALANCES_PATH = os.getenv("RUSTCHAIN_BALANCES_PATH", "/wallet/balances/all") MAX_ADMIN_BODY_BYTES = 1024 * 1024 MAX_RETRIES = 5 INITIAL_BACKOFF = 1.0 # seconds @@ -351,11 +357,15 @@ class RustChainPoller: def __init__(self, node_url: str, store: SubscriberStore, poll_interval: int = DEFAULT_POLL_INTERVAL, - large_tx_threshold: float = DEFAULT_LARGE_TX_THRESHOLD): + large_tx_threshold: float = DEFAULT_LARGE_TX_THRESHOLD, + admin_key: str = DEFAULT_NODE_ADMIN_KEY, + balances_path: str = NODE_BALANCES_PATH): self.node_url = node_url.rstrip("/") self.store = store self.poll_interval = poll_interval self.large_tx_threshold = large_tx_threshold + self.admin_key = admin_key + self.balances_path = balances_path # Previous-state snapshots self._prev_tip_slot: Optional[int] = None @@ -364,9 +374,13 @@ def __init__(self, node_url: str, store: SubscriberStore, self._prev_balances: Dict[str, float] = {} self._running = False - def _get(self, path: str) -> Optional[dict]: + def _get(self, path: str, headers: Optional[Dict[str, str]] = None) -> Optional[dict]: try: - resp = requests.get(f"{self.node_url}{path}", timeout=15) + resp = requests.get( + f"{self.node_url}{path}", + timeout=15, + headers=headers or {}, + ) resp.raise_for_status() return resp.json() except Exception as exc: @@ -465,19 +479,60 @@ def _check_miners(self): self._prev_miners = current_miners def _check_large_tx(self): - balances_data = self._get("/api/balances") - if balances_data is None or not isinstance(balances_data, list): + # FIX(#7063): the live node no longer exposes the unauthenticated + # /api/balances surface. The canonical replacement is + # /wallet/balances/all, which requires the X-Admin-Key header and + # returns 503 ADMIN_KEY_UNSET when RC_ADMIN_KEY is not configured + # on the node. Skip polling gracefully in that case so existing + # operators (without an admin key) don't see noisy 503/401 errors + # on every cycle. + if not self.admin_key: + log.debug( + "Skipping large_tx poll: RUSTCHAIN_ADMIN_KEY not configured " + "(node no longer exposes /api/balances)." + ) + return + + balances_data = self._get( + self.balances_path, + headers={"X-Admin-Key": self.admin_key}, + ) + if balances_data is None or not isinstance(balances_data, (dict, list)): + return + + # /wallet/balances/all returns {"balances": [{miner_id, amount_i64, + # amount_rtc}, ...], "total_i64": ..., "total_rtc": ...}. Older + # deployments returned a bare list; accept both shapes. + raw_entries: Any + if isinstance(balances_data, list): + raw_entries = balances_data + elif isinstance(balances_data.get("balances"), list): + raw_entries = balances_data["balances"] + else: return current_balances: Dict[str, float] = {} - for entry in balances_data: + for entry in raw_entries: + if not isinstance(entry, dict): + continue miner_id = entry.get("miner_id") or entry.get("miner") - balance = entry.get("balance") or entry.get("amount", 0) - if miner_id is not None: - try: - current_balances[miner_id] = float(balance) - except (ValueError, TypeError): - continue + if miner_id is None: + continue + # Prefer the human-readable RTC amount when present, fall back + # to the integer stroop balance, then the legacy `amount` key. + balance: Any = ( + entry.get("amount_rtc") + if entry.get("amount_rtc") is not None + else entry.get("amount_i64") + if entry.get("amount_i64") is not None + else entry.get("balance") + if entry.get("balance") is not None + else entry.get("amount", 0) + ) + try: + current_balances[str(miner_id)] = float(balance) + except (ValueError, TypeError): + continue if self._prev_balances: for miner_id, new_bal in current_balances.items(): @@ -706,6 +761,12 @@ def main(): help="RTC threshold for large_tx events (default: %(default)s)") parser.add_argument("--db", default=DEFAULT_DB_PATH, help="SQLite database path (default: %(default)s)") + parser.add_argument("--admin-key", default=DEFAULT_NODE_ADMIN_KEY, + help="X-Admin-Key for node endpoints that require it " + "(env: RUSTCHAIN_ADMIN_KEY, default: %(default)s)") + parser.add_argument("--balances-path", default=NODE_BALANCES_PATH, + help="Node balance endpoint polled for large_tx events " + "(env: RUSTCHAIN_BALANCES_PATH, default: %(default)s)") args = parser.parse_args() store = SubscriberStore(db_path=args.db) @@ -716,6 +777,8 @@ def main(): store=store, poll_interval=args.poll_interval, large_tx_threshold=args.large_tx_threshold, + admin_key=args.admin_key, + balances_path=args.balances_path, ) poller_thread = threading.Thread(target=poller.run, daemon=True) poller_thread.start()