From 2ecbda4b1bc9064058c6e2189a2f3d6b71a2ed1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Justen=20=28=40turicas=29?= Date: Sun, 19 May 2024 21:26:02 -0300 Subject: [PATCH 01/30] Add console_scripts to setup.cfg --- setup.cfg | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/setup.cfg b/setup.cfg index 77478cb..2cffba5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,10 @@ packages = find: python_requires = >=3.7 install_requires = file: requirements/base.txt +[options.entry_points] +console_scripts = + youtool = youtool:cli + [options.extras_require] cli = file: requirements/cli.txt dev = file: requirements/dev.txt From 252ff46e14cc221b07bda07843aa94934e9d6162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Justen=20=28=40turicas=29?= Date: Sun, 19 May 2024 21:28:04 -0300 Subject: [PATCH 02/30] Implement draft CLI module --- youtool/cli.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 youtool/cli.py diff --git a/youtool/cli.py b/youtool/cli.py new file mode 100644 index 0000000..be0bbd0 --- /dev/null +++ b/youtool/cli.py @@ -0,0 +1,44 @@ +import argparse + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--api-key") + subparsers = parser.add_subparsers(required=True, dest="command") + + api_key = args.api_key or os.environ.get("YOUTUBE_API_KEY") + + cmd_channel_id = subparsers.add_parser("channel-id", help="Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs)") + cmd_channel_info = subparsers.add_parser("channel-info", help="Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output (same schema for `channel` dicts)") + cmd_video_info = subparsers.add_parser("video-info", help="Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (same schema for `video` dicts)") + cmd_video_search = subparsers.add_parser("video-search", 
help="Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (simplified `video` dict schema or option to get full video info after)") + cmd_video_comments = subparsers.add_parser("video-comments", help="Get comments from a video ID, generate CSV output (same schema for `comment` dicts)") + cmd_video_livechat = subparsers.add_parser("video-livechat", help="Get comments from a video ID, generate CSV output (same schema for `chat_message` dicts)") + cmd_video_transcriptions = subparsers.add_parser("video-transcription", help="Download video transcriptions based on language code, path and list of video IDs or URLs (or CSV filename with URLs/IDs inside), download files to destination and report results") + + args = parser.parse_args() + + if args.command == "channel-id": + print(f"Implement: {args.command}") # TODO: implement + + elif args.command == "channel-info": + print(f"Implement: {args.command}") # TODO: implement + + elif args.command == "video-info": + print(f"Implement: {args.command}") # TODO: implement + + elif args.command == "video-search": + print(f"Implement: {args.command}") # TODO: implement + + elif args.command == "video-comments": + print(f"Implement: {args.command}") # TODO: implement + + elif args.command == "video-livechat": + print(f"Implement: {args.command}") # TODO: implement + + elif args.command == "video-transcription": + print(f"Implement: {args.command}") # TODO: implement + + +if __name__ == "__main__": + main() From dcc9e2f16ed3db17258108292b63f71d93773f97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Justen=20=28=40turicas=29?= Date: Sun, 19 May 2024 21:30:52 -0300 Subject: [PATCH 03/30] Add old/draft CLI search code --- youtool/cli.py | 114 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/youtool/cli.py b/youtool/cli.py index be0bbd0..ff675a3 100644 --- a/youtool/cli.py +++ b/youtool/cli.py @@ -1,4 +1,6 @@ import argparse +import 
os +import sys def main(): @@ -29,6 +31,118 @@ def main(): elif args.command == "video-search": print(f"Implement: {args.command}") # TODO: implement + exit(1) + + # TODO: update code below based on new YouTube class API + import rows + from loguru import logger + from tqdm import tqdm + + from youtool import YouTube + + parser = argparse.ArgumentParser() + parser.add_argument("--key") + parser.add_argument("csv_filename") + parser.add_argument("url", nargs="+") + args = parser.parse_args() + + key = args.key or os.environ.get("YOUTUBE_API_KEY") + if not key: + print("ERROR: Must provide an API key (--key or YOUTUBE_API_KEY env var)", file=sys.stderr) + exit(1) + + if not Path(args.csv_filename).parent.exists(): + Path(args.csv_filename).parent.mkdir(parents=True) + writer = rows.utils.CsvLazyDictWriter(args.csv_filename) # TODO: use csv + yt = YouTube(key) + videos_urls = [] + channels = {} + for url in tqdm(args.url, desc="Retrieving channel IDs"): + url = url.strip() + if "/watch?" in url: + videos_urls.append(url) + continue + channel_id = yt.channel_id_from_url(url) + if not channel_id: + username = url.split("youtube.com/")[1].split("?")[0].split("/")[0] + logger.warning(f"Channel ID not found for URL {url}") + continue + channels[channel_id] = { + "id": channel_id, + "url": url, + } + for channel_id, playlist_id in yt.playlists_ids(list(channels.keys())).items(): + channels[channel_id]["playlist_id"] = playlist_id + fields = "id duration definition status views likes dislikes favorites comments channel_id title description published_at scheduled_to finished_at concurrent_viewers started_at".split() + # TODO: check fields + for data in tqdm(channels.values(), desc="Retrieving videos"): + try: + for video_batch in ipartition(yt.playlist_videos(data["playlist_id"]), 50): + for video in yt.videos_infos([row["id"] for row in video_batch]): + writer.writerow({field: video.get(field) for field in fields}) + except RuntimeError: # Cannot find playlist + continue + 
videos_ids = (video_url.split("watch?v=")[1].split("&")[0] for video_url in videos_urls) + for video in tqdm(yt.videos_infos(videos_ids), desc="Retrieving individual videos"): + writer.writerow({field: video.get(field) for field in fields}) + writer.close() + + # SEARCH + now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + timezone_br = datetime.timezone(offset=datetime.timedelta(hours=-3)) + now_br = now.astimezone(timezone_br) + search_start = (now - datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + search_stop = search_start + datetime.timedelta(hours=1) + + parent = Path(__file__).parent + parser = argparse.ArgumentParser() + parser.add_argument("--keys-filename", default=parent / "youtube-keys.csv") + parser.add_argument("--terms-filename", default=parent / "search-terms.csv") + parser.add_argument("--channels-filename", default=parent / "search-channels.csv") + parser.add_argument("--start", default=str(search_start)) + parser.add_argument("--stop", default=str(search_stop)) + parser.add_argument("--limit", type=int, default=20) + parser.add_argument("--order", default="viewCount") + parser.add_argument("data_path") + args = parser.parse_args() + + data_path = Path(args.data_path) + keys_filename = Path(args.keys_filename) + terms_filename = Path(args.terms_filename) + channels_filename = Path(args.channels_filename) + now_path_name = now_br.strftime("%Y-%m-%dT%H") + youtube_keys = read_keys(keys_filename) + channels_groups = read_channels(args.channels_filename) + search_start, search_stop = args.start, args.stop + if isinstance(search_start, str): + search_start = datetime.datetime.fromisoformat(search_start) + if isinstance(search_stop, str): + search_stop = datetime.datetime.fromisoformat(search_stop) + search_start_str = search_start.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + search_stop_str = search_stop.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + search_limit = 
args.limit + search_order = args.order + terms_categories = read_csv_dictlist(terms_filename, "categoria", "termo") + + print(search_start_str) + print(search_stop_str) + + search_start_br = search_start.astimezone(timezone_br) + result_filename = data_path / f"search_{search_start_br.strftime('%Y-%m-%dT%H')}.csv" + writer = rows.utils.CsvLazyDictWriter(result_filename) + search_results = youtube_search( + terms_categories=terms_categories, + keys=youtube_keys["search"], + start=search_start_str, + stop=search_stop_str, + channels_groups=channels_groups, + order=search_order, + limit=search_limit, + ) + for result in search_results: + writer.writerow(result) + writer.close() + elif args.command == "video-comments": print(f"Implement: {args.command}") # TODO: implement From f2540a8784ebc7bd884d22f08840d806523e9022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Justen=20=28=40turicas=29?= Date: Sat, 8 Jun 2024 14:13:49 -0300 Subject: [PATCH 04/30] Add useful scripts (to be added to utils and CLI) --- scripts/channel_data.py | 187 ++++++++++++++++++++++++++++++++++++++++ scripts/clean_vtt.py | 43 +++++++++ 2 files changed, 230 insertions(+) create mode 100644 scripts/channel_data.py create mode 100644 scripts/clean_vtt.py diff --git a/scripts/channel_data.py b/scripts/channel_data.py new file mode 100644 index 0000000..e00b965 --- /dev/null +++ b/scripts/channel_data.py @@ -0,0 +1,187 @@ +# pip install youtool[livechat,transcription] +import argparse +import os +import json +import shelve +from pathlib import Path + +from chat_downloader.errors import ChatDisabled, LoginRequired, NoChatReplay +from tqdm import tqdm +from youtool import YouTube + + +class CsvLazyDictWriter: # Got and adapted from + """Lazy CSV dict writer, so you don't need to specify field names beforehand + + This class is almost the same as `csv.DictWriter` with the following + differences: + + - You don't need to pass `fieldnames` (it's extracted on the first + `.writerow` call); + - You 
can pass either a filename or a fobj (like `sys.stdout`); + """ + + def __init__(self, filename_or_fobj, encoding="utf-8", *args, **kwargs): + self.writer = None + self.filename_or_fobj = filename_or_fobj + self.encoding = encoding + self._fobj = None + self.writer_args = args + self.writer_kwargs = kwargs + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + @property + def fobj(self): + if self._fobj is None: + if getattr(self.filename_or_fobj, "read", None) is not None: + self._fobj = self.filename_or_fobj + else: + self._fobj = open( + self.filename_or_fobj, mode="w", encoding=self.encoding + ) + + return self._fobj + + def writerow(self, row): + if self.writer is None: + self.writer = csv.DictWriter( + self.fobj, + fieldnames=list(row.keys()), + *self.writer_args, + **self.writer_kwargs + ) + self.writer.writeheader() + + self.writerow = self.writer.writerow + return self.writerow(row) + + def __del__(self): + self.close() + + def close(self): + if self._fobj and not self._fobj.closed: + self._fobj.close() + + +# TODO: add options to get only part of the data (not all steps) +parser = argparse.ArgumentParser() +parser.add_argument("--api-key", default=os.environ.get("YOUTUBE_API_KEY"), help="Comma-separated list of YouTube API keys to use") +parser.add_argument("username_or_channel_url", type=str) +parser.add_argument("data_path", type=Path) +parser.add_argument("language-code", default="pt-orig", help="See the list by running `yt-dlp --list-subs `") +args = parser.parse_args() + +if not args.api_key: + import sys + + print("ERROR: API key must be provided either by `--api-key` or `YOUTUBE_API_KEY` environment variable", file=sys.stderr) + exit(1) +api_keys = [key.strip() for key in args.api_key.split(",") if key.strip()] + + +username = args.username +if username.startswith("https://"): + channel_url = username + username = [item for item in username.split("/") if item][-1] +else: + channel_url = 
f"https://www.youtube.com/@{username}" +data_path = args.data_path +channel_csv_filename = data_path / f"{username}-channel.csv" +playlist_csv_filename = data_path / f"{username}-playlist.csv" +playlist_video_csv_filename = data_path / f"{username}-playlist-video.csv" +video_csv_filename = data_path / f"{username}-video.csv" +comment_csv_filename = data_path / f"{username}-comment.csv" +livechat_csv_filename = data_path / f"username}-livechat.csv" +language_code = args.language_code +video_transcription_path = data_path / Path(f"{username}-transcriptions") + +yt = YouTube(api_keys, disable_ipv6=True) +video_transcription_path.mkdir(parents=True, exist_ok=True) +channel_writer = CsvLazyDictWriter(channel_csv_filename) +playlist_writer = CsvLazyDictWriter(playlist_csv_filename) +video_writer = CsvLazyDictWriter(video_csv_filename) +comment_writer = CsvLazyDictWriter(comment_csv_filename) +livechat_writer = CsvLazyDictWriter(livechat_csv_filename) +playlist_video_writer = CsvLazyDictWriter(playlist_video_csv_filename) + +print("Retrieving channel info") +channel_id = yt.channel_id_from_url(channel_url) +channel_info = list(yt.channels_infos([channel_id]))[0] +channel_writer.writerow(channel_info) +channel_writer.close() + +main_playlist = { + "id": channel_info["playlist_id"], + "title": "Uploads", + "description": channel_info["description"], + "videos": channel_info["videos"], + "channel_id": channel_id, + "channel_title": channel_info["title"], + "published_at": channel_info["published_at"], + "thumbnail_url": channel_info["thumbnail_url"], +} +playlist_writer.writerow(main_playlist) +playlist_ids = [channel_info["playlist_id"]] +for playlist in tqdm(yt.channel_playlists(channel_id), desc="Retrieving channel playlists"): + playlist_writer.writerow(playlist) + playlist_ids.append(playlist["id"]) +playlist_writer.close() + +video_ids = [] +for playlist_id in tqdm(playlist_ids, desc="Retrieving playlists' videos"): + for video in yt.playlist_videos(playlist_id): + if 
video["id"] not in video_ids: + video_ids.append(video["id"]) + row = { + "playlist_id": playlist_id, + "video_id": video["id"], + "video_status": video["status"], + "channel_id": video["channel_id"], + "channel_title": video["channel_title"], + "playlist_channel_id": video["playlist_channel_id"], + "playlist_channel_title": video["playlist_channel_title"], + "title": video["title"], + "description": video["description"], + "published_at": video["published_at"], + "added_to_playlist_at": video["added_to_playlist_at"], + "tags": video["tags"], + } + playlist_video_writer.writerow(row) +playlist_video_writer.close() + +videos = [] +for video in tqdm(yt.videos_infos(video_ids), desc="Retrieving detailed video information"): + videos.append(video) + video_writer.writerow(video) +video_writer.close() + +for video_id in tqdm(video_ids, desc="Retrieving video comments"): + try: + for comment in yt.video_comments(video_id): + comment_writer.writerow(comment) + except RuntimeError: # Comments disabled + continue +comment_writer.close() + +print("Retrieving transcriptions") +yt.videos_transcriptions( + video_ids, + language_code=language_code, + path=video_transcription_path, + skip_downloaded=True, + batch_size=10, +) + +# TODO: live chat code will freeze if it's not available +for video_id in tqdm(video_ids, desc="Retrieving live chat"): + try: + for comment in yt.video_livechat(video_id): + livechat_writer.writerow(comment) + except (LoginRequired, NoChatReplay, ChatDisabled): + continue +livechat_writer.close() diff --git a/scripts/clean_vtt.py b/scripts/clean_vtt.py new file mode 100644 index 0000000..3412b59 --- /dev/null +++ b/scripts/clean_vtt.py @@ -0,0 +1,43 @@ +# pip install webvtt-py +import argparse +import io +import json +import os +import shelve +import time +from pathlib import Path + +import tiktoken +import webvtt +from openai import APITimeoutError, OpenAI +from rows.utils import CsvLazyDictWriter +from tqdm import tqdm + + +def vtt_clean(vtt_content, 
same_line=False): + result_lines, last_line = [], None + for caption in webvtt.read_buffer(io.StringIO(vtt_content)): + new_lines = caption.text.strip().splitlines() + for line in new_lines: + line = line.strip() + if not line or line == last_line: + continue + result_lines.append(f"{str(caption.start).split('.')[0]} {line}\n" if not same_line else f"{line} ") + last_line = line + return "".join(result_lines) + + +parser = argparse.ArgumentParser() +parser.add_argument("input_path", type=Path) +parser.add_argument("output_path", type=Path) +args = parser.parse_args() + +for filename in tqdm(args.input_path.glob("*.vtt")): + new_filename = args.output_path / filename.name + if new_filename.exists(): + continue + with filename.open() as fobj: + data = fobj.read() + result = vtt_clean(data) + with new_filename.open(mode="w") as fobj: + fobj.write(result) From 079e5ee2d94a781583067680c0c3f82a49cdb062 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 25 Jun 2024 11:14:23 -0300 Subject: [PATCH 05/30] - Add argparse integration and command handling for Youtube CLI Tool --- youtool/cli.py | 179 ++++++++++--------------------------------------- 1 file changed, 37 insertions(+), 142 deletions(-) diff --git a/youtool/cli.py b/youtool/cli.py index ff675a3..6926185 100644 --- a/youtool/cli.py +++ b/youtool/cli.py @@ -1,158 +1,53 @@ import argparse import os -import sys +from commands import COMMANDS -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--api-key") - subparsers = parser.add_subparsers(required=True, dest="command") - - api_key = args.api_key or os.environ.get("YOUTUBE_API_KEY") - - cmd_channel_id = subparsers.add_parser("channel-id", help="Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs)") - cmd_channel_info = subparsers.add_parser("channel-info", help="Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output (same schema for `channel` dicts)") - 
cmd_video_info = subparsers.add_parser("video-info", help="Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (same schema for `video` dicts)") - cmd_video_search = subparsers.add_parser("video-search", help="Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (simplified `video` dict schema or option to get full video info after)") - cmd_video_comments = subparsers.add_parser("video-comments", help="Get comments from a video ID, generate CSV output (same schema for `comment` dicts)") - cmd_video_livechat = subparsers.add_parser("video-livechat", help="Get comments from a video ID, generate CSV output (same schema for `chat_message` dicts)") - cmd_video_transcriptions = subparsers.add_parser("video-transcription", help="Download video transcriptions based on language code, path and list of video IDs or URLs (or CSV filename with URLs/IDs inside), download files to destination and report results") - - args = parser.parse_args() - - if args.command == "channel-id": - print(f"Implement: {args.command}") # TODO: implement - - elif args.command == "channel-info": - print(f"Implement: {args.command}") # TODO: implement - - elif args.command == "video-info": - print(f"Implement: {args.command}") # TODO: implement - elif args.command == "video-search": - print(f"Implement: {args.command}") # TODO: implement - exit(1) - - # TODO: update code below based on new YouTube class API - import rows - from loguru import logger - from tqdm import tqdm - - from youtool import YouTube - - parser = argparse.ArgumentParser() - parser.add_argument("--key") - parser.add_argument("csv_filename") - parser.add_argument("url", nargs="+") - args = parser.parse_args() - - key = args.key or os.environ.get("YOUTUBE_API_KEY") - if not key: - print("ERROR: Must provide an API key (--key or YOUTUBE_API_KEY env var)", file=sys.stderr) - exit(1) - - if not Path(args.csv_filename).parent.exists(): - 
Path(args.csv_filename).parent.mkdir(parents=True) - writer = rows.utils.CsvLazyDictWriter(args.csv_filename) # TODO: use csv - yt = YouTube(key) - videos_urls = [] - channels = {} - for url in tqdm(args.url, desc="Retrieving channel IDs"): - url = url.strip() - if "/watch?" in url: - videos_urls.append(url) - continue - channel_id = yt.channel_id_from_url(url) - if not channel_id: - username = url.split("youtube.com/")[1].split("?")[0].split("/")[0] - logger.warning(f"Channel ID not found for URL {url}") - continue - channels[channel_id] = { - "id": channel_id, - "url": url, - } - for channel_id, playlist_id in yt.playlists_ids(list(channels.keys())).items(): - channels[channel_id]["playlist_id"] = playlist_id - fields = "id duration definition status views likes dislikes favorites comments channel_id title description published_at scheduled_to finished_at concurrent_viewers started_at".split() - # TODO: check fields - for data in tqdm(channels.values(), desc="Retrieving videos"): - try: - for video_batch in ipartition(yt.playlist_videos(data["playlist_id"]), 50): - for video in yt.videos_infos([row["id"] for row in video_batch]): - writer.writerow({field: video.get(field) for field in fields}) - except RuntimeError: # Cannot find playlist - continue - videos_ids = (video_url.split("watch?v=")[1].split("&")[0] for video_url in videos_urls) - for video in tqdm(yt.videos_infos(videos_ids), desc="Retrieving individual videos"): - writer.writerow({field: video.get(field) for field in fields}) - writer.close() - - # SEARCH - now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - timezone_br = datetime.timezone(offset=datetime.timedelta(hours=-3)) - now_br = now.astimezone(timezone_br) - search_start = (now - datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) - search_stop = search_start + datetime.timedelta(hours=1) +def main(): + """ + Main function for the YouTube CLI Tool. 
- parent = Path(__file__).parent - parser = argparse.ArgumentParser() - parser.add_argument("--keys-filename", default=parent / "youtube-keys.csv") - parser.add_argument("--terms-filename", default=parent / "search-terms.csv") - parser.add_argument("--channels-filename", default=parent / "search-channels.csv") - parser.add_argument("--start", default=str(search_start)) - parser.add_argument("--stop", default=str(search_stop)) - parser.add_argument("--limit", type=int, default=20) - parser.add_argument("--order", default="viewCount") - parser.add_argument("data_path") - args = parser.parse_args() + This function sets up the argument parser for the CLI tool, including options for the YouTube API key and + command-specific subparsers. It then parses the command-line arguments, retrieving the YouTube API key + from either the command-line argument '--api-key' or the environment variable 'YOUTUBE_API_KEY'. If the API + key is not provided through any means, it raises an argparse.ArgumentError. - data_path = Path(args.data_path) - keys_filename = Path(args.keys_filename) - terms_filename = Path(args.terms_filename) - channels_filename = Path(args.channels_filename) - now_path_name = now_br.strftime("%Y-%m-%dT%H") - youtube_keys = read_keys(keys_filename) - channels_groups = read_channels(args.channels_filename) - search_start, search_stop = args.start, args.stop - if isinstance(search_start, str): - search_start = datetime.datetime.fromisoformat(search_start) - if isinstance(search_stop, str): - search_stop = datetime.datetime.fromisoformat(search_stop) - search_start_str = search_start.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - search_stop_str = search_stop.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - search_limit = args.limit - search_order = args.order - terms_categories = read_csv_dictlist(terms_filename, "categoria", "termo") + Finally, the function executes the appropriate command based on the parsed arguments. 
If an exception occurs + during the execution of the command, it is caught and raised as an argparse error for proper handling. - print(search_start_str) - print(search_stop_str) + Raises: + argparse.ArgumentError: If the YouTube API key is not provided. + argparse.ArgumentError: If there is an error during the execution of the command. - search_start_br = search_start.astimezone(timezone_br) - result_filename = data_path / f"search_{search_start_br.strftime('%Y-%m-%dT%H')}.csv" - writer = rows.utils.CsvLazyDictWriter(result_filename) - search_results = youtube_search( - terms_categories=terms_categories, - keys=youtube_keys["search"], - start=search_start_str, - stop=search_stop_str, - channels_groups=channels_groups, - order=search_order, - limit=search_limit, - ) - for result in search_results: - writer.writerow(result) - writer.close() + """ + parser = argparse.ArgumentParser(description="CLI Tool for managing YouTube videos add playlists") + parser.add_argument("--api-key", type=str, help="YouTube API Key", dest="api_key") + parser.add_argument("--debug", type=bool, help="Debug mode", dest="debug") + + subparsers = parser.add_subparsers(required=True, dest="command", title="Command", help="Command to be executed") + # cmd_video_search = subparsers.add_parser("video-search", help="Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (simplified `video` dict schema or option to get full video info after)") + # cmd_video_comments = subparsers.add_parser("video-comments", help="Get comments from a video ID, generate CSV output (same schema for `comment` dicts)") + # cmd_video_livechat = subparsers.add_parser("video-livechat", help="Get comments from a video ID, generate CSV output (same schema for `chat_message` dicts)") + # cmd_video_transcriptions = subparsers.add_parser("video-transcription", help="Download video transcriptions based on language code, path and list of video IDs or URLs (or CSV filename with 
URLs/IDs inside), download files to destination and report results") - elif args.command == "video-comments": - print(f"Implement: {args.command}") # TODO: implement + for command in COMMANDS: + command.parse_arguments(subparsers) - elif args.command == "video-livechat": - print(f"Implement: {args.command}") # TODO: implement + args = parser.parse_args() + args.api_key = args.api_key or os.environ.get("YOUTUBE_API_KEY") - elif args.command == "video-transcription": - print(f"Implement: {args.command}") # TODO: implement + if not args.api_key: + parser.error("YouTube API Key is required") + + try: + print(args.func(**args.__dict__)) + except Exception as error: + if args.debug: + raise error + parser.error(error) if __name__ == "__main__": - main() + main() \ No newline at end of file From 4c5d15124a2f54ed56ba13e7d54ee962b7769881 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 25 Jun 2024 11:16:11 -0300 Subject: [PATCH 06/30] - Implemented method to extract URLs from a CSV file; - Implemented method to convert a list of dictionaries into a CSV file or string; --- youtool/commands/base.py | 115 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 youtool/commands/base.py diff --git a/youtool/commands/base.py b/youtool/commands/base.py new file mode 100644 index 0000000..165a48f --- /dev/null +++ b/youtool/commands/base.py @@ -0,0 +1,115 @@ +import csv +import argparse + +from typing import List, Dict, Any, Self +from io import StringIO +from pathlib import Path +from datetime import datetime + + +class Command(): + """ + A base class for commands to inherit from, following a specific structure. + + Attributes: + name (str): The name of the command. + arguments (List[Dict[str, Any]]): A list of dictionaries, each representing an argument for the command. 
+ """ + name: str + arguments: List[Dict[str, Any]] + + @classmethod + def generate_parser(cls: Self, subparsers: argparse._SubParsersAction): + """ + Creates a parser for the command and adds it to the subparsers. + + Args: + subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. + + Returns: + argparse.ArgumentParser: The parser for the command. + """ + return subparsers.add_parser(cls.name, help=cls.__doc__) + + @classmethod + def parse_arguments(cls: Self, subparsers: argparse._SubParsersAction) -> None: + """ + Parses the arguments for the command and sets the command's execute method as the default function to call. + + Args: + subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. + """ + parser = cls.generate_parser(subparsers) + for argument in cls.arguments: + argument_copy = {**argument} + argument_name = argument_copy.pop("name") + parser.add_argument(argument_name, **argument_copy) + parser.set_defaults(func=cls.execute) + + @classmethod + def execute(cls: Self, arguments: argparse.Namespace): + """ + Executes the command. + + This method should be overridden by subclasses to define the command's behavior. + + Args: + arguments (argparse.Namespace): The parsed arguments for the command. + """ + raise NotImplementedError() + + @staticmethod + def data_from_csv(file_path: str, data_column_name: str = None) -> List[str]: + """ + Extracts a list of URLs from a specified CSV file. + + Args: file_path (str): The path to the CSV file containing the URLs. + data_column_name (str, optional): The name of the column in the CSV file that contains the URLs. + If not provided, it defaults to `ChannelId.URL_COLUMN_NAME`. + + Returns: + List[str]: A list of URLs extracted from the specified CSV file. + + Raises: + Exception: If the file path is invalid or the file cannot be found. 
+ """ + data = [] + + file_path = Path(file_path) + if not file_path.is_file(): + raise FileNotFoundError(f"Invalid file path: {file_path}") + + with file_path.open('r', newline='') as csv_file: + reader = csv.DictReader(csv_file) + if data_column_name not in reader.fieldnames: + raise Exception(f"Column {data_column_name} not found on {file_path}") + for row in reader: + data.append(row.get(data_column_name)) + return data + + @classmethod + def data_to_csv(cls: Self, data: List[Dict], output_file_path: str = None) -> str: + """ + Converts a list of channel IDs into a CSV file. + + Parameters: + channels_ids (List[str]): List of channel IDs to be written to the CSV. + output_file_path (str, optional): Path to the file where the CSV will be saved. If not provided, the CSV will be returned as a string. + channel_id_column_name (str, optional): Name of the column in the CSV that will contain the channel IDs. + If not provided, the default value defined in ChannelId.CHANNEL_ID_COLUMN_NAME will be used. + + Returns: + str: The path of the created CSV file or, if no path is provided, the contents of the CSV as a string. 
+ """ + if output_file_path: + output_path = Path(output_file_path) + if output_path.is_dir(): + command_name = cls.name.replace("-", "_") + timestamp = datetime.now().strftime("%M%S%f") + output_file_path = output_path / f"{command_name}_{timestamp}.csv" + + with (Path(output_file_path).open('w', newline='') if output_file_path else StringIO()) as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=list(data[0].keys()) if data else []) + writer.writeheader() + writer.writerows(data) + return str(output_file_path) if output_file_path else csv_file.getvalue() \ No newline at end of file From 943f6b07b3da175e42366e0b05270adb021f0eac Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 25 Jun 2024 11:20:00 -0300 Subject: [PATCH 07/30] - Implemented command to extract YouTube channel IDs from a list of URLs or a CSV file containing URLs; - Added commands directory structure --- youtool/commands/channel_id.py | 85 ++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 youtool/commands/channel_id.py diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py new file mode 100644 index 0000000..2233d33 --- /dev/null +++ b/youtool/commands/channel_id.py @@ -0,0 +1,85 @@ +import csv + +from typing import Self + +from youtool import YouTube + +from .base import Command + + +class ChannelId(Command): + """ + Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs) + """ + name = "channel-id" + arguments = [ + {"name": "--urls", "type": str, "help": "Channels urls", "nargs": "*"}, + {"name": "--urls-file-path", "type": str, "help": "Channels urls csv file path"}, + {"name": "--output-file-path", "type": str, "help": "Output csv file path"}, + {"name": "--url-column-name", "type": str, "help": "URL column name on csv input files"}, + {"name": "--id-column-name", "type": str, "help": "Channel ID column name on csv output files"} + ] + + URL_COLUMN_NAME: str = "channel_url" + 
CHANNEL_ID_COLUMN_NAME: str = "channel_id" + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. + + This method retrieves YouTube channel IDs from a list of provided URLs or from a file containing URLs. + It then saves these channel IDs to a CSV file if an output file path is specified. + + Args: + urls (list[str], optional): A list of YouTube channel URLs. Either this or urls_file_path must be provided. + urls_file_path (str, optional): Path to a CSV file containing YouTube channel URLs. + Requires url_column_name to specify the column with URLs. + output_file_path (str, optional): Path to the output CSV file where channel IDs will be saved. + If not provided, the result will be returned as a string. + api_key (str): The API key to authenticate with the YouTube Data API. + url_column_name (str, optional): The name of the column in the urls_file_path CSV file that contains the URLs. + Default is "channel_url". + id_column_name (str, optional): The name of the column for channel IDs in the output CSV file. + Default is "channel_id". + + Returns: + str: The path of the generated CSV file if output_file_path is specified. + Otherwise, the CSV content itself is returned as a string. + + Raises: + Exception: If neither urls nor urls_file_path is provided. 
+ """ + urls = kwargs.get("urls") + urls_file_path = kwargs.get("urls_file_path") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") + id_column_name = kwargs.get("id_column_name") + + if urls_file_path and not urls: + urls = cls.data_from_csv( + file_path=urls_file_path, + data_column_name=url_column_name or cls.URL_COLUMN_NAME + ) + + if not urls: + raise Exception("Either 'username' or 'url' must be provided for the channel-id command") + + youtube = YouTube([api_key], disable_ipv6=True) + + channels_ids = [ + youtube.channel_id_from_url(url) for url in urls if url + ] + + result = cls.data_to_csv( + data=[ + { + (id_column_name or cls.CHANNEL_ID_COLUMN_NAME): channel_id for channel_id in channels_ids + } + ], + output_file_path=output_file_path + ) + + return result \ No newline at end of file From b4f82e5bd4e03da73274175c544dcb91a41e6ef4 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 25 Jun 2024 11:22:28 -0300 Subject: [PATCH 08/30] - Added to the list; --- youtool/commands/__init__.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 youtool/commands/__init__.py diff --git a/youtool/commands/__init__.py b/youtool/commands/__init__.py new file mode 100644 index 0000000..9d1c702 --- /dev/null +++ b/youtool/commands/__init__.py @@ -0,0 +1,10 @@ +from .channel_id import ChannelId + + +COMMANDS = [ + ChannelId +] + +__all__ = [ + COMMANDS, ChannelId +] \ No newline at end of file From 525015e5d7eb332efdf876f3da4a0b28ba4d9f8f Mon Sep 17 00:00:00 2001 From: Ana Paula Sales Date: Wed, 26 Jun 2024 16:42:07 -0300 Subject: [PATCH 09/30] Update cli.py fix: show error with parser if not in debug mode --- youtool/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/youtool/cli.py b/youtool/cli.py index 6926185..dce4356 100644 --- a/youtool/cli.py +++ b/youtool/cli.py @@ -46,8 +46,8 @@ def main(): except Exception as error: if args.debug: 
raise error - parser.error(error) + parser.error(error) if __name__ == "__main__": - main() \ No newline at end of file + main() From 4fba6d47b303428b8415c557d3f4c854bfaccdde Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 18:40:48 -0300 Subject: [PATCH 10/30] - Removed the type annotation from the method; - Changed file path passing to use from in the method; --- youtool/commands/base.py | 29 +++++++++++++++++------------ youtool/commands/channel_id.py | 6 +++--- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/youtool/commands/base.py b/youtool/commands/base.py index 165a48f..81deb61 100644 --- a/youtool/commands/base.py +++ b/youtool/commands/base.py @@ -1,7 +1,7 @@ import csv import argparse -from typing import List, Dict, Any, Self +from typing import List, Dict, Any, Optional from io import StringIO from pathlib import Path from datetime import datetime @@ -19,7 +19,7 @@ class Command(): arguments: List[Dict[str, Any]] @classmethod - def generate_parser(cls: Self, subparsers: argparse._SubParsersAction): + def generate_parser(cls, subparsers: argparse._SubParsersAction): """ Creates a parser for the command and adds it to the subparsers. @@ -32,7 +32,7 @@ def generate_parser(cls: Self, subparsers: argparse._SubParsersAction): return subparsers.add_parser(cls.name, help=cls.__doc__) @classmethod - def parse_arguments(cls: Self, subparsers: argparse._SubParsersAction) -> None: + def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: """ Parses the arguments for the command and sets the command's execute method as the default function to call. @@ -47,7 +47,7 @@ def parse_arguments(cls: Self, subparsers: argparse._SubParsersAction) -> None: parser.set_defaults(func=cls.execute) @classmethod - def execute(cls: Self, arguments: argparse.Namespace): + def execute(cls, arguments: argparse.Namespace): """ Executes the command. 
@@ -59,36 +59,41 @@ def execute(cls: Self, arguments: argparse.Namespace): raise NotImplementedError() @staticmethod - def data_from_csv(file_path: str, data_column_name: str = None) -> List[str]: + def data_from_csv(file_path: Path, data_column_name: Optional[str] = None) -> List[str]: """ Extracts a list of URLs from a specified CSV file. - Args: file_path (str): The path to the CSV file containing the URLs. - data_column_name (str, optional): The name of the column in the CSV file that contains the URLs. - If not provided, it defaults to `ChannelId.URL_COLUMN_NAME`. + Args: + file_path: The path to the CSV file containing the URLs. + data_column_name: The name of the column in the CSV file that contains the URLs. + If not provided, it defaults to `ChannelId.URL_COLUMN_NAME`. Returns: - List[str]: A list of URLs extracted from the specified CSV file. + A list of URLs extracted from the specified CSV file. Raises: Exception: If the file path is invalid or the file cannot be found. """ data = [] - file_path = Path(file_path) if not file_path.is_file(): raise FileNotFoundError(f"Invalid file path: {file_path}") with file_path.open('r', newline='') as csv_file: reader = csv.DictReader(csv_file) - if data_column_name not in reader.fieldnames: + fieldnames = reader.fieldnames + + if fieldnames is None: + raise ValueError("Fieldnames is None") + + if data_column_name not in fieldnames: raise Exception(f"Column {data_column_name} not found on {file_path}") for row in reader: data.append(row.get(data_column_name)) return data @classmethod - def data_to_csv(cls: Self, data: List[Dict], output_file_path: str = None) -> str: + def data_to_csv(cls, data: List[Dict], output_file_path: Optional[str] = None) -> str: """ Converts a list of channel IDs into a CSV file. 
diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py index 2233d33..c648342 100644 --- a/youtool/commands/channel_id.py +++ b/youtool/commands/channel_id.py @@ -1,6 +1,6 @@ import csv -from typing import Self +from pathlib import Path from youtool import YouTube @@ -24,7 +24,7 @@ class ChannelId(Command): CHANNEL_ID_COLUMN_NAME: str = "channel_id" @classmethod - def execute(cls: Self, **kwargs) -> str: + def execute(cls, **kwargs) -> str: """ Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. @@ -60,7 +60,7 @@ def execute(cls: Self, **kwargs) -> str: if urls_file_path and not urls: urls = cls.data_from_csv( - file_path=urls_file_path, + file_path=Path(urls_file_path), data_column_name=url_column_name or cls.URL_COLUMN_NAME ) From 2ba79df4234e90572c289ceb660a85f6bb980138 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 19:22:31 -0300 Subject: [PATCH 11/30] - Add changed the method signature in the class to accept (**kwargs) and return a string; - Added logic to convert values retrieved from the CSV file to strings before appending them to the data list; --- youtool/commands/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/youtool/commands/base.py b/youtool/commands/base.py index 81deb61..6c2ddb0 100644 --- a/youtool/commands/base.py +++ b/youtool/commands/base.py @@ -47,7 +47,7 @@ def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: parser.set_defaults(func=cls.execute) @classmethod - def execute(cls, arguments: argparse.Namespace): + def execute(cls, **kwargs) -> str: """ Executes the command. 
@@ -89,7 +89,9 @@ def data_from_csv(file_path: Path, data_column_name: Optional[str] = None) -> Li if data_column_name not in fieldnames: raise Exception(f"Column {data_column_name} not found on {file_path}") for row in reader: - data.append(row.get(data_column_name)) + value = row.get(data_column_name) + if value is not None: + data.append(str(value)) return data @classmethod From 8ab5185e82d4b460bc70b0807e527e9d78447d30 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 21:16:04 -0300 Subject: [PATCH 12/30] - Fixed typing error in all in the file. --- youtool/commands/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/commands/__init__.py b/youtool/commands/__init__.py index 9d1c702..985024e 100644 --- a/youtool/commands/__init__.py +++ b/youtool/commands/__init__.py @@ -6,5 +6,5 @@ ] __all__ = [ - COMMANDS, ChannelId + "COMMANDS", "ChannelId" ] \ No newline at end of file From 6b283205a6bde8cb04d63725da41dc8dba7f6af1 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 22:21:05 -0300 Subject: [PATCH 13/30] Add updates docstrings --- youtool/cli.py | 4 +--- youtool/commands/base.py | 24 +++++++++--------------- youtool/commands/channel_id.py | 10 +++------- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/youtool/cli.py b/youtool/cli.py index dce4356..7875342 100644 --- a/youtool/cli.py +++ b/youtool/cli.py @@ -5,8 +5,7 @@ def main(): - """ - Main function for the YouTube CLI Tool. + """Main function for the YouTube CLI Tool. This function sets up the argument parser for the CLI tool, including options for the YouTube API key and command-specific subparsers. It then parses the command-line arguments, retrieving the YouTube API key @@ -19,7 +18,6 @@ def main(): Raises: argparse.ArgumentError: If the YouTube API key is not provided. argparse.ArgumentError: If there is an error during the execution of the command. 
- """ parser = argparse.ArgumentParser(description="CLI Tool for managing YouTube videos add playlists") parser.add_argument("--api-key", type=str, help="YouTube API Key", dest="api_key") diff --git a/youtool/commands/base.py b/youtool/commands/base.py index 6c2ddb0..5598afd 100644 --- a/youtool/commands/base.py +++ b/youtool/commands/base.py @@ -7,9 +7,8 @@ from datetime import datetime -class Command(): - """ - A base class for commands to inherit from, following a specific structure. +class Command: + """A base class for commands to inherit from, following a specific structure. Attributes: name (str): The name of the command. @@ -20,8 +19,7 @@ class Command(): @classmethod def generate_parser(cls, subparsers: argparse._SubParsersAction): - """ - Creates a parser for the command and adds it to the subparsers. + """Creates a parser for the command and adds it to the subparsers. Args: subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. @@ -33,8 +31,7 @@ def generate_parser(cls, subparsers: argparse._SubParsersAction): @classmethod def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: - """ - Parses the arguments for the command and sets the command's execute method as the default function to call. + """Parses the arguments for the command and sets the command's execute method as the default function to call. Args: subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. @@ -47,9 +44,8 @@ def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: parser.set_defaults(func=cls.execute) @classmethod - def execute(cls, **kwargs) -> str: - """ - Executes the command. + def execute(cls, **kwargs) -> str: # noqa: D417 + """Executes the command. This method should be overridden by subclasses to define the command's behavior. 
@@ -60,8 +56,7 @@ def execute(cls, **kwargs) -> str: @staticmethod def data_from_csv(file_path: Path, data_column_name: Optional[str] = None) -> List[str]: - """ - Extracts a list of URLs from a specified CSV file. + """Extracts a list of URLs from a specified CSV file. Args: file_path: The path to the CSV file containing the URLs. @@ -96,13 +91,12 @@ def data_from_csv(file_path: Path, data_column_name: Optional[str] = None) -> Li @classmethod def data_to_csv(cls, data: List[Dict], output_file_path: Optional[str] = None) -> str: - """ - Converts a list of channel IDs into a CSV file. + """Converts a list of channel IDs into a CSV file. Parameters: channels_ids (List[str]): List of channel IDs to be written to the CSV. output_file_path (str, optional): Path to the file where the CSV will be saved. If not provided, the CSV will be returned as a string. - channel_id_column_name (str, optional): Name of the column in the CSV that will contain the channel IDs. + channel_id_column_name (str, optional): Name of the column in the CSV that will contain the channel IDs. If not provided, the default value defined in ChannelId.CHANNEL_ID_COLUMN_NAME will be used. 
Returns: diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py index c648342..8e1d004 100644 --- a/youtool/commands/channel_id.py +++ b/youtool/commands/channel_id.py @@ -1,4 +1,3 @@ -import csv from pathlib import Path @@ -8,9 +7,7 @@ class ChannelId(Command): - """ - Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs) - """ + """Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs).""" name = "channel-id" arguments = [ {"name": "--urls", "type": str, "help": "Channels urls", "nargs": "*"}, @@ -24,9 +21,8 @@ class ChannelId(Command): CHANNEL_ID_COLUMN_NAME: str = "channel_id" @classmethod - def execute(cls, **kwargs) -> str: - """ - Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. + def execute(cls, **kwargs) -> str: # noqa: D417 + """Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. This method retrieves YouTube channel IDs from a list of provided URLs or from a file containing URLs. It then saves these channel IDs to a CSV file if an output file path is specified. 
From dfc2011450d48e18effe62f2338947ad72944e8c Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:52:05 -0300 Subject: [PATCH 14/30] Update import --- youtool/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/cli.py b/youtool/cli.py index 7875342..961d2e6 100644 --- a/youtool/cli.py +++ b/youtool/cli.py @@ -1,7 +1,7 @@ import argparse import os -from commands import COMMANDS +from youtool.commands import COMMANDS def main(): From b1b33670fdebed9e5418ea9ea1824547f25b302a Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:53:41 -0300 Subject: [PATCH 15/30] Add update command into the file --- youtool/commands/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/youtool/commands/__init__.py b/youtool/commands/__init__.py index 985024e..89bbc09 100644 --- a/youtool/commands/__init__.py +++ b/youtool/commands/__init__.py @@ -1,10 +1,10 @@ +from .base import Command from .channel_id import ChannelId - COMMANDS = [ ChannelId ] __all__ = [ - "COMMANDS", "ChannelId" -] \ No newline at end of file + "Command", "COMMANDS", "ChannelId", +] From 28b2574278a16e4b28cf0aeaa88347881f09f2fd Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:55:04 -0300 Subject: [PATCH 16/30] Add update --- youtool/commands/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/commands/base.py b/youtool/commands/base.py index 5598afd..077c826 100644 --- a/youtool/commands/base.py +++ b/youtool/commands/base.py @@ -113,4 +113,4 @@ def data_to_csv(cls, data: List[Dict], output_file_path: Optional[str] = None) - writer = csv.DictWriter(csv_file, fieldnames=list(data[0].keys()) if data else []) writer.writeheader() writer.writerows(data) - return str(output_file_path) if output_file_path else csv_file.getvalue() \ No newline at end of file + return str(output_file_path) if output_file_path else csv_file.getvalue() From fe180fb7efa5b1663ce413a816332ec7231a58a0 Mon 
Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:56:19 -0300 Subject: [PATCH 17/30] Add improvements to the file --- youtool/commands/channel_id.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py index 8e1d004..d42f311 100644 --- a/youtool/commands/channel_id.py +++ b/youtool/commands/channel_id.py @@ -54,14 +54,7 @@ def execute(cls, **kwargs) -> str: # noqa: D417 url_column_name = kwargs.get("url_column_name") id_column_name = kwargs.get("id_column_name") - if urls_file_path and not urls: - urls = cls.data_from_csv( - file_path=Path(urls_file_path), - data_column_name=url_column_name or cls.URL_COLUMN_NAME - ) - - if not urls: - raise Exception("Either 'username' or 'url' must be provided for the channel-id command") + urls = cls.resolve_urls(urls, urls_file_path, url_column_name) youtube = YouTube([api_key], disable_ipv6=True) @@ -72,10 +65,22 @@ def execute(cls, **kwargs) -> str: # noqa: D417 result = cls.data_to_csv( data=[ { - (id_column_name or cls.CHANNEL_ID_COLUMN_NAME): channel_id for channel_id in channels_ids - } + (id_column_name or cls.CHANNEL_ID_COLUMN_NAME): channel_id + } for channel_id in channels_ids ], output_file_path=output_file_path ) - return result \ No newline at end of file + return result + + @classmethod + def resolve_urls(cls, urls, urls_file_path, url_column_name): + if urls_file_path and not urls: + urls = cls.data_from_csv( + file_path=Path(urls_file_path), + data_column_name=url_column_name or cls.URL_COLUMN_NAME + ) + + if not urls: + raise Exception("Either 'username' or 'url' must be provided for the channel-id command") + return urls From d4e66b4209628a8e28c6c8ec43c3f93f3de93a64 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:58:13 -0300 Subject: [PATCH 18/30] Add test for cli file --- tests/test_cli.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 
tests/test_cli.py diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..3a489ee --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,20 @@ +import pytest + +from subprocess import run + +from youtool.commands import COMMANDS + +from youtool.commands.base import Command + + +@pytest.mark.parametrize( + "command", COMMANDS +) +def test_missing_api_key(monkeypatch: pytest.MonkeyPatch, command: Command): + monkeypatch.delenv('YOUTUBE_API_KEY', raising=False) + cli_path = "youtool/cli.py" + command = ["python", cli_path, command.name] + result = run(command, capture_output=True, text=True, check=False) + + assert result.returncode == 2 + assert "YouTube API Key is required" in result.stderr \ No newline at end of file From 4bf29ff4e4c6f7ab8143d2a424cac2b972b669b9 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:59:00 -0300 Subject: [PATCH 19/30] Add test for base file --- tests/commands/__init__.py | 0 tests/commands/test_base.py | 127 ++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 tests/commands/__init__.py create mode 100644 tests/commands/test_base.py diff --git a/tests/commands/__init__.py b/tests/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/commands/test_base.py b/tests/commands/test_base.py new file mode 100644 index 0000000..9d3ad90 --- /dev/null +++ b/tests/commands/test_base.py @@ -0,0 +1,127 @@ +import csv +import argparse +import pytest + +from io import StringIO +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch, mock_open +from youtool.commands import Command + + +class TestCommand(Command): + name = "command_name" + arguments = [ + {"name": "--test-arg", "help": "Test argument", "default": "default_value", "type": str} + ] + + @classmethod + def execute(cls, **kwargs): + return "executed" + +@pytest.fixture +def subparsers(): + parser = argparse.ArgumentParser() + return 
parser.add_subparsers() + + +def test_generate_parser(subparsers): + parser = TestCommand.generate_parser(subparsers) + + assert parser is not None, "Parser should not be None" + assert isinstance(parser, argparse.ArgumentParser), "Parser should be an instance of argparse.ArgumentParser" + assert parser.prog.endswith(TestCommand.name), f"Parser prog should end with '{TestCommand.name}'" + + +def test_parse_arguments(subparsers): + subparsers_mock = MagicMock(spec=subparsers) + + TestCommand.parse_arguments(subparsers_mock) + + subparsers_mock.add_parser.assert_called_once_with(TestCommand.name, help=TestCommand.__doc__) + parser_mock = subparsers_mock.add_parser.return_value + parser_mock.add_argument.assert_called_once_with("--test-arg", help="Test argument", default="default_value", type=str) + parser_mock.set_defaults.assert_called_once_with(func=TestCommand.execute) + + +def test_command(): + class MyCommand(Command): + pass + + with pytest.raises(NotImplementedError): + MyCommand.execute() + + +@pytest.fixture +def mock_csv_file(): + + csv_content = """URL + http://example.com + http://example2.com + """ + return csv_content + +def test_data_from_csv_valid(mock_csv_file): + with patch('pathlib.Path.is_file', return_value=True): + with patch('builtins.open', mock_open(read_data=mock_csv_file)): + data_column_name = "URL" + file_path = Path("tests/commands/csv_valid.csv") + result = Command.data_from_csv(file_path, data_column_name) + assert len(result) == 2 + assert result[0] == "http://example.com" + assert result[1] == "http://example2.com" + +def test_data_from_csv_file_not_found(): + with patch('pathlib.Path.is_file', return_value=False): + file_path = Path("/fake/path/not_found.csv") + with pytest.raises(FileNotFoundError): + Command.data_from_csv(file_path, "URL") + +def test_data_from_csv_column_not_found(mock_csv_file): + with patch('pathlib.Path.is_file', return_value=True): + with patch('builtins.open', mock_open(read_data=mock_csv_file)): + file_path 
= Path("tests/commands/csv_column_not_found.csv") + with pytest.raises(Exception) as exc_info: + Command.data_from_csv(file_path, "NonExistentColumn") + assert "Column NonExistentColumn not found on tests/commands/csv_column_not_found.csv" in str(exc_info.value), "Exception message should contain column not found error" + + +@pytest.fixture +def sample_data(): + return [ + {"id": "123", "name": "Channel One"}, + {"id": "456", "name": "Channel Two"} + ] + +def test_data_to_csv_with_output_file_path(tmp_path, sample_data): + output_file_path = tmp_path / "output.csv" + + result_path = Command.data_to_csv(sample_data, str(output_file_path)) + + assert result_path == str(output_file_path), "The returned path should match the provided output file path" + assert output_file_path.exists(), "The output file should exist" + with output_file_path.open('r') as f: + reader = csv.DictReader(f) + rows = list(reader) + assert len(rows) == 2, "There should be two rows in the output CSV" + assert rows[0]["id"] == "123" and rows[1]["id"] == "456", "The IDs should match the sample data" + +def test_data_to_csv_without_output_file_path(sample_data): + csv_content = Command.data_to_csv(sample_data) + + assert "id,name" in csv_content + assert "123,Channel One" in csv_content + assert "456,Channel Two" in csv_content + +def test_data_to_csv_output(tmp_path): + output_file_path = tmp_path / "output.csv" + + data = [ + {"id": 1, "name": "Test1"}, + {"id": 2, "name": "Test2"} + ] + + expected_output = "id,name\n1,Test1\n2,Test2\n" + result = Command.data_to_csv(data, str(output_file_path)) + assert Path(output_file_path).is_file() + assert expected_output == Path(output_file_path).read_text() From 216e5f2da8753ca2c00e61d00092e4baeb0e060e Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 2 Jul 2024 00:59:45 -0300 Subject: [PATCH 20/30] Add test for channel_id command --- tests/commands/test_channel_id.py | 55 +++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 
100644 tests/commands/test_channel_id.py diff --git a/tests/commands/test_channel_id.py b/tests/commands/test_channel_id.py new file mode 100644 index 0000000..56035ee --- /dev/null +++ b/tests/commands/test_channel_id.py @@ -0,0 +1,55 @@ +import csv +import pytest + +from io import StringIO + +from unittest.mock import patch, call +from youtool.commands.channel_id import ChannelId + +@pytest.fixture +def csv_file(tmp_path): + csv_content = "channel_url\nhttps://www.youtube.com/@Turicas/featured\n" + csv_file = tmp_path / "urls.csv" + csv_file.write_text(csv_content) + return csv_file + +@pytest.fixture +def youtube_api_mock(): + with patch("youtool.commands.channel_id.YouTube") as mock: + mock.return_value.channel_id_from_url.side_effect = lambda url: f"channel-{url}" + yield mock + +def test_channels_ids_csv_preparation(youtube_api_mock): + urls = ["https://www.youtube.com/@Turicas/featured", "https://www.youtube.com/c/PythonicCaf%C3%A9"] + api_key = "test_api_key" + id_column_name = "custom_id_column" + expected_result_data = [ + {id_column_name: "channel-https://www.youtube.com/@Turicas/featured"}, + {id_column_name: "channel-https://www.youtube.com/c/PythonicCaf%C3%A9"} + ] + with StringIO() as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=[id_column_name]) + writer.writeheader() + writer.writerows(expected_result_data) + expected_result_csv = csv_file.getvalue() + + result = ChannelId.execute(urls=urls, api_key=api_key, id_column_name=id_column_name) + + youtube_api_mock.return_value.channel_id_from_url.assert_has_calls([call(url) for url in urls], any_order=True) + assert result == expected_result_csv + + +def test_resolve_urls_with_direct_urls(): + # Tests whether the function returns the directly given list of URLs. 
+ urls = ["https://www.youtube.com/@Turicas/featured"] + result = ChannelId.resolve_urls(urls, None, None) + assert result == urls + +def test_resolve_urls_with_file_path(csv_file): + result = ChannelId.resolve_urls(None, csv_file, "channel_url") + assert result == ["https://www.youtube.com/@Turicas/featured"] + +def test_resolve_urls_raises_exception(): + # Tests whether the function throws an exception when neither urls nor urls_file_path are provided. + with pytest.raises(Exception, match="Either 'username' or 'url' must be provided for the channel-id command"): + ChannelId.resolve_urls(None, None, None) From 1b335b7f184de9c0c6b2678d050072de4c6d5d95 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Fri, 5 Jul 2024 15:26:20 -0300 Subject: [PATCH 21/30] add docstrings --- tests/commands/test_base.py | 47 +++++++++++++++++++++++++++++++ tests/commands/test_channel_id.py | 29 +++++++++++++++++-- tests/test_cli.py | 5 ++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/tests/commands/test_base.py b/tests/commands/test_base.py index 9d3ad90..e9265e8 100644 --- a/tests/commands/test_base.py +++ b/tests/commands/test_base.py @@ -21,11 +21,17 @@ def execute(cls, **kwargs): @pytest.fixture def subparsers(): + """Fixture to create subparsers for argument parsing.""" parser = argparse.ArgumentParser() return parser.add_subparsers() def test_generate_parser(subparsers): + """Test to verify the parser generation. + + This test checks if the `generate_parser` method correctly generates a parser + for the command and sets the appropriate properties + """ parser = TestCommand.generate_parser(subparsers) assert parser is not None, "Parser should not be None" @@ -34,6 +40,11 @@ def test_generate_parser(subparsers): def test_parse_arguments(subparsers): + """Test to verify argument parsing. + + This test checks if the `parse_arguments` method correctly adds the command's + arguments to the parser and sets the default function to the command's execute method. 
+ """ subparsers_mock = MagicMock(spec=subparsers) TestCommand.parse_arguments(subparsers_mock) @@ -45,6 +56,11 @@ def test_parse_arguments(subparsers): def test_command(): + """Test to verify that the `execute` method is implemented. + + This test ensures that if a command does not implement the `execute` method, + a `NotImplementedError` is raised. + """ class MyCommand(Command): pass @@ -54,6 +70,7 @@ class MyCommand(Command): @pytest.fixture def mock_csv_file(): + """Fixture to provide mock CSV content for tests.""" csv_content = """URL http://example.com @@ -62,6 +79,14 @@ def mock_csv_file(): return csv_content def test_data_from_csv_valid(mock_csv_file): + """Test to verify reading data from a valid CSV file. + + This test checks if the `data_from_csv` method correctly reads data from a valid CSV file + and returns the expected list of URLs. + + Args: + mock_csv_file (str): The mock CSV file content. + """ with patch('pathlib.Path.is_file', return_value=True): with patch('builtins.open', mock_open(read_data=mock_csv_file)): data_column_name = "URL" @@ -72,6 +97,11 @@ def test_data_from_csv_valid(mock_csv_file): assert result[1] == "http://example2.com" def test_data_from_csv_file_not_found(): + """Test to verify behavior when the CSV file path does not exist. + + This test checks if the `data_from_csv` method raises `FileNotFoundError` when the + given path is not an existing file. + """ with patch('pathlib.Path.is_file', return_value=False): file_path = Path("/fake/path/not_found.csv") with pytest.raises(FileNotFoundError): @@ -88,12 +118,18 @@ def test_data_from_csv_column_not_found(mock_csv_file): @pytest.fixture def sample_data(): + """Fixture to provide sample data for tests.""" return [ {"id": "123", "name": "Channel One"}, {"id": "456", "name": "Channel Two"} ] def test_data_to_csv_with_output_file_path(tmp_path, sample_data): + """Test to verify writing data to a CSV file with an output file path specified. 
+ + This test checks if the `data_to_csv` method correctly writes the sample data to + a CSV file when an output file path is provided. + """ output_file_path = tmp_path / "output.csv" result_path = Command.data_to_csv(sample_data, str(output_file_path)) @@ -107,6 +143,11 @@ def test_data_to_csv_with_output_file_path(tmp_path, sample_data): assert rows[0]["id"] == "123" and rows[1]["id"] == "456", "The IDs should match the sample data" def test_data_to_csv_without_output_file_path(sample_data): + """Test to verify writing data to a CSV format without an output file path specified. + + This test checks if the `data_to_csv` method correctly returns the CSV content + as a string when no output file path is provided. + """ csv_content = Command.data_to_csv(sample_data) assert "id,name" in csv_content @@ -114,6 +155,12 @@ def test_data_to_csv_without_output_file_path(sample_data): assert "456,Channel Two" in csv_content def test_data_to_csv_output(tmp_path): + """ + Test to verify the content of the output CSV file. + + This test checks if the `data_to_csv` method writes the expected content + to the output CSV file. + """ output_file_path = tmp_path / "output.csv" data = [ diff --git a/tests/commands/test_channel_id.py b/tests/commands/test_channel_id.py index 56035ee..04400ef 100644 --- a/tests/commands/test_channel_id.py +++ b/tests/commands/test_channel_id.py @@ -8,6 +8,8 @@ @pytest.fixture def csv_file(tmp_path): + """Fixture to create a temporary CSV file with a single YouTube channel URL.""" + csv_content = "channel_url\nhttps://www.youtube.com/@Turicas/featured\n" csv_file = tmp_path / "urls.csv" csv_file.write_text(csv_content) @@ -15,11 +17,21 @@ def csv_file(tmp_path): @pytest.fixture def youtube_api_mock(): + """Fixture to mock the YouTube API. + + This fixture mocks the `YouTube` class and its `channel_id_from_url` method + to return a channel ID based on the URL. 
+ """ with patch("youtool.commands.channel_id.YouTube") as mock: mock.return_value.channel_id_from_url.side_effect = lambda url: f"channel-{url}" yield mock def test_channels_ids_csv_preparation(youtube_api_mock): + """Test to verify the preparation of the channel IDs CSV. + + This test runs against the mocked YouTube API, using a list of channel URLs and + a custom ID column name. + """ urls = ["https://www.youtube.com/@Turicas/featured", "https://www.youtube.com/c/PythonicCaf%C3%A9"] api_key = "test_api_key" id_column_name = "custom_id_column" @@ -40,16 +52,29 @@ def test_channels_ids_csv_preparation(youtube_api_mock): def test_resolve_urls_with_direct_urls(): - # Tests whether the function returns the directly given list of URLs. + """Test to verify resolving URLs when provided directly. + + This test checks if the `resolve_urls` method of the `ChannelId` class correctly + returns the given list of URLs when provided directly. + """ urls = ["https://www.youtube.com/@Turicas/featured"] result = ChannelId.resolve_urls(urls, None, None) assert result == urls def test_resolve_urls_with_file_path(csv_file): + """Test to verify resolving URLs from a CSV file. + + This test checks if the `resolve_urls` method of the `ChannelId` class correctly + reads URLs from a given CSV file. + """ result = ChannelId.resolve_urls(None, csv_file, "channel_url") assert result == ["https://www.youtube.com/@Turicas/featured"] def test_resolve_urls_raises_exception(): - # Tests whether the function throws an exception when neither urls nor urls_file_path are provided. + """Test to verify exception raising when no URLs are provided. + + This test checks if the `resolve_urls` method of the `ChannelId` class raises an exception + when neither direct URLs nor a file path are provided.
+ """ with pytest.raises(Exception, match="Either 'username' or 'url' must be provided for the channel-id command"): ChannelId.resolve_urls(None, None, None) diff --git a/tests/test_cli.py b/tests/test_cli.py index 3a489ee..9165041 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -11,6 +11,11 @@ "command", COMMANDS ) def test_missing_api_key(monkeypatch: pytest.MonkeyPatch, command: Command): + """Test to verify behavior when the YouTube API key is missing. + + This test ensures that when the YouTube API key is not set, running any command + from the youtool CLI results in an appropriate error message and exit code. + """ monkeypatch.delenv('YOUTUBE_API_KEY', raising=False) cli_path = "youtool/cli.py" command = ["python", cli_path, command.name] From c5ad8fd7302c85af58972e850f297de202f2ff6a Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 25 Jun 2024 14:45:38 -0300 Subject: [PATCH 22/30] - Implement ChannelInfo class to fetch YouTube channel information from URLs, usernames, or CSV files containing them; - Add method to filter channel information based on specified columns; - Define method to handle the command logic, including reading input, fetching channel data, and saving to CSV; - Support for various input methods including direct URLs/usernames and file paths for CSV input; - Support for specifying output CSV file path and columns to include in the output. 
--- youtool/commands/channel_info.py | 120 +++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 youtool/commands/channel_info.py diff --git a/youtool/commands/channel_info.py b/youtool/commands/channel_info.py new file mode 100644 index 0000000..493ef82 --- /dev/null +++ b/youtool/commands/channel_info.py @@ -0,0 +1,120 @@ +import csv + +from typing import List, Dict, Optional, Self + +from youtool import YouTube + +from .base import Command + + +class ChannelInfo(Command): + """ + Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output + (same schema for `channel` dicts) + """ + name = "channel-info" + arguments = [ + {"name": "--urls", "type": str, "help": "Channel URLs", "nargs": "*"}, + {"name": "--usernames", "type": str, "help": "Channel usernames", "nargs": "*"}, + {"name": "--ids", "type": str, "help": "Channel IDs", "nargs": "*"}, + {"name": "--urls-file-path", "type": str, "help": "Channel URLs CSV file path"}, + {"name": "--usernames-file-path", "type": str, "help": "Channel usernames CSV file path"}, + {"name": "--ids-file-path", "type": str, "help": "Channel IDs CSV file path"}, + {"name": "--output-file-path", "type": str, "help": "Output CSV file path"}, + {"name": "--url-column-name", "type": str, "help": "URL column name on CSV input files"}, + {"name": "--username-column-name", "type": str, "help": "Username column name on CSV input files"}, + {"name": "--id-column-name", "type": str, "help": "ID column name on CSV input files"}, + ] + + URL_COLUMN_NAME: str = "channel_url" + USERNAME_COLUMN_NAME: str = "channel_username" + ID_COLUMN_NAME: str = "channel_id" + INFO_COLUMNS: List[str] = [ + "id", "title", "description", "published_at", "view_count", "subscriber_count", "video_count" + ] + + @staticmethod + def filter_fields(channel_info: Dict, info_columns: Optional[List] = None): + """ + Filters the fields of a dictionary containing channel information based on + specified columns. 
+ + Args: + channel_info (Dict): A dictionary containing channel information. + info_columns (Optional[List], optional): A list specifying which fields + to include in the filtered output. If None, returns the entire + channel_info dictionary. Defaults to None. + + Returns: + Dict: A dictionary containing only the fields specified in info_columns + (if provided) or the entire channel_info dictionary if info_columns is None. + """ + return { + field: value for field, value in channel_info.items() if field in info_columns + } if info_columns else channel_info + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the channel-info command to fetch YouTube channel information from URLs or usernames and save them to a CSV file. + + Args: + urls (list[str], optional): A list of YouTube channel URLs. If not provided, `urls_file_path` must be specified. + usernames (list[str], optional): A list of YouTube channel usernames. If not provided, `usernames_file_path` must be specified. + urls_file_path (str, optional): Path to a CSV file containing YouTube channel URLs. + usernames_file_path (str, optional): Path to a CSV file containing YouTube channel usernames. + output_file_path (str, optional): Path to the output CSV file where channel information will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + url_column_name (str, optional): The name of the column in the `urls_file_path` CSV file that contains the URLs. + Default is "channel_url". + username_column_name (str, optional): The name of the column in the `usernames_file_path` CSV file that contains the usernames. + Default is "channel_username". + info_columns (str, optional): Comma-separated list of columns to include in the output CSV. Default is the class attribute `INFO_COLUMNS`. + + Returns: + str: A message indicating the result of the command. If `output_file_path` is specified, the message will + include the path to the generated CSV file. 
Otherwise, it will return the result as a string. + + Raises: + Exception: If neither `urls`, `usernames`, `urls_file_path` nor `usernames_file_path` is provided. + """ + urls = kwargs.get("urls") + usernames = kwargs.get("usernames") + urls_file_path = kwargs.get("urls_file_path") + usernames_file_path = kwargs.get("usernames_file_path") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + url_column_name = kwargs.get("url_column_name") + username_column_name = kwargs.get("username_column_name") + info_columns = kwargs.get("info_columns") + + info_columns = [ + column.strip() for column in info_columns.split(",") + ] if info_columns else ChannelInfo.INFO_COLUMNS + + if urls_file_path and not urls: + urls = ChannelInfo.data_from_file(urls_file_path, url_column_name) + if usernames_file_path and not usernames: + usernames = ChannelInfo.data_from_file(usernames_file_path, username_column_name) + + if not urls and not usernames: + raise Exception("Either 'urls' or 'usernames' must be provided for the channel-info command") + + youtube = YouTube([api_key], disable_ipv6=True) + + channels_ids = [ + youtube.channel_id_from_url(url) for url in (urls or []) if url + ] + [ + youtube.channel_id_from_username(username) for username in (usernames or []) if username + ] + channel_ids = [channel_id for channel_id in channels_ids if channel_id] + + return cls.data_to_csv( + data=[ + ChannelInfo.filter_fields( + channel_info, info_columns + ) for channel_info in (youtube.channels_infos(channel_ids) or []) + ], + output_file_path=output_file_path + ) \ No newline at end of file From e718d4a1acc2482395ede78a16353a5a32138def Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Tue, 25 Jun 2024 14:47:42 -0300 Subject: [PATCH 23/30] - Included ChannelInfo in the list of commands in COMMANDS. 
--- youtool/commands/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/youtool/commands/__init__.py b/youtool/commands/__init__.py index 89bbc09..1939a22 100644 --- a/youtool/commands/__init__.py +++ b/youtool/commands/__init__.py @@ -1,10 +1,12 @@ from .base import Command from .channel_id import ChannelId +from .channel_info import ChannelInfo COMMANDS = [ - ChannelId + ChannelId, + ChannelInfo ] __all__ = [ - "Command", "COMMANDS", "ChannelId", + COMMANDS, ChannelId, ChannelInfo ] From 7dc7b8d297122045191ccc7d94d90170f15518bf Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 22:21:05 -0300 Subject: [PATCH 24/30] Add updates docstrings --- youtool/commands/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/commands/base.py b/youtool/commands/base.py index 077c826..275c282 100644 --- a/youtool/commands/base.py +++ b/youtool/commands/base.py @@ -44,7 +44,7 @@ def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: parser.set_defaults(func=cls.execute) @classmethod - def execute(cls, **kwargs) -> str: # noqa: D417 + def execute(cls, **kwargs) -> str: """Executes the command. This method should be overridden by subclasses to define the command's behavior. 
From ed012e55368eed19b93f534bd85b72726b44248b Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 22:22:26 -0300 Subject: [PATCH 25/30] Add updates docstrings --- youtool/commands/channel_id.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py index d42f311..c599982 100644 --- a/youtool/commands/channel_id.py +++ b/youtool/commands/channel_id.py @@ -21,7 +21,7 @@ class ChannelId(Command): CHANNEL_ID_COLUMN_NAME: str = "channel_id" @classmethod - def execute(cls, **kwargs) -> str: # noqa: D417 + def execute(cls, **kwargs) -> str: """Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. This method retrieves YouTube channel IDs from a list of provided URLs or from a file containing URLs. From 9a5fe66e52486d3fa7840cfd1b7f98d4a79cf5ee Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 22:51:39 -0300 Subject: [PATCH 26/30] - Add updates --- youtool/commands/channel_id.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py index c599982..d42f311 100644 --- a/youtool/commands/channel_id.py +++ b/youtool/commands/channel_id.py @@ -21,7 +21,7 @@ class ChannelId(Command): CHANNEL_ID_COLUMN_NAME: str = "channel_id" @classmethod - def execute(cls, **kwargs) -> str: + def execute(cls, **kwargs) -> str: # noqa: D417 """Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. This method retrieves YouTube channel IDs from a list of provided URLs or from a file containing URLs. 
From 8ba47cf3a9a20f0544964d29db64607287e260fe Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 27 Jun 2024 22:53:51 -0300 Subject: [PATCH 27/30] - Add updates --- youtool/commands/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/youtool/commands/base.py b/youtool/commands/base.py index 275c282..077c826 100644 --- a/youtool/commands/base.py +++ b/youtool/commands/base.py @@ -44,7 +44,7 @@ def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: parser.set_defaults(func=cls.execute) @classmethod - def execute(cls, **kwargs) -> str: + def execute(cls, **kwargs) -> str: # noqa: D417 """Executes the command. This method should be overridden by subclasses to define the command's behavior. From c08e4ecf7090c1e458ff2238578a8195e450d725 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Thu, 4 Jul 2024 13:57:55 -0300 Subject: [PATCH 28/30] - Add test for channel_info command; - Add update channel_info file; - fix test_base --- tests/commands/test_base.py | 16 ++++----- tests/commands/test_channel_info.py | 53 +++++++++++++++++++++++++++++ youtool/commands/channel_info.py | 6 ++-- 3 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 tests/commands/test_channel_info.py diff --git a/tests/commands/test_base.py b/tests/commands/test_base.py index e9265e8..e15c787 100644 --- a/tests/commands/test_base.py +++ b/tests/commands/test_base.py @@ -2,8 +2,6 @@ import argparse import pytest -from io import StringIO -from datetime import datetime from pathlib import Path from unittest.mock import MagicMock, patch, mock_open from youtool.commands import Command @@ -90,7 +88,7 @@ def test_data_from_csv_valid(mock_csv_file): with patch('pathlib.Path.is_file', return_value=True): with patch('builtins.open', mock_open(read_data=mock_csv_file)): data_column_name = "URL" - file_path = Path("tests/commands/csv_valid.csv") + file_path = Path("tests/resources/csv_valid.csv") result = Command.data_from_csv(file_path, data_column_name) 
assert len(result) == 2 assert result[0] == "http://example.com" @@ -110,10 +108,10 @@ def test_data_from_csv_file_not_found(): def test_data_from_csv_column_not_found(mock_csv_file): with patch('pathlib.Path.is_file', return_value=True): with patch('builtins.open', mock_open(read_data=mock_csv_file)): - file_path = Path("tests/commands/csv_column_not_found.csv") + file_path = Path("tests/resources/csv_column_not_found.csv") with pytest.raises(Exception) as exc_info: Command.data_from_csv(file_path, "NonExistentColumn") - assert "Column NonExistentColumn not found on tests/commands/csv_column_not_found.csv" in str(exc_info.value), "Exception message should contain column not found error" + assert "Column NonExistentColumn not found on tests/resources/csv_column_not_found.csv" in str(exc_info.value) @pytest.fixture @@ -134,13 +132,13 @@ def test_data_to_csv_with_output_file_path(tmp_path, sample_data): result_path = Command.data_to_csv(sample_data, str(output_file_path)) - assert result_path == str(output_file_path), "The returned path should match the provided output file path" - assert output_file_path.exists(), "The output file should exist" + assert result_path == str(output_file_path) + assert output_file_path.exists() with output_file_path.open('r') as f: reader = csv.DictReader(f) rows = list(reader) - assert len(rows) == 2, "There should be two rows in the output CSV" - assert rows[0]["id"] == "123" and rows[1]["id"] == "456", "The IDs should match the sample data" + assert len(rows) == 2 + assert rows[0]["id"] == "123" and rows[1]["id"] == "456" def test_data_to_csv_without_output_file_path(sample_data): """Test to verify writing data to a CSV format without an output file path specified. 
diff --git a/tests/commands/test_channel_info.py b/tests/commands/test_channel_info.py new file mode 100644 index 0000000..06b3a66 --- /dev/null +++ b/tests/commands/test_channel_info.py @@ -0,0 +1,53 @@ +import pytest + +from unittest.mock import patch, Mock, call + +from youtool.commands.channel_info import ChannelInfo, YouTube + + +def test_filter_fields(): + channel_info = { + 'channel_id': '123456', + 'channel_name': 'Test Channel', + 'subscribers': 1000, + 'videos': 50, + 'category': 'Tech' + } + + info_columns = ['channel_id', 'channel_name', 'subscribers'] + filtered_info = ChannelInfo.filter_fields(channel_info, info_columns) + + expected_result = { + 'channel_id': '123456', + 'channel_name': 'Test Channel', + 'subscribers': 1000 + } + + assert filtered_info == expected_result, f"Expected {expected_result}, but got {filtered_info}" + + +def test_channel_ids_from_urls_and_usernames(mocker): + urls = ["https://www.youtube.com/@Turicas/featured", "https://www.youtube.com/c/PythonicCaf%C3%A9"] + usernames = ["Turicas", "PythonicCafe"] + + ids_from_urls_mock = "id_from_url" + ids_from_usernames_mock = "id_from_username" + youtube_mock = mocker.patch("youtool.commands.channel_info.YouTube") + + channel_id_from_url_mock = Mock(return_value=ids_from_urls_mock) + channel_id_from_username_mock = Mock(return_value=ids_from_usernames_mock) + channels_infos_mock = Mock(return_value=[]) + + youtube_mock.return_value.channel_id_from_url = channel_id_from_url_mock + youtube_mock.return_value.channel_id_from_username = channel_id_from_username_mock + youtube_mock.return_value.channels_infos = channels_infos_mock + + ChannelInfo.execute(urls=urls, usernames=usernames) + + channel_id_from_url_mock.assert_has_calls( + [call(url) for url in urls] + ) + channel_id_from_username_mock.assert_has_calls( + [call(username) for username in usernames] + ) + channels_infos_mock.assert_called_once_with([ids_from_urls_mock, ids_from_usernames_mock]) diff --git 
a/youtool/commands/channel_info.py b/youtool/commands/channel_info.py index 493ef82..fb0944e 100644 --- a/youtool/commands/channel_info.py +++ b/youtool/commands/channel_info.py @@ -108,7 +108,9 @@ def execute(cls: Self, **kwargs) -> str: ] + [ youtube.channel_id_from_username(username) for username in (usernames or []) if username ] - channel_ids = [channel_id for channel_id in channels_ids if channel_id] + channel_ids = list( + set([channel_id for channel_id in channels_ids if channel_id]) + ) return cls.data_to_csv( data=[ @@ -117,4 +119,4 @@ def execute(cls: Self, **kwargs) -> str: ) for channel_info in (youtube.channels_infos(channel_ids) or []) ], output_file_path=output_file_path - ) \ No newline at end of file + ) From a5bb13d54c0aa8474126fe923c026bb6ab268974 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Fri, 5 Jul 2024 15:34:28 -0300 Subject: [PATCH 29/30] add docstrings --- tests/commands/test_channel_info.py | 16 ++++++++++++---- youtool/commands/channel_info.py | 14 +++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/tests/commands/test_channel_info.py b/tests/commands/test_channel_info.py index 06b3a66..5e6ef33 100644 --- a/tests/commands/test_channel_info.py +++ b/tests/commands/test_channel_info.py @@ -1,11 +1,14 @@ -import pytest +from unittest.mock import Mock, call -from unittest.mock import patch, Mock, call - -from youtool.commands.channel_info import ChannelInfo, YouTube +from youtool.commands.channel_info import ChannelInfo def test_filter_fields(): + """Test to verify the filtering of channel information fields. + + This test checks if the `filter_fields` method of the `ChannelInfo` class correctly + filters out unwanted fields from the channel information dictionary based on the provided columns. 
+ """ channel_info = { 'channel_id': '123456', 'channel_name': 'Test Channel', @@ -27,6 +30,11 @@ def test_filter_fields(): def test_channel_ids_from_urls_and_usernames(mocker): + """Test to verify fetching channel IDs from both URLs and usernames. + + This test checks if the `execute` method of the `ChannelInfo` class correctly fetches channel IDs + from a list of URLs and usernames, and then calls the `channels_infos` method with these IDs. + """ urls = ["https://www.youtube.com/@Turicas/featured", "https://www.youtube.com/c/PythonicCaf%C3%A9"] usernames = ["Turicas", "PythonicCafe"] diff --git a/youtool/commands/channel_info.py b/youtool/commands/channel_info.py index fb0944e..09103af 100644 --- a/youtool/commands/channel_info.py +++ b/youtool/commands/channel_info.py @@ -8,8 +8,7 @@ class ChannelInfo(Command): - """ - Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output + """Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output (same schema for `channel` dicts) """ name = "channel-info" @@ -35,8 +34,7 @@ class ChannelInfo(Command): @staticmethod def filter_fields(channel_info: Dict, info_columns: Optional[List] = None): - """ - Filters the fields of a dictionary containing channel information based on + """Filters the fields of a dictionary containing channel information based on specified columns. Args: @@ -55,8 +53,8 @@ def filter_fields(channel_info: Dict, info_columns: Optional[List] = None): @classmethod def execute(cls: Self, **kwargs) -> str: - """ - Execute the channel-info command to fetch YouTube channel information from URLs or usernames and save them to a CSV file. + """Execute the channel-info command to fetch YouTube channel information from URLs or + usernames and save them to a CSV file. Args: urls (list[str], optional): A list of YouTube channel URLs. If not provided, `urls_file_path` must be specified. 
@@ -69,7 +67,8 @@ def execute(cls: Self, **kwargs) -> str: Default is "channel_url". username_column_name (str, optional): The name of the column in the `usernames_file_path` CSV file that contains the usernames. Default is "channel_username". - info_columns (str, optional): Comma-separated list of columns to include in the output CSV. Default is the class attribute `INFO_COLUMNS`. + info_columns (str, optional): Comma-separated list of columns to include in the output CSV. + Default is the class attribute `INFO_COLUMNS`. Returns: str: A message indicating the result of the command. If `output_file_path` is specified, the message will @@ -78,6 +77,7 @@ def execute(cls: Self, **kwargs) -> str: Raises: Exception: If neither `urls`, `usernames`, `urls_file_path` nor `usernames_file_path` is provided. """ + urls = kwargs.get("urls") usernames = kwargs.get("usernames") urls_file_path = kwargs.get("urls_file_path") From 923170e226881b2e4dd570fb74e9d3023f5cb344 Mon Sep 17 00:00:00 2001 From: aninhasalesp Date: Fri, 5 Jul 2024 23:16:33 -0300 Subject: [PATCH 30/30] fix --- tests/commands/test_channel_info.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/commands/test_channel_info.py b/tests/commands/test_channel_info.py index 5e6ef33..67f5c3d 100644 --- a/tests/commands/test_channel_info.py +++ b/tests/commands/test_channel_info.py @@ -1,3 +1,5 @@ +import pytest + from unittest.mock import Mock, call from youtool.commands.channel_info import ChannelInfo