From 49fbc15e98014aab3d71e961e54749aa53aa6bd5 Mon Sep 17 00:00:00 2001 From: Adrien Barbaresi Date: Fri, 16 Aug 2024 18:52:27 +0200 Subject: [PATCH] CLI: review code, add types and tests (#677) * CLI: review code and add types * fix download tests * lint and add test * typing and formatting * add tests * add verbosity test * redo test --- tests/cli_tests.py | 19 ++- tests/downloads_tests.py | 4 +- trafilatura/cli.py | 28 ++-- trafilatura/cli_utils.py | 347 +++++++++++++++++++++++---------------- trafilatura/downloads.py | 2 +- trafilatura/settings.py | 6 +- 6 files changed, 242 insertions(+), 164 deletions(-) diff --git a/tests/cli_tests.py b/tests/cli_tests.py index e875a628..432078ef 100644 --- a/tests/cli_tests.py +++ b/tests/cli_tests.py @@ -12,20 +12,22 @@ from contextlib import redirect_stdout from datetime import datetime from os import path +from tempfile import gettempdir from unittest.mock import patch import pytest from courlan import UrlStore -from trafilatura import cli, cli_utils, spider # settings +from trafilatura import cli, cli_utils, spider, settings from trafilatura.downloads import add_to_compressed_dict, fetch_url -from trafilatura.settings import args_to_extractor from trafilatura.utils import LANGID_FLAG logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) RESOURCES_DIR = path.join(path.abspath(path.dirname(__file__)), "resources") +settings.MAX_FILES_PER_DIRECTORY = 1 + def test_parser(): """test argument parsing for the command-line interface""" @@ -84,6 +86,7 @@ def test_parser(): "--url-filter", "test1", "test2", + "-vvv", ] with patch.object(sys, "argv", testargs): args = cli.parse_args(testargs) @@ -181,6 +184,7 @@ def test_input_type(): with open(testfile, "rb") as f: teststring = f.read(1024) assert cli.examine(teststring, args) is None + assert cli.examine([1, 2, 3], args) is None testfile = "docs/usage.rst" with open(testfile, "r", encoding="utf-8") as f: teststring = f.read() @@ -232,10 +236,13 @@ def test_sysoutput(): args = cli.parse_args(testargs) result = "DADIDA" cli_utils.write_result(result, args) + args.output_dir = gettempdir() + args.backup_dir = None + cli_utils.write_result(result, args) # process with backup directory and no counter - options = args_to_extractor(args) + options = settings.args_to_extractor(args) assert options.format == "markdown" and options.formatting is True - assert cli_utils.process_result("DADIDA", args, None, options) is None + assert cli_utils.process_result("DADIDA", args, -1, options) == -1 # test keeping dir structure testargs = ["", "-i", "myinputdir/", "-o", "test/", "--keep-dirs"] with patch.object(sys, "argv", testargs): @@ -404,7 +411,7 @@ def test_file_processing(): # test manually for f in cli_utils.generate_filelist(args.input_dir): cli_utils.file_processing(f, args) - options = args_to_extractor(args) + options = settings.args_to_extractor(args) args.output_dir = "/dev/null" for f in cli_utils.generate_filelist(args.input_dir): cli_utils.file_processing(f, args, options=options) @@ -420,7 +427,7 @@ def test_cli_config_file(): ) as f: teststring = f.read() args.config_file = path.join(RESOURCES_DIR, args.config_file) - options = args_to_extractor(args) + options = settings.args_to_extractor(args) assert cli.examine(teststring, args, options=options) is None diff --git a/tests/downloads_tests.py b/tests/downloads_tests.py index 10b4ec4b..a443e922 100644 --- a/tests/downloads_tests.py +++ b/tests/downloads_tests.py @@ -233,8 +233,8 @@ def test_queue(): args.config_file = os.path.join(RESOURCES_DIR, 
'newsettings.cfg') options = args_to_extractor(args) options.config['DEFAULT']['SLEEP_TIME'] = '0.2' - results = download_queue_processing(url_store, args, None, options) - assert len(results[0]) == 5 and results[1] is None + results = download_queue_processing(url_store, args, -1, options) + assert len(results[0]) == 5 and results[1] is -1 if __name__ == '__main__': diff --git a/trafilatura/cli.py b/trafilatura/cli.py index 7fe0fa2b..6c1e905f 100644 --- a/trafilatura/cli.py +++ b/trafilatura/cli.py @@ -12,6 +12,7 @@ from importlib_metadata import version from platform import python_version +from typing import Any from .cli_utils import (cli_crawler, cli_discovery, examine, file_processing_pipeline, load_blacklist, @@ -34,7 +35,7 @@ sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'strict') -def add_args(parser): +def add_args(parser: Any) -> Any: "Add argument groups and arguments to parser." group1 = parser.add_argument_group('Input', 'URLs, files or directories to process') @@ -202,7 +203,7 @@ def add_args(parser): return parser -def parse_args(args): +def parse_args(args: Any) -> Any: """Define parser for command-line arguments""" parser = argparse.ArgumentParser(description='Command-line interface for Trafilatura') parser = add_args(parser) @@ -210,7 +211,7 @@ def parse_args(args): return map_args(parser.parse_args()) -def map_args(args): +def map_args(args: Any) -> Any: '''Map existing options to format and output choices.''' # formats for otype in ("csv", "html", "json", "markdown", "xml", "xmltei"): @@ -249,32 +250,28 @@ def map_args(args): return args -def main(): +def main() -> None: """ Run as a command-line utility. """ args = parse_args(sys.argv[1:]) process_args(args) -def process_args(args): +def process_args(args: Any) -> None: """Perform the actual processing according to the arguments""" - # init error_caught = False - # verbosity + if args.verbose == 1: logging.basicConfig(stream=sys.stdout, level=logging.WARNING) elif args.verbose >= 2: logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + if args.blacklist: args.blacklist = load_blacklist(args.blacklist) # processing according to mutually exclusive options - # read url list from input file - if args.input_file and all([not args.crawl, not args.explore, not args.feed, not args.probe, not args.sitemap]): - url_store = load_input_dict(args) - error_caught = url_processing_pipeline(args, url_store) # fetch urls from a feed or a sitemap - elif args.explore or args.feed or args.sitemap: + if args.explore or args.feed or args.sitemap: cli_discovery(args) # activate crawler/spider @@ -289,6 +286,11 @@ def process_args(args): elif args.input_dir: file_processing_pipeline(args) + # read url list from input file + elif args.input_file: + url_store = load_input_dict(args) + error_caught = url_processing_pipeline(args, url_store) + # process input URL elif args.URL: url_store = load_input_dict(args) @@ -300,7 +302,7 @@ def process_args(args): write_result(result, args) # change exit code if there are errors - if error_caught is True: + if error_caught: sys.exit(1) diff --git a/trafilatura/cli_utils.py b/trafilatura/cli_utils.py index 93f186f8..01bef96a 100644 --- a/trafilatura/cli_utils.py +++ b/trafilatura/cli_utils.py @@ -15,6 +15,7 @@ from datetime import datetime from functools import partial from os import makedirs, path, stat, walk +from typing import Any, Generator, Optional, List, Set, Tuple from courlan import UrlStore, extract_domain, get_base_url # validate_url @@ -23,13 +24,23 @@ from .baseline import 
html2txt from .core import extract from .deduplication import generate_bow_hash -from .downloads import (add_to_compressed_dict, buffered_downloads, - load_download_buffer) +from .downloads import add_to_compressed_dict, buffered_downloads, load_download_buffer from .feeds import find_feed_urls from .meta import reset_caches -from .settings import FILENAME_LEN, MAX_FILES_PER_DIRECTORY, args_to_extractor +from .settings import ( + Extractor, + FILENAME_LEN, + MAX_FILES_PER_DIRECTORY, + args_to_extractor, +) from .sitemaps import sitemap_search -from .utils import LANGID_FLAG, URL_BLACKLIST_REGEX, is_acceptable_length, language_classifier, make_chunks +from .utils import ( + LANGID_FLAG, + URL_BLACKLIST_REGEX, + is_acceptable_length, + language_classifier, + make_chunks, +) LOGGER = logging.getLogger(__name__) @@ -37,32 +48,32 @@ random.seed(345) # make generated file names reproducible CHAR_CLASS = string.ascii_letters + string.digits -STRIP_DIR = re.compile(r'[^/]+$') -STRIP_EXTENSION = re.compile(r'\.[a-z]{2,5}$') +STRIP_DIR = re.compile(r"[^/]+$") +STRIP_EXTENSION = re.compile(r"\.[a-z]{2,5}$") CLEAN_XML = re.compile(r"<[^<]+?>") -INPUT_URLS_ARGS = ['URL', 'crawl', 'explore', 'probe', 'feed', 'sitemap'] +INPUT_URLS_ARGS = ["URL", "crawl", "explore", "probe", "feed", "sitemap"] EXTENSION_MAPPING = { - 'csv': '.csv', - 'json': '.json', - 'xml': '.xml', - 'xmltei': '.xml', + "csv": ".csv", + "json": ".json", + "xml": ".xml", + "xmltei": ".xml", } -def load_input_urls(args): - '''Read list of URLs to process or derive one from command-line arguments''' - input_urls = [] +def load_input_urls(args: Any) -> List[str]: + "Read list of URLs to process or derive one from command-line arguments." + input_urls: List[str] = [] if args.input_file: try: # optional: errors='strict', buffering=1 - with open(args.input_file, mode='r', encoding='utf-8') as inputfile: + with open(args.input_file, mode="r", encoding="utf-8") as inputfile: input_urls.extend(line.strip() for line in inputfile) except UnicodeDecodeError: - sys.exit('ERROR: system, file type or buffer encoding') + sys.exit("ERROR: system, file type or buffer encoding") else: for arg in INPUT_URLS_ARGS: if getattr(args, arg): @@ -70,23 +81,22 @@ def load_input_urls(args): break if not input_urls: - LOGGER.warning('No input provided') + LOGGER.warning("No input provided") # uniq URLs while preserving order (important) return list(dict.fromkeys(input_urls)) -def load_blacklist(filename): - '''Read list of unwanted URLs''' - with open(filename, 'r', encoding='utf-8') as inputfh: +def load_blacklist(filename: str) -> Set[str]: + "Read list of unwanted URLs." + with open(filename, "r", encoding="utf-8") as inputfh: # if validate_url(url)[0] is True: - blacklist = {URL_BLACKLIST_REGEX.sub('', line.strip()) for line in inputfh} + blacklist = {URL_BLACKLIST_REGEX.sub("", line.strip()) for line in inputfh} return blacklist -def load_input_dict(args): - '''Read input list of URLs to process and - build a domain-aware dictionary''' +def load_input_dict(args: Any) -> UrlStore: + "Read input list of URLs to process and build a domain-aware dictionary." 
inputlist = load_input_urls(args) # deduplicate, filter and convert to dict return add_to_compressed_dict( @@ -94,41 +104,40 @@ def load_input_dict(args): blacklist=args.blacklist, compression=(args.sitemap and not args.list), url_filter=args.url_filter, - verbose=args.verbose + verbose=args.verbose, ) -def check_outputdir_status(directory): - '''Check if the output directory is within reach and writable''' +def check_outputdir_status(directory: str) -> bool: + "Check if the output directory is within reach and writable." # check the directory status if not path.exists(directory) or not path.isdir(directory): try: makedirs(directory, exist_ok=True) except OSError: # maybe the directory has already been created - #sleep(0.25) - #if not path.exists(directory) or not path.isdir(directory): - sys.stderr.write('ERROR: Destination directory cannot be created: ' + directory + '\n') + # sleep(0.25) + # if not path.exists(directory) or not path.isdir(directory): + sys.stderr.write( + "ERROR: Destination directory cannot be created: " + directory + "\n" + ) # raise OSError() return False return True -def determine_counter_dir(dirname, counter): - '''Return a destination directory based on a file counter''' - if counter is not None: - counter_dir = str(int(counter/MAX_FILES_PER_DIRECTORY) + 1) - else: - counter_dir = '' - return path.join(dirname, counter_dir) +def determine_counter_dir(dirname: str, c: int) -> str: + "Return a destination directory based on a file counter." + c_dir = str(int(c / MAX_FILES_PER_DIRECTORY) + 1) if c >= 0 else "" + return path.join(dirname, c_dir) -def get_writable_path(destdir, extension): - '''Find a writable path and return it along with its random file name''' +def get_writable_path(destdir: str, extension: str) -> Tuple[str, str]: + "Find a writable path and return it along with its random file name." output_path = None while output_path is None or path.exists(output_path): # generate a random filename of the desired length - filename = ''.join(random.choice(CHAR_CLASS) for _ in range(FILENAME_LEN)) + filename = "".join(random.choice(CHAR_CLASS) for _ in range(FILENAME_LEN)) output_path = path.join(destdir, filename + extension) return output_path, filename @@ -136,22 +145,26 @@ def get_writable_path(destdir, extension): def generate_hash_filename(content: str) -> str: """Create a filename-safe string by hashing the given content after deleting potential XML tags.""" - return urlsafe_b64encode( - generate_bow_hash(CLEAN_XML.sub("", content), 12) - ).decode() + return urlsafe_b64encode(generate_bow_hash(CLEAN_XML.sub("", content), 12)).decode() -def determine_output_path(args, orig_filename, content, counter=None, new_filename=None): - '''Pick a directory based on selected options and a file name based on output type''' +def determine_output_path( + args: Any, + orig_filename: str, + content: str, + counter: int = -1, + new_filename: Optional[str] = None, +) -> Tuple[str, str]: + "Pick a directory based on selected options and a file name based on output type." 
# determine extension, TXT by default - extension = EXTENSION_MAPPING.get(args.output_format, '.txt') + extension = EXTENSION_MAPPING.get(args.output_format, ".txt") if args.keep_dirs: # strip directory - original_dir = STRIP_DIR.sub('', orig_filename) + original_dir = STRIP_DIR.sub("", orig_filename) destination_dir = path.join(args.output_dir, original_dir) # strip extension - filename = STRIP_EXTENSION.sub('', orig_filename) + filename = STRIP_EXTENSION.sub("", orig_filename) else: destination_dir = determine_counter_dir(args.output_dir, counter) # use cryptographic hash on file contents to define name @@ -161,87 +174,109 @@ def determine_output_path(args, orig_filename, content, counter=None, new_filena return output_path, destination_dir -def archive_html(htmlstring, args, counter=None): - '''Write a copy of raw HTML in backup directory''' +def archive_html(htmlstring: str, args: Any, counter: int = -1) -> str: + "Write a copy of raw HTML in backup directory." destination_directory = determine_counter_dir(args.backup_dir, counter) - output_path, filename = get_writable_path(destination_directory, '.html.gz') + output_path, filename = get_writable_path(destination_directory, ".html.gz") # check the directory status if check_outputdir_status(destination_directory) is True: # write - with gzip.open(output_path, 'wb') as outputfile: - outputfile.write(htmlstring.encode('utf-8')) + with gzip.open(output_path, "wb") as outputfile: + outputfile.write(htmlstring.encode("utf-8")) return filename -def write_result(result, args, orig_filename=None, counter=None, new_filename=None): - '''Deal with result (write to STDOUT or to file)''' +def write_result( + result: Optional[str], + args: Any, + orig_filename: str = "", + counter: int = -1, + new_filename: Optional[str] = None, +) -> None: + """Deal with result (write to STDOUT or to file)""" if result is None: return if args.output_dir is None: - sys.stdout.write(result + '\n') + sys.stdout.write(result + "\n") else: - destination_path, destination_dir = determine_output_path(args, orig_filename, result, counter, new_filename) + destination_path, destination_dir = determine_output_path( + args, orig_filename, result, counter, new_filename + ) # check the directory status if check_outputdir_status(destination_dir) is True: - with open(destination_path, mode='w', encoding='utf-8') as outputfile: + with open(destination_path, mode="w", encoding="utf-8") as outputfile: outputfile.write(result) -def generate_filelist(inputdir): - '''Walk the directory tree and output all file names''' +def generate_filelist(inputdir: str) -> Generator[str, None, None]: + "Walk the directory tree and output all file names." for root, _, inputfiles in walk(inputdir): for fname in inputfiles: yield path.join(root, fname) -def file_processing(filename, args, counter=None, options=None): - '''Aggregated functions to process a file in a list''' +def file_processing( + filename: str, args: Any, counter: int = -1, options: Optional[Extractor] = None +) -> None: + "Aggregated functions to process a file in a list." 
if not options: options = args_to_extractor(args) options.source = filename - with open(filename, 'rb') as inputf: + with open(filename, "rb") as inputf: htmlstring = inputf.read() file_stat = stat(filename) ref_timestamp = min(file_stat.st_ctime, file_stat.st_mtime) - options.date_params["max_date"] = datetime.fromtimestamp(ref_timestamp).strftime("%Y-%m-%d") + options.date_params["max_date"] = datetime.fromtimestamp(ref_timestamp).strftime( + "%Y-%m-%d" + ) result = examine(htmlstring, args, options=options) write_result(result, args, filename, counter, new_filename=None) -def process_result(htmlstring, args, counter, options): - '''Extract text and metadata from a download webpage and eventually write out the result''' +def process_result( + htmlstring: str, args: Any, counter: int, options: Optional[Extractor] +) -> int: + "Extract text and metadata from a download webpage and eventually write out the result." # backup option - fileslug = archive_html(htmlstring, args, counter) if args.backup_dir else None + fileslug = archive_html(htmlstring, args, counter) if args.backup_dir else "" # process result = examine(htmlstring, args, options=options) - write_result(result, args, orig_filename=fileslug, counter=counter, new_filename=fileslug) + write_result( + result, args, orig_filename=fileslug, counter=counter, new_filename=fileslug + ) # increment written file counter - if counter is not None and result is not None: + if counter >= 0 and result: counter += 1 return counter -def download_queue_processing(url_store, args, counter, options): - '''Implement a download queue consumer, single- or multi-threaded''' +def download_queue_processing( + url_store: UrlStore, args: Any, counter: int, options: Extractor +) -> Tuple[List[str], int]: + "Implement a download queue consumer, single- or multi-threaded." errors = [] - while url_store.done is False: - bufferlist, url_store = load_download_buffer(url_store, options.config.getfloat('DEFAULT', 'SLEEP_TIME')) + sleep_time = options.config.getfloat("DEFAULT", "SLEEP_TIME") + + while not url_store.done: + bufferlist, url_store = load_download_buffer(url_store, sleep_time) # process downloads - for url, result in buffered_downloads(bufferlist, args.parallel, options=options): + for url, result in buffered_downloads( + bufferlist, args.parallel, options=options + ): # handle result - if result is not None: + if result: options.url = url counter = process_result(result, args, counter, options) else: - LOGGER.warning('No result for URL: %s', url) + LOGGER.warning("No result for URL: %s", url) errors.append(url) return errors, counter -def cli_discovery(args): +def cli_discovery(args: Any) -> None: "Group CLI functions dedicated to URL discovery." 
url_store = load_input_dict(args) input_urls = url_store.dump_urls() @@ -250,11 +285,11 @@ def cli_discovery(args): options = args_to_extractor(args) func = partial( - find_feed_urls if args.feed else sitemap_search, - target_lang=args.target_language, - external=options.config.getboolean('DEFAULT', 'EXTERNAL_URLS'), - sleep_time=options.config.getfloat('DEFAULT', 'SLEEP_TIME') - ) + find_feed_urls if args.feed else sitemap_search, + target_lang=args.target_language, + external=options.config.getboolean("DEFAULT", "EXTERNAL_URLS"), + sleep_time=options.config.getfloat("DEFAULT", "SLEEP_TIME"), + ) # link discovery and storage with ThreadPoolExecutor(max_workers=args.parallel) as executor: @@ -265,7 +300,11 @@ def cli_discovery(args): if future.result() is not None: url_store.add_urls(future.result()) # empty buffer in order to spare memory - if args.sitemap and args.list and len(url_store.get_known_domains()) >= args.parallel: + if ( + args.sitemap + and args.list + and len(url_store.get_known_domains()) >= args.parallel + ): url_store.print_unvisited_urls() url_store.reset() reset_caches() @@ -280,27 +319,33 @@ def cli_discovery(args): cli_crawler(args, url_store=control_dict, options=options) -def build_exploration_dict(url_store, input_urls, args): +def build_exploration_dict( + url_store: UrlStore, input_urls: List[str], args: Any +) -> UrlStore: "Find domains for which nothing has been found and add info to the crawl dict." input_domains = {extract_domain(u) for u in input_urls} - known_domains = {extract_domain(u) for u in url_store.get_known_domains()} - still_to_crawl = input_domains - known_domains + still_to_crawl = input_domains - { + extract_domain(u) for u in url_store.get_known_domains() + } new_input_urls = [u for u in input_urls if extract_domain(u) in still_to_crawl] - control_dict = add_to_compressed_dict( - new_input_urls, - blacklist=args.blacklist, - url_filter=args.url_filter, - verbose=args.verbose - ) - return control_dict - - -def cli_crawler(args, n=30, url_store=None, options=None): - '''Start a focused crawler which downloads a fixed number of URLs within a website - and prints the links found in the process''' - if not options: - options = args_to_extractor(args) - sleep_time = options.config.getfloat('DEFAULT', 'SLEEP_TIME') + return add_to_compressed_dict( + new_input_urls, + blacklist=args.blacklist, + url_filter=args.url_filter, + verbose=args.verbose, + ) + + +def cli_crawler( + args: Any, + n: int = 30, + url_store: Optional[UrlStore] = None, + options: Optional[Extractor] = None, +) -> None: + """Start a focused crawler which downloads a fixed number of URLs within a website + and prints the links found in the process.""" + options = options or args_to_extractor(args) + sleep_time = options.config.getfloat("DEFAULT", "SLEEP_TIME") param_dict = {} # load input URLs @@ -313,27 +358,33 @@ def cli_crawler(args, n=30, url_store=None, options=None): for hostname in spider.URL_STORE.get_known_domains(): if spider.URL_STORE.urldict[hostname].tuples: startpage = spider.URL_STORE.get_url(hostname, as_visited=False) - param_dict[hostname] = spider.init_crawl(startpage, lang=args.target_language) + if startpage: + param_dict[hostname] = spider.init_crawl( + startpage, lang=args.target_language + ) # update info # TODO: register changes? # if base_url != hostname: # ... 
# iterate until the threshold is reached - while spider.URL_STORE.done is False: - bufferlist, spider.URL_STORE = load_download_buffer(spider.URL_STORE, sleep_time) - for url, result in buffered_downloads(bufferlist, args.parallel, decode=False, options=options): + while not spider.URL_STORE.done: + bufferlist, spider.URL_STORE = load_download_buffer( + spider.URL_STORE, sleep_time + ) + for url, result in buffered_downloads( + bufferlist, args.parallel, decode=False, options=options + ): if result is not None: - base_url = get_base_url(url) - spider.process_response(result, param_dict[base_url]) + spider.process_response(result, param_dict[get_base_url(url)]) # early exit if maximum count is reached if any(c >= n for c in spider.URL_STORE.get_all_counts()): break - print('\n'.join(u for u in spider.URL_STORE.dump_urls())) + print("\n".join(u for u in spider.URL_STORE.dump_urls())) -def probe_homepage(args): +def probe_homepage(args: Any) -> None: "Probe websites for extractable content and print the fitting ones." input_urls = load_input_urls(args) options = args_to_extractor(args) @@ -341,75 +392,93 @@ def probe_homepage(args): for url, result in buffered_downloads(input_urls, args.parallel, options=options): if result is not None: result = html2txt(result) - if result and len(result) > options.min_extracted_size and any(c.isalpha() for c in result): - if not LANGID_FLAG or not args.target_language or language_classifier(result, "") == args.target_language: + if ( + result + and len(result) > options.min_extracted_size + and any(c.isalpha() for c in result) + ): + if ( + not LANGID_FLAG + or not args.target_language + or language_classifier(result, "") == args.target_language + ): print(url, flush=True) -def url_processing_pipeline(args, url_store): - '''Aggregated functions to show a list and download and process an input list''' - # print list without further processing +def url_processing_pipeline(args: Any, url_store: UrlStore) -> bool: + "Aggregated functions to show a list and download and process an input list." 
if args.list: url_store.print_unvisited_urls() # and not write_result() return False # and not sys.exit(0) options = args_to_extractor(args) + counter = 0 if url_store.total_url_number() > MAX_FILES_PER_DIRECTORY else -1 - # initialize file counter if necessary - if url_store.total_url_number() > MAX_FILES_PER_DIRECTORY: - counter = 0 - else: - counter = None # download strategy errors, counter = download_queue_processing(url_store, args, counter, options) - LOGGER.debug('%s URLs could not be found', len(errors)) - # option to retry + LOGGER.debug("%s URLs could not be found", len(errors)) + if args.archived is True: url_store = UrlStore() - url_store.add_urls(['https://web.archive.org/web/20/' + e for e in errors]) - if len(url_store.find_known_urls('https://web.archive.org')) > 0: - archived_errors, _ = download_queue_processing(url_store, args, counter, options) - LOGGER.debug('%s archived URLs out of %s could not be found', len(archived_errors), len(errors)) + url_store.add_urls(["https://web.archive.org/web/20/" + e for e in errors]) + if len(url_store.find_known_urls("https://web.archive.org")) > 0: + archived_errors, _ = download_queue_processing( + url_store, args, counter, options + ) + LOGGER.debug( + "%s archived URLs out of %s could not be found", + len(archived_errors), + len(errors), + ) # pass information along if URLs are missing return bool(archived_errors) - # pass information along if URLs are missing + return bool(errors) -def file_processing_pipeline(args): - '''Define batches for parallel file processing and perform the extraction''' - filecounter = None +def file_processing_pipeline(args: Any) -> None: + "Define batches for parallel file processing and perform the extraction." + filecounter = -1 options = args_to_extractor(args) - timeout = options.config.getint('DEFAULT', 'EXTRACTION_TIMEOUT') + timeout = options.config.getint("DEFAULT", "EXTRACTION_TIMEOUT") # max_tasks_per_child available in Python >= 3.11 with ProcessPoolExecutor(max_workers=args.parallel) as executor: # chunk input: https://github.com/python/cpython/issues/74028 - for filebatch in make_chunks(generate_filelist(args.input_dir), MAX_FILES_PER_DIRECTORY): - if filecounter is None and len(filebatch) >= MAX_FILES_PER_DIRECTORY: + for filebatch in make_chunks( + generate_filelist(args.input_dir), MAX_FILES_PER_DIRECTORY + ): + if filecounter < 0 and len(filebatch) >= MAX_FILES_PER_DIRECTORY: filecounter = 0 - worker = partial(file_processing, args=args, counter=filecounter, options=options) + worker = partial( + file_processing, args=args, counter=filecounter, options=options + ) executor.map(worker, filebatch, chunksize=10, timeout=timeout) # update counter - if filecounter is not None: + if filecounter >= 0: filecounter += len(filebatch) -def examine(htmlstring, args, url=None, options=None): - """Generic safeguards and triggers""" +def examine( + htmlstring: Optional[Any], + args: Any, + url: Optional[str] = None, + options: Optional[Extractor] = None, +) -> Optional[str]: + "Generic safeguards and triggers around extraction function." 
result = None if not options: options = args_to_extractor(args, url) # safety check if htmlstring is None: - sys.stderr.write('ERROR: empty document\n') + sys.stderr.write("ERROR: empty document\n") elif not is_acceptable_length(len(htmlstring), options): - sys.stderr.write('ERROR: file size\n') + sys.stderr.write("ERROR: file size\n") # proceed else: try: result = extract(htmlstring, options=options) # ugly but efficient except Exception as err: - sys.stderr.write(f'ERROR: {str(err)}' + '\n' + traceback.format_exc() + '\n') + sys.stderr.write(f"ERROR: {str(err)}\n{traceback.format_exc()}\n") return result diff --git a/trafilatura/downloads.py b/trafilatura/downloads.py index f2be990e..8f0e6858 100644 --- a/trafilatura/downloads.py +++ b/trafilatura/downloads.py @@ -361,7 +361,7 @@ def buffered_downloads( download_threads: int, decode: bool = True, options: Optional[Extractor] = None, -) -> Generator[Tuple[str, str], None, None]: +) -> Generator[Tuple[str, Union[Response, str]], None, None]: """Download queue consumer, single- or multi-threaded.""" worker = partial(fetch_url, options=options) if decode else fetch_response with ThreadPoolExecutor(max_workers=download_threads) as executor: diff --git a/trafilatura/settings.py b/trafilatura/settings.py index 155c60f2..ace5112f 100644 --- a/trafilatura/settings.py +++ b/trafilatura/settings.py @@ -6,7 +6,7 @@ from configparser import ConfigParser from datetime import datetime from html import unescape -from typing import Dict, Optional +from typing import Any, Dict, Optional try: from os import sched_getaffinity @@ -119,7 +119,7 @@ def _add_config(self, config): self.config = config -def args_to_extractor(args, url=None): +def args_to_extractor(args: Any, url: Optional[str] = None) -> Extractor: "Derive extractor configuration from CLI args." options = Extractor( config=use_config(filename=args.config_file), output_format=args.output_format, @@ -135,7 +135,7 @@ def args_to_extractor(args, url=None): return options -def set_date_params(extensive=True): +def set_date_params(extensive: bool = True): "Provide default parameters for date extraction." return { "original_date": True,
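The most pervasive behavioural change in this patch is the counter convention in cli_utils.py: the optional counter (previously None) becomes an integer sentinel, where -1 means "no counter" and any value >= 0 selects a numbered output subdirectory. The following is a minimal standalone sketch of that convention, not part of the commit above; the function body is copied from the patched determine_counter_dir(), and MAX_FILES_PER_DIRECTORY is assumed to be the default value of 1000 from trafilatura.settings.

# Standalone sketch of the reworked counter convention (assumed default: 1000 files per directory).
from os import path

MAX_FILES_PER_DIRECTORY = 1000

def determine_counter_dir(dirname: str, c: int) -> str:
    "Return a destination directory based on a file counter."
    # c >= 0: pick the subdirectory for the current batch; c < 0: counter disabled
    c_dir = str(int(c / MAX_FILES_PER_DIRECTORY) + 1) if c >= 0 else ""
    return path.join(dirname, c_dir)

print(determine_counter_dir("out", -1))    # "out/"  -> counter disabled, no subdirectory
print(determine_counter_dir("out", 0))     # "out/1" -> first batch of files
print(determine_counter_dir("out", 2500))  # "out/3" -> third batch of files

This is also why process_result() and download_queue_processing() now take and return an int (-1 passes through unchanged, while values >= 0 are incremented per written file), as exercised by the updated assertions in tests/cli_tests.py and tests/downloads_tests.py.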