Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov beautifulsoup4 Pillow
pip install pytest pytest-cov pytest-asyncio beautifulsoup4 Pillow

- name: Run tests
run: |
Expand Down
2 changes: 1 addition & 1 deletion src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Telegram Backup Automation - Main Package
"""

__version__ = "7.2.1"
__version__ = "7.3.0"
51 changes: 51 additions & 0 deletions src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ def create_parser() -> argparse.ArgumentParser:
"--merge", action="store_true", help="Allow importing into a chat that already has messages"
)

# Fill gaps command
fill_gaps_parser = subparsers.add_parser(
"fill-gaps",
help="Detect and fill message gaps from failed backups",
description=(
"Scans backed-up chats for gaps in message ID sequences "
"and recovers skipped messages from Telegram. "
"Gaps are caused by API errors, rate limits, or interruptions "
"during previous backup runs."
),
)
fill_gaps_parser.add_argument("-c", "--chat-id", type=int, help="Fill gaps only for this specific chat ID")
fill_gaps_parser.add_argument(
"-t",
"--threshold",
type=int,
default=None,
help="Minimum gap size to investigate (overrides GAP_THRESHOLD env var)",
)

return parser


Expand Down Expand Up @@ -207,6 +227,35 @@ async def run_list_chats(args) -> int:
return 1


async def run_fill_gaps_cmd(args) -> int:
"""Run fill-gaps command."""
from .config import Config, setup_logging
from .telegram_backup import run_fill_gaps

try:
config = Config()
if args.threshold is not None:
config.gap_threshold = args.threshold
setup_logging(config)

summary = await run_fill_gaps(config, chat_id=args.chat_id)
print("\nGap-fill complete:")
print(f" Chats scanned: {summary['chats_scanned']}")
print(f" Chats with gaps: {summary['chats_with_gaps']}")
print(f" Total gaps found: {summary['total_gaps']}")
print(f" Messages recovered: {summary['total_recovered']}")
if summary["details"]:
for detail in summary["details"]:
print(
f" - {detail['chat_name']} (ID {detail['chat_id']}): "
f"{detail['gaps']} gaps, {detail['recovered']} recovered"
)
return 0
except Exception as e:
print(f"Gap-fill failed: {e}", file=sys.stderr)
return 1


async def run_import(args) -> int:
"""Run import command."""
from .config import Config, setup_logging
Expand Down Expand Up @@ -304,6 +353,8 @@ def main() -> int:
return asyncio.run(run_list_chats(args))
elif args.command == "import":
return asyncio.run(run_import(args))
elif args.command == "fill-gaps":
return asyncio.run(run_fill_gaps_cmd(args))
else:
parser.print_help()
return 0
Expand Down
6 changes: 6 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ def __init__(self):
# Useful for recovering from interrupted backups or deleted media files
self.verify_media = os.getenv("VERIFY_MEDIA", "false").lower() == "true"

# Gap-fill mode: detect and recover skipped messages
# When enabled, runs after each scheduled backup to find and fill gaps
# in message ID sequences caused by API errors or interruptions
self.fill_gaps = os.getenv("FILL_GAPS", "false").lower() == "true"
self.gap_threshold = int(os.getenv("GAP_THRESHOLD", "50"))

# Real-time listener mode
# When enabled, runs a background listener that catches message edits and deletions
# in real-time instead of batch-checking on each backup run
Expand Down
43 changes: 43 additions & 0 deletions src/db/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,49 @@ async def update_sync_status(self, chat_id: int, last_message_id: int, message_c
await session.execute(stmt)
await session.commit()

# ========== Gap Detection ==========

async def detect_message_gaps(self, chat_id: int, threshold: int = 50) -> list[tuple[int, int, int]]:
"""Detect gaps in message ID sequences for a chat.

Uses a SQL LAG() window function to find gaps larger than threshold.

Returns:
List of (gap_start_id, gap_end_id, gap_size) tuples where
gap_start is the last message ID before the gap and
gap_end is the first message ID after the gap.
"""
async with self.db_manager.async_session_factory() as session:
result = await session.execute(
text(
"""
SELECT gap_start, gap_end, gap_size FROM (
SELECT
LAG(id) OVER (ORDER BY id) AS gap_start,
id AS gap_end,
id - LAG(id) OVER (ORDER BY id) AS gap_size
FROM messages
WHERE chat_id = :chat_id
) gaps
WHERE gap_size > :threshold
ORDER BY gap_start
"""
),
{"chat_id": chat_id, "threshold": threshold},
)
return [(row[0], row[1], row[2]) for row in result.fetchall()]

async def get_chats_with_messages(self) -> list[int]:
"""Get all chat IDs that exist in the chats table.

