diff --git a/pyproject.toml b/pyproject.toml index 9d7bb3d..3e05965 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "x-cli" -version = "0.1.0" +version = "0.2.0" description = "CLI for X/Twitter API v2" requires-python = ">=3.11" dependencies = [ diff --git a/src/x_cli/api.py b/src/x_cli/api.py index 62282d9..ba3afc2 100644 --- a/src/x_cli/api.py +++ b/src/x_cli/api.py @@ -11,6 +11,14 @@ API_BASE = "https://api.x.com/2" +class RateLimitError(RuntimeError): + """Raised when the API returns HTTP 429.""" + + def __init__(self, reset_at: str) -> None: + self.reset_at = reset_at + super().__init__(f"Rate limited. Resets at {reset_at}.") + + class XApiClient: def __init__(self, creds: Credentials) -> None: self.creds = creds @@ -37,7 +45,7 @@ def _oauth_request(self, method: str, url: str, json_body: dict | None = None) - def _handle(self, resp: httpx.Response) -> dict[str, Any]: if resp.status_code == 429: reset = resp.headers.get("x-rate-limit-reset", "unknown") - raise RuntimeError(f"Rate limited. Resets at {reset}.") + raise RateLimitError(reset) data = resp.json() if not resp.is_success: errors = data.get("errors", []) @@ -108,7 +116,9 @@ def get_user(self, username: str) -> dict[str, Any]: fields = "user.fields=created_at,description,public_metrics,verified,profile_image_url,url,location,pinned_tweet_id" return self._bearer_get(f"{API_BASE}/users/by/username/{username}?{fields}") - def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]: + def get_timeline( + self, user_id: str, max_results: int = 10, since_id: str | None = None + ) -> dict[str, Any]: max_results = max(5, min(max_results, 100)) params = { "max_results": str(max_results), @@ -117,6 +127,8 @@ def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]: "user.fields": "name,username,verified", "media.fields": "url,preview_image_url,type", } + if since_id: + params["since_id"] = since_id resp = self._http.get( f"{API_BASE}/users/{user_id}/tweets", params=params, diff --git a/src/x_cli/cli.py b/src/x_cli/cli.py index 7816e3e..3a2929a 100644 --- a/src/x_cli/cli.py +++ b/src/x_cli/cli.py @@ -9,6 +9,7 @@ from .auth import load_credentials from .formatters import format_output from .utils import parse_tweet_id, strip_at +from .watch import WatchTarget, watch_loop class State: @@ -254,6 +255,50 @@ def retweet(state, id_or_url): state.output(data, "Retweeted") +# ============================================================ +# watch +# ============================================================ + +@cli.command("watch") +@click.argument("usernames", nargs=-1, required=True) +@click.option("--interval", "-i", default=60, type=int, help="Poll interval in seconds (default: 60)") +@click.option("--filter", "-f", "filters", multiple=True, help="Only show tweets containing keyword (repeatable)") +@click.option("--notify", "-n", is_flag=True, default=False, help="Desktop notifications for new tweets") +@click.option("--max", "max_tweets", default=None, type=int, help="Stop after N tweets") +@pass_state +def watch(state, usernames, interval, filters, notify, max_tweets): + """Watch accounts for new tweets in real-time. + + \b + Examples: + x-cli watch CheddarFlow + x-cli watch CheddarFlow unusual_whales --interval 30 + x-cli watch CheddarFlow -f "$NVDA" -f "$TSLA" + x-cli watch CheddarFlow --notify --max 50 + """ + # Resolve usernames to user IDs + targets = [] + for username in usernames: + uname = strip_at(username) + try: + user_data = state.client.get_user(uname) + uid = user_data["data"]["id"] + targets.append(WatchTarget(username=uname, user_id=uid)) + except RuntimeError as e: + raise click.ClickException(f"Could not find user @{uname}: {e}") + + watch_loop( + client=state.client, + targets=targets, + interval=interval, + filters=list(filters), + notify=notify, + max_tweets=max_tweets, + mode=state.mode, + verbose=state.verbose, + ) + + def main(): cli() diff --git a/src/x_cli/watch.py b/src/x_cli/watch.py new file mode 100644 index 0000000..cceb67a --- /dev/null +++ b/src/x_cli/watch.py @@ -0,0 +1,168 @@ +"""Watch mode: poll accounts for new tweets in real-time.""" + +from __future__ import annotations + +import platform +import subprocess +import sys +import time +from dataclasses import dataclass, field +from typing import Any + +from rich.console import Console + +from .api import RateLimitError, XApiClient +from .formatters import format_output + +_stderr = Console(stderr=True) + + +@dataclass +class WatchTarget: + """A user being watched.""" + + username: str + user_id: str + last_seen_id: str | None = None + + +@dataclass +class WatchStats: + """Accumulated stats for the session.""" + + tweets_seen: int = 0 + polls: int = 0 + per_user: dict[str, int] = field(default_factory=dict) + + +def _matches_filters(text: str, filters: list[str]) -> bool: + """Return True if tweet text matches any of the filter keywords (case-insensitive).""" + if not filters: + return True + text_lower = text.lower() + return any(f.lower() in text_lower for f in filters) + + +def _notify(username: str, text: str) -> None: + """Send a desktop notification (macOS only, silent fail elsewhere).""" + if platform.system() != "Darwin": + print("\a", end="", flush=True) # terminal bell fallback + return + preview = text[:100].replace('"', '\\"').replace("\n", " ") + script = f'display notification "{preview}" with title "x-cli: @{username}"' + try: + subprocess.run(["osascript", "-e", script], capture_output=True, timeout=5) + except Exception: + pass + + +def _seed_last_seen(client: XApiClient, target: WatchTarget) -> None: + """Fetch the most recent tweet ID so we only show new ones.""" + try: + data = client.get_timeline(target.user_id, max_results=5) + tweets = data.get("data", []) + if tweets: + target.last_seen_id = tweets[0]["id"] + except RuntimeError: + pass + + +def _print_summary(stats: WatchStats, targets: list[WatchTarget]) -> None: + _stderr.print() + _stderr.print("[bold]Watch session summary[/bold]") + _stderr.print(f" Polls: {stats.polls}") + _stderr.print(f" Tweets seen: {stats.tweets_seen}") + for t in targets: + count = stats.per_user.get(t.username, 0) + _stderr.print(f" @{t.username}: {count} new tweets") + + +def watch_loop( + client: XApiClient, + targets: list[WatchTarget], + interval: int, + filters: list[str], + notify: bool, + max_tweets: int | None, + mode: str, + verbose: bool, +) -> None: + """Main polling loop. Runs until Ctrl+C or max_tweets reached.""" + stats = WatchStats() + + # Seed last_seen_id for each target so we skip existing tweets + _stderr.print("[dim]Initializing watch...[/dim]") + for target in targets: + _seed_last_seen(client, target) + _stderr.print(f"[dim] Tracking @{target.username} (id={target.user_id})[/dim]") + + usernames = ", ".join(f"@{t.username}" for t in targets) + _stderr.print(f"[bold green]Watching {usernames}[/bold green] (every {interval}s, Ctrl+C to stop)") + if filters: + _stderr.print(f"[dim]Filters: {', '.join(filters)}[/dim]") + _stderr.print() + + try: + while True: + for target in targets: + try: + data = client.get_timeline( + target.user_id, + max_results=10, + since_id=target.last_seen_id, + ) + except RateLimitError as e: + reset_ts = e.reset_at + try: + wait = max(0, int(reset_ts) - int(time.time())) + 5 + except ValueError: + wait = 60 + _stderr.print(f"[yellow]Rate limited. Waiting {wait}s...[/yellow]") + time.sleep(wait) + continue + except RuntimeError as exc: + _stderr.print(f"[red]Error for @{target.username}: {exc}[/red]") + continue + + tweets = data.get("data", []) + if not tweets: + continue + + includes = data.get("includes", {}) + + # Tweets come newest-first; process oldest-first for chronological output + for tweet in reversed(tweets): + text = tweet.get("text", "") + note = tweet.get("note_tweet", {}) + if note and note.get("text"): + text = note["text"] + + if not _matches_filters(text, filters): + continue + + # Build a single-tweet payload for the formatter + payload: dict[str, Any] = { + "data": tweet, + "includes": includes, + } + format_output(payload, mode, f"@{target.username}", verbose=verbose) + + if notify: + _notify(target.username, text) + + stats.tweets_seen += 1 + stats.per_user[target.username] = stats.per_user.get(target.username, 0) + 1 + + if max_tweets and stats.tweets_seen >= max_tweets: + _stderr.print(f"\n[bold]Reached --max {max_tweets} tweets.[/bold]") + _print_summary(stats, targets) + return + + # Update cursor to newest tweet + target.last_seen_id = tweets[0]["id"] + + stats.polls += 1 + time.sleep(interval) + + except KeyboardInterrupt: + _print_summary(stats, targets) diff --git a/tests/test_watch.py b/tests/test_watch.py new file mode 100644 index 0000000..ac1a93b --- /dev/null +++ b/tests/test_watch.py @@ -0,0 +1,59 @@ +"""Tests for watch module.""" + +from x_cli.watch import _matches_filters, WatchTarget, WatchStats + + +class TestMatchesFilters: + def test_no_filters_matches_everything(self): + assert _matches_filters("any text here", []) is True + + def test_single_filter_match(self): + assert _matches_filters("Big $NVDA sweep today", ["$NVDA"]) is True + + def test_single_filter_no_match(self): + assert _matches_filters("Big $TSLA sweep today", ["$NVDA"]) is False + + def test_case_insensitive(self): + assert _matches_filters("nvidia is pumping", ["NVIDIA"]) is True + assert _matches_filters("NVIDIA is pumping", ["nvidia"]) is True + + def test_multiple_filters_any_match(self): + assert _matches_filters("$NVDA calls", ["$TSLA", "$NVDA"]) is True + + def test_multiple_filters_none_match(self): + assert _matches_filters("$AAPL calls", ["$TSLA", "$NVDA"]) is False + + def test_partial_match(self): + assert _matches_filters("Something about flow", ["flow"]) is True + + def test_empty_text(self): + assert _matches_filters("", ["keyword"]) is False + assert _matches_filters("", []) is True + + +class TestWatchTarget: + def test_defaults(self): + t = WatchTarget(username="CheddarFlow", user_id="123") + assert t.username == "CheddarFlow" + assert t.user_id == "123" + assert t.last_seen_id is None + + def test_with_last_seen(self): + t = WatchTarget(username="test", user_id="456", last_seen_id="789") + assert t.last_seen_id == "789" + + +class TestWatchStats: + def test_defaults(self): + s = WatchStats() + assert s.tweets_seen == 0 + assert s.polls == 0 + assert s.per_user == {} + + def test_accumulate(self): + s = WatchStats() + s.tweets_seen += 3 + s.polls += 1 + s.per_user["CheddarFlow"] = 3 + assert s.tweets_seen == 3 + assert s.per_user["CheddarFlow"] == 3