diff --git a/dr_manhattan/utils/__init__.py b/dr_manhattan/utils/__init__.py index 2c1df6b..b1ac463 100644 --- a/dr_manhattan/utils/__init__.py +++ b/dr_manhattan/utils/__init__.py @@ -1,6 +1,7 @@ """Utility functions and helpers for Dr. Manhattan.""" from .logger import ColoredFormatter, default_logger, setup_logger +from .telegram import TelegramBot from .tui import prompt_confirm, prompt_market_selection, prompt_selection __all__ = [ @@ -10,4 +11,5 @@ "prompt_selection", "prompt_market_selection", "prompt_confirm", + "TelegramBot", ] diff --git a/dr_manhattan/utils/telegram/__init__.py b/dr_manhattan/utils/telegram/__init__.py new file mode 100644 index 0000000..3ec0f26 --- /dev/null +++ b/dr_manhattan/utils/telegram/__init__.py @@ -0,0 +1,97 @@ +""" +Telegram Bot Integration Module + +A scalable, type-safe Telegram bot client for sending messages and notifications. + +Basic Usage: + from dr_manhattan.utils.telegram import TelegramBot + + bot = TelegramBot(token="...", chat_id="...") + bot.send("Hello, World!") + +With Message Builder: + from dr_manhattan.utils.telegram import TelegramBot, MessageBuilder + + msg = (MessageBuilder() + .title("Status Update") + .field("CPU", "45%") + .field("Memory", "2.1GB") + .build()) + + bot.send(msg) + +With Formatters: + from dr_manhattan.utils.telegram import TelegramBot + from dr_manhattan.utils.telegram.formatters import bold, code, key_value + + bot.send(f"{bold('Alert')}: Server is {code('online')}") +""" + +from .bot import TelegramBot +from .formatters import ( + MessageBuilder, + blockquote, + bold, + bullet_list, + code, + escape_html, + italic, + key_value, + link, + mention, + numbered_list, + pre, + progress_bar, + spoiler, + strikethrough, + table, + underline, +) +from .types import ( + Chat, + ChatType, + InlineKeyboardButton, + InlineKeyboardMarkup, + Message, + MessageOptions, + ParseMode, + ReplyMarkup, + SendResult, + TelegramConfig, + User, +) + +__all__ = [ + # Core + "TelegramBot", + # Types + "TelegramConfig", + "MessageOptions", + "SendResult", + "ParseMode", + "ChatType", + "User", + "Chat", + "Message", + "InlineKeyboardButton", + "InlineKeyboardMarkup", + "ReplyMarkup", + # Formatters + "MessageBuilder", + "escape_html", + "bold", + "italic", + "code", + "pre", + "link", + "mention", + "strikethrough", + "underline", + "spoiler", + "blockquote", + "table", + "key_value", + "bullet_list", + "numbered_list", + "progress_bar", +] diff --git a/dr_manhattan/utils/telegram/bot.py b/dr_manhattan/utils/telegram/bot.py new file mode 100644 index 0000000..ab6285b --- /dev/null +++ b/dr_manhattan/utils/telegram/bot.py @@ -0,0 +1,351 @@ +""" +Core Telegram Bot implementation. + +A generic, type-safe Telegram bot client for sending messages and notifications. +""" + +import json +import logging +from typing import Any, Dict, List, Optional + +import requests + +from .types import ( + InlineKeyboardMarkup, + MessageOptions, + ParseMode, + ReplyMarkup, + SendResult, + TelegramConfig, +) + +logger = logging.getLogger(__name__) + + +class TelegramBot: + """ + Generic Telegram bot client for sending messages. + + This is a low-level, type-safe client that can be used for any purpose. + For domain-specific formatting, use the formatters module. + + Example: + bot = TelegramBot(token="...", chat_id="...") + result = bot.send("Hello, World!") + if result.success: + print(f"Message sent: {result.message_id}") + """ + + BASE_URL = "https://api.telegram.org/bot{token}/{method}" + + def __init__( + self, + token: str, + chat_id: str, + parse_mode: ParseMode = ParseMode.HTML, + disable_notification: bool = False, + disable_web_page_preview: bool = True, + timeout: int = 10, + ) -> None: + """ + Initialize Telegram bot. + + Args: + token: Bot token from @BotFather + chat_id: Default chat ID to send messages to + parse_mode: Default parse mode for messages + disable_notification: Send messages silently by default + disable_web_page_preview: Disable link previews by default + timeout: Request timeout in seconds + """ + self._config = TelegramConfig( + bot_token=token, + chat_id=chat_id, + parse_mode=parse_mode, + disable_notification=disable_notification, + disable_web_page_preview=disable_web_page_preview, + timeout=timeout, + ) + + @classmethod + def from_config(cls, config: TelegramConfig) -> "TelegramBot": + """Create bot from config object""" + return cls( + token=config.bot_token, + chat_id=config.chat_id, + parse_mode=config.parse_mode, + disable_notification=config.disable_notification, + disable_web_page_preview=config.disable_web_page_preview, + timeout=config.timeout, + ) + + @property + def enabled(self) -> bool: + """Check if bot is configured and enabled""" + return bool(self._config.bot_token and self._config.chat_id) + + @property + def config(self) -> TelegramConfig: + """Get current configuration""" + return self._config + + def _build_url(self, method: str) -> str: + """Build API URL for method""" + return self.BASE_URL.format(token=self._config.bot_token, method=method) + + def _request( + self, + method: str, + data: Dict[str, Any], + ) -> SendResult: + """Make API request to Telegram""" + if not self.enabled: + return SendResult(success=False, error="Bot not configured") + + url = self._build_url(method) + + try: + response = requests.post(url, json=data, timeout=self._config.timeout) + result = response.json() + + if not result.get("ok"): + error_msg = result.get("description", "Unknown error") + logger.warning(f"Telegram API error: {error_msg}") + return SendResult(success=False, error=error_msg, raw=result) + + return SendResult( + success=True, + message_id=result.get("result", {}).get("message_id"), + raw=result.get("result"), + ) + + except requests.Timeout: + logger.warning("Telegram request timed out") + return SendResult(success=False, error="Request timed out") + except requests.RequestException as e: + logger.warning(f"Telegram request failed: {e}") + return SendResult(success=False, error=str(e)) + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse Telegram response: {e}") + return SendResult(success=False, error=f"Invalid response: {e}") + except Exception as e: + logger.warning(f"Telegram error: {e}") + return SendResult(success=False, error=str(e)) + + def send( + self, + text: str, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + reply_markup: Optional[ReplyMarkup] = None, + ) -> SendResult: + """ + Send a text message. + + Args: + text: Message text (supports HTML/Markdown based on parse_mode) + chat_id: Override default chat ID + options: Message options (parse_mode, notifications, etc.) + reply_markup: Optional inline keyboard + + Returns: + SendResult with success status and message ID + """ + if not text: + return SendResult(success=False, error="Empty message") + + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "text": text, + "parse_mode": (opts.parse_mode or self._config.parse_mode).value, + "disable_notification": ( + opts.disable_notification + if opts.disable_notification is not None + else self._config.disable_notification + ), + "disable_web_page_preview": ( + opts.disable_web_page_preview + if opts.disable_web_page_preview is not None + else self._config.disable_web_page_preview + ), + } + + if opts.reply_to_message_id: + data["reply_to_message_id"] = opts.reply_to_message_id + + if opts.protect_content: + data["protect_content"] = True + + if reply_markup: + if isinstance(reply_markup, InlineKeyboardMarkup): + data["reply_markup"] = reply_markup.to_dict() + + return self._request("sendMessage", data) + + def send_photo( + self, + photo: str, + caption: Optional[str] = None, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + ) -> SendResult: + """ + Send a photo. + + Args: + photo: Photo URL or file_id + caption: Optional caption + chat_id: Override default chat ID + options: Message options + """ + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "photo": photo, + } + + if caption: + data["caption"] = caption + data["parse_mode"] = (opts.parse_mode or self._config.parse_mode).value + + return self._request("sendPhoto", data) + + def send_document( + self, + document: str, + caption: Optional[str] = None, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + ) -> SendResult: + """ + Send a document. + + Args: + document: Document URL or file_id + caption: Optional caption + chat_id: Override default chat ID + options: Message options + """ + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "document": document, + } + + if caption: + data["caption"] = caption + data["parse_mode"] = (opts.parse_mode or self._config.parse_mode).value + + return self._request("sendDocument", data) + + def edit_message( + self, + message_id: int, + text: str, + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + reply_markup: Optional[ReplyMarkup] = None, + ) -> SendResult: + """ + Edit an existing message. + + Args: + message_id: ID of message to edit + text: New message text + chat_id: Override default chat ID + options: Message options + reply_markup: Optional inline keyboard + """ + opts = options or MessageOptions() + + data: Dict[str, Any] = { + "chat_id": chat_id or self._config.chat_id, + "message_id": message_id, + "text": text, + "parse_mode": (opts.parse_mode or self._config.parse_mode).value, + } + + if reply_markup: + if isinstance(reply_markup, InlineKeyboardMarkup): + data["reply_markup"] = reply_markup.to_dict() + + return self._request("editMessageText", data) + + def delete_message( + self, + message_id: int, + chat_id: Optional[str] = None, + ) -> SendResult: + """ + Delete a message. + + Args: + message_id: ID of message to delete + chat_id: Override default chat ID + """ + data = { + "chat_id": chat_id or self._config.chat_id, + "message_id": message_id, + } + + return self._request("deleteMessage", data) + + def get_me(self) -> Optional[Dict[str, Any]]: + """Get bot information""" + result = self._request("getMe", {}) + return result.raw if result.success else None + + def send_chat_action( + self, + action: str = "typing", + chat_id: Optional[str] = None, + ) -> SendResult: + """ + Send chat action (typing indicator, etc.) + + Args: + action: Action type (typing, upload_photo, upload_document, etc.) + chat_id: Override default chat ID + """ + data = { + "chat_id": chat_id or self._config.chat_id, + "action": action, + } + + return self._request("sendChatAction", data) + + def send_batch( + self, + messages: List[str], + chat_id: Optional[str] = None, + options: Optional[MessageOptions] = None, + delay_ms: int = 0, + ) -> List[SendResult]: + """ + Send multiple messages. + + Args: + messages: List of message texts + chat_id: Override default chat ID + options: Message options + delay_ms: Delay between messages in milliseconds (0 = no delay) + + Returns: + List of SendResult for each message + """ + import time + + results: List[SendResult] = [] + + for i, text in enumerate(messages): + result = self.send(text, chat_id=chat_id, options=options) + results.append(result) + + if delay_ms > 0 and i < len(messages) - 1: + time.sleep(delay_ms / 1000) + + return results diff --git a/dr_manhattan/utils/telegram/formatters.py b/dr_manhattan/utils/telegram/formatters.py new file mode 100644 index 0000000..696dbbd --- /dev/null +++ b/dr_manhattan/utils/telegram/formatters.py @@ -0,0 +1,306 @@ +""" +Message formatting utilities for Telegram. + +Provides HTML and Markdown formatting helpers for building messages. +""" + +import html +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Sequence, Tuple, Union + + +def escape_html(text: str) -> str: + """Escape HTML special characters""" + return html.escape(str(text)) + + +def bold(text: str) -> str: + """Format text as bold (HTML)""" + return f"{escape_html(text)}" + + +def italic(text: str) -> str: + """Format text as italic (HTML)""" + return f"{escape_html(text)}" + + +def code(text: str) -> str: + """Format text as inline code (HTML)""" + return f"{escape_html(text)}" + + +def pre(text: str, language: Optional[str] = None) -> str: + """Format text as code block (HTML)""" + if language: + return ( + f'
{escape_html(text)}
' + ) + return f"
{escape_html(text)}
" + + +def link(text: str, url: str) -> str: + """Format text as hyperlink (HTML)""" + return f'{escape_html(text)}' + + +def mention(text: str, user_id: int) -> str: + """Format text as user mention (HTML)""" + return f'{escape_html(text)}' + + +def strikethrough(text: str) -> str: + """Format text as strikethrough (HTML)""" + return f"{escape_html(text)}" + + +def underline(text: str) -> str: + """Format text as underline (HTML)""" + return f"{escape_html(text)}" + + +def spoiler(text: str) -> str: + """Format text as spoiler (HTML)""" + return f'{escape_html(text)}' + + +def blockquote(text: str) -> str: + """Format text as blockquote (HTML)""" + return f"
{escape_html(text)}
" + + +@dataclass +class TableRow: + """Table row data""" + + cells: List[str] + bold_first: bool = False + + +def table( + rows: Sequence[Union[Tuple[str, ...], List[str], TableRow]], + header: Optional[Sequence[str]] = None, + separator: str = " | ", +) -> str: + """ + Format data as a simple text table. + + Args: + rows: List of rows (tuples/lists of cell values) + header: Optional header row + separator: Column separator + + Returns: + Formatted table as monospace text + """ + lines: List[str] = [] + + if header: + lines.append(separator.join(bold(h) for h in header)) + lines.append("-" * 20) + + for row in rows: + if isinstance(row, TableRow): + cells = row.cells + if row.bold_first and cells: + cells = [bold(cells[0])] + [code(c) for c in cells[1:]] + else: + cells = [code(c) for c in cells] + else: + cells = [code(str(c)) for c in row] + lines.append(separator.join(cells)) + + return "\n".join(lines) + + +def key_value( + data: Dict[str, Any], + separator: str = ": ", + bold_keys: bool = True, +) -> str: + """ + Format key-value pairs. + + Args: + data: Dictionary of key-value pairs + separator: Separator between key and value + bold_keys: Whether to bold the keys + + Returns: + Formatted key-value pairs + """ + lines: List[str] = [] + + for key, value in data.items(): + key_str = bold(key) if bold_keys else escape_html(key) + value_str = code(str(value)) + lines.append(f"{key_str}{separator}{value_str}") + + return "\n".join(lines) + + +def bullet_list(items: Sequence[str], bullet: str = "-") -> str: + """ + Format items as a bullet list. + + Args: + items: List of items + bullet: Bullet character + + Returns: + Formatted bullet list + """ + return "\n".join(f"{bullet} {escape_html(item)}" for item in items) + + +def numbered_list(items: Sequence[str], start: int = 1) -> str: + """ + Format items as a numbered list. + + Args: + items: List of items + start: Starting number + + Returns: + Formatted numbered list + """ + return "\n".join(f"{i}. {escape_html(item)}" for i, item in enumerate(items, start)) + + +def progress_bar( + current: float, + total: float, + width: int = 10, + filled: str = "█", + empty: str = "░", +) -> str: + """ + Create a text progress bar. + + Args: + current: Current value + total: Total value + width: Bar width in characters + filled: Character for filled portion + empty: Character for empty portion + + Returns: + Progress bar string + """ + if total <= 0: + ratio = 0.0 + else: + ratio = min(1.0, max(0.0, current / total)) + + filled_width = int(ratio * width) + empty_width = width - filled_width + + bar = filled * filled_width + empty * empty_width + percentage = ratio * 100 + + return f"{bar} {percentage:.1f}%" + + +class MessageBuilder: + """ + Fluent message builder for constructing formatted messages. + + Example: + msg = (MessageBuilder() + .title("Alert") + .field("Status", "OK") + .field("Count", 42) + .newline() + .text("Details here") + .build()) + """ + + def __init__(self) -> None: + self._parts: List[str] = [] + + def text(self, text: str, escape: bool = True) -> "MessageBuilder": + """Add plain text""" + self._parts.append(escape_html(text) if escape else text) + return self + + def raw(self, text: str) -> "MessageBuilder": + """Add raw HTML (no escaping)""" + self._parts.append(text) + return self + + def title(self, text: str) -> "MessageBuilder": + """Add a bold title""" + self._parts.append(bold(text)) + return self + + def subtitle(self, text: str) -> "MessageBuilder": + """Add an italic subtitle""" + self._parts.append(italic(text)) + return self + + def field(self, key: str, value: Any) -> "MessageBuilder": + """Add a key-value field""" + self._parts.append(f"{bold(key)}: {code(str(value))}") + return self + + def fields(self, data: Dict[str, Any]) -> "MessageBuilder": + """Add multiple key-value fields""" + for key, value in data.items(): + self.field(key, value) + self.newline() + return self + + def code_block(self, text: str, language: Optional[str] = None) -> "MessageBuilder": + """Add a code block""" + self._parts.append(pre(text, language)) + return self + + def inline_code(self, text: str) -> "MessageBuilder": + """Add inline code""" + self._parts.append(code(text)) + return self + + def link_text(self, text: str, url: str) -> "MessageBuilder": + """Add a hyperlink""" + self._parts.append(link(text, url)) + return self + + def newline(self, count: int = 1) -> "MessageBuilder": + """Add newlines""" + self._parts.append("\n" * count) + return self + + def separator(self, char: str = "-", width: int = 20) -> "MessageBuilder": + """Add a separator line""" + self._parts.append(char * width) + return self + + def bullet(self, items: Sequence[str]) -> "MessageBuilder": + """Add a bullet list""" + self._parts.append(bullet_list(items)) + return self + + def numbered(self, items: Sequence[str], start: int = 1) -> "MessageBuilder": + """Add a numbered list""" + self._parts.append(numbered_list(items, start)) + return self + + def progress( + self, + current: float, + total: float, + label: Optional[str] = None, + ) -> "MessageBuilder": + """Add a progress bar""" + bar = progress_bar(current, total) + if label: + self._parts.append(f"{escape_html(label)}: {bar}") + else: + self._parts.append(bar) + return self + + def build(self) -> str: + """Build the final message""" + return "".join(self._parts) + + def __str__(self) -> str: + return self.build() diff --git a/dr_manhattan/utils/telegram/types.py b/dr_manhattan/utils/telegram/types.py new file mode 100644 index 0000000..519d69f --- /dev/null +++ b/dr_manhattan/utils/telegram/types.py @@ -0,0 +1,176 @@ +""" +Type definitions for Telegram bot integration. +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, TypeVar, Union + +T = TypeVar("T") + +Callback = Callable[[Dict[str, Any]], None] + + +class ParseMode(str, Enum): + """Telegram message parse modes""" + + HTML = "HTML" + MARKDOWN = "Markdown" + MARKDOWN_V2 = "MarkdownV2" + + +class ChatType(str, Enum): + """Telegram chat types""" + + PRIVATE = "private" + GROUP = "group" + SUPERGROUP = "supergroup" + CHANNEL = "channel" + + +@dataclass(frozen=True) +class TelegramConfig: + """Configuration for Telegram bot""" + + bot_token: str + chat_id: str + parse_mode: ParseMode = ParseMode.HTML + disable_notification: bool = False + disable_web_page_preview: bool = True + timeout: int = 10 + + def __post_init__(self) -> None: + if not self.bot_token: + raise ValueError("bot_token is required") + if not self.chat_id: + raise ValueError("chat_id is required") + + +@dataclass(frozen=True) +class MessageOptions: + """Options for sending a message""" + + parse_mode: Optional[ParseMode] = None + disable_notification: Optional[bool] = None + disable_web_page_preview: Optional[bool] = None + reply_to_message_id: Optional[int] = None + protect_content: bool = False + + +@dataclass(frozen=True) +class SendResult: + """Result of sending a message""" + + success: bool + message_id: Optional[int] = None + error: Optional[str] = None + raw: Optional[Dict[str, Any]] = None + + +@dataclass(frozen=True) +class User: + """Telegram user""" + + id: int + is_bot: bool + first_name: str + last_name: Optional[str] = None + username: Optional[str] = None + language_code: Optional[str] = None + + +@dataclass(frozen=True) +class Chat: + """Telegram chat""" + + id: int + type: ChatType + title: Optional[str] = None + username: Optional[str] = None + first_name: Optional[str] = None + last_name: Optional[str] = None + + +@dataclass(frozen=True) +class Message: + """Telegram message""" + + message_id: int + date: int + chat: Chat + from_user: Optional[User] = None + text: Optional[str] = None + raw: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Message": + """Parse message from API response""" + chat_data = data.get("chat", {}) + chat = Chat( + id=chat_data.get("id", 0), + type=ChatType(chat_data.get("type", "private")), + title=chat_data.get("title"), + username=chat_data.get("username"), + first_name=chat_data.get("first_name"), + last_name=chat_data.get("last_name"), + ) + + from_data = data.get("from") + from_user = None + if from_data: + from_user = User( + id=from_data.get("id", 0), + is_bot=from_data.get("is_bot", False), + first_name=from_data.get("first_name", ""), + last_name=from_data.get("last_name"), + username=from_data.get("username"), + language_code=from_data.get("language_code"), + ) + + return cls( + message_id=data.get("message_id", 0), + date=data.get("date", 0), + chat=chat, + from_user=from_user, + text=data.get("text"), + raw=data, + ) + + +@dataclass +class InlineKeyboardButton: + """Inline keyboard button""" + + text: str + url: Optional[str] = None + callback_data: Optional[str] = None + + +@dataclass +class InlineKeyboardMarkup: + """Inline keyboard markup""" + + inline_keyboard: List[List[InlineKeyboardButton]] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """Convert to API format""" + return { + "inline_keyboard": [ + [ + { + k: v + for k, v in { + "text": btn.text, + "url": btn.url, + "callback_data": btn.callback_data, + }.items() + if v is not None + } + for btn in row + ] + for row in self.inline_keyboard + ] + } + + +ReplyMarkup = Union[InlineKeyboardMarkup, None] diff --git a/examples/copytrading/__init__.py b/examples/copytrading/__init__.py new file mode 100644 index 0000000..998cd34 --- /dev/null +++ b/examples/copytrading/__init__.py @@ -0,0 +1,44 @@ +""" +Polymarket Copytrading Bot + +Monitors a target wallet's trades and mirrors them on your account. + +Usage: + uv run python -m examples.copytrading --target + uv run python -m examples.copytrading --target --scale 0.5 + uv run python -m examples.copytrading --target --telegram + +Programmatic Usage: + from examples.copytrading import CopytradingBot, BotConfig + from dr_manhattan import Polymarket + + exchange = Polymarket({"private_key": "..."}) + config = BotConfig(target_wallet="0x...") + + bot = CopytradingBot(exchange, config) + bot.run() +""" + +from .bot import CopytradingBot +from .notifications import ( + NotificationHandler, + NullNotifier, + TelegramNotifier, + create_notifier, +) +from .types import BotConfig, CopyStats, TradeAction, TradeInfo + +__all__ = [ + # Bot + "CopytradingBot", + # Types + "BotConfig", + "CopyStats", + "TradeAction", + "TradeInfo", + # Notifications + "NotificationHandler", + "TelegramNotifier", + "NullNotifier", + "create_notifier", +] diff --git a/examples/copytrading/__main__.py b/examples/copytrading/__main__.py new file mode 100644 index 0000000..40bab88 --- /dev/null +++ b/examples/copytrading/__main__.py @@ -0,0 +1,13 @@ +""" +Entry point for running the copytrading bot as a module. + +Usage: + uv run python -m examples.copytrading --target +""" + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/copytrading/bot.py b/examples/copytrading/bot.py new file mode 100644 index 0000000..c68f167 --- /dev/null +++ b/examples/copytrading/bot.py @@ -0,0 +1,362 @@ +""" +Copytrading bot implementation. + +Monitors a target wallet's trades and mirrors them on your account. +""" + +import logging +import time +from datetime import datetime, timezone +from typing import Dict, List, Optional, Set + +from dr_manhattan import Polymarket +from dr_manhattan.exchanges.polymarket import PublicTrade +from dr_manhattan.models import Market +from dr_manhattan.models.order import OrderSide +from dr_manhattan.utils.logger import Colors + +from .notifications import NotificationHandler, NullNotifier +from .types import BotConfig, CopyStats, TradeInfo + +logger = logging.getLogger(__name__) + + +class CopytradingBot: + """ + Copytrading bot that monitors a target wallet and mirrors trades. + + Features: + - Polls target wallet trades via Polymarket Data API + - Mirrors trades with configurable size scaling + - Tracks copied trades to avoid duplicates + - Supports market filtering + - Pluggable notification system + """ + + def __init__( + self, + exchange: Polymarket, + config: BotConfig, + notifier: Optional[NotificationHandler] = None, + ) -> None: + """ + Initialize copytrading bot. + + Args: + exchange: Authenticated Polymarket exchange + config: Bot configuration + notifier: Optional notification handler + """ + self._exchange = exchange + self._config = config + self._notifier = notifier or NullNotifier() + + self._is_running = False + self._copied_trades: Set[str] = set() + self._stats = CopyStats() + self._market_cache: Dict[str, Market] = {} + self._last_poll_time: Optional[datetime] = None + + @property + def config(self) -> BotConfig: + """Get bot configuration""" + return self._config + + @property + def stats(self) -> CopyStats: + """Get current statistics""" + return self._stats + + @property + def is_running(self) -> bool: + """Check if bot is running""" + return self._is_running + + def _get_trade_id(self, trade: PublicTrade) -> str: + """Generate unique ID for a trade""" + return f"{trade.transaction_hash}_{trade.outcome_index}" + + def _create_trade_info(self, trade: PublicTrade) -> TradeInfo: + """Create TradeInfo from PublicTrade""" + return TradeInfo( + trade_id=self._get_trade_id(trade), + side=trade.side, + size=trade.size, + outcome=trade.outcome or f"idx:{trade.outcome_index}", + price=trade.price, + market_slug=trade.slug or trade.event_slug or "", + condition_id=trade.condition_id or "", + timestamp=trade.timestamp, + ) + + def _should_copy_trade(self, trade: PublicTrade) -> bool: + """Check if trade should be copied""" + trade_id = self._get_trade_id(trade) + + if trade_id in self._copied_trades: + return False + + if trade.size < self._config.min_trade_size: + logger.debug(f"Skipping small trade: {trade.size}") + return False + + if self._config.market_filter: + slug = trade.event_slug or trade.slug or "" + if not any(f.lower() in slug.lower() for f in self._config.market_filter): + return False + + return True + + def _get_market(self, trade: PublicTrade) -> Optional[Market]: + """Get market data for a trade""" + condition_id = trade.condition_id + if not condition_id: + return None + + if condition_id in self._market_cache: + return self._market_cache[condition_id] + + try: + slug = trade.event_slug or trade.slug + if slug: + markets = self._exchange.fetch_markets_by_slug(slug) + for market in markets: + self._market_cache[market.id] = market + if market.id == condition_id: + return market + + market = self._exchange.fetch_market(condition_id) + self._market_cache[condition_id] = market + return market + except Exception as e: + logger.warning(f"Failed to fetch market {condition_id}: {e}") + return None + + def _get_token_id(self, market: Market, outcome: str) -> Optional[str]: + """Get token ID for an outcome""" + token_ids = market.metadata.get("clobTokenIds", []) + outcomes = market.outcomes + + if not token_ids or len(token_ids) != len(outcomes): + try: + token_ids = self._exchange.fetch_token_ids(market.id) + market.metadata["clobTokenIds"] = token_ids + except Exception as e: + logger.warning(f"Failed to fetch token IDs: {e}") + return None + + for i, out in enumerate(outcomes): + if out.lower() == outcome.lower(): + return token_ids[i] if i < len(token_ids) else None + + return None + + def _execute_copy_trade(self, trade: PublicTrade, trade_info: TradeInfo) -> bool: + """Execute a copy of the target's trade""" + market = self._get_market(trade) + if not market: + logger.error(f"Cannot find market for trade: {trade.condition_id}") + self._notifier.notify_error( + f"Cannot find market: {trade.condition_id}", + "execute_copy_trade", + ) + return False + + outcome = trade.outcome + if not outcome: + outcome = ( + market.outcomes[trade.outcome_index] if trade.outcome_index is not None else None + ) + + if not outcome: + logger.error("Cannot determine outcome for trade") + return False + + token_id = self._get_token_id(market, outcome) + if not token_id: + logger.error(f"Cannot find token ID for outcome: {outcome}") + return False + + side = OrderSide.BUY if trade.side.upper() == "BUY" else OrderSide.SELL + size = trade.size * self._config.scale_factor + price = trade.price + + if size > self._config.max_position: + size = self._config.max_position + logger.warning(f"Capped trade size to max_position: {self._config.max_position}") + + try: + order = self._exchange.create_order( + market_id=market.id, + outcome=outcome, + side=side, + price=price, + size=size, + params={"token_id": token_id}, + ) + + side_colored = Colors.green("BUY") if side == OrderSide.BUY else Colors.red("SELL") + logger.info( + f" {Colors.cyan('COPIED')} {side_colored} {size:.2f} " + f"{Colors.magenta(outcome[:20])} @ {Colors.yellow(f'{price:.4f}')} " + f"[{Colors.gray(order.id[:8] + '...')}]" + ) + + self._notifier.notify_trade_copied(trade_info, size) + return True + + except Exception as e: + logger.error(f"Failed to execute copy trade: {e}") + self._notifier.notify_error(str(e), "execute_copy_trade") + return False + + def _poll_trades(self) -> List[PublicTrade]: + """Poll for new trades from target wallet""" + try: + trades = self._exchange.fetch_public_trades( + user=self._config.target_wallet, + limit=50, + taker_only=True, + ) + + if self._last_poll_time: + trades = [t for t in trades if t.timestamp > self._last_poll_time] + + self._last_poll_time = datetime.now(timezone.utc) + return trades + + except Exception as e: + logger.warning(f"Failed to fetch trades: {e}") + return [] + + def _process_trades(self, trades: List[PublicTrade]) -> None: + """Process new trades from target wallet""" + for trade in trades: + self._stats.trades_detected += 1 + trade_info = self._create_trade_info(trade) + + if not self._should_copy_trade(trade): + self._stats.trades_skipped += 1 + continue + + logger.info( + f"\n{Colors.bold('New Trade Detected:')} " + f"{Colors.cyan(trade_info.side_upper)} {trade.size:.2f} " + f"{Colors.magenta(trade_info.outcome[:20])} " + f"@ {Colors.yellow(f'{trade.price:.4f}')} " + f"[{Colors.gray(trade_info.market_slug or '')}]" + ) + + self._notifier.notify_trade_detected(trade_info) + + if self._execute_copy_trade(trade, trade_info): + self._copied_trades.add(trade_info.trade_id) + self._stats.trades_copied += 1 + self._stats.total_volume += trade.size * self._config.scale_factor + else: + self._stats.trades_failed += 1 + + def _get_uptime_str(self) -> str: + """Get formatted uptime string""" + elapsed = (datetime.now(timezone.utc) - self._stats.start_time).total_seconds() + return f"{int(elapsed // 60)}m {int(elapsed % 60)}s" + + def _log_status(self) -> None: + """Log current status""" + uptime_str = self._get_uptime_str() + + logger.info( + f"\n[{time.strftime('%H:%M:%S')}] " + f"{Colors.bold('Status:')} " + f"Detected: {Colors.cyan(str(self._stats.trades_detected))} | " + f"Copied: {Colors.green(str(self._stats.trades_copied))} | " + f"Skipped: {Colors.gray(str(self._stats.trades_skipped))} | " + f"Failed: {Colors.red(str(self._stats.trades_failed))} | " + f"Volume: {Colors.yellow(f'${self._stats.total_volume:.2f}')} | " + f"Uptime: {Colors.gray(uptime_str)}" + ) + + def _log_startup(self, balance: float) -> None: + """Log startup information""" + logger.info(f"\n{Colors.bold('Copytrading Bot Started')}") + logger.info(f"Target: {Colors.cyan(self._config.target_wallet)}") + logger.info(f"Scale: {Colors.yellow(f'{self._config.scale_factor}x')}") + logger.info(f"Interval: {Colors.gray(f'{self._config.poll_interval}s')}") + logger.info(f"Max Position: {Colors.blue(f'{self._config.max_position}')}") + + if self._config.market_filter: + logger.info(f"Markets: {Colors.magenta(', '.join(self._config.market_filter))}") + + if hasattr(self._notifier, "enabled") and self._notifier.enabled: + logger.info(f"Telegram: {Colors.green('Enabled')}") + + address = getattr(self._exchange, "_address", None) + if address: + logger.info(f"Bot Address: {Colors.cyan(address)}") + + logger.info(f"Balance: {Colors.green(f'${balance:,.2f}')} USDC") + + def _log_summary(self) -> None: + """Log final summary""" + duration_str = self._get_uptime_str() + + logger.info(f"\n{Colors.bold('Session Summary')}") + logger.info(f"Duration: {duration_str}") + logger.info(f"Trades Detected: {self._stats.trades_detected}") + logger.info(f"Trades Copied: {Colors.green(str(self._stats.trades_copied))}") + logger.info(f"Trades Skipped: {self._stats.trades_skipped}") + logger.info(f"Trades Failed: {Colors.red(str(self._stats.trades_failed))}") + logger.info(f"Total Volume: {Colors.yellow(f'${self._stats.total_volume:.2f}')}") + + self._notifier.notify_shutdown(self._stats, duration_str) + + def run(self, duration_minutes: Optional[int] = None) -> None: + """ + Run the copytrading bot. + + Args: + duration_minutes: Optional duration limit in minutes + """ + usdc = 0.0 + try: + balance = self._exchange.fetch_balance() + usdc = balance.get("USDC", 0.0) + except Exception as e: + logger.warning(f"Failed to fetch balance: {e}") + + self._log_startup(usdc) + self._notifier.notify_startup( + self._config.target_wallet, + self._config.scale_factor, + usdc, + ) + + logger.info(f"\n{Colors.gray('Waiting for trades...')}") + + self._is_running = True + self._stats = CopyStats() + start_time = time.time() + end_time = start_time + (duration_minutes * 60) if duration_minutes else None + + try: + while self._is_running: + if end_time and time.time() >= end_time: + break + + trades = self._poll_trades() + if trades: + self._process_trades(trades) + + self._log_status() + time.sleep(self._config.poll_interval) + + except KeyboardInterrupt: + logger.info("\nStopping...") + + finally: + self._is_running = False + self._log_summary() + + def stop(self) -> None: + """Stop the bot""" + self._is_running = False diff --git a/examples/copytrading/cli.py b/examples/copytrading/cli.py new file mode 100644 index 0000000..ad5a81b --- /dev/null +++ b/examples/copytrading/cli.py @@ -0,0 +1,169 @@ +""" +Command-line interface for the copytrading bot. +""" + +import argparse +import os +import sys +from typing import Optional + +from dotenv import load_dotenv + +from dr_manhattan import Polymarket +from dr_manhattan.utils import setup_logger + +from .bot import CopytradingBot +from .notifications import create_notifier +from .types import BotConfig + +logger = setup_logger(__name__) + + +def parse_args() -> argparse.Namespace: + """Parse command line arguments""" + parser = argparse.ArgumentParser( + description="Polymarket Copytrading Bot", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s --target 0x123...abc + %(prog)s --target 0x123...abc --scale 0.5 + %(prog)s --target 0x123...abc --telegram --markets fed-decision + +Environment Variables: + POLYMARKET_PRIVATE_KEY Your wallet private key + POLYMARKET_FUNDER Proxy wallet funder address (optional) + TELEGRAM_BOT_TOKEN Telegram bot token (optional) + TELEGRAM_CHAT_ID Telegram chat ID (optional) + """, + ) + + parser.add_argument( + "-t", + "--target", + required=True, + help="Target wallet address to copy trades from", + ) + parser.add_argument( + "-s", + "--scale", + type=float, + default=float(os.getenv("SCALE_FACTOR", "1.0")), + help="Scale factor for trade sizes (default: 1.0)", + ) + parser.add_argument( + "-i", + "--interval", + type=float, + default=float(os.getenv("POLL_INTERVAL", "5")), + help="Poll interval in seconds (default: 5)", + ) + parser.add_argument( + "--max-position", + type=float, + default=float(os.getenv("MAX_POSITION", "100")), + help="Maximum position size (default: 100)", + ) + parser.add_argument( + "--min-size", + type=float, + default=float(os.getenv("MIN_TRADE_SIZE", "1")), + help="Minimum trade size to copy (default: 1)", + ) + parser.add_argument( + "-m", + "--markets", + nargs="*", + default=None, + help="Filter to specific market slugs", + ) + parser.add_argument( + "-d", + "--duration", + type=int, + default=None, + help="Duration in minutes (default: indefinite)", + ) + parser.add_argument( + "--telegram", + action="store_true", + help="Enable Telegram notifications", + ) + parser.add_argument( + "--telegram-token", + default=os.getenv("TELEGRAM_BOT_TOKEN"), + help="Telegram bot token (or set TELEGRAM_BOT_TOKEN)", + ) + parser.add_argument( + "--telegram-chat-id", + default=os.getenv("TELEGRAM_CHAT_ID"), + help="Telegram chat ID (or set TELEGRAM_CHAT_ID)", + ) + + return parser.parse_args() + + +def create_exchange() -> Optional[Polymarket]: + """Create and authenticate Polymarket exchange""" + private_key = os.getenv("POLYMARKET_PRIVATE_KEY") or os.getenv("PRIVATE_KEY") + if not private_key: + logger.error("POLYMARKET_PRIVATE_KEY or PRIVATE_KEY required in environment") + return None + + funder = os.getenv("POLYMARKET_FUNDER") or os.getenv("FUNDER") + + try: + return Polymarket( + { + "private_key": private_key, + "funder": funder, + "verbose": False, + } + ) + except Exception as e: + logger.error(f"Failed to initialize exchange: {e}") + return None + + +def main() -> int: + """Entry point""" + load_dotenv() + args = parse_args() + + exchange = create_exchange() + if not exchange: + return 1 + + telegram_token = None + telegram_chat_id = None + + if args.telegram: + if not args.telegram_token or not args.telegram_chat_id: + logger.error("Telegram enabled but TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID not set") + return 1 + telegram_token = args.telegram_token + telegram_chat_id = args.telegram_chat_id + + notifier = create_notifier(telegram_token, telegram_chat_id) + + config = BotConfig( + target_wallet=args.target, + scale_factor=args.scale, + poll_interval=args.interval, + max_position=args.max_position, + min_trade_size=args.min_size, + market_filter=args.markets, + ) + + bot = CopytradingBot( + exchange=exchange, + config=config, + notifier=notifier, + ) + + bot.run(duration_minutes=args.duration) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/copytrading/notifications.py b/examples/copytrading/notifications.py new file mode 100644 index 0000000..3cd4011 --- /dev/null +++ b/examples/copytrading/notifications.py @@ -0,0 +1,181 @@ +""" +Notification handlers for the copytrading bot. + +Provides a clean interface for sending notifications via Telegram. +""" + +from typing import Optional, Protocol + +from dr_manhattan.utils.telegram import MessageBuilder, TelegramBot, code + +from .types import CopyStats, TradeInfo + + +class NotificationHandler(Protocol): + """Protocol for notification handlers""" + + def notify_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> None: + """Send startup notification""" + ... + + def notify_shutdown(self, stats: CopyStats, duration: str) -> None: + """Send shutdown notification""" + ... + + def notify_trade_detected(self, trade: TradeInfo) -> None: + """Send trade detected notification""" + ... + + def notify_trade_copied(self, trade: TradeInfo, scaled_size: float) -> None: + """Send trade copied notification""" + ... + + def notify_error(self, error: str, context: str) -> None: + """Send error notification""" + ... + + +class TelegramNotifier: + """Telegram notification handler for copytrading bot""" + + def __init__(self, bot: TelegramBot) -> None: + self._bot = bot + + @property + def enabled(self) -> bool: + """Check if notifications are enabled""" + return self._bot.enabled + + def notify_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> None: + """Send startup notification""" + wallet_short = f"{target_wallet[:8]}...{target_wallet[-6:]}" + msg = ( + MessageBuilder() + .title("Copytrading Bot Started") + .newline() + .field("Target", wallet_short) + .newline() + .field("Scale", f"{scale_factor}x") + .newline() + .field("Balance", f"${balance:,.2f}") + .build() + ) + self._bot.send(msg) + + def notify_shutdown(self, stats: CopyStats, duration: str) -> None: + """Send shutdown notification""" + msg = ( + MessageBuilder() + .title("Copytrading Bot Stopped") + .newline() + .field("Trades Copied", str(stats.trades_copied)) + .newline() + .field("Trades Failed", str(stats.trades_failed)) + .newline() + .field("Total Volume", f"${stats.total_volume:.2f}") + .newline() + .field("Duration", duration) + .build() + ) + self._bot.send(msg) + + def notify_trade_detected(self, trade: TradeInfo) -> None: + """Send trade detected notification""" + self._send_trade_notification(trade, is_copy=False) + + def notify_trade_copied(self, trade: TradeInfo, scaled_size: float) -> None: + """Send trade copied notification""" + self._send_trade_notification(trade, is_copy=True, size_override=scaled_size) + + def _send_trade_notification( + self, + trade: TradeInfo, + is_copy: bool, + size_override: Optional[float] = None, + ) -> None: + """Send a trade notification""" + emoji = "+" if trade.is_buy else "-" + action = "Copied" if is_copy else "Detected" + size = size_override if size_override is not None else trade.size + + msg = ( + MessageBuilder() + .title(f"{emoji} Trade {action}") + .newline() + .field("Side", trade.side_upper) + .newline() + .field("Size", f"{size:.2f}") + .newline() + .field("Outcome", trade.outcome) + .newline() + .field("Price", f"{trade.price:.4f}") + ) + + if trade.market_slug: + msg.newline().field("Market", trade.market_slug) + + self._bot.send(msg.build()) + + def notify_error(self, error: str, context: str = "") -> None: + """Send error notification""" + msg = MessageBuilder().title("Error").newline().raw(code(error)) + + if context: + msg.newline().field("Context", context) + + self._bot.send(msg.build()) + + +class NullNotifier: + """Null notification handler that does nothing""" + + @property + def enabled(self) -> bool: + return False + + def notify_startup( + self, + target_wallet: str, + scale_factor: float, + balance: float, + ) -> None: + pass + + def notify_shutdown(self, stats: CopyStats, duration: str) -> None: + pass + + def notify_trade_detected(self, trade: TradeInfo) -> None: + pass + + def notify_trade_copied(self, trade: TradeInfo, scaled_size: float) -> None: + pass + + def notify_error(self, error: str, context: str = "") -> None: + pass + + +def create_notifier( + telegram_token: Optional[str] = None, + telegram_chat_id: Optional[str] = None, +) -> NotificationHandler: + """ + Create a notification handler. + + Returns TelegramNotifier if credentials provided, else NullNotifier. + """ + if telegram_token and telegram_chat_id: + bot = TelegramBot(token=telegram_token, chat_id=telegram_chat_id) + if bot.enabled: + return TelegramNotifier(bot) + + return NullNotifier() diff --git a/examples/copytrading/types.py b/examples/copytrading/types.py new file mode 100644 index 0000000..f8a6d42 --- /dev/null +++ b/examples/copytrading/types.py @@ -0,0 +1,85 @@ +""" +Type definitions for the copytrading bot. +""" + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import List, Optional + + +class TradeAction(str, Enum): + """Trade action type""" + + DETECTED = "detected" + COPIED = "copied" + SKIPPED = "skipped" + FAILED = "failed" + + +@dataclass +class CopyStats: + """Statistics for a copytrading session""" + + trades_detected: int = 0 + trades_copied: int = 0 + trades_skipped: int = 0 + trades_failed: int = 0 + total_volume: float = 0.0 + start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict: + """Convert to dictionary""" + return { + "detected": self.trades_detected, + "copied": self.trades_copied, + "skipped": self.trades_skipped, + "failed": self.trades_failed, + "volume": self.total_volume, + } + + +@dataclass +class BotConfig: + """Configuration for the copytrading bot""" + + target_wallet: str + scale_factor: float = 1.0 + poll_interval: float = 5.0 + max_position: float = 100.0 + min_trade_size: float = 1.0 + market_filter: Optional[List[str]] = None + + def __post_init__(self) -> None: + if not self.target_wallet: + raise ValueError("target_wallet is required") + if self.scale_factor <= 0: + raise ValueError("scale_factor must be positive") + if self.poll_interval <= 0: + raise ValueError("poll_interval must be positive") + if self.max_position <= 0: + raise ValueError("max_position must be positive") + + +@dataclass +class TradeInfo: + """Information about a trade""" + + trade_id: str + side: str + size: float + outcome: str + price: float + market_slug: str + condition_id: str + timestamp: datetime + + @property + def side_upper(self) -> str: + """Get uppercase side""" + return self.side.upper() + + @property + def is_buy(self) -> bool: + """Check if this is a buy trade""" + return self.side_upper == "BUY"