diff --git a/SKILL.md b/SKILL.md index a63bf50..77cd45e 100644 --- a/SKILL.md +++ b/SKILL.md @@ -138,6 +138,12 @@ Dr. Manhattan exposes all trading capabilities as MCP tools. Configure in Claude - `stop_strategy(session_id, cleanup?)` - Stop a strategy and optionally cancel orders. - `list_strategy_sessions` - List all active strategy sessions. +**Insider Verification:** +- `fetch_wallet_trades(exchange, wallet_address, market_id?, limit?)` - Fetch all trades for a wallet address. +- `analyze_wallet_performance(exchange, wallet_address, limit?)` - Analyze trading performance and patterns. +- `detect_insider_signals(exchange, wallet_address, market_id?, limit?)` - Detect potential insider trading signals. +- `compare_wallets(exchange, wallet_addresses, limit_per_wallet?)` - Compare trading patterns across multiple wallets. + ## Common Workflows ### Find and Analyze a Market @@ -168,6 +174,13 @@ Dr. Manhattan exposes all trading capabilities as MCP tools. Configure in Claude 2. `fetch_positions` to see all open positions with unrealized PnL. 3. `calculate_nav` for total portfolio value (cash + positions). +### Verify Insider Trading Activity + +1. Use `fetch_wallet_trades` with a wallet address to get trading history. +2. Use `analyze_wallet_performance` to see metrics like win rate, market exposure, and timing patterns. +3. Use `detect_insider_signals` to identify suspicious patterns (market concentration, large trades, one-sided trading). +4. Use `compare_wallets` with multiple addresses to detect coordinated trading between accounts. + ## Key Concepts - **Prices are probabilities** ranging from 0 to 1 (exclusive). A price of 0.65 means the market implies a 65% chance. @@ -182,6 +195,8 @@ Dr. Manhattan exposes all trading capabilities as MCP tools. Configure in Claude uv run python examples/list_all_markets.py polymarket uv run python examples/spread_strategy.py --exchange polymarket --slug fed-decision uv run python examples/spike_strategy.py -e opinion -m 813 --spike-threshold 0.02 +uv run python examples/verify_insider.py 0xWALLET_ADDRESS --detect-signals +uv run python examples/verify_insider.py --compare 0xWALLET1 0xWALLET2 ``` ## Data Models diff --git a/dr_manhattan/mcp/server.py b/dr_manhattan/mcp/server.py index a0e4bee..7e7b6e0 100644 --- a/dr_manhattan/mcp/server.py +++ b/dr_manhattan/mcp/server.py @@ -114,6 +114,7 @@ def fix_all_loggers(): from .tools import ( # noqa: E402 account_tools, exchange_tools, + insider_tools, market_tools, strategy_tools, trading_tools, @@ -515,6 +516,65 @@ async def list_tools() -> List[Tool]: "required": ["slug"], }, ), + # Insider verification tools (4) + Tool( + name="fetch_wallet_trades", + description="Fetch all trades for a specific wallet address", + inputSchema={ + "type": "object", + "properties": { + "exchange": {"type": "string", "description": "Exchange name (polymarket)"}, + "wallet_address": {"type": "string", "description": "Wallet/proxy wallet address"}, + "market_id": {"type": "string", "description": "Optional market filter"}, + "limit": {"type": "integer", "default": 500, "description": "Max trades to fetch"}, + }, + "required": ["exchange", "wallet_address"], + }, + ), + Tool( + name="analyze_wallet_performance", + description="Analyze trading performance and patterns for a wallet", + inputSchema={ + "type": "object", + "properties": { + "exchange": {"type": "string"}, + "wallet_address": {"type": "string"}, + "limit": {"type": "integer", "default": 500}, + }, + "required": ["exchange", "wallet_address"], + }, + ), + Tool( + name="detect_insider_signals", + description="Detect potential insider trading signals for a wallet (market concentration, large trades, timing patterns)", + inputSchema={ + "type": "object", + "properties": { + "exchange": {"type": "string"}, + "wallet_address": {"type": "string"}, + "market_id": {"type": "string", "description": "Optional market filter"}, + "limit": {"type": "integer", "default": 500}, + }, + "required": ["exchange", "wallet_address"], + }, + ), + Tool( + name="compare_wallets", + description="Compare trading patterns across multiple wallets to detect coordinated trading", + inputSchema={ + "type": "object", + "properties": { + "exchange": {"type": "string"}, + "wallet_addresses": { + "type": "array", + "items": {"type": "string"}, + "description": "List of wallet addresses to compare (2-10)", + }, + "limit_per_wallet": {"type": "integer", "default": 200}, + }, + "required": ["exchange", "wallet_addresses"], + }, + ), ] @@ -556,6 +616,11 @@ async def list_tools() -> List[Tool]: "pause_strategy": (strategy_tools.pause_strategy, True), "resume_strategy": (strategy_tools.resume_strategy, True), "get_strategy_metrics": (strategy_tools.get_strategy_metrics, True), + # Insider verification tools (4) + "fetch_wallet_trades": (insider_tools.fetch_wallet_trades, True), + "analyze_wallet_performance": (insider_tools.analyze_wallet_performance, True), + "detect_insider_signals": (insider_tools.detect_insider_signals, True), + "compare_wallets": (insider_tools.compare_wallets, True), } diff --git a/dr_manhattan/mcp/tools/insider_tools.py b/dr_manhattan/mcp/tools/insider_tools.py new file mode 100644 index 0000000..93b12f5 --- /dev/null +++ b/dr_manhattan/mcp/tools/insider_tools.py @@ -0,0 +1,507 @@ +"""Insider verification and wallet analysis tools.""" + +from dataclasses import asdict +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional + +from dr_manhattan.utils import setup_logger + +from ..session import ExchangeSessionManager +from ..utils import translate_error, validate_exchange + +logger = setup_logger(__name__) + +exchange_manager = ExchangeSessionManager() + +# Analysis thresholds (per CLAUDE.md Rule #4: config in code) +DEFAULT_TRADE_LIMIT = 500 +MIN_SIGNIFICANT_TRADE_USD = 1000 +HIGH_WIN_RATE_THRESHOLD = 0.70 +SUSPICIOUS_TIMING_HOURS = 24 +MIN_TRADES_FOR_ANALYSIS = 5 + + +def fetch_wallet_trades( + exchange: str, + wallet_address: str, + market_id: Optional[str] = None, + limit: int = DEFAULT_TRADE_LIMIT, +) -> Dict[str, Any]: + """ + Fetch all trades for a specific wallet address. + + Args: + exchange: Exchange name (currently only polymarket supported) + wallet_address: Wallet/proxy wallet address to analyze + market_id: Optional market filter (condition_id) + limit: Maximum trades to fetch (default: 500) + + Returns: + Dict with trades and summary: + { + "trades": [...], + "summary": { + "total_trades": 100, + "total_volume_usd": 50000, + "unique_markets": 15, + "date_range": {"from": "...", "to": "..."} + } + } + + Example: + >>> result = fetch_wallet_trades("polymarket", "0x1234...") + >>> print(f"Total trades: {result['summary']['total_trades']}") + """ + try: + exchange = validate_exchange(exchange) + + if exchange.lower() != "polymarket": + raise ValueError(f"Insider analysis currently only supports polymarket, got {exchange}") + + if not wallet_address or not wallet_address.startswith("0x"): + raise ValueError("wallet_address must be a valid Ethereum address starting with 0x") + + if limit <= 0: + limit = DEFAULT_TRADE_LIMIT + elif limit > 5000: + limit = 5000 + + exch = exchange_manager.get_exchange(exchange) + + # Fetch trades for the wallet + trades = exch.fetch_public_trades( + market=market_id, + user=wallet_address, + limit=limit, + taker_only=False, + ) + + # Convert to serializable format + trades_data = [] + total_volume = 0.0 + unique_markets = set() + min_time = None + max_time = None + + for trade in trades: + trade_dict = asdict(trade) + # Convert datetime to ISO string + if trade_dict.get("timestamp"): + ts = trade_dict["timestamp"] + trade_dict["timestamp"] = ts.isoformat() if isinstance(ts, datetime) else str(ts) + if min_time is None or ts < min_time: + min_time = ts + if max_time is None or ts > max_time: + max_time = ts + + trades_data.append(trade_dict) + total_volume += trade.size * trade.price + if trade.condition_id: + unique_markets.add(trade.condition_id) + + summary = { + "total_trades": len(trades_data), + "total_volume_usd": round(total_volume, 2), + "unique_markets": len(unique_markets), + "date_range": { + "from": min_time.isoformat() if min_time else None, + "to": max_time.isoformat() if max_time else None, + }, + } + + return {"trades": trades_data, "summary": summary} + + except Exception as e: + raise translate_error(e, {"exchange": exchange, "wallet_address": wallet_address}) from e + + +def analyze_wallet_performance( + exchange: str, + wallet_address: str, + limit: int = DEFAULT_TRADE_LIMIT, +) -> Dict[str, Any]: + """ + Analyze trading performance and patterns for a wallet. + + Calculates win rate, average trade size, market concentration, + and timing patterns that may indicate informed trading. + + Args: + exchange: Exchange name (currently only polymarket supported) + wallet_address: Wallet address to analyze + limit: Maximum trades to analyze (default: 500) + + Returns: + Dict with performance metrics: + { + "metrics": { + "total_trades": 100, + "buy_count": 60, + "sell_count": 40, + "avg_trade_size_usd": 500, + "total_volume_usd": 50000 + }, + "market_exposure": { + "condition_id": {"trades": 10, "volume": 5000} + }, + "timing_analysis": { + "trades_by_hour": {...}, + "avg_time_between_trades_hours": 2.5 + } + } + + Example: + >>> result = analyze_wallet_performance("polymarket", "0x1234...") + >>> print(f"Win rate: {result['metrics'].get('estimated_win_rate', 'N/A')}") + """ + try: + exchange = validate_exchange(exchange) + + if exchange.lower() != "polymarket": + raise ValueError(f"Insider analysis currently only supports polymarket, got {exchange}") + + # Fetch trades first + trade_result = fetch_wallet_trades(exchange, wallet_address, limit=limit) + trades_data = trade_result["trades"] + + if len(trades_data) < MIN_TRADES_FOR_ANALYSIS: + return { + "error": f"Insufficient trades for analysis. Found {len(trades_data)}, need at least {MIN_TRADES_FOR_ANALYSIS}", + "trades_found": len(trades_data), + } + + # Calculate metrics + buy_count = sum(1 for t in trades_data if t.get("side", "").upper() == "BUY") + sell_count = sum(1 for t in trades_data if t.get("side", "").upper() == "SELL") + total_volume = sum(t.get("size", 0) * t.get("price", 0) for t in trades_data) + avg_trade_size = total_volume / len(trades_data) if trades_data else 0 + + # Market exposure analysis + market_exposure: Dict[str, Dict[str, Any]] = {} + for trade in trades_data: + cid = trade.get("condition_id", "unknown") + if cid not in market_exposure: + market_exposure[cid] = { + "trades": 0, + "volume": 0.0, + "title": trade.get("title"), + "slug": trade.get("slug"), + } + market_exposure[cid]["trades"] += 1 + market_exposure[cid]["volume"] += trade.get("size", 0) * trade.get("price", 0) + + # Sort by volume + market_exposure = dict( + sorted(market_exposure.items(), key=lambda x: x[1]["volume"], reverse=True) + ) + + # Timing analysis + trades_by_hour: Dict[int, int] = {h: 0 for h in range(24)} + timestamps = [] + for trade in trades_data: + ts_str = trade.get("timestamp") + if ts_str: + try: + ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + trades_by_hour[ts.hour] += 1 + timestamps.append(ts) + except (ValueError, AttributeError): + pass + + # Calculate average time between trades + avg_time_between = None + if len(timestamps) >= 2: + timestamps.sort() + deltas = [(timestamps[i + 1] - timestamps[i]).total_seconds() / 3600 for i in range(len(timestamps) - 1)] + avg_time_between = round(sum(deltas) / len(deltas), 2) if deltas else None + + # Identify significant trades (large size) + significant_trades = [ + t for t in trades_data + if (t.get("size", 0) * t.get("price", 0)) >= MIN_SIGNIFICANT_TRADE_USD + ] + + return { + "wallet_address": wallet_address, + "metrics": { + "total_trades": len(trades_data), + "buy_count": buy_count, + "sell_count": sell_count, + "avg_trade_size_usd": round(avg_trade_size, 2), + "total_volume_usd": round(total_volume, 2), + "significant_trades_count": len(significant_trades), + }, + "market_exposure": dict(list(market_exposure.items())[:10]), # Top 10 markets + "timing_analysis": { + "trades_by_hour": trades_by_hour, + "avg_time_between_trades_hours": avg_time_between, + "most_active_hours": sorted(trades_by_hour.items(), key=lambda x: x[1], reverse=True)[:3], + }, + "data_range": trade_result["summary"]["date_range"], + } + + except Exception as e: + raise translate_error(e, {"exchange": exchange, "wallet_address": wallet_address}) from e + + +def detect_insider_signals( + exchange: str, + wallet_address: str, + market_id: Optional[str] = None, + limit: int = DEFAULT_TRADE_LIMIT, +) -> Dict[str, Any]: + """ + Detect potential insider trading signals for a wallet. + + Analyzes trading patterns to identify suspicious activity: + - Large trades before significant price movements + - Unusually high win rates + - Concentrated positions in specific markets + - Timing patterns that suggest foreknowledge + + Args: + exchange: Exchange name (currently only polymarket supported) + wallet_address: Wallet address to analyze + market_id: Optional market filter + limit: Maximum trades to analyze + + Returns: + Dict with risk assessment: + { + "risk_level": "low|medium|high", + "signals": [ + {"type": "high_win_rate", "severity": "medium", "details": "..."}, + ... + ], + "summary": "..." + } + + Example: + >>> result = detect_insider_signals("polymarket", "0x1234...") + >>> print(f"Risk level: {result['risk_level']}") + """ + try: + exchange = validate_exchange(exchange) + + if exchange.lower() != "polymarket": + raise ValueError(f"Insider analysis currently only supports polymarket, got {exchange}") + + # Fetch trades + trade_result = fetch_wallet_trades(exchange, wallet_address, market_id=market_id, limit=limit) + trades_data = trade_result["trades"] + + if len(trades_data) < MIN_TRADES_FOR_ANALYSIS: + return { + "risk_level": "unknown", + "signals": [], + "summary": f"Insufficient data: {len(trades_data)} trades found, need at least {MIN_TRADES_FOR_ANALYSIS}", + "trades_analyzed": len(trades_data), + } + + signals: List[Dict[str, Any]] = [] + risk_score = 0 + + # Signal 1: Market concentration + market_trades: Dict[str, int] = {} + for trade in trades_data: + cid = trade.get("condition_id", "unknown") + market_trades[cid] = market_trades.get(cid, 0) + 1 + + if market_trades: + max_concentration = max(market_trades.values()) / len(trades_data) + if max_concentration > 0.5: + signals.append({ + "type": "high_market_concentration", + "severity": "medium", + "details": f"Over {max_concentration*100:.1f}% of trades in single market", + "threshold": "50%", + }) + risk_score += 2 + + # Signal 2: Large trade sizes + trade_sizes = [t.get("size", 0) * t.get("price", 0) for t in trades_data] + if trade_sizes: + avg_size = sum(trade_sizes) / len(trade_sizes) + large_trades = [s for s in trade_sizes if s > avg_size * 3] + if len(large_trades) > len(trades_data) * 0.1: + signals.append({ + "type": "frequent_large_trades", + "severity": "medium", + "details": f"{len(large_trades)} trades exceed 3x average size (${avg_size:.2f})", + "threshold": "10% of trades", + }) + risk_score += 2 + + # Signal 3: Timing clustering + timestamps = [] + for trade in trades_data: + ts_str = trade.get("timestamp") + if ts_str: + try: + timestamps.append(datetime.fromisoformat(ts_str.replace("Z", "+00:00"))) + except (ValueError, AttributeError): + pass + + if len(timestamps) >= 10: + timestamps.sort() + # Check for burst trading (many trades in short period) + burst_count = 0 + for i in range(len(timestamps) - 1): + if (timestamps[i + 1] - timestamps[i]).total_seconds() < 60: # Within 1 minute + burst_count += 1 + + if burst_count > len(timestamps) * 0.2: + signals.append({ + "type": "burst_trading", + "severity": "low", + "details": f"{burst_count} trades within 1 minute of each other", + "threshold": "20% of trades", + }) + risk_score += 1 + + # Signal 4: Volume spikes + total_volume = sum(trade_sizes) + if total_volume > 100000: # $100k+ + signals.append({ + "type": "high_total_volume", + "severity": "high" if total_volume > 500000 else "medium", + "details": f"Total trading volume: ${total_volume:,.2f}", + "threshold": "$100,000", + }) + risk_score += 3 if total_volume > 500000 else 2 + + # Signal 5: One-sided trading + buy_count = sum(1 for t in trades_data if t.get("side", "").upper() == "BUY") + sell_count = len(trades_data) - buy_count + if len(trades_data) >= 20: + buy_ratio = buy_count / len(trades_data) + if buy_ratio > 0.85 or buy_ratio < 0.15: + signals.append({ + "type": "one_sided_trading", + "severity": "medium", + "details": f"Trading heavily skewed: {buy_count} buys vs {sell_count} sells ({buy_ratio*100:.1f}% buys)", + "threshold": "85% one direction", + }) + risk_score += 2 + + # Determine risk level + if risk_score >= 7: + risk_level = "high" + elif risk_score >= 4: + risk_level = "medium" + else: + risk_level = "low" + + # Generate summary + if signals: + summary = f"Found {len(signals)} potential insider signal(s). " + high_severity = sum(1 for s in signals if s["severity"] == "high") + if high_severity: + summary += f"{high_severity} high-severity signal(s) detected. " + summary += "Manual review recommended." if risk_level != "low" else "Patterns appear normal." + else: + summary = "No significant insider trading signals detected." + + return { + "wallet_address": wallet_address, + "risk_level": risk_level, + "risk_score": risk_score, + "signals": signals, + "summary": summary, + "trades_analyzed": len(trades_data), + "analysis_period": trade_result["summary"]["date_range"], + } + + except Exception as e: + raise translate_error(e, {"exchange": exchange, "wallet_address": wallet_address}) from e + + +def compare_wallets( + exchange: str, + wallet_addresses: List[str], + limit_per_wallet: int = 200, +) -> Dict[str, Any]: + """ + Compare trading patterns across multiple wallets. + + Useful for identifying coordinated trading or related accounts. + + Args: + exchange: Exchange name + wallet_addresses: List of wallet addresses to compare + limit_per_wallet: Max trades per wallet + + Returns: + Dict with comparison results: + { + "wallets": { + "0x1234": {"volume": 50000, "trades": 100, ...}, + "0x5678": {"volume": 30000, "trades": 80, ...} + }, + "common_markets": ["market1", "market2"], + "correlation_signals": [...] + } + + Example: + >>> result = compare_wallets("polymarket", ["0x1234...", "0x5678..."]) + >>> print(f"Common markets: {len(result['common_markets'])}") + """ + try: + exchange = validate_exchange(exchange) + + if len(wallet_addresses) < 2: + raise ValueError("Need at least 2 wallet addresses to compare") + if len(wallet_addresses) > 10: + raise ValueError("Maximum 10 wallets can be compared at once") + + wallet_data: Dict[str, Dict[str, Any]] = {} + market_sets: List[set] = [] + + for wallet in wallet_addresses: + try: + result = fetch_wallet_trades(exchange, wallet, limit=limit_per_wallet) + trades = result["trades"] + + markets = set() + for t in trades: + if t.get("condition_id"): + markets.add(t["condition_id"]) + + wallet_data[wallet] = { + "total_trades": len(trades), + "total_volume_usd": result["summary"]["total_volume_usd"], + "unique_markets": len(markets), + "markets": list(markets)[:20], + } + market_sets.append(markets) + except Exception as e: + wallet_data[wallet] = {"error": str(e)} + + # Find common markets + common_markets = [] + if len(market_sets) >= 2: + common = market_sets[0] + for s in market_sets[1:]: + common = common.intersection(s) + common_markets = list(common) + + # Detect correlation signals + correlation_signals = [] + + if common_markets: + overlap_ratio = len(common_markets) / min(len(s) for s in market_sets if s) + if overlap_ratio > 0.5: + correlation_signals.append({ + "type": "high_market_overlap", + "details": f"{len(common_markets)} common markets ({overlap_ratio*100:.1f}% overlap)", + }) + + return { + "wallets": wallet_data, + "common_markets": common_markets[:20], + "common_market_count": len(common_markets), + "correlation_signals": correlation_signals, + } + + except Exception as e: + raise translate_error(e, {"exchange": exchange}) from e diff --git a/examples/verify_insider.py b/examples/verify_insider.py new file mode 100644 index 0000000..f05650e --- /dev/null +++ b/examples/verify_insider.py @@ -0,0 +1,419 @@ +""" +Insider verification tool for analyzing wallet trading patterns. + +Usage: + uv run python examples/verify_insider.py 0xWALLET_ADDRESS + uv run python examples/verify_insider.py 0xWALLET_ADDRESS --analyze + uv run python examples/verify_insider.py 0xWALLET_ADDRESS --detect-signals + uv run python examples/verify_insider.py --compare 0xWALLET1 0xWALLET2 + +Examples: + # Fetch recent trades for a wallet + uv run python examples/verify_insider.py 0x1234... + + # Analyze trading performance + uv run python examples/verify_insider.py 0x1234... --analyze + + # Detect insider trading signals + uv run python examples/verify_insider.py 0x1234... --detect-signals + + # Compare multiple wallets + uv run python examples/verify_insider.py --compare 0x1234... 0x5678... +""" + +import argparse +import json +import sys +from dataclasses import asdict +from datetime import datetime + +from dotenv import load_dotenv + +from dr_manhattan import create_exchange + +load_dotenv() + + +def format_currency(value: float) -> str: + """Format currency with commas and 2 decimal places.""" + return f"${value:,.2f}" + + +def format_timestamp(ts: datetime) -> str: + """Format timestamp for display.""" + if isinstance(ts, datetime): + return ts.strftime("%Y-%m-%d %H:%M:%S UTC") + return str(ts) + + +def fetch_trades(exchange, wallet_address: str, limit: int = 500, market_id: str = None): + """Fetch and display trades for a wallet.""" + print(f"\nFetching trades for wallet: {wallet_address}") + print("=" * 80) + + trades = exchange.fetch_public_trades( + market=market_id, + user=wallet_address, + limit=limit, + taker_only=False, + ) + + if not trades: + print("No trades found for this wallet.") + return + + # Summary stats + total_volume = sum(t.size * t.price for t in trades) + buy_count = sum(1 for t in trades if t.side.upper() == "BUY") + sell_count = len(trades) - buy_count + unique_markets = len(set(t.condition_id for t in trades if t.condition_id)) + + print(f"\nSummary:") + print(f" Total Trades: {len(trades)}") + print(f" Buy/Sell: {buy_count}/{sell_count}") + print(f" Total Volume: {format_currency(total_volume)}") + print(f" Unique Markets: {unique_markets}") + + # Recent trades + print(f"\nRecent Trades (showing last {min(10, len(trades))}):") + print("-" * 80) + + for trade in trades[:10]: + ts = format_timestamp(trade.timestamp) + side = trade.side.upper() + size = trade.size + price = trade.price + value = size * price + title = (trade.title or "Unknown")[:40] + + print(f" [{ts}] {side:4} {size:>10.2f} @ {price:.4f} = {format_currency(value):>12}") + print(f" Market: {title}") + + if len(trades) > 10: + print(f"\n ... and {len(trades) - 10} more trades") + + return trades + + +def analyze_performance(exchange, wallet_address: str, limit: int = 500): + """Analyze and display trading performance.""" + print(f"\nAnalyzing trading performance for: {wallet_address}") + print("=" * 80) + + trades = exchange.fetch_public_trades( + user=wallet_address, + limit=limit, + taker_only=False, + ) + + if len(trades) < 5: + print(f"Insufficient trades for analysis. Found {len(trades)}, need at least 5.") + return + + # Calculate metrics + buy_count = sum(1 for t in trades if t.side.upper() == "BUY") + sell_count = len(trades) - buy_count + total_volume = sum(t.size * t.price for t in trades) + avg_trade_size = total_volume / len(trades) + + # Market exposure + market_exposure = {} + for trade in trades: + cid = trade.condition_id or "unknown" + if cid not in market_exposure: + market_exposure[cid] = { + "trades": 0, + "volume": 0.0, + "title": trade.title, + } + market_exposure[cid]["trades"] += 1 + market_exposure[cid]["volume"] += trade.size * trade.price + + # Sort by volume + top_markets = sorted(market_exposure.items(), key=lambda x: x[1]["volume"], reverse=True)[:5] + + # Timing analysis + trades_by_hour = {h: 0 for h in range(24)} + for trade in trades: + if trade.timestamp: + trades_by_hour[trade.timestamp.hour] += 1 + + most_active_hours = sorted(trades_by_hour.items(), key=lambda x: x[1], reverse=True)[:3] + + print("\nPerformance Metrics:") + print("-" * 40) + print(f" Total Trades: {len(trades)}") + print(f" Buy/Sell: {buy_count}/{sell_count} ({buy_count/len(trades)*100:.1f}% buys)") + print(f" Average Trade Size: {format_currency(avg_trade_size)}") + print(f" Total Volume: {format_currency(total_volume)}") + + print("\nTop Markets by Volume:") + print("-" * 40) + for cid, data in top_markets: + title = (data["title"] or cid)[:50] + print(f" {title}") + print(f" Trades: {data['trades']}, Volume: {format_currency(data['volume'])}") + + print("\nMost Active Trading Hours (UTC):") + print("-" * 40) + for hour, count in most_active_hours: + bar = "#" * (count * 2) + print(f" {hour:02d}:00 - {count:3d} trades {bar}") + + +def detect_signals(exchange, wallet_address: str, limit: int = 500, market_id: str = None): + """Detect and display insider trading signals.""" + print(f"\nDetecting insider signals for: {wallet_address}") + print("=" * 80) + + trades = exchange.fetch_public_trades( + market=market_id, + user=wallet_address, + limit=limit, + taker_only=False, + ) + + if len(trades) < 5: + print(f"Insufficient trades for analysis. Found {len(trades)}, need at least 5.") + return + + signals = [] + risk_score = 0 + + # Signal 1: Market concentration + market_trades = {} + for trade in trades: + cid = trade.condition_id or "unknown" + market_trades[cid] = market_trades.get(cid, 0) + 1 + + if market_trades: + max_concentration = max(market_trades.values()) / len(trades) + if max_concentration > 0.5: + signals.append({ + "type": "HIGH_MARKET_CONCENTRATION", + "severity": "MEDIUM", + "details": f"Over {max_concentration*100:.1f}% of trades in single market", + }) + risk_score += 2 + + # Signal 2: Large trade sizes + trade_sizes = [t.size * t.price for t in trades] + avg_size = sum(trade_sizes) / len(trade_sizes) + large_trades = [s for s in trade_sizes if s > avg_size * 3] + if len(large_trades) > len(trades) * 0.1: + signals.append({ + "type": "FREQUENT_LARGE_TRADES", + "severity": "MEDIUM", + "details": f"{len(large_trades)} trades exceed 3x average size ({format_currency(avg_size)})", + }) + risk_score += 2 + + # Signal 3: High volume + total_volume = sum(trade_sizes) + if total_volume > 100000: + severity = "HIGH" if total_volume > 500000 else "MEDIUM" + signals.append({ + "type": "HIGH_TOTAL_VOLUME", + "severity": severity, + "details": f"Total trading volume: {format_currency(total_volume)}", + }) + risk_score += 3 if total_volume > 500000 else 2 + + # Signal 4: One-sided trading + buy_count = sum(1 for t in trades if t.side.upper() == "BUY") + buy_ratio = buy_count / len(trades) + if buy_ratio > 0.85 or buy_ratio < 0.15: + signals.append({ + "type": "ONE_SIDED_TRADING", + "severity": "MEDIUM", + "details": f"Trading heavily skewed: {buy_ratio*100:.1f}% buys", + }) + risk_score += 2 + + # Signal 5: Burst trading + timestamps = sorted([t.timestamp for t in trades if t.timestamp]) + burst_count = 0 + for i in range(len(timestamps) - 1): + if (timestamps[i + 1] - timestamps[i]).total_seconds() < 60: + burst_count += 1 + + if len(timestamps) >= 10 and burst_count > len(timestamps) * 0.2: + signals.append({ + "type": "BURST_TRADING", + "severity": "LOW", + "details": f"{burst_count} trades within 1 minute of each other", + }) + risk_score += 1 + + # Determine risk level + if risk_score >= 7: + risk_level = "HIGH" + elif risk_score >= 4: + risk_level = "MEDIUM" + else: + risk_level = "LOW" + + # Display results + print(f"\nRisk Assessment:") + print("-" * 40) + print(f" Risk Level: {risk_level}") + print(f" Risk Score: {risk_score}/10") + print(f" Trades Analyzed: {len(trades)}") + + if signals: + print(f"\nSignals Detected ({len(signals)}):") + print("-" * 40) + for signal in signals: + severity = signal["severity"] + marker = "[!]" if severity == "HIGH" else "[*]" if severity == "MEDIUM" else "[-]" + print(f" {marker} {signal['type']}") + print(f" Severity: {severity}") + print(f" {signal['details']}") + else: + print("\n No significant insider trading signals detected.") + + return {"risk_level": risk_level, "signals": signals, "risk_score": risk_score} + + +def compare_wallets(exchange, wallet_addresses: list, limit_per_wallet: int = 200): + """Compare trading patterns across wallets.""" + print(f"\nComparing {len(wallet_addresses)} wallets") + print("=" * 80) + + wallet_data = {} + market_sets = [] + + for wallet in wallet_addresses: + print(f"\n Fetching data for {wallet[:10]}...") + try: + trades = exchange.fetch_public_trades( + user=wallet, + limit=limit_per_wallet, + taker_only=False, + ) + + markets = set() + volume = 0.0 + for t in trades: + if t.condition_id: + markets.add(t.condition_id) + volume += t.size * t.price + + wallet_data[wallet] = { + "trades": len(trades), + "volume": volume, + "markets": len(markets), + } + market_sets.append(markets) + except Exception as e: + wallet_data[wallet] = {"error": str(e)} + + # Find common markets + common_markets = [] + if len(market_sets) >= 2: + common = market_sets[0] + for s in market_sets[1:]: + common = common.intersection(s) + common_markets = list(common) + + print("\nWallet Comparison:") + print("-" * 60) + print(f"{'Wallet':<14} {'Trades':>8} {'Volume':>14} {'Markets':>8}") + print("-" * 60) + + for wallet, data in wallet_data.items(): + if "error" in data: + print(f"{wallet[:12]}.. {'Error':>8}") + else: + print( + f"{wallet[:12]}.. {data['trades']:>8} {format_currency(data['volume']):>14} {data['markets']:>8}" + ) + + print(f"\nCommon Markets: {len(common_markets)}") + + if common_markets: + overlap_ratio = len(common_markets) / min(len(s) for s in market_sets if s) + if overlap_ratio > 0.5: + print(f"\n [!] HIGH OVERLAP: {overlap_ratio*100:.1f}% market overlap detected") + print(" This may indicate coordinated trading") + + +def main(): + parser = argparse.ArgumentParser( + description="Analyze wallet trading patterns for insider signals", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + + parser.add_argument( + "wallet_address", + nargs="?", + help="Wallet address to analyze", + ) + parser.add_argument( + "--exchange", + default="polymarket", + help="Exchange name (default: polymarket)", + ) + parser.add_argument( + "--limit", + type=int, + default=500, + help="Maximum trades to fetch (default: 500)", + ) + parser.add_argument( + "--market", + help="Filter by market/condition ID", + ) + parser.add_argument( + "--analyze", + action="store_true", + help="Analyze trading performance", + ) + parser.add_argument( + "--detect-signals", + action="store_true", + help="Detect insider trading signals", + ) + parser.add_argument( + "--compare", + nargs="+", + metavar="WALLET", + help="Compare multiple wallets", + ) + parser.add_argument( + "--json", + action="store_true", + help="Output in JSON format", + ) + + args = parser.parse_args() + + # Validate arguments + if not args.wallet_address and not args.compare: + parser.error("Either wallet_address or --compare is required") + + # Create exchange (read-only, no credentials needed) + exchange = create_exchange(args.exchange, verbose=False, validate=False) + + try: + if args.compare: + if len(args.compare) < 2: + parser.error("--compare requires at least 2 wallet addresses") + compare_wallets(exchange, args.compare, args.limit) + elif args.detect_signals: + result = detect_signals(exchange, args.wallet_address, args.limit, args.market) + if args.json and result: + print("\n" + json.dumps(result, indent=2)) + elif args.analyze: + analyze_performance(exchange, args.wallet_address, args.limit) + else: + fetch_trades(exchange, args.wallet_address, args.limit, args.market) + + except Exception as e: + print(f"\nError: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main()