diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b82d0add..68b744c09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,8 @@ - Fix to avoid schema download if not configured #2530. #### Experts +- `intelmq.bots.experts.securitytxt`: + - Added new bot (PR#2538 by Frank Westers and Sebastian Wagner) #### Outputs - `intelmq.bots.outputs.cif3.output`: diff --git a/docs/dev/release.md b/docs/dev/release.md index 914e31523..3c8da229e 100644 --- a/docs/dev/release.md +++ b/docs/dev/release.md @@ -30,8 +30,6 @@ These apply to all projects: - `intelmq/version.py`: Update the version. -Eventually adapt the default log levels if necessary. Should be INFO for stable releases. - ### IntelMQ API - `intelmq_api/version.py`: Update the version. diff --git a/docs/user/bots.md b/docs/user/bots.md index 433fe98ef..56edbd158 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -3524,6 +3524,56 @@ to true. (optional, boolean) Query for IPs at `https://stat.ripe.net/data/maxmind-geo-lite/data.json?resource=%s`. Defaults to true. +--- + +### SecurityTXT
+ +SecurityTXT is an initiative to standardize how websites publish their security contact information. +It is specified in [RFC 9116 "A File Format to Aid in Security Vulnerability Disclosure"](https://datatracker.ietf.org/doc/rfc9116/). +Refer to the linked RFC for more information on `security.txt`. +This bot looks for a `security.txt` file on a URL or IP, retrieves the primary contact information from it and adds it to the event. + +**Requirements** + +To use this bot, you need to install the required dependencies: + +```bash +pip3 install -r intelmq/bots/experts/securitytxt/REQUIREMENTS.txt +``` + +**Module:** `intelmq.bots.experts.securitytxt.expert` + +**Parameters** + +**`url_field`** + +The field in the event that contains the URL/IP on which to look for the `security.txt` file. Default: `source.reverse_dns` + +**`contact_field`** + +The field in the event in which to put the found contact details. Default: `source.abuse_contact` + +**`only_email_address`** (bool) + +Contact details can be web URLs or email addresses. When this value is set to `true`, only email addresses are selected as contact information. +Default: `true` + +**`overwrite`** (bool) + +Boolean indicating whether to overwrite existing data in `contact_field`. +Default: `true` + +**`check_expired`** (bool) + +Boolean indicating whether to check if the `security.txt` has expired according to its own expiry date. +Default: `false` + +**`check_canonical`** (bool) + +Boolean indicating whether to check if the URL is contained in the list of canonical URLs.
+Default: `false` + + --- ### Sieve diff --git a/intelmq/__init__.py b/intelmq/__init__.py index 0fb77e825..b5a6154b0 100644 --- a/intelmq/__init__.py +++ b/intelmq/__init__.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2014 Tomás Lima +# SPDX-FileCopyrightText: 2014 Tomás Lima, 2015-2020 nic.at GmbH, 2024 Institute for Common Good Technology # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -29,7 +29,12 @@ VAR_STATE_PATH = os.path.join(ROOT_DIR, "var/lib/bots/") -DEFAULT_LOGGING_LEVEL = "INFO" +if isinstance(__version_info__[-1], str) and __version_info__[-1][0].lower() in ('a', 'b', 'd'): + # for alpha, beta and dev instances, set default log level to DEBUG, for others, including RCs, use INFO + DEFAULT_LOGGING_LEVEL = "DEBUG" +else: + DEFAULT_LOGGING_LEVEL = "INFO" + HARMONIZATION_CONF_FILE = os.path.join(CONFIG_DIR, "harmonization.conf") RUNTIME_CONF_FILE = os.path.join(CONFIG_DIR, "runtime.yaml") old_runtime_conf_file = pathlib.Path(RUNTIME_CONF_FILE).with_suffix('.conf') diff --git a/intelmq/bots/experts/securitytxt/REQUIREMENTS.txt b/intelmq/bots/experts/securitytxt/REQUIREMENTS.txt new file mode 100644 index 000000000..3b93c2981 --- /dev/null +++ b/intelmq/bots/experts/securitytxt/REQUIREMENTS.txt @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: 2022 Frank Westers, 2024 Institute for Common Good Technology +# SPDX-License-Identifier: AGPL-3.0-or-later + +wellknown-securitytxt \ No newline at end of file diff --git a/intelmq/bots/experts/securitytxt/__init__.py b/intelmq/bots/experts/securitytxt/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/intelmq/bots/experts/securitytxt/expert.py b/intelmq/bots/experts/securitytxt/expert.py new file mode 100644 index 000000000..94f2815cd --- /dev/null +++ b/intelmq/bots/experts/securitytxt/expert.py @@ -0,0 +1,113 @@ +# SPDX-FileCopyrightText: 2022 Frank Westers, 2024 Institute for Common Good Technology +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +from typing import Optional + +import requests 
+
+from intelmq.lib.bot import ExpertBot
+from intelmq.lib.exceptions import MissingDependencyError
+
+try:
+    from securitytxt import SecurityTXT
+except (ImportError, ModuleNotFoundError):
+    SecurityTXT = None
+
+
+class SecurityTXTExpertBot(ExpertBot):
+    """
+    A bot for retrieving contact details from a security.txt
+    """
+    # Parameters:
+    # url_field: The field where to find the url which should be searched
+    # contact_field: Field in which to place the found contact details
+    #
+    # only_email_address: whether to select only email addresses as contact detail (no web urls)
+    # overwrite: whether to override existing data
+    # check_expired / check_canonical: whether to perform checks on expiry date / canonical urls.
+
+    url_field: str = "source.reverse_dns"
+    contact_field: str = "source.abuse_contact"
+
+    only_email_address: bool = True
+    overwrite: bool = True
+    check_expired: bool = False
+    check_canonical: bool = False
+
+    def init(self):
+        if SecurityTXT is None:  # the optional dependency failed to import at module load
+            raise MissingDependencyError('wellknown-securitytxt')
+
+    def process(self):
+        event = self.receive_message()
+
+        try:
+            self.check_prerequisites(event)
+            primary_contact = self.get_primary_contact(event.get(self.url_field))
+            event.add(self.contact_field, primary_contact, overwrite=self.overwrite)
+        except NotMeetsRequirementsError as e:
+            self.logger.debug("Skipping event (%s).", e)
+        except ContactNotFoundError as e:
+            self.logger.debug("No contact found: %s Continue.", e)
+
+        self.send_message(event)  # the event is always forwarded, with or without a contact
+        self.acknowledge_message()
+
+    def check_prerequisites(self, event) -> None:
+        """
+        Check whether this event should be processed by this bot, or can be skipped.
+        :param event: The event to evaluate.
+        """
+        if not event.get(self.url_field, False):
+            raise NotMeetsRequirementsError("The URL field is empty.")
+        if event.get(self.contact_field, False) and not self.overwrite:
+            raise NotMeetsRequirementsError("All replace values already set.")
+
+    def get_primary_contact(self, url: str) -> Optional[str]:
+        """
+        Given a url, get the file, check its validity and look for contact details. The primary contact details are
+        returned. If only_email_address is set to True, it will only return email addresses (no urls).
+        :param url: The URL on which to look for a security.txt file
+        :return: The contact information
+        :raises ContactNotFoundError: if contact cannot be found
+        """
+        try:
+            securitytxt = SecurityTXT.from_url(url)
+            if not self.security_txt_is_valid(securitytxt):
+                raise ContactNotFoundError("SecurityTXT File not valid.")
+            for contact in securitytxt.contact:
+                if not self.only_email_address or SecurityTXTExpertBot.is_email_address(contact):
+                    return contact
+            raise ContactNotFoundError("No contact details found in SecurityTXT.")
+        except (FileNotFoundError, AttributeError, requests.exceptions.RequestException) as exc:
+            raise ContactNotFoundError("SecurityTXT file could not be found or parsed.") from exc
+
+    def security_txt_is_valid(self, securitytxt: SecurityTXT) -> bool:
+        """
+        Determine whether a security.txt file is valid according to parameters of the bot.
+        :param securitytxt: The securityTXT object
+        :return: Whether the securitytxt is valid.
+        """
+        return bool((not self.check_expired or not securitytxt.expired) and
+                    (not self.check_canonical or securitytxt.canonical_url()))
+
+    @staticmethod
+    def is_email_address(contact: str) -> bool:
+        """
+        Determine whether the contact is an email address: a mailto: URI, or a bare address without a URL scheme
+        :param contact: the contact
+        :return: whether contact is email address (web URLs that merely contain '@' or 'mailto:' are not)
+        """
+        return contact.lower().startswith('mailto:') or ('@' in contact and '://' not in contact)
+
+
+class NotMeetsRequirementsError(Exception):
+    pass
+
+
+class ContactNotFoundError(Exception):
+    pass
+
+
+BOT = SecurityTXTExpertBot
diff --git a/intelmq/tests/bots/experts/securitytxt/__init__.py b/intelmq/tests/bots/experts/securitytxt/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/intelmq/tests/bots/experts/securitytxt/test_expert.py b/intelmq/tests/bots/experts/securitytxt/test_expert.py
new file mode 100644
index 000000000..cf01285ba
--- /dev/null
+++ b/intelmq/tests/bots/experts/securitytxt/test_expert.py
@@ -0,0 +1,98 @@
+# SPDX-FileCopyrightText: 2022 Frank Westers
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# -*- coding: utf-8 -*-
+"""
+Testing the SecurityTXT Expert Bot
+"""
+
+import unittest
+
+import requests_mock
+
+import intelmq.lib.test as test
+from intelmq.bots.experts.securitytxt.expert import SecurityTXTExpertBot
+
+EXAMPLE_INPUT_IP = {"__type": "Event",
+                    "source.ip": "192.168.123.123"}
+
+EXPECTED_OUTPUT_IP = {"__type": "Event",
+                      "source.ip": "192.168.123.123",
+                      "source.account": 'test@test.local'}
+
+EXAMPLE_INPUT_FQDN = {"__type": "Event",
+                      "source.fqdn": "test.local"}
+
+EXPECTED_OUTPUT_FQDN = {"__type": "Event",
+                        "source.fqdn": "test.local",
+                        "source.abuse_contact": 'test.local/whitehat'}
+
+EXPECTED_OUTPUT_FQDN_NO_CONTACT = {"__type": "Event",
+                                   "source.fqdn": "test.local"}
+
+@requests_mock.Mocker()
+@test.skip_exotic()
+class TestSecurityTXTExpertBot(test.BotTestCase, unittest.TestCase):
+    """
+    A TestCase for the SecurityTXT Expert Bot
+    """
+
+    @classmethod
+    def set_bot(cls):
+        cls.bot_reference = SecurityTXTExpertBot
+
+    def test_ip(self, m: requests_mock.Mocker):
+        """Look up security.txt via the event's IP address and store the contact in `source.account`."""
+        self._run_generic_test(
+            m=m, input_message=EXAMPLE_INPUT_IP, output_message=EXPECTED_OUTPUT_IP,
+            securitytxt_url=f"https://{EXAMPLE_INPUT_IP['source.ip']}/.well-known/security.txt",
+            securitytxt=f"Contact: {EXPECTED_OUTPUT_IP['source.account']}",
+            config={'url_field': 'source.ip', 'contact_field': 'source.account',
+                    'only_email_address': False})
+
+    def test_fqdn(self, m: requests_mock.Mocker):
+        """Look up security.txt via the FQDN; a non-email contact is accepted when `only_email_address` is off."""
+        self._run_generic_test(
+            m=m, input_message=EXAMPLE_INPUT_FQDN, output_message=EXPECTED_OUTPUT_FQDN,
+            securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
+            securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}",
+            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
+                    'only_email_address': False})
+
+    def test_only_email_address_true(self, m: requests_mock.Mocker):
+        """A non-email contact is not added when `only_email_address` is on; the event passes through unchanged."""
+        self._run_generic_test(
+            m=m, input_message=EXAMPLE_INPUT_FQDN, output_message=EXPECTED_OUTPUT_FQDN_NO_CONTACT,
+            securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
+            securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}",
+            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
+                    'only_email_address': True})
+
+    def test_expired(self, m: requests_mock.Mocker):
+        """With `check_expired` on, a security.txt whose Expires date lies in 1900 yields no contact."""
+        self._run_generic_test(
+            m=m, input_message=EXAMPLE_INPUT_FQDN, output_message=EXPECTED_OUTPUT_FQDN_NO_CONTACT,
+            securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
+            securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}\nExpires: 1900-12-31T18:37:07.000Z",
+            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
+                    'only_email_address': False, 'check_expired': True})
+
+    def test_not_expired(self, m: requests_mock.Mocker):
+        """With `check_expired` on, a security.txt whose Expires date lies in 3000 still yields the contact."""
+        self._run_generic_test(
+            m=m, input_message=EXAMPLE_INPUT_FQDN, output_message=EXPECTED_OUTPUT_FQDN,
+            securitytxt_url=f"https://{EXAMPLE_INPUT_FQDN['source.fqdn']}/.well-known/security.txt",
+            securitytxt=f"Contact: {EXPECTED_OUTPUT_FQDN['source.abuse_contact']}\nExpires: 3000-12-31T18:37:07.000Z",
+            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
+                    'only_email_address': False, 'check_expired': True})
+
+    def _run_generic_test(self, m: requests_mock.Mocker, config: dict, securitytxt_url: str, securitytxt: str,
+                          input_message: dict, output_message: dict):
+        self.sysconfig = config  # bot parameters for this test case
+        self.prepare_bot()
+        m.get(requests_mock.ANY, status_code=404)  # catch-all registered first: any URL answers 404
+        m.get(securitytxt_url, text=securitytxt)  # registered afterwards for the expected security.txt location
+        self.input_message = input_message
+        self.run_bot()
+        self.assertMessageEqual(0, output_message)  # compare the first message the bot sent