Queries the chats table directly instead of scanning the messages table,
which would be extremely slow on large databases.
"""
async with self.db_manager.async_session_factory() as session:
stmt = select(Chat.id)
result = await session.execute(stmt)
return [row[0] for row in result.fetchall()]

# ========== Statistics ==========

async def get_statistics(self) -> dict[str, Any]:
Expand Down
35 changes: 34 additions & 1 deletion src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,33 @@ async def _run_backup_job(self):
# Run backup using shared client
await run_backup(self.config, client=client)

# Run gap-fill if enabled
gap_fill_ok = True
if self.config.fill_gaps:
try:
from .telegram_backup import run_fill_gaps

logger.info("Running post-backup gap-fill...")
result = await run_fill_gaps(self.config, client=client)
if result.get("errors", 0) > 0:
gap_fill_ok = False
logger.warning(
f"Gap-fill completed with {result['errors']} error(s) "
f"({result['total_recovered']} messages recovered)"
)
except Exception as e:
gap_fill_ok = False
logger.error(f"Gap-fill failed: {e}", exc_info=True)

# Reload tracked chats in listener after backup
# (new chats may have been added)
if self._listener:
await self._listener._load_tracked_chats()

logger.info("Scheduled backup completed successfully")
if gap_fill_ok:
logger.info("Scheduled backup completed successfully")
else:
logger.warning("Scheduled backup completed, but gap-fill had errors")

except Exception as e:
logger.error(f"Scheduled backup failed: {e}", exc_info=True)
Expand Down Expand Up @@ -214,6 +235,18 @@ async def run_forever(self):
await run_backup(self.config, client=self._connection.client)
logger.info("Initial backup completed")

# Run gap-fill if enabled
if self.config.fill_gaps:
try:
from .telegram_backup import run_fill_gaps

logger.info("Running initial gap-fill...")
result = await run_fill_gaps(self.config, client=self._connection.client)
if result.get("errors", 0) > 0:
logger.warning(f"Initial gap-fill completed with {result['errors']} error(s)")
except Exception as e:
logger.error(f"Initial gap-fill failed: {e}", exc_info=True)

# Reload tracked chats in listener after initial backup
if self._listener:
await self._listener._load_tracked_chats()
Expand Down
170 changes: 170 additions & 0 deletions src/telegram_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,142 @@ async def _commit_batch(self, batch_data: list[dict], chat_id: int) -> None:
if reactions_list:
await self.db.insert_reactions(msg["id"], chat_id, reactions_list)

async def _fill_gap_range(self, entity, chat_id: int, gap_start: int, gap_end: int) -> int:
"""
Fetch and store messages for a single gap range.

Args:
entity: Telegram entity for the chat
chat_id: Chat identifier
gap_start: Last message ID before the gap
gap_end: First message ID after the gap

Returns:
Number of recovered messages
"""
batch_data: list[dict] = []
batch_size = self.config.batch_size
recovered = 0

async for message in self.client.iter_messages(entity, min_id=gap_start, max_id=gap_end, reverse=True):
msg_data = await self._process_message(message, chat_id)
batch_data.append(msg_data)

if len(batch_data) >= batch_size:
await self._commit_batch(batch_data, chat_id)
recovered += len(batch_data)
batch_data = []

# Flush remaining messages
if batch_data:
await self._commit_batch(batch_data, chat_id)
recovered += len(batch_data)

return recovered

async def _fill_gaps(self, chat_id: int | None = None) -> dict:
"""
Detect and fill gaps in message ID sequences.

Scans chats for missing message ID ranges and fetches them from Telegram.

Args:
chat_id: If provided, scan only this chat. Otherwise scan all chats.

