diff --git a/tests/test_vintage_ai_rustchain_client.py b/tests/test_vintage_ai_rustchain_client.py index 2e24128a3..1fea0d27b 100644 --- a/tests/test_vintage_ai_rustchain_client.py +++ b/tests/test_vintage_ai_rustchain_client.py @@ -20,8 +20,8 @@ def test_get_miners_accepts_envelope_payloads(monkeypatch): monkeypatch.setattr( client, - "_get", - lambda endpoint: { + "_get_public", + lambda endpoint, params=None: { "items": [ {"miner": "alice", "hardware_type": "PowerPC G4"}, {"miner": "bob", "hardware_type": "x86-64"}, @@ -40,7 +40,7 @@ def test_get_miners_returns_empty_list_for_unexpected_payload(monkeypatch): module = load_client_module() client = module.RustChainClient(base_url="https://node.example") - monkeypatch.setattr(client, "_get", lambda endpoint: {"pagination": {"total": 0}}) + monkeypatch.setattr(client, "_get_public", lambda endpoint, params=None: {"pagination": {"total": 0}}) assert client.get_miners() == [] @@ -138,3 +138,161 @@ def read(self): return body return FakeResp() + + +# --- Issue #6624: _request_public must NOT include admin key headers --- + + +def test_request_public_uses_public_headers(monkeypatch): + """_request_public must use _get_public_headers (no admin key).""" + module = load_client_module() + client = module.RustChainClient( + base_url="https://node.example", admin_key="secret-admin-key-123" + ) + + captured_headers = {} + + class FakeResp: + def __enter__(self): + return self + def __exit__(self, *args): + return False + def read(self): + return b'{"ok": true}' + + def fake_urlopen(req, **kwargs): + captured_headers.update(req.headers) + return FakeResp() + + monkeypatch.setattr("urllib.request.urlopen", fake_urlopen) + + result = client._request_public("GET", "/health") + assert result == {"ok": True} + assert "X-Admin-Key" not in captured_headers, ( + f"_request_public must not send admin key; got headers: {captured_headers}" + ) + assert captured_headers.get("Accept") == "application/json" + + +def test_request_with_admin_key_sends_header(monkeypatch): + """_request (authenticated) must include X-Admin-Key when configured.""" + module = load_client_module() + client = module.RustChainClient( + base_url="https://node.example", admin_key="secret-admin-key-123" + ) + + captured_headers = {} + + class FakeResp: + def __enter__(self): + return self + def __exit__(self, *args): + return False + def read(self): + return b'{"ok": true}' + + def fake_urlopen(req, **kwargs): + captured_headers.update(req.headers) + return FakeResp() + + monkeypatch.setattr("urllib.request.urlopen", fake_urlopen) + + result = client._request("POST", "/api/submit", data={"key": "val"}) + assert result == {"ok": True} + headers_lower = {k.lower(): v for k, v in captured_headers.items()} + assert headers_lower.get("x-admin-key") == "secret-admin-key-123" + + +def test_read_methods_use_public_no_admin_key(monkeypatch): + """Read methods (health, get_epoch, get_miners, etc.) must not send admin key.""" + module = load_client_module() + client = module.RustChainClient( + base_url="https://node.example", admin_key="secret-admin-key-123" + ) + + captured_headers = {} + + class FakeResp: + def __enter__(self): + return self + def __exit__(self, *args): + return False + def read(self): + return b'{"result": "ok"}' + + def fake_urlopen(req, **kwargs): + captured_headers.clear() + captured_headers.update(req.headers) + return FakeResp() + + monkeypatch.setattr("urllib.request.urlopen", fake_urlopen) + + read_endpoints = [ + lambda: client.health(), + lambda: client.get_epoch(), + lambda: client.get_wallet_balance("miner_123"), + lambda: client.get_wallet_history("miner_123"), + lambda: client.get_stats(), + lambda: client.get_hall_of_fame(), + lambda: client.get_miner_eligibility("miner_123"), + ] + + for call in read_endpoints: + call() + assert "X-Admin-Key" not in captured_headers, ( + f"Read method must not send admin key; got: {captured_headers}" + ) + + +def test_admin_key_not_set_no_header_sent(monkeypatch): + """When admin_key is None, no X-Admin-Key header is sent even on write requests.""" + module = load_client_module() + client = module.RustChainClient(base_url="https://node.example") + + captured_headers = {} + + class FakeResp: + def __enter__(self): + return self + def __exit__(self, *args): + return False + def read(self): + return b'{"ok": true}' + + def fake_urlopen(req, **kwargs): + captured_headers.update(req.headers) + return FakeResp() + + monkeypatch.setattr("urllib.request.urlopen", fake_urlopen) + + client._request("POST", "/api/submit", data={"key": "val"}) + assert "X-Admin-Key" not in captured_headers + + +def test_get_public_headers_never_includes_admin_key(): + """_get_public_headers() must never include admin key regardless of config.""" + module = load_client_module() + client = module.RustChainClient( + base_url="https://node.example", admin_key="super-secret" + ) + headers = client._get_public_headers() + assert "X-Admin-Key" not in headers + assert "Accept" in headers + + +def test_get_headers_includes_admin_key_when_set(): + """_get_headers() includes admin key when configured.""" + module = load_client_module() + client = module.RustChainClient( + base_url="https://node.example", admin_key="my-admin-key" + ) + headers = client._get_headers() + assert headers["X-Admin-Key"] == "my-admin-key" + + +def test_get_headers_no_admin_key_when_unset(): + """_get_headers() omits admin key when not configured.""" + module = load_client_module() + client = module.RustChainClient(base_url="https://node.example") + headers = client._get_headers() + assert "X-Admin-Key" not in headers diff --git a/tools/social_mining/anti_gaming.py b/tools/social_mining/anti_gaming.py new file mode 100644 index 000000000..4a52bd1a0 --- /dev/null +++ b/tools/social_mining/anti_gaming.py @@ -0,0 +1,162 @@ +"""RIP-310 Anti-Gaming Measures. + +Frequency caps, content quality checks, and RIP-309 epoch-based metric +rotation integration to prevent social mining exploitation. +""" + +import hashlib +import time +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Optional + +from social_mining import ( + ActionType, + BeaconID, + Platform, + SocialAction, + SocialMiningEngine, +) + + +@dataclass +class FrequencyTracker: + """Tracks action frequency per beacon per type per day.""" + _counts: dict[str, dict[str, int]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(int))) + _windows: dict[str, float] = field(default_factory=dict) # beacon_key -> window start + + def _window_key(self, beacon_hash: str, action_type: ActionType) -> str: + return f"{beacon_hash}:{action_type.value}" + + def _get_window_start(self) -> float: + t = time.time() + return t - (t % 86400) + + def increment(self, beacon_hash: str, action_type: ActionType) -> int: + key = self._window_key(beacon_hash, action_type) + now = time.time() + window_start = self._get_window_start() + stored = self._windows.get(key) + if stored is None or stored < window_start: + self._counts[key] = defaultdict(int) + self._windows[key] = now + self._counts[key][str(int(now // 86400))] += 1 + return self._counts[key][str(int(now // 86400))] + + def get_count(self, beacon_hash: str, action_type: ActionType) -> int: + key = self._window_key(beacon_hash, action_type) + day = str(int(time.time() // 86400)) + return self._counts[key].get(day, 0) + + +FREQUENCY_LIMITS: dict[ActionType, int] = { + ActionType.POST: 5, + ActionType.THREAD: 5, + ActionType.VIDEO: 3, + ActionType.COMMENT: 20, + ActionType.UPVOTE_RECEIVED: 999_999, # effectively uncapped + ActionType.TIP_RECEIVED: 999_999, +} + +MIN_COMMENT_LENGTH = 50 + +# RIP-309 metric rotation: weights shift each epoch +METRIC_ROTATION_WEIGHTS = [ + {"engagement": 0.4, "quality": 0.3, "recency": 0.3}, + {"engagement": 0.3, "quality": 0.4, "recency": 0.3}, + {"engagement": 0.3, "quality": 0.3, "recency": 0.4}, +] + + +@dataclass +class ContentFingerprint: + """Fingerprint of content to detect duplicates.""" + content_hash: str + timestamp: float = field(default_factory=time.time) + + +class AntiGamingModule: + """Enforces anti-gaming rules for social mining.""" + + def __init__(self, engine: SocialMiningEngine) -> None: + self.engine = engine + self.frequency_tracker = FrequencyTracker() + self.content_fingerprints: dict[str, list[ContentFingerprint]] = defaultdict(list) + self.flagged_beacons: set[str] = set() + + def check_frequency_cap( + self, beacon_hash: str, action_type: ActionType + ) -> tuple[bool, int]: + """Returns (allowed, current_count).""" + limit = FREQUENCY_LIMITS.get(action_type, 0) + current = self.frequency_tracker.get_count(beacon_hash, action_type) + return current < limit, current + + def check_content_quality(self, action: SocialAction) -> bool: + """Substantive comments must be >50 characters.""" + if action.action_type == ActionType.COMMENT: + return action.comment_length > MIN_COMMENT_LENGTH + return True + + def check_duplicate_content(self, action: SocialAction) -> bool: + """Detect duplicate content from the same beacon within 1 hour.""" + fps = self.content_fingerprints[action.beacon.beacon_hash] + now = time.time() + for fp in fps: + if ( + fp.content_hash == action.content_hash + and now - fp.timestamp < 3600 + ): + return False + fps.append(ContentFingerprint(content_hash=action.content_hash)) + return True + + def validate_action(self, action: SocialAction) -> tuple[bool, str]: + """Full anti-gaming validation. Returns (allowed, reason).""" + if not action.beacon.verify(): + return False, "Invalid or inactive Beacon ID" + + if action.beacon.beacon_hash in self.flagged_beacons: + return False, "Beacon flagged for gaming" + + allowed, count = self.check_frequency_cap( + action.beacon.beacon_hash, action.action_type + ) + if not allowed: + return False, f"Frequency cap reached for {action.action_type.value}" + + if not self.check_content_quality(action): + return False, "Comment does not meet minimum quality threshold (>50 chars)" + + if not self.check_duplicate_content(action): + return False, "Duplicate content detected within cooldown window" + + return True, "passed" + + def record_action(self, action: SocialAction) -> None: + """Record the action for frequency tracking after validation passes.""" + self.frequency_tracker.increment( + action.beacon.beacon_hash, action.action_type + ) + + def flag_beacon(self, beacon_hash: str) -> None: + self.flagged_beacons.add(beacon_hash) + + def unflag_beacon(self, beacon_hash: str) -> None: + self.flagged_beacons.discard(beacon_hash) + + def get_metric_weights(self, epoch_id: int) -> dict[str, float]: + """RIP-309 epoch-based metric rotation.""" + idx = epoch_id % len(METRIC_ROTATION_WEIGHTS) + return METRIC_ROTATION_WEIGHTS[idx] + + def calculate_rotation_score( + self, epoch_id: int, engagement: float, quality: float, recency: float + ) -> float: + """Apply metric rotation weights to composite score.""" + weights = self.get_metric_weights(epoch_id) + return ( + engagement * weights["engagement"] + + quality * weights["quality"] + + recency * weights["recency"] + ) diff --git a/tools/social_mining/platform_rewards.py b/tools/social_mining/platform_rewards.py new file mode 100644 index 000000000..c97f2f6cb --- /dev/null +++ b/tools/social_mining/platform_rewards.py @@ -0,0 +1,122 @@ +"""RIP-310 Platform-Specific Reward Calculations. + +Exact reward rates and daily caps per the RIP-310 specification: + - Post on Moltbook: 0.01 RTC, 5 posts/day cap + - Post on 4claw: 0.01 RTC, 5 threads/day cap + - Upload video on BoTTube: 0.05 RTC, 3 videos/day cap + - Comment (substantive, >50 chars): 0.002 RTC, 20/day cap + - Receive upvote: 0.001 RTC, uncapped + - Receive tip: full amount minus 8% fee, uncapped +""" + +from dataclasses import dataclass +from typing import Optional + +from social_mining import ( + ActionType, + BeaconID, + Platform, + SocialAction, + SocialMiningEngine, +) + + +@dataclass +class RewardRule: + action_type: ActionType + rtc_reward: float + daily_cap: Optional[int] # None = uncapped + + +REWARD_TABLE: dict[tuple[Platform, ActionType], RewardRule] = { + (Platform.MOLTBOOK, ActionType.POST): RewardRule( + action_type=ActionType.POST, rtc_reward=0.01, daily_cap=5 + ), + (Platform.FOURCLAW, ActionType.THREAD): RewardRule( + action_type=ActionType.THREAD, rtc_reward=0.01, daily_cap=5 + ), + (Platform.BOTUBE, ActionType.VIDEO): RewardRule( + action_type=ActionType.VIDEO, rtc_reward=0.05, daily_cap=3 + ), + (Platform.MOLTBOOK, ActionType.COMMENT): RewardRule( + action_type=ActionType.COMMENT, rtc_reward=0.002, daily_cap=20 + ), + (Platform.FOURCLAW, ActionType.COMMENT): RewardRule( + action_type=ActionType.COMMENT, rtc_reward=0.002, daily_cap=20 + ), + (Platform.BOTUBE, ActionType.COMMENT): RewardRule( + action_type=ActionType.COMMENT, rtc_reward=0.002, daily_cap=20 + ), + (Platform.MOLTBOOK, ActionType.UPVOTE_RECEIVED): RewardRule( + action_type=ActionType.UPVOTE_RECEIVED, rtc_reward=0.001, daily_cap=None + ), + (Platform.FOURCLAW, ActionType.UPVOTE_RECEIVED): RewardRule( + action_type=ActionType.UPVOTE_RECEIVED, rtc_reward=0.001, daily_cap=None + ), + (Platform.BOTUBE, ActionType.UPVOTE_RECEIVED): RewardRule( + action_type=ActionType.UPVOTE_RECEIVED, rtc_reward=0.001, daily_cap=None + ), +} + +TIP_FEE_RATE = 0.08 # 8% platform fee + + +class PlatformRewardCalculator: + """Calculates rewards per RIP-310 specification.""" + + def __init__(self, engine: SocialMiningEngine) -> None: + self.engine = engine + + def get_reward_rule( + self, platform: Platform, action_type: ActionType + ) -> Optional[RewardRule]: + return REWARD_TABLE.get((platform, action_type)) + + def calculate_reward(self, action: SocialAction) -> float: + rule = self.get_reward_rule(action.platform, action.action_type) + if rule is None: + return 0.0 + + # Enforce content quality for comments + if action.action_type == ActionType.COMMENT and action.comment_length <= 50: + return 0.0 + + # Enforce daily cap + if rule.daily_cap is not None: + today_count = self.engine.get_actions_today( + action.beacon.beacon_hash, action.action_type + ) + if today_count >= rule.daily_cap: + return 0.0 + + return rule.rtc_reward + + def calculate_tip_reward(self, tip_amount: float) -> float: + """Tip reward: full amount minus 8% platform fee to social_mining_pool.""" + if tip_amount <= 0: + return 0.0 + fee = tip_amount * TIP_FEE_RATE + return tip_amount - fee + + def process_tip(self, action: SocialAction, tip_amount: float) -> float: + """Process a tip action: collect fee into treasury, return net to user.""" + if action.action_type != ActionType.TIP_RECEIVED: + return 0.0 + if tip_amount <= 0: + return 0.0 + net = self.calculate_tip_reward(tip_amount) + self.engine.treasury.collect_tip_fee(tip_amount) + return net + + def process_action(self, action: SocialAction) -> float: + """Process any social action and return the RTC reward.""" + if not action.beacon.verify(): + return 0.0 + + if action.action_type == ActionType.TIP_RECEIVED: + return self.process_tip(action, action.tip_amount or 0.0) + + reward = self.calculate_reward(action) + if reward > 0: + self.engine.record_reward(action, reward) + return reward diff --git a/tools/social_mining/social_mining.py b/tools/social_mining/social_mining.py new file mode 100644 index 000000000..2756c6ba5 --- /dev/null +++ b/tools/social_mining/social_mining.py @@ -0,0 +1,207 @@ +"""RIP-310 Social Mining Protocol - Core Logic. + +Beacon ID verification, reward tracking, and epoch settlement for +social mining on 4claw, Moltbook, and BoTTube. +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional +import hashlib +import time + + +class Platform(Enum): + MOLTBOOK = "moltbook" + FOURCLAW = "4claw" + BOTUBE = "botube" + + +class ActionType(Enum): + POST = "post" + THREAD = "thread" + VIDEO = "video" + COMMENT = "comment" + UPVOTE_RECEIVED = "upvote_received" + TIP_RECEIVED = "tip_received" + + +class EpochState(Enum): + ACTIVE = "active" + SETTLED = "settled" + FINALIZED = "finalized" + + +@dataclass +class BeaconID: + """Hardware-attested identity for a social mining participant.""" + beacon_hash: str + public_key: str + registered_at: float = field(default_factory=time.time) + is_active: bool = True + + def verify(self) -> bool: + if not self.is_active: + return False + if not self.beacon_hash or len(self.beacon_hash) < 64: + return False + return True + + @staticmethod + def generate_from_key(public_key: str) -> "BeaconID": + h = hashlib.sha256(public_key.encode()).hexdigest() + return BeaconID(beacon_hash=h, public_key=public_key) + + +@dataclass +class SocialAction: + """A single social mining action submitted for reward.""" + action_id: str + platform: Platform + action_type: ActionType + beacon: BeaconID + content_hash: str + timestamp: float = field(default_factory=time.time) + tip_amount: Optional[float] = None # only for TIP_RECEIVED + comment_length: int = 0 # only for COMMENT + + +@dataclass +class RewardRecord: + """Record of a reward credited to a participant.""" + action_id: str + beacon_hash: str + platform: Platform + action_type: ActionType + rtc_amount: float + epoch_id: int + timestamp: float = field(default_factory=time.time) + + +@dataclass +class Epoch: + """An epoch period for settlement.""" + epoch_id: int + start_time: float + end_time: float + state: EpochState = EpochState.ACTIVE + + def is_active(self) -> bool: + return self.state == EpochState.ACTIVE + + def is_within(self, timestamp: float) -> bool: + return self.start_time <= timestamp < self.end_time + + def settle(self) -> None: + if self.state == EpochState.ACTIVE: + self.state = EpochState.SETTLED + + def finalize(self) -> None: + if self.state == EpochState.SETTLED: + self.state = EpochState.FINALIZED + + +@dataclass +class TreasuryPool: + """Treasury pool for social mining rewards.""" + total_inflow: float = 0.0 + total_outflow: float = 0.0 + balance: float = 0.0 + tip_fee_rate: float = 0.08 # 8% platform fee on tips + + def deposit(self, amount: float) -> None: + self.total_inflow += amount + self.balance += amount + + def withdraw_reward(self, amount: float) -> bool: + if amount > self.balance: + return False + self.total_outflow += amount + self.balance -= amount + return True + + def collect_tip_fee(self, tip_amount: float) -> float: + fee = tip_amount * self.tip_fee_rate + self.deposit(fee) + return fee + + +class SocialMiningEngine: + """Core engine coordinating Beacon verification, rewards, and epochs.""" + + def __init__(self) -> None: + self.beacons: dict[str, BeaconID] = {} + self.epochs: list[Epoch] = [] + self.current_epoch: Optional[Epoch] = None + self.reward_records: list[RewardRecord] = [] + self.treasury = TreasuryPool() + self._action_history: dict[str, list[str]] = {} # beacon_hash -> [action_ids] + + def register_beacon(self, public_key: str) -> BeaconID: + beacon = BeaconID.generate_from_key(public_key) + self.beacons[beacon.beacon_hash] = beacon + return beacon + + def get_beacon(self, beacon_hash: str) -> Optional[BeaconID]: + return self.beacons.get(beacon_hash) + + def start_epoch(self, duration_seconds: float) -> Epoch: + if self.current_epoch and self.current_epoch.is_active(): + self.current_epoch.settle() + now = time.time() + epoch = Epoch( + epoch_id=len(self.epochs), + start_time=now, + end_time=now + duration_seconds, + ) + self.epochs.append(epoch) + self.current_epoch = epoch + return epoch + + def settle_current_epoch(self) -> Optional[Epoch]: + if self.current_epoch and self.current_epoch.is_active(): + self.current_epoch.settle() + settled = self.current_epoch + self.current_epoch = None + return settled + return None + + def record_reward(self, action: SocialAction, rtc_amount: float) -> RewardRecord: + if self.current_epoch is None: + raise RuntimeError("No active epoch to record rewards in") + record = RewardRecord( + action_id=action.action_id, + beacon_hash=action.beacon.beacon_hash, + platform=action.platform, + action_type=action.action_type, + rtc_amount=rtc_amount, + epoch_id=self.current_epoch.epoch_id, + ) + self.reward_records.append(record) + self.treasury.withdraw_reward(rtc_amount) + self._action_history.setdefault(action.beacon.beacon_hash, []).append(action.action_id) + return record + + def get_rewards_for_epoch(self, epoch_id: int) -> list[RewardRecord]: + return [r for r in self.reward_records if r.epoch_id == epoch_id] + + def get_rewards_for_beacon(self, beacon_hash: str) -> list[RewardRecord]: + return [r for r in self.reward_records if r.beacon_hash == beacon_hash] + + def total_rewards_for_epoch(self, epoch_id: int) -> float: + return sum(r.rtc_amount for r in self.get_rewards_for_epoch(epoch_id)) + + def get_actions_today(self, beacon_hash: str, action_type: ActionType) -> int: + """Count actions of a given type for a beacon in the current day.""" + today_start = self._today_start() + count = 0 + for record in self.reward_records: + if record.beacon_hash == beacon_hash and record.action_type == action_type: + if record.timestamp >= today_start: + count += 1 + return count + + @staticmethod + def _today_start() -> float: + t = time.time() + return t - (t % 86400) diff --git a/tools/social_mining/test_social_mining.py b/tools/social_mining/test_social_mining.py new file mode 100644 index 000000000..652b5d81d --- /dev/null +++ b/tools/social_mining/test_social_mining.py @@ -0,0 +1,492 @@ +"""Comprehensive tests for RIP-310 Social Mining Protocol.""" + +import time +import unittest +from unittest.mock import patch + +from social_mining import ( + ActionType, + BeaconID, + Epoch, + EpochState, + Platform, + RewardRecord, + SocialAction, + SocialMiningEngine, + TreasuryPool, +) +from platform_rewards import ( + REWARD_TABLE, + TIP_FEE_RATE, + PlatformRewardCalculator, + RewardRule, +) +from anti_gaming import ( + FREQUENCY_LIMITS, + METRIC_ROTATION_WEIGHTS, + MIN_COMMENT_LENGTH, + AntiGamingModule, + FrequencyTracker, +) + + +class TestBeaconID(unittest.TestCase): + def test_valid_beacon(self): + beacon = BeaconID(beacon_hash="a" * 64, public_key="test_key") + self.assertTrue(beacon.verify()) + + def test_inactive_beacon(self): + beacon = BeaconID(beacon_hash="a" * 64, public_key="test_key", is_active=False) + self.assertFalse(beacon.verify()) + + def test_short_hash_rejected(self): + beacon = BeaconID(beacon_hash="short", public_key="test_key") + self.assertFalse(beacon.verify()) + + def test_empty_hash_rejected(self): + beacon = BeaconID(beacon_hash="", public_key="test_key") + self.assertFalse(beacon.verify()) + + def test_generate_from_key(self): + beacon = BeaconID.generate_from_key("my_public_key") + self.assertEqual(len(beacon.beacon_hash), 64) + self.assertTrue(beacon.verify()) + + def test_deterministic_generation(self): + b1 = BeaconID.generate_from_key("key") + b2 = BeaconID.generate_from_key("key") + self.assertEqual(b1.beacon_hash, b2.beacon_hash) + + +class TestTreasuryPool(unittest.TestCase): + def test_deposit(self): + pool = TreasuryPool() + pool.deposit(100.0) + self.assertEqual(pool.balance, 100.0) + self.assertEqual(pool.total_inflow, 100.0) + + def test_withdraw(self): + pool = TreasuryPool() + pool.deposit(100.0) + self.assertTrue(pool.withdraw_reward(30.0)) + self.assertEqual(pool.balance, 70.0) + self.assertEqual(pool.total_outflow, 30.0) + + def test_insufficient_withdraw(self): + pool = TreasuryPool() + pool.deposit(10.0) + self.assertFalse(pool.withdraw_reward(20.0)) + self.assertEqual(pool.balance, 10.0) + + def test_tip_fee_collection(self): + pool = TreasuryPool() + fee = pool.collect_tip_fee(100.0) + self.assertAlmostEqual(fee, 8.0) + self.assertAlmostEqual(pool.balance, 8.0) + + def test_tip_fee_rate(self): + self.assertAlmostEqual(TIP_FEE_RATE, 0.08) + + +class TestEpoch(unittest.TestCase): + def test_active_epoch(self): + now = time.time() + epoch = Epoch(epoch_id=0, start_time=now, end_time=now + 86400) + self.assertTrue(epoch.is_active()) + self.assertTrue(epoch.is_within(now)) + + def test_settle(self): + now = time.time() + epoch = Epoch(epoch_id=0, start_time=now, end_time=now + 86400) + epoch.settle() + self.assertEqual(epoch.state, EpochState.SETTLED) + self.assertFalse(epoch.is_active()) + + def test_finalize(self): + now = time.time() + epoch = Epoch(epoch_id=0, start_time=now, end_time=now + 86400) + epoch.settle() + epoch.finalize() + self.assertEqual(epoch.state, EpochState.FINALIZED) + + def test_cannot_settle_finalized(self): + now = time.time() + epoch = Epoch(epoch_id=0, start_time=now, end_time=now + 86400) + epoch.state = EpochState.FINALIZED + epoch.settle() + self.assertEqual(epoch.state, EpochState.FINALIZED) + + +class TestSocialMiningEngine(unittest.TestCase): + def setUp(self): + self.engine = SocialMiningEngine() + + def test_register_beacon(self): + beacon = self.engine.register_beacon("pub_key_123") + self.assertIn(beacon.beacon_hash, self.engine.beacons) + self.assertTrue(beacon.verify()) + + def test_get_beacon(self): + beacon = self.engine.register_beacon("pub_key_456") + retrieved = self.engine.get_beacon(beacon.beacon_hash) + self.assertEqual(retrieved.beacon_hash, beacon.beacon_hash) + + def test_start_epoch(self): + epoch = self.engine.start_epoch(86400) + self.assertTrue(epoch.is_active()) + self.assertEqual(self.engine.current_epoch.epoch_id, 0) + + def test_settle_current_epoch(self): + self.engine.start_epoch(86400) + settled = self.engine.settle_current_epoch() + self.assertIsNotNone(settled) + self.assertEqual(settled.state, EpochState.SETTLED) + self.assertIsNone(self.engine.current_epoch) + + def test_record_reward(self): + self.engine.start_epoch(86400) + beacon = BeaconID(beacon_hash="a" * 64, public_key="key") + action = SocialAction( + action_id="act_1", + platform=Platform.MOLTBOOK, + action_type=ActionType.POST, + beacon=beacon, + content_hash="hash1", + ) + record = self.engine.record_reward(action, 0.01) + self.assertEqual(record.rtc_amount, 0.01) + self.assertEqual(record.epoch_id, 0) + + def test_reward_without_epoch_raises(self): + beacon = BeaconID(beacon_hash="a" * 64, public_key="key") + action = SocialAction( + action_id="act_1", + platform=Platform.MOLTBOOK, + action_type=ActionType.POST, + beacon=beacon, + content_hash="hash1", + ) + with self.assertRaises(RuntimeError): + self.engine.record_reward(action, 0.01) + + def test_rewards_for_epoch(self): + self.engine.start_epoch(86400) + beacon = BeaconID(beacon_hash="a" * 64, public_key="key") + for i in range(3): + action = SocialAction( + action_id=f"act_{i}", + platform=Platform.MOLTBOOK, + action_type=ActionType.POST, + beacon=beacon, + content_hash=f"hash{i}", + ) + self.engine.record_reward(action, 0.01) + rewards = self.engine.get_rewards_for_epoch(0) + self.assertEqual(len(rewards), 3) + self.assertAlmostEqual(self.engine.total_rewards_for_epoch(0), 0.03) + + def test_rewards_for_beacon(self): + self.engine.start_epoch(86400) + b1 = BeaconID(beacon_hash="a" * 64, public_key="k1") + b2 = BeaconID(beacon_hash="b" * 64, public_key="k2") + a1 = SocialAction( + action_id="a1", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=b1, content_hash="h1", + ) + a2 = SocialAction( + action_id="a2", platform=Platform.FOURCLAW, + action_type=ActionType.THREAD, beacon=b2, content_hash="h2", + ) + self.engine.record_reward(a1, 0.01) + self.engine.record_reward(a2, 0.01) + self.assertEqual(len(self.engine.get_rewards_for_beacon(b1.beacon_hash)), 1) + self.assertEqual(len(self.engine.get_rewards_for_beacon(b2.beacon_hash)), 1) + + +class TestRewardTable(unittest.TestCase): + def test_moltbook_post_rate(self): + rule = REWARD_TABLE[(Platform.MOLTBOOK, ActionType.POST)] + self.assertAlmostEqual(rule.rtc_reward, 0.01) + self.assertEqual(rule.daily_cap, 5) + + def test_fourclaw_thread_rate(self): + rule = REWARD_TABLE[(Platform.FOURCLAW, ActionType.THREAD)] + self.assertAlmostEqual(rule.rtc_reward, 0.01) + self.assertEqual(rule.daily_cap, 5) + + def test_botube_video_rate(self): + rule = REWARD_TABLE[(Platform.BOTUBE, ActionType.VIDEO)] + self.assertAlmostEqual(rule.rtc_reward, 0.05) + self.assertEqual(rule.daily_cap, 3) + + def test_comment_rate(self): + for platform in Platform: + rule = REWARD_TABLE[(platform, ActionType.COMMENT)] + self.assertAlmostEqual(rule.rtc_reward, 0.002) + self.assertEqual(rule.daily_cap, 20) + + def test_upvote_rate(self): + for platform in Platform: + rule = REWARD_TABLE[(platform, ActionType.UPVOTE_RECEIVED)] + self.assertAlmostEqual(rule.rtc_reward, 0.001) + self.assertIsNone(rule.daily_cap) + + +class TestPlatformRewardCalculator(unittest.TestCase): + def setUp(self): + self.engine = SocialMiningEngine() + self.calc = PlatformRewardCalculator(self.engine) + self.engine.start_epoch(86400) + self.beacon = BeaconID(beacon_hash="a" * 64, public_key="key") + + def test_moltbook_post_reward(self): + action = SocialAction( + action_id="p1", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, content_hash="h1", + ) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 0.01) + + def test_comment_too_short(self): + action = SocialAction( + action_id="c1", platform=Platform.MOLTBOOK, + action_type=ActionType.COMMENT, beacon=self.beacon, + content_hash="h2", comment_length=30, + ) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 0.0) + + def test_comment_valid(self): + action = SocialAction( + action_id="c2", platform=Platform.MOLTBOOK, + action_type=ActionType.COMMENT, beacon=self.beacon, + content_hash="h3", comment_length=51, + ) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 0.002) + + def test_tip_reward(self): + action = SocialAction( + action_id="t1", platform=Platform.MOLTBOOK, + action_type=ActionType.TIP_RECEIVED, beacon=self.beacon, + content_hash="h4", tip_amount=10.0, + ) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 9.2) # 10 * (1 - 0.08) + self.assertAlmostEqual(self.engine.treasury.balance, 0.8) + + def test_tip_zero_amount(self): + action = SocialAction( + action_id="t2", platform=Platform.MOLTBOOK, + action_type=ActionType.TIP_RECEIVED, beacon=self.beacon, + content_hash="h5", tip_amount=0.0, + ) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 0.0) + + def test_invalid_beacon_rejected(self): + inactive = BeaconID(beacon_hash="b" * 64, public_key="k", is_active=False) + action = SocialAction( + action_id="x1", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=inactive, content_hash="h6", + ) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 0.0) + + def test_daily_cap_enforced(self): + for i in range(5): + action = SocialAction( + action_id=f"cap_{i}", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, + content_hash=f"cap_h{i}", + ) + self.assertAlmostEqual(self.calc.process_action(action), 0.01) + overflow = SocialAction( + action_id="cap_overflow", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, + content_hash="cap_overflow_h", + ) + self.assertAlmostEqual(self.calc.process_action(overflow), 0.0) + + +class TestAntiGaming(unittest.TestCase): + def setUp(self): + self.engine = SocialMiningEngine() + self.anti = AntiGamingModule(self.engine) + self.beacon = BeaconID(beacon_hash="a" * 64, public_key="key") + + def test_frequency_cap_check(self): + allowed, count = self.anti.check_frequency_cap( + self.beacon.beacon_hash, ActionType.POST + ) + self.assertTrue(allowed) + self.assertEqual(count, 0) + + def test_frequency_limit_exists(self): + self.assertEqual(FREQUENCY_LIMITS[ActionType.POST], 5) + self.assertEqual(FREQUENCY_LIMITS[ActionType.THREAD], 5) + self.assertEqual(FREQUENCY_LIMITS[ActionType.VIDEO], 3) + self.assertEqual(FREQUENCY_LIMITS[ActionType.COMMENT], 20) + + def test_content_quality_pass(self): + action = SocialAction( + action_id="q1", platform=Platform.MOLTBOOK, + action_type=ActionType.COMMENT, beacon=self.beacon, + content_hash="q1h", comment_length=51, + ) + self.assertTrue(self.anti.check_content_quality(action)) + + def test_content_quality_fail(self): + action = SocialAction( + action_id="q2", platform=Platform.MOLTBOOK, + action_type=ActionType.COMMENT, beacon=self.beacon, + content_hash="q2h", comment_length=50, + ) + self.assertFalse(self.anti.check_content_quality(action)) + + def test_non_comment_always_passes_quality(self): + action = SocialAction( + action_id="q3", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, + content_hash="q3h", comment_length=5, + ) + self.assertTrue(self.anti.check_content_quality(action)) + + def test_duplicate_content_detected(self): + action = SocialAction( + action_id="d1", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, + content_hash="dup_hash", + ) + self.assertTrue(self.anti.check_duplicate_content(action)) + self.assertFalse(self.anti.check_duplicate_content(action)) + + def test_validate_action_passes(self): + action = SocialAction( + action_id="v1", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, + content_hash="v1h", + ) + allowed, reason = self.anti.validate_action(action) + self.assertTrue(allowed) + self.assertEqual(reason, "passed") + + def test_validate_action_invalid_beacon(self): + inactive = BeaconID(beacon_hash="b" * 64, public_key="k", is_active=False) + action = SocialAction( + action_id="v2", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=inactive, content_hash="v2h", + ) + allowed, reason = self.anti.validate_action(action) + self.assertFalse(allowed) + self.assertIn("Beacon", reason) + + def test_validate_action_flagged_beacon(self): + self.anti.flag_beacon(self.beacon.beacon_hash) + action = SocialAction( + action_id="v3", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, content_hash="v3h", + ) + allowed, _ = self.anti.validate_action(action) + self.assertFalse(allowed) + + def test_flag_unflag_beacon(self): + self.anti.flag_beacon(self.beacon.beacon_hash) + self.assertIn(self.beacon.beacon_hash, self.anti.flagged_beacons) + self.anti.unflag_beacon(self.beacon.beacon_hash) + self.assertNotIn(self.beacon.beacon_hash, self.anti.flagged_beacons) + + def test_metric_rotation_weights(self): + w0 = self.anti.get_metric_weights(0) + w1 = self.anti.get_metric_weights(1) + w2 = self.anti.get_metric_weights(2) + w3 = self.anti.get_metric_weights(3) + self.assertNotEqual(w0["engagement"], w1["engagement"]) + self.assertEqual(w0, w3) + + def test_rotation_score_calculation(self): + score = self.anti.calculate_rotation_score(0, 1.0, 1.0, 1.0) + self.assertAlmostEqual(score, 1.0) + score2 = self.anti.calculate_rotation_score(0, 1.0, 0.0, 0.0) + self.assertAlmostEqual(score2, 0.4) + + def test_frequency_tracker(self): + tracker = FrequencyTracker() + for _ in range(3): + tracker.increment("beacon1", ActionType.POST) + self.assertEqual(tracker.get_count("beacon1", ActionType.POST), 3) + + +class TestIntegration(unittest.TestCase): + """End-to-end integration tests.""" + + def setUp(self): + self.engine = SocialMiningEngine() + self.calc = PlatformRewardCalculator(self.engine) + self.anti = AntiGamingModule(self.engine) + self.beacon = self.engine.register_beacon("integration_pub_key") + self.engine.start_epoch(86400) + + def test_full_flow_post(self): + self.engine.treasury.deposit(100.0) + action = SocialAction( + action_id="int_1", platform=Platform.MOLTBOOK, + action_type=ActionType.POST, beacon=self.beacon, + content_hash="int_h1", + ) + allowed, _ = self.anti.validate_action(action) + self.assertTrue(allowed) + self.anti.record_action(action) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 0.01) + self.assertAlmostEqual(self.engine.treasury.balance, 99.99) + + def test_full_flow_tip(self): + action = SocialAction( + action_id="int_t1", platform=Platform.MOLTBOOK, + action_type=ActionType.TIP_RECEIVED, beacon=self.beacon, + content_hash="int_th1", tip_amount=50.0, + ) + allowed, _ = self.anti.validate_action(action) + self.assertTrue(allowed) + reward = self.calc.process_action(action) + self.assertAlmostEqual(reward, 46.0) + self.assertAlmostEqual(self.engine.treasury.balance, 4.0) + + def test_frequency_cap_integration(self): + video_cap = FREQUENCY_LIMITS[ActionType.VIDEO] # 3 + for i in range(video_cap): + action = SocialAction( + action_id=f"int_cap_{i}", platform=Platform.BOTUBE, + action_type=ActionType.VIDEO, beacon=self.beacon, + content_hash=f"int_cap_h{i}", + ) + self.anti.validate_action(action) + self.anti.record_action(action) + self.assertAlmostEqual(self.calc.process_action(action), 0.05) + overflow = SocialAction( + action_id="int_cap_overflow", platform=Platform.BOTUBE, + action_type=ActionType.VIDEO, beacon=self.beacon, + content_hash="int_cap_overflow_h", + ) + allowed, reason = self.anti.validate_action(overflow) + self.assertFalse(allowed) + self.assertIn("cap", reason.lower()) + + def test_epoch_settlement(self): + for i in range(3): + action = SocialAction( + action_id=f"ep_{i}", platform=Platform.FOURCLAW, + action_type=ActionType.THREAD, beacon=self.beacon, + content_hash=f"ep_h{i}", + ) + self.anti.validate_action(action) + self.calc.process_action(action) + settled = self.engine.settle_current_epoch() + self.assertIsNotNone(settled) + self.assertEqual(len(self.engine.get_rewards_for_epoch(0)), 3) + self.assertAlmostEqual(self.engine.total_rewards_for_epoch(0), 0.03) + + +if __name__ == "__main__": + unittest.main() diff --git a/vintage_ai_video_pipeline/rustchain_client.py b/vintage_ai_video_pipeline/rustchain_client.py index 86e7668b7..411495349 100644 --- a/vintage_ai_video_pipeline/rustchain_client.py +++ b/vintage_ai_video_pipeline/rustchain_client.py @@ -28,6 +28,7 @@ class RustChainClient: def __init__( self, base_url: str = DEFAULT_BASE_URL, + admin_key: Optional[str] = None, verify_ssl: bool = False, timeout: int = 30, retry_count: int = 3, @@ -38,12 +39,14 @@ def __init__( Args: base_url: Base URL of the RustChain API + admin_key: Admin authentication key (used only for write operations) verify_ssl: Enable SSL verification (default: False for self-signed certs) timeout: Request timeout in seconds retry_count: Number of retries on failure retry_delay: Delay between retries (seconds) """ self.base_url = base_url.rstrip("/") + self.admin_key = admin_key self.verify_ssl = verify_ssl self.timeout = timeout self.retry_count = retry_count @@ -60,7 +63,17 @@ def __init__( self._known_miners = {} def _get_headers(self) -> Dict[str, str]: - """Get request headers""" + """Get request headers (includes admin key for write operations)""" + headers = { + "Accept": "application/json", + "User-Agent": "vintage-ai-video-pipeline/1.0.0", + } + if self.admin_key: + headers["X-Admin-Key"] = self.admin_key + return headers + + def _get_public_headers(self) -> Dict[str, str]: + """Get request headers WITHOUT admin key (for read-only operations)""" return { "Accept": "application/json", "User-Agent": "vintage-ai-video-pipeline/1.0.0", @@ -124,6 +137,62 @@ def _request( raise Exception("Max retries exceeded") + def _request_public( + self, + method: str, + endpoint: str, + ) -> Dict[str, Any]: + """Make an HTTP request WITHOUT the admin key header (read-only operations). + + This ensures that read operations (GET requests for balances, miner lists, + etc.) never send admin credentials, following the principle of least privilege. + """ + url = f"{self.base_url}{endpoint}" + headers = self._get_public_headers() + + for attempt in range(self.retry_count): + try: + req = Request(url, headers=headers, method=method) + + with urllib.request.urlopen( + req, + context=self._ctx, + timeout=self.timeout + ) as response: + raw = response.read() + response_data = raw.decode("utf-8").strip() if raw else "" + if not response_data: + return {} + return json.loads(response_data) + + except HTTPError as e: + error_body = e.read().decode("utf-8") if e.fp else "" + if attempt == self.retry_count - 1: + raise Exception( + f"HTTP Error {e.code}: {e.reason} - {error_body}" + ) + except URLError as e: + if attempt == self.retry_count - 1: + raise Exception(f"Connection Error: {e.reason}") + except json.JSONDecodeError as e: + if attempt == self.retry_count - 1: + raise Exception(f"Invalid JSON response: {str(e)}") + except Exception: + if attempt == self.retry_count - 1: + raise + + if attempt < self.retry_count - 1: + time.sleep(self.retry_delay * (attempt + 1)) + + raise Exception("Max retries exceeded") + + def _get_public(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]: + """GET request without admin key (read-only operations)""" + if params: + query = urllib.parse.urlencode(params) + endpoint = f"{endpoint}?{query}" + return self._request_public("GET", endpoint) + def _get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]: """GET request with query parameters""" if params: @@ -132,21 +201,21 @@ def _get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]: return self._request("GET", endpoint) def health(self) -> Dict[str, Any]: - """Check node health""" - return self._get("/health") + """Check node health (public, no admin key)""" + return self._get_public("/health") def get_epoch(self) -> Dict[str, Any]: - """Get current epoch information""" - return self._get("/epoch") + """Get current epoch information (public, no admin key)""" + return self._get_public("/epoch") def get_miners(self) -> List[Dict[str, Any]]: """ - List all active miners - + List all active miners (public, no admin key) + Returns: List of miner information dictionaries """ - data = self._get("/api/miners") + data = self._get_public("/api/miners") if isinstance(data, list): return data if isinstance(data, dict): @@ -157,24 +226,24 @@ def get_miners(self) -> List[Dict[str, Any]]: return [] def get_miner_eligibility(self, miner_id: str) -> Dict[str, Any]: - """Check miner's epoch eligibility""" - return self._get("/lottery/eligibility", params={"miner_id": miner_id}) + """Check miner's epoch eligibility (public, no admin key)""" + return self._get_public("/lottery/eligibility", params={"miner_id": miner_id}) def get_wallet_balance(self, miner_id: str) -> Dict[str, Any]: - """Get wallet balance for a miner""" - return self._get("/wallet/balance", params={"miner_id": miner_id}) + """Get wallet balance for a miner (public, no admin key)""" + return self._get_public("/wallet/balance", params={"miner_id": miner_id}) def get_wallet_history(self, miner_id: str, limit: int = 10) -> Dict[str, Any]: - """Get transaction history for a miner""" - return self._get("/wallet/history", params={"miner_id": miner_id, "limit": limit}) + """Get transaction history for a miner (public, no admin key)""" + return self._get_public("/wallet/history", params={"miner_id": miner_id, "limit": limit}) def get_stats(self) -> Dict[str, Any]: - """Get network statistics""" - return self._get("/api/stats") + """Get network statistics (public, no admin key)""" + return self._get_public("/api/stats") def get_hall_of_fame(self) -> Dict[str, Any]: - """Get Hall of Fame leaderboard""" - return self._get("/api/hall_of_fame") + """Get Hall of Fame leaderboard (public, no admin key)""" + return self._get_public("/api/hall_of_fame") def monitor_attestations( self,