diff --git a/requirements/base.txt b/requirements/base.txt index ea93b32..a0305f9 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,2 +1,2 @@ isodate -requests +requests==2.32.4 diff --git a/setup.cfg b/setup.cfg index 77478cb..36ff81d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,10 @@ packages = find: python_requires = >=3.7 install_requires = file: requirements/base.txt +[options.entry_points] +console_scripts = + youtool = youtool.cli:main + [options.extras_require] cli = file: requirements/cli.txt dev = file: requirements/dev.txt diff --git a/youtool/__init__.py b/youtool/__init__.py index 28bbe83..6baeb5e 100644 --- a/youtool/__init__.py +++ b/youtool/__init__.py @@ -11,7 +11,7 @@ import isodate # TODO: implement duration parser to remove dependency? import requests -REGEXP_CHANNEL_ID = re.compile('"externalId":"([^"]+)"') +REGEXP_CHANNEL_ID = re.compile('"channelId":"([^"]+)"') REGEXP_LOCATION_RADIUS = re.compile(r"^[0-9.]+(?:m|km|ft|mi)$") REGEXP_NAIVE_DATETIME = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}[T ][0-9]{2}:[0-9]{2}:[0-9]{2}$") REGEXP_DATETIME_MILLIS = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}[T ][0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]+") @@ -519,10 +519,11 @@ def video_comments(self, video_id: str): yield parse_comment_data(reply) def video_livechat(self, video_id: str, expand_emojis=True): - from chat_downloader import ChatDownloader from chat_downloader.errors import ChatDisabled, LoginRequired, NoChatReplay - downloader = ChatDownloader() + from youtool.chat_downloader import YouToolChatDownloader + + downloader = YouToolChatDownloader() video_url = f"https://youtube.com/watch?v={video_id}" try: live = downloader.get_chat(video_url, message_groups=["messages", "superchat"]) diff --git a/youtool/chat_downloader/__init__.py b/youtool/chat_downloader/__init__.py new file mode 100644 index 0000000..953d0dd --- /dev/null +++ b/youtool/chat_downloader/__init__.py @@ -0,0 +1,142 @@ +import itertools +import time +from urllib.parse 
import urlparse + +from chat_downloader.chat_downloader import ChatDownloader +from chat_downloader.debugging import log +from chat_downloader.errors import ChatGeneratorError, InvalidURL, SiteNotSupported, URLNotProvided +from chat_downloader.formatting.format import ItemFormatter +from chat_downloader.output.continuous_write import ContinuousWriter +from chat_downloader.sites.common import SiteDefault +from chat_downloader.utils.timed_utils import TimedGenerator + +from .youtube import YouTubeCD + + +class YouToolChatDownloader(ChatDownloader): + """ + YouTool Chat Downloader subclass to fix YouTube data parsing issues + """ + + def get_chat( + self, + url=None, + start_time=None, + end_time=None, + max_attempts=15, + retry_timeout=None, + interruptible_retry=True, + timeout=None, + inactivity_timeout=None, + max_messages=None, + message_groups=SiteDefault("message_groups"), + message_types=None, + output=None, + overwrite=True, + sort_keys=True, + indent=4, + format=SiteDefault("format"), + format_file=None, + chat_type="live", + ignore=None, + message_receive_timeout=0.1, + buffer_size=4096, + ): + """ + Override get_chat to use YouTubeCD instead of YouTubeChatDownloader + """ + if not url: + raise URLNotProvided("No URL provided.") + + original_params = locals() + original_params.pop("self") + + # loop through all websites and + # get corresponding website parser + # based on matching url with predefined regex + site = YouTubeCD + match_info = site.matches(url) + if match_info: # match found + + function_name, match = match_info + + # Create new session + self.create_session(site) + site_object = self.sessions[site.__name__] + + # Parse site-defaults + params = {} + for k, v in original_params.items(): + params[k] = site_object.get_site_value(v) + + log("info", f"Site: {site_object._NAME}") + log("debug", f"Program parameters: {params}") + + get_chat = getattr(site_object, function_name, None) + if not get_chat: + raise NotImplementedError(f"{function_name} 
has not been implemented in {site.__name__}.") + + chat = get_chat(match, params) + log("debug", f'Match found: "{match}". Running "{function_name}" function in "{site.__name__}".') + + if chat is None: + raise ChatGeneratorError(f'No valid generator found in {site.__name__} for url "{url}"') + + if isinstance(params["max_messages"], int): + chat.chat = itertools.islice(chat.chat, params["max_messages"]) + else: + pass # TODO throw error + + if params["timeout"] is not None or params["inactivity_timeout"] is not None: + # Generator requires timing functionality + + chat.chat = TimedGenerator(chat.chat, params["timeout"], params["inactivity_timeout"]) + + if isinstance(params["timeout"], (float, int)): + start = time.time() + + def log_on_timeout(): + log("debug", f"Timeout occurred after {time.time() - start} seconds.") + + setattr(chat.chat, "on_timeout", log_on_timeout) + + if isinstance(params["inactivity_timeout"], (float, int)): + + def log_on_inactivity_timeout(): + log("debug", f"Inactivity timeout occurred after {params['inactivity_timeout']} seconds.") + + setattr(chat.chat, "on_inactivity_timeout", log_on_inactivity_timeout) + + formatter = ItemFormatter(params["format_file"]) + chat.format = lambda x: formatter.format(x, format_name=params["format"]) + + if params["output"]: + chat.attach_writer( + ContinuousWriter( + params["output"], + indent=params["indent"], + sort_keys=params["sort_keys"], + overwrite=params["overwrite"], + lazy_initialise=True, + ) + ) + + chat.site = site_object + + log("debug", f"Chat information: {chat.__dict__}") + log("info", f'Retrieving chat for "{chat.title}".') + + return chat + + parsed = urlparse(url) + log("debug", str(parsed)) + + if parsed.netloc: + raise SiteNotSupported(f"Site not supported: {parsed.netloc}") + elif not parsed.scheme: # No scheme, try to correct + original_params["url"] = "https://" + url + chat = self.get_chat(**original_params) + if chat: + return chat + else: + raise InvalidURL(f'Invalid URL: 
"{url}"') diff --git a/youtool/chat_downloader/youtube.py b/youtool/chat_downloader/youtube.py new file mode 100644 index 0000000..a41924f --- /dev/null +++ b/youtool/chat_downloader/youtube.py @@ -0,0 +1,123 @@ +from chat_downloader.sites.youtube import YouTubeChatDownloader +from chat_downloader.utils.core import float_or_none, multi_get, parse_iso8601, regex_search, try_parse_json + + +class YouTubeCD(YouTubeChatDownloader): + """ + YouTube Chat Downloader subclass to fix YouTube data parsing issues + """ + + def _parse_video_data(self, video_id, params=None, video_type="video"): + details = {} + + if video_type == "clip": + original_url = self._YT_CLIP_TEMPLATE.format(video_id) + else: # video_type == 'video' + original_url = self._YT_VIDEO_TEMPLATE.format(video_id) + + yt_initial_data, ytcfg, player_response_info = self._get_initial_info(original_url, params) + + streaming_data = player_response_info.get("streamingData") or {} + first_format = multi_get(streaming_data, "adaptiveFormats", 0) or multi_get(streaming_data, "formats", 0) or {} + + # Live streaming details + player_renderer = multi_get(player_response_info, "microformat", "playerMicroformatRenderer") or {} + live_details = player_renderer.get("liveBroadcastDetails") or {} + + # Video info + video_details = player_response_info.get("videoDetails") or {} + details["title"] = video_details.get("title") + details["author"] = video_details.get("author") + details["author_id"] = video_details.get("channelId") + details["original_video_id"] = video_details.get("videoId") + + # Clip info + clip_details = player_response_info.get("clipConfig") + if clip_details: + details["clip_start_time"] = float_or_none(clip_details.get("startTimeMs", 0)) / 1e3 + details["clip_end_time"] = float_or_none(clip_details.get("endTimeMs", 0)) / 1e3 + details["video_type"] = "clip" + + elif not video_details.get("isLiveContent"): + details["video_type"] = "premiere" + + else: + details["video_type"] = "video" + + start_timestamp 
= live_details.get("startTimestamp") + end_timestamp = live_details.get("endTimestamp") + details["start_time"] = parse_iso8601(start_timestamp) if start_timestamp else None + details["end_time"] = parse_iso8601(end_timestamp) if end_timestamp else None + + details["duration"] = ( + (float_or_none(first_format.get("approxDurationMs", 0)) / 1e3) + or float_or_none(video_details.get("lengthSeconds")) + or float_or_none(player_renderer.get("lengthSeconds")) + ) + + if not details["duration"] and details["start_time"] and details["end_time"]: + details["duration"] = (details["end_time"] - details["start_time"]) / 1e6 + + # Parse continuation info + sub_menu_items = ( + multi_get( + yt_initial_data, + "contents", + "twoColumnWatchNextResults", + "conversationBar", + "liveChatRenderer", + "header", + "liveChatHeaderRenderer", + "viewSelector", + "sortFilterSubMenuRenderer", + "subMenuItems", + ) + or {} + ) + details["continuation_info"] = { + x["title"]: x["continuation"]["reloadContinuationData"]["continuation"] for x in sub_menu_items + } + + # live, upcoming or past + if video_details.get("isLive") or live_details.get("isLiveNow"): + details["status"] = "live" + + elif video_details.get("isUpcoming"): + details["status"] = "upcoming" + + else: + details["status"] = "past" + + try: + client_continuation = yt_initial_data["contents"]["twoColumnWatchNextResults"]["conversationBar"][ + "liveChatRenderer" + ]["continuations"][0]["reloadContinuationData"]["continuation"] + + if details["status"] != "past": + response = self._session_get(f"https://www.youtube.com/live_chat?continuation={client_continuation}") + else: + response = self._session_get( + f"https://www.youtube.com/live_chat_replay?continuation={client_continuation}" + ) + + html = response.text + yt = regex_search(html, self._YT_INITIAL_DATA_RE) + dictLiveChats = try_parse_json(yt) + + continuations = dictLiveChats["continuationContents"]["liveChatContinuation"]["header"][ + "liveChatHeaderRenderer" + 
]["viewSelector"]["sortFilterSubMenuRenderer"]["subMenuItems"] + + top_continuation = continuations[0]["continuation"]["reloadContinuationData"]["continuation"] + live_continuation = continuations[1]["continuation"]["reloadContinuationData"]["continuation"] + + if details["status"] != "past": + details["continuation_info"]["Top chat"] = top_continuation + details["continuation_info"]["Live chat"] = live_continuation + else: + details["continuation_info"]["Top chat replay"] = top_continuation + details["continuation_info"]["Live chat replay"] = live_continuation + except: + pass + + return details, player_response_info, yt_initial_data, ytcfg diff --git a/youtool/cli.py b/youtool/cli.py new file mode 100644 index 0000000..d780f9a --- /dev/null +++ b/youtool/cli.py @@ -0,0 +1,53 @@ +import argparse +import os + +from youtool.commands import COMMANDS + + +def main(): + """Main function for the YouTube CLI Tool. + + This function sets up the argument parser for the CLI tool, including options for the YouTube API key and + command-specific subparsers. It then parses the command-line arguments, retrieving the YouTube API key + from either the command-line argument '--api-key' or the environment variable 'YOUTUBE_API_KEY'. If the API + key is not provided through any means, it raises an argparse.ArgumentError. + + Finally, the function executes the appropriate command based on the parsed arguments. If an exception occurs + during the execution of the command, it is caught and raised as an argparse error for proper handling. + + Raises: + argparse.ArgumentError: If the YouTube API key is not provided. + argparse.ArgumentError: If there is an error during the execution of the command. 
+ """ + parser = argparse.ArgumentParser(description="CLI Tool for managing YouTube videos add playlists") + parser.add_argument( + "-k", + "--api-key", + type=str, + help="YouTube API key (defaults to environment variable YOUTUBE_API_KEY)", + default=os.getenv("YOUTUBE_API_KEY"), + dest="api_key", + ) + parser.add_argument("-d", "--debug", help="Debug mode", dest="debug", default=False, action="store_true") + + subparsers = parser.add_subparsers(required=True, dest="command", title="Command", help="Command to be executed") + + for command in COMMANDS: + command.parse_arguments(subparsers) + + args = parser.parse_args() + args.api_key = args.api_key or os.environ.get("YOUTUBE_API_KEY") + + if not args.api_key: + parser.error("YouTube API Key is required") + + try: + print(args.func(**args.__dict__)) + except Exception as error: + if args.debug: + raise error + parser.error(error) + + +if __name__ == "__main__": + main() diff --git a/youtool/commands/__init__.py b/youtool/commands/__init__.py new file mode 100644 index 0000000..175b783 --- /dev/null +++ b/youtool/commands/__init__.py @@ -0,0 +1,32 @@ +from typing import List + +from .base import Command +from .channel_id import ChannelId +from .channel_info import ChannelInfo +from .video_comments import VideoComments +from .video_info import VideoInfo +from .video_livechat import VideoLiveChat +from .video_search import VideoSearch +from .video_transcription import VideoTranscription + +COMMANDS: List[Command] = [ + ChannelId, + ChannelInfo, + VideoInfo, + VideoSearch, + VideoComments, + VideoLiveChat, + VideoTranscription, +] + +__all__ = [ + "Command", + "COMMANDS", + "ChannelId", + "ChannelInfo", + "VideoInfo", + "VideoSearch", + "VideoComments", + "VideoLiveChat", + "VideoTranscription", +] diff --git a/youtool/commands/base.py b/youtool/commands/base.py new file mode 100644 index 0000000..303ee0a --- /dev/null +++ b/youtool/commands/base.py @@ -0,0 +1,161 @@ +import argparse +import csv +from datetime import 
datetime +from io import StringIO +from pathlib import Path +from typing import Any, Dict, List, Optional +from urllib.parse import parse_qs, urlparse + + +class Command: + """A base class for commands to inherit from, following a specific structure. + + Attributes: + name (str): The name of the command. + arguments (List[Dict[str, Any]]): A list of dictionaries, each representing an argument for the command. + """ + + name: str + arguments: List[Dict[str, Any]] + + @staticmethod + def video_id_from_url(video_url: str) -> Optional[str]: + """Extracts the video ID from a YouTube URL. + + Args: + url (str): The YouTube video URL. + + Returns: + Optional[str]: The extracted video ID, or None if not found. + """ + parsed_url = urlparse(video_url) + parsed_url_query = dict(parse_qs(parsed_url.query)) + return parsed_url_query.get("v") + + @classmethod + def generate_parser(cls, subparsers: argparse._SubParsersAction): + """Creates a parser for the command and adds it to the subparsers. + + Args: + subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. + + Returns: + argparse.ArgumentParser: The parser for the command. + """ + return subparsers.add_parser(cls.name, help=cls.__doc__) + + @classmethod + def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: + """Parses the arguments for the command and sets the command's execute method as the default function to call. + + Args: + subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. 
+ """ + parser = cls.generate_parser(subparsers) + groups = {} + + for argument in cls.arguments: + argument_copy = {**argument} + argument_names = [name for name in [argument_copy.pop("name"), argument_copy.pop("short", None)] if name] + + group_name = argument_copy.pop("mutually_exclusive_group", None) + if group_name: + if group_name not in groups: + groups[group_name] = parser.add_argument_group(group_name) + groups[group_name].add_argument(*argument_names, **argument_copy) + else: + parser.add_argument(*argument_names, **argument_copy) + parser.set_defaults(func=cls.execute) + + @staticmethod + def filter_fields(video_info: Dict, info_columns: Optional[List] = None): + """Filters the fields of a dictionary containing video information based on + specified columns. + + Args: + video_info (Dict): A dictionary containing video information. + info_columns (Optional[List], optional): A list specifying which fields + to include in the filtered output. If None, returns the entire + video_info dictionary. Defaults to None. + + Returns: + Dict: A dictionary containing only the fields specified in info_columns + (if provided) or the entire video_info dictionary if info_columns is None. + """ + return ( + {field: value for field, value in video_info.items() if field in info_columns} + if info_columns + else video_info + ) + + @classmethod + def execute(cls, **kwargs) -> str: + """Executes the command. + + This method should be overridden by subclasses to define the command's behavior. + + Args: + arguments (argparse.Namespace): The parsed arguments for the command. + """ + raise NotImplementedError() + + @staticmethod + def data_from_csv(file_path: Path, data_column_name: Optional[str] = None) -> List[str]: + """Extracts a list of URLs from a specified CSV file. + + Args: + file_path: The path to the CSV file containing the URLs. + data_column_name: The name of the column in the CSV file that contains the URLs. 
+ If not provided, it defaults to `ChannelId.URL_COLUMN_NAME`. + + Returns: + A list of URLs extracted from the specified CSV file. + + Raises: + Exception: If the file path is invalid or the file cannot be found. + """ + data = [] + + if not file_path.is_file(): + raise FileNotFoundError(f"Invalid file path: {file_path}") + + with file_path.open("r", newline="") as csv_file: + reader = csv.DictReader(csv_file) + fieldnames = reader.fieldnames + + if fieldnames is None: + raise ValueError("Fieldnames is None") + + if data_column_name not in fieldnames: + raise Exception(f"Column {data_column_name} not found on {file_path}") + for row in reader: + value = row.get(data_column_name) + if value is not None: + data.append(str(value)) + return data + + @classmethod + def data_to_csv(cls, data: List[Dict], output_file_path: Optional[str] = None) -> str: + """Converts a list of channel IDs into a CSV file. + + Parameters: + channels_ids (List[str]): List of channel IDs to be written to the CSV. + output_file_path (str, optional): Path to the file where the CSV will be saved. If not provided, the CSV will be returned as a string. + channel_id_column_name (str, optional): Name of the column in the CSV that will contain the channel IDs. + If not provided, the default value defined in ChannelId.CHANNEL_ID_COLUMN_NAME will be used. + + Returns: + str: The path of the created CSV file or, if no path is provided, the contents of the CSV as a string. 
+ """ + if output_file_path: + output_path = Path(output_file_path) + if output_path.is_dir(): + command_name = cls.name.replace("-", "_") + timestamp = datetime.now().strftime("%M%S%f") + output_file_path = output_path / f"{command_name}_{timestamp}.csv" + + with Path(output_file_path).open("w", newline="") if output_file_path else StringIO() as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=list(data[0].keys()) if data else []) + writer.writeheader() + writer.writerows(data) + return str(output_file_path) if output_file_path else csv_file.getvalue() diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py new file mode 100644 index 0000000..3b33ed1 --- /dev/null +++ b/youtool/commands/channel_id.py @@ -0,0 +1,94 @@ +from pathlib import Path + +from youtool import YouTube + +from .base import Command + + +class ChannelId(Command): + """Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs).""" + + name = "channel-id" + arguments = [ + { + "name": "--urls", + "short": "-u", + "type": str, + "help": "Channels urls", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls-file-path", + "short": "-f", + "type": Path, + "help": "Channels urls csv file path", + "mutually_exclusive_group": "input_source", + }, + {"name": "--output-file-path", "short": "-o", "type": Path, "help": "Output csv file path"}, + {"name": "--url-column-name", "short": "-c", "type": str, "help": "URL column name on csv input files"}, + {"name": "--id-column-name", "short": "-i", "type": str, "help": "Channel ID column name on csv output files"}, + ] + + URL_COLUMN_NAME: str = "channel_url" + CHANNEL_ID_COLUMN_NAME: str = "channel_id" + + @classmethod + def execute(cls, **kwargs) -> str: + """Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. 
+ + This command retrieves YouTube channel IDs from one of two possible inputs: + - a list of YouTube channel URLs (`--urls`), or + - a CSV file containing those URLs (`--urls-file-path`). + + Args: + urls (list[str]): List of YouTube channel URLs. + Mutually exclusive with `urls_file_path`. + urls_file_path (Path): Path to a CSV file containing YouTube channel URLs. + Mutually exclusive with `urls`. + Requires url_column_name to specify the column with URLs. + output_file_path (Path, optional): Path to the output CSV file where channel IDs will be saved. + If not provided, the result will be returned as a string. + api_key (str): The API key to authenticate with the YouTube Data API. + url_column_name (str, optional): The name of the column in the urls_file_path CSV file that contains the URLs. + Default is "channel_url". + id_column_name (str, optional): The name of the column for channel IDs in the output CSV file. + Default is "channel_id". + + Returns: + str: A message indicating the result of the command. If output_file_path is specified, the message will + include the path to the generated CSV file. Otherwise, it will return the result as a string. + + Raises: + Exception: If neither `urls` nor `urls_file_path` provides any URL.
+ """ + urls = kwargs.get("urls") or [] + urls_file_path = kwargs.get("urls_file_path") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") + id_column_name = kwargs.get("id_column_name") + + urls = cls.resolve_urls(urls, urls_file_path, url_column_name) + + youtube = YouTube([api_key], disable_ipv6=True) + + channels_ids = [youtube.channel_id_from_url(url) for url in urls if url] + + result = cls.data_to_csv( + data=[{(id_column_name or cls.CHANNEL_ID_COLUMN_NAME): channel_id} for channel_id in channels_ids], + output_file_path=output_file_path, + ) + + return result + + @classmethod + def resolve_urls(cls, urls, urls_file_path, url_column_name): + if urls_file_path: + urls += cls.data_from_csv( + file_path=Path(urls_file_path), data_column_name=url_column_name or cls.URL_COLUMN_NAME + ) + if not urls: + raise Exception("Either 'username' or 'url' must be provided for the channel-id command") + return urls diff --git a/youtool/commands/channel_info.py b/youtool/commands/channel_info.py new file mode 100644 index 0000000..56a27c2 --- /dev/null +++ b/youtool/commands/channel_info.py @@ -0,0 +1,131 @@ +from pathlib import Path +from typing import List, Self + +from youtool import YouTube + +from .base import Command + + +class ChannelInfo(Command): + """Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output + (same schema for `channel` dicts) + """ + + name = "channel-info" + arguments = [ + { + "name": "--urls", + "short": "-u", + "type": str, + "help": "Channel URLs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + {"name": "--usernames", "short": "-n", "type": str, "help": "Channel usernames", "nargs": "*"}, + {"name": "--ids", "short": "-i", "type": str, "help": "Channel IDs", "nargs": "*"}, + { + "name": "--urls-file-path", + "short": "-f", + "type": Path, + "help": "Channel URLs CSV file path", + 
"mutually_exclusive_group": "input_source", + }, + {"name": "--usernames-file-path", "short": "-m", "type": Path, "help": "Channel usernames CSV file path"}, + {"name": "--ids-file-path", "short": "-d", "type": Path, "help": "Channel IDs CSV file path"}, + {"name": "--output-file-path", "short": "-o", "type": Path, "help": "Output CSV file path"}, + {"name": "--url-column-name", "short": "-c", "type": str, "help": "URL column name on CSV input files"}, + { + "name": "--username-column-name", + "short": "-s", + "type": str, + "help": "Username column name on CSV input files", + }, + {"name": "--id-column-name", "short": "-a", "type": str, "help": "ID column name on CSV input files"}, + ] + + URL_COLUMN_NAME: str = "channel_url" + USERNAME_COLUMN_NAME: str = "channel_username" + ID_COLUMN_NAME: str = "channel_id" + INFO_COLUMNS: List[str] = [ + "id", + "title", + "description", + "published_at", + "view_count", + "subscriber_count", + "video_count", + ] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """Execute the channel-info command to fetch YouTube channel information from URLs or + usernames and save them to a CSV file. + + Args: + urls (list[str], optional): A list of YouTube channel URLs. If not provided, `urls_file_path` must be specified. + usernames (list[str], optional): A list of YouTube channel usernames. If not provided, `usernames_file_path` must be specified. + ids (list[str], optional): A list of YouTube channel IDs. If not provided, `ids_file_path` must be specified. + urls_file_path (Path, optional): Path to a CSV file containing YouTube channel URLs. + usernames_file_path (Path, optional): Path to a CSV file containing YouTube channel usernames. + output_file_path (Path, optional): Path to the output CSV file where channel information will be saved. + ids_file_path (Path, optional): Path to a CSV file containing YouTube channel IDs. + api_key (str): The API key to authenticate with the YouTube Data API. 
+ url_column_name (str, optional): The name of the column in the `urls_file_path` CSV file that contains the URLs. + Default is "channel_url". + username_column_name (str, optional): The name of the column in the `usernames_file_path` CSV file that contains the usernames. + Default is "channel_username". + info_columns (str, optional): Comma-separated list of columns to include in the output CSV. + Default is the class attribute `INFO_COLUMNS`. + + Returns: + str: A message indicating the result of the command. If `output_file_path` is specified, the message will + include the path to the generated CSV file. Otherwise, it will return the result as a string. + + Raises: + Exception: If neither `urls`, `usernames`, `urls_file_path` nor `usernames_file_path` is provided. + """ + + urls = kwargs.get("urls") or [] + usernames = kwargs.get("usernames") or [] + ids = kwargs.get("ids") or [] + urls_file_path = kwargs.get("urls_file_path") + usernames_file_path = kwargs.get("usernames_file_path") + output_file_path = kwargs.get("output_file_path") + ids_file_path = kwargs.get("ids_file_path") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") or ChannelInfo.URL_COLUMN_NAME + username_column_name = kwargs.get("username_column_name") or ChannelInfo.USERNAME_COLUMN_NAME + id_column_name = kwargs.get("id_column_name") or ChannelInfo.ID_COLUMN_NAME + info_columns = kwargs.get("info_columns") + + info_columns = ( + [column.strip() for column in info_columns.split(",")] if info_columns else ChannelInfo.INFO_COLUMNS + ) + + if urls_file_path: + urls += ChannelInfo.data_from_csv(urls_file_path, url_column_name) + if usernames_file_path: + usernames += ChannelInfo.data_from_csv(usernames_file_path, username_column_name) + if ids_file_path: + ids += ChannelInfo.data_from_csv(ids_file_path, id_column_name) + + if not urls and not usernames and not ids: + raise Exception("Either 'urls', 'usernames', or 'ids' must be provided for the channel-info 
command") + + youtube = YouTube([api_key], disable_ipv6=True) + + channels_ids = ( + [youtube.channel_id_from_url(url) for url in (urls or []) if url] + + [youtube.channel_id_from_username(username) for username in (usernames or []) if username] + + (ids or []) + ) + channel_ids = list(set([channel_id for channel_id in channels_ids if channel_id])) + return cls.data_to_csv( + data=[ + ChannelInfo.filter_fields(channel_info, info_columns) + for channel_info in (youtube.channels_infos(channel_ids) or []) + if channel_info + ], + output_file_path=output_file_path, + ) diff --git a/youtool/commands/video_comments.py b/youtool/commands/video_comments.py new file mode 100644 index 0000000..337fdfe --- /dev/null +++ b/youtool/commands/video_comments.py @@ -0,0 +1,47 @@ +from pathlib import Path +from typing import List, Self + +from youtool import YouTube + +from .base import Command + + +class VideoComments(Command): + """ + Get comments from a video ID, generate CSV output + """ + + name = "video-comments" + arguments = [ + {"name": "--ids", "short": "-i", "type": str, "help": "Video ID", "required": True}, + {"name": "--output-file-path", "short": "-o", "type": Path, "help": "Output CSV file path"}, + ] + + COMMENT_COLUMNS: List[str] = ["comment_id", "author_display_name", "text_display", "like_count", "published_at"] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the get-comments command to fetch comments from a YouTube video and save them to a CSV file. + + - a YouTube video ID (`--ids`). + + Args: + ids (str): The ID of the YouTube video. + output_file_path (Path): Path to the output CSV file where comments will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + + Returns: + A message indicating the result of the command. If output_file_path is specified, + the message will include the path to the generated CSV file. + Otherwise, it will return the result as a string. 
+ """ + ids = kwargs.get("ids") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + youtube = YouTube([api_key], disable_ipv6=True) + + comments = list(youtube.video_comments(ids)) + + return cls.data_to_csv(data=comments, output_file_path=output_file_path) diff --git a/youtool/commands/video_info.py b/youtool/commands/video_info.py new file mode 100644 index 0000000..5971294 --- /dev/null +++ b/youtool/commands/video_info.py @@ -0,0 +1,127 @@ +from pathlib import Path +from typing import List, Self + +from youtool import YouTube + +from .base import Command + + +class VideoInfo(Command): + """Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (same schema for video dicts)")""" + + name = "video-info" + arguments = [ + { + "name": "--ids", + "short": "-i", + "type": str, + "help": "Video IDs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls", + "short": "-u", + "type": str, + "help": "Video URLs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls-file-path", + "short": "-f", + "type": Path, + "help": "Channels urls csv file path", + "mutually_exclusive_group": "input_source", + }, + {"name": "--ids-file-path", "short": "-d", "type": Path, "help": "Channel IDs CSV file path"}, + {"name": "--output-file-path", "short": "-o", "type": Path, "help": "Output CSV file path"}, + {"name": "--url_column_name", "short": "-c", "type": str, "help": "URL column name on CSV input files"}, + {"name": "--id_column_name", "short": "-a", "type": str, "help": "ID column name on CSV input files"}, + { + "name": "--info_columns", + "short": "-l", + "type": str, + "help": "Comma-separated list of columns to include in the output CSV", + }, + ] + + ID_COLUMN_NAME: str = "video_id" + URL_COLUMN_NAME: str = "video_url" + INFO_COLUMNS: List[str] = [ + "id", + "title", + "description", + "published_at", + "view_count", + 
"like_count", + "comment_count", + ] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the video-info command to fetch YouTube video information from IDs or URLs and save them to a CSV file. + + - a list of YouTube video IDs (`--ids`), or + - a list of YouTube video URLs (`--urls`), or + - a CSV file containing those URLs (`--urls-file-path`) or IDs (`--ids-file-path`). + + Args: + ids (list[str], optional): List of YouTube video IDs. + Mutually exclusive with `urls` and `input_file_path`. + urls (list[str], optional): List of YouTube video URLs. + Mutually exclusive with `ids` and `input_file_path`. + urls_file_path (Path, optional): Path to a CSV file containing YouTube video URLs. + ids_file_path (Path, optional): Path to a CSV file containing YouTube video IDs. + output_file_path (Path, optional): Path to the output CSV file where video information will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + url_column_name (str, optional): The name of the column in the input_file_path CSV file that contains the URLs. + Default is "video_url". + id_column_name (str, optional): The name of the column in the input_file_path CSV file that contains the IDs. + Default is "video_id". + info_columns (str, optional): Comma-separated list of columns to include in the output CSV. + Default is the class attribute INFO_COLUMNS. + + Returns: + str: A message indicating the result of the command. If output_file_path is specified, the message will + include the path to the generated CSV file. Otherwise, it will return the result as a string. + + Raises: + Exception: If neither ids, urls, nor input_file_path is provided. 
+ """ + ids = kwargs.get("ids") or [] + urls = kwargs.get("urls") or [] + ids_file_path = kwargs.get("ids_file_path") + urls_file_path = kwargs.get("urls_file_path") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") or VideoInfo.URL_COLUMN_NAME + id_column_name = kwargs.get("id_column_name") or VideoInfo.ID_COLUMN_NAME + + info_columns = kwargs.get("info_columns") + + info_columns = ( + [column.strip() for column in info_columns.split(",")] if info_columns else VideoInfo.INFO_COLUMNS + ) + if ids_file_path: + ids += cls.data_from_csv(ids_file_path, id_column_name) + if urls_file_path: + urls += cls.data_from_csv(urls_file_path, url_column_name) + + if not ids and not urls: + raise Exception("Either 'ids', 'urls' must be provided for the video-info command") + + youtube = YouTube([api_key], disable_ipv6=True) + + if urls: + ids += sum([cls.video_id_from_url(url) for url in urls], []) + + # Remove duplicated + ids = list(set(ids)) + videos_infos = list(youtube.videos_infos([_id for _id in ids if _id])) + return cls.data_to_csv( + data=[VideoInfo.filter_fields(video_info, info_columns) for video_info in videos_infos], + output_file_path=output_file_path, + ) diff --git a/youtool/commands/video_livechat.py b/youtool/commands/video_livechat.py new file mode 100644 index 0000000..1dafab3 --- /dev/null +++ b/youtool/commands/video_livechat.py @@ -0,0 +1,78 @@ +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Self + +from youtool import YouTube + +from .base import Command + + +class VideoLiveChat(Command): + """Get live chat comments from a video ID, generate CSV output (same schema for chat_message dicts)""" + + name = "video-livechat" + arguments = [ + {"name": "--ids", "short": "-i", "type": str, "help": "Video ID", "required": True}, + {"name": "--output-file-path", "short": "-o", "type": Path, "help": "Output CSV file path"}, + {"name": 
"--expand-emojis", "short": "-e", "action": "store_true", "help": "Expand emojis in chat messages"}, + ] + + CHAT_COLUMNS: List[str] = [ + "id", + "video_id", + "created_at", + "type", + "action", + "video_time", + "author", + "author_id", + "author_image_url", + "text", + "money_currency", + "money_amount", + ] + + @staticmethod + def parse_timestamp(timestamp: str) -> str: + try: + return datetime.fromisoformat(timestamp.replace("Z", "")).strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return timestamp + + @staticmethod + def parse_decimal(value: Optional[str]) -> Optional[float]: + if value is None: + return None + try: + return float(str(value).replace(",", "")) + except Exception: + return None + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the video-livechat command to fetch live chat messages from a YouTube video and save them to a CSV file. + + - a YouTube video ID (`--ids`). + + Args: + ids (str): The ID of the YouTube video. + output_file_path (Path): Path to the output CSV file where chat messages will be saved. + expand_emojis (bool): Whether to expand emojis in chat messages. Defaults to True. + api_key (str): The API key to authenticate with the YouTube Data API. + + Returns: + A message indicating the result of the command. If output_file_path is specified, + the message will include the path to the generated CSV file. + Otherwise, it will return the result as a string. 
+ """ + ids = kwargs.get("ids") + output_file_path = kwargs.get("output_file_path") + expand_emojis = kwargs.get("expand_emojis", True) + api_key = kwargs.get("api_key") + + youtube = YouTube([api_key], disable_ipv6=True) + + chat_messages = list(youtube.video_livechat(ids, expand_emojis)) + + return cls.data_to_csv(data=chat_messages, output_file_path=output_file_path) diff --git a/youtool/commands/video_search.py b/youtool/commands/video_search.py new file mode 100644 index 0000000..17345f5 --- /dev/null +++ b/youtool/commands/video_search.py @@ -0,0 +1,126 @@ +from pathlib import Path +from typing import List, Self + +from youtool import YouTube + +from .base import Command + + +class VideoSearch(Command): + """ + Search video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), + generate CSV output (simplified video dict schema or option to get full video info) + """ + + name = "video-search" + arguments = [ + { + "name": "--ids", + "short": "-i", + "type": str, + "help": "Video IDs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls", + "short": "-u", + "type": str, + "help": "Video URLs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--ids-file-path", + "short": "-d", + "type": Path, + "help": "Channel IDs CSV file path", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls-file-path", + "short": "-f", + "type": Path, + "help": "Channels urls csv file path", + "mutually_exclusive_group": "input_source", + }, + {"name": "--output-file-path", "short": "-o", "type": Path, "help": "Output CSV file path"}, + {"name": "--url_column_name", "short": "-c", "type": str, "help": "URL column name on csv input files"}, + {"name": "--id_column_name", "short": "-a", "type": str, "help": "Channel ID column name on csv output files"}, + { + "name": "--info_columns", + "short": "-l", + "type": str, + "help": "Comma-separated list of columns to include in the 
output CSV", + }, + {"name": "--full-info", "action": "store_true", "help": "Option to get full video info", "default": False}, + ] + + ID_COLUMN_NAME: str = "video_id" + URL_COLUMN_NAME: str = "video_url" + INFO_COLUMNS: List[str] = ["id", "title", "published_at", "views"] + FULL_INFO_COLUMNS: List[str] = INFO_COLUMNS + ["description", "like_count", "comment_count"] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the video-search command to fetch YouTube video information from IDs or URLs and save them to a CSV file. + + - a list of YouTube video IDs (`--ids`), or + - a list of YouTube video URLs (`--urls`), or + - a CSV file containing those URLs (`--urls-file-path`) or IDs (`--ids-file-path`). + + Args: + ids (list[str], optional): A list of YouTube video IDs. If not provided, input_file_path must be specified. + urls (list[str], optional): A list of YouTube video URLs. If not provided, input_file_path must be specified. + ids_file_path (Path, optional): Path to a CSV file containing YouTube video IDs. + urls_file_path (Path, optional): Path to a CSV file containing YouTube video URLs. + output_file_path (Path, optional): Path to the output CSV file where video information will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + full_info (bool, optional): Flag to indicate whether to get full video info. Default is False. + url_column_name (str, optional): The name of the column in the input CSV file that contains the URLs. Default is "video_url". + id_column_name (str, optional): The name of the column in the input CSV file that contains the IDs. Default is "video_id". + + Returns: + str: A message indicating the result of the command. If output_file_path is specified, + the message will include the path to the generated CSV file. + Otherwise, it will return the result as a string. + + Raises: + Exception: If neither ids, urls, nor input_file_path is provided. 
+ """ + ids = kwargs.get("ids") or [] + urls = kwargs.get("urls") or [] + ids_file_path = kwargs.get("ids_file_path") + urls_file_path = kwargs.get("urls_file_path") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") or VideoSearch.URL_COLUMN_NAME + id_column_name = kwargs.get("id_column_name") or VideoSearch.ID_COLUMN_NAME + + info_columns = kwargs.get("info_columns") + full_info = kwargs.get("full_info", False) + + info_columns = VideoSearch.FULL_INFO_COLUMNS if full_info else VideoSearch.INFO_COLUMNS + + if ids_file_path: + ids += cls.data_from_csv(ids_file_path, id_column_name) + if urls_file_path: + urls += cls.data_from_csv(urls_file_path, url_column_name) + + if not ids and not urls: + raise Exception("Either ids, urls, ids_file_path or urls_file_path must be provided") + + youtube = YouTube([api_key], disable_ipv6=True) + + if urls: + ids += sum([cls.video_id_from_url(url) for url in urls], []) + + # Remove duplicated + ids = list(set(ids)) + videos_infos = list(youtube.videos_infos([_id for _id in ids if _id])) + return cls.data_to_csv( + data=[VideoSearch.filter_fields(video_info, info_columns) for video_info in videos_infos], + output_file_path=output_file_path, + ) diff --git a/youtool/commands/video_transcription.py b/youtool/commands/video_transcription.py new file mode 100644 index 0000000..4e8a38f --- /dev/null +++ b/youtool/commands/video_transcription.py @@ -0,0 +1,121 @@ +from pathlib import Path + +from youtool import YouTube + +from .base import Command + + +class VideoTranscription(Command): + """Download video transcriptions from YouTube videos based on IDs or URLs (or CSV filename with URLs/IDs inside), and save them to files.""" + + name = "video-transcription" + arguments = [ + { + "name": "--ids", + "short": "-i", + "type": str, + "help": "Video IDs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls", + "short": 
"-u", + "type": str, + "help": "Video URLs", + "nargs": "*", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--urls-file-path", + "short": "-f", + "type": Path, + "help": "Channels urls csv file path", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--ids-file-path", + "short": "-d", + "type": Path, + "help": "Channel IDs CSV file path", + "mutually_exclusive_group": "input_source", + }, + { + "name": "--output-dir", + "short": "-o", + "type": Path, + "help": "Output directory to save transcriptions", + "required": True, + }, + { + "name": "--language-code", + "short": "-g", + "type": str, + "help": "Language code for transcription", + "required": True, + }, + {"name": "--url_column_name", "short": "-c", "type": str, "help": "URL column name on CSV input files"}, + {"name": "--id_column_name", "short": "-a", "type": str, "help": "ID column name on CSV input files"}, + ] + + ID_COLUMN_NAME: str = "video_id" + URL_COLUMN_NAME: str = "video_url" + + @classmethod + def execute(cls, **kwargs) -> str: + """Execute the video-transcription command to download transcriptions of videos from IDs or URLs and save them to a CSV file. + + - a list of YouTube video IDs (`--ids`), or + - a list of YouTube video URLs (`--urls`), or + - a CSV file containing those URLs (`--urls-file-path`) or IDs (`--ids-file-path`). + + Args: + ids (list[str], optional): List of YouTube video IDs. + Mutually exclusive with `urls` and `input_file_path`. + urls (list[str], optional): List of YouTube video URLs. + Mutually exclusive with `ids` and `input_file_path`. + urls_file_path (Path, optional): Path to a CSV file containing YouTube video URLs. + ids_file_path (Path, optional): Path to a CSV file containing YouTube video IDs. + output_dir (Path, optional): Path to the output CSV file where video information will be saved. + language_code (str): Language code for the transcription language. + api_key (str): The API key to authenticate with the YouTube Data API. 
+ url_column_name (str, optional): Column name for URLs in the CSV input file. Defaults to "video_url". + id_column_name (str, optional): Column name for IDs in the CSV output file. Defaults to "video_id". + + Returns: + str: A message indicating the result of the command. Reports success or failure for each video transcription download. + """ + ids = kwargs.get("ids") or [] + urls = kwargs.get("urls") or [] + ids_file_path = kwargs.get("ids_file_path") + urls_file_path = kwargs.get("urls_file_path") + output_dir = kwargs.get("output_dir") + language_code = kwargs.get("language_code") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") or VideoTranscription.URL_COLUMN_NAME + id_column_name = kwargs.get("id_column_name") or VideoTranscription.ID_COLUMN_NAME + + youtube = YouTube([api_key], disable_ipv6=True) + + if ids_file_path: + ids += cls.data_from_csv(ids_file_path, id_column_name) + if urls_file_path: + urls += cls.data_from_csv(urls_file_path, url_column_name) + + if not ids and not urls: + raise Exception("Either 'ids' or 'urls' must be provided for the video-transcription command") + + if urls: + ids += sum([cls.video_id_from_url(url) for url in urls], []) + + # Remove duplicated + ids = list(set(ids)) + youtube.videos_transcriptions(ids, language_code, output_dir) + output_dir_path = Path(output_dir) + saved_transcriptions = [ + str(output_dir_path / f"{v_id}.{language_code}.vtt") + for v_id in ids + if (output_dir_path / f"{v_id}.{language_code}.vtt").is_file() + ] + return "\n".join(saved_transcriptions)