From 439899dd8b36869ecb2d70be0356c1860ff9570b Mon Sep 17 00:00:00 2001
From: Adrien Barbaresi
Date: Thu, 9 Nov 2023 13:15:07 +0100
Subject: [PATCH] format + type hinting

---
 tests/feeds_tests.py | 242 ++++++++++++++++++++++++++++++-------
 trafilatura/feeds.py | 177 ++++++++++++++++++++-----------
 2 files changed, 286 insertions(+), 133 deletions(-)

diff --git a/tests/feeds_tests.py b/tests/feeds_tests.py
index f9466465..5b32e888 100644
--- a/tests/feeds_tests.py
+++ b/tests/feeds_tests.py
@@ -13,183 +13,283 @@
 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

 TEST_DIR = os.path.abspath(os.path.dirname(__file__))
-RESOURCES_DIR = os.path.join(TEST_DIR, 'resources')
+RESOURCES_DIR = os.path.join(TEST_DIR, "resources")

 XMLDECL = '<?xml version="1.0" encoding="utf-8"?>\n'


 def test_atom_extraction():
-    '''Test link extraction from an Atom feed'''
-    params = feeds.FeedParameters('https://example.org', 'example.org', '')
+    """Test link extraction from an Atom feed"""
+    params = feeds.FeedParameters("https://example.org", "example.org", "")
     assert not feeds.extract_links(None, params)
-    assert len(feeds.extract_links('', params)) == 0
+    assert len(feeds.extract_links("", params)) == 0

-    filepath = os.path.join(RESOURCES_DIR, 'feed1.atom')
+    filepath = os.path.join(RESOURCES_DIR, "feed1.atom")
     with open(filepath, "r", encoding="utf-8") as f:
         teststring = f.read()
     assert len(feeds.extract_links(teststring, params)) > 0

-    params = feeds.FeedParameters('https://www.dwds.de', 'dwds.de', '')
+    params = feeds.FeedParameters("https://www.dwds.de", "dwds.de", "")
     assert (
         len(
             feeds.extract_links(
                 f'{XMLDECL}<link type="application/atom+xml" rel="self" href="https://www.dwds.de"/>',
-                params
+                params,
             )
         )
         == 0
     )
-    params = feeds.FeedParameters('http://example.org', 'example.org', 'http://example.org')
+    params = feeds.FeedParameters(
+        "http://example.org", "example.org", "http://example.org"
+    )
     assert (
         len(
             feeds.extract_links(
                 f'{XMLDECL}<link type="application/atom+xml" rel="self" href="http://example.org"/>',
-                params
+                params,
             )
         )
         == 0
     )
-    params = feeds.FeedParameters('https://example.org', 'example.org', '')
+    params = feeds.FeedParameters("https://example.org", "example.org", "")
     assert (
         len(
             feeds.extract_links(
                 f'{XMLDECL}<link type="application/atom+xml" rel="self"/>',
-                params
+                params,
             )
         )
         == 0
     )
-    params = feeds.FeedParameters('http://example.org/', 'example.org', 'http://example.org')
+    params = feeds.FeedParameters(
+        "http://example.org/", "example.org", "http://example.org"
+    )
     assert feeds.extract_links(
-        f'{XMLDECL}<link type="application/atom+xml" rel="self" href="http://example.org/article1/"/>',
-        params) == ['http://example.org/article1/']  # TODO: remove slash?
+        f'{XMLDECL}<link type="application/atom+xml" rel="self" href="http://example.org/article1/"/>', params
+    ) == [
+        "http://example.org/article1/"
+    ]  # TODO: remove slash?


 def test_rss_extraction():
-    '''Test link extraction from a RSS feed'''
-    params = feeds.FeedParameters('http://example.org/', 'example.org', '')
+    """Test link extraction from a RSS feed"""
+    params = feeds.FeedParameters("http://example.org/", "example.org", "")
     assert (
         len(
             feeds.extract_links(
-                f'{XMLDECL}<link>http://example.org/article1/</link>',
-                params
+                f"{XMLDECL}<link>http://example.org/article1/</link>", params
             )
         )
         == 1
     )
     # CDATA
     assert feeds.extract_links(
-        f'{XMLDECL}<link><![CDATA[http://example.org/article1/]]></link>',
-        params) == ['http://example.org/article1/']  # TODO: remove slash?
+        f"{XMLDECL}<link><![CDATA[http://example.org/article1/]]></link>", params
+    ) == [
+        "http://example.org/article1/"
+    ]  # TODO: remove slash?
     # spaces
-    params = feeds.FeedParameters('https://www.ak-kurier.de/', 'ak-kurier.de', '')
-    assert len(feeds.extract_links(XMLDECL + '<link>\r\n https://www.ak-kurier.de/akkurier/www/artikel/108815-sinfonisches-blasorchester-spielt-1500-euro-fuer-kinder-in-drk-krankenhaus-kirchen-ein </link>', params)) == 1
-
-    params = feeds.FeedParameters('http://example.org', 'example.org', 'http://example.org')
+    params = feeds.FeedParameters("https://www.ak-kurier.de/", "ak-kurier.de", "")
     assert (
         len(
             feeds.extract_links(
-                f'{XMLDECL}<link>http://example.org/</link>',
-                params
+                XMLDECL
+                + "<link>\r\n https://www.ak-kurier.de/akkurier/www/artikel/108815-sinfonisches-blasorchester-spielt-1500-euro-fuer-kinder-in-drk-krankenhaus-kirchen-ein </link>",
+                params,
             )
         )
+        == 1
+    )
+
+    params = feeds.FeedParameters(
+        "http://example.org", "example.org", "http://example.org"
+    )
+    assert (
+        len(feeds.extract_links(f"{XMLDECL}<link>http://example.org/</link>", params))
         == 0
     )
-    params = feeds.FeedParameters('http://example.org', 'example.org', '')
+    params = feeds.FeedParameters("http://example.org", "example.org", "")
     assert (
-        len(
-            feeds.extract_links(
-                f'{XMLDECL}<link>https://example.org</link>',
-                params
-            )
-        )
+        len(feeds.extract_links(f"{XMLDECL}<link>https://example.org</link>", params))
         == 0
     )
-    params = feeds.FeedParameters('https://www.dwds.de', 'dwds.de', 'https://www.dwds.de')
+    params = feeds.FeedParameters(
+        "https://www.dwds.de", "dwds.de", "https://www.dwds.de"
+    )
     assert feeds.extract_links(
-        f'{XMLDECL}<link>/api/feed/themenglossar/Corona</link>',
-        params
-    ) == ['https://www.dwds.de/api/feed/themenglossar/Corona']
+        f"{XMLDECL}<link>/api/feed/themenglossar/Corona</link>", params
+    ) == ["https://www.dwds.de/api/feed/themenglossar/Corona"]

-    params = feeds.FeedParameters('https://example.org', 'example.org', '')
-    filepath = os.path.join(RESOURCES_DIR, 'feed2.rss')
+    params = feeds.FeedParameters("https://example.org", "example.org", "")
+    filepath = os.path.join(RESOURCES_DIR, "feed2.rss")
     with open(filepath, "r", encoding="utf-8") as f:
         teststring = f.read()
     assert len(feeds.extract_links(teststring, params)) > 0


 def test_json_extraction():
-    '''Test link extraction from a JSON feed'''
+    """Test link extraction from a JSON feed"""
     # find link
-    params = feeds.FeedParameters('https://www.jsonfeed.org', 'jsonfeed.org', '')
-    assert len(feeds.determine_feed('<html><link rel="alternate" type="application/json" href="https://www.jsonfeed.org/feed.json"/></html>', params)) == 1
+    params = feeds.FeedParameters("https://www.jsonfeed.org", "jsonfeed.org", "")
+    assert (
+        len(
+            feeds.determine_feed(
+                '<html><link rel="alternate" type="application/json" href="https://www.jsonfeed.org/feed.json"/></html>',
+                params,
+            )
+        )
+        == 1
+    )
     # extract data
-    filepath = os.path.join(RESOURCES_DIR, 'feed.json')
+    filepath = os.path.join(RESOURCES_DIR, "feed.json")
     with open(filepath, "r", encoding="utf-8") as f:
         teststring = f.read()
-    params = feeds.FeedParameters('https://npr.org', 'npr.org', '')
+    params = feeds.FeedParameters("https://npr.org", "npr.org", "")
     links = feeds.extract_links(teststring, params)
     assert len(links) == 25
     # id as a backup
-    params = feeds.FeedParameters('https://example.org', 'example.org', '')
-    links = feeds.extract_links(r'{"version":"https:\/\/jsonfeed.org\/version\/1","items":[{"id":"https://www.example.org/1","title":"Test"}]}', params)
+    params = feeds.FeedParameters("https://example.org", "example.org", "")
+    links = feeds.extract_links(
+        r'{"version":"https:\/\/jsonfeed.org\/version\/1","items":[{"id":"https://www.example.org/1","title":"Test"}]}',
+        params,
+    )
     assert len(links) == 1


 def test_feeds_helpers():
-    '''Test helper functions for feed extraction'''
-    params = feeds.FeedParameters('https://example.org', 'example.org', 'https://example.org')
"""Test helper functions for feed extraction""" + params = feeds.FeedParameters( + "https://example.org", "example.org", "https://example.org" + ) domainname, baseurl = get_hostinfo("https://example.org") assert domainname == params.domain and baseurl == params.base # nothing useful - assert len(feeds.determine_feed('', params)) == 0 - assert len(feeds.determine_feed('', params)) == 0 + assert len(feeds.determine_feed("", params)) == 0 + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 0 + ) # useful - assert len(feeds.determine_feed('', params)) == 1 - assert len(feeds.determine_feed('', params)) == 1 - assert len(feeds.determine_feed('', params)) == 1 - assert len(feeds.determine_feed('', params)) == 1 - assert len(feeds.determine_feed('', params)) == 1 + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 1 + ) + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 1 + ) + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 1 + ) + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 1 + ) + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 1 + ) # no comments wanted - assert len(feeds.determine_feed('', params)) == 0 + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 0 + ) # invalid links - params = feeds.FeedParameters('example.org', 'example.org', 'https://example.org') # fix - assert len(feeds.determine_feed('', params)) == 0 + params = feeds.FeedParameters( + "example.org", "example.org", "https://example.org" + ) # fix + assert ( + len( + feeds.determine_feed( + '', + params, + ) + ) + == 0 + ) # detecting in -elements - params = feeds.FeedParameters('https://example.org', 'example.org', 'https://example.org') - assert feeds.determine_feed('', params) == ['https://example.org/feed.xml'] - assert feeds.determine_feed('', params) == ['https://example.org/feed.atom'] - assert feeds.determine_feed('', params) == ['https://example.org/rss'] + params = feeds.FeedParameters( + "https://example.org", "example.org", "https://example.org" + ) + assert feeds.determine_feed( + '', params + ) == ["https://example.org/feed.xml"] + assert feeds.determine_feed( + '', params + ) == ["https://example.org/feed.atom"] + assert feeds.determine_feed( + '', params + ) == ["https://example.org/rss"] # feed discovery - assert not feeds.find_feed_urls('http://') - assert not feeds.find_feed_urls('https://httpbun.org/status/404') + assert not feeds.find_feed_urls("http://") + assert not feeds.find_feed_urls("https://httpbun.org/status/404") # Feedburner/Google links - assert feeds.handle_link_list(['https://feedproxy.google.com/ABCD'], params) == ['https://feedproxy.google.com/ABCD'] + assert feeds.handle_link_list(["https://feedproxy.google.com/ABCD"], params) == [ + "https://feedproxy.google.com/ABCD" + ] # override failed checks - assert feeds.handle_link_list(['https://feedburner.com/kat/1'], params) == ['https://feedburner.com/kat/1'] + assert feeds.handle_link_list(["https://feedburner.com/kat/1"], params) == [ + "https://feedburner.com/kat/1" + ] # diverging domain names - assert not feeds.handle_link_list(['https://www.software.info/1'], params) + assert not feeds.handle_link_list(["https://www.software.info/1"], params) def test_cli_behavior(): - '''Test command-line interface with respect to feeds''' - testargs = ['', '--list', '--feed', 'https://httpbun.org/xml'] - with patch.object(sys, 'argv', testargs): + """Test command-line interface with respect to feeds""" + testargs 
= ["", "--list", "--feed", "https://httpbun.org/xml"] + with patch.object(sys, "argv", testargs): assert cli.main() is None -if __name__ == '__main__': +if __name__ == "__main__": test_atom_extraction() test_rss_extraction() test_json_extraction() diff --git a/trafilatura/feeds.py b/trafilatura/feeds.py index eddf02d3..a6528ea3 100644 --- a/trafilatura/feeds.py +++ b/trafilatura/feeds.py @@ -10,10 +10,16 @@ import re from itertools import islice -from typing import Optional +from typing import List, Optional -from courlan import (check_url, clean_url, filter_urls, fix_relative_urls, - get_hostinfo, validate_url) +from courlan import ( + check_url, + clean_url, + filter_urls, + fix_relative_urls, + get_hostinfo, + validate_url, +) from .downloads import fetch_url from .settings import MAX_LINKS @@ -21,22 +27,43 @@ LOGGER = logging.getLogger(__name__) -FEED_TYPES = {'application/atom+xml', 'application/json', 'application/rdf+xml', 'application/rss+xml', 'application/x.atom+xml', 'application/x-atom+xml', 'text/atom+xml', 'text/plain', 'text/rdf+xml', 'text/rss+xml', 'text/xml'} -FEED_EXTENSIONS = {'.rss', '.rdf', '.xml'} -FEED_OPENING = re.compile(r'<(feed|rss|\?xml)') +FEED_TYPES = { + "application/atom+xml", + "application/json", + "application/rdf+xml", + "application/rss+xml", + "application/x.atom+xml", + "application/x-atom+xml", + "text/atom+xml", + "text/plain", + "text/rdf+xml", + "text/rss+xml", + "text/xml", +} +FEED_EXTENSIONS = {".rss", ".rdf", ".xml"} +FEED_OPENING = re.compile(r"<(feed|rss|\?xml)") LINK_ATTRS = re.compile(r'(?:\s*)(?:)?(?:\s*)') +LINK_ELEMENTS = re.compile( + r"(?:\s*)(?:)?(?:\s*)" +) -BLACKLIST = re.compile(r'\bcomments\b') # no comment feed +BLACKLIST = re.compile(r"\bcomments\b") # no comment feed class FeedParameters: "Store necessary information to proceed a feed." 
     __slots__ = ["base", "domain", "ext", "lang", "ref"]

-    def __init__(self, baseurl: str, domainname: str, reference: str, external: bool = False, target_lang: Optional[str] = None) -> None:
+    def __init__(
+        self,
+        baseurl: str,
+        domainname: str,
+        reference: str,
+        external: bool = False,
+        target_lang: Optional[str] = None,
+    ) -> None:
         self.base: str = baseurl
         self.domain: str = domainname
         self.ext: bool = external
@@ -44,9 +71,9 @@ def __init__(self, baseurl: str, domainname: str, reference: str, external: bool
         self.ref: str = reference


-def handle_link_list(linklist, params):
-    '''Examine links to determine if they are valid and
-    lead to a web page'''
+def handle_link_list(linklist: List[str], params: FeedParameters) -> List[str]:
+    """Examine links to determine if they are valid and
+    lead to a web page"""
     output_links = []
     # sort and uniq
     for item in sorted(set(linklist)):
@@ -55,92 +82,112 @@ def handle_link_list(linklist, params):
         # control output for validity
         checked = check_url(link, language=params.lang)
         if checked is not None:
-            if not params.ext and not "feed" in link and not is_similar_domain(params.domain, checked[1]):
-                LOGGER.warning('Rejected, diverging domain names: %s %s', params.domain, checked[1])
+            if (
+                not params.ext
+                and not "feed" in link
+                and not is_similar_domain(params.domain, checked[1])
+            ):
+                LOGGER.warning(
+                    "Rejected, diverging domain names: %s %s", params.domain, checked[1]
+                )
             else:
                 output_links.append(checked[0])
         # Feedburner/Google feeds
-        elif 'feedburner' in item or 'feedproxy' in item:
+        elif "feedburner" in item or "feedproxy" in item:
             output_links.append(item)
     return output_links


-def extract_links(feed_string, params):
-    '''Extract links from Atom and RSS feeds'''
+def extract_links(feed_string: str, params: FeedParameters) -> List[str]:
+    """Extract links from Atom and RSS feeds"""
     feed_links = []
     # check if it's a feed
     if feed_string is None:
-        LOGGER.debug('Empty feed: %s', params.domain)
+        LOGGER.debug("Empty feed: %s", params.domain)
         return feed_links
     feed_string = feed_string.strip()
     # typical first and second lines absent
-    if not FEED_OPENING.match(feed_string) and not \
-        ('<rss' in feed_string[:100] or '<feed' in feed_string[:100]):
+    if not FEED_OPENING.match(feed_string) and not (
+        "<rss" in feed_string[:100] or "<feed" in feed_string[:100]
+    ):
         # could be a JSON feed
         if feed_string.startswith("{"):
             try:
                 feed_dict = json.loads(feed_string)
                 for item in feed_dict["items"]:
                     if "url" in item:
                         feed_links.append(item["url"])
                     # fallback: use the id element
                     elif "id" in item:
                         feed_links.append(item["id"])
             except (json.decoder.JSONDecodeError, KeyError):
                 LOGGER.debug("Invalid JSON feed: %s", params.domain)
         return feed_links
     # could be an Atom feed
     if "<link " in feed_string:
         feed_links.extend(
             [m[1] for m in islice(LINK_ATTRS.finditer(feed_string), MAX_LINKS)]
         )
     # could be a RSS feed
-    elif '<link>' in feed_string:
+    elif "<link>" in feed_string:
         feed_links.extend(
-            [m[1].strip() for m in islice(LINK_ELEMENTS.finditer(feed_string, re.DOTALL), MAX_LINKS)]
+            [
+                m[1].strip()
+                for m in islice(
+                    LINK_ELEMENTS.finditer(feed_string, re.DOTALL), MAX_LINKS
+                )
+            ]
         )
     # refine
     output_links = handle_link_list(feed_links, params)
     output_links = [l for l in output_links if l != params.ref and l.count("/") > 2]
     # log result
     if feed_links:
-        LOGGER.debug('Links found: %s of which %s valid', len(feed_links), len(output_links))
+        LOGGER.debug(
+            "Links found: %s of which %s valid", len(feed_links), len(output_links)
+        )
     else:
-        LOGGER.debug('Invalid feed for %s', params.domain)
+        LOGGER.debug("Invalid feed for %s", params.domain)
     return output_links


-def determine_feed(htmlstring, params):
-    '''Try to extract the feed URL from the home page.
-    Adapted from http://www.aaronsw.com/2002/feedfinder/'''
+def determine_feed(htmlstring: str, params: FeedParameters) -> List[str]:
+    """Try to extract the feed URL from the home page.
+    Adapted from http://www.aaronsw.com/2002/feedfinder/"""
     # parse the page to look for feeds
     tree = load_html(htmlstring)
     # safeguard
     if tree is None:
-        LOGGER.debug('Invalid HTML/Feed page: %s', params.base)
+        LOGGER.debug("Invalid HTML/Feed page: %s", params.base)
         return []
     feed_urls = []
     for linkelem in tree.xpath('//link[@rel="alternate"]'):
         # discard elements without links
-        if 'href' not in linkelem.attrib:
+        if "href" not in linkelem.attrib:
             continue
         # most common case + websites like geo.de
-        if ('type' in linkelem.attrib and linkelem.get('type') in FEED_TYPES) or \
-            'atom' in linkelem.get('href') or 'rss' in linkelem.get('href'):
-            feed_urls.append(linkelem.get('href'))
+        if (
+            ("type" in linkelem.attrib and linkelem.get("type") in FEED_TYPES)
+            or "atom" in linkelem.get("href")
+            or "rss" in linkelem.get("href")
+        ):
+            feed_urls.append(linkelem.get("href"))
     # backup
     if not feed_urls:
-        for linkelem in tree.xpath('//a[@href]'):
-            if linkelem.get('href')[-4:].lower() in FEED_EXTENSIONS or \
-                linkelem.get('href')[-5:].lower() == '.atom' or \
-                'atom' in linkelem.get('href') or 'rss' in linkelem.get('href'):
-                feed_urls.append(linkelem.get('href'))
+        for linkelem in tree.xpath("//a[@href]"):
+            if (
+                linkelem.get("href")[-4:].lower() in FEED_EXTENSIONS
+                or linkelem.get("href")[-5:].lower() == ".atom"
+                or "atom" in linkelem.get("href")
+                or "rss" in linkelem.get("href")
+            ):
+                feed_urls.append(linkelem.get("href"))
     # refine
     output_urls = []
     for link in sorted(set(feed_urls)):
@@ -152,11 +199,15 @@ def determine_feed(htmlstring, params):
             continue
         output_urls.append(link)
     # log result
-    LOGGER.debug('Feed URLs found: %s of which %s valid', len(feed_urls), len(output_urls))
+    LOGGER.debug(
+        "Feed URLs found: %s of which %s valid", len(feed_urls), len(output_urls)
+    )
     return output_urls


-def find_feed_urls(url, target_lang=None, external=False):
+def find_feed_urls(
+    url: str, target_lang: Optional[str] = None, external: bool = False
+) -> List[str]:
     """Try to find feed URLs.
     Args:
@@ -173,9 +224,9 @@
     """
     domainname, baseurl = get_hostinfo(url)
     if domainname is None:
-        LOGGER.warning('Invalid URL: %s', url)
+        LOGGER.warning("Invalid URL: %s", url)
         return []
-    params = FeedParameters(baseurl, domainname, external, target_lang, url)
+    params = FeedParameters(baseurl, domainname, url, external, target_lang)
     urlfilter = None
     downloaded = fetch_url(url)
     if downloaded is not None:
@@ -192,28 +243,30 @@
         # return links found
         if len(feed_links) > 0:
             feed_links = filter_urls(feed_links, urlfilter)
-            LOGGER.debug('%s feed links found for %s', len(feed_links), domainname)
+            LOGGER.debug("%s feed links found for %s", len(feed_links), domainname)
             return feed_links
-        LOGGER.debug('No usable feed links found: %s', url)
+        LOGGER.debug("No usable feed links found: %s", url)
     else:
-        LOGGER.error('Could not download web page: %s', url)
-        if url.strip('/') != baseurl:
+        LOGGER.error("Could not download web page: %s", url)
+        if url.strip("/") != baseurl:
             return try_homepage(baseurl, target_lang)
     # try alternative: Google News
     if target_lang is not None:
         downloaded = fetch_url(
-            f'https://news.google.com/rss/search?q=site:{baseurl}&hl={target_lang}&scoring=n&num=100'
+            f"https://news.google.com/rss/search?q=site:{baseurl}&hl={target_lang}&scoring=n&num=100"
         )
         if downloaded is not None:
            feed_links = extract_links(downloaded, params)
            feed_links = filter_urls(feed_links, urlfilter)
-           LOGGER.debug('%s Google news links found for %s', len(feed_links), domainname)
+           LOGGER.debug(
+               "%s Google news links found for %s", len(feed_links), domainname
+           )
            return feed_links
     return []


-def try_homepage(baseurl, target_lang):
-    '''Shift into reverse and try the homepage instead of the particular feed
-    page that was given as input.'''
-    LOGGER.debug('Probing homepage for feeds instead: %s', baseurl)
+def try_homepage(baseurl: str, target_lang: Optional[str]) -> List[str]:
+    """Shift into reverse and try the homepage instead of the particular feed
+    page that was given as input."""
+    LOGGER.debug("Probing homepage for feeds instead: %s", baseurl)
     return find_feed_urls(baseurl, target_lang)
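Note for reviewers (not part of the diff): a minimal usage sketch of the API as it stands after this commit. find_feed_urls() is the public entry point, while FeedParameters and extract_links() are internal, so the positional order shown here (baseurl, domainname, reference) is specific to this revision and may change in later releases; the inline RSS snippet and the expected output simply mirror the test cases above.

    from trafilatura import feeds

    XMLDECL = '<?xml version="1.0" encoding="utf-8"?>\n'

    # public entry point: fetches the page and probes it for feeds (needs network access)
    print(feeds.find_feed_urls("https://www.dwds.de"))

    # internal path, as exercised by the tests: parse a feed string directly
    params = feeds.FeedParameters("https://example.org", "example.org", "")
    rss = f"{XMLDECL}<rss><channel><item><link>https://example.org/article1/</link></item></channel></rss>"
    print(feeds.extract_links(rss, params))  # expected: ['https://example.org/article1/']

The second call goes through the RSS branch of extract_links(): LINK_ELEMENTS captures the text of the <link> element, handle_link_list() validates it against the domain name, and the final filter drops anything equal to the reference URL or with too few path segments.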