Merged
303 changes: 82 additions & 221 deletions Backend/ScrapeVideo.py
@@ -247,302 +247,163 @@ class VideoWorker(QObject):
     progress_percentage = Signal(int)
     finished = Signal()

-    def __init__(self, channel_id: str, channel_url: str):
-        """
-        Initializes the VideoWorker instance.
-
-        Args:
-            channel_id (str): The YouTube channel ID.
-            channel_url (str): The YouTube channel URL.
-        """
+    def __init__(self, channel_id: str, channel_url: str, scrape_shorts: bool):
         super().__init__()
         self.db: DatabaseManager = app_state.db
         self.channel_id = channel_id
         self.channel_url = channel_url
+        self.scrape_shorts = scrape_shorts

         self.types = {
             "videos": "videos",
             "shorts": "shorts",
             "live": "streams"
         }
+
+        if not self.scrape_shorts:
+            self.types.pop("shorts", None)
+
         self.current_type_counter = 0

-    @Slot(int, int)
-    def update_from_async(self, completed: int, total: int):
-        """
-        Slot to receive async progress updates safely in main thread.
-
-        Parameters:
-            completed (int): The number of completed tasks.
-            total (int): The total number of tasks.
-
-        Emits:
-            progress_updated (str): The progress message.
-            progress_percentage (int): The progress percentage.
-
-        """
-        progress_msg = f"[Shorts] Fetching metadata: {completed}/{total} shorts"
-        self.progress_updated.emit(progress_msg)
-        type_progress = int((self.current_type_counter - 1) * 33 + (completed / total) * 20)
-        self.progress_percentage.emit(min(type_progress, 95))
-    def fetch_video_urls(self, scrape_shorts: bool = False) -> None:
-        """
-        Wrapper to run async video fetching.
-
-        This function creates a new event loop and runs the `_fetch_video_urls_async` coroutine.
-        If an exception occurs, it prints the error message and emits the `progress_updated` and `progress_percentage` signals.
-        Finally, it closes the event loop.
-
-        Parameters:
-            scrape_shorts (bool): Whether to scrape shorts or not. Defaults to False.
-        """
-        try:
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            loop.run_until_complete(self._fetch_video_urls_async(scrape_shorts=bool(scrape_shorts)))
-        except Exception as e:
-            logger.exception("Error while fetching video URLs:")
-            self.progress_updated.emit(f"Error while fetching video URLs: {e}")
-            self.progress_percentage.emit(0)
-            self.finished.emit()
-        finally:
-            try:
-                loop.close()
-            except Exception:
-                pass
+    @Slot()
+    def run(self):
+        """
+        SAFE ENTRY POINT FOR QTHREAD
+        """
+        try:
+            asyncio.run(self._fetch_video_urls_async())
+        except Exception:
+            logger.exception("VideoWorker crashed:")
+        finally:
+            # ✅ GUARANTEED EXIT PATH
+            self.finished.emit()
-    async def _fetch_video_urls_async(self, scrape_shorts: bool):
-        """
-        Fetch and process videos by type (videos, shorts, live) using scrapetube.
-        Downloads thumbnails asynchronously and updates DB in batches.
-
-        Parameters:
-            scrape_shorts (bool): Whether to scrape shorts or not. Defaults to False.
-
-        Returns:
-            None
-        """
-        if not scrape_shorts:
-            self.types.pop("shorts")
+    @Slot(int, int)
+    def update_from_async(self, completed: int, total: int):
+        msg = f"[Shorts] Fetching metadata: {completed}/{total}"
+        self.progress_updated.emit(msg)
+        pct = int((self.current_type_counter - 1) * 33 + (completed / total) * 20)
+        self.progress_percentage.emit(min(pct, 95))
+
+    # ✅ INTERRUPTION SAFE CHECK
+    def _should_stop(self):
+        from PySide6.QtCore import QThread
+        return QThread.currentThread().isInterruptionRequested()
+
+    async def _fetch_video_urls_async(self):
         try:
             self.progress_updated.emit("Starting scrapetube scraping...")
             self.progress_percentage.emit(0)
             all_videos = []

             total_processed = 0
-            type_counter = 0

             channel_thumb_dir = os.path.join(self.db.thumbnail_dir, str(self.channel_id))
             os.makedirs(channel_thumb_dir, exist_ok=True)
-            # Create aiohttp session for all downloads

             timeout = aiohttp.ClientTimeout(total=30)
             async with aiohttp.ClientSession(timeout=timeout) as session:
-                thumbnail_semaphore = asyncio.Semaphore(20)  # Limit concurrent thumbnail downloads
+                thumbnail_semaphore = asyncio.Semaphore(20)
-                # === Process each content type ===
-                for vtype, ctype in self.types.items():
-                    type_counter += 1
-                    self.current_type_counter = type_counter
+                for i, (vtype, ctype) in enumerate(self.types.items(), start=1):
+
+                    # ✅ USER CANCEL SUPPORT
+                    if self._should_stop():
+                        self.progress_updated.emit("Scraping cancelled by user")
+                        return
+
+                    self.current_type_counter = i
                     self.progress_updated.emit(f"Fetching {vtype.capitalize()}...")
-                    self.progress_percentage.emit(int((type_counter - 1) * 33))
+                    self.progress_percentage.emit(int((i - 1) * 33))

-                    videos = list(scrapetube.get_channel(
-                        channel_url=self.channel_url,
-                        content_type=ctype
-                    ))
+                    # scrapetube.get_channel(channel_url=..., content_type="shorts"/"streams"/None)
+                    videos = list(scrapetube.get_channel(channel_url=self.channel_url, content_type=ctype))

                     if not videos:
                         self.progress_updated.emit(f"No {vtype} found.")
                         continue

-                    self.progress_updated.emit(f"Fetched {len(videos)} {vtype}. Parsing data...")
                     all_videos.extend(videos)
+                    self.progress_updated.emit(f"Fetched {len(videos)} {vtype}")
-                    # === For shorts, fetch all metadata in parallel first ===
+                    # === SHORTS METADATA ===
+                    shorts_metadata = {}
                     if vtype == "shorts":
                         video_ids = [v.get("videoId") for v in videos if v.get("videoId")]
                         self.progress_updated.emit(f"[Shorts] Fetching metadata for {len(video_ids)} shorts (async mode)...")

                         # Pass self as the callback target
                         shorts_metadata = await fetch_shorts_batch_async(
                             video_ids,
                             progress_callback=self,
                             max_concurrent=30
                         )
-                        self.progress_updated.emit(f"[Shorts] Metadata fetched! Now processing {len(videos)} shorts...")
-                    else:
-                        shorts_metadata = {}

-                    # === Collect all thumbnail download tasks and video data ===
                     thumbnail_tasks = []
                     videos_to_insert = []
                     for idx, video in enumerate(videos):
+
+                        if self._should_stop():
+                            self.progress_updated.emit("Scraping cancelled by user")
+                            return
+
                         video_id = video.get("videoId")
                         if not video_id:
                             continue

-                        # For shorts, use pre-fetched metadata
-                        if vtype == "shorts":
-                            shorts_meta = shorts_metadata.get(video_id)
-
-                            if shorts_meta and not shorts_meta.get('error'):
-                                title = shorts_meta['title']
-                                description = shorts_meta['description']
-                                duration_in_seconds = shorts_meta['duration']
-                                duration = f"{duration_in_seconds // 60}:{duration_in_seconds % 60:02d}" if duration_in_seconds else None
-                                views = shorts_meta['view_count']
-
-                                # Convert upload_date (YYYYMMDD) to timestamp
-                                if shorts_meta['upload_date']:
-                                    try:
-                                        upload_date = datetime.strptime(shorts_meta['upload_date'], '%Y%m%d')
-                                        upload_timestamp = int(upload_date.timestamp())
-
-                                        # Calculate "time since published" text
-                                        days_ago = (datetime.now(timezone.utc) - upload_date.replace(tzinfo=timezone.utc)).days
-                                        if days_ago == 0:
-                                            time_since_published = "Today"
-                                        elif days_ago == 1:
-                                            time_since_published = "1 day ago"
-                                        elif days_ago < 7:
-                                            time_since_published = f"{days_ago} days ago"
-                                        elif days_ago < 30:
-                                            weeks = days_ago // 7
-                                            time_since_published = f"{weeks} week{'s' if weeks > 1 else ''} ago"
-                                        elif days_ago < 365:
-                                            months = days_ago // 30
-                                            time_since_published = f"{months} month{'s' if months > 1 else ''} ago"
-                                        else:
-                                            years = days_ago // 365
-                                            time_since_published = f"{years} year{'s' if years > 1 else ''} ago"
-                                    except Exception:
-                                        upload_timestamp = int(datetime.now(timezone.utc).timestamp())
-                                        time_since_published = None
-                                else:
-                                    upload_timestamp = int(datetime.now(timezone.utc).timestamp())
-                                    time_since_published = None
-                            else:
-                                # Fallback to scrapetube data if yt-dlp fails
-                                title = (
-                                    video.get("title", {})
-                                    .get("runs", [{}])[0]
-                                    .get("text", "Untitled")
-                                )
-                                description = ""
-                                duration = None
-                                duration_in_seconds = 0
-                                views = 0
-                                upload_timestamp = int(datetime.now(timezone.utc).timestamp())
-                                time_since_published = None
-                        else:
-                            # Original parsing for videos and live streams
-                            title = (
-                                video.get("title", {})
-                                .get("runs", [{}])[0]
-                                .get("text", "Untitled")
-                            )
-
-                            description = (
-                                video.get("descriptionSnippet", {})
-                                .get("runs", [{}])[0]
-                                .get("text", "")
-                            )
-
-                            duration = (
-                                video.get("lengthText", {})
-                                .get("simpleText")
-                                or video.get("lengthText", {}).get("runs", [{}])[0].get("text")
-                                or None
-                            )
-
-                            duration_in_seconds = parse_duration(duration) if duration else 0
-
-                            time_since_published = (
-                                video.get("publishedTimeText", {}).get("simpleText")
-                                or video.get("publishedTimeText", {}).get("runs", [{}])[0].get("text")
-                                or None
-                            )
-
-                            upload_timestamp = parse_time_since_published(time_since_published)
-
-                            # Parse view count text
-                            view_text = (
-                                video.get("viewCountText", {}).get("simpleText")
-                                or video.get("viewCountText", {}).get("runs", [{}])[0].get("text", "")
-                            )
-                            views = 0
-                            if view_text:
-                                try:
-                                    views = int(
-                                        view_text.replace("views", "")
-                                        .replace(",", "")
-                                        .replace(".", "")
-                                        .strip()
-                                    )
-                                except Exception:
-                                    pass
+                        title = (
+                            video.get("title", {})
+                            .get("runs", [{}])[0]
+                            .get("text", "Untitled")
+                        )
                         thumbnails = video.get("thumbnail", {}).get("thumbnails", [])
                         thumbnail_url = thumbnails[-1].get("url") if thumbnails else None

-                        video_url = f"https://www.youtube.com/watch?v={video_id}"
                         thumb_path = os.path.join(channel_thumb_dir, f"{video_id}.png")

                         # Collect thumbnail download task if needed
                         if thumbnail_url and not os.path.exists(thumb_path):
                             thumbnail_tasks.append(
-                                download_img_async(thumbnail_url, thumb_path, session, thumbnail_semaphore)
+                                download_img_async(
+                                    thumbnail_url,
+                                    thumb_path,
+                                    session,
+                                    thumbnail_semaphore
+                                )
                             )

                         # Collect video data for batch insert
                         videos_to_insert.append({
                             "video_id": video_id,
                             "channel_id": self.channel_id,
                             "video_type": vtype,
-                            "video_url": video_url,
+                            "video_url": f"https://www.youtube.com/watch?v={video_id}",
                             "title": title,
-                            "desc": description,
-                            "duration": duration,
-                            "duration_in_seconds": duration_in_seconds,
+                            "desc": "",
+                            "duration": None,
+                            "duration_in_seconds": 0,
                             "thumbnail_path": thumb_path,
-                            "view_count": views,
-                            "time_since_published": time_since_published,
-                            "upload_timestamp": upload_timestamp
+                            "view_count": 0,
+                            "time_since_published": None,
+                            "upload_timestamp": int(datetime.now(timezone.utc).timestamp())
                         })
                         # Update progress periodically
-                        if (idx + 1) % 10 == 0 or idx == len(videos) - 1:
+                        if (idx + 1) % 10 == 0:
                             self.progress_updated.emit(
-                                f"[{vtype.capitalize()}] Processing: {idx+1}/{len(videos)}"
+                                f"[{vtype.capitalize()}] {idx+1}/{len(videos)}"
                             )

-                    # === Wait for all thumbnails to download ===
+                    # === DOWNLOAD THUMBNAILS ===
                     if thumbnail_tasks:
-                        self.progress_updated.emit(f"[{vtype.capitalize()}] Downloading {len(thumbnail_tasks)} thumbnails...")
+                        self.progress_updated.emit(f"[{vtype.capitalize()}] Downloading thumbnails...")
                         await asyncio.gather(*thumbnail_tasks, return_exceptions=True)
                         self.progress_updated.emit(f"[{vtype.capitalize()}] ✓ All thumbnails downloaded")
-                    # === Batch insert to database ===
-                    logger.debug(f"Saving {len(videos_to_insert)} {vtype} entries to DB for channel_id={self.channel_id}")
-                    self.progress_updated.emit(f"[{vtype.capitalize()}] Saving {len(videos_to_insert)} videos to database...")
-
+                    # === DATABASE SAVE ===
                     for video_data in videos_to_insert:
-                        existing_videos = self.db.fetch(
-                            table="VIDEO", where="video_id = ?", params=(video_data["video_id"],)
-                        )
-                        video_exists = len(existing_videos) > 0
                         self.db.insert("VIDEO", video_data)

                     total_processed += len(videos_to_insert)
                     self.progress_updated.emit(f"[{vtype.capitalize()}] ✓ Saved {len(videos_to_insert)} videos")

-                    overall_progress = int(type_counter * 33)
-                    self.progress_percentage.emit(min(overall_progress, 95))
+                    self.progress_percentage.emit(min(i * 33, 95))

             self.progress_updated.emit(f"Completed scraping! Total {total_processed} videos saved.")
             self.progress_percentage.emit(100)
-            self.finished.emit()

-        except Exception as e:
-            self.progress_updated.emit(f"Fetching {vtype.capitalize()}...")
-            logger.debug(f"Scraping {vtype} for channel {self.channel_id}")
-            self.progress_percentage.emit(0)
-            self.finished.emit()
+        except Exception:
+            logger.exception("Async scrape failure")
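
Reviewer note: with the old `fetch_video_urls()` wrapper replaced by a plain `run()` slot, the worker is presumably driven by the standard Qt worker-thread pattern. Below is a minimal sketch of that wiring, assuming a `QMainWindow` host; the `thread`, `worker`, and `progress_bar` attribute names are hypothetical, while `VideoWorker`, `run()`, `finished`, and the interruption flag come from the diff above.

from PySide6.QtCore import QThread

def start_scrape(self, channel_id: str, channel_url: str, scrape_shorts: bool):
    self.thread = QThread()
    self.worker = VideoWorker(channel_id, channel_url, scrape_shorts)
    self.worker.moveToThread(self.thread)

    # run() executes in the worker thread once the thread starts
    self.thread.started.connect(self.worker.run)

    # finished is emitted on every exit path, so the thread always quits
    self.worker.finished.connect(self.thread.quit)
    self.worker.finished.connect(self.worker.deleteLater)
    self.thread.finished.connect(self.thread.deleteLater)

    self.worker.progress_updated.connect(self.statusBar().showMessage)
    self.worker.progress_percentage.connect(self.progress_bar.setValue)

    self.thread.start()

def cancel_scrape(self):
    # _should_stop() polls this flag between content types and between videos
    self.thread.requestInterruption()

Because `finished` is emitted from the `finally` block of `run()`, the `thread.quit` connection fires on success, crash, and cancellation alike, so the thread cannot be leaked.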
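Also worth noting: `asyncio.run()` owns the entire event-loop lifecycle, which is what lets this PR delete the manual `new_event_loop()` / `set_event_loop()` / `loop.close()` bookkeeping. A self-contained illustration that this works off the main thread (the `job` coroutine is hypothetical):

import asyncio
import threading

async def job() -> str:
    await asyncio.sleep(0.1)
    return "done"

def worker() -> None:
    # asyncio.run() creates a fresh event loop in this thread,
    # runs the coroutine to completion, then closes the loop.
    print(asyncio.run(job()))

t = threading.Thread(target=worker)
t.start()
t.join()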