271 changes: 129 additions & 142 deletions vulture_os/toolkit/api_parser/crowdstrike/crowdstrike.py
@@ -45,17 +45,12 @@ class CrowdstrikeAPIError(Exception):

class CrowdstrikeParser(ApiParser):
AUTH_URI = "oauth2/token"
DEVICE_URI = "devices/queries/devices/v1"
# DEVICE_URI = "devices/queries/devices/v1"
DETECTION_URI = "detects/queries/detects/v1"
DETECTION_DETAILS_URI = "detects/entities/summaries/GET/v1"
INCIDENT_URI = "incidents/queries/incidents/v1"
INCIDENT_DETAILS_URI = "incidents/entities/incidents/GET/v1"

HEADERS = {
"Content-Type": "application/x-www-form-urlencoded",
'accept': 'application/json'
}

def __init__(self, data):
super().__init__(data)

@@ -64,29 +59,30 @@ def __init__(self, data):
self.client_secret = data["crowdstrike_client_secret"]
self.client = data["crowdstrike_client"]

self.product = 'crowdstrike'
self.session = None

self.request_incidents = data.get('crowdstrike_request_incidents', False)

self.session = None

if not self.api_host.startswith('https://'):
self.api_host = f"https://{self.api_host}"

self.login()

def login(self):
logger.info(f"[{__parser__}][login]: Logging in...", extra={'frontend': str(self.frontend)})
auth_url = f"{self.api_host}/{self.AUTH_URI}"

self.session = requests.session()
self.session.headers.update(self.HEADERS)

payload = {'client_id': self.client_id,
'client_secret': self.client_secret}
try:
# Rate limiting seems to be 10/minute for the authentication endpoint
response = self.session.post(
auth_url,
data=payload,
url=f"{self.api_host}/{self.AUTH_URI}",
data={
'client_id': self.client_id,
'client_secret': self.client_secret
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"accept": "application/json"
},
timeout=10,
proxies=self.proxies,
verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
@@ -97,23 +93,21 @@ def login(self):
return False, ('Connection failed')
except requests.exceptions.ReadTimeout:
self.session = None
logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (read_timeout)', extra={'frontend': str(self.frontend)})
logger.error(f'[{__parser__}][login]: Connection failed {self.client_id} (ReadTimeout)', extra={'frontend': str(self.frontend)})
return False, ('Connection failed')

response.raise_for_status()
if response.status_code not in [200, 201]:
self.session = None
logger.error(f'[{__parser__}][login]: Authentication failed. code {response.status_code}', extra={'frontend': str(self.frontend)})
logger.error(f'[{__parser__}][login]: Authentication failed. Status code {response.status_code}', extra={'frontend': str(self.frontend)})
return False, ('Authentication failed')

ret = response.json()
self.session.headers.update(
{'authorization': f"{ret['token_type'].capitalize()} {ret['access_token']}"})
del self.session.headers['Content-Type']
del self.session.headers['Accept-Encoding']
self.session.headers.update({'authorization': f"{ret['token_type'].capitalize()} {ret['access_token']}"})

return True, self.session
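# Minimal sketch of what the header update above produces, assuming a standard
# OAuth2 token response from the oauth2/token endpoint (values are hypothetical):
#   ret = {"token_type": "bearer", "access_token": "eyJhbGciOi...", "expires_in": 1799}
#   f"{ret['token_type'].capitalize()} {ret['access_token']}"  # -> "Bearer eyJhbGciOi..."
# so every subsequent request on this session carries an "authorization: Bearer <token>" header.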


def __execute_query(self, method, url, query, timeout=10):
retry = 3
while(retry > 0):
@@ -128,11 +122,10 @@ def __execute_query(self, method, url, query, timeout=10):
verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
)
elif(method == "POST"):
headers = {'Content-Type': 'application/json'}
response = self.session.post(
url,
data=json.dumps(query),
headers=headers,
headers={'Content-Type': 'application/json'},
timeout=timeout,
proxies=self.proxies,
verify=self.api_parser_custom_certificate if self.api_parser_custom_certificate else self.api_parser_verify_ssl
@@ -143,12 +136,13 @@ def __execute_query(self, method, url, query, timeout=10):
break # no error, break out of the retry loop

if response.status_code not in [200, 201]:
logger.error(
f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}", extra={'frontend': str(self.frontend)}
)
return {}
msg = f"[{__parser__}][__execute_query]: Error at Crowdstrike API Call URL: {url} Code: {response.status_code} Content: {response.content}"
logger.error(msg, extra={'frontend': str(self.frontend)})
raise Exception(msg)

return response.json()


def unionDict(self, dictBase, dictToAdd):
finalDict = {}
for k, v in dictBase.items():
@@ -161,145 +155,101 @@ def unionDict(self, dictBase, dictToAdd):
return finalDict
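# Hypothetical usage sketch, assuming unionDict concatenates list values and
# recursively merges nested dicts (the body is collapsed in this diff):
#   unionDict({"meta": {"pagination": {"offset": 100, "total": 3}}, "resources": ["a", "b"]},
#             {"meta": {"pagination": {"offset": 200, "total": 3}}, "resources": ["c"]})
#   -> {"meta": {"pagination": {...}}, "resources": ["a", "b", "c"]}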

def execute_query(self, method, url, query={}, timeout=10):
# a custom limit of entries to retrieve can be set
customLimit = int(query.get('limit', -1))
# query['limit'] = 100

jsonResp = self.__execute_query(method, url, query, timeout=timeout)
# Get first page of logs
jsonResp = self.__execute_query(method, url, query, timeout=timeout)
totalToRetrieve = jsonResp.get('meta', {}).get('pagination', {}).get('total', 0)

while(totalToRetrieve > 0 and totalToRetrieve != len(jsonResp.get('resources', []))):
# we have retrieved enough data
if(customLimit > 0 and customLimit <= len(jsonResp['resources'])):
break
query['offset'] = int(jsonResp['meta']['pagination']['offset'])
jsonAdditionalResp = self.__execute_query(
method, url, query, timeout=timeout)
jsonResp = self.unionDict(jsonResp, jsonAdditionalResp)
#jsonResp += [jsonAdditionalResp]
if totalToRetrieve > 0:
# Continue paginating while totalToRetrieve differs from the number of logs gathered across successive pages
# The default page size is 100 (when the "limit" parameter is not passed in the query, as is the case here)
while(totalToRetrieve != len(jsonResp.get('resources', []))):
query['offset'] = len(jsonResp.get('resources', []))

jsonAdditionalResp = self.__execute_query(method, url, query, timeout=timeout)
self.update_lock()

jsonResp = self.unionDict(jsonResp, jsonAdditionalResp)

return jsonResp
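# Illustrative pagination walk-through, assuming the usual meta/pagination envelope
# returned by the CrowdStrike "queries" endpoints (values are hypothetical):
#   page 1: {"meta": {"pagination": {"offset": 0, "limit": 100, "total": 230}}, "resources": [100 ids]}
#   -> offset = 100, fetch again; resources grows to 200
#   -> offset = 200, fetch again; resources grows to 230 == total, loop stops
# Each page is merged into jsonResp with unionDict, and update_lock() keeps the
# collector lock alive between pages.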

def getSummary(self):
nbSensor = self.getSensorsTotal()
version, updated = self.getApplianceVersion()
return nbSensor, version, updated

def getApplianceVersion(self):
return 0, 0

def getSensorsTotal(self):
device_url = f"{self.api_host}/{self.DEVICE_URI}"
ret = self.execute_query('GET', device_url, {'limit': 1})
return int(ret['meta']['pagination']['total'])

def getAlerts(self, since, to):
'''
Retrieve raw incidents and detections
'''
logger.debug(f"[{__parser__}][getAlerts]: From {since} until {to}", extra={'frontend': str(self.frontend)})

finalRawAlerts = []
# first retrieve the detection raw ids
alert_url = f"{self.api_host}/{self.DETECTION_URI}"
payload = {
"filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'",
"sort": "last_behavior|desc"
}
ret = self.execute_query("GET", alert_url, payload)
ids = ret['resources']
if(len(ids) > 0):
# retrieve the content of detection selected
alert_url = f"{self.api_host}/{self.DETECTION_DETAILS_URI}"
payload = {"ids": ids}
ret = self.execute_query("POST", alert_url, payload)

alerts = ret['resources']
for alert in alerts:
finalRawAlerts += [alert]

if self.request_incidents:
# then retrieve the incident raw ids
alert_url = f"{self.api_host}/{self.INCIDENT_URI}"
payload = {
"filter": f"start:>'{since}'+start:<='{to}'",
"sort": "end|desc"
}

ret = self.execute_query("GET", alert_url, payload)
ids = ret['resources']
def get_detections(self, since, to):
logger.debug(f"[{__parser__}][get_detections]: From {since} until {to}", extra={'frontend': str(self.frontend)})

if(len(ids) > 0):
# retrieve the content of selected incidents
alert_url = f"{self.api_host}/{self.INCIDENT_DETAILS_URI}"
payload = {"ids": ids}
ret = self.execute_query("POST", alert_url, payload)
alerts = ret['resources']
for alert in alerts:
finalRawAlerts += [alert]
return finalRawAlerts
detections = []
# first retrieve the raw detection ids
ret = self.execute_query(method="GET",
url=f"{self.api_host}/{self.DETECTION_URI}",
query={
"filter": f"last_behavior:>'{since}'+last_behavior:<='{to}'",
"sort": "last_behavior|desc"
})
ids = ret['resources']
# then retrieve the content of the selected detection ids
if(len(ids) > 0):
ret = self.execute_query(method="POST",
url=f"{self.api_host}/{self.DETECTION_DETAILS_URI}",
query={"ids": ids})
ret = ret['resources']
for alert in ret: detections += [alert]
return detections
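# Sketch of the two-step flow above (payload shapes taken from the calls shown, ids hypothetical):
#   1) GET  detects/queries/detects/v1 with filter/sort -> {"resources": ["ldt:aaa", "ldt:bbb"], ...}
#   2) POST detects/entities/summaries/GET/v1 with {"ids": ["ldt:aaa", "ldt:bbb"]}
#      -> {"resources": [<full detection document>, ...]}
# get_incidents below follows the same pattern with the incident URIs.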

def get_incidents(self, since, to):
logger.debug(f"[{__parser__}][get_incidents]: From {since} until {to}", extra={'frontend': str(self.frontend)})

incidents = []
# first retrieve the raw incident ids
ret = self.execute_query(method="GET",
url=f"{self.api_host}/{self.INCIDENT_URI}",
query={
"filter": f"start:>'{since}'+start:<='{to}'",
"sort": "end|desc"
})
ids = ret['resources']
# then retrieve the content of the selected incident ids
if(len(ids) > 0):
ret = self.execute_query(method="POST",
url=f"{self.api_host}/{self.INCIDENT_DETAILS_URI}",
query={"ids": ids})
ret = ret['resources']
for alert in ret: incidents += [alert]
return incidents

def get_logs(self, kind, since, to):
logs = []

msg = f"Querying {kind} from {since} to {to}"
logger.info(f"[{__parser__}][get_logs]: {msg}", extra={'frontend': str(self.frontend)})

try:
return self.getAlerts(since, to)
get_func_type = getattr(self, f"get_{kind}") # get_detections/get_incidents function getter
logs = get_func_type(since, to)
except Exception as e:
logger.exception(f"[{__parser__}][get_logs]: {e}", extra={'frontend': str(self.frontend)})
raise Exception(f"Error querying {kind} logs")

return logs

def format_log(self, log):
log['kind'] = self.kind
log['observer_version'] = self.observer_version
log['url'] = self.api_host
log['url'] = self.api_host # This static field is mandatory for the parser
return json.dumps(log)
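# Hypothetical example of a formatted line, based on the enrichment lines shown above
# (field values are illustrative only):
#   {"detection_id": "ldt:aaa", "kind": "detections", "observer_version": "6.x", "url": "https://api.crowdstrike.com"}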

def execute(self):
# Retrieve version of the Crowdstrike console
_, self.observer_version, _ = self.getSummary()

self.kind = "details"
# Default timestamp is 7 days ago
since = self.last_api_call or (timezone.now() - datetime.timedelta(days=7))
# Get a batch of 24h at most, to avoid running the parser for too long
# delay the query window by 2 minutes, to avoid missing events
to = min(timezone.now()-timedelta(minutes=2), since + timedelta(hours=24))
to = to.strftime("%Y-%m-%dT%H:%M:%SZ")
since = since.strftime("%Y-%m-%dT%H:%M:%SZ")
tmp_logs = self.get_logs(self.kind, since=since, to=to)

# Downloading may take a while, so refresh the token in Redis
self.update_lock()

total = len(tmp_logs)

if total > 0:
logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)})

# Logs are sorted by timestamp descending, so the first is the newest
self.frontend.last_api_call = to

elif self.last_api_call < timezone.now()-timedelta(hours=24):
# If no logs were retrieved during the last 24 hours,
# move forward 1h to avoid stagnating ad vitam aeternam
self.frontend.last_api_call += timedelta(hours=1)

self.write_to_file([self.format_log(log) for log in tmp_logs])

# Writing may take a while, so refresh the token in Redis
self.update_lock()

logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)})

def test(self):
try:
logger.debug(f"[{__parser__}][test]: Running tests...", extra={'frontend': str(self.frontend)})
self.login() # establish a session to console

logger.info(f"[{__parser__}][test]: Running tests...", extra={'frontend': str(self.frontend)})

query_time = (timezone.now() - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%SZ")
since = (timezone.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
to = timezone.now().strftime("%Y-%m-%dT%H:%M:%SZ")

logs = self.get_logs("details", query_time, to)
logs = []
logs += self.get_logs("detections", since, to)
if self.request_incidents: logs += self.get_logs("incidents", since, to)

msg = f"{len(logs)} details retrieved"
msg = f"{len(logs)} logs retrieved"
logger.info(f"[{__parser__}][test]: {msg}", extra={'frontend': str(self.frontend)})
return {
"status": True,
@@ -311,3 +261,40 @@ def test(self):
"status": False,
"error": str(e)
}


def execute(self):
self.login() # establish a session to console

kinds = ["detections"]
if self.request_incidents: kinds.append("incidents")

for kind in kinds:
# Default timestamp is 7 days ago
since = self.last_collected_timestamps.get(f"crowdstrike_falcon_{kind}") or (timezone.now() - datetime.timedelta(days=7))
# Get a batch of 24h at most, to avoid running queries for too long
# also delay the query window by 3 minutes, to avoid missing events
to = min(timezone.now()-timedelta(minutes=3), since + timedelta(hours=24))
to = to.strftime("%Y-%m-%dT%H:%M:%SZ")
since = since.strftime("%Y-%m-%dT%H:%M:%SZ")

logs = self.get_logs(kind=kind, since=since, to=to)
total = len(logs)

if total > 0:
logger.info(f"[{__parser__}][execute]: Total logs fetched : {total}", extra={'frontend': str(self.frontend)})
self.last_collected_timestamps[f"crowdstrike_falcon_{kind}"] = to

elif self.last_collected_timestamps.get(f"crowdstrike_falcon_{kind}", timezone.now()) < timezone.now()-timedelta(hours=24):
# If no logs were retrieved during the last 24 hours,
# move forward 1h to avoid stagnating ad vitam aeternam
self.last_collected_timestamps[f"crowdstrike_falcon_{kind}"] += timedelta(hours=1)
elif not self.last_collected_timestamps.get(f"crowdstrike_falcon_{kind}"):
# If last_collected_timestamps for the requested kind doesn't exist
# (possible when no logs are returned for the queried period at the first collector execution)
self.last_collected_timestamps[f"crowdstrike_falcon_{kind}"] = to

self.write_to_file([self.format_log(log) for log in logs])
self.update_lock()

logger.info(f"[{__parser__}][execute]: Parsing done.", extra={'frontend': str(self.frontend)})
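# Worked example of the collection window computed above (times are hypothetical):
#   now = 2024-06-01T12:00:00Z, last collected "detections" = 2024-05-30T09:00:00Z
#   since = 2024-05-30T09:00:00Z
#   to    = min(now - 3 min, since + 24h) = 2024-05-31T09:00:00Z
# i.e. at most a 24-hour batch per kind and per run, ending at least 3 minutes in the past.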