From 5ee34ce2b70d806d44da3756423c52645bc102ac Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 18 Jan 2026 05:08:15 +0000 Subject: [PATCH 1/5] feat(examples): add Polymarket copytrading bot Add a copytrading bot that monitors a target wallet's trades and mirrors them on your account. Features include: - Poll-based trade monitoring via Data API - Configurable scale factor for position sizing - Market filtering by slug - Duplicate trade prevention - Status logging with session statistics --- examples/copytrading.py | 423 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 423 insertions(+) create mode 100644 examples/copytrading.py diff --git a/examples/copytrading.py b/examples/copytrading.py new file mode 100644 index 0000000..700fed0 --- /dev/null +++ b/examples/copytrading.py @@ -0,0 +1,423 @@ +""" +Polymarket Copytrading Bot + +Monitors a target wallet's trades and mirrors them on your account. +Uses dr-manhattan's unified API for Polymarket trading. + +Usage: + uv run python examples/copytrading.py --target + uv run python examples/copytrading.py --target --scale 0.5 + uv run python examples/copytrading.py --target --markets fed-decision +""" + +import argparse +import os +import sys +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Dict, List, Optional, Set + +from dotenv import load_dotenv + +from dr_manhattan import Polymarket +from dr_manhattan.exchanges.polymarket import PublicTrade +from dr_manhattan.models import Market +from dr_manhattan.models.order import OrderSide +from dr_manhattan.utils import setup_logger +from dr_manhattan.utils.logger import Colors + +logger = setup_logger(__name__) + + +@dataclass +class CopyStats: + """Statistics for copytrading session""" + + trades_detected: int = 0 + trades_copied: int = 0 + trades_skipped: int = 0 + trades_failed: int = 0 + total_volume: float = 0.0 + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + +class CopytradingBot: + """ + Copytrading bot that monitors a target wallet and mirrors trades. + + Features: + - Polls target wallet trades via Polymarket Data API + - Mirrors trades with configurable size scaling + - Tracks copied trades to avoid duplicates + - Supports market filtering + """ + + def __init__( + self, + exchange: Polymarket, + target_wallet: str, + scale_factor: float = 1.0, + poll_interval: float = 5.0, + max_position: float = 100.0, + min_trade_size: float = 1.0, + market_filter: Optional[List[str]] = None, + ): + """ + Initialize copytrading bot. + + Args: + exchange: Authenticated Polymarket exchange + target_wallet: Target wallet address to copy + scale_factor: Multiply target trade sizes by this factor + poll_interval: Seconds between polling for new trades + max_position: Maximum position size per outcome + min_trade_size: Minimum trade size to copy + market_filter: List of market slugs/IDs to filter (None = all) + """ + self.exchange = exchange + self.target_wallet = target_wallet + self.scale_factor = scale_factor + self.poll_interval = poll_interval + self.max_position = max_position + self.min_trade_size = min_trade_size + self.market_filter = market_filter + + self.is_running = False + self.copied_trades: Set[str] = set() + self.stats = CopyStats() + self.market_cache: Dict[str, Market] = {} + self.last_poll_time: Optional[datetime] = None + + def _get_trade_id(self, trade: PublicTrade) -> str: + """Generate unique ID for a trade""" + return f"{trade.transaction_hash}_{trade.outcome_index}" + + def _should_copy_trade(self, trade: PublicTrade) -> bool: + """Check if trade should be copied""" + trade_id = self._get_trade_id(trade) + + if trade_id in self.copied_trades: + return False + + if trade.size < self.min_trade_size: + logger.debug(f"Skipping small trade: {trade.size}") + return False + + if self.market_filter: + slug = trade.event_slug or trade.slug or "" + if not any(f.lower() in slug.lower() for f in self.market_filter): + return False + + return True + + def _get_market(self, trade: PublicTrade) -> Optional[Market]: + """Get market data for a trade""" + condition_id = trade.condition_id + if not condition_id: + return None + + if condition_id in self.market_cache: + return self.market_cache[condition_id] + + try: + slug = trade.event_slug or trade.slug + if slug: + markets = self.exchange.fetch_markets_by_slug(slug) + for market in markets: + self.market_cache[market.id] = market + if market.id == condition_id: + return market + + market = self.exchange.fetch_market(condition_id) + self.market_cache[condition_id] = market + return market + except Exception as e: + logger.warning(f"Failed to fetch market {condition_id}: {e}") + return None + + def _get_token_id(self, market: Market, outcome: str) -> Optional[str]: + """Get token ID for an outcome""" + token_ids = market.metadata.get("clobTokenIds", []) + outcomes = market.outcomes + + if not token_ids or len(token_ids) != len(outcomes): + try: + token_ids = self.exchange.fetch_token_ids(market.id) + market.metadata["clobTokenIds"] = token_ids + except Exception as e: + logger.warning(f"Failed to fetch token IDs: {e}") + return None + + for i, out in enumerate(outcomes): + if out.lower() == outcome.lower(): + return token_ids[i] if i < len(token_ids) else None + + return None + + def _execute_copy_trade(self, trade: PublicTrade) -> bool: + """Execute a copy of the target's trade""" + market = self._get_market(trade) + if not market: + logger.error(f"Cannot find market for trade: {trade.condition_id}") + return False + + outcome = trade.outcome + if not outcome: + outcome = market.outcomes[trade.outcome_index] if trade.outcome_index is not None else None + + if not outcome: + logger.error("Cannot determine outcome for trade") + return False + + token_id = self._get_token_id(market, outcome) + if not token_id: + logger.error(f"Cannot find token ID for outcome: {outcome}") + return False + + side = OrderSide.BUY if trade.side.upper() == "BUY" else OrderSide.SELL + size = trade.size * self.scale_factor + price = trade.price + + if size > self.max_position: + size = self.max_position + logger.warning(f"Capped trade size to max_position: {self.max_position}") + + try: + order = self.exchange.create_order( + market_id=market.id, + outcome=outcome, + side=side, + price=price, + size=size, + params={"token_id": token_id}, + ) + + side_colored = Colors.green("BUY") if side == OrderSide.BUY else Colors.red("SELL") + logger.info( + f" {Colors.cyan('COPIED')} {side_colored} {size:.2f} " + f"{Colors.magenta(outcome[:20])} @ {Colors.yellow(f'{price:.4f}')} " + f"[{Colors.gray(order.id[:8] + '...')}]" + ) + return True + + except Exception as e: + logger.error(f"Failed to execute copy trade: {e}") + return False + + def _poll_trades(self) -> List[PublicTrade]: + """Poll for new trades from target wallet""" + try: + trades = self.exchange.fetch_public_trades( + user=self.target_wallet, + limit=50, + taker_only=True, + ) + + if self.last_poll_time: + trades = [t for t in trades if t.timestamp > self.last_poll_time] + + self.last_poll_time = datetime.now(timezone.utc) + return trades + + except Exception as e: + logger.warning(f"Failed to fetch trades: {e}") + return [] + + def _process_trades(self, trades: List[PublicTrade]): + """Process new trades from target wallet""" + for trade in trades: + self.stats.trades_detected += 1 + + if not self._should_copy_trade(trade): + self.stats.trades_skipped += 1 + continue + + trade_id = self._get_trade_id(trade) + side_str = trade.side.upper() + outcome_str = trade.outcome or f"idx:{trade.outcome_index}" + + logger.info( + f"\n{Colors.bold('New Trade Detected:')} " + f"{Colors.cyan(side_str)} {trade.size:.2f} {Colors.magenta(outcome_str[:20])} " + f"@ {Colors.yellow(f'{trade.price:.4f}')} [{Colors.gray(trade.slug or '')}]" + ) + + if self._execute_copy_trade(trade): + self.copied_trades.add(trade_id) + self.stats.trades_copied += 1 + self.stats.total_volume += trade.size * self.scale_factor + else: + self.stats.trades_failed += 1 + + def log_status(self): + """Log current status""" + elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() + elapsed_str = f"{int(elapsed // 60)}m {int(elapsed % 60)}s" + + logger.info( + f"\n[{time.strftime('%H:%M:%S')}] " + f"{Colors.bold('Status:')} " + f"Detected: {Colors.cyan(str(self.stats.trades_detected))} | " + f"Copied: {Colors.green(str(self.stats.trades_copied))} | " + f"Skipped: {Colors.gray(str(self.stats.trades_skipped))} | " + f"Failed: {Colors.red(str(self.stats.trades_failed))} | " + f"Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')} | " + f"Uptime: {Colors.gray(elapsed_str)}" + ) + + def run(self, duration_minutes: Optional[int] = None): + """Run the copytrading bot""" + logger.info(f"\n{Colors.bold('Copytrading Bot Started')}") + logger.info(f"Target: {Colors.cyan(self.target_wallet)}") + logger.info(f"Scale: {Colors.yellow(f'{self.scale_factor}x')}") + logger.info(f"Interval: {Colors.gray(f'{self.poll_interval}s')}") + logger.info(f"Max Position: {Colors.blue(f'{self.max_position}')}") + + if self.market_filter: + logger.info(f"Markets: {Colors.magenta(', '.join(self.market_filter))}") + + address = getattr(self.exchange, "_address", None) + if address: + logger.info(f"Bot Address: {Colors.cyan(address)}") + + try: + balance = self.exchange.fetch_balance() + usdc = balance.get("USDC", 0.0) + logger.info(f"Balance: {Colors.green(f'${usdc:,.2f}')} USDC") + except Exception as e: + logger.warning(f"Failed to fetch balance: {e}") + + logger.info(f"\n{Colors.gray('Waiting for trades...')}") + + self.is_running = True + self.stats = CopyStats() + start_time = time.time() + end_time = start_time + (duration_minutes * 60) if duration_minutes else None + + try: + while self.is_running: + if end_time and time.time() >= end_time: + break + + trades = self._poll_trades() + if trades: + self._process_trades(trades) + + self.log_status() + time.sleep(self.poll_interval) + + except KeyboardInterrupt: + logger.info("\nStopping...") + + finally: + self.is_running = False + self._log_summary() + + def _log_summary(self): + """Log final summary""" + elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() + + logger.info(f"\n{Colors.bold('Session Summary')}") + logger.info(f"Duration: {int(elapsed // 60)}m {int(elapsed % 60)}s") + logger.info(f"Trades Detected: {self.stats.trades_detected}") + logger.info(f"Trades Copied: {Colors.green(str(self.stats.trades_copied))}") + logger.info(f"Trades Skipped: {self.stats.trades_skipped}") + logger.info(f"Trades Failed: {Colors.red(str(self.stats.trades_failed))}") + logger.info(f"Total Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')}") + + def stop(self): + """Stop the bot""" + self.is_running = False + + +def parse_args() -> argparse.Namespace: + """Parse command line arguments""" + parser = argparse.ArgumentParser(description="Polymarket Copytrading Bot") + parser.add_argument( + "-t", + "--target", + required=True, + help="Target wallet address to copy trades from", + ) + parser.add_argument( + "-s", + "--scale", + type=float, + default=float(os.getenv("SCALE_FACTOR", "1.0")), + help="Scale factor for trade sizes (default: 1.0)", + ) + parser.add_argument( + "-i", + "--interval", + type=float, + default=float(os.getenv("POLL_INTERVAL", "5")), + help="Poll interval in seconds (default: 5)", + ) + parser.add_argument( + "--max-position", + type=float, + default=float(os.getenv("MAX_POSITION", "100")), + help="Maximum position size (default: 100)", + ) + parser.add_argument( + "--min-size", + type=float, + default=float(os.getenv("MIN_TRADE_SIZE", "1")), + help="Minimum trade size to copy (default: 1)", + ) + parser.add_argument( + "-m", + "--markets", + nargs="*", + default=None, + help="Filter to specific market slugs", + ) + parser.add_argument( + "-d", + "--duration", + type=int, + default=None, + help="Duration in minutes (default: indefinite)", + ) + return parser.parse_args() + + +def main() -> int: + """Entry point""" + load_dotenv() + args = parse_args() + + private_key = os.getenv("POLYMARKET_PRIVATE_KEY") or os.getenv("PRIVATE_KEY") + if not private_key: + logger.error("POLYMARKET_PRIVATE_KEY or PRIVATE_KEY required in environment") + return 1 + + funder = os.getenv("POLYMARKET_FUNDER") or os.getenv("FUNDER") + + try: + exchange = Polymarket({ + "private_key": private_key, + "funder": funder, + "verbose": False, + }) + except Exception as e: + logger.error(f"Failed to initialize exchange: {e}") + return 1 + + bot = CopytradingBot( + exchange=exchange, + target_wallet=args.target, + scale_factor=args.scale, + poll_interval=args.interval, + max_position=args.max_position, + min_trade_size=args.min_size, + market_filter=args.markets, + ) + + bot.run(duration_minutes=args.duration) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 4941585d1ef182587d38f71d1db71366f21157a0 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 18 Jan 2026 11:32:07 +0000 Subject: [PATCH 2/5] feat(utils): add Telegram bot integration Add TelegramBot utility class for sending notifications via Telegram Bot API. Integrate with copytrading bot to send: - Startup/shutdown notifications - Trade detected and copied notifications - Error notifications Usage: --telegram --telegram-token --telegram-chat-id Or set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID environment variables --- dr_manhattan/utils/__init__.py | 2 + dr_manhattan/utils/telegram.py | 216 +++++++++++++++++++++++++++++++++ examples/copytrading.py | 98 +++++++++++++-- 3 files changed, 309 insertions(+), 7 deletions(-) create mode 100644 dr_manhattan/utils/telegram.py diff --git a/dr_manhattan/utils/__init__.py b/dr_manhattan/utils/__init__.py index 2c1df6b..b1ac463 100644 --- a/dr_manhattan/utils/__init__.py +++ b/dr_manhattan/utils/__init__.py @@ -1,6 +1,7 @@ """Utility functions and helpers for Dr. Manhattan.""" from .logger import ColoredFormatter, default_logger, setup_logger +from .telegram import TelegramBot from .tui import prompt_confirm, prompt_market_selection, prompt_selection __all__ = [ @@ -10,4 +11,5 @@ "prompt_selection", "prompt_market_selection", "prompt_confirm", + "TelegramBot", ] diff --git a/dr_manhattan/utils/telegram.py b/dr_manhattan/utils/telegram.py new file mode 100644 index 0000000..1c021ca --- /dev/null +++ b/dr_manhattan/utils/telegram.py @@ -0,0 +1,216 @@ +""" +Telegram Bot Integration + +Simple Telegram notification module using the Bot API. +""" + +import logging +from dataclasses import dataclass +from typing import Optional + +import requests + +logger = logging.getLogger(__name__) + + +@dataclass +class TelegramConfig: + """Telegram bot configuration""" + + bot_token: str + chat_id: str + parse_mode: str = "HTML" + disable_notification: bool = False + + +class TelegramBot: + """ + Simple Telegram bot for sending notifications. + + Usage: + bot = TelegramBot(token="your_bot_token", chat_id="your_chat_id") + bot.send("Hello from Dr. Manhattan!") + """ + + BASE_URL = "https://api.telegram.org/bot{token}/{method}" + + def __init__( + self, + token: str, + chat_id: str, + parse_mode: str = "HTML", + disable_notification: bool = False, + timeout: int = 10, + ): + """ + Initialize Telegram bot. + + Args: + token: Bot token from @BotFather + chat_id: Chat ID to send messages to + parse_mode: Message parse mode (HTML, Markdown, MarkdownV2) + disable_notification: Send messages silently + timeout: Request timeout in seconds + """ + self.token = token + self.chat_id = chat_id + self.parse_mode = parse_mode + self.disable_notification = disable_notification + self.timeout = timeout + self._enabled = bool(token and chat_id) + + @property + def enabled(self) -> bool: + """Check if bot is configured and enabled""" + return self._enabled + + def _request(self, method: str, data: dict) -> Optional[dict]: + """Make API request to Telegram""" + if not self._enabled: + return None + + url = self.BASE_URL.format(token=self.token, method=method) + + try: + response = requests.post(url, json=data, timeout=self.timeout) + result = response.json() + + if not result.get("ok"): + logger.warning(f"Telegram API error: {result.get('description')}") + return None + + return result.get("result") + + except requests.Timeout: + logger.warning("Telegram request timed out") + return None + except requests.RequestException as e: + logger.warning(f"Telegram request failed: {e}") + return None + except Exception as e: + logger.warning(f"Telegram error: {e}") + return None + + def send( + self, + text: str, + parse_mode: Optional[str] = None, + disable_notification: Optional[bool] = None, + ) -> bool: + """ + Send a text message. + + Args: + text: Message text + parse_mode: Override default parse mode + disable_notification: Override default notification setting + + Returns: + True if message sent successfully + """ + if not self._enabled: + return False + + data = { + "chat_id": self.chat_id, + "text": text, + "parse_mode": parse_mode or self.parse_mode, + "disable_notification": disable_notification + if disable_notification is not None + else self.disable_notification, + } + + result = self._request("sendMessage", data) + return result is not None + + def send_trade_notification( + self, + side: str, + size: float, + outcome: str, + price: float, + market: str = "", + is_copy: bool = True, + ) -> bool: + """ + Send a trade notification with formatted message. + + Args: + side: BUY or SELL + size: Trade size + outcome: Outcome name + price: Trade price + market: Market name/slug + is_copy: Whether this is a copied trade + """ + side_emoji = "🟢" if side.upper() == "BUY" else "🔴" + action = "Copied" if is_copy else "Detected" + + text = ( + f"{side_emoji} Trade {action}\n" + f"Side: {side.upper()}\n" + f"Size: {size:.2f}\n" + f"Outcome: {outcome}\n" + f"Price: {price:.4f}" + ) + + if market: + text += f"\nMarket: {market}" + + return self.send(text) + + def send_status( + self, + trades_detected: int, + trades_copied: int, + trades_failed: int, + total_volume: float, + uptime: str, + ) -> bool: + """Send a status update notification""" + text = ( + f"📊 Copytrading Status\n" + f"Detected: {trades_detected}\n" + f"Copied: {trades_copied}\n" + f"Failed: {trades_failed}\n" + f"Volume: ${total_volume:.2f}\n" + f"Uptime: {uptime}" + ) + + return self.send(text) + + def send_error(self, error: str, context: str = "") -> bool: + """Send an error notification""" + text = f"⚠️ Error\n{error}" + if context: + text += f"\nContext: {context}" + + return self.send(text) + + def send_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> bool: + """Send startup notification""" + text = ( + f"🚀 Copytrading Bot Started\n" + f"Target: {target_wallet[:8]}...{target_wallet[-6:]}\n" + f"Scale: {scale_factor}x\n" + f"Balance: ${balance:,.2f}" + ) + + return self.send(text) + + def send_shutdown(self, stats: dict) -> bool: + """Send shutdown notification with final stats""" + text = ( + f"🛑 Copytrading Bot Stopped\n" + f"Trades Copied: {stats.get('copied', 0)}\n" + f"Trades Failed: {stats.get('failed', 0)}\n" + f"Total Volume: ${stats.get('volume', 0):.2f}\n" + f"Duration: {stats.get('duration', 'N/A')}" + ) + + return self.send(text) diff --git a/examples/copytrading.py b/examples/copytrading.py index 700fed0..0758bf2 100644 --- a/examples/copytrading.py +++ b/examples/copytrading.py @@ -7,7 +7,7 @@ Usage: uv run python examples/copytrading.py --target uv run python examples/copytrading.py --target --scale 0.5 - uv run python examples/copytrading.py --target --markets fed-decision + uv run python examples/copytrading.py --target --telegram """ import argparse @@ -24,7 +24,7 @@ from dr_manhattan.exchanges.polymarket import PublicTrade from dr_manhattan.models import Market from dr_manhattan.models.order import OrderSide -from dr_manhattan.utils import setup_logger +from dr_manhattan.utils import TelegramBot, setup_logger from dr_manhattan.utils.logger import Colors logger = setup_logger(__name__) @@ -51,6 +51,7 @@ class CopytradingBot: - Mirrors trades with configurable size scaling - Tracks copied trades to avoid duplicates - Supports market filtering + - Telegram notifications for trades and status """ def __init__( @@ -62,6 +63,7 @@ def __init__( max_position: float = 100.0, min_trade_size: float = 1.0, market_filter: Optional[List[str]] = None, + telegram: Optional[TelegramBot] = None, ): """ Initialize copytrading bot. @@ -74,6 +76,7 @@ def __init__( max_position: Maximum position size per outcome min_trade_size: Minimum trade size to copy market_filter: List of market slugs/IDs to filter (None = all) + telegram: Optional TelegramBot for notifications """ self.exchange = exchange self.target_wallet = target_wallet @@ -82,6 +85,7 @@ def __init__( self.max_position = max_position self.min_trade_size = min_trade_size self.market_filter = market_filter + self.telegram = telegram self.is_running = False self.copied_trades: Set[str] = set() @@ -160,6 +164,11 @@ def _execute_copy_trade(self, trade: PublicTrade) -> bool: market = self._get_market(trade) if not market: logger.error(f"Cannot find market for trade: {trade.condition_id}") + if self.telegram: + self.telegram.send_error( + f"Cannot find market: {trade.condition_id}", + context="execute_copy_trade", + ) return False outcome = trade.outcome @@ -199,10 +208,23 @@ def _execute_copy_trade(self, trade: PublicTrade) -> bool: f"{Colors.magenta(outcome[:20])} @ {Colors.yellow(f'{price:.4f}')} " f"[{Colors.gray(order.id[:8] + '...')}]" ) + + if self.telegram: + self.telegram.send_trade_notification( + side=side.value, + size=size, + outcome=outcome, + price=price, + market=trade.slug or trade.event_slug or "", + is_copy=True, + ) + return True except Exception as e: logger.error(f"Failed to execute copy trade: {e}") + if self.telegram: + self.telegram.send_error(str(e), context="execute_copy_trade") return False def _poll_trades(self) -> List[PublicTrade]: @@ -243,6 +265,16 @@ def _process_trades(self, trades: List[PublicTrade]): f"@ {Colors.yellow(f'{trade.price:.4f}')} [{Colors.gray(trade.slug or '')}]" ) + if self.telegram: + self.telegram.send_trade_notification( + side=side_str, + size=trade.size, + outcome=outcome_str, + price=trade.price, + market=trade.slug or trade.event_slug or "", + is_copy=False, + ) + if self._execute_copy_trade(trade): self.copied_trades.add(trade_id) self.stats.trades_copied += 1 @@ -250,10 +282,14 @@ def _process_trades(self, trades: List[PublicTrade]): else: self.stats.trades_failed += 1 + def _get_uptime_str(self) -> str: + """Get formatted uptime string""" + elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() + return f"{int(elapsed // 60)}m {int(elapsed % 60)}s" + def log_status(self): """Log current status""" - elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() - elapsed_str = f"{int(elapsed // 60)}m {int(elapsed % 60)}s" + uptime_str = self._get_uptime_str() logger.info( f"\n[{time.strftime('%H:%M:%S')}] " @@ -263,7 +299,7 @@ def log_status(self): f"Skipped: {Colors.gray(str(self.stats.trades_skipped))} | " f"Failed: {Colors.red(str(self.stats.trades_failed))} | " f"Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')} | " - f"Uptime: {Colors.gray(elapsed_str)}" + f"Uptime: {Colors.gray(uptime_str)}" ) def run(self, duration_minutes: Optional[int] = None): @@ -277,10 +313,14 @@ def run(self, duration_minutes: Optional[int] = None): if self.market_filter: logger.info(f"Markets: {Colors.magenta(', '.join(self.market_filter))}") + if self.telegram and self.telegram.enabled: + logger.info(f"Telegram: {Colors.green('Enabled')}") + address = getattr(self.exchange, "_address", None) if address: logger.info(f"Bot Address: {Colors.cyan(address)}") + usdc = 0.0 try: balance = self.exchange.fetch_balance() usdc = balance.get("USDC", 0.0) @@ -288,6 +328,13 @@ def run(self, duration_minutes: Optional[int] = None): except Exception as e: logger.warning(f"Failed to fetch balance: {e}") + if self.telegram: + self.telegram.send_startup( + target_wallet=self.target_wallet, + scale_factor=self.scale_factor, + balance=usdc, + ) + logger.info(f"\n{Colors.gray('Waiting for trades...')}") self.is_running = True @@ -316,16 +363,24 @@ def run(self, duration_minutes: Optional[int] = None): def _log_summary(self): """Log final summary""" - elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() + duration_str = self._get_uptime_str() logger.info(f"\n{Colors.bold('Session Summary')}") - logger.info(f"Duration: {int(elapsed // 60)}m {int(elapsed % 60)}s") + logger.info(f"Duration: {duration_str}") logger.info(f"Trades Detected: {self.stats.trades_detected}") logger.info(f"Trades Copied: {Colors.green(str(self.stats.trades_copied))}") logger.info(f"Trades Skipped: {self.stats.trades_skipped}") logger.info(f"Trades Failed: {Colors.red(str(self.stats.trades_failed))}") logger.info(f"Total Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')}") + if self.telegram: + self.telegram.send_shutdown({ + "copied": self.stats.trades_copied, + "failed": self.stats.trades_failed, + "volume": self.stats.total_volume, + "duration": duration_str, + }) + def stop(self): """Stop the bot""" self.is_running = False @@ -380,6 +435,21 @@ def parse_args() -> argparse.Namespace: default=None, help="Duration in minutes (default: indefinite)", ) + parser.add_argument( + "--telegram", + action="store_true", + help="Enable Telegram notifications", + ) + parser.add_argument( + "--telegram-token", + default=os.getenv("TELEGRAM_BOT_TOKEN"), + help="Telegram bot token (or set TELEGRAM_BOT_TOKEN)", + ) + parser.add_argument( + "--telegram-chat-id", + default=os.getenv("TELEGRAM_CHAT_ID"), + help="Telegram chat ID (or set TELEGRAM_CHAT_ID)", + ) return parser.parse_args() @@ -405,6 +475,19 @@ def main() -> int: logger.error(f"Failed to initialize exchange: {e}") return 1 + telegram = None + if args.telegram: + if not args.telegram_token or not args.telegram_chat_id: + logger.error("Telegram enabled but TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID not set") + return 1 + telegram = TelegramBot( + token=args.telegram_token, + chat_id=args.telegram_chat_id, + ) + if not telegram.enabled: + logger.warning("Telegram bot not properly configured") + telegram = None + bot = CopytradingBot( exchange=exchange, target_wallet=args.target, @@ -413,6 +496,7 @@ def main() -> int: max_position=args.max_position, min_trade_size=args.min_size, market_filter=args.markets, + telegram=telegram, ) bot.run(duration_minutes=args.duration) From 86213ae33cfb7fe74b4d7c032ff07f296164859a Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 18 Jan 2026 12:35:06 +0000 Subject: [PATCH 3/5] refactor(telegram): modular folder structure with strict types Refactor Telegram module into a scalable folder structure: - telegram/types.py: Strict type definitions (ParseMode, SendResult, etc.) - telegram/bot.py: Generic TelegramBot class with type-safe API - telegram/formatters.py: HTML formatting utilities and MessageBuilder Features: - Fully typed with dataclasses and enums - Generic bot class for any use case - MessageBuilder for fluent message construction - HTML formatting helpers (bold, code, link, etc.) - Support for photos, documents, inline keyboards - Batch sending and message editing --- dr_manhattan/utils/telegram.py | 216 -------------- dr_manhattan/utils/telegram/__init__.py | 97 ++++++ dr_manhattan/utils/telegram/bot.py | 348 ++++++++++++++++++++++ dr_manhattan/utils/telegram/formatters.py | 308 +++++++++++++++++++ dr_manhattan/utils/telegram/types.py | 168 +++++++++++ examples/copytrading.py | 138 ++++++--- 6 files changed, 1018 insertions(+), 257 deletions(-) delete mode 100644 dr_manhattan/utils/telegram.py create mode 100644 dr_manhattan/utils/telegram/__init__.py create mode 100644 dr_manhattan/utils/telegram/bot.py create mode 100644 dr_manhattan/utils/telegram/formatters.py create mode 100644 dr_manhattan/utils/telegram/types.py diff --git a/dr_manhattan/utils/telegram.py b/dr_manhattan/utils/telegram.py deleted file mode 100644 index 1c021ca..0000000 --- a/dr_manhattan/utils/telegram.py +++ /dev/null @@ -1,216 +0,0 @@ -""" -Telegram Bot Integration - -Simple Telegram notification module using the Bot API. -""" - -import logging -from dataclasses import dataclass -from typing import Optional - -import requests - -logger = logging.getLogger(__name__) - - -@dataclass -class TelegramConfig: - """Telegram bot configuration""" - - bot_token: str - chat_id: str - parse_mode: str = "HTML" - disable_notification: bool = False - - -class TelegramBot: - """ - Simple Telegram bot for sending notifications. - - Usage: - bot = TelegramBot(token="your_bot_token", chat_id="your_chat_id") - bot.send("Hello from Dr. Manhattan!") - """ - - BASE_URL = "https://api.telegram.org/bot{token}/{method}" - - def __init__( - self, - token: str, - chat_id: str, - parse_mode: str = "HTML", - disable_notification: bool = False, - timeout: int = 10, - ): - """ - Initialize Telegram bot. - - Args: - token: Bot token from @BotFather - chat_id: Chat ID to send messages to - parse_mode: Message parse mode (HTML, Markdown, MarkdownV2) - disable_notification: Send messages silently - timeout: Request timeout in seconds - """ - self.token = token - self.chat_id = chat_id - self.parse_mode = parse_mode - self.disable_notification = disable_notification - self.timeout = timeout - self._enabled = bool(token and chat_id) - - @property - def enabled(self) -> bool: - """Check if bot is configured and enabled""" - return self._enabled - - def _request(self, method: str, data: dict) -> Optional[dict]: - """Make API request to Telegram""" - if not self._enabled: - return None - - url = self.BASE_URL.format(token=self.token, method=method) - - try: - response = requests.post(url, json=data, timeout=self.timeout) - result = response.json() - - if not result.get("ok"): - logger.warning(f"Telegram API error: {result.get('description')}") - return None - - return result.get("result") - - except requests.Timeout: - logger.warning("Telegram request timed out") - return None - except requests.RequestException as e: - logger.warning(f"Telegram request failed: {e}") - return None - except Exception as e: - logger.warning(f"Telegram error: {e}") - return None - - def send( - self, - text: str, - parse_mode: Optional[str] = None, - disable_notification: Optional[bool] = None, - ) -> bool: - """ - Send a text message. - - Args: - text: Message text - parse_mode: Override default parse mode - disable_notification: Override default notification setting - - Returns: - True if message sent successfully - """ - if not self._enabled: - return False - - data = { - "chat_id": self.chat_id, - "text": text, - "parse_mode": parse_mode or self.parse_mode, - "disable_notification": disable_notification - if disable_notification is not None - else self.disable_notification, - } - - result = self._request("sendMessage", data) - return result is not None - - def send_trade_notification( - self, - side: str, - size: float, - outcome: str, - price: float, - market: str = "", - is_copy: bool = True, - ) -> bool: - """ - Send a trade notification with formatted message. - - Args: - side: BUY or SELL - size: Trade size - outcome: Outcome name - price: Trade price - market: Market name/slug - is_copy: Whether this is a copied trade - """ - side_emoji = "🟢" if side.upper() == "BUY" else "🔴" - action = "Copied" if is_copy else "Detected" - - text = ( - f"{side_emoji} Trade {action}\n" - f"Side: {side.upper()}\n" - f"Size: {size:.2f}\n" - f"Outcome: {outcome}\n" - f"Price: {price:.4f}" - ) - - if market: - text += f"\nMarket: {market}" - - return self.send(text) - - def send_status( - self, - trades_detected: int, - trades_copied: int, - trades_failed: int, - total_volume: float, - uptime: str, - ) -> bool: - """Send a status update notification""" - text = ( - f"📊 Copytrading Status\n" - f"Detected: {trades_detected}\n" - f"Copied: {trades_copied}\n" - f"Failed: {trades_failed}\n" - f"Volume: ${total_volume:.2f}\n" - f"Uptime: {uptime}" - ) - - return self.send(text) - - def send_error(self, error: str, context: str = "") -> bool: - """Send an error notification""" - text = f"⚠️ Error\n{error}" - if context: - text += f"\nContext: {context}" - - return self.send(text) - - def send_startup( - self, - target_wallet: str, - scale_factor: float, - balance: float, - ) -> bool: - """Send startup notification""" - text = ( - f"🚀 Copytrading Bot Started\n" - f"Target: {target_wallet[:8]}...{target_wallet[-6:]}\n" - f"Scale: {scale_factor}x\n" - f"Balance: ${balance:,.2f}" - ) - - return self.send(text) - - def send_shutdown(self, stats: dict) -> bool: - """Send shutdown notification with final stats""" - text = ( - f"🛑 Copytrading Bot Stopped\n" - f"Trades Copied: {stats.get('copied', 0)}\n" - f"Trades Failed: {stats.get('failed', 0)}\n" - f"Total Volume: ${stats.get('volume', 0):.2f}\n" - f"Duration: {stats.get('duration', 'N/A')}" - ) - - return self.send(text) diff --git a/dr_manhattan/utils/telegram/__init__.py b/dr_manhattan/utils/telegram/__init__.py new file mode 100644 index 0000000..3ec0f26 --- /dev/null +++ b/dr_manhattan/utils/telegram/__init__.py @@ -0,0 +1,97 @@ +""" +Telegram Bot Integration Module + +A scalable, type-safe Telegram bot client for sending messages and notifications. + +Basic Usage: + from dr_manhattan.utils.telegram import TelegramBot + + bot = TelegramBot(token="...", chat_id="...") + bot.send("Hello, World!") + +With Message Builder: + from dr_manhattan.utils.telegram import TelegramBot, MessageBuilder + + msg = (MessageBuilder() + .title("Status Update") + .field("CPU", "45%") + .field("Memory", "2.1GB") + .build()) + + bot.send(msg) + +With Formatters: + from dr_manhattan.utils.telegram import TelegramBot + from dr_manhattan.utils.telegram.formatters import bold, code, key_value + + bot.send(f"{bold('Alert')}: Server is {code('online')}") +""" + +from .bot import TelegramBot +from .formatters import ( + MessageBuilder, + blockquote, + bold, + bullet_list, + code, + escape_html, + italic, + key_value, + link, + mention, + numbered_list, + pre, + progress_bar, + spoiler, + strikethrough, + table, + underline, +) +from .types import ( + Chat, + ChatType, + InlineKeyboardButton, + InlineKeyboardMarkup, + Message, + MessageOptions, + ParseMode, + ReplyMarkup, + SendResult, + TelegramConfig, + User, +) + +__all__ = [ + # Core + "TelegramBot", + # Types + "TelegramConfig", + "MessageOptions", + "SendResult", + "ParseMode", + "ChatType", + "User", + "Chat", + "Message", + "InlineKeyboardButton", + "InlineKeyboardMarkup", + "ReplyMarkup", + # Formatters + "MessageBuilder", + "escape_html", + "bold", + "italic", + "code", + "pre", + "link", + "mention", + "strikethrough", + "underline", + "spoiler", + "blockquote", + "table", + "key_value", + "bullet_list", + "numbered_list", + "progress_bar", +] diff --git a/dr_manhattan/utils/telegram/bot.py b/dr_manhattan/utils/telegram/bot.py new file mode 100644 index 0000000..d2c5a98 --- /dev/null +++ b/dr_manhattan/utils/telegram/bot.py @@ -0,0 +1,348 @@ +""" +Core Telegram Bot implementation. + +A generic, type-safe Telegram bot client for sending messages and notifications. +""" + +import json +import logging +from typing import Any, Dict, List, Optional, Union + +import requests + +from .types import ( + InlineKeyboardMarkup, + Message, + MessageOptions, + ParseMode, + ReplyMarkup, + SendResult, + TelegramConfig, +) + +logger = logging.getLogger(__name__) + + +class TelegramBot: + """ + Generic Telegram bot client for sending messages. + + This is a low-level, type-safe client that can be used for any purpose. + For domain-specific formatting, use the formatters module. + + Example: + bot = TelegramBot(token="...", chat_id="...") + result = bot.send("Hello, World!") + if result.success: + print(f"Message sent: {result.message_id}") + """ + + BASE_URL = "https://api.telegram.org/bot{token}/{method}" + + def __init__( + self, + token: str, + chat_id: str, + parse_mode: ParseMode = ParseMode.HTML, + disable_notification: bool = False, + disable_web_page_preview: bool = True, + timeout: int = 10, + ) -> None: + """ + Initialize Telegram bot. + + Args: + token: Bot token from @BotFather + chat_id: Default chat ID to send messages to + parse_mode: Default parse mode for messages + disable_notification: Send messages silently by default + disable_web_page_preview: Disable link previews by default + timeout: Request timeout in seconds + """ + self._config = TelegramConfig( + bot_token=token, + chat_id=chat_id, + parse_mode=parse_mode, + disable_notification=disable_notification, + disable_web_page_preview=disable_web_page_preview, + timeout=timeout, + ) + + @classmethod + def from_config(cls, config: TelegramConfig) -> "TelegramBot": + """Create bot from config object""" + return cls( + token=config.bot_token, + chat_id=config.chat_id, + parse_mode=config.parse_mode, + disable_notification=config.disable_notification, + disable_web_page_preview=config.disable_web_page_preview, + timeout=config.timeout, + ) + + @property + def enabled(self) -> bool: + """Check if bot is configured and enabled""" + return bool(self._config.bot_token and self._config.chat_id) + + @property + def config(self) -> TelegramConfig: + """Get current configuration""" + return self._config + + def _build_url(self, method: str) -> str: + """Build API URL for method""" + return self.BASE_URL.format(token=self._config.bot_token, method=method) + + def _request( + self, + method: str, + data: Dict[str, Any], + ) -> SendResult: + """Make API request to Telegram""" + if not self.enabled: + return SendResult(success=False, error="Bot not configured") + + url = self._build_url(method) + + try: + response = requests.post(url, json=data, timeout=self._config.timeout) + result = response.json() + + if not result.get("ok"): + error_msg = result.get("description", "Unknown error") + logger.warning(f"Telegram API error: {error_msg}") + return SendResult(success=False, error=error_msg, raw=result) + + return SendResult( + success=True, + message_id=result.get("result", {}).get("message_id"), + raw=result.get("result"), + ) + + except requests.Timeout: + logger.warning("Telegram request timed out") + return SendResult(success=False, error="Request timed out") + except requests.RequestException as e: + logger.warning(f"Telegram request failed: {e}") + return SendResult(success=False, error=str(e)) + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse Telegram response: {e}") + return SendResult(success=False, error=f"Invalid response: {e}") + except Exception as e: + logger.warning(f"Telegram error: {e}") + return SendResult(success=False, error=str(e)) + + def send( + self, + text: str, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + reply_markup: Optional[ReplyMarkup] = None, + ) -> SendResult: + """ + Send a text message. + + Args: + text: Message text (supports HTML/Markdown based on parse_mode) + chat_id: Override default chat ID + options: Message options (parse_mode, notifications, etc.) + reply_markup: Optional inline keyboard + + Returns: + SendResult with success status and message ID + """ + if not text: + return SendResult(success=False, error="Empty message") + + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "text": text, + "parse_mode": (opts.parse_mode or self._config.parse_mode).value, + "disable_notification": opts.disable_notification + if opts.disable_notification is not None + else self._config.disable_notification, + "disable_web_page_preview": opts.disable_web_page_preview + if opts.disable_web_page_preview is not None + else self._config.disable_web_page_preview, + } + + if opts.reply_to_message_id: + data["reply_to_message_id"] = opts.reply_to_message_id + + if opts.protect_content: + data["protect_content"] = True + + if reply_markup: + if isinstance(reply_markup, InlineKeyboardMarkup): + data["reply_markup"] = reply_markup.to_dict() + + return self._request("sendMessage", data) + + def send_photo( + self, + photo: str, + caption: Optional[str] = None, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + ) -> SendResult: + """ + Send a photo. + + Args: + photo: Photo URL or file_id + caption: Optional caption + chat_id: Override default chat ID + options: Message options + """ + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "photo": photo, + } + + if caption: + data["caption"] = caption + data["parse_mode"] = (opts.parse_mode or self._config.parse_mode).value + + return self._request("sendPhoto", data) + + def send_document( + self, + document: str, + caption: Optional[str] = None, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + ) -> SendResult: + """ + Send a document. + + Args: + document: Document URL or file_id + caption: Optional caption + chat_id: Override default chat ID + options: Message options + """ + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "document": document, + } + + if caption: + data["caption"] = caption + data["parse_mode"] = (opts.parse_mode or self._config.parse_mode).value + + return self._request("sendDocument", data) + + def edit_message( + self, + message_id: int, + text: str, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + reply_markup: Optional[ReplyMarkup] = None, + ) -> SendResult: + """ + Edit an existing message. + + Args: + message_id: ID of message to edit + text: New message text + chat_id: Override default chat ID + options: Message options + reply_markup: Optional inline keyboard + """ + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "message_id": message_id, + "text": text, + "parse_mode": (opts.parse_mode or self._config.parse_mode).value, + } + + if reply_markup: + if isinstance(reply_markup, InlineKeyboardMarkup): + data["reply_markup"] = reply_markup.to_dict() + + return self._request("editMessageText", data) + + def delete_message( + self, + message_id: int, + chat_id: Optional[str] = None, + ) -> SendResult: + """ + Delete a message. + + Args: + message_id: ID of message to delete + chat_id: Override default chat ID + """ + data = { + "chat_id": chat_id or self._config.chat_id, + "message_id": message_id, + } + + return self._request("deleteMessage", data) + + def get_me(self) -> Optional[Dict[str, Any]]: + """Get bot information""" + result = self._request("getMe", {}) + return result.raw if result.success else None + + def send_chat_action( + self, + action: str = "typing", + chat_id: Optional[str] = None, + ) -> SendResult: + """ + Send chat action (typing indicator, etc.) + + Args: + action: Action type (typing, upload_photo, upload_document, etc.) + chat_id: Override default chat ID + """ + data = { + "chat_id": chat_id or self._config.chat_id, + "action": action, + } + + return self._request("sendChatAction", data) + + def send_batch( + self, + messages: List[str], + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + delay_ms: int = 0, + ) -> List[SendResult]: + """ + Send multiple messages. + + Args: + messages: List of message texts + chat_id: Override default chat ID + options: Message options + delay_ms: Delay between messages in milliseconds (0 = no delay) + + Returns: + List of SendResult for each message + """ + import time + + results: List[SendResult] = [] + + for i, text in enumerate(messages): + result = self.send(text, chat_id=chat_id, options=options) + results.append(result) + + if delay_ms > 0 and i < len(messages) - 1: + time.sleep(delay_ms / 1000) + + return results diff --git a/dr_manhattan/utils/telegram/formatters.py b/dr_manhattan/utils/telegram/formatters.py new file mode 100644 index 0000000..60beb23 --- /dev/null +++ b/dr_manhattan/utils/telegram/formatters.py @@ -0,0 +1,308 @@ +""" +Message formatting utilities for Telegram. + +Provides HTML and Markdown formatting helpers for building messages. +""" + +import html +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Sequence, Tuple, Union + + +def escape_html(text: str) -> str: + """Escape HTML special characters""" + return html.escape(str(text)) + + +def bold(text: str) -> str: + """Format text as bold (HTML)""" + return f"{escape_html(text)}" + + +def italic(text: str) -> str: + """Format text as italic (HTML)""" + return f"{escape_html(text)}" + + +def code(text: str) -> str: + """Format text as inline code (HTML)""" + return f"{escape_html(text)}" + + +def pre(text: str, language: Optional[str] = None) -> str: + """Format text as code block (HTML)""" + if language: + return f'
{escape_html(text)}
' + return f"
{escape_html(text)}
" + + +def link(text: str, url: str) -> str: + """Format text as hyperlink (HTML)""" + return f'{escape_html(text)}' + + +def mention(text: str, user_id: int) -> str: + """Format text as user mention (HTML)""" + return f'{escape_html(text)}' + + +def strikethrough(text: str) -> str: + """Format text as strikethrough (HTML)""" + return f"{escape_html(text)}" + + +def underline(text: str) -> str: + """Format text as underline (HTML)""" + return f"{escape_html(text)}" + + +def spoiler(text: str) -> str: + """Format text as spoiler (HTML)""" + return f'{escape_html(text)}' + + +def blockquote(text: str) -> str: + """Format text as blockquote (HTML)""" + return f"
{escape_html(text)}
" + + +@dataclass +class TableRow: + """Table row data""" + + cells: List[str] + bold_first: bool = False + + +def table( + rows: Sequence[Union[Tuple[str, ...], List[str], TableRow]], + header: Optional[Sequence[str]] = None, + separator: str = " | ", +) -> str: + """ + Format data as a simple text table. + + Args: + rows: List of rows (tuples/lists of cell values) + header: Optional header row + separator: Column separator + + Returns: + Formatted table as monospace text + """ + lines: List[str] = [] + + if header: + lines.append(separator.join(bold(h) for h in header)) + lines.append("-" * 20) + + for row in rows: + if isinstance(row, TableRow): + cells = row.cells + if row.bold_first and cells: + cells = [bold(cells[0])] + [code(c) for c in cells[1:]] + else: + cells = [code(c) for c in cells] + else: + cells = [code(str(c)) for c in row] + lines.append(separator.join(cells)) + + return "\n".join(lines) + + +def key_value( + data: Dict[str, Any], + separator: str = ": ", + bold_keys: bool = True, +) -> str: + """ + Format key-value pairs. + + Args: + data: Dictionary of key-value pairs + separator: Separator between key and value + bold_keys: Whether to bold the keys + + Returns: + Formatted key-value pairs + """ + lines: List[str] = [] + + for key, value in data.items(): + key_str = bold(key) if bold_keys else escape_html(key) + value_str = code(str(value)) + lines.append(f"{key_str}{separator}{value_str}") + + return "\n".join(lines) + + +def bullet_list(items: Sequence[str], bullet: str = "-") -> str: + """ + Format items as a bullet list. + + Args: + items: List of items + bullet: Bullet character + + Returns: + Formatted bullet list + """ + return "\n".join(f"{bullet} {escape_html(item)}" for item in items) + + +def numbered_list(items: Sequence[str], start: int = 1) -> str: + """ + Format items as a numbered list. + + Args: + items: List of items + start: Starting number + + Returns: + Formatted numbered list + """ + return "\n".join(f"{i}. {escape_html(item)}" for i, item in enumerate(items, start)) + + +def progress_bar( + current: float, + total: float, + width: int = 10, + filled: str = "█", + empty: str = "░", +) -> str: + """ + Create a text progress bar. + + Args: + current: Current value + total: Total value + width: Bar width in characters + filled: Character for filled portion + empty: Character for empty portion + + Returns: + Progress bar string + """ + if total <= 0: + ratio = 0.0 + else: + ratio = min(1.0, max(0.0, current / total)) + + filled_width = int(ratio * width) + empty_width = width - filled_width + + bar = filled * filled_width + empty * empty_width + percentage = ratio * 100 + + return f"{bar} {percentage:.1f}%" + + +class MessageBuilder: + """ + Fluent message builder for constructing formatted messages. + + Example: + msg = (MessageBuilder() + .title("Alert") + .field("Status", "OK") + .field("Count", 42) + .newline() + .text("Details here") + .build()) + """ + + def __init__(self) -> None: + self._parts: List[str] = [] + + def text(self, text: str, escape: bool = True) -> "MessageBuilder": + """Add plain text""" + self._parts.append(escape_html(text) if escape else text) + return self + + def raw(self, text: str) -> "MessageBuilder": + """Add raw HTML (no escaping)""" + self._parts.append(text) + return self + + def title(self, text: str) -> "MessageBuilder": + """Add a bold title""" + self._parts.append(bold(text)) + return self + + def subtitle(self, text: str) -> "MessageBuilder": + """Add an italic subtitle""" + self._parts.append(italic(text)) + return self + + def field(self, key: str, value: Any, inline: bool = False) -> "MessageBuilder": + """Add a key-value field""" + separator = ": " if inline else "\n" + if not inline: + self._parts.append(f"{bold(key)}: {code(str(value))}") + else: + self._parts.append(f"{bold(key)}: {code(str(value))}") + return self + + def fields(self, data: Dict[str, Any]) -> "MessageBuilder": + """Add multiple key-value fields""" + for key, value in data.items(): + self.field(key, value) + self.newline() + return self + + def code_block(self, text: str, language: Optional[str] = None) -> "MessageBuilder": + """Add a code block""" + self._parts.append(pre(text, language)) + return self + + def inline_code(self, text: str) -> "MessageBuilder": + """Add inline code""" + self._parts.append(code(text)) + return self + + def link_text(self, text: str, url: str) -> "MessageBuilder": + """Add a hyperlink""" + self._parts.append(link(text, url)) + return self + + def newline(self, count: int = 1) -> "MessageBuilder": + """Add newlines""" + self._parts.append("\n" * count) + return self + + def separator(self, char: str = "-", width: int = 20) -> "MessageBuilder": + """Add a separator line""" + self._parts.append(char * width) + return self + + def bullet(self, items: Sequence[str]) -> "MessageBuilder": + """Add a bullet list""" + self._parts.append(bullet_list(items)) + return self + + def numbered(self, items: Sequence[str], start: int = 1) -> "MessageBuilder": + """Add a numbered list""" + self._parts.append(numbered_list(items, start)) + return self + + def progress( + self, + current: float, + total: float, + label: Optional[str] = None, + ) -> "MessageBuilder": + """Add a progress bar""" + bar = progress_bar(current, total) + if label: + self._parts.append(f"{escape_html(label)}: {bar}") + else: + self._parts.append(bar) + return self + + def build(self) -> str: + """Build the final message""" + return "".join(self._parts) + + def __str__(self) -> str: + return self.build() diff --git a/dr_manhattan/utils/telegram/types.py b/dr_manhattan/utils/telegram/types.py new file mode 100644 index 0000000..e1b2db6 --- /dev/null +++ b/dr_manhattan/utils/telegram/types.py @@ -0,0 +1,168 @@ +""" +Type definitions for Telegram bot integration. +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, TypeVar, Union + +T = TypeVar("T") + +Callback = Callable[[Dict[str, Any]], None] + + +class ParseMode(str, Enum): + """Telegram message parse modes""" + + HTML = "HTML" + MARKDOWN = "Markdown" + MARKDOWN_V2 = "MarkdownV2" + + +class ChatType(str, Enum): + """Telegram chat types""" + + PRIVATE = "private" + GROUP = "group" + SUPERGROUP = "supergroup" + CHANNEL = "channel" + + +@dataclass(frozen=True) +class TelegramConfig: + """Configuration for Telegram bot""" + + bot_token: str + chat_id: str + parse_mode: ParseMode = ParseMode.HTML + disable_notification: bool = False + disable_web_page_preview: bool = True + timeout: int = 10 + + def __post_init__(self) -> None: + if not self.bot_token: + raise ValueError("bot_token is required") + if not self.chat_id: + raise ValueError("chat_id is required") + + +@dataclass(frozen=True) +class MessageOptions: + """Options for sending a message""" + + parse_mode: Optional[ParseMode] = None + disable_notification: Optional[bool] = None + disable_web_page_preview: Optional[bool] = None + reply_to_message_id: Optional[int] = None + protect_content: bool = False + + +@dataclass(frozen=True) +class SendResult: + """Result of sending a message""" + + success: bool + message_id: Optional[int] = None + error: Optional[str] = None + raw: Optional[Dict[str, Any]] = None + + +@dataclass(frozen=True) +class User: + """Telegram user""" + + id: int + is_bot: bool + first_name: str + last_name: Optional[str] = None + username: Optional[str] = None + language_code: Optional[str] = None + + +@dataclass(frozen=True) +class Chat: + """Telegram chat""" + + id: int + type: ChatType + title: Optional[str] = None + username: Optional[str] = None + first_name: Optional[str] = None + last_name: Optional[str] = None + + +@dataclass(frozen=True) +class Message: + """Telegram message""" + + message_id: int + date: int + chat: Chat + from_user: Optional[User] = None + text: Optional[str] = None + raw: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Message": + """Parse message from API response""" + chat_data = data.get("chat", {}) + chat = Chat( + id=chat_data.get("id", 0), + type=ChatType(chat_data.get("type", "private")), + title=chat_data.get("title"), + username=chat_data.get("username"), + first_name=chat_data.get("first_name"), + last_name=chat_data.get("last_name"), + ) + + from_data = data.get("from") + from_user = None + if from_data: + from_user = User( + id=from_data.get("id", 0), + is_bot=from_data.get("is_bot", False), + first_name=from_data.get("first_name", ""), + last_name=from_data.get("last_name"), + username=from_data.get("username"), + language_code=from_data.get("language_code"), + ) + + return cls( + message_id=data.get("message_id", 0), + date=data.get("date", 0), + chat=chat, + from_user=from_user, + text=data.get("text"), + raw=data, + ) + + +@dataclass +class InlineKeyboardButton: + """Inline keyboard button""" + + text: str + url: Optional[str] = None + callback_data: Optional[str] = None + + +@dataclass +class InlineKeyboardMarkup: + """Inline keyboard markup""" + + inline_keyboard: List[List[InlineKeyboardButton]] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """Convert to API format""" + return { + "inline_keyboard": [ + [ + {k: v for k, v in {"text": btn.text, "url": btn.url, "callback_data": btn.callback_data}.items() if v is not None} + for btn in row + ] + for row in self.inline_keyboard + ] + } + + +ReplyMarkup = Union[InlineKeyboardMarkup, None] diff --git a/examples/copytrading.py b/examples/copytrading.py index 0758bf2..706ccbf 100644 --- a/examples/copytrading.py +++ b/examples/copytrading.py @@ -26,6 +26,7 @@ from dr_manhattan.models.order import OrderSide from dr_manhattan.utils import TelegramBot, setup_logger from dr_manhattan.utils.logger import Colors +from dr_manhattan.utils.telegram import MessageBuilder, bold, code logger = setup_logger(__name__) @@ -159,16 +160,58 @@ def _get_token_id(self, market: Market, outcome: str) -> Optional[str]: return None + def _notify_trade( + self, + side: str, + size: float, + outcome: str, + price: float, + market: str, + is_copy: bool, + ) -> None: + """Send trade notification via Telegram""" + if not self.telegram: + return + + emoji = "🟢" if side.upper() == "BUY" else "🔴" + action = "Copied" if is_copy else "Detected" + + msg = ( + MessageBuilder() + .title(f"{emoji} Trade {action}") + .newline() + .field("Side", side.upper()) + .newline() + .field("Size", f"{size:.2f}") + .newline() + .field("Outcome", outcome) + .newline() + .field("Price", f"{price:.4f}") + ) + + if market: + msg.newline().field("Market", market) + + self.telegram.send(msg.build()) + + def _notify_error(self, error: str, context: str = "") -> None: + """Send error notification via Telegram""" + if not self.telegram: + return + + msg = MessageBuilder().title("⚠️ Error").newline().raw(code(error)) + + if context: + msg.newline().field("Context", context) + + self.telegram.send(msg.build()) + def _execute_copy_trade(self, trade: PublicTrade) -> bool: """Execute a copy of the target's trade""" market = self._get_market(trade) if not market: logger.error(f"Cannot find market for trade: {trade.condition_id}") - if self.telegram: - self.telegram.send_error( - f"Cannot find market: {trade.condition_id}", - context="execute_copy_trade", - ) + self._notify_error(f"Cannot find market: {trade.condition_id}", "execute_copy_trade") return False outcome = trade.outcome @@ -209,22 +252,20 @@ def _execute_copy_trade(self, trade: PublicTrade) -> bool: f"[{Colors.gray(order.id[:8] + '...')}]" ) - if self.telegram: - self.telegram.send_trade_notification( - side=side.value, - size=size, - outcome=outcome, - price=price, - market=trade.slug or trade.event_slug or "", - is_copy=True, - ) + self._notify_trade( + side=side.value, + size=size, + outcome=outcome, + price=price, + market=trade.slug or trade.event_slug or "", + is_copy=True, + ) return True except Exception as e: logger.error(f"Failed to execute copy trade: {e}") - if self.telegram: - self.telegram.send_error(str(e), context="execute_copy_trade") + self._notify_error(str(e), "execute_copy_trade") return False def _poll_trades(self) -> List[PublicTrade]: @@ -246,7 +287,7 @@ def _poll_trades(self) -> List[PublicTrade]: logger.warning(f"Failed to fetch trades: {e}") return [] - def _process_trades(self, trades: List[PublicTrade]): + def _process_trades(self, trades: List[PublicTrade]) -> None: """Process new trades from target wallet""" for trade in trades: self.stats.trades_detected += 1 @@ -265,15 +306,14 @@ def _process_trades(self, trades: List[PublicTrade]): f"@ {Colors.yellow(f'{trade.price:.4f}')} [{Colors.gray(trade.slug or '')}]" ) - if self.telegram: - self.telegram.send_trade_notification( - side=side_str, - size=trade.size, - outcome=outcome_str, - price=trade.price, - market=trade.slug or trade.event_slug or "", - is_copy=False, - ) + self._notify_trade( + side=side_str, + size=trade.size, + outcome=outcome_str, + price=trade.price, + market=trade.slug or trade.event_slug or "", + is_copy=False, + ) if self._execute_copy_trade(trade): self.copied_trades.add(trade_id) @@ -287,7 +327,7 @@ def _get_uptime_str(self) -> str: elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() return f"{int(elapsed // 60)}m {int(elapsed % 60)}s" - def log_status(self): + def log_status(self) -> None: """Log current status""" uptime_str = self._get_uptime_str() @@ -302,7 +342,7 @@ def log_status(self): f"Uptime: {Colors.gray(uptime_str)}" ) - def run(self, duration_minutes: Optional[int] = None): + def run(self, duration_minutes: Optional[int] = None) -> None: """Run the copytrading bot""" logger.info(f"\n{Colors.bold('Copytrading Bot Started')}") logger.info(f"Target: {Colors.cyan(self.target_wallet)}") @@ -329,11 +369,19 @@ def run(self, duration_minutes: Optional[int] = None): logger.warning(f"Failed to fetch balance: {e}") if self.telegram: - self.telegram.send_startup( - target_wallet=self.target_wallet, - scale_factor=self.scale_factor, - balance=usdc, + wallet_short = f"{self.target_wallet[:8]}...{self.target_wallet[-6:]}" + msg = ( + MessageBuilder() + .title("🚀 Copytrading Bot Started") + .newline() + .field("Target", wallet_short) + .newline() + .field("Scale", f"{self.scale_factor}x") + .newline() + .field("Balance", f"${usdc:,.2f}") + .build() ) + self.telegram.send(msg) logger.info(f"\n{Colors.gray('Waiting for trades...')}") @@ -361,7 +409,7 @@ def run(self, duration_minutes: Optional[int] = None): self.is_running = False self._log_summary() - def _log_summary(self): + def _log_summary(self) -> None: """Log final summary""" duration_str = self._get_uptime_str() @@ -374,14 +422,22 @@ def _log_summary(self): logger.info(f"Total Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')}") if self.telegram: - self.telegram.send_shutdown({ - "copied": self.stats.trades_copied, - "failed": self.stats.trades_failed, - "volume": self.stats.total_volume, - "duration": duration_str, - }) - - def stop(self): + msg = ( + MessageBuilder() + .title("🛑 Copytrading Bot Stopped") + .newline() + .field("Trades Copied", str(self.stats.trades_copied)) + .newline() + .field("Trades Failed", str(self.stats.trades_failed)) + .newline() + .field("Total Volume", f"${self.stats.total_volume:.2f}") + .newline() + .field("Duration", duration_str) + .build() + ) + self.telegram.send(msg) + + def stop(self) -> None: """Stop the bot""" self.is_running = False From fbee4b5e6a738a823637a120afa0e06b128f1359 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 18 Jan 2026 12:46:06 +0000 Subject: [PATCH 4/5] refactor(examples): modular copytrading folder structure Refactor copytrading bot into a modular folder structure: - copytrading/types.py: Type definitions (BotConfig, CopyStats, TradeInfo) - copytrading/notifications.py: Pluggable notification system - copytrading/bot.py: Core CopytradingBot class - copytrading/cli.py: Command-line interface - copytrading/__main__.py: Module entry point Features: - Clean separation of concerns - Pluggable notification handlers (Telegram, Null) - Type-safe configuration with validation - Programmatic and CLI usage supported --- examples/copytrading.py | 563 -------------------------- examples/copytrading/__init__.py | 44 ++ examples/copytrading/__main__.py | 13 + examples/copytrading/bot.py | 360 ++++++++++++++++ examples/copytrading/cli.py | 167 ++++++++ examples/copytrading/notifications.py | 181 +++++++++ examples/copytrading/types.py | 85 ++++ 7 files changed, 850 insertions(+), 563 deletions(-) delete mode 100644 examples/copytrading.py create mode 100644 examples/copytrading/__init__.py create mode 100644 examples/copytrading/__main__.py create mode 100644 examples/copytrading/bot.py create mode 100644 examples/copytrading/cli.py create mode 100644 examples/copytrading/notifications.py create mode 100644 examples/copytrading/types.py diff --git a/examples/copytrading.py b/examples/copytrading.py deleted file mode 100644 index 706ccbf..0000000 --- a/examples/copytrading.py +++ /dev/null @@ -1,563 +0,0 @@ -""" -Polymarket Copytrading Bot - -Monitors a target wallet's trades and mirrors them on your account. -Uses dr-manhattan's unified API for Polymarket trading. - -Usage: - uv run python examples/copytrading.py --target - uv run python examples/copytrading.py --target --scale 0.5 - uv run python examples/copytrading.py --target --telegram -""" - -import argparse -import os -import sys -import time -from dataclasses import dataclass, field -from datetime import datetime, timezone -from typing import Dict, List, Optional, Set - -from dotenv import load_dotenv - -from dr_manhattan import Polymarket -from dr_manhattan.exchanges.polymarket import PublicTrade -from dr_manhattan.models import Market -from dr_manhattan.models.order import OrderSide -from dr_manhattan.utils import TelegramBot, setup_logger -from dr_manhattan.utils.logger import Colors -from dr_manhattan.utils.telegram import MessageBuilder, bold, code - -logger = setup_logger(__name__) - - -@dataclass -class CopyStats: - """Statistics for copytrading session""" - - trades_detected: int = 0 - trades_copied: int = 0 - trades_skipped: int = 0 - trades_failed: int = 0 - total_volume: float = 0.0 - start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) - - -class CopytradingBot: - """ - Copytrading bot that monitors a target wallet and mirrors trades. - - Features: - - Polls target wallet trades via Polymarket Data API - - Mirrors trades with configurable size scaling - - Tracks copied trades to avoid duplicates - - Supports market filtering - - Telegram notifications for trades and status - """ - - def __init__( - self, - exchange: Polymarket, - target_wallet: str, - scale_factor: float = 1.0, - poll_interval: float = 5.0, - max_position: float = 100.0, - min_trade_size: float = 1.0, - market_filter: Optional[List[str]] = None, - telegram: Optional[TelegramBot] = None, - ): - """ - Initialize copytrading bot. - - Args: - exchange: Authenticated Polymarket exchange - target_wallet: Target wallet address to copy - scale_factor: Multiply target trade sizes by this factor - poll_interval: Seconds between polling for new trades - max_position: Maximum position size per outcome - min_trade_size: Minimum trade size to copy - market_filter: List of market slugs/IDs to filter (None = all) - telegram: Optional TelegramBot for notifications - """ - self.exchange = exchange - self.target_wallet = target_wallet - self.scale_factor = scale_factor - self.poll_interval = poll_interval - self.max_position = max_position - self.min_trade_size = min_trade_size - self.market_filter = market_filter - self.telegram = telegram - - self.is_running = False - self.copied_trades: Set[str] = set() - self.stats = CopyStats() - self.market_cache: Dict[str, Market] = {} - self.last_poll_time: Optional[datetime] = None - - def _get_trade_id(self, trade: PublicTrade) -> str: - """Generate unique ID for a trade""" - return f"{trade.transaction_hash}_{trade.outcome_index}" - - def _should_copy_trade(self, trade: PublicTrade) -> bool: - """Check if trade should be copied""" - trade_id = self._get_trade_id(trade) - - if trade_id in self.copied_trades: - return False - - if trade.size < self.min_trade_size: - logger.debug(f"Skipping small trade: {trade.size}") - return False - - if self.market_filter: - slug = trade.event_slug or trade.slug or "" - if not any(f.lower() in slug.lower() for f in self.market_filter): - return False - - return True - - def _get_market(self, trade: PublicTrade) -> Optional[Market]: - """Get market data for a trade""" - condition_id = trade.condition_id - if not condition_id: - return None - - if condition_id in self.market_cache: - return self.market_cache[condition_id] - - try: - slug = trade.event_slug or trade.slug - if slug: - markets = self.exchange.fetch_markets_by_slug(slug) - for market in markets: - self.market_cache[market.id] = market - if market.id == condition_id: - return market - - market = self.exchange.fetch_market(condition_id) - self.market_cache[condition_id] = market - return market - except Exception as e: - logger.warning(f"Failed to fetch market {condition_id}: {e}") - return None - - def _get_token_id(self, market: Market, outcome: str) -> Optional[str]: - """Get token ID for an outcome""" - token_ids = market.metadata.get("clobTokenIds", []) - outcomes = market.outcomes - - if not token_ids or len(token_ids) != len(outcomes): - try: - token_ids = self.exchange.fetch_token_ids(market.id) - market.metadata["clobTokenIds"] = token_ids - except Exception as e: - logger.warning(f"Failed to fetch token IDs: {e}") - return None - - for i, out in enumerate(outcomes): - if out.lower() == outcome.lower(): - return token_ids[i] if i < len(token_ids) else None - - return None - - def _notify_trade( - self, - side: str, - size: float, - outcome: str, - price: float, - market: str, - is_copy: bool, - ) -> None: - """Send trade notification via Telegram""" - if not self.telegram: - return - - emoji = "🟢" if side.upper() == "BUY" else "🔴" - action = "Copied" if is_copy else "Detected" - - msg = ( - MessageBuilder() - .title(f"{emoji} Trade {action}") - .newline() - .field("Side", side.upper()) - .newline() - .field("Size", f"{size:.2f}") - .newline() - .field("Outcome", outcome) - .newline() - .field("Price", f"{price:.4f}") - ) - - if market: - msg.newline().field("Market", market) - - self.telegram.send(msg.build()) - - def _notify_error(self, error: str, context: str = "") -> None: - """Send error notification via Telegram""" - if not self.telegram: - return - - msg = MessageBuilder().title("⚠️ Error").newline().raw(code(error)) - - if context: - msg.newline().field("Context", context) - - self.telegram.send(msg.build()) - - def _execute_copy_trade(self, trade: PublicTrade) -> bool: - """Execute a copy of the target's trade""" - market = self._get_market(trade) - if not market: - logger.error(f"Cannot find market for trade: {trade.condition_id}") - self._notify_error(f"Cannot find market: {trade.condition_id}", "execute_copy_trade") - return False - - outcome = trade.outcome - if not outcome: - outcome = market.outcomes[trade.outcome_index] if trade.outcome_index is not None else None - - if not outcome: - logger.error("Cannot determine outcome for trade") - return False - - token_id = self._get_token_id(market, outcome) - if not token_id: - logger.error(f"Cannot find token ID for outcome: {outcome}") - return False - - side = OrderSide.BUY if trade.side.upper() == "BUY" else OrderSide.SELL - size = trade.size * self.scale_factor - price = trade.price - - if size > self.max_position: - size = self.max_position - logger.warning(f"Capped trade size to max_position: {self.max_position}") - - try: - order = self.exchange.create_order( - market_id=market.id, - outcome=outcome, - side=side, - price=price, - size=size, - params={"token_id": token_id}, - ) - - side_colored = Colors.green("BUY") if side == OrderSide.BUY else Colors.red("SELL") - logger.info( - f" {Colors.cyan('COPIED')} {side_colored} {size:.2f} " - f"{Colors.magenta(outcome[:20])} @ {Colors.yellow(f'{price:.4f}')} " - f"[{Colors.gray(order.id[:8] + '...')}]" - ) - - self._notify_trade( - side=side.value, - size=size, - outcome=outcome, - price=price, - market=trade.slug or trade.event_slug or "", - is_copy=True, - ) - - return True - - except Exception as e: - logger.error(f"Failed to execute copy trade: {e}") - self._notify_error(str(e), "execute_copy_trade") - return False - - def _poll_trades(self) -> List[PublicTrade]: - """Poll for new trades from target wallet""" - try: - trades = self.exchange.fetch_public_trades( - user=self.target_wallet, - limit=50, - taker_only=True, - ) - - if self.last_poll_time: - trades = [t for t in trades if t.timestamp > self.last_poll_time] - - self.last_poll_time = datetime.now(timezone.utc) - return trades - - except Exception as e: - logger.warning(f"Failed to fetch trades: {e}") - return [] - - def _process_trades(self, trades: List[PublicTrade]) -> None: - """Process new trades from target wallet""" - for trade in trades: - self.stats.trades_detected += 1 - - if not self._should_copy_trade(trade): - self.stats.trades_skipped += 1 - continue - - trade_id = self._get_trade_id(trade) - side_str = trade.side.upper() - outcome_str = trade.outcome or f"idx:{trade.outcome_index}" - - logger.info( - f"\n{Colors.bold('New Trade Detected:')} " - f"{Colors.cyan(side_str)} {trade.size:.2f} {Colors.magenta(outcome_str[:20])} " - f"@ {Colors.yellow(f'{trade.price:.4f}')} [{Colors.gray(trade.slug or '')}]" - ) - - self._notify_trade( - side=side_str, - size=trade.size, - outcome=outcome_str, - price=trade.price, - market=trade.slug or trade.event_slug or "", - is_copy=False, - ) - - if self._execute_copy_trade(trade): - self.copied_trades.add(trade_id) - self.stats.trades_copied += 1 - self.stats.total_volume += trade.size * self.scale_factor - else: - self.stats.trades_failed += 1 - - def _get_uptime_str(self) -> str: - """Get formatted uptime string""" - elapsed = (datetime.now(timezone.utc) - self.stats.start_time).total_seconds() - return f"{int(elapsed // 60)}m {int(elapsed % 60)}s" - - def log_status(self) -> None: - """Log current status""" - uptime_str = self._get_uptime_str() - - logger.info( - f"\n[{time.strftime('%H:%M:%S')}] " - f"{Colors.bold('Status:')} " - f"Detected: {Colors.cyan(str(self.stats.trades_detected))} | " - f"Copied: {Colors.green(str(self.stats.trades_copied))} | " - f"Skipped: {Colors.gray(str(self.stats.trades_skipped))} | " - f"Failed: {Colors.red(str(self.stats.trades_failed))} | " - f"Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')} | " - f"Uptime: {Colors.gray(uptime_str)}" - ) - - def run(self, duration_minutes: Optional[int] = None) -> None: - """Run the copytrading bot""" - logger.info(f"\n{Colors.bold('Copytrading Bot Started')}") - logger.info(f"Target: {Colors.cyan(self.target_wallet)}") - logger.info(f"Scale: {Colors.yellow(f'{self.scale_factor}x')}") - logger.info(f"Interval: {Colors.gray(f'{self.poll_interval}s')}") - logger.info(f"Max Position: {Colors.blue(f'{self.max_position}')}") - - if self.market_filter: - logger.info(f"Markets: {Colors.magenta(', '.join(self.market_filter))}") - - if self.telegram and self.telegram.enabled: - logger.info(f"Telegram: {Colors.green('Enabled')}") - - address = getattr(self.exchange, "_address", None) - if address: - logger.info(f"Bot Address: {Colors.cyan(address)}") - - usdc = 0.0 - try: - balance = self.exchange.fetch_balance() - usdc = balance.get("USDC", 0.0) - logger.info(f"Balance: {Colors.green(f'${usdc:,.2f}')} USDC") - except Exception as e: - logger.warning(f"Failed to fetch balance: {e}") - - if self.telegram: - wallet_short = f"{self.target_wallet[:8]}...{self.target_wallet[-6:]}" - msg = ( - MessageBuilder() - .title("🚀 Copytrading Bot Started") - .newline() - .field("Target", wallet_short) - .newline() - .field("Scale", f"{self.scale_factor}x") - .newline() - .field("Balance", f"${usdc:,.2f}") - .build() - ) - self.telegram.send(msg) - - logger.info(f"\n{Colors.gray('Waiting for trades...')}") - - self.is_running = True - self.stats = CopyStats() - start_time = time.time() - end_time = start_time + (duration_minutes * 60) if duration_minutes else None - - try: - while self.is_running: - if end_time and time.time() >= end_time: - break - - trades = self._poll_trades() - if trades: - self._process_trades(trades) - - self.log_status() - time.sleep(self.poll_interval) - - except KeyboardInterrupt: - logger.info("\nStopping...") - - finally: - self.is_running = False - self._log_summary() - - def _log_summary(self) -> None: - """Log final summary""" - duration_str = self._get_uptime_str() - - logger.info(f"\n{Colors.bold('Session Summary')}") - logger.info(f"Duration: {duration_str}") - logger.info(f"Trades Detected: {self.stats.trades_detected}") - logger.info(f"Trades Copied: {Colors.green(str(self.stats.trades_copied))}") - logger.info(f"Trades Skipped: {self.stats.trades_skipped}") - logger.info(f"Trades Failed: {Colors.red(str(self.stats.trades_failed))}") - logger.info(f"Total Volume: {Colors.yellow(f'${self.stats.total_volume:.2f}')}") - - if self.telegram: - msg = ( - MessageBuilder() - .title("🛑 Copytrading Bot Stopped") - .newline() - .field("Trades Copied", str(self.stats.trades_copied)) - .newline() - .field("Trades Failed", str(self.stats.trades_failed)) - .newline() - .field("Total Volume", f"${self.stats.total_volume:.2f}") - .newline() - .field("Duration", duration_str) - .build() - ) - self.telegram.send(msg) - - def stop(self) -> None: - """Stop the bot""" - self.is_running = False - - -def parse_args() -> argparse.Namespace: - """Parse command line arguments""" - parser = argparse.ArgumentParser(description="Polymarket Copytrading Bot") - parser.add_argument( - "-t", - "--target", - required=True, - help="Target wallet address to copy trades from", - ) - parser.add_argument( - "-s", - "--scale", - type=float, - default=float(os.getenv("SCALE_FACTOR", "1.0")), - help="Scale factor for trade sizes (default: 1.0)", - ) - parser.add_argument( - "-i", - "--interval", - type=float, - default=float(os.getenv("POLL_INTERVAL", "5")), - help="Poll interval in seconds (default: 5)", - ) - parser.add_argument( - "--max-position", - type=float, - default=float(os.getenv("MAX_POSITION", "100")), - help="Maximum position size (default: 100)", - ) - parser.add_argument( - "--min-size", - type=float, - default=float(os.getenv("MIN_TRADE_SIZE", "1")), - help="Minimum trade size to copy (default: 1)", - ) - parser.add_argument( - "-m", - "--markets", - nargs="*", - default=None, - help="Filter to specific market slugs", - ) - parser.add_argument( - "-d", - "--duration", - type=int, - default=None, - help="Duration in minutes (default: indefinite)", - ) - parser.add_argument( - "--telegram", - action="store_true", - help="Enable Telegram notifications", - ) - parser.add_argument( - "--telegram-token", - default=os.getenv("TELEGRAM_BOT_TOKEN"), - help="Telegram bot token (or set TELEGRAM_BOT_TOKEN)", - ) - parser.add_argument( - "--telegram-chat-id", - default=os.getenv("TELEGRAM_CHAT_ID"), - help="Telegram chat ID (or set TELEGRAM_CHAT_ID)", - ) - return parser.parse_args() - - -def main() -> int: - """Entry point""" - load_dotenv() - args = parse_args() - - private_key = os.getenv("POLYMARKET_PRIVATE_KEY") or os.getenv("PRIVATE_KEY") - if not private_key: - logger.error("POLYMARKET_PRIVATE_KEY or PRIVATE_KEY required in environment") - return 1 - - funder = os.getenv("POLYMARKET_FUNDER") or os.getenv("FUNDER") - - try: - exchange = Polymarket({ - "private_key": private_key, - "funder": funder, - "verbose": False, - }) - except Exception as e: - logger.error(f"Failed to initialize exchange: {e}") - return 1 - - telegram = None - if args.telegram: - if not args.telegram_token or not args.telegram_chat_id: - logger.error("Telegram enabled but TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID not set") - return 1 - telegram = TelegramBot( - token=args.telegram_token, - chat_id=args.telegram_chat_id, - ) - if not telegram.enabled: - logger.warning("Telegram bot not properly configured") - telegram = None - - bot = CopytradingBot( - exchange=exchange, - target_wallet=args.target, - scale_factor=args.scale, - poll_interval=args.interval, - max_position=args.max_position, - min_trade_size=args.min_size, - market_filter=args.markets, - telegram=telegram, - ) - - bot.run(duration_minutes=args.duration) - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/examples/copytrading/__init__.py b/examples/copytrading/__init__.py new file mode 100644 index 0000000..998cd34 --- /dev/null +++ b/examples/copytrading/__init__.py @@ -0,0 +1,44 @@ +""" +Polymarket Copytrading Bot + +Monitors a target wallet's trades and mirrors them on your account. + +Usage: + uv run python -m examples.copytrading --target + uv run python -m examples.copytrading --target --scale 0.5 + uv run python -m examples.copytrading --target --telegram + +Programmatic Usage: + from examples.copytrading import CopytradingBot, BotConfig + from dr_manhattan import Polymarket + + exchange = Polymarket({"private_key": "..."}) + config = BotConfig(target_wallet="0x...") + + bot = CopytradingBot(exchange, config) + bot.run() +""" + +from .bot import CopytradingBot +from .notifications import ( + NotificationHandler, + NullNotifier, + TelegramNotifier, + create_notifier, +) +from .types import BotConfig, CopyStats, TradeAction, TradeInfo + +__all__ = [ + # Bot + "CopytradingBot", + # Types + "BotConfig", + "CopyStats", + "TradeAction", + "TradeInfo", + # Notifications + "NotificationHandler", + "TelegramNotifier", + "NullNotifier", + "create_notifier", +] diff --git a/examples/copytrading/__main__.py b/examples/copytrading/__main__.py new file mode 100644 index 0000000..40bab88 --- /dev/null +++ b/examples/copytrading/__main__.py @@ -0,0 +1,13 @@ +""" +Entry point for running the copytrading bot as a module. + +Usage: + uv run python -m examples.copytrading --target +""" + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/copytrading/bot.py b/examples/copytrading/bot.py new file mode 100644 index 0000000..1a54b7a --- /dev/null +++ b/examples/copytrading/bot.py @@ -0,0 +1,360 @@ +""" +Copytrading bot implementation. + +Monitors a target wallet's trades and mirrors them on your account. +""" + +import logging +import time +from datetime import datetime, timezone +from typing import Dict, List, Optional, Set + +from dr_manhattan import Polymarket +from dr_manhattan.exchanges.polymarket import PublicTrade +from dr_manhattan.models import Market +from dr_manhattan.models.order import OrderSide +from dr_manhattan.utils.logger import Colors + +from .notifications import NotificationHandler, NullNotifier +from .types import BotConfig, CopyStats, TradeInfo + +logger = logging.getLogger(__name__) + + +class CopytradingBot: + """ + Copytrading bot that monitors a target wallet and mirrors trades. + + Features: + - Polls target wallet trades via Polymarket Data API + - Mirrors trades with configurable size scaling + - Tracks copied trades to avoid duplicates + - Supports market filtering + - Pluggable notification system + """ + + def __init__( + self, + exchange: Polymarket, + config: BotConfig, + notifier: Optional[NotificationHandler] = None, + ) -> None: + """ + Initialize copytrading bot. + + Args: + exchange: Authenticated Polymarket exchange + config: Bot configuration + notifier: Optional notification handler + """ + self._exchange = exchange + self._config = config + self._notifier = notifier or NullNotifier() + + self._is_running = False + self._copied_trades: Set[str] = set() + self._stats = CopyStats() + self._market_cache: Dict[str, Market] = {} + self._last_poll_time: Optional[datetime] = None + + @property + def config(self) -> BotConfig: + """Get bot configuration""" + return self._config + + @property + def stats(self) -> CopyStats: + """Get current statistics""" + return self._stats + + @property + def is_running(self) -> bool: + """Check if bot is running""" + return self._is_running + + def _get_trade_id(self, trade: PublicTrade) -> str: + """Generate unique ID for a trade""" + return f"{trade.transaction_hash}_{trade.outcome_index}" + + def _create_trade_info(self, trade: PublicTrade) -> TradeInfo: + """Create TradeInfo from PublicTrade""" + return TradeInfo( + trade_id=self._get_trade_id(trade), + side=trade.side, + size=trade.size, + outcome=trade.outcome or f"idx:{trade.outcome_index}", + price=trade.price, + market_slug=trade.slug or trade.event_slug or "", + condition_id=trade.condition_id or "", + timestamp=trade.timestamp, + ) + + def _should_copy_trade(self, trade: PublicTrade) -> bool: + """Check if trade should be copied""" + trade_id = self._get_trade_id(trade) + + if trade_id in self._copied_trades: + return False + + if trade.size < self._config.min_trade_size: + logger.debug(f"Skipping small trade: {trade.size}") + return False + + if self._config.market_filter: + slug = trade.event_slug or trade.slug or "" + if not any(f.lower() in slug.lower() for f in self._config.market_filter): + return False + + return True + + def _get_market(self, trade: PublicTrade) -> Optional[Market]: + """Get market data for a trade""" + condition_id = trade.condition_id + if not condition_id: + return None + + if condition_id in self._market_cache: + return self._market_cache[condition_id] + + try: + slug = trade.event_slug or trade.slug + if slug: + markets = self._exchange.fetch_markets_by_slug(slug) + for market in markets: + self._market_cache[market.id] = market + if market.id == condition_id: + return market + + market = self._exchange.fetch_market(condition_id) + self._market_cache[condition_id] = market + return market + except Exception as e: + logger.warning(f"Failed to fetch market {condition_id}: {e}") + return None + + def _get_token_id(self, market: Market, outcome: str) -> Optional[str]: + """Get token ID for an outcome""" + token_ids = market.metadata.get("clobTokenIds", []) + outcomes = market.outcomes + + if not token_ids or len(token_ids) != len(outcomes): + try: + token_ids = self._exchange.fetch_token_ids(market.id) + market.metadata["clobTokenIds"] = token_ids + except Exception as e: + logger.warning(f"Failed to fetch token IDs: {e}") + return None + + for i, out in enumerate(outcomes): + if out.lower() == outcome.lower(): + return token_ids[i] if i < len(token_ids) else None + + return None + + def _execute_copy_trade(self, trade: PublicTrade, trade_info: TradeInfo) -> bool: + """Execute a copy of the target's trade""" + market = self._get_market(trade) + if not market: + logger.error(f"Cannot find market for trade: {trade.condition_id}") + self._notifier.notify_error( + f"Cannot find market: {trade.condition_id}", + "execute_copy_trade", + ) + return False + + outcome = trade.outcome + if not outcome: + outcome = market.outcomes[trade.outcome_index] if trade.outcome_index is not None else None + + if not outcome: + logger.error("Cannot determine outcome for trade") + return False + + token_id = self._get_token_id(market, outcome) + if not token_id: + logger.error(f"Cannot find token ID for outcome: {outcome}") + return False + + side = OrderSide.BUY if trade.side.upper() == "BUY" else OrderSide.SELL + size = trade.size * self._config.scale_factor + price = trade.price + + if size > self._config.max_position: + size = self._config.max_position + logger.warning(f"Capped trade size to max_position: {self._config.max_position}") + + try: + order = self._exchange.create_order( + market_id=market.id, + outcome=outcome, + side=side, + price=price, + size=size, + params={"token_id": token_id}, + ) + + side_colored = Colors.green("BUY") if side == OrderSide.BUY else Colors.red("SELL") + logger.info( + f" {Colors.cyan('COPIED')} {side_colored} {size:.2f} " + f"{Colors.magenta(outcome[:20])} @ {Colors.yellow(f'{price:.4f}')} " + f"[{Colors.gray(order.id[:8] + '...')}]" + ) + + self._notifier.notify_trade_copied(trade_info, size) + return True + + except Exception as e: + logger.error(f"Failed to execute copy trade: {e}") + self._notifier.notify_error(str(e), "execute_copy_trade") + return False + + def _poll_trades(self) -> List[PublicTrade]: + """Poll for new trades from target wallet""" + try: + trades = self._exchange.fetch_public_trades( + user=self._config.target_wallet, + limit=50, + taker_only=True, + ) + + if self._last_poll_time: + trades = [t for t in trades if t.timestamp > self._last_poll_time] + + self._last_poll_time = datetime.now(timezone.utc) + return trades + + except Exception as e: + logger.warning(f"Failed to fetch trades: {e}") + return [] + + def _process_trades(self, trades: List[PublicTrade]) -> None: + """Process new trades from target wallet""" + for trade in trades: + self._stats.trades_detected += 1 + trade_info = self._create_trade_info(trade) + + if not self._should_copy_trade(trade): + self._stats.trades_skipped += 1 + continue + + logger.info( + f"\n{Colors.bold('New Trade Detected:')} " + f"{Colors.cyan(trade_info.side_upper)} {trade.size:.2f} " + f"{Colors.magenta(trade_info.outcome[:20])} " + f"@ {Colors.yellow(f'{trade.price:.4f}')} " + f"[{Colors.gray(trade_info.market_slug or '')}]" + ) + + self._notifier.notify_trade_detected(trade_info) + + if self._execute_copy_trade(trade, trade_info): + self._copied_trades.add(trade_info.trade_id) + self._stats.trades_copied += 1 + self._stats.total_volume += trade.size * self._config.scale_factor + else: + self._stats.trades_failed += 1 + + def _get_uptime_str(self) -> str: + """Get formatted uptime string""" + elapsed = (datetime.now(timezone.utc) - self._stats.start_time).total_seconds() + return f"{int(elapsed // 60)}m {int(elapsed % 60)}s" + + def _log_status(self) -> None: + """Log current status""" + uptime_str = self._get_uptime_str() + + logger.info( + f"\n[{time.strftime('%H:%M:%S')}] " + f"{Colors.bold('Status:')} " + f"Detected: {Colors.cyan(str(self._stats.trades_detected))} | " + f"Copied: {Colors.green(str(self._stats.trades_copied))} | " + f"Skipped: {Colors.gray(str(self._stats.trades_skipped))} | " + f"Failed: {Colors.red(str(self._stats.trades_failed))} | " + f"Volume: {Colors.yellow(f'${self._stats.total_volume:.2f}')} | " + f"Uptime: {Colors.gray(uptime_str)}" + ) + + def _log_startup(self, balance: float) -> None: + """Log startup information""" + logger.info(f"\n{Colors.bold('Copytrading Bot Started')}") + logger.info(f"Target: {Colors.cyan(self._config.target_wallet)}") + logger.info(f"Scale: {Colors.yellow(f'{self._config.scale_factor}x')}") + logger.info(f"Interval: {Colors.gray(f'{self._config.poll_interval}s')}") + logger.info(f"Max Position: {Colors.blue(f'{self._config.max_position}')}") + + if self._config.market_filter: + logger.info(f"Markets: {Colors.magenta(', '.join(self._config.market_filter))}") + + if hasattr(self._notifier, "enabled") and self._notifier.enabled: + logger.info(f"Telegram: {Colors.green('Enabled')}") + + address = getattr(self._exchange, "_address", None) + if address: + logger.info(f"Bot Address: {Colors.cyan(address)}") + + logger.info(f"Balance: {Colors.green(f'${balance:,.2f}')} USDC") + + def _log_summary(self) -> None: + """Log final summary""" + duration_str = self._get_uptime_str() + + logger.info(f"\n{Colors.bold('Session Summary')}") + logger.info(f"Duration: {duration_str}") + logger.info(f"Trades Detected: {self._stats.trades_detected}") + logger.info(f"Trades Copied: {Colors.green(str(self._stats.trades_copied))}") + logger.info(f"Trades Skipped: {self._stats.trades_skipped}") + logger.info(f"Trades Failed: {Colors.red(str(self._stats.trades_failed))}") + logger.info(f"Total Volume: {Colors.yellow(f'${self._stats.total_volume:.2f}')}") + + self._notifier.notify_shutdown(self._stats, duration_str) + + def run(self, duration_minutes: Optional[int] = None) -> None: + """ + Run the copytrading bot. + + Args: + duration_minutes: Optional duration limit in minutes + """ + usdc = 0.0 + try: + balance = self._exchange.fetch_balance() + usdc = balance.get("USDC", 0.0) + except Exception as e: + logger.warning(f"Failed to fetch balance: {e}") + + self._log_startup(usdc) + self._notifier.notify_startup( + self._config.target_wallet, + self._config.scale_factor, + usdc, + ) + + logger.info(f"\n{Colors.gray('Waiting for trades...')}") + + self._is_running = True + self._stats = CopyStats() + start_time = time.time() + end_time = start_time + (duration_minutes * 60) if duration_minutes else None + + try: + while self._is_running: + if end_time and time.time() >= end_time: + break + + trades = self._poll_trades() + if trades: + self._process_trades(trades) + + self._log_status() + time.sleep(self._config.poll_interval) + + except KeyboardInterrupt: + logger.info("\nStopping...") + + finally: + self._is_running = False + self._log_summary() + + def stop(self) -> None: + """Stop the bot""" + self._is_running = False diff --git a/examples/copytrading/cli.py b/examples/copytrading/cli.py new file mode 100644 index 0000000..577ab34 --- /dev/null +++ b/examples/copytrading/cli.py @@ -0,0 +1,167 @@ +""" +Command-line interface for the copytrading bot. +""" + +import argparse +import os +import sys +from typing import Optional + +from dotenv import load_dotenv + +from dr_manhattan import Polymarket +from dr_manhattan.utils import setup_logger + +from .bot import CopytradingBot +from .notifications import create_notifier +from .types import BotConfig + +logger = setup_logger(__name__) + + +def parse_args() -> argparse.Namespace: + """Parse command line arguments""" + parser = argparse.ArgumentParser( + description="Polymarket Copytrading Bot", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s --target 0x123...abc + %(prog)s --target 0x123...abc --scale 0.5 + %(prog)s --target 0x123...abc --telegram --markets fed-decision + +Environment Variables: + POLYMARKET_PRIVATE_KEY Your wallet private key + POLYMARKET_FUNDER Proxy wallet funder address (optional) + TELEGRAM_BOT_TOKEN Telegram bot token (optional) + TELEGRAM_CHAT_ID Telegram chat ID (optional) + """, + ) + + parser.add_argument( + "-t", + "--target", + required=True, + help="Target wallet address to copy trades from", + ) + parser.add_argument( + "-s", + "--scale", + type=float, + default=float(os.getenv("SCALE_FACTOR", "1.0")), + help="Scale factor for trade sizes (default: 1.0)", + ) + parser.add_argument( + "-i", + "--interval", + type=float, + default=float(os.getenv("POLL_INTERVAL", "5")), + help="Poll interval in seconds (default: 5)", + ) + parser.add_argument( + "--max-position", + type=float, + default=float(os.getenv("MAX_POSITION", "100")), + help="Maximum position size (default: 100)", + ) + parser.add_argument( + "--min-size", + type=float, + default=float(os.getenv("MIN_TRADE_SIZE", "1")), + help="Minimum trade size to copy (default: 1)", + ) + parser.add_argument( + "-m", + "--markets", + nargs="*", + default=None, + help="Filter to specific market slugs", + ) + parser.add_argument( + "-d", + "--duration", + type=int, + default=None, + help="Duration in minutes (default: indefinite)", + ) + parser.add_argument( + "--telegram", + action="store_true", + help="Enable Telegram notifications", + ) + parser.add_argument( + "--telegram-token", + default=os.getenv("TELEGRAM_BOT_TOKEN"), + help="Telegram bot token (or set TELEGRAM_BOT_TOKEN)", + ) + parser.add_argument( + "--telegram-chat-id", + default=os.getenv("TELEGRAM_CHAT_ID"), + help="Telegram chat ID (or set TELEGRAM_CHAT_ID)", + ) + + return parser.parse_args() + + +def create_exchange() -> Optional[Polymarket]: + """Create and authenticate Polymarket exchange""" + private_key = os.getenv("POLYMARKET_PRIVATE_KEY") or os.getenv("PRIVATE_KEY") + if not private_key: + logger.error("POLYMARKET_PRIVATE_KEY or PRIVATE_KEY required in environment") + return None + + funder = os.getenv("POLYMARKET_FUNDER") or os.getenv("FUNDER") + + try: + return Polymarket({ + "private_key": private_key, + "funder": funder, + "verbose": False, + }) + except Exception as e: + logger.error(f"Failed to initialize exchange: {e}") + return None + + +def main() -> int: + """Entry point""" + load_dotenv() + args = parse_args() + + exchange = create_exchange() + if not exchange: + return 1 + + telegram_token = None + telegram_chat_id = None + + if args.telegram: + if not args.telegram_token or not args.telegram_chat_id: + logger.error("Telegram enabled but TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID not set") + return 1 + telegram_token = args.telegram_token + telegram_chat_id = args.telegram_chat_id + + notifier = create_notifier(telegram_token, telegram_chat_id) + + config = BotConfig( + target_wallet=args.target, + scale_factor=args.scale, + poll_interval=args.interval, + max_position=args.max_position, + min_trade_size=args.min_size, + market_filter=args.markets, + ) + + bot = CopytradingBot( + exchange=exchange, + config=config, + notifier=notifier, + ) + + bot.run(duration_minutes=args.duration) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/copytrading/notifications.py b/examples/copytrading/notifications.py new file mode 100644 index 0000000..3cd4011 --- /dev/null +++ b/examples/copytrading/notifications.py @@ -0,0 +1,181 @@ +""" +Notification handlers for the copytrading bot. + +Provides a clean interface for sending notifications via Telegram. +""" + +from typing import Optional, Protocol + +from dr_manhattan.utils.telegram import MessageBuilder, TelegramBot, code + +from .types import CopyStats, TradeInfo + + +class NotificationHandler(Protocol): + """Protocol for notification handlers""" + + def notify_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> None: + """Send startup notification""" + ... + + def notify_shutdown(self, stats: CopyStats, duration: str) -> None: + """Send shutdown notification""" + ... + + def notify_trade_detected(self, trade: TradeInfo) -> None: + """Send trade detected notification""" + ... + + def notify_trade_copied(self, trade: TradeInfo, scaled_size: float) -> None: + """Send trade copied notification""" + ... + + def notify_error(self, error: str, context: str) -> None: + """Send error notification""" + ... + + +class TelegramNotifier: + """Telegram notification handler for copytrading bot""" + + def __init__(self, bot: TelegramBot) -> None: + self._bot = bot + + @property + def enabled(self) -> bool: + """Check if notifications are enabled""" + return self._bot.enabled + + def notify_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> None: + """Send startup notification""" + wallet_short = f"{target_wallet[:8]}...{target_wallet[-6:]}" + msg = ( + MessageBuilder() + .title("Copytrading Bot Started") + .newline() + .field("Target", wallet_short) + .newline() + .field("Scale", f"{scale_factor}x") + .newline() + .field("Balance", f"${balance:,.2f}") + .build() + ) + self._bot.send(msg) + + def notify_shutdown(self, stats: CopyStats, duration: str) -> None: + """Send shutdown notification""" + msg = ( + MessageBuilder() + .title("Copytrading Bot Stopped") + .newline() + .field("Trades Copied", str(stats.trades_copied)) + .newline() + .field("Trades Failed", str(stats.trades_failed)) + .newline() + .field("Total Volume", f"${stats.total_volume:.2f}") + .newline() + .field("Duration", duration) + .build() + ) + self._bot.send(msg) + + def notify_trade_detected(self, trade: TradeInfo) -> None: + """Send trade detected notification""" + self._send_trade_notification(trade, is_copy=False) + + def notify_trade_copied(self, trade: TradeInfo, scaled_size: float) -> None: + """Send trade copied notification""" + self._send_trade_notification(trade, is_copy=True, size_override=scaled_size) + + def _send_trade_notification( + self, + trade: TradeInfo, + is_copy: bool, + size_override: Optional[float] = None, + ) -> None: + """Send a trade notification""" + emoji = "+" if trade.is_buy else "-" + action = "Copied" if is_copy else "Detected" + size = size_override if size_override is not None else trade.size + + msg = ( + MessageBuilder() + .title(f"{emoji} Trade {action}") + .newline() + .field("Side", trade.side_upper) + .newline() + .field("Size", f"{size:.2f}") + .newline() + .field("Outcome", trade.outcome) + .newline() + .field("Price", f"{trade.price:.4f}") + ) + + if trade.market_slug: + msg.newline().field("Market", trade.market_slug) + + self._bot.send(msg.build()) + + def notify_error(self, error: str, context: str = "") -> None: + """Send error notification""" + msg = MessageBuilder().title("Error").newline().raw(code(error)) + + if context: + msg.newline().field("Context", context) + + self._bot.send(msg.build()) + + +class NullNotifier: + """Null notification handler that does nothing""" + + @property + def enabled(self) -> bool: + return False + + def notify_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> None: + pass + + def notify_shutdown(self, stats: CopyStats, duration: str) -> None: + pass + + def notify_trade_detected(self, trade: TradeInfo) -> None: + pass + + def notify_trade_copied(self, trade: TradeInfo, scaled_size: float) -> None: + pass + + def notify_error(self, error: str, context: str = "") -> None: + pass + + +def create_notifier( + telegram_token: Optional[str] = None, + telegram_chat_id: Optional[str] = None, +) -> NotificationHandler: + """ + Create a notification handler. + + Returns TelegramNotifier if credentials provided, else NullNotifier. + """ + if telegram_token and telegram_chat_id: + bot = TelegramBot(token=telegram_token, chat_id=telegram_chat_id) + if bot.enabled: + return TelegramNotifier(bot) + + return NullNotifier() diff --git a/examples/copytrading/types.py b/examples/copytrading/types.py new file mode 100644 index 0000000..f8a6d42 --- /dev/null +++ b/examples/copytrading/types.py @@ -0,0 +1,85 @@ +""" +Type definitions for the copytrading bot. +""" + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import List, Optional + + +class TradeAction(str, Enum): + """Trade action type""" + + DETECTED = "detected" + COPIED = "copied" + SKIPPED = "skipped" + FAILED = "failed" + + +@dataclass +class CopyStats: + """Statistics for a copytrading session""" + + trades_detected: int = 0 + trades_copied: int = 0 + trades_skipped: int = 0 + trades_failed: int = 0 + total_volume: float = 0.0 + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict: + """Convert to dictionary""" + return { + "detected": self.trades_detected, + "copied": self.trades_copied, + "skipped": self.trades_skipped, + "failed": self.trades_failed, + "volume": self.total_volume, + } + + +@dataclass +class BotConfig: + """Configuration for the copytrading bot""" + + target_wallet: str + scale_factor: float = 1.0 + poll_interval: float = 5.0 + max_position: float = 100.0 + min_trade_size: float = 1.0 + market_filter: Optional[List[str]] = None + + def __post_init__(self) -> None: + if not self.target_wallet: + raise ValueError("target_wallet is required") + if self.scale_factor <= 0: + raise ValueError("scale_factor must be positive") + if self.poll_interval <= 0: + raise ValueError("poll_interval must be positive") + if self.max_position <= 0: + raise ValueError("max_position must be positive") + + +@dataclass +class TradeInfo: + """Information about a trade""" + + trade_id: str + side: str + size: float + outcome: str + price: float + market_slug: str + condition_id: str + timestamp: datetime + + @property + def side_upper(self) -> str: + """Get uppercase side""" + return self.side.upper() + + @property + def is_buy(self) -> bool: + """Check if this is a buy trade""" + return self.side_upper == "BUY" From 4a6f1d75b01628d0bdaf4dd193ad9bf62e6c4af1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 18 Jan 2026 12:54:18 +0000 Subject: [PATCH 5/5] fix: resolve lint errors in telegram and copytrading modules --- dr_manhattan/utils/telegram/bot.py | 19 +++++++++++-------- dr_manhattan/utils/telegram/formatters.py | 12 +++++------- dr_manhattan/utils/telegram/types.py | 10 +++++++++- examples/copytrading/bot.py | 4 +++- examples/copytrading/cli.py | 12 +++++++----- 5 files changed, 35 insertions(+), 22 deletions(-) diff --git a/dr_manhattan/utils/telegram/bot.py b/dr_manhattan/utils/telegram/bot.py index d2c5a98..ab6285b 100644 --- a/dr_manhattan/utils/telegram/bot.py +++ b/dr_manhattan/utils/telegram/bot.py @@ -6,13 +6,12 @@ import json import logging -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional import requests from .types import ( InlineKeyboardMarkup, - Message, MessageOptions, ParseMode, ReplyMarkup, @@ -161,12 +160,16 @@ def send( "chat_id": chat_id or self._config.chat_id, "text": text, "parse_mode": (opts.parse_mode or self._config.parse_mode).value, - "disable_notification": opts.disable_notification - if opts.disable_notification is not None - else self._config.disable_notification, - "disable_web_page_preview": opts.disable_web_page_preview - if opts.disable_web_page_preview is not None - else self._config.disable_web_page_preview, + "disable_notification": ( + opts.disable_notification + if opts.disable_notification is not None + else self._config.disable_notification + ), + "disable_web_page_preview": ( + opts.disable_web_page_preview + if opts.disable_web_page_preview is not None + else self._config.disable_web_page_preview + ), } if opts.reply_to_message_id: diff --git a/dr_manhattan/utils/telegram/formatters.py b/dr_manhattan/utils/telegram/formatters.py index 60beb23..696dbbd 100644 --- a/dr_manhattan/utils/telegram/formatters.py +++ b/dr_manhattan/utils/telegram/formatters.py @@ -32,7 +32,9 @@ def code(text: str) -> str: def pre(text: str, language: Optional[str] = None) -> str: """Format text as code block (HTML)""" if language: - return f'
{escape_html(text)}
' + return ( + f'
{escape_html(text)}
' + ) return f"
{escape_html(text)}
" @@ -235,13 +237,9 @@ def subtitle(self, text: str) -> "MessageBuilder": self._parts.append(italic(text)) return self - def field(self, key: str, value: Any, inline: bool = False) -> "MessageBuilder": + def field(self, key: str, value: Any) -> "MessageBuilder": """Add a key-value field""" - separator = ": " if inline else "\n" - if not inline: - self._parts.append(f"{bold(key)}: {code(str(value))}") - else: - self._parts.append(f"{bold(key)}: {code(str(value))}") + self._parts.append(f"{bold(key)}: {code(str(value))}") return self def fields(self, data: Dict[str, Any]) -> "MessageBuilder": diff --git a/dr_manhattan/utils/telegram/types.py b/dr_manhattan/utils/telegram/types.py index e1b2db6..519d69f 100644 --- a/dr_manhattan/utils/telegram/types.py +++ b/dr_manhattan/utils/telegram/types.py @@ -157,7 +157,15 @@ def to_dict(self) -> Dict[str, Any]: return { "inline_keyboard": [ [ - {k: v for k, v in {"text": btn.text, "url": btn.url, "callback_data": btn.callback_data}.items() if v is not None} + { + k: v + for k, v in { + "text": btn.text, + "url": btn.url, + "callback_data": btn.callback_data, + }.items() + if v is not None + } for btn in row ] for row in self.inline_keyboard diff --git a/examples/copytrading/bot.py b/examples/copytrading/bot.py index 1a54b7a..c68f167 100644 --- a/examples/copytrading/bot.py +++ b/examples/copytrading/bot.py @@ -164,7 +164,9 @@ def _execute_copy_trade(self, trade: PublicTrade, trade_info: TradeInfo) -> bool outcome = trade.outcome if not outcome: - outcome = market.outcomes[trade.outcome_index] if trade.outcome_index is not None else None + outcome = ( + market.outcomes[trade.outcome_index] if trade.outcome_index is not None else None + ) if not outcome: logger.error("Cannot determine outcome for trade") diff --git a/examples/copytrading/cli.py b/examples/copytrading/cli.py index 577ab34..ad5a81b 100644 --- a/examples/copytrading/cli.py +++ b/examples/copytrading/cli.py @@ -113,11 +113,13 @@ def create_exchange() -> Optional[Polymarket]: funder = os.getenv("POLYMARKET_FUNDER") or os.getenv("FUNDER") try: - return Polymarket({ - "private_key": private_key, - "funder": funder, - "verbose": False, - }) + return Polymarket( + { + "private_key": private_key, + "funder": funder, + "verbose": False, + } + ) except Exception as e: logger.error(f"Failed to initialize exchange: {e}") return None