From f0c2d5d654af670ecaea0637fe4b9ae7fbae2087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Bonhomme?= Date: Thu, 7 Nov 2024 11:20:49 +0100 Subject: [PATCH] new: [stream] Added new argument to push the full JSON status object. --- README.md | 7 +++-- fedivuln/stream.py | 70 ++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 1a638de..5691bd7 100644 --- a/README.md +++ b/README.md @@ -41,13 +41,16 @@ You only have to execute it once. ```bash -$ FediVuln-Stream --user --sighting +$ FediVuln-Stream --user --push-sighting ``` -Using the ``--sighting`` argument, detected vulnerability IDs will be recorded in +Using the ``--push-sighting`` argument, detected vulnerability IDs will be recorded in [Vulnerability Lookup](https://github.com/cve-search/vulnerability-lookup) as [sightings](https://vulnerability-lookup.readthedocs.io/en/latest/sightings.html). +With ``--push-status`` argument, the full JSON status object will be sent to the +Vulnerability Lookup instance and stored in the kvrocks database. + ### Publishing diff --git a/fedivuln/stream.py b/fedivuln/stream.py index 724a254..37ed24b 100644 --- a/fedivuln/stream.py +++ b/fedivuln/stream.py @@ -1,14 +1,23 @@ import argparse +import json import re import sys import urllib.parse +from datetime import datetime import requests from mastodon import Mastodon, StreamListener from fedivuln import config -# print(config.api_base_url) + +# Custom encoder for datetime +class DateTimeEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, datetime): + return obj.isoformat() # Convert datetime to ISO format string + return super().default(obj) + mastodon = Mastodon( client_id=config.mastodon_clientcred, @@ -19,7 +28,7 @@ # Listener class for handling stream events class VulnStreamListener(StreamListener): - def __init__(self, sighting: bool = False): + def __init__(self, push_sighting: bool = False, push_status: bool = False): # Regular expression to match CVE, GHSA, and PySec IDs self.vulnerability_pattern = re.compile( r"\b(CVE-\d{4}-\d{4,})\b" # CVE pattern @@ -31,7 +40,8 @@ def __init__(self, sighting: bool = False): r"|\b(RHSA-\d{4}:\d{4})\b", # RedHat pattern re.IGNORECASE, ) - self.sighting = sighting + self.push_sighting = push_sighting + self.push_status = push_status # When a new status (post) is received def on_update(self, status): @@ -50,10 +60,14 @@ def on_update(self, status): vulnerability_ids = remove_case_insensitive_duplicates(vulnerability_ids) if vulnerability_ids: print("Vulnerability IDs detected:", ", ".join(vulnerability_ids)) - if self.sighting: - push_to_vulnerability_lookup( + if self.push_sighting: + push_sighting_to_vulnerability_lookup( status["uri"], vulnerability_ids - ) # Push a sighting to Vulnerability Lookup + ) # Push the sighting to Vulnerability Lookup + if self.push_status: + push_status_to_vulnerability_lookup( + status, vulnerability_ids + ) # Push the status to Vulnerability Lookup else: print("No ID detected. Ignoring.") @@ -81,7 +95,7 @@ def remove_case_insensitive_duplicates(input_list): return list({item.lower(): item for item in input_list}.values()) -def push_to_vulnerability_lookup(status_uri, vulnerability_ids): +def push_sighting_to_vulnerability_lookup(status_uri, vulnerability_ids): """Create a sighting from an incoming status and push it to the Vulnerability Lookup instance.""" print("Pushing sighting to Vulnerability Lookup...") headers_json = { @@ -107,6 +121,33 @@ def push_to_vulnerability_lookup(status_uri, vulnerability_ids): ) +def push_status_to_vulnerability_lookup(status, vulnerability_ids): + """Push the status to the Vulnerability Lookup instance.""" + print("Pushing status to Vulnerability Lookup...") + headers_json = { + "Content-Type": "application/json", + "accept": "application/json", + "X-API-KEY": f"{config.vulnerability_auth_token}", + } + # status = convert_datetime(status) + json_payload = {"status": status, "vulnerability_ids": vulnerability_ids} + json_string = json.dumps(json_payload, cls=DateTimeEncoder) + try: + r = requests.post( + urllib.parse.urljoin(config.vulnerability_lookup_base_url, "status/"), + data=json_string, + headers=headers_json, + ) + if r.status_code not in (200, 201): + print( + f"Error when sending POST request to the Vulnerability Lookup server: {r.reason}" + ) + except requests.exceptions.ConnectionError as e: + print( + f"Error when sending POST request to the Vulnerability Lookup server:\n{e}" + ) + + def main(): parser = argparse.ArgumentParser(prog="FediVuln-Stream") parser.add_argument( @@ -120,21 +161,28 @@ def main(): help="Streams public events.", ) parser.add_argument( - "--sighting", + "--push-sighting", action="store_true", help="Push the sightings to Vulnerability Lookup.", ) + parser.add_argument( + "--push-status", + action="store_true", + help="Push the status to Vulnerability Lookup.", + ) arguments = parser.parse_args() # Instantiate the listener - listener = VulnStreamListener(sighting=arguments.sighting) + listener = VulnStreamListener( + push_sighting=arguments.push_sighting, push_status=arguments.push_status + ) if arguments.user: - print("Starting user stream...") + print("Starting Mastodon user stream...") mastodon.stream_user(listener) elif arguments.public: - print("Starting local public stream...") + print("Starting Mastodon local public stream...") mastodon.stream_public(listener, remote=True) else: parser.print_help(sys.stderr)