forked from CrowdStrike/zscaler-FalconX-integration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
category.py
79 lines (70 loc) · 3.08 KB
/
category.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from app._util.logger import Logger
from requests.exceptions import HTTPError
import config as config
import requests
import json
import sys
class Category():
def __init__(self, auth):
self.token = auth.get_token()
self.hostname = config.zs_hostname
self.category_check_url = self.hostname + \
"/api/v1/urlCategories?customOnly=true"
self.category_post_url = self.hostname + "/api/v1/urlCategories"
self.cat_name = config.cs_category_name
self.headers = headers = {
'content-type': "application/json",
'cache-control': "no-cache",
'cookie': "JSESSIONID=" + str(self.token)
}
self.payload = {
"configuredName": self.cat_name,
"customCategory": "true",
"superCategory": "USER_DEFINED",
"urls": ["mine.ppxxmr.com:5555"]
}
self.logger = Logger()
def custom_category_check(self):
try:
custom_url_cat = requests.request(
"GET", self.category_check_url, headers=self.headers)
str(custom_url_cat.status_code)
except (requests.exceptions.Timeout, requests.exceptions.TooManyRedirects, requests.exceptions.HTTPError, requests.exceptions.RequestException) as e:
sys.exit()
custom_cats = custom_url_cat.json()
if len(custom_url_cat.json()) == 0:
result = 'No Category Matches'
category_id = 'none found'
custom_urls = 'none found'
return result, category_id, custom_urls
else:
for cat in custom_cats:
if self.cat_name == cat['configuredName']:
result = "Category Exists"
category_id = cat['id']
custom_urls = cat['urls']
self.write_intel_raw(custom_urls, "zscaler_urls.json")
return result, category_id, custom_urls
else:
result = 'No Category Matches'
category_id = 'none found'
custom_urls = 'none found'
self.create_cs_cat()
def create_cs_cat(self):
# payload URLs field needs to include 1 URL to be valid
try:
cs_cat = requests.request(
"POST", url=self.category_post_url, headers=self.headers, data=json.dumps(self.payload))
cs_cat_results = str(cs_cat.status_code)
cs_cat_result = cs_cat.json()
category_id = cs_cat_result['id']
return category_id
except (requests.exceptions.Timeout, requests.exceptions.TooManyRedirects, requests.exceptions.HTTPError, requests.exceptions.RequestException) as e:
self.logger.error(
'Error contacting Zscaler URL category API: ' + str(e))
self.logger.error('System will now exit')
sys.exit()
def write_intel_raw(self, intel, file):
intel_raw = {'urls': intel}
with open("app/zscaler/queuing/" + file, 'w', encoding='utf-8') as f:
json.dump(intel_raw, f, ensure_ascii=False, indent=4)