Returns:
Summary dict with gap-fill statistics.
"""
threshold = self.config.gap_threshold
summary = {
"chats_scanned": 0,
"chats_with_gaps": 0,
"total_gaps": 0,
"total_recovered": 0,
"errors": 0,
"details": [],
}

if chat_id is not None:
chat_ids = [chat_id]
else:
# Only scan chats that current config would back up (respects
# CHAT_IDS whitelist, CHAT_TYPES, and all exclude lists)
all_chat_ids = await self.db.get_chats_with_messages()
chat_ids = []
for cid in all_chat_ids:
chat_info = await self.db.get_chat_by_id(cid)
if not chat_info:
continue
ctype = chat_info.get("type", "")
is_user = ctype == "private"
is_group = ctype in ("group", "supergroup")
is_channel = ctype == "channel"
is_bot = ctype == "bot"
if self.config.should_backup_chat(cid, is_user, is_group, is_channel, is_bot):
chat_ids.append(cid)

logger.info(f"Gap-fill: scanning {len(chat_ids)} chat(s) with threshold={threshold}")

for cid in chat_ids:
summary["chats_scanned"] += 1

try:
entity = await self.client.get_entity(cid)
except (ChannelPrivateError, ChatForbiddenError, UserBannedInChannelError) as e:
logger.warning(f"Gap-fill: skipping chat {cid} (no access): {e.__class__.__name__}")
continue
except Exception as e:
logger.error(f"Gap-fill: failed to get entity for chat {cid}: {e}")
summary["errors"] += 1
continue

chat_name = self._get_chat_name(entity)

try:
gaps = await self.db.detect_message_gaps(cid, threshold)
except Exception as e:
logger.error(f"Gap-fill: failed to detect gaps for {chat_name} ({cid}): {e}")
summary["errors"] += 1
continue

if not gaps:
continue

summary["chats_with_gaps"] += 1
chat_recovered = 0

logger.info(f"Gap-fill: {chat_name} (ID: {cid}) has {len(gaps)} gap(s)")

for gap_start, gap_end, gap_size in gaps:
logger.info(f" → Filling gap: {gap_start}..{gap_end} (size {gap_size})")
try:
recovered = await self._fill_gap_range(entity, cid, gap_start, gap_end)
chat_recovered += recovered
logger.info(f" Recovered {recovered} messages")
except Exception as e:
logger.error(f" Error filling gap {gap_start}..{gap_end}: {e}")
summary["errors"] += 1

summary["total_gaps"] += len(gaps)
summary["total_recovered"] += chat_recovered
summary["details"].append(
{
"chat_id": cid,
"chat_name": chat_name,
"gaps": len(gaps),
"recovered": chat_recovered,
}
)

status = "complete" if summary["errors"] == 0 else "complete with errors"
logger.info(
f"Gap-fill {status}: {summary['chats_scanned']} chats scanned, "
f"{summary['total_gaps']} gaps found, {summary['total_recovered']} messages recovered"
+ (f", {summary['errors']} error(s)" if summary["errors"] else "")
)

return summary

async def _sync_deletions_and_edits(self, chat_id: int, entity):
"""
Sync deletions and edits for existing messages in the database.
Expand Down Expand Up @@ -1640,6 +1776,40 @@ async def run_backup(config: Config, client: TelegramClient | None = None):
await backup.db.close()


async def run_fill_gaps(config: Config, client: TelegramClient | None = None, chat_id: int | None = None) -> dict:
"""
Run gap-fill to recover missing messages in backed-up chats.

Args:
config: Configuration object
client: Optional existing TelegramClient to use (for shared connection).
If provided, the operation will use this client instead of creating
its own, avoiding session file lock conflicts.
chat_id: If provided, scan only this chat. Otherwise scan all chats.

Returns:
Summary dict with gap-fill statistics.
"""
backup = await TelegramBackup.create(config, client=client)
try:
await backup.connect()
summary = await backup._fill_gaps(chat_id=chat_id)

# Refresh cached stats if messages were recovered so the viewer
# doesn't show stale totals until the next scheduled recalculation
if summary["total_recovered"] > 0:
try:
await backup.db.calculate_and_store_statistics()
logger.info("Stats recalculated after gap-fill recovery")
except Exception as e:
logger.warning(f"Failed to recalculate stats after gap-fill: {e}")

return summary
finally:
await backup.disconnect()
await backup.db.close()


def main():
"""Main entry point for CLI."""
import asyncio
Expand Down
Loading
Loading