From 4c02dafb10112c55a8feebbe95a79d3c9675e5a3 Mon Sep 17 00:00:00 2001
From: mbonniot
Date: Wed, 26 Feb 2025 15:16:19 +0100
Subject: [PATCH] [API_PARSER][CROWDSTRIKE] Clean code (mostly remove unused
 methods/variables) & add comments

---
 .../api_parser/crowdstrike/crowdstrike.py     | 271 +++++++++---------
 1 file changed, 129 insertions(+), 142 deletions(-)

diff --git a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py
index 92ebe46bb..6660eda92 100644
--- a/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py
+++ b/vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py
@@ -45,17 +45,12 @@ class CrowdstrikeAPIError(Exception):
 class CrowdstrikeParser(ApiParser):
     AUTH_URI = "oauth2/token"
-    DEVICE_URI = "devices/queries/devices/v1"
+    # DEVICE_URI = "devices/queries/devices/v1"
     DETECTION_URI = "detects/queries/detects/v1"
     DETECTION_DETAILS_URI = "detects/entities/summaries/GET/v1"
     INCIDENT_URI = "incidents/queries/incidents/v1"
     INCIDENT_DETAILS_URI = "incidents/entities/incidents/GET/v1"
 
-    HEADERS = {
-        "Content-Type": "application/x-www-form-urlencoded",
-        'accept': 'application/json'
-    }
-
     def __init__(self, data):
         super().__init__(data)
 
@@ -64,29 +59,30 @@ def __init__(self, data):
         self.client_secret = data["crowdstrike_client_secret"]
         self.client = data["crowdstrike_client"]
 
-        self.product = 'crowdstrike'
-        self.session = None
-
         self.request_incidents = data.get('crowdstrike_request_incidents', False)
 
+        self.session = None
+
         if not self.api_host.startswith('https://'):
             self.api_host = f"https://{self.api_host}"
-        self.login()
 
     def login(self):
         logger.info(f"[{__parser__}][login]: Login in...", extra={'frontend': str(self.frontend)})
-        auth_url = f"{self.api_host}/{self.AUTH_URI}"
 
         self.session = requests.session()
-        self.session.headers.update(self.HEADERS)
-
-        payload = {'client_id': self.client_id,
-                   'client_secret': self.client_secret}
 
         try:
+            # Rate limiting seems to be 10/minute for the authentication endpoint
             response = self.session.post(
-                auth_url,
-                data=payload,
+                url=f"{self.api_host}/{self.AUTH_URI}",
+                data={
+                    'client_id': self.client_id,
+                    'client_secret': self.client_secret
+                },
+                headers={
+                    "Content-Type": "application/x-www-form-urlencoded",
+                    "accept": "application/json"
+                },
                 timeout=10,
                 proxies=self.proxies,
                 verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
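
Note on the login() hunk above: the OAuth2 client-credentials exchange is now inlined
into a single session.post() call. A minimal standalone sketch for sanity-checking a
set of credentials against the same endpoint (the base URL and credential values below
are placeholders, not taken from the patch):

    import requests

    API_HOST = "https://api.crowdstrike.com"  # assumed US-1 base URL; adjust to your cloud
    response = requests.post(
        f"{API_HOST}/oauth2/token",
        data={"client_id": "<client_id>", "client_secret": "<client_secret>"},
        headers={"Content-Type": "application/x-www-form-urlencoded", "accept": "application/json"},
        timeout=10,
    )
    response.raise_for_status()
    token = response.json()["access_token"]  # bearer token, later sent as the authorization header
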
@@ -97,23 +93,21 @@ def login(self):
             return False, ('Connection failed')
         except requests.exceptions.ReadTimeout:
             self.session = None
-            logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (read_timeout)', extra={'frontend': str(self.frontend)})
+            logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (ReadTimeout)', extra={'frontend': str(self.frontend)})
             return False, ('Connection failed')
 
         response.raise_for_status()
         if response.status_code not in [200, 201]:
             self.session = None
-            logger.error(f'[{__parser__}][login]: Authentication failed. code {response.status_code}', extra={'frontend': str(self.frontend)})
+            logger.error(f'[{__parser__}][login]: Authentication failed. Status code {response.status_code}', extra={'frontend': str(self.frontend)})
             return False, ('Authentication failed')
 
         ret = response.json()
-        self.session.headers.update(
-            {'authorization': f"{ret['token_type'].capitalize()} {ret['access_token']}"})
-        del self.session.headers['Content-Type']
-        del self.session.headers['Accept-Encoding']
+        self.session.headers.update({'authorization': f"{ret['token_type'].capitalize()} {ret['access_token']}"})
 
         return True, self.session
 
+
     def __execute_query(self, method, url, query, timeout=10):
         retry = 3
         while(retry > 0):
@@ -128,11 +122,10 @@ def __execute_query(self, method, url, query, timeout=10):
                     verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
                 )
             elif(method == "POST"):
-                headers = {'Content-Type': 'application/json'}
                 response = self.session.post(
                     url,
                     data=json.dumps(query),
-                    headers=headers,
+                    headers={'Content-Type': 'application/json'},
                     timeout=timeout,
                     proxies=self.proxies,
                     verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
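
Note on the __execute_query() hunks above: the helper retries up to 3 times (retry = 3
in the context lines), and nothing in this diff shows a pause between attempts. If
back-off were wanted, a sketch of the idea (names and delays are illustrative, not part
of the patch):

    import time
    import requests

    def call_with_backoff(do_request, attempts=3, base_delay=1.0):
        # Retry do_request() on transport errors, sleeping 1s, 2s, 4s... between tries
        for attempt in range(attempts):
            try:
                return do_request()
            except requests.exceptions.RequestException:
                if attempt == attempts - 1:
                    raise
                time.sleep(base_delay * (2 ** attempt))
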
@@ -143,12 +136,13 @@ def __execute_query(self, method, url, query, timeout=10):
             break # no error we break from the loop
 
         if response.status_code not in [200, 201]:
-            logger.error(
-                f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", extra={'frontend': str(self.frontend)}
-            )
-            return {}
+            msg = f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}"
+            logger.error(msg, extra={'frontend': str(self.frontend)})
+            raise Exception(msg)
+
         return response.json()
 
+
     def unionDict(self, dictBase, dictToAdd):
         finalDict = {}
         for k, v in dictBase.items():
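
Note on the hunk above: returning {} used to hide API failures from callers; raising is
the safer behaviour. Since the module already defines CrowdstrikeAPIError (visible in
the first hunk's context), a narrower exception would let callers tell API failures
apart from programming errors; a possible follow-up, not part of this patch:

    raise CrowdstrikeAPIError(msg)  # instead of the generic Exception(msg)
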
@@ -161,145 +155,101 @@ def unionDict(self, dictBase, dictToAdd):
             finalDict[k] = v
         return finalDict
 
     def execute_query(self, method, url, query={}, timeout=10):
-        # can set a custom limit of entry we want to retrieve
-        customLimit = int(query.get('limit', -1))
+        # query['limit'] = 100
 
-        jsonResp = self.__execute_query(method, url, query, timeout=timeout)
+        # Get first page of logs
+        jsonResp = self.__execute_query(method, url, query, timeout=timeout)
         totalToRetrieve = jsonResp.get('meta', {}).get('pagination', {}).get('total', 0)
-        while(totalToRetrieve > 0 and totalToRetrieve != len(jsonResp.get('resources', []))):
-            # we retrieved enough data
-            if(customLimit > 0 and customLimit <= len(jsonResp['resources'])):
-                break
-            query['offset'] = int(jsonResp['meta']['pagination']['offset'])
-            jsonAdditionalResp = self.__execute_query(
-                method, url, query, timeout=timeout)
-            jsonResp = self.unionDict(jsonResp, jsonAdditionalResp)
-            #jsonResp += [jsonAdditionalResp]
+        if totalToRetrieve > 0:
+            # Continue to paginate while totalToRetrieve differs from the number of logs gathered across successive pages
+            # The default page size is 100 (when the "limit" parameter is not passed in the query, as is the case here)
+            while(totalToRetrieve != len(jsonResp.get('resources', []))):
+                query['offset'] = len(jsonResp.get('resources', []))
+
+                jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout)
+                self.update_lock()
+
+                jsonResp = self.unionDict(jsonResp, jsonAdditionalResp)
+
         return jsonResp
 
-    def getSummary(self):
-        nbSensor = self.getSensorsTotal()
-        version, updated = self.getApplianceVersion()
-        return nbSensor, version, updated
-
-    def getApplianceVersion(self):
-        return 0, 0
-
-    def getSensorsTotal(self):
-        device_url = f"{self.api_host}/{self.DEVICE_URI}"
-        ret = self.execute_query('GET', device_url, {'limit': 1})
-        return int(ret['meta']['pagination']['total'])
-
-    def getAlerts(self, since, to):
-        '''
-        we retrieve raw incidents and detections
-        '''
-        logger.debug(f"[{__parser__}][getAlerts]: From {since} until {to}", extra={'frontend': str(self.frontend)})
-
-        finalRawAlerts = []
-        # first retrieve the detection raw ids
-        alert_url = f"{self.api_host}/{self.DETECTION_URI}"
-        payload = {
-            "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'",
-            "sort": "last_behavior|desc"
-        }
-        ret = self.execute_query("GET", alert_url, payload)
-        ids = ret['resources']
-        if(len(ids) > 0):
-            # retrieve the content of detection selected
-            alert_url = f"{self.api_host}/{self.DETECTION_DETAILS_URI}"
-            payload = {"ids": ids}
-            ret = self.execute_query("POST", alert_url, payload)
-
-            alerts = ret['resources']
-            for alert in alerts:
-                finalRawAlerts += [alert]
-
-        if self.request_incidents:
-            # then retrieve the incident raw ids
-            alert_url = f"{self.api_host}/{self.INCIDENT_URI}"
-            payload = {
-                "filter": f"start:>'{since}'+start:<='{to}'",
-                "sort": "end|desc"
-            }
-            ret = self.execute_query("GET", alert_url, payload)
-            ids = ret['resources']
+    def get_detections(self, since, to):
+        logger.debug(f"[{__parser__}][get_detections]: From {since} until {to}", extra={'frontend': str(self.frontend)})
 
-        if(len(ids) > 0):
-            # retrieve the content of selected incidents
-            alert_url = f"{self.api_host}/{self.INCIDENT_DETAILS_URI}"
-            payload = {"ids": ids}
-            ret = self.execute_query("POST", alert_url, payload)
-            alerts = ret['resources']
-            for alert in alerts:
-                finalRawAlerts += [alert]
-        return finalRawAlerts
+        detections = []
+        # first retrieve the detections raw ids
+        ret = self.execute_query(method="GET",
+                                 url=f"{self.api_host}/{self.DETECTION_URI}",
+                                 query={
+                                     "filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'",
+                                     "sort": "last_behavior|desc"
+                                 })
+        ids = ret['resources']
+        # then retrieve the content of selected detections ids
+        if(len(ids) > 0):
+            ret = self.execute_query(method="POST",
+                                     url=f"{self.api_host}/{self.DETECTION_DETAILS_URI}",
+                                     query={"ids": ids})
+            ret = ret['resources']
+            for alert in ret: detections += [alert]
+        return detections
+
+    def get_incidents(self, since, to):
+        logger.debug(f"[{__parser__}][get_incidents]: From {since} until {to}", extra={'frontend': str(self.frontend)})
+
+        incidents = []
+        # first retrieve the incidents raw ids
+        ret = self.execute_query(method="GET",
+                                 url=f"{self.api_host}/{self.INCIDENT_URI}",
+                                 query={
+                                     "filter": f"start:>'{since}'+start:<='{to}'",
+                                     "sort": "end|desc"
+                                 })
+        ids = ret['resources']
+        # then retrieve the content of selected incidents ids
+        if(len(ids) > 0):
+            ret = self.execute_query(method="POST",
+                                     url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}",
+                                     query={"ids": ids})
+            ret = ret['resources']
+            for alert in ret: incidents += [alert]
+        return incidents
 
     def get_logs(self, kind, since, to):
+        logs = []
+
         msg = f"Querying {kind} from {since}, to {to}"
         logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)})
+
         try:
-            return self.getAlerts(since, to)
+            get_func_type = getattr(self, f"get_{kind}") # get_detections/get_incidents function getter
+            logs = get_func_type(since, to)
         except Exception as e:
             logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)})
             raise Exception(f"Error querying {kind} logs")
 
+        return logs
+
     def format_log(self, log):
-        log['kind'] = self.kind
-        log['observer_version'] = self.observer_version
-        log['url'] = self.api_host
+        log['url'] = self.api_host # This static field is mandatory for the parser
         return json.dumps(log)
 
-    def execute(self):
-        # Retrieve version of cybereason console
-        _, self.observer_version, _ = self.getSummary()
-
-        self.kind = "details"
-        # Default timestamp is 24 hours ago
-        since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7))
-        # Get a batch of 24h at most, to avoid running the parser for too long
-        # delay the query time of 2 minutes, to avoid missing events
-        to = min(timezone.now()-timedelta(minutes=2), since + timedelta(hours=24))
-        to = to.strftime("%Y-%m-%dT%H:%M:%SZ")
-        since = since.strftime("%Y-%m-%dT%H:%M:%SZ")
-        tmp_logs = self.get_logs(self.kind, since=since, to=to)
-
-        # Downloading may take some while, so refresh token in Redis
-        self.update_lock()
-
-        total = len(tmp_logs)
-
-        if total > 0:
-            logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)})
-
-            # Logs sorted by timestamp descending, so first is newer
-            self.frontend.last_api_call = to
-
-        elif self.last_api_call < timezone.now()-timedelta(hours=24):
-            # If no logs where retrieved during the last 24hours,
-            # move forward 1h to prevent stagnate ad vitam eternam
-            self.frontend.last_api_call += timedelta(hours=1)
-
-        self.write_to_file([self.format_log(log) for log in tmp_logs])
-
-        # Writting may take some while, so refresh token in Redis
-        self.update_lock()
-
-        logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)})
 
     def test(self):
         try:
-            logger.debug(f"[{__parser__}][test]:Running tests...", extra={'frontend': str(self.frontend)})
+            self.login() # establish a session to console
+
+            logger.info(f"[{__parser__}][test]: Running tests...", extra={'frontend': str(self.frontend)})
 
-            query_time = (timezone.now() - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%SZ")
+            since = (timezone.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
             to = timezone.now().strftime("%Y-%m-%dT%H:%M:%SZ")
-            logs = self.get_logs("details", query_time, to)
+            logs = []
+            logs += self.get_logs("detections", since, to)
+            if self.request_incidents: logs += self.get_logs("incidents", since, to)
 
-            msg = f"{len(logs)} details retrieved"
+            msg = f"{len(logs)} logs retrieved"
             logger.info(f"[{__parser__}][test]: {msg}", extra={'frontend': str(self.frontend)})
             return {
                 "status": True,
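
Note on the hunk above: both collectors share a two-phase shape (query the ID-listing
endpoint with an FQL time filter, then POST the collected ids to the matching details
endpoint), and get_logs() resolves the right collector by name. Illustration of the
getattr dispatch (parser stands for an already-configured CrowdstrikeParser instance):

    kind = "detections"
    collector = getattr(parser, f"get_{kind}")  # -> parser.get_detections
    events = collector("2025-02-25T00:00:00Z", "2025-02-26T00:00:00Z")
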
@@ -311,3 +261,40 @@ def test(self):
             "status": False,
             "error": str(e)
         }
+
+
+    def execute(self):
+        self.login() # establish a session to console
+
+        kinds = ["detections"]
+        if self.request_incidents: kinds.append("incidents")
+
+        for kind in kinds:
+            # Default timestamp is 7 days ago
+            since = self.last_collected_timestamps.get(f"crowdstrike_falcon_{kind}") or (timezone.now() - datetime.timedelta(days=7))
+            # Get a batch of 24h at most, to avoid running queries for too long
+            # also delay the query window by 3 minutes, to avoid missing events
+            to = min(timezone.now()-timedelta(minutes=3), since + timedelta(hours=24))
+            to_str = to.strftime("%Y-%m-%dT%H:%M:%SZ") # the API expects formatted strings, but keep 'to'/'since' as datetimes for state tracking
+            since_str = since.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+            logs = self.get_logs(kind=kind, since=since_str, to=to_str)
+            total = len(logs)
+
+            if total > 0:
+                logger.info(f"[{__parser__}][execute]: Total logs fetched: {total}", extra={'frontend': str(self.frontend)})
+                self.last_collected_timestamps[f"crowdstrike_falcon_{kind}"] = to
+
+            elif self.last_collected_timestamps.get(f"crowdstrike_falcon_{kind}", timezone.now()) < timezone.now()-timedelta(hours=24):
+                # If no logs were retrieved during the last 24 hours,
+                # move the cursor forward 1h so collection cannot stall forever
+                self.last_collected_timestamps[f"crowdstrike_falcon_{kind}"] = since + timedelta(hours=1)
+            elif not self.last_collected_timestamps.get(f"crowdstrike_falcon_{kind}"):
+                # If last_collected_timestamps doesn't exist yet for the requested kind
+                # (possible when the first collector execution returns no logs)
+                self.last_collected_timestamps[f"crowdstrike_falcon_{kind}"] = to
+
+            self.write_to_file([self.format_log(log) for log in logs])
+            self.update_lock()
+
+        logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)})
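
Note on execute() above: each kind keeps its own cursor in last_collected_timestamps,
collection proceeds in bounded 24h windows, and queries stay 3 minutes behind real time
so late-indexed events are not missed. A small mirror of the windowing arithmetic, with
illustrative values (not taken from the patch):

    from datetime import datetime, timedelta, timezone

    def next_window(cursor, now):
        # Same windowing as execute(): 7-day backfill, 24h cap, 3-minute safety delay
        since = cursor or (now - timedelta(days=7))
        to = min(now - timedelta(minutes=3), since + timedelta(hours=24))
        return since, to

    now = datetime(2025, 2, 26, 15, 0, tzinfo=timezone.utc)
    print(next_window(None, now))                      # first run: starts a 7-day backfill
    print(next_window(now - timedelta(hours=2), now))  # steady state: 2h window, ends at now-3min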