Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "x-cli"
version = "0.1.0"
version = "0.2.0"
description = "CLI for X/Twitter API v2"
requires-python = ">=3.11"
dependencies = [
Expand Down
16 changes: 14 additions & 2 deletions src/x_cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@
API_BASE = "https://api.x.com/2"


class RateLimitError(RuntimeError):
"""Raised when the API returns HTTP 429."""

def __init__(self, reset_at: str) -> None:
self.reset_at = reset_at
super().__init__(f"Rate limited. Resets at {reset_at}.")


class XApiClient:
def __init__(self, creds: Credentials) -> None:
self.creds = creds
Expand All @@ -37,7 +45,7 @@ def _oauth_request(self, method: str, url: str, json_body: dict | None = None) -
def _handle(self, resp: httpx.Response) -> dict[str, Any]:
if resp.status_code == 429:
reset = resp.headers.get("x-rate-limit-reset", "unknown")
raise RuntimeError(f"Rate limited. Resets at {reset}.")
raise RateLimitError(reset)
data = resp.json()
if not resp.is_success:
errors = data.get("errors", [])
Expand Down Expand Up @@ -108,7 +116,9 @@ def get_user(self, username: str) -> dict[str, Any]:
fields = "user.fields=created_at,description,public_metrics,verified,profile_image_url,url,location,pinned_tweet_id"
return self._bearer_get(f"{API_BASE}/users/by/username/{username}?{fields}")

def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]:
def get_timeline(
self, user_id: str, max_results: int = 10, since_id: str | None = None
) -> dict[str, Any]:
max_results = max(5, min(max_results, 100))
params = {
"max_results": str(max_results),
Expand All @@ -117,6 +127,8 @@ def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]:
"user.fields": "name,username,verified",
"media.fields": "url,preview_image_url,type",
}
if since_id:
params["since_id"] = since_id
resp = self._http.get(
f"{API_BASE}/users/{user_id}/tweets",
params=params,
Expand Down
45 changes: 45 additions & 0 deletions src/x_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .auth import load_credentials
from .formatters import format_output
from .utils import parse_tweet_id, strip_at
from .watch import WatchTarget, watch_loop


class State:
Expand Down Expand Up @@ -254,6 +255,50 @@ def retweet(state, id_or_url):
state.output(data, "Retweeted")


# ============================================================
# watch
# ============================================================

@cli.command("watch")
@click.argument("usernames", nargs=-1, required=True)
@click.option("--interval", "-i", default=60, type=int, help="Poll interval in seconds (default: 60)")
@click.option("--filter", "-f", "filters", multiple=True, help="Only show tweets containing keyword (repeatable)")
@click.option("--notify", "-n", is_flag=True, default=False, help="Desktop notifications for new tweets")
@click.option("--max", "max_tweets", default=None, type=int, help="Stop after N tweets")
@pass_state
def watch(state, usernames, interval, filters, notify, max_tweets):
"""Watch accounts for new tweets in real-time.

\b
Examples:
x-cli watch CheddarFlow
x-cli watch CheddarFlow unusual_whales --interval 30
x-cli watch CheddarFlow -f "$NVDA" -f "$TSLA"
x-cli watch CheddarFlow --notify --max 50
"""
# Resolve usernames to user IDs
targets = []
for username in usernames:
uname = strip_at(username)
try:
user_data = state.client.get_user(uname)
uid = user_data["data"]["id"]
targets.append(WatchTarget(username=uname, user_id=uid))
except RuntimeError as e:
raise click.ClickException(f"Could not find user @{uname}: {e}")

watch_loop(
client=state.client,
targets=targets,
interval=interval,
filters=list(filters),
notify=notify,
max_tweets=max_tweets,
mode=state.mode,
verbose=state.verbose,
)


def main():
cli()

Expand Down
168 changes: 168 additions & 0 deletions src/x_cli/watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Watch mode: poll accounts for new tweets in real-time."""

from __future__ import annotations

import platform
import subprocess
import sys
import time
from dataclasses import dataclass, field
from typing import Any

from rich.console import Console

from .api import RateLimitError, XApiClient
from .formatters import format_output

_stderr = Console(stderr=True)


@dataclass
class WatchTarget:
"""A user being watched."""

username: str
user_id: str
last_seen_id: str | None = None


@dataclass
class WatchStats:
"""Accumulated stats for the session."""

tweets_seen: int = 0
polls: int = 0
per_user: dict[str, int] = field(default_factory=dict)


def _matches_filters(text: str, filters: list[str]) -> bool:
"""Return True if tweet text matches any of the filter keywords (case-insensitive)."""
if not filters:
return True
text_lower = text.lower()
return any(f.lower() in text_lower for f in filters)


def _notify(username: str, text: str) -> None:
"""Send a desktop notification (macOS only, silent fail elsewhere)."""
if platform.system() != "Darwin":
print("\a", end="", flush=True) # terminal bell fallback
return
preview = text[:100].replace('"', '\\"').replace("\n", " ")
script = f'display notification "{preview}" with title "x-cli: @{username}"'
try:
subprocess.run(["osascript", "-e", script], capture_output=True, timeout=5)
except Exception:
pass


def _seed_last_seen(client: XApiClient, target: WatchTarget) -> None:
"""Fetch the most recent tweet ID so we only show new ones."""
try:
data = client.get_timeline(target.user_id, max_results=5)
tweets = data.get("data", [])
if tweets:
target.last_seen_id = tweets[0]["id"]
except RuntimeError:
pass


def _print_summary(stats: WatchStats, targets: list[WatchTarget]) -> None:
_stderr.print()
_stderr.print("[bold]Watch session summary[/bold]")
_stderr.print(f" Polls: {stats.polls}")
_stderr.print(f" Tweets seen: {stats.tweets_seen}")
for t in targets:
count = stats.per_user.get(t.username, 0)
_stderr.print(f" @{t.username}: {count} new tweets")


def watch_loop(
client: XApiClient,
targets: list[WatchTarget],
interval: int,
filters: list[str],
notify: bool,
max_tweets: int | None,
mode: str,
verbose: bool,
) -> None:
"""Main polling loop. Runs until Ctrl+C or max_tweets reached."""
stats = WatchStats()

# Seed last_seen_id for each target so we skip existing tweets
_stderr.print("[dim]Initializing watch...[/dim]")
for target in targets:
_seed_last_seen(client, target)
_stderr.print(f"[dim] Tracking @{target.username} (id={target.user_id})[/dim]")

usernames = ", ".join(f"@{t.username}" for t in targets)
_stderr.print(f"[bold green]Watching {usernames}[/bold green] (every {interval}s, Ctrl+C to stop)")
if filters:
_stderr.print(f"[dim]Filters: {', '.join(filters)}[/dim]")
_stderr.print()

try:
while True:
for target in targets:
try:
data = client.get_timeline(
target.user_id,
max_results=10,
since_id=target.last_seen_id,
)
except RateLimitError as e:
reset_ts = e.reset_at
try:
wait = max(0, int(reset_ts) - int(time.time())) + 5
except ValueError:
wait = 60
_stderr.print(f"[yellow]Rate limited. Waiting {wait}s...[/yellow]")
time.sleep(wait)
continue
except RuntimeError as exc:
_stderr.print(f"[red]Error for @{target.username}: {exc}[/red]")
continue

tweets = data.get("data", [])
if not tweets:
continue

includes = data.get("includes", {})

# Tweets come newest-first; process oldest-first for chronological output
for tweet in reversed(tweets):
text = tweet.get("text", "")
note = tweet.get("note_tweet", {})
if note and note.get("text"):
text = note["text"]

if not _matches_filters(text, filters):
continue

# Build a single-tweet payload for the formatter
payload: dict[str, Any] = {
"data": tweet,
"includes": includes,
}
format_output(payload, mode, f"@{target.username}", verbose=verbose)

if notify:
_notify(target.username, text)

stats.tweets_seen += 1
stats.per_user[target.username] = stats.per_user.get(target.username, 0) + 1

if max_tweets and stats.tweets_seen >= max_tweets:
_stderr.print(f"\n[bold]Reached --max {max_tweets} tweets.[/bold]")
_print_summary(stats, targets)
return

# Update cursor to newest tweet
target.last_seen_id = tweets[0]["id"]

stats.polls += 1
time.sleep(interval)

except KeyboardInterrupt:
_print_summary(stats, targets)
59 changes: 59 additions & 0 deletions tests/test_watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Tests for watch module."""

from x_cli.watch import _matches_filters, WatchTarget, WatchStats


class TestMatchesFilters:
def test_no_filters_matches_everything(self):
assert _matches_filters("any text here", []) is True

def test_single_filter_match(self):
assert _matches_filters("Big $NVDA sweep today", ["$NVDA"]) is True

def test_single_filter_no_match(self):
assert _matches_filters("Big $TSLA sweep today", ["$NVDA"]) is False

def test_case_insensitive(self):
assert _matches_filters("nvidia is pumping", ["NVIDIA"]) is True
assert _matches_filters("NVIDIA is pumping", ["nvidia"]) is True

def test_multiple_filters_any_match(self):
assert _matches_filters("$NVDA calls", ["$TSLA", "$NVDA"]) is True

def test_multiple_filters_none_match(self):
assert _matches_filters("$AAPL calls", ["$TSLA", "$NVDA"]) is False

def test_partial_match(self):
assert _matches_filters("Something about flow", ["flow"]) is True

def test_empty_text(self):
assert _matches_filters("", ["keyword"]) is False
assert _matches_filters("", []) is True


class TestWatchTarget:
def test_defaults(self):
t = WatchTarget(username="CheddarFlow", user_id="123")
assert t.username == "CheddarFlow"
assert t.user_id == "123"
assert t.last_seen_id is None

def test_with_last_seen(self):
t = WatchTarget(username="test", user_id="456", last_seen_id="789")
assert t.last_seen_id == "789"


class TestWatchStats:
def test_defaults(self):
s = WatchStats()
assert s.tweets_seen == 0
assert s.polls == 0
assert s.per_user == {}

def test_accumulate(self):
s = WatchStats()
s.tweets_seen += 3
s.polls += 1
s.per_user["CheddarFlow"] = 3
assert s.tweets_seen == 3
assert s.per_user["CheddarFlow"] == 3