-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSiemplifyVaultCyberArkPam.py
165 lines (147 loc) · 5.06 KB
/
SiemplifyVaultCyberArkPam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import sys
import json
import requests
import base64
try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
from SiemplifyVault import SiemplifyVault
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.backends import default_backend
from typing import Optional
from requests_toolbelt.adapters.x509 import X509Adapter
# CONSTS
CA_CERT_PATH = "cacert.pem"
URLS = {
"get_access_token": "/PasswordVault/API/Auth/CyberArk/Logon",
"get_password": "PasswordVault/API/Accounts/{account_id}/Password/Retrieve/"
}
MAX_RETRIES = 1
GET_TOKEN_TIMEOUT = 60
# vault manager class, includes the logic that contact the remote vault provider
# supports python2.7 and python 3.7
class CyberArkPamManagerError(Exception):
"""
General Exception for CyberArk PAM manager
"""
pass
class CyberArkPamNotFoundError(CyberArkPamManagerError):
"""
Not Found Exception for CyberArk PAM manager
"""
pass
class SiemplifyVaultCyberArkPam(SiemplifyVault):
"""
CyberArk PAM Manager
"""
def __init__(self, vault_settings):
super(SiemplifyVaultCyberArkPam, self).__init__(vault_settings)
self.__set_certificates(
self.client_certificate_passphrase,
self.client_certificate
)
self.__set_verify(self.verify_ssl, self.client_ca_certificate)
self.session.headers.update({
"Content-Type": "application/json",
"Authorization": self.__get_access_token(
self.username, self.password
)
})
def __set_certificates(self, client_certificate_passphrase = None,
client_certificate = None):
if not client_certificate:
return
backend = default_backend()
encoded_cert = base64.b64decode(client_certificate)
encoded_passphrase = (
client_certificate_passphrase.encode("utf-8")
if client_certificate_passphrase is not None
else client_certificate_passphrase
)
decoded_cert = load_key_and_certificates(
data=encoded_cert,
password=encoded_passphrase,
backend=backend
)
cert_bytes = decoded_cert[1].public_bytes(Encoding.DER)
pk_bytes = decoded_cert[0].private_bytes(
encoding=Encoding.DER,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption()
)
adapter = X509Adapter(
max_retries=MAX_RETRIES,
cert_bytes=cert_bytes,
pk_bytes=pk_bytes,
encoding=Encoding.DER
)
self.session.mount('https://', adapter)
def __set_verify(self, verify_ssl, ca_certificate=None):
"""
Set verify ssl
:param verify_ssl: {bool} True if verify ssl
:param ca_certificate: {str} CA certificate
"""
if verify_ssl and ca_certificate:
ca_cert = base64.b64decode(ca_certificate)
with open(CA_CERT_PATH, "w+") as f:
f.write(ca_cert.decode())
self.session.verify = CA_CERT_PATH
elif verify_ssl:
self.session.verify = True
else:
self.session.verify = False
def __build_full_uri(self, url_key, **kwargs):
"""
Build full uri from url key
:param url_key: {str} The key
:param kwargs: {dict} Variables passed for string formatting
:return: {str} The full uri
"""
return urljoin(self.api_root, URLS[url_key].format(**kwargs))
def __get_access_token(self, username, password):
"""
Get token from CyberArk PAM
:param username
:param password
:return: {str} Token
"""
payload = {
'username': username,
'password': password
}
response = self.session.post(
url=self.__build_full_uri('get_access_token'),
json=payload,
timeout=GET_TOKEN_TIMEOUT
)
self.validate_response(response)
return response.text[1:-1]
@staticmethod
def validate_response(response):
"""
Check for error
"""
try:
response.raise_for_status()
except requests.HTTPError as e:
if e.response.status_code == 404:
raise CyberArkPamNotFoundError(e)
raise CyberArkPamManagerError(e)
def get_password(self, account):
payload = {
"reason": "Chronicle SOAR vault integration"
}
prepared_payload = {
key: value for key, value in payload.items()
if value is not None
}
response = self.session.post(
url=self.__build_full_uri('get_password', account_id=account),
json=prepared_payload
)
self.validate_response(response)
# we get the secret with "", we remove it and return the clean secret
return response.text[1:-1]