Skip to content

Commit

Permalink
new: [stream] Added new argument to push the full JSON status object.
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricbonhomme committed Nov 7, 2024
1 parent e32784a commit f0c2d5d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ You only have to execute it once.


```bash
$ FediVuln-Stream --user --sighting
$ FediVuln-Stream --user --push-sighting
```

Using the ``--sighting`` argument, detected vulnerability IDs will be recorded in
Using the ``--push-sighting`` argument, detected vulnerability IDs will be recorded in
[Vulnerability Lookup](https://github.com/cve-search/vulnerability-lookup) as
[sightings](https://vulnerability-lookup.readthedocs.io/en/latest/sightings.html).

With ``--push-status`` argument, the full JSON status object will be sent to the
Vulnerability Lookup instance and stored in the kvrocks database.


### Publishing

Expand Down
70 changes: 59 additions & 11 deletions fedivuln/stream.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import argparse
import json
import re
import sys
import urllib.parse
from datetime import datetime

import requests
from mastodon import Mastodon, StreamListener

from fedivuln import config

# print(config.api_base_url)

# Custom encoder for datetime
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat() # Convert datetime to ISO format string
return super().default(obj)


mastodon = Mastodon(
client_id=config.mastodon_clientcred,
Expand All @@ -19,7 +28,7 @@

# Listener class for handling stream events
class VulnStreamListener(StreamListener):
def __init__(self, sighting: bool = False):
def __init__(self, push_sighting: bool = False, push_status: bool = False):
# Regular expression to match CVE, GHSA, and PySec IDs
self.vulnerability_pattern = re.compile(
r"\b(CVE-\d{4}-\d{4,})\b" # CVE pattern
Expand All @@ -31,7 +40,8 @@ def __init__(self, sighting: bool = False):
r"|\b(RHSA-\d{4}:\d{4})\b", # RedHat pattern
re.IGNORECASE,
)
self.sighting = sighting
self.push_sighting = push_sighting
self.push_status = push_status

# When a new status (post) is received
def on_update(self, status):
Expand All @@ -50,10 +60,14 @@ def on_update(self, status):
vulnerability_ids = remove_case_insensitive_duplicates(vulnerability_ids)
if vulnerability_ids:
print("Vulnerability IDs detected:", ", ".join(vulnerability_ids))
if self.sighting:
push_to_vulnerability_lookup(
if self.push_sighting:
push_sighting_to_vulnerability_lookup(
status["uri"], vulnerability_ids
) # Push a sighting to Vulnerability Lookup
) # Push the sighting to Vulnerability Lookup
if self.push_status:
push_status_to_vulnerability_lookup(
status, vulnerability_ids
) # Push the status to Vulnerability Lookup
else:
print("No ID detected. Ignoring.")

Expand Down Expand Up @@ -81,7 +95,7 @@ def remove_case_insensitive_duplicates(input_list):
return list({item.lower(): item for item in input_list}.values())


def push_to_vulnerability_lookup(status_uri, vulnerability_ids):
def push_sighting_to_vulnerability_lookup(status_uri, vulnerability_ids):
"""Create a sighting from an incoming status and push it to the Vulnerability Lookup instance."""
print("Pushing sighting to Vulnerability Lookup...")
headers_json = {
Expand All @@ -107,6 +121,33 @@ def push_to_vulnerability_lookup(status_uri, vulnerability_ids):
)


def push_status_to_vulnerability_lookup(status, vulnerability_ids):
"""Push the status to the Vulnerability Lookup instance."""
print("Pushing status to Vulnerability Lookup...")
headers_json = {
"Content-Type": "application/json",
"accept": "application/json",
"X-API-KEY": f"{config.vulnerability_auth_token}",
}
# status = convert_datetime(status)
json_payload = {"status": status, "vulnerability_ids": vulnerability_ids}
json_string = json.dumps(json_payload, cls=DateTimeEncoder)
try:
r = requests.post(
urllib.parse.urljoin(config.vulnerability_lookup_base_url, "status/"),
data=json_string,
headers=headers_json,
)
if r.status_code not in (200, 201):
print(
f"Error when sending POST request to the Vulnerability Lookup server: {r.reason}"
)
except requests.exceptions.ConnectionError as e:
print(
f"Error when sending POST request to the Vulnerability Lookup server:\n{e}"
)


def main():
parser = argparse.ArgumentParser(prog="FediVuln-Stream")
parser.add_argument(
Expand All @@ -120,21 +161,28 @@ def main():
help="Streams public events.",
)
parser.add_argument(
"--sighting",
"--push-sighting",
action="store_true",
help="Push the sightings to Vulnerability Lookup.",
)
parser.add_argument(
"--push-status",
action="store_true",
help="Push the status to Vulnerability Lookup.",
)

arguments = parser.parse_args()

# Instantiate the listener
listener = VulnStreamListener(sighting=arguments.sighting)
listener = VulnStreamListener(
push_sighting=arguments.push_sighting, push_status=arguments.push_status
)

if arguments.user:
print("Starting user stream...")
print("Starting Mastodon user stream...")
mastodon.stream_user(listener)
elif arguments.public:
print("Starting local public stream...")
print("Starting Mastodon local public stream...")
mastodon.stream_public(listener, remote=True)
else:
parser.print_help(sys.stderr)
Expand Down

0 comments on commit f0c2d5d

Please sign in to comment.