-
Notifications
You must be signed in to change notification settings - Fork 455
/
api_sample.py
executable file
·159 lines (133 loc) · 6.92 KB
/
api_sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from datetime import datetime
from pathlib import Path
from typing import List
from sslyze import (
Scanner,
ServerScanRequest,
SslyzeOutputAsJson,
ServerNetworkLocation,
ScanCommandAttemptStatusEnum,
ServerScanStatusEnum,
ServerScanResult,
ServerScanResultAsJson,
)
from sslyze.errors import ServerHostnameCouldNotBeResolved
from sslyze.scanner.scan_command_attempt import ScanCommandAttempt
def _print_failed_scan_command_attempt(scan_command_attempt: ScanCommandAttempt) -> None:
print(
f"\nError when running ssl_2_0_cipher_suites: {scan_command_attempt.error_reason}:\n"
f"{scan_command_attempt.error_trace}"
)
def main() -> None:
print("=> Starting the scans")
date_scans_started = datetime.utcnow()
# First create the scan requests for each server that we want to scan
try:
all_scan_requests = [
ServerScanRequest(server_location=ServerNetworkLocation(hostname="cloudflare.com")),
ServerScanRequest(server_location=ServerNetworkLocation(hostname="google.com")),
]
except ServerHostnameCouldNotBeResolved:
# Handle bad input ie. invalid hostnames
print("Error resolving the supplied hostnames")
return
# Then queue all the scans
scanner = Scanner()
scanner.queue_scans(all_scan_requests)
# And retrieve and process the results for each server
all_server_scan_results = []
for server_scan_result in scanner.get_results():
all_server_scan_results.append(server_scan_result)
print(f"\n\n****Results for {server_scan_result.server_location.hostname}****")
# Were we able to connect to the server and run the scan?
if server_scan_result.scan_status == ServerScanStatusEnum.ERROR_NO_CONNECTIVITY:
# No we weren't
print(
f"\nError: Could not connect to {server_scan_result.server_location.hostname}:"
f" {server_scan_result.connectivity_error_trace}"
)
continue
# Since we were able to run the scan, scan_result is populated
assert server_scan_result.scan_result
# Process the result of the SSL 2.0 scan command
ssl2_attempt = server_scan_result.scan_result.ssl_2_0_cipher_suites
if ssl2_attempt.status == ScanCommandAttemptStatusEnum.ERROR:
# An error happened when this scan command was run
_print_failed_scan_command_attempt(ssl2_attempt)
elif ssl2_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
# This scan command was run successfully
ssl2_result = ssl2_attempt.result
assert ssl2_result
print("\nAccepted cipher suites for SSL 2.0:")
for accepted_cipher_suite in ssl2_result.accepted_cipher_suites:
print(f"* {accepted_cipher_suite.cipher_suite.name}")
# Process the result of the TLS 1.3 scan command
tls1_3_attempt = server_scan_result.scan_result.tls_1_3_cipher_suites
if tls1_3_attempt.status == ScanCommandAttemptStatusEnum.ERROR:
_print_failed_scan_command_attempt(ssl2_attempt)
elif tls1_3_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
tls1_3_result = tls1_3_attempt.result
assert tls1_3_result
print("\nAccepted cipher suites for TLS 1.3:")
for accepted_cipher_suite in tls1_3_result.accepted_cipher_suites:
print(f"* {accepted_cipher_suite.cipher_suite.name}")
# Process the result of the certificate info scan command
certinfo_attempt = server_scan_result.scan_result.certificate_info
if certinfo_attempt.status == ScanCommandAttemptStatusEnum.ERROR:
_print_failed_scan_command_attempt(certinfo_attempt)
elif certinfo_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
certinfo_result = certinfo_attempt.result
assert certinfo_result
print("\nLeaf certificates deployed:")
for cert_deployment in certinfo_result.certificate_deployments:
leaf_cert = cert_deployment.received_certificate_chain[0]
print(
f"{leaf_cert.public_key().__class__.__name__}: {leaf_cert.subject.rfc4514_string()}"
f" (Serial: {leaf_cert.serial_number})"
)
# etc... Other scan command results to process are in server_scan_result.scan_result
# Lastly, save the all the results to a JSON file
json_file_out = Path("api_sample_results.json")
print(f"\n\n=> Saving scan results to {json_file_out}")
example_json_result_output(json_file_out, all_server_scan_results, date_scans_started, datetime.utcnow())
# And ensure we are able to parse them
print(f"\n\n=> Parsing scan results from {json_file_out}")
example_json_result_parsing(json_file_out)
def example_json_result_output(
json_file_out: Path,
all_server_scan_results: List[ServerScanResult],
date_scans_started: datetime,
date_scans_completed: datetime,
) -> None:
json_output = SslyzeOutputAsJson(
server_scan_results=[ServerScanResultAsJson.model_validate(result) for result in all_server_scan_results],
invalid_server_strings=[], # Not needed here - specific to the CLI interface
date_scans_started=date_scans_started,
date_scans_completed=date_scans_completed,
)
json_output_as_str = json_output.model_dump_json()
json_file_out.write_text(json_output_as_str)
def example_json_result_parsing(results_as_json_file: Path) -> None:
# SSLyze scan results serialized to JSON were saved to this file using --json_out
results_as_json = results_as_json_file.read_text()
# These results can be parsed
parsed_results = SslyzeOutputAsJson.model_validate_json(results_as_json)
# Making it easy to do post-processing and inspection of the results
print("The following servers were scanned:")
for server_scan_result in parsed_results.server_scan_results:
print(f"\n****{server_scan_result.server_location.hostname}:{server_scan_result.server_location.port}****")
if server_scan_result.scan_status == ServerScanStatusEnum.ERROR_NO_CONNECTIVITY:
print(f"That scan failed with the following error:\n{server_scan_result.connectivity_error_trace}")
continue
assert server_scan_result.scan_result
certinfo_attempt = server_scan_result.scan_result.certificate_info
if certinfo_attempt.status == ScanCommandAttemptStatusEnum.ERROR:
_print_failed_scan_command_attempt(certinfo_attempt) # type: ignore
else:
certinfo_result = server_scan_result.scan_result.certificate_info.result
assert certinfo_result
for cert_deployment in certinfo_result.certificate_deployments:
print(f" SHA1 of leaf certificate: {cert_deployment.received_certificate_chain[0].fingerprint_sha1}")
print("")
if __name__ == "__main__":
main()