Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 161 additions & 3 deletions tests/test_vintage_ai_rustchain_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def test_get_miners_accepts_envelope_payloads(monkeypatch):

monkeypatch.setattr(
client,
"_get",
lambda endpoint: {
"_get_public",
lambda endpoint, params=None: {
"items": [
{"miner": "alice", "hardware_type": "PowerPC G4"},
{"miner": "bob", "hardware_type": "x86-64"},
Expand All @@ -40,7 +40,7 @@ def test_get_miners_returns_empty_list_for_unexpected_payload(monkeypatch):
module = load_client_module()
client = module.RustChainClient(base_url="https://node.example")

monkeypatch.setattr(client, "_get", lambda endpoint: {"pagination": {"total": 0}})
monkeypatch.setattr(client, "_get_public", lambda endpoint, params=None: {"pagination": {"total": 0}})

assert client.get_miners() == []

Expand Down Expand Up @@ -138,3 +138,161 @@ def read(self):
return body

return FakeResp()


# --- Issue #6624: _request_public must NOT include admin key headers ---


def test_request_public_uses_public_headers(monkeypatch):
"""_request_public must use _get_public_headers (no admin key)."""
module = load_client_module()
client = module.RustChainClient(
base_url="https://node.example", admin_key="secret-admin-key-123"
)

captured_headers = {}

class FakeResp:
def __enter__(self):
return self
def __exit__(self, *args):
return False
def read(self):
return b'{"ok": true}'

def fake_urlopen(req, **kwargs):
captured_headers.update(req.headers)
return FakeResp()

monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)

result = client._request_public("GET", "/health")
assert result == {"ok": True}
assert "X-Admin-Key" not in captured_headers, (
f"_request_public must not send admin key; got headers: {captured_headers}"
)
assert captured_headers.get("Accept") == "application/json"


def test_request_with_admin_key_sends_header(monkeypatch):
"""_request (authenticated) must include X-Admin-Key when configured."""
module = load_client_module()
client = module.RustChainClient(
base_url="https://node.example", admin_key="secret-admin-key-123"
)

captured_headers = {}

class FakeResp:
def __enter__(self):
return self
def __exit__(self, *args):
return False
def read(self):
return b'{"ok": true}'

def fake_urlopen(req, **kwargs):
captured_headers.update(req.headers)
return FakeResp()

monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)

result = client._request("POST", "/api/submit", data={"key": "val"})
assert result == {"ok": True}
headers_lower = {k.lower(): v for k, v in captured_headers.items()}
assert headers_lower.get("x-admin-key") == "secret-admin-key-123"


def test_read_methods_use_public_no_admin_key(monkeypatch):
"""Read methods (health, get_epoch, get_miners, etc.) must not send admin key."""
module = load_client_module()
client = module.RustChainClient(
base_url="https://node.example", admin_key="secret-admin-key-123"
)

captured_headers = {}

class FakeResp:
def __enter__(self):
return self
def __exit__(self, *args):
return False
def read(self):
return b'{"result": "ok"}'

def fake_urlopen(req, **kwargs):
captured_headers.clear()
captured_headers.update(req.headers)
return FakeResp()

monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)

read_endpoints = [
lambda: client.health(),
lambda: client.get_epoch(),
lambda: client.get_wallet_balance("miner_123"),
lambda: client.get_wallet_history("miner_123"),
lambda: client.get_stats(),
lambda: client.get_hall_of_fame(),
lambda: client.get_miner_eligibility("miner_123"),
]

for call in read_endpoints:
call()
assert "X-Admin-Key" not in captured_headers, (
f"Read method must not send admin key; got: {captured_headers}"
)


def test_admin_key_not_set_no_header_sent(monkeypatch):
"""When admin_key is None, no X-Admin-Key header is sent even on write requests."""
module = load_client_module()
client = module.RustChainClient(base_url="https://node.example")

captured_headers = {}

class FakeResp:
def __enter__(self):
return self
def __exit__(self, *args):
return False
def read(self):
return b'{"ok": true}'

def fake_urlopen(req, **kwargs):
captured_headers.update(req.headers)
return FakeResp()

monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)

client._request("POST", "/api/submit", data={"key": "val"})
assert "X-Admin-Key" not in captured_headers


def test_get_public_headers_never_includes_admin_key():
"""_get_public_headers() must never include admin key regardless of config."""
module = load_client_module()
client = module.RustChainClient(
base_url="https://node.example", admin_key="super-secret"
)
headers = client._get_public_headers()
assert "X-Admin-Key" not in headers
assert "Accept" in headers


def test_get_headers_includes_admin_key_when_set():
"""_get_headers() includes admin key when configured."""
module = load_client_module()
client = module.RustChainClient(
base_url="https://node.example", admin_key="my-admin-key"
)
headers = client._get_headers()
assert headers["X-Admin-Key"] == "my-admin-key"


def test_get_headers_no_admin_key_when_unset():
"""_get_headers() omits admin key when not configured."""
module = load_client_module()
client = module.RustChainClient(base_url="https://node.example")
headers = client._get_headers()
assert "X-Admin-Key" not in headers
105 changes: 87 additions & 18 deletions vintage_ai_video_pipeline/rustchain_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class RustChainClient:
def __init__(
self,
base_url: str = DEFAULT_BASE_URL,
admin_key: Optional[str] = None,
verify_ssl: bool = False,
timeout: int = 30,
retry_count: int = 3,
Expand All @@ -38,12 +39,14 @@ def __init__(

Args:
base_url: Base URL of the RustChain API
admin_key: Admin authentication key (used only for write operations)
verify_ssl: Enable SSL verification (default: False for self-signed certs)
timeout: Request timeout in seconds
retry_count: Number of retries on failure
retry_delay: Delay between retries (seconds)
"""
self.base_url = base_url.rstrip("/")
self.admin_key = admin_key
self.verify_ssl = verify_ssl
self.timeout = timeout
self.retry_count = retry_count
Expand All @@ -60,7 +63,17 @@ def __init__(
self._known_miners = {}

def _get_headers(self) -> Dict[str, str]:
"""Get request headers"""
"""Get request headers (includes admin key for write operations)"""
headers = {
"Accept": "application/json",
"User-Agent": "vintage-ai-video-pipeline/1.0.0",
}
if self.admin_key:
headers["X-Admin-Key"] = self.admin_key
return headers

def _get_public_headers(self) -> Dict[str, str]:
"""Get request headers WITHOUT admin key (for read-only operations)"""
return {
"Accept": "application/json",
"User-Agent": "vintage-ai-video-pipeline/1.0.0",
Expand Down Expand Up @@ -124,6 +137,62 @@ def _request(

raise Exception("Max retries exceeded")

def _request_public(
self,
method: str,
endpoint: str,
) -> Dict[str, Any]:
"""Make an HTTP request WITHOUT the admin key header (read-only operations).

This ensures that read operations (GET requests for balances, miner lists,
etc.) never send admin credentials, following the principle of least privilege.
"""
url = f"{self.base_url}{endpoint}"
headers = self._get_public_headers()

for attempt in range(self.retry_count):
try:
req = Request(url, headers=headers, method=method)

with urllib.request.urlopen(
req,
context=self._ctx,
timeout=self.timeout
) as response:
raw = response.read()
response_data = raw.decode("utf-8").strip() if raw else ""
if not response_data:
return {}
return json.loads(response_data)

except HTTPError as e:
error_body = e.read().decode("utf-8") if e.fp else ""
if attempt == self.retry_count - 1:
raise Exception(
f"HTTP Error {e.code}: {e.reason} - {error_body}"
)
except URLError as e:
if attempt == self.retry_count - 1:
raise Exception(f"Connection Error: {e.reason}")
except json.JSONDecodeError as e:
if attempt == self.retry_count - 1:
raise Exception(f"Invalid JSON response: {str(e)}")
except Exception:
if attempt == self.retry_count - 1:
raise

if attempt < self.retry_count - 1:
time.sleep(self.retry_delay * (attempt + 1))

raise Exception("Max retries exceeded")

def _get_public(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""GET request without admin key (read-only operations)"""
if params:
query = urllib.parse.urlencode(params)
endpoint = f"{endpoint}?{query}"
return self._request_public("GET", endpoint)

def _get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""GET request with query parameters"""
if params:
Expand All @@ -132,21 +201,21 @@ def _get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
return self._request("GET", endpoint)

def health(self) -> Dict[str, Any]:
"""Check node health"""
return self._get("/health")
"""Check node health (public, no admin key)"""
return self._get_public("/health")

def get_epoch(self) -> Dict[str, Any]:
"""Get current epoch information"""
return self._get("/epoch")
"""Get current epoch information (public, no admin key)"""
return self._get_public("/epoch")

def get_miners(self) -> List[Dict[str, Any]]:
"""
List all active miners
List all active miners (public, no admin key)

Returns:
List of miner information dictionaries
"""
data = self._get("/api/miners")
data = self._get_public("/api/miners")
if isinstance(data, list):
return data
if isinstance(data, dict):
Expand All @@ -157,24 +226,24 @@ def get_miners(self) -> List[Dict[str, Any]]:
return []

def get_miner_eligibility(self, miner_id: str) -> Dict[str, Any]:
"""Check miner's epoch eligibility"""
return self._get("/lottery/eligibility", params={"miner_id": miner_id})
"""Check miner's epoch eligibility (public, no admin key)"""
return self._get_public("/lottery/eligibility", params={"miner_id": miner_id})

def get_wallet_balance(self, miner_id: str) -> Dict[str, Any]:
"""Get wallet balance for a miner"""
return self._get("/wallet/balance", params={"miner_id": miner_id})
"""Get wallet balance for a miner (public, no admin key)"""
return self._get_public("/wallet/balance", params={"miner_id": miner_id})

def get_wallet_history(self, miner_id: str, limit: int = 10) -> Dict[str, Any]:
"""Get transaction history for a miner"""
return self._get("/wallet/history", params={"miner_id": miner_id, "limit": limit})
"""Get transaction history for a miner (public, no admin key)"""
return self._get_public("/wallet/history", params={"miner_id": miner_id, "limit": limit})

def get_stats(self) -> Dict[str, Any]:
"""Get network statistics"""
return self._get("/api/stats")
"""Get network statistics (public, no admin key)"""
return self._get_public("/api/stats")

def get_hall_of_fame(self) -> Dict[str, Any]:
"""Get Hall of Fame leaderboard"""
return self._get("/api/hall_of_fame")
"""Get Hall of Fame leaderboard (public, no admin key)"""
return self._get_public("/api/hall_of_fame")

def monitor_attestations(
self,
Expand Down
Loading