Skip to content

Commit

Permalink
Merge branch 'develop' into makefile-codestyle
Browse files Browse the repository at this point in the history
  • Loading branch information
sebix authored Jan 3, 2025
2 parents e1f43de + 791b376 commit 0d59d28
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
- Fix to avoid schema download if not configured #2530.

#### Experts
- `intelmq.bots.experts.securitytxt`:
- Added new bot (PR#2538 by Frank Westers and Sebastian Wagner)

#### Outputs
- `intelmq.bots.outputs.cif3.output`:
Expand Down
50 changes: 50 additions & 0 deletions docs/user/bots.md
Original file line number Diff line number Diff line change
Expand Up @@ -3524,6 +3524,56 @@ to true.
(optional, boolean) Query for IPs at `https://stat.ripe.net/data/maxmind-geo-lite/data.json?resource=%s`. Defaults to
true.

---

### SecurityTXT <div id="intelmq.bots.experts.securitytxt.expert" />

SecurityTXT is an initiative to standardize how websites publish their abuse contact information.
It is standardized in [RFC 9116 "A File Format to Aid in Security Vulnerability Disclosure"](https://datatracker.ietf.org/doc/rfc9116/).
Refer to the linked RFC for more information on `security.txt`.
This bot looks for `security.txt` files on a URL or IP, retrieves the primary contact information out of it and adds this to the event.

**Requirements**

To use this bot, you need to install the required dependencies:

```bash
pip3 install -r intelmq/bots/experts/securitytxt/REQUIREMENTS.txt
```

**Module:** `intelmq.bots.experts.securitytxt.expert`

**Parameters**

**`url_field`**

The field in the event that contains the URL/IP on which to look for the security.txt file. Default: `source.reverse_dns`

**`contact_field`**

The field in the event in which to put the found contact details. Default: `source.abuse_contact`

**`only_email_address`** (bool)

Contact details can be web URLs or email addresses. When this value is set to True, it only selects email addresses as contact information.
Default: `true`

**`overwrite`** (bool)

Boolean indicating whether to overwrite existing data in `contact_field`.
Default: `true`

**`check_expired`** (bool)

Boolean indicating whether to check if the security.txt has expired according to its own expiry date.
Default: `false`

**`check_canonical`** (bool)

Boolean indicating whether to check if the url is contained in the list of canonical urls.
Default: `false`


---

### Sieve <div id="intelmq.bots.experts.sieve.expert" />
Expand Down
4 changes: 4 additions & 0 deletions intelmq/bots/experts/securitytxt/REQUIREMENTS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2022 Frank Westers, 2024 Institute for Common Good Technology
# SPDX-License-Identifier: AGPL-3.0-or-later

wellknown-securitytxt
Empty file.
113 changes: 113 additions & 0 deletions intelmq/bots/experts/securitytxt/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# SPDX-FileCopyrightText: 2022 Frank Westers, 2024 Institute for Common Good Technology
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from typing import Optional

import requests

from intelmq.lib.bot import ExpertBot
from intelmq.lib.exceptions import MissingDependencyError

try:
from securitytxt import SecurityTXT
except (ImportError, ModuleNotFoundError):
SecurityTXT = None


class SecurityTXTExpertBot(ExpertBot):
    """
    A bot for retrieving contact details from a security.txt
    """
    # Parameters:
    #   url_field: The field where to find the url which should be searched
    #   contact_field: Field in which to place the found contact details
    #   only_email_address: whether to select only email addresses as contact detail (no web urls)
    #   overwrite: whether to overwrite existing data
    #   check_expired / check_canonical: whether to perform checks on expiry date / canonical urls.
    url_field: str = "source.reverse_dns"
    contact_field: str = "source.abuse_contact"

    only_email_address: bool = True
    overwrite: bool = True
    check_expired: bool = False
    check_canonical: bool = False

    def init(self):
        """
        Refuse to start when the optional securitytxt dependency is unavailable.
        :raises MissingDependencyError: if ``SecurityTXT`` is None
        """
        if SecurityTXT is None:
            raise MissingDependencyError('wellknown-securitytxt')

    def process(self):
        """
        Receive one event, try to add the primary security.txt contact to it, and pass it on.
        Events that are skipped or for which no contact is found are only logged at debug
        level; the event is sent and acknowledged in every case.
        """
        event = self.receive_message()

        try:
            self.check_prerequisites(event)
            primary_contact = self.get_primary_contact(event.get(self.url_field))
            event.add(self.contact_field, primary_contact, overwrite=self.overwrite)
        except NotMeetsRequirementsError as e:
            self.logger.debug("Skipping event (%s).", e)
        except ContactNotFoundError as e:
            self.logger.debug("No contact found: %s Continue.", e)

        self.send_message(event)
        self.acknowledge_message()

    def check_prerequisites(self, event) -> None:
        """
        Check whether this event should be processed by this bot, or can be skipped.
        :param event: The event to evaluate.
        :raises NotMeetsRequirementsError: if the URL field is empty, or the contact field is
            already set while overwriting is disabled
        """
        if not event.get(self.url_field, False):
            raise NotMeetsRequirementsError("The URL field is empty.")
        if event.get(self.contact_field, False) and not self.overwrite:
            raise NotMeetsRequirementsError("All replace values already set.")

    def get_primary_contact(self, url: str) -> Optional[str]:
        """
        Given a url, get the file, check its validity and look for contact details. The primary contact details are
        returned. If only_email_address is set to True, it will only return email addresses (no urls).
        :param url: The URL on which to look for a security.txt file
        :return: The contact information
        :raises ContactNotFoundError: if contact cannot be found
        """
        try:
            securitytxt = SecurityTXT.from_url(url)
            if not self.security_txt_is_valid(securitytxt):
                raise ContactNotFoundError("SecurityTXT File not valid.")
            # Contacts are tried in the order the parsed file lists them; the first match wins.
            for contact in securitytxt.contact:
                if not self.only_email_address or self.is_email_address(contact):
                    return contact
            raise ContactNotFoundError("No contact details found in SecurityTXT.")
        except (FileNotFoundError, AttributeError, requests.exceptions.RequestException) as exc:
            # Chain the original error so the root cause stays visible in tracebacks.
            raise ContactNotFoundError("SecurityTXT file could not be found or parsed.") from exc

    def security_txt_is_valid(self, securitytxt: SecurityTXT):
        """
        Determine whether a security.txt file is valid according to parameters of the bot.
        Each check only applies when the corresponding ``check_*`` parameter is enabled.
        :param securitytxt: The securityTXT object
        :return: Whether the securitytxt is valid.
        """
        return (not self.check_expired or not securitytxt.expired) and \
            (not self.check_canonical or securitytxt.canonical_url())

    @staticmethod
    def is_email_address(contact: str) -> bool:
        """
        Determine whether the argument is an email address
        A contact counts as an email address when it uses the ``mailto:`` scheme, or when it is
        a bare address: it contains ``@`` but no ``://``. Web URIs that merely contain an ``@``
        (for example ``https://host/@team``) are not email addresses.
        :param contact: the contact
        :return: whether contact is email address
        """
        candidate = contact.strip()
        # URI schemes are case-insensitive, so accept "MAILTO:" as well.
        if candidate.lower().startswith('mailto:'):
            return True
        return '@' in candidate and '://' not in candidate


class NotMeetsRequirementsError(Exception):
    """Signals that an event does not meet the requirements for processing."""


class ContactNotFoundError(Exception):
    """Signals that no usable contact could be obtained from a security.txt file."""


# Module-level alias for the bot class (presumably the name IntelMQ's bot loader looks up - confirm).
BOT = SecurityTXTExpertBot
Empty file.
98 changes: 98 additions & 0 deletions intelmq/tests/bots/experts/securitytxt/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# SPDX-FileCopyrightText: 2022 Frank Westers
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
"""
Testing the SecurityTXT Expert Bot
"""

import unittest

import requests_mock

import intelmq.lib.test as test
from intelmq.bots.experts.securitytxt.expert import SecurityTXTExpertBot

# Event identified by IP address, and the same event with the contact added.
EXAMPLE_INPUT_IP = {
    "__type": "Event",
    "source.ip": "192.168.123.123",
}

EXPECTED_OUTPUT_IP = {
    **EXAMPLE_INPUT_IP,
    "source.account": '[email protected]',
}

# Event identified by FQDN, the same event with the contact added,
# and the unchanged event expected when no contact is accepted.
EXAMPLE_INPUT_FQDN = {
    "__type": "Event",
    "source.fqdn": "test.local",
}

EXPECTED_OUTPUT_FQDN = {
    **EXAMPLE_INPUT_FQDN,
    "source.abuse_contact": 'test.local/whitehat',
}

EXPECTED_OUTPUT_FQDN_NO_CONTACT = dict(EXAMPLE_INPUT_FQDN)

@requests_mock.Mocker()
@test.skip_exotic()
class TestSecurityTXTExpertBot(test.BotTestCase, unittest.TestCase):
    """
    Test cases for the SecurityTXT Expert Bot; HTTP requests go through requests_mock
    """

    @classmethod
    def set_bot(cls):
        cls.bot_reference = SecurityTXTExpertBot

    @staticmethod
    def _well_known_url(host: str) -> str:
        """Build the https security.txt URL the tests register for ``host``."""
        return f"https://{host}/.well-known/security.txt"

    def test_ip(self, m: requests_mock.Mocker):
        """A contact found for an IP address is written to a custom contact field."""
        host = EXAMPLE_INPUT_IP['source.ip']
        contact = EXPECTED_OUTPUT_IP['source.account']
        self._run_generic_test(
            m=m,
            config={'url_field': 'source.ip', 'contact_field': 'source.account',
                    'only_email_address': False},
            securitytxt_url=self._well_known_url(host),
            securitytxt=f"Contact: {contact}",
            input_message=EXAMPLE_INPUT_IP,
            output_message=EXPECTED_OUTPUT_IP,
        )

    def test_fqdn(self, m: requests_mock.Mocker):
        """A contact found for an FQDN is written to source.abuse_contact."""
        host = EXAMPLE_INPUT_FQDN['source.fqdn']
        contact = EXPECTED_OUTPUT_FQDN['source.abuse_contact']
        self._run_generic_test(
            m=m,
            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
                    'only_email_address': False},
            securitytxt_url=self._well_known_url(host),
            securitytxt=f"Contact: {contact}",
            input_message=EXAMPLE_INPUT_FQDN,
            output_message=EXPECTED_OUTPUT_FQDN,
        )

    def test_only_email_address_true(self, m: requests_mock.Mocker):
        """With only_email_address enabled, a web contact leaves the event unchanged."""
        host = EXAMPLE_INPUT_FQDN['source.fqdn']
        contact = EXPECTED_OUTPUT_FQDN['source.abuse_contact']
        self._run_generic_test(
            m=m,
            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
                    'only_email_address': True},
            securitytxt_url=self._well_known_url(host),
            securitytxt=f"Contact: {contact}",
            input_message=EXAMPLE_INPUT_FQDN,
            output_message=EXPECTED_OUTPUT_FQDN_NO_CONTACT,
        )

    def test_expired(self, m: requests_mock.Mocker):
        """With check_expired enabled, a file whose Expires date is in 1900 yields no contact."""
        host = EXAMPLE_INPUT_FQDN['source.fqdn']
        contact = EXPECTED_OUTPUT_FQDN['source.abuse_contact']
        self._run_generic_test(
            m=m,
            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
                    'only_email_address': False, 'check_expired': True},
            securitytxt_url=self._well_known_url(host),
            securitytxt=f"Contact: {contact}\nExpires: 1900-12-31T18:37:07.000Z",
            input_message=EXAMPLE_INPUT_FQDN,
            output_message=EXPECTED_OUTPUT_FQDN_NO_CONTACT,
        )

    def test_not_expired(self, m: requests_mock.Mocker):
        """With check_expired enabled, a file whose Expires date is in 3000 still yields the contact."""
        host = EXAMPLE_INPUT_FQDN['source.fqdn']
        contact = EXPECTED_OUTPUT_FQDN['source.abuse_contact']
        self._run_generic_test(
            m=m,
            config={'url_field': 'source.fqdn', 'contact_field': 'source.abuse_contact',
                    'only_email_address': False, 'check_expired': True},
            securitytxt_url=self._well_known_url(host),
            securitytxt=f"Contact: {contact}\nExpires: 3000-12-31T18:37:07.000Z",
            input_message=EXAMPLE_INPUT_FQDN,
            output_message=EXPECTED_OUTPUT_FQDN,
        )

    def _run_generic_test(self, m: requests_mock.Mocker, config: dict, securitytxt_url: str, securitytxt: str,
                          input_message: dict, output_message: dict):
        """
        Configure the bot, register the mocked responses, run the bot once and compare its output.
        :param m: the requests mocker supplied by the class decorator
        :param config: bot parameters, assigned to ``sysconfig``
        :param securitytxt_url: URL for which the mocked security.txt is served
        :param securitytxt: text content of the mocked security.txt
        :param input_message: the event fed into the bot
        :param output_message: the event expected as the first output message
        """
        self.sysconfig = config
        self.prepare_bot()
        # Register a 404 for any URL, then the security.txt URL with the fixture text.
        m.get(requests_mock.ANY, status_code=404)
        m.get(securitytxt_url, text=securitytxt)
        self.input_message = input_message
        self.run_bot()
        self.assertMessageEqual(0, output_message)

0 comments on commit 0d59d28

Please sign in to comment.