1
1
import argparse
2
2
import re
3
3
import sys
4
+ import urllib .parse
4
5
5
- from mastodon import Mastodon , StreamListener # type: ignore[import-untyped]
6
+ import requests
7
+ from mastodon import Mastodon , StreamListener
6
8
7
9
from fedivuln import config
8
10
15
17
16
18
# Listener class for handling stream events
17
19
class VulnStreamListener (StreamListener ):
18
- # Regular expression to match CVE, GHSA, and PySec IDs
19
- cve_pattern = re .compile (r"\bCVE-\d{4}-\d{4,}\b" , re .IGNORECASE )
20
- ghsa_pattern = re .compile (
21
- r"GHSA-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}" , re .IGNORECASE
22
- )
23
- pysec_pattern = re .compile (r"PYSEC-\d{4}-\d{2,5}" , re .IGNORECASE )
20
+ def __init__ (self , sighting : bool = False ):
21
+ # Regular expression to match CVE, GHSA, and PySec IDs
22
+ self .vulnerability_pattern = re .compile (
23
+ r"\b(CVE-\d{4}-\d{4,})\b" # CVE pattern
24
+ r"|\b(GHSA-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4})\b" # GHSA pattern
25
+ r"|\b(PYSEC-\d{4}-\d{2,5})\b" , # PYSEC pattern
26
+ re .IGNORECASE ,
27
+ )
28
+ self .sighting = sighting
24
29
25
30
# When a new status (post) is received
26
31
def on_update (self , status ):
27
32
print ("New status received." )
28
33
content = status ["content" ]
29
- if (
30
- self .cve_pattern .search (content )
31
- or self .ghsa_pattern .search (content )
32
- or self .pysec_pattern .search (content )
33
- ):
34
- print ("Vulnerability detected:" )
35
- print (status ) # Prints the full HTML content of the status
34
+ vulnerability_ids = self .vulnerability_pattern .findall (
35
+ content
36
+ ) # Find all matches in the content
37
+ # Filter out empty matches and print any found IDs
38
+ vulnerability_ids = [match for match in vulnerability_ids if match ]
39
+ if vulnerability_ids :
40
+ print ("Vulnerability IDs detected:" )
41
+ print ("Vulnerability IDs found:" , ", " .join (vulnerability_ids ))
42
+ print (status ) # Prints the full status
43
+ if self .sighting :
44
+ push_to_vulnerability_lookup (
45
+ vulnerability_ids
46
+ ) # Push a sighting to Vulnerability Lookup
36
47
else :
37
48
print ("Ignoring." )
38
49
@@ -51,8 +62,29 @@ def on_abort(self, err):
51
62
print ("Stream aborted with error:" , err )
52
63
53
64
54
- # Instantiate the listener
55
- listener = VulnStreamListener ()
65
+ def push_to_vulnerability_lookup (vulnerability_ids ):
66
+ """Create a sighting from an incoming status and push it to the Vulnerability Lookup instance."""
67
+ print ("Pushing sighting to Vulnerability Lookup..." )
68
+ headers_json = {
69
+ "Content-Type" : "application/json" ,
70
+ "accept" : "application/json" ,
71
+ }
72
+ sighting = {"type" : "seen" , "vulnerability" : vulnerability_ids [0 ]}
73
+ try :
74
+ r = requests .post (
75
+ urllib .parse .urljoin (config .vulnerability_lookup_base_url , "sighting/" ),
76
+ json = sighting ,
77
+ headers = headers_json ,
78
+ auth = ("admin" , "token" ),
79
+ )
80
+ if r .status_code not in (200 , 201 ):
81
+ print (
82
+ f"Error when sending POST request to the Vulnerability Lookup server: { r .reason } "
83
+ )
84
+ except requests .exceptions .ConnectionError as e :
85
+ print (
86
+ f"Error when sending POST request to the Vulnerability Lookup server:\n { e } "
87
+ )
56
88
57
89
58
90
def main ():
@@ -67,9 +99,17 @@ def main():
67
99
action = "store_true" ,
68
100
help = "Streams public events." ,
69
101
)
102
+ parser .add_argument (
103
+ "--sighting" ,
104
+ action = "store_true" ,
105
+ help = "Push the sightings to Vulnerability Lookup." ,
106
+ )
70
107
71
108
arguments = parser .parse_args ()
72
109
110
+ # Instantiate the listener
111
+ listener = VulnStreamListener (sighting = arguments .sighting )
112
+
73
113
if arguments .user :
74
114
print ("Starting user stream..." )
75
115
mastodon .stream_user (listener )
0 commit comments