Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 209 additions & 1 deletion tools/webhooks/test_webhook_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

import socket

from webhook_server import validate_webhook_url
from webhook_server import (
RustChainPoller,
SubscriberStore,
WebhookEvent,
dispatch_event,
validate_webhook_url,
)


def _addrinfo(*ips):
Expand Down Expand Up @@ -67,3 +73,205 @@ def test_validate_webhook_url_accepts_public_resolved_ips(monkeypatch):
)

assert validate_webhook_url("https://example.com/hook") is None


# ---------------------------------------------------------------------------
# FIX(#7063): large_tx polling now hits /wallet/balances/all with X-Admin-Key
# ---------------------------------------------------------------------------


def _make_poller(tmp_path, admin_key="secret-admin-key", balances_path="/wallet/balances/all"):
store = SubscriberStore(db_path=str(tmp_path / "subs.db"))
return RustChainPoller(
node_url="http://node.local:5000",
store=store,
poll_interval=60,
large_tx_threshold=5.0,
admin_key=admin_key,
balances_path=balances_path,
)


def test_check_large_tx_skips_when_admin_key_unset(tmp_path, monkeypatch):
poller = _make_poller(tmp_path, admin_key="")
calls = []

def fail_get(*args, **kwargs):
calls.append((args, kwargs))
raise AssertionError("_get must not be called when admin key is empty")

monkeypatch.setattr(poller, "_get", fail_get)
poller._check_large_tx()
assert calls == []


def test_check_large_tx_uses_new_endpoint_with_admin_key_header(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)

captured = {}

def fake_get(path, headers=None):
captured["path"] = path
captured["headers"] = headers
return {"balances": [], "total_i64": 0, "total_rtc": 0.0}

monkeypatch.setattr(poller, "_get", fake_get)
poller._check_large_tx()

assert captured["path"] == "/wallet/balances/all"
assert captured["headers"] == {"X-Admin-Key": "secret-admin-key"}


def test_check_large_tx_uses_custom_balances_path(tmp_path, monkeypatch):
poller = _make_poller(tmp_path, balances_path="/v2/balances")

captured = {}

def fake_get(path, headers=None):
captured["path"] = path
return {"balances": []}

monkeypatch.setattr(poller, "_get", fake_get)
poller._check_large_tx()

assert captured["path"] == "/v2/balances"


def test_check_large_tx_handles_envelope_response_format(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)
response = {
"balances": [
{"miner_id": "alpha", "amount_i64": 1_000_000_000, "amount_rtc": 1.0},
{"miner_id": "beta", "amount_i64": 2_000_000_000, "amount_rtc": 2.0},
{"miner_id": "gamma", "amount_i64": 5_000_000_000, "amount_rtc": 5.0},
],
"total_i64": 8_000_000_000,
"total_rtc": 8.0,
}
monkeypatch.setattr(poller, "_get", lambda *a, **kw: response)

poller._check_large_tx()

# First poll: no previous snapshot, nothing dispatched, but internal
# state should now hold the new balances.
assert poller._prev_balances == {"alpha": 1.0, "beta": 2.0, "gamma": 5.0}


def test_check_large_tx_dispatches_event_when_threshold_exceeded(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)
poller._prev_balances = {"alpha": 1.0, "beta": 2.0}

# beta grew by 10 RTC (well over the 5 RTC threshold).
response = {
"balances": [
{"miner_id": "alpha", "amount_rtc": 1.0},
{"miner_id": "beta", "amount_rtc": 12.0},
],
}
monkeypatch.setattr(poller, "_get", lambda *a, **kw: response)

dispatched = []

def fake_dispatch(event, store):
dispatched.append(event)

monkeypatch.setattr("webhook_server.dispatch_event", fake_dispatch)
poller._check_large_tx()

assert len(dispatched) == 1
evt = dispatched[0]
assert isinstance(evt, WebhookEvent)
assert evt.event_type == "large_tx"
assert evt.data["miner"] == "beta"
assert evt.data["previous_balance"] == 2.0
assert evt.data["new_balance"] == 12.0
assert evt.data["delta"] == 10.0
assert evt.data["direction"] == "credit"


def test_check_large_tx_dispatches_debit_event(tmp_path, monkeypatch):
poller = _make_poller(tmp_path, balances_path="/wallet/balances/all")
poller._prev_balances = {"alpha": 100.0}

response = {
"balances": [
{"miner_id": "alpha", "amount_rtc": 50.0},
],
}
monkeypatch.setattr(poller, "_get", lambda *a, **kw: response)

dispatched = []
monkeypatch.setattr("webhook_server.dispatch_event", lambda e, s: dispatched.append(e))
poller._check_large_tx()

assert len(dispatched) == 1
assert dispatched[0].data["direction"] == "debit"
assert dispatched[0].data["delta"] == -50.0


def test_check_large_tx_skips_below_threshold(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)
poller._prev_balances = {"alpha": 1.0}

response = {
"balances": [
{"miner_id": "alpha", "amount_rtc": 2.0}, # delta = 1.0, threshold = 5.0
],
}
monkeypatch.setattr(poller, "_get", lambda *a, **kw: response)

dispatched = []
monkeypatch.setattr("webhook_server.dispatch_event", lambda e, s: dispatched.append(e))
poller._check_large_tx()

assert dispatched == []


def test_check_large_tx_accepts_legacy_bare_list_response(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)
response = [
{"miner_id": "alpha", "balance": 1.5},
{"miner_id": "beta", "balance": 2.5},
]
monkeypatch.setattr(poller, "_get", lambda *a, **kw: response)

poller._check_large_tx()

assert poller._prev_balances == {"alpha": 1.5, "beta": 2.5}


def test_check_large_tx_ignores_malformed_entries(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)
response = {
"balances": [
{"miner_id": "alpha", "amount_rtc": 1.0},
{"miner_id": None, "amount_rtc": 99.0}, # missing id
{"amount_rtc": 99.0}, # missing id
"not-a-dict", # wrong shape
{"miner_id": "beta", "amount_rtc": "garbage"}, # bad amount
],
}
monkeypatch.setattr(poller, "_get", lambda *a, **kw: response)

poller._check_large_tx()

assert poller._prev_balances == {"alpha": 1.0}


def test_check_large_tx_handles_unexpected_response_shape(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)

monkeypatch.setattr(poller, "_get", lambda *a, **kw: "ok")
poller._check_large_tx()
assert poller._prev_balances == {}

monkeypatch.setattr(poller, "_get", lambda *a, **kw: {"foo": "bar"})
poller._check_large_tx()
assert poller._prev_balances == {}


def test_check_large_tx_handles_http_error(tmp_path, monkeypatch):
poller = _make_poller(tmp_path)
monkeypatch.setattr(poller, "_get", lambda *a, **kw: None)
poller._check_large_tx()
assert poller._prev_balances == {}
87 changes: 75 additions & 12 deletions tools/webhooks/webhook_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def _safe_float(val: str, default: float) -> float:
DEFAULT_POLL_INTERVAL = _safe_int(os.getenv("WEBHOOK_POLL_INTERVAL", "10"), 10)
DEFAULT_LARGE_TX_THRESHOLD = _safe_float(os.getenv("LARGE_TX_THRESHOLD", "100.0"), 100.0)
DEFAULT_DB_PATH = os.getenv("WEBHOOK_DB", "webhooks.db")
# Optional admin key for node endpoints (e.g. /wallet/balances/all) that
# require X-Admin-Key. When unset, large_tx polling is skipped because
# the live node no longer exposes the unauthenticated /api/balances path.
DEFAULT_NODE_ADMIN_KEY = os.getenv("RUSTCHAIN_ADMIN_KEY", "")
# Node balance endpoint that supersedes the removed /api/balances route.
NODE_BALANCES_PATH = os.getenv("RUSTCHAIN_BALANCES_PATH", "/wallet/balances/all")
MAX_ADMIN_BODY_BYTES = 1024 * 1024
MAX_RETRIES = 5
INITIAL_BACKOFF = 1.0 # seconds
Expand Down Expand Up @@ -351,11 +357,15 @@ class RustChainPoller:

def __init__(self, node_url: str, store: SubscriberStore,
poll_interval: int = DEFAULT_POLL_INTERVAL,
large_tx_threshold: float = DEFAULT_LARGE_TX_THRESHOLD):
large_tx_threshold: float = DEFAULT_LARGE_TX_THRESHOLD,
admin_key: str = DEFAULT_NODE_ADMIN_KEY,
balances_path: str = NODE_BALANCES_PATH):
self.node_url = node_url.rstrip("/")
self.store = store
self.poll_interval = poll_interval
self.large_tx_threshold = large_tx_threshold
self.admin_key = admin_key
self.balances_path = balances_path

# Previous-state snapshots
self._prev_tip_slot: Optional[int] = None
Expand All @@ -364,9 +374,13 @@ def __init__(self, node_url: str, store: SubscriberStore,
self._prev_balances: Dict[str, float] = {}
self._running = False

def _get(self, path: str) -> Optional[dict]:
def _get(self, path: str, headers: Optional[Dict[str, str]] = None) -> Optional[dict]:
try:
resp = requests.get(f"{self.node_url}{path}", timeout=15)
resp = requests.get(
f"{self.node_url}{path}",
timeout=15,
headers=headers or {},
)
resp.raise_for_status()
return resp.json()
except Exception as exc:
Expand Down Expand Up @@ -465,19 +479,60 @@ def _check_miners(self):
self._prev_miners = current_miners

def _check_large_tx(self):
balances_data = self._get("/api/balances")
if balances_data is None or not isinstance(balances_data, list):
# FIX(#7063): the live node no longer exposes the unauthenticated
# /api/balances surface. The canonical replacement is
# /wallet/balances/all, which requires the X-Admin-Key header and
# returns 503 ADMIN_KEY_UNSET when RC_ADMIN_KEY is not configured
# on the node. Skip polling gracefully in that case so existing
# operators (without an admin key) don't see noisy 503/401 errors
# on every cycle.
if not self.admin_key:
log.debug(
"Skipping large_tx poll: RUSTCHAIN_ADMIN_KEY not configured "
"(node no longer exposes /api/balances)."
)
return

balances_data = self._get(
self.balances_path,
headers={"X-Admin-Key": self.admin_key},
)
if balances_data is None or not isinstance(balances_data, (dict, list)):
return

# /wallet/balances/all returns {"balances": [{miner_id, amount_i64,
# amount_rtc}, ...], "total_i64": ..., "total_rtc": ...}. Older
# deployments returned a bare list; accept both shapes.
raw_entries: Any
if isinstance(balances_data, list):
raw_entries = balances_data
elif isinstance(balances_data.get("balances"), list):
raw_entries = balances_data["balances"]
else:
return

current_balances: Dict[str, float] = {}
for entry in balances_data:
for entry in raw_entries:
if not isinstance(entry, dict):
continue
miner_id = entry.get("miner_id") or entry.get("miner")
balance = entry.get("balance") or entry.get("amount", 0)
if miner_id is not None:
try:
current_balances[miner_id] = float(balance)
except (ValueError, TypeError):
continue
if miner_id is None:
continue
# Prefer the human-readable RTC amount when present, fall back
# to the integer stroop balance, then the legacy `amount` key.
balance: Any = (
entry.get("amount_rtc")
if entry.get("amount_rtc") is not None
else entry.get("amount_i64")
if entry.get("amount_i64") is not None
else entry.get("balance")
if entry.get("balance") is not None
else entry.get("amount", 0)
)
try:
current_balances[str(miner_id)] = float(balance)
except (ValueError, TypeError):
continue

if self._prev_balances:
for miner_id, new_bal in current_balances.items():
Expand Down Expand Up @@ -706,6 +761,12 @@ def main():
help="RTC threshold for large_tx events (default: %(default)s)")
parser.add_argument("--db", default=DEFAULT_DB_PATH,
help="SQLite database path (default: %(default)s)")
parser.add_argument("--admin-key", default=DEFAULT_NODE_ADMIN_KEY,
help="X-Admin-Key for node endpoints that require it "
"(env: RUSTCHAIN_ADMIN_KEY, default: %(default)s)")
parser.add_argument("--balances-path", default=NODE_BALANCES_PATH,
help="Node balance endpoint polled for large_tx events "
"(env: RUSTCHAIN_BALANCES_PATH, default: %(default)s)")
args = parser.parse_args()

store = SubscriberStore(db_path=args.db)
Expand All @@ -716,6 +777,8 @@ def main():
store=store,
poll_interval=args.poll_interval,
large_tx_threshold=args.large_tx_threshold,
admin_key=args.admin_key,
balances_path=args.balances_path,
)
poller_thread = threading.Thread(target=poller.run, daemon=True)
poller_thread.start()
Expand Down
Loading