diff --git a/sw/device/tests/penetrationtests/BUILD b/sw/device/tests/penetrationtests/BUILD index c7a38e7d72010..30107fca1b11d 100644 --- a/sw/device/tests/penetrationtests/BUILD +++ b/sw/device/tests/penetrationtests/BUILD @@ -2,7 +2,7 @@ # Licensed under the Apache License, Version 2.0, see LICENSE for details. # SPDX-License-Identifier: Apache-2.0 -load(":pentest.bzl", "pentest_cryptolib_fi_asym", "pentest_cryptolib_fi_sym", "pentest_cryptolib_sca_asym", "pentest_cryptolib_sca_sym", "pentest_fi", "pentest_fi_ibex", "pentest_fi_otbn", "pentest_sca") +load(":pentest.bzl", "pentest_cryptolib_fi_asym", "pentest_cryptolib_fi_gdb_asym", "pentest_cryptolib_fi_gdb_sym", "pentest_cryptolib_fi_sym", "pentest_cryptolib_sca_asym", "pentest_cryptolib_sca_sym", "pentest_fi", "pentest_fi_ibex", "pentest_fi_otbn", "pentest_sca") load("@ot_python_deps//:requirements.bzl", "requirement") package(default_visibility = ["//visibility:public"]) @@ -287,6 +287,17 @@ pentest_cryptolib_fi_asym( test_vectors = CRYPTOLIB_FI_ASYM_TESTVECTOR_TARGETS, ) +pentest_cryptolib_fi_gdb_sym( + name = "fi_sym_cryptolib_python_gdb_test", + tags = [ + "manual", + "skip_in_ci", + ], + test_args = "", + test_harness = "//sw/host/penetrationtests/python/fi:fi_sym_cryptolib_python_gdb_test", + test_vectors = [], +) + pentest_cryptolib_fi_asym( name = "fi_asym_cryptolib_python_test", tags = [], @@ -295,6 +306,17 @@ pentest_cryptolib_fi_asym( test_vectors = [], ) +pentest_cryptolib_fi_gdb_asym( + name = "fi_asym_cryptolib_python_gdb_test", + tags = [ + "manual", + "skip_in_ci", + ], + test_args = "", + test_harness = "//sw/host/penetrationtests/python/fi:fi_asym_cryptolib_python_gdb_test", + test_vectors = [], +) + CRYPTOLIB_SCA_SYM_TESTVECTOR_TARGETS = [ "//sw/host/penetrationtests/testvectors/data:sca_sym_cryptolib", ] diff --git a/sw/device/tests/penetrationtests/doc/README.md b/sw/device/tests/penetrationtests/doc/README.md index 514e3a1f29e00..dfe86bb5e8b60 100644 --- a/sw/device/tests/penetrationtests/doc/README.md +++ b/sw/device/tests/penetrationtests/doc/README.md @@ -67,7 +67,7 @@ cd $REPO_TOP ./bazelisk.sh run //sw/device/tests/penetrationtests:fi_ibex_fpga_cw340_sival_rom_ext ``` -In addition, we have Python scripts to use the pentest framework in //sw/host/penetrationtests/python. +In addition, we have Python scripts to use the pentest framework in `//sw/host/penetrationtests/python`. These scripts are also tested. Use the following command to automatically test the Ibex FI tests on the CW340 FPGA board: @@ -76,6 +76,25 @@ cd $REPO_TOP ./bazelisk.sh run //sw/device/tests/penetrationtests:fi_ibex_python_test_fpga_cw340_sival_rom_ext ``` +## GDB Testing (FiSim) + +The crypto library is tested on a debug enabled CW340 FPGA by tracing several relevant cryptographic calls and apply instruction skips or other fault models to their execution. In order to use this testing, the CW340 FPGA has to be adapted. We provide a picture on its setup as reference: + +![CW340 debug setup](fi_sim_cw340_setup.png) + +In order to run a GDB test, find the "gdb_test" targets in the BUILD file in `//sw/device/tests/penetrationtests`. For example, +```console +./bazelisk.sh run //sw/device/tests/penetrationtests:fi_sym_cryptolib_python_gdb_test_fpga_cw340_rom_ext + +./bazelisk.sh run //sw/device/tests/penetrationtests:fi_asym_cryptolib_python_gdb_test_fpga_cw340_rom_ext +``` + +These tests specifically run in the rom_ext environment since this is a ROM version in the RMA lifecycle which enables debug. The test then builds and runs openocd in the background which opens the default port 3333 to GDB. The files in //sw/host/penetrationtests/python/util contain classes in order to communicate with the FPGA, OpenOCD, and GDB. + +The testing is performed on the flashed pentest framework in this directory which provides the interface to the crypto library in `//sw/device/lib/crypto`. The targeted functions are found parsing the dis file. The parser is also found in `//sw/host/penetrationtests/python/util`. + +All test outputs are saved in the `bazel-testlogs/sw/device/tests/penetrationtests` folder in opentitan. Take note that the output to the terminal is piped to the campaign file in that directory, please consult this file for the test output. This is done since each subtest can take 2-12h. + ## Versioning In `//sw/device/tests/penetrationtests/firmware/lib/pentest_lib.h`, a value PENTEST_VERSION is found with the current version of the pentest framework. diff --git a/sw/device/tests/penetrationtests/doc/fi_sim_cw340_setup.png b/sw/device/tests/penetrationtests/doc/fi_sim_cw340_setup.png new file mode 100644 index 0000000000000..82d8a0117e60f Binary files /dev/null and b/sw/device/tests/penetrationtests/doc/fi_sim_cw340_setup.png differ diff --git a/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_asym_impl.c b/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_asym_impl.c index 2335f09e5f319..35550edcaa285 100644 --- a/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_asym_impl.c +++ b/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_asym_impl.c @@ -26,6 +26,9 @@ #define MODULE_ID MAKE_MODULE_ID('f', 'a', 'i') +// Markers in the dis file to be able to trace certain functions +#define PENTEST_MARKER_LABEL(name) asm volatile(#name ":" ::: "memory") + // OAEP label for testing. static const unsigned char kTestLabel[] = "Test label."; static const size_t kTestLabelLen = sizeof(kTestLabel) - 1; @@ -403,12 +406,14 @@ status_t cryptolib_fi_rsa_sign_impl( // Trigger window. if (uj_input.trigger & kPentestTrigger3) { + PENTEST_MARKER_LABEL(PENTEST_MARKER_RSA_SIGN_START); pentest_set_trigger_high(); } TRY(otcrypto_rsa_sign(&private_key, msg_digest, padding_mode, sig_buf)); // Trigger window. if (uj_input.trigger & kPentestTrigger3) { pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_RSA_SIGN_END); } // Return data back to host. @@ -561,13 +566,16 @@ status_t cryptolib_fi_rsa_verify_impl( hardened_bool_t verification_result; // Trigger window. if (uj_input.trigger & kPentestTrigger3) { + PENTEST_MARKER_LABEL(PENTEST_MARKER_RSA_VERIFY_START); pentest_set_trigger_high(); } - TRY(otcrypto_rsa_verify(&public_key, msg_digest, padding_mode, sig, - &verification_result)); + status_t status = otcrypto_rsa_verify(&public_key, msg_digest, padding_mode, + sig, &verification_result); if (uj_input.trigger & kPentestTrigger3) { pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_RSA_VERIFY_END); } + TRY(status); // Return data back to host. uj_output->result = true; @@ -633,9 +641,11 @@ status_t cryptolib_fi_p256_ecdh_impl( .keyblob = shared_secretblob, }; + PENTEST_MARKER_LABEL(PENTEST_MARKER_P256_ECDH_START); pentest_set_trigger_high(); TRY(otcrypto_ecdh_p256(&private_key, &public_key, &shared_secret)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_P256_ECDH_END); uint32_t share0[kPentestP256Words]; uint32_t share1[kPentestP256Words]; @@ -726,6 +736,7 @@ status_t cryptolib_fi_p256_sign_impl( // Trigger window 1. if (uj_input.trigger == 1) { + PENTEST_MARKER_LABEL(PENTEST_MARKER_P256_SIGN_START); pentest_set_trigger_high(); } // Sign the message. @@ -733,6 +744,7 @@ status_t cryptolib_fi_p256_sign_impl( signature_mut)); if (uj_input.trigger == 1) { pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_P256_SIGN_END); } // Return data back to host. @@ -790,10 +802,12 @@ status_t cryptolib_fi_p256_verify_impl( hardened_bool_t verification_result = kHardenedBoolFalse; + PENTEST_MARKER_LABEL(PENTEST_MARKER_P256_VERIFY_START); pentest_set_trigger_high(); TRY(otcrypto_ecdsa_p256_verify(&public_key, message_digest, signature, &verification_result)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_P256_VERIFY_END); // Return data back to host. uj_output->result = true; @@ -859,9 +873,11 @@ status_t cryptolib_fi_p384_ecdh_impl( .keyblob = shared_secretblob, }; + PENTEST_MARKER_LABEL(PENTEST_MARKER_P384_ECDH_START); pentest_set_trigger_high(); TRY(otcrypto_ecdh_p384(&private_key, &public_key, &shared_secret)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_P384_ECDH_END); uint32_t share0[kPentestP384Words]; uint32_t share1[kPentestP384Words]; @@ -952,12 +968,14 @@ status_t cryptolib_fi_p384_sign_impl( // Trigger window 1. if (uj_input.trigger == 1) { + PENTEST_MARKER_LABEL(PENTEST_MARKER_P384_SIGN_START); pentest_set_trigger_high(); } TRY(otcrypto_ecdsa_p384_sign_verify(&private_key, &public_key, message_digest, signature_mut)); if (uj_input.trigger == 1) { pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_P384_SIGN_END); } // Return data back to host. @@ -1015,10 +1033,12 @@ status_t cryptolib_fi_p384_verify_impl( hardened_bool_t verification_result = kHardenedBoolFalse; + PENTEST_MARKER_LABEL(PENTEST_MARKER_P384_VERIFY_START); pentest_set_trigger_high(); TRY(otcrypto_ecdsa_p384_verify(&public_key, message_digest, signature, &verification_result)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_P384_VERIFY_END); // Return data back to host. uj_output->result = true; diff --git a/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_sym_impl.c b/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_sym_impl.c index fdf3a39a2b426..ab72906b90ad0 100644 --- a/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_sym_impl.c +++ b/sw/device/tests/penetrationtests/firmware/fi/cryptolib_fi_sym_impl.c @@ -22,6 +22,9 @@ #define MODULE_ID MAKE_MODULE_ID('c', 'f', 's') +// Markers in the dis file to be able to trace certain functions +#define PENTEST_MARKER_LABEL(name) asm volatile(#name ":" ::: "memory") + status_t cryptolib_fi_aes_impl(cryptolib_fi_sym_aes_in_t uj_input, cryptolib_fi_sym_aes_out_t *uj_output) { // Set the AES mode. @@ -134,9 +137,11 @@ status_t cryptolib_fi_aes_impl(cryptolib_fi_sym_aes_in_t uj_input, }; // Trigger window. + PENTEST_MARKER_LABEL(PENTEST_MARKER_AES_START); pentest_set_trigger_high(); TRY(otcrypto_aes(&key, iv, mode, op, input, padding, output)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_AES_END); // Return data back to host. uj_output->data_len = padded_len_bytes; @@ -168,11 +173,13 @@ status_t cryptolib_fi_drbg_generate_impl( // Trigger window 0. if (uj_input.trigger & kPentestTrigger2) { + PENTEST_MARKER_LABEL(PENTEST_MARKER_DRBG_GENERATE_START); pentest_set_trigger_high(); } TRY(otcrypto_drbg_generate(nonce, output)); if (uj_input.trigger & kPentestTrigger2) { pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_DRBG_GENERATE_END); } // Return data back to host. @@ -197,11 +204,13 @@ status_t cryptolib_fi_drbg_reseed_impl( // Trigger window 0. if (uj_input.trigger & kPentestTrigger1) { + PENTEST_MARKER_LABEL(PENTEST_MARKER_DRBG_RESEED_START); pentest_set_trigger_high(); } TRY(otcrypto_drbg_instantiate(entropy)); if (uj_input.trigger & kPentestTrigger1) { pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_DRBG_RESEED_END); } // Return data back to host. @@ -298,10 +307,12 @@ status_t cryptolib_fi_gcm_impl(cryptolib_fi_sym_gcm_in_t uj_input, } // Trigger window. + PENTEST_MARKER_LABEL(PENTEST_MARKER_GCM_ENCRYPT_START); pentest_set_trigger_high(); TRY(otcrypto_aes_gcm_encrypt(&key, plaintext, iv, aad, tag_len, actual_ciphertext, actual_tag)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_GCM_ENCRYPT_END); // Return data back to host. uj_output->cfg = 0; @@ -385,9 +396,11 @@ status_t cryptolib_fi_hmac_impl(cryptolib_fi_sym_hmac_in_t uj_input, }; // Trigger window. + PENTEST_MARKER_LABEL(PENTEST_MARKER_HMAC_START); pentest_set_trigger_high(); TRY(otcrypto_hmac(&key, input_message, tag)); pentest_set_trigger_low(); + PENTEST_MARKER_LABEL(PENTEST_MARKER_HMAC_END); // Return data back to host. uj_output->data_len = tag_bytes; diff --git a/sw/device/tests/penetrationtests/pentest.bzl b/sw/device/tests/penetrationtests/pentest.bzl index b28460ea489ad..a162594667070 100644 --- a/sw/device/tests/penetrationtests/pentest.bzl +++ b/sw/device/tests/penetrationtests/pentest.bzl @@ -24,6 +24,10 @@ PENTEST_EXEC_ENVS = { "//hw/top_earlgrey:fpga_cw340_rom_ext": "fpga_cw340", } | EARLGREY_SILICON_OWNER_ROM_EXT_ENVS +GDB_EXEC_ENVS = { + "//hw/top_earlgrey:fpga_cw340_rom_ext": "fpga_cw340", +} + FIRMWARE_DEPS_FI = [ "//sw/device/tests/penetrationtests/firmware/fi:alert_fi", "//sw/device/tests/penetrationtests/firmware/fi:crypto_fi", @@ -388,6 +392,50 @@ def pentest_cryptolib_fi_sym(name, test_vectors, test_args, test_harness, tags): deps = FIRMWARE_DEPS_CRYPTOLIB_FI_SYM, ) +def pentest_cryptolib_fi_gdb_sym(name, test_vectors, test_args, test_harness, tags): + """A macro for defining a CryptoTest test case. + + Args: + name: the name of the test. + test_vectors: the test vectors to use. + test_args: additional arguments to pass to the test. + test_harness: the test harness to use. + tags: indicate the tags for CI. + """ + opentitan_test( + name = name, + srcs = ["//sw/device/tests/penetrationtests/firmware:firmware_cryptolib_fi_sym.c"], + fpga = fpga_params( + timeout = "long", + data = test_vectors, + tags = tags, + test_cmd = """ + --bootstrap={firmware} + """ + test_args, + test_harness = test_harness, + ), + fpga_cw340 = fpga_params( + timeout = "long", + tags = tags, + data = test_vectors, + test_cmd = """ + --bootstrap={firmware} + """ + test_args, + test_harness = test_harness, + ), + exec_env = GDB_EXEC_ENVS, + silicon = silicon_params( + timeout = "eternal", + tags = tags, + data = test_vectors, + test_cmd = """ + --bootstrap={firmware} + """ + test_args, + test_harness = test_harness, + ), + deps = FIRMWARE_DEPS_CRYPTOLIB_FI_SYM, + ) + def pentest_cryptolib_fi_asym(name, test_vectors, test_args, test_harness, tags): """A macro for defining a CryptoTest test case. @@ -432,6 +480,50 @@ def pentest_cryptolib_fi_asym(name, test_vectors, test_args, test_harness, tags) deps = FIRMWARE_DEPS_CRYPTOLIB_FI_ASYM, ) +def pentest_cryptolib_fi_gdb_asym(name, test_vectors, test_args, test_harness, tags): + """A macro for defining a CryptoTest test case. + + Args: + name: the name of the test. + test_vectors: the test vectors to use. + test_args: additional arguments to pass to the test. + test_harness: the test harness to use. + tags: indicate the tags for CI. + """ + opentitan_test( + name = name, + srcs = ["//sw/device/tests/penetrationtests/firmware:firmware_cryptolib_fi_asym.c"], + fpga = fpga_params( + timeout = "long", + data = test_vectors, + tags = tags, + test_cmd = """ + --bootstrap={firmware} + """ + test_args, + test_harness = test_harness, + ), + fpga_cw340 = fpga_params( + timeout = "long", + tags = tags, + data = test_vectors, + test_cmd = """ + --bootstrap={firmware} + """ + test_args, + test_harness = test_harness, + ), + exec_env = GDB_EXEC_ENVS, + silicon = silicon_params( + timeout = "eternal", + tags = tags, + data = test_vectors, + test_cmd = """ + --bootstrap={firmware} + """ + test_args, + test_harness = test_harness, + ), + deps = FIRMWARE_DEPS_CRYPTOLIB_FI_ASYM, + ) + def pentest_cryptolib_sca_sym(name, test_vectors, test_args, test_harness, tags): """A macro for defining a CryptoTest test case. diff --git a/sw/host/penetrationtests/python/fi/BUILD b/sw/host/penetrationtests/python/fi/BUILD index c9af46cfdf9da..9e455fb9a0918 100644 --- a/sw/host/penetrationtests/python/fi/BUILD +++ b/sw/host/penetrationtests/python/fi/BUILD @@ -150,6 +150,28 @@ py_binary( ], ) +py_binary( + name = "fi_sym_cryptolib_python_gdb_test", + testonly = True, + srcs = ["gdb_testing/fi_sym_cryptolib_python_gdb_test.py"], + data = [ + "//sw/host/opentitantool", + "//third_party/openocd:openocd_bin", + "//util/openocd/board:cw340_ftdi.cfg", + "//util/openocd/target:lowrisc-earlgrey.cfg", + "@lowrisc_rv32imcb_toolchain//:bin/riscv32-unknown-elf-gdb", + ], + deps = [ + "//sw/host/penetrationtests/python/fi:fi_sym_cryptolib_commands", + "//sw/host/penetrationtests/python/util:common_library", + "//sw/host/penetrationtests/python/util:dis_parser", + "//sw/host/penetrationtests/python/util:gdb_controller", + "//sw/host/penetrationtests/python/util:targets", + "@rules_python//python/runfiles", + requirement("pycryptodome"), + ], +) + py_binary( name = "fi_asym_cryptolib_python_test", testonly = True, @@ -168,6 +190,28 @@ py_binary( ], ) +py_binary( + name = "fi_asym_cryptolib_python_gdb_test", + testonly = True, + srcs = ["gdb_testing/fi_asym_cryptolib_python_gdb_test.py"], + data = [ + "//sw/host/opentitantool", + "//third_party/openocd:openocd_bin", + "//util/openocd/board:cw340_ftdi.cfg", + "//util/openocd/target:lowrisc-earlgrey.cfg", + "@lowrisc_rv32imcb_toolchain//:bin/riscv32-unknown-elf-gdb", + ], + deps = [ + "//sw/host/penetrationtests/python/fi:fi_asym_cryptolib_commands", + "//sw/host/penetrationtests/python/util:common_library", + "//sw/host/penetrationtests/python/util:dis_parser", + "//sw/host/penetrationtests/python/util:gdb_controller", + "//sw/host/penetrationtests/python/util:targets", + "@rules_python//python/runfiles", + requirement("pycryptodome"), + ], +) + py_library( name = "fi_ibex_functions", srcs = ["host_scripts/fi_ibex_functions.py"], diff --git a/sw/host/penetrationtests/python/fi/gdb_testing/fi_asym_cryptolib_python_gdb_test.py b/sw/host/penetrationtests/python/fi/gdb_testing/fi_asym_cryptolib_python_gdb_test.py new file mode 100644 index 0000000000000..3ebcb12f6d0c0 --- /dev/null +++ b/sw/host/penetrationtests/python/fi/gdb_testing/fi_asym_cryptolib_python_gdb_test.py @@ -0,0 +1,1417 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +# What to do when running into errors: +# - If device is busy or seeing "rejected 'gdb' connection, no more connections allowed", +# cut the USB connection, e.g., sudo fuser /dev/ttyUSB0 and kill the PID +# - If the port is busy check sudo lsof -i :3333 and then kill the PID + +from sw.host.penetrationtests.python.fi.communication.fi_asym_cryptolib_commands import ( + OTFIAsymCrypto, +) +from python.runfiles import Runfiles +from sw.host.penetrationtests.python.util import targets +from sw.host.penetrationtests.python.util import common_library +from sw.host.penetrationtests.python.util.gdb_controller import GDBController +from sw.host.penetrationtests.python.util.dis_parser import DisParser +from collections import Counter +import json +import argparse +import unittest +import sys +import os +import time +from Crypto.PublicKey import RSA, ECC +from Crypto.Signature import pkcs1_15, DSS +from Crypto.Hash import SHA256, SHA384 + +ignored_keys_set = set(["status"]) +opentitantool_path = "" +log_dir = "" +elf_path = "" + +# We set to only apply instruction skips in the first +# MAX_SKIPS_PER_LOOP iterations of a loop +MAX_SKIPS_PER_LOOP = 3 + +target = None +asymfi = None + +# Read in the extra arguments from the opentitan_test. +parser = argparse.ArgumentParser() +parser.add_argument("--bitstream", type=str) +parser.add_argument("--bootstrap", type=str) + +args, config_args = parser.parse_known_args() + +BITSTREAM = args.bitstream +BOOTSTRAP = args.bootstrap + +original_stdout = sys.stdout + + +class AsymCryptolibFiSim(unittest.TestCase): + def test_p384_verify(self): + print("Starting the p384 verify test") + # Preparing the input for an invalid signature + key = ECC.generate(curve="P-384") + pubx = [x for x in key.pointQ.x.to_bytes(48, "little")] + puby = [x for x in key.pointQ.y.to_bytes(48, "little")] + message = [i for i in range(16)] + signer = DSS.new(key, "fips-186-3") + h = SHA384.new(bytes(message)) + signature = [x for x in signer.sign(h)] + # Corrupt the signature for FiSim Testing + signature[0] ^= 0x1 + r_bytes = signature[:48] + s_bytes = signature[48:] + r_bytes.reverse() + s_bytes.reverse() + cfg = 0 + trigger = 1 + h = SHA384.new(bytes(message)) + message_digest = [x for x in h.digest()] + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "p384_verify_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "p384_verify_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "p384_verify_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = asymfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # We provide the name of the unique marker in the pentest framework + function_name = "PENTEST_MARKER_P384_VERIFY" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the p384 verify from the testOS (we do not read its output) + asymfi.handle_p384_verify( + pubx, puby, r_bytes, s_bytes, message_digest, cfg, trigger + ) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + started = True + for pc, count in pc_count_dict.items(): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print("Applying instruction skip in ", pc, "occurence", i_count) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + asymfi.handle_p384_verify( + pubx, puby, r_bytes, s_bytes, message_digest, cfg, trigger + ) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + verification_result = testos_response_json["result"] + verification_status = testos_response_json["status"] + if verification_result and (verification_status == 0): + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + # We do not need to reset the target since it gave an output + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", + flush=True, + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + def test_p384_ecdh(self): + print("Starting the p384 ecdh test") + # Preparing the input for an invalid signature + private_key1 = ECC.generate(curve="P-384") + private_key_array1 = [x for x in private_key1.d.to_bytes(48, "little")] + private_key2 = ECC.generate(curve="P-384") + private_key_array2 = [x for x in private_key2.d.to_bytes(48, "little")] + private_key_array = [private_key_array1, private_key_array2] + key = ECC.generate(curve="P-384") + public_point = key.pointQ + public_x = [x for x in public_point.x.to_bytes(48, "little")] + public_y = [x for x in public_point.y.to_bytes(48, "little")] + cfg = 0 + trigger = 0 + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "p384_ecdh_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "p384_ecdh_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "p384_ecdh_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = asymfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + ecdh_output = [None, None] + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # We provide the name of the unique marker in the pentest framework + function_name = "PENTEST_MARKER_P384_ECDH" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the p384 verify from the testOS (we do not read its output) + asymfi.handle_p384_ecdh(private_key_array[0], public_x, public_y, cfg, trigger) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + started = True + for pc, count in pc_count_dict.items(): + # Search for collisions in outputs between the ecdh instances + for i in range(2): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print("Applying instruction skip in ", pc, "occurence", i_count) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + asymfi.handle_p384_ecdh( + private_key_array[i], public_x, public_y, cfg, trigger + ) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + if testos_response_json["status"] == 0: + ecdh_output[i] = tuple( + testos_response_json["shared_key"] + ) + if ecdh_output[i] == ecdh_output[1 - i]: + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + # We do not need to reset the target since it gave an output + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", + flush=True, + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + def test_p256_verify(self): + print("Starting the p256 verify test") + # Preparing the input for an invalid signature + key = ECC.generate(curve="P-256") + pubx = [x for x in key.pointQ.x.to_bytes(32, "little")] + puby = [x for x in key.pointQ.y.to_bytes(32, "little")] + message = [i for i in range(16)] + signer = DSS.new(key, "fips-186-3") + h = SHA256.new(bytes(message)) + signature = [x for x in signer.sign(h)] + # Corrupt the signature for FiSim Testing + signature[0] ^= 0x1 + r_bytes = signature[:32] + s_bytes = signature[32:] + r_bytes.reverse() + s_bytes.reverse() + cfg = 0 + trigger = 1 + h = SHA256.new(bytes(message)) + message_digest = [x for x in h.digest()] + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "p256_verify_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "p256_verify_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "p256_verify_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = asymfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # Provide the marker name and extract the start and end address from the dis file + function_name = "PENTEST_MARKER_P256_VERIFY" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the p256 verify from the testOS (we do not read its output) + asymfi.handle_p256_verify( + pubx, puby, r_bytes, s_bytes, message_digest, cfg, trigger + ) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + started = True + for pc, count in pc_count_dict.items(): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print("Applying instruction skip in ", pc, "occurence", i_count) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + asymfi.handle_p256_verify( + pubx, puby, r_bytes, s_bytes, message_digest, cfg, trigger + ) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + verification_result = testos_response_json["result"] + verification_status = testos_response_json["status"] + if verification_result and (verification_status == 0): + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + # We do not need to reset the target since it gave an output + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", + flush=True, + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + def test_p256_ecdh(self): + print("Starting the p256 ecdh test") + # Preparing the input for an invalid signature + private_key1 = ECC.generate(curve="P-256") + private_key_array1 = [x for x in private_key1.d.to_bytes(32, "little")] + private_key2 = ECC.generate(curve="P-256") + private_key_array2 = [x for x in private_key2.d.to_bytes(32, "little")] + private_key_array = [private_key_array1, private_key_array2] + key = ECC.generate(curve="P-256") + public_point = key.pointQ + public_x = [x for x in public_point.x.to_bytes(32, "little")] + public_y = [x for x in public_point.y.to_bytes(32, "little")] + cfg = 0 + trigger = 0 + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "p256_ecdh_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "p256_ecdh_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "p256_ecdh_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = asymfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + ecdh_output = [None, None] + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # We provide the name of the unique marker in the pentest framework + function_name = "PENTEST_MARKER_P256_ECDH" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the p256 verify from the testOS (we do not read its output) + asymfi.handle_p256_ecdh(private_key_array[0], public_x, public_y, cfg, trigger) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + started = True + for pc, count in pc_count_dict.items(): + # Search for collisions in outputs between the ecdh instances + for i in range(2): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print("Applying instruction skip in ", pc, "occurence", i_count) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + asymfi.handle_p256_ecdh( + private_key_array[i], public_x, public_y, cfg, trigger + ) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + if testos_response_json["status"] == 0: + ecdh_output[i] = tuple( + testos_response_json["shared_key"] + ) + if ecdh_output[i] == ecdh_output[1 - i]: + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + # We do not need to reset the target since it gave an output + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", + flush=True, + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + def test_rsa_verify(self): + print("Starting the rsa verify test") + # Preparing the input for an invalid signature + key = RSA.generate(2048) + public_exponent = key.e + n = [x for x in (key.n).to_bytes(256, "little")] + n_len = 256 + data_len = 13 + data = [i for i in range(data_len)] + h = SHA256.new(bytes(data)) + signer = pkcs1_15.new(key) + signature = signer.sign(h) + sig = [x for x in signature] + # Corrupt the signature for FiSim Testing + sig[0] ^= 0x1 + sig.reverse() + sig_len = len(sig) + cfg = 0 + trigger = 0x4 + hashing = 0 + padding = 0 + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "rsa_verify_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "rsa_verify_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "rsa_verify_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = asymfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # Provide the marker name and extract the start and end address from the dis file + function_name = "PENTEST_MARKER_RSA_VERIFY" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the rsa verify from the testOS (we do not read its output) + asymfi.handle_rsa_verify( + data, + data_len, + public_exponent, + n, + n_len, + sig, + sig_len, + padding, + hashing, + cfg, + trigger, + ) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + # Sometimes the tracing fails due to race conditions, + # we have a quick initial timeout to catch this + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + started = True + for pc, count in pc_count_dict.items(): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print("Applying instruction skip in ", pc, "occurence", i_count) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + asymfi.handle_rsa_verify( + data, + data_len, + public_exponent, + n, + n_len, + sig, + sig_len, + padding, + hashing, + cfg, + trigger, + ) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + verification_result = testos_response_json["result"] + verification_status = testos_response_json["status"] + if verification_result and (verification_status == 0): + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + # We do not need to reset the target since it returned an output + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", flush=True + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + +if __name__ == "__main__": + r = Runfiles.Create() + # Get the openocd path. + openocd_path = r.Rlocation("lowrisc_opentitan/third_party/openocd/build_openocd/bin/openocd") + # Get the openocd config files. + # The first file is on the cw340 (this is specific to the cw340) + CONFIG_FILE_CHIP = r.Rlocation("lowrisc_opentitan/util/openocd/board/cw340_ftdi.cfg") + # The config for the earlgrey design + CONFIG_FILE_DESIGN = r.Rlocation("lowrisc_opentitan/util/openocd/target/lowrisc-earlgrey.cfg") + # Get the opentitantool path. + opentitantool_path = r.Rlocation("lowrisc_opentitan/sw/host/opentitantool/opentitantool") + # The path for GDB and the default port (set up by OpenOCD) + GDB_PATH = r.Rlocation("lowrisc_rv32imcb_toolchain/bin/riscv32-unknown-elf-gdb") + GDB_PORT = 3333 + # Program the bitstream for FPGAs. + bitstream_path = None + if BITSTREAM: + bitstream_path = r.Rlocation("lowrisc_opentitan/" + BITSTREAM) + # Get the test result path + log_dir = os.environ.get("TEST_UNDECLARED_OUTPUTS_DIR") + # Get the firmware path. + firmware_path = r.Rlocation("lowrisc_opentitan/" + BOOTSTRAP) + # Get the disassembly path. + dis_path = firmware_path.replace(".img", ".dis") + # And the path for the elf. + elf_path = firmware_path.replace(".img", ".elf") + + if "fpga" in BOOTSTRAP: + target_type = "fpga" + else: + target_type = "chip" + + target_cfg = targets.TargetConfig( + target_type=target_type, + interface_type="hyperdebug", + fw_bin=firmware_path, + opentitantool=opentitantool_path, + bitstream=bitstream_path, + tool_args=config_args, + openocd=openocd_path, + openocd_chip_config=CONFIG_FILE_CHIP, + openocd_design_config=CONFIG_FILE_DESIGN, + ) + + target = targets.Target(target_cfg) + asymfi = OTFIAsymCrypto(target) + parser = DisParser(dis_path) + + unittest.main(argv=[sys.argv[0]]) diff --git a/sw/host/penetrationtests/python/fi/gdb_testing/fi_sym_cryptolib_python_gdb_test.py b/sw/host/penetrationtests/python/fi/gdb_testing/fi_sym_cryptolib_python_gdb_test.py new file mode 100644 index 0000000000000..febf0765b2cdc --- /dev/null +++ b/sw/host/penetrationtests/python/fi/gdb_testing/fi_sym_cryptolib_python_gdb_test.py @@ -0,0 +1,643 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +# What to do when running into errors: +# - If device is busy or seeing "rejected 'gdb' connection, no more connections allowed", +# cut the USB connection, e.g., sudo fuser /dev/ttyUSB0 and kill the PID +# - If the port is busy check sudo lsof -i :3333 and then kill the PID + +from sw.host.penetrationtests.python.fi.communication.fi_sym_cryptolib_commands import ( + OTFISymCrypto, +) +from python.runfiles import Runfiles +from sw.host.penetrationtests.python.util import targets +from sw.host.penetrationtests.python.util import common_library +from sw.host.penetrationtests.python.util.gdb_controller import GDBController +from sw.host.penetrationtests.python.util.dis_parser import DisParser +from collections import Counter +import json +import argparse +import unittest +import sys +import os +import time + +ignored_keys_set = set(["status"]) +opentitantool_path = "" +log_dir = "" +elf_path = "" + +# We set to only apply instruction skips in the first +# MAX_SKIPS_PER_LOOP iterations of a loop +MAX_SKIPS_PER_LOOP = 2 + +target = None +symfi = None + +# Read in the extra arguments from the opentitan_test. +parser = argparse.ArgumentParser() +parser.add_argument("--bitstream", type=str) +parser.add_argument("--bootstrap", type=str) + +args, config_args = parser.parse_known_args() + +BITSTREAM = args.bitstream +BOOTSTRAP = args.bootstrap + +original_stdout = sys.stdout + + +class SymCryptolibFiSim(unittest.TestCase): + def test_hmac(self): + print("Starting the hmac-sha256 test") + # We only test SHA256 + data_len = 32 + # We prepare two data inputs and check for collisions between them + data1 = [i for i in range(data_len)] + data2 = [data_len - i for i in range(data_len)] + data = [data1, data2] + key_len = 32 + key = [i for i in range(key_len)] + cfg = 0 + trigger = 0 + hash_mode = 0 + mode = 0 + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "hmac_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "hmac_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "hmac_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = symfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # We provide the name of the unique marker in the pentest framework + function_name = "PENTEST_MARKER_HMAC" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the hmac from the testOS (we do not read its output) + symfi.handle_hmac(data[0], data_len, key, key_len, hash_mode, mode, cfg, trigger) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + # Sometimes the tracing fails due to race conditions, + # we have a quick initial timeout to catch this + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + data_out = [None, None] + + started = True + for pc, count in pc_count_dict.items(): + # Search for collisions in outputs between the HMAC instances + for i in range(2): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print( + "Applying instruction skip in ", pc, "occurence", i_count, "data", i + ) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + symfi.handle_hmac( + data[i], data_len, key, key_len, hash_mode, mode, cfg, trigger + ) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, + gdb_port=GDB_PORT, + elf_file=elf_path, + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, + gdb_port=GDB_PORT, + elf_file=elf_path, + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + if testos_response_json["status"] == 0: + data_out[i] = tuple(testos_response_json["data"]) + + if data_out[i] == data_out[1 - i]: + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, + gdb_port=GDB_PORT, + elf_file=elf_path, + ) + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", + flush=True, + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + def test_drbg_generate(self): + print("Starting the drbg generate test") + entropy_len = 32 + entropy1 = [i for i in range(entropy_len)] + entropy2 = [entropy_len - i for i in range(entropy_len)] + entropy = [entropy1, entropy2] + nonce_len = 16 + nonce = [i for i in range(nonce_len)] + reseed_interval = 100 + data_len = 16 + mode = 0 + trigger = 2 + cfg = 0 + + # Directory for the trace log files + pc_trace_file = os.path.join(log_dir, "drbg_pc_trace.log") + # Directory for the output results + test_results_file = os.path.join(log_dir, "drbg_test_results.log") + # Directory for the the log of the campaign + campaign_file = os.path.join(log_dir, "drbg_test_campaign.log") + + successful_faults = 0 + total_attacks = 0 + + def trigger_testos_init(print_output=True): + # Initializing the testOS (setting up the alerts and accelerators) + (device_id, _, _, _, _, _, _) = symfi.init( + alert_config=common_library.no_escalation_alert_config + ) + if print_output: + print("Output from init ", device_id) + + def read_testos_output(): + # Read the output from the operation + response = target.read_response(max_tries=1000) + return response + + drbg_out = [None, None] + + gdb = None + started = False + with open(test_results_file, "w") as test_results, open(campaign_file, "w") as campaign: + print(f"Switching terminal output to {campaign_file}", flush=True) + sys.stdout = campaign + try: + # Program the bitstream, flash the target, and set up OpenOCD + target.initialize_target() + + # Initialize the testOS + trigger_testos_init() + + # Connect to GDB + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + # We provide the name of the unique marker in the pentest framework + function_name = "PENTEST_MARKER_DRBG_GENERATE" + # Gives back an array of hits where the function is called + trace_address = parser.get_marker_addresses(function_name) + print("Start and stop addresses of ", function_name, ": ", trace_address) + + crash_observation_address = parser.get_function_start_address( + "ottf_exception_handler" + ) + + # Start the tracing + # We set a short timeout to detect whether GDB has connected properly + # and a long timeout for the entire tracing + initial_timeout = 10 + total_timeout = 60 * 60 * 5 + + gdb.setup_pc_trace(pc_trace_file, trace_address[0], trace_address[1]) + gdb.send_command("c", check_response=False) + + # Trigger the drbg from the testOS (we do not read its output) + symfi.handle_drbg_reseed( + entropy[0], entropy_len, nonce, nonce_len, reseed_interval, mode, 0, 0 + ) + target.read_response() + symfi.handle_drbg_generate([0], 0, data_len, mode, cfg, trigger) + + start_time = time.time() + initial_timeout_stopped = False + total_timeout_stopped = False + + # Run the tracing to get the trace log + # Sometimes the tracing fails due to race conditions, + # we have a quick initial timeout to catch this + while time.time() - start_time < initial_timeout: + output = gdb.read_output() + if "breakpoint 1, " in output: + initial_timeout_stopped = True + break + if not initial_timeout_stopped: + print("No initial break point found, can be a misfire, try again") + sys.exit(1) + while time.time() - start_time < total_timeout: + output = gdb.read_output() + if "PC trace complete" in output: + print("\nTrace complete") + total_timeout_stopped = True + break + if not total_timeout_stopped: + print("Final tracing timeout reached") + sys.exit(1) + + # Parse and truncate the trace log to get all PCs in a list + pc_list = gdb.parse_pc_trace_file(pc_trace_file) + # Get the unique PCs and annotate their occurence count + pc_count_dict = Counter(pc_list) + if len(pc_count_dict) <= 0: + print("Found no tracing, stopping") + sys.exit(1) + print("Tracing has a total of", len(pc_count_dict), "unique PCs", flush=True) + + # Reset the target, flush the output, and close gdb + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) + + started = True + for pc, count in pc_count_dict.items(): + # Search for collisions in outputs between the gcm instances + for i in range(2): + i_count = 0 + while i_count < min(MAX_SKIPS_PER_LOOP, count): + print("-" * 80) + print( + "Applying instruction skip in ", pc, "occurence", i_count, "data", i + ) + print("-" * 80) + + crash_observation = "crash detected" + + try: + # The observation points + observations = { + # Crash check + crash_observation_address: f"{crash_observation}", + } + gdb.add_observation(observations) + + gdb.apply_instruction_skip( + pc, parser.parse_next_instruction(pc), i_count + ) + gdb.send_command("c", check_response=False) + + # The instruction skip loop + symfi.handle_drbg_reseed( + entropy[i], + entropy_len, + nonce, + nonce_len, + reseed_interval, + mode, + 0, + 0, + ) + target.read_response() + symfi.handle_drbg_generate([0], 0, data_len, mode, cfg, trigger) + testos_response = read_testos_output() + + gdb_response = gdb.read_output() + if "instruction skip applied" in gdb_response: + i_count += 1 + total_attacks += 1 + + if crash_observation in gdb_response: + print("Crash detected, resetting", flush=True) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, + gdb_port=GDB_PORT, + elf_file=elf_path, + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, + gdb_port=GDB_PORT, + elf_file=elf_path, + ) + else: + testos_response_json = json.loads(testos_response) + print("Output:", testos_response_json, flush=True) + if testos_response_json["status"] == 0: + drbg_out[i] = tuple(testos_response_json["data"]) + + if drbg_out[i] == drbg_out[1 - i]: + successful_faults += 1 + print("-" * 80) + print("Successful FI attack!") + print("Location:", pc, "iteration", i_count - 1) + print(gdb_response) + print("Response:", testos_response_json) + print("-" * 80) + test_results.write( + f"{pc}, {i_count - 1}: {testos_response_json}\n" + ) + # Reset GDB by closing and opening again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, + gdb_port=GDB_PORT, + elf_file=elf_path, + ) + else: + print("No break point found, something went wrong", flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + except json.JSONDecodeError: + print( + "Error: JSON decoding failed. Invalid response format", + flush=True, + ) + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.reset_target() + target.dump_all() + trigger_testos_init(print_output=False) + # Reset again + gdb.close_gdb() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + gdb.dump_output() + target.dump_all() + + except TimeoutError as e: + print("Timeout error, retrying", flush=True) + print(e, flush=True) + gdb.close_gdb() + target.close_openocd() + target.initialize_target() + trigger_testos_init() + target.dump_all() + gdb = GDBController( + gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path + ) + + finally: + print("-" * 80) + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + # Close the OpenOCD and GDB connection at the end + if gdb: + gdb.close_gdb() + target.close_openocd() + test_results.write( + f"Total attacks {total_attacks}, successful attacks {successful_faults}\n" + ) + sys.stdout = original_stdout + print("Trace data is logged in ", pc_trace_file) + print("The campaign is logged in ", campaign_file) + print("Instruction skip results are logged in ", test_results_file) + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") + print("You can find the dissassembly in ", dis_path) + self.assertEqual(successful_faults, 0) + self.assertEqual(started, True) + + +if __name__ == "__main__": + r = Runfiles.Create() + # Get the openocd path. + openocd_path = r.Rlocation("lowrisc_opentitan/third_party/openocd/build_openocd/bin/openocd") + # Get the openocd config files. + # The first file is on the cw340 (this is specific to the cw340) + CONFIG_FILE_CHIP = r.Rlocation("lowrisc_opentitan/util/openocd/board/cw340_ftdi.cfg") + # The config for the earlgrey design + CONFIG_FILE_DESIGN = r.Rlocation("lowrisc_opentitan/util/openocd/target/lowrisc-earlgrey.cfg") + # Get the opentitantool path. + opentitantool_path = r.Rlocation("lowrisc_opentitan/sw/host/opentitantool/opentitantool") + # The path for GDB and the default port (set up by OpenOCD) + GDB_PATH = r.Rlocation("lowrisc_rv32imcb_toolchain/bin/riscv32-unknown-elf-gdb") + GDB_PORT = 3333 + # Program the bitstream for FPGAs. + bitstream_path = None + if BITSTREAM: + bitstream_path = r.Rlocation("lowrisc_opentitan/" + BITSTREAM) + # Get the test result path + log_dir = os.environ.get("TEST_UNDECLARED_OUTPUTS_DIR") + # Get the firmware path. + firmware_path = r.Rlocation("lowrisc_opentitan/" + BOOTSTRAP) + # Get the disassembly path. + dis_path = firmware_path.replace(".img", ".dis") + # And the path for the elf. + elf_path = firmware_path.replace(".img", ".elf") + + if "fpga" in BOOTSTRAP: + target_type = "fpga" + else: + target_type = "chip" + + target_cfg = targets.TargetConfig( + target_type=target_type, + interface_type="hyperdebug", + fw_bin=firmware_path, + opentitantool=opentitantool_path, + bitstream=bitstream_path, + tool_args=config_args, + openocd=openocd_path, + openocd_chip_config=CONFIG_FILE_CHIP, + openocd_design_config=CONFIG_FILE_DESIGN, + ) + + target = targets.Target(target_cfg) + symfi = OTFISymCrypto(target) + parser = DisParser(dis_path) + + unittest.main(argv=[sys.argv[0]]) diff --git a/sw/host/penetrationtests/python/util/BUILD b/sw/host/penetrationtests/python/util/BUILD index cf3ee37eb3d5e..02657651adad1 100644 --- a/sw/host/penetrationtests/python/util/BUILD +++ b/sw/host/penetrationtests/python/util/BUILD @@ -30,3 +30,13 @@ py_library( name = "hyperdebug", srcs = ["hyperdebug.py"], ) + +py_library( + name = "gdb_controller", + srcs = ["gdb_controller.py"], +) + +py_library( + name = "dis_parser", + srcs = ["dis_parser.py"], +) diff --git a/sw/host/penetrationtests/python/util/common_library.py b/sw/host/penetrationtests/python/util/common_library.py index 54515894b467b..cb29601bf2ba6 100644 --- a/sw/host/penetrationtests/python/util/common_library.py +++ b/sw/host/penetrationtests/python/util/common_library.py @@ -310,3 +310,146 @@ "duration_cycles": [0, 7200, 48, 48], "ping_timeout": 1200, } + +no_escalation_alert_config = { + "alert_classes": [ + 2, # "uart0_fatal_fault", + 2, # "uart1_fatal_fault", + 2, # "uart2_fatal_fault", + 2, # "uart3_fatal_fault", + 0, # "gpio_fatal_fault", + 0, # "spi_device_fatal_fault", + 2, # "i2c0_fatal_fault", + 2, # "i2c1_fatal_fault", + 2, # "i2c2_fatal_fault", + 2, # "pattgen_fatal_fault", + 0, # "rv_timer_fatal_fault", + 0, # "otp_ctrl_fatal_macro_error", + 0, # "otp_ctrl_fatal_check_error", + 0, # "otp_ctrl_fatal_bus_integ_error", + 0, # "otp_ctrl_fatal_prim_otp_alert", + 1, # "otp_ctrl_recov_prim_otp_alert", + 0, # "lc_ctrl_fatal_prog_error", + 0, # "lc_ctrl_fatal_state_error", + 0, # "lc_ctrl_fatal_bus_integ_error", + 2, # "spi_host0_fatal_fault", + 2, # "spi_host1_fatal_fault", + 2, # "usbdev_fatal_fault", + 0, # "pwrmgr_aon_fatal_fault", + 0, # "rstmgr_aon_fatal_fault", + 0, # "rstmgr_aon_fatal_cnsty_fault", + 1, # "clkmgr_aon_recov_fault", + 0, # "clkmgr_aon_fatal_fault", + 2, # "sysrst_ctrl_aon_fatal_fault", + 2, # "adc_ctrl_aon_fatal_fault", + 2, # "pwm_aon_fatal_fault", + 2, # "pinmux_aon_fatal_fault", + 0, # "aon_timer_aon_fatal_fault", + 1, # "sensor_ctrl_recov_alert", + 0, # "sensor_ctrl_fatal_alert", + 0, # "sram_ctrl_ret_aon_fatal_error", + 1, # "flash_ctrl_recov_err", + 0, # "flash_ctrl_fatal_std_err", + 0, # "flash_ctrl_fatal_err", + 0, # "flash_ctrl_fatal_prim_flash_alert", + 1, # "flash_ctrl_recov_prim_flash_alert", + 0, # "rv_dm_fatal_fault", + 0, # "rv_plic_fatal_fault", + 1, # "aes_recov_ctrl_update_err", + 0, # "aes_fatal_fault", + 0, # "hmac_fatal_fault", + 1, # "kmac_recov_operation_err", + 0, # "kmac_fatal_fault_err", + 0, # "otbn_fatal", + 1, # "otbn_recov", + 1, # "keymgr_recov_operation_err", + 0, # "keymgr_fatal_fault_err", + 1, # "csrng_recov_alert", + 0, # "csrng_fatal_alert", + 1, # "entropy_src_recov_alert", + 0, # "entropy_src_fatal_alert", + 1, # "edn0_recov_alert", + 0, # "edn0_fatal_alert", + 1, # "edn1_recov_alert", + 0, # "edn1_fatal_alert", + 0, # "sram_ctrl_main_fatal_error", + 0, # "rom_ctrl_fatal", + 0, # "rv_core_ibex_fatal_sw_err", + 1, # "rv_core_ibex_recov_sw_err", + 0, # "rv_core_ibex_fatal_hw_err", + 1, # "rv_core_ibex_recov_hw_err" + ], + "enable_alerts": [ + True, # "uart0_fatal_fault", + True, # "uart1_fatal_fault", + True, # "uart2_fatal_fault", + True, # "uart3_fatal_fault", + True, # "gpio_fatal_fault", + True, # "spi_device_fatal_fault", + True, # "i2c0_fatal_fault", + True, # "i2c1_fatal_fault", + True, # "i2c2_fatal_fault", + True, # "pattgen_fatal_fault", + True, # "rv_timer_fatal_fault", + True, # "otp_ctrl_fatal_macro_error", + True, # "otp_ctrl_fatal_check_error", + True, # "otp_ctrl_fatal_bus_integ_error", + True, # "otp_ctrl_fatal_prim_otp_alert", + True, # "otp_ctrl_recov_prim_otp_alert", + True, # "lc_ctrl_fatal_prog_error", + True, # "lc_ctrl_fatal_state_error", + True, # "lc_ctrl_fatal_bus_integ_error", + True, # "spi_host0_fatal_fault", + True, # "spi_host1_fatal_fault", + True, # "usbdev_fatal_fault", + True, # "pwrmgr_aon_fatal_fault", + True, # "rstmgr_aon_fatal_fault", + True, # "rstmgr_aon_fatal_cnsty_fault", + True, # "clkmgr_aon_recov_fault", + True, # "clkmgr_aon_fatal_fault", + True, # "sysrst_ctrl_aon_fatal_fault", + True, # "adc_ctrl_aon_fatal_fault", + True, # "pwm_aon_fatal_fault", + True, # "pinmux_aon_fatal_fault", + True, # "aon_timer_aon_fatal_fault", + True, # "sensor_ctrl_recov_alert", + True, # "sensor_ctrl_fatal_alert", + True, # "sram_ctrl_ret_aon_fatal_error", + True, # "flash_ctrl_recov_err", + True, # "flash_ctrl_fatal_std_err", + # On FPGA, this alert always raised, so leave it off + False, # "flash_ctrl_fatal_err", + True, # "flash_ctrl_fatal_prim_flash_alert", + True, # "flash_ctrl_recov_prim_flash_alert", + True, # "rv_dm_fatal_fault", + True, # "rv_plic_fatal_fault", + True, # "aes_recov_ctrl_update_err", + True, # "aes_fatal_fault", + True, # "hmac_fatal_fault", + True, # "kmac_recov_operation_err", + True, # "kmac_fatal_fault_err", + True, # "otbn_fatal", + True, # "otbn_recov", + True, # "keymgr_recov_operation_err", + True, # "keymgr_fatal_fault_err", + True, # "csrng_recov_alert", + True, # "csrng_fatal_alert", + True, # "entropy_src_recov_alert", + True, # "entropy_src_fatal_alert", + True, # "edn0_recov_alert", + True, # "edn0_fatal_alert", + True, # "edn1_recov_alert", + True, # "edn1_fatal_alert", + True, # "sram_ctrl_main_fatal_error", + True, # "rom_ctrl_fatal", + True, # "rv_core_ibex_fatal_sw_err", + True, # "rv_core_ibex_recov_sw_err", + True, # "rv_core_ibex_fatal_hw_err", + True, # "rv_core_ibex_recov_hw_err" + ], + "enable_classes": [False, False, False, False], + "accumulation_thresholds": [2, 2, 2, 2], + "signals": [4294967295, 4294967295, 4294967295, 4294967295], + "duration_cycles": [0, 7200, 48, 48], + "ping_timeout": 1200, +} diff --git a/sw/host/penetrationtests/python/util/dis_parser.py b/sw/host/penetrationtests/python/util/dis_parser.py new file mode 100644 index 0000000000000..bd0d3d6ac9391 --- /dev/null +++ b/sw/host/penetrationtests/python/util/dis_parser.py @@ -0,0 +1,149 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +import sys +import os +import re + + +class DisParser: + def __init__(self, dis_file_path=None): + self.dis_file_path = dis_file_path + if not os.path.exists(self.dis_file_path): + print(f"Error: File not found at path: {self.dis_file_path}") + sys.exit(1) + + def get_function_addresses(self, function_name): + try: + with open(self.dis_file_path, "r") as f: + dis_content = f.read() + except IOError as e: + print(f"Error reading file: {e}") + return [] + + addresses = [] + escaped_name = re.escape(function_name) + lines = dis_content.splitlines() + + jump_line_pattern = re.compile(r"^\s*([0-9a-fA-F]{8}):.*?<" + escaped_name + r">") + + next_addr_pattern = re.compile(r"^\s*([0-9a-fA-F]{8}):") + + for i, line in enumerate(lines): + match = jump_line_pattern.match(line) + if match: + jump_address = match.group(1) + + for j in range(i + 1, len(lines)): + next_line = lines[j] + + if not next_line.strip(): + continue + + next_match = next_addr_pattern.match(next_line) + + if next_match: + address_after_jump = next_match.group(1) + addresses.append((f"0x{jump_address}", f"0x{address_after_jump}")) + break + + return addresses + + def parse_next_instruction(self, pc_address): + if pc_address.startswith("0x"): + pc_address = pc_address[2:] + + instruction_addr_pattern = re.compile(r"^\s*([0-9a-fA-F]{8}):") + + found_current_address = False + + try: + with open(self.dis_file_path, "r") as f: + for line in f: + match = instruction_addr_pattern.match(line) + + if match: + line_address = match.group(1) + + if not found_current_address: + if line_address == pc_address: + found_current_address = True + else: + return f"0x{line_address}" + + if found_current_address: + return None + else: + print(f"Error: Address 0x{pc_address} not found in disassembly.") + return None + + except IOError as e: + print(f"Error reading file: {e}") + return None + + def get_function_start_address(self, function_name): + try: + with open(self.dis_file_path, "r") as f: + for line in f: + escaped_name = re.escape(function_name) + pattern = re.compile(r"^([0-9a-fA-F]{8})\s*<" + escaped_name + r">:") + + match = pattern.search(line) + if match: + return f"0x{match.group(1)}" + + except IOError as e: + print(f"Error reading file: {e}") + return None + + return None + + def get_function_end_address(self, function_name): + """ + Finds the address of the last 'ret' instruction in a function. + """ + try: + with open(self.dis_file_path, "r") as f: + escaped_name = re.escape(function_name) + start_pattern = re.compile(r"^[0-9a-fA-F]+\s+<" + escaped_name + r">:") + any_func_pattern = re.compile(r"^[0-9a-fA-F]+\s+<.*>:") + inst_pattern = re.compile(r"^\s*([0-9a-fA-F]+):\s+.*?\s+([a-zA-Z\.]+)") + + in_function = False + function_instructions = [] + + for line in f: + if not in_function: + if start_pattern.search(line): + in_function = True + else: + if any_func_pattern.search(line): + break + + inst_match = inst_pattern.search(line) + if inst_match: + address = inst_match.group(1) + mnemonic = inst_match.group(2) + function_instructions.append((address, mnemonic)) + + if not in_function: + print(f"Error: {function_name} not found") + return None + + for address, mnemonic in reversed(function_instructions): + if mnemonic == "ret": + return f"0x{address}" + + print(f"Warning: No 'ret' instruction found for {function_name}") + return None + + except IOError as e: + print(f"Error reading file: {e}") + return None + + def get_marker_addresses(self, marker_name): + return [ + self.get_function_start_address(marker_name + "_START"), + self.get_function_start_address(marker_name + "_END"), + ] diff --git a/sw/host/penetrationtests/python/util/gdb_controller.py b/sw/host/penetrationtests/python/util/gdb_controller.py new file mode 100644 index 0000000000000..6bfcf138b72b2 --- /dev/null +++ b/sw/host/penetrationtests/python/util/gdb_controller.py @@ -0,0 +1,207 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +from subprocess import PIPE, Popen, TimeoutExpired +import select +import os +import time +import re +import signal + + +class GDBController: + def __init__(self, gdb_path, gdb_port=3333, remote_host="localhost", elf_file=None): + self.remote_host = remote_host + self.gdb_port = gdb_port + self.gdb_path = gdb_path + gdb_command = [ + gdb_path, + # "--interpreter=mi", + "-ex", + f"target remote {remote_host}:{gdb_port}", + ] + if elf_file: + gdb_command.append(elf_file) + + try: + self.gdb_process = Popen(gdb_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=0) + + # Flush the output from GDB + self.dump_output() + # Start clean + self.send_command("delete breakpoints", timeout=10) + # Need to flush again from the breakpoints + self.dump_output() + + # Set number of breakpoints + self.n_brkp = 1 + except Exception: + self.close_gdb() + raise + + def read_output(self, print_errors=False, timeout=0.05): + if not self.gdb_process: + return "" + + output = "" + + readable_pipes = [] + if self.gdb_process.stdout: + readable_pipes.append(self.gdb_process.stdout.fileno()) + if self.gdb_process.stderr: + readable_pipes.append(self.gdb_process.stderr.fileno()) + + try: + readable, _, _ = select.select(readable_pipes, [], [], timeout) + + for fd in readable: + if fd == self.gdb_process.stdout.fileno(): + data = os.read(fd, 4096).decode("utf-8", errors="ignore") + output += data + elif fd == self.gdb_process.stderr.fileno(): + # GDB uses stderr for logging/errors + err_data = os.read(fd, 4096).decode("utf-8", errors="ignore") + if print_errors: + print(f"GDB Stderr: {err_data}") + except Exception as e: + print(f"Error reading GDB output: {e}") + + return output + + def dump_output(self, timeout=0.05): + self.read_output(timeout=timeout) + + def send_command(self, mi_command, timeout=0.05, check_response=True): + if not self.gdb_process or not self.gdb_process.stdin: + raise RuntimeError("GDB process not started or stdin not available.") + + command_line = mi_command.strip() + "\n" + + self.gdb_process.stdin.write(command_line.encode("utf-8")) + # Aria: After sending the command let's wait for a while till the command is + # processed on the receiving end + time.sleep(0.1) + + self.gdb_process.stdin.flush() + + if check_response: + start_time = time.time() + response = "" + while True: + response += self.read_output() + + if response.strip().endswith("(gdb)") or ( + "^done" in response and response.strip().endswith("=") + ): + break + + if time.time() - start_time > timeout: + raise TimeoutError( + f"GDB timed out after {timeout}s. Current output: {response}, {mi_command}" + ) + + # To debug you can print this output to see GDB's response + return response + else: + return None + + def reset_target(self, reset_delay=0.005): + self.send_command("monitor reset run", check_response=False) + time.sleep(reset_delay) + self.dump_output() + + def close_gdb(self): + if not self.gdb_process or self.gdb_process.poll() is not None: + return + + self.dump_output() + self.gdb_process.send_signal(signal.SIGINT) + try: + self.gdb_process.communicate(timeout=1) + except TimeoutExpired: + self.gdb_process.kill() + self.gdb_process.communicate() + finally: + self.gdb_process = None + + def get_program_counter(self): + gdb_command = "p $pc" + + try: + response = self.send_command(gdb_command, timeout=0.5) + pc_pattern = re.compile(r"0x([0-9a-fA-F]+)") + match = pc_pattern.search(response) + if match: + return "0x" + match.group(1).strip() + if "No symbol " in response or "Undefined command" in response: + raise RuntimeError(f"GDB returned an error: {response}") + + except Exception: + return None + + def setup_pc_trace(self, file_name, trace_start_addr, trace_end_addr): + self.n_brkp = 1 + self.send_command(f"set logging file {file_name}") + self.send_command("set logging overwrite on") + self.send_command("set pagination off") + self.send_command("set logging enabled on") + traceloop_definition = f"""\ + define traceloop + while 1 + if $pc=={trace_end_addr} + printf "PC trace complete.\\n" + return + end + printf "PC: 0x%x\\n", $pc + stepi + end + end + """ + self.send_command(traceloop_definition) + self.send_command(f"tb *({trace_start_addr})") + commands_definition = "commands 1\ntraceloop\nend" + self.send_command(commands_definition) + self.n_brkp += 1 + + def parse_pc_trace_file(self, file_path): + pc_list = [] + pc_pattern = re.compile(r"PC: (0x[0-9a-fA-F]+)") + + try: + with open(file_path, "r") as f: + for line in f: + match = pc_pattern.search(line) + if match: + pc_list.append(match.group(1)) + except FileNotFoundError: + print(f"Error: Trace file not found at {file_path}") + except Exception as e: + print(f"Error reading or parsing trace file: {e}") + + return pc_list + + def apply_instruction_skip(self, pc_address, next_pc_address, count): + skip_commands = f"commands {self.n_brkp}\n" + skip_commands += f"set $pc={next_pc_address}\n" + skip_commands += 'printf "instruction skip applied\\n"\n' + skip_commands += "c\n" + skip_commands += "end" + + self.send_command(f"tb *({pc_address})") + if count > 1: + ignore_amount = count - 1 + self.send_command(f"ignore {self.n_brkp} {ignore_amount}") + self.send_command(skip_commands) + self.n_brkp += 1 + + def add_observation(self, observations): + for addr, log_message in observations.items(): + obs_command = f"commands {self.n_brkp}\n" + obs_command += f'printf "fisim_result: {log_message} \\n"\n' + obs_command += "c\n" + obs_command += "end" + + self.send_command(f"tb *({addr})") + self.send_command(obs_command) + self.n_brkp += 1 diff --git a/sw/host/penetrationtests/python/util/hyperdebug.py b/sw/host/penetrationtests/python/util/hyperdebug.py index c286915a9616e..6acb322bac4d3 100644 --- a/sw/host/penetrationtests/python/util/hyperdebug.py +++ b/sw/host/penetrationtests/python/util/hyperdebug.py @@ -3,84 +3,85 @@ # SPDX-License-Identifier: Apache-2.0 import time -from subprocess import PIPE, Popen -from typing import Optional - +from subprocess import Popen, run, PIPE, CalledProcessError, TimeoutExpired +import select +import signal import serial +import sys +import os +import socket from serial.tools.list_ports import comports -class HyperDebug(): +class HyperDebug: """Class for the FPGA or Chip connected via a hyperdebug. Initializes OpenTitan with the provided firmware & provides helper functions. """ - def __init__(self, opentitantool, fw_bin, bitstream, tool_args): + + def __init__( + self, + opentitantool, + fw_bin, + bitstream, + tool_args, + openocd, + openocd_chip_config, + openocd_design_config, + ): self.opentitantool = opentitantool self.fw_bin = fw_bin self.bitstream = bitstream self.tool_args = tool_args + self.openocd = openocd + self.openocd_chip_config = openocd_chip_config + self.openocd_design_config = openocd_design_config + self.openocd_process = None - def initialize_target(self): + def initialize_target(self, print_output=True): # Programming the bitstream via the opentitantool seems to block # communication, programming it twice seems to solve the problem - self.program_bitstream(self.bitstream) - self.program_bitstream(self.bitstream) - - self.flash_target(self.fw_bin) - - def program_bitstream(self, bitstream, program_delay=2): - if bitstream: - bitstream_process = Popen( - [ - self.opentitantool, - ] + - self.tool_args + - [ - "--exec", - "transport init", - "--exec", - "fpga load-bitstream " + bitstream, - "no-op", - ], - stderr=PIPE, - ) - stderr = bitstream_process.communicate() - rc = bitstream_process.returncode - if rc != 0: - raise RuntimeError("Error: Failed to program the bitstream.") - return 0 - else: - stderr_str = stderr[1].decode('utf-8') - if "Skip loading bitstream" not in stderr_str: - time.sleep(program_delay) + self.program_bitstream(self.bitstream, print_output=print_output) + self.program_bitstream(self.bitstream, print_output=print_output) + + self.flash_target(self.fw_bin, print_output=print_output) + + if self.openocd: + self.start_openocd(print_output=print_output) + + def program_bitstream(self, bitstream, program_delay=2, print_output=True): + if not bitstream: + return + + command = ( + [self.opentitantool] + + self.tool_args + + ["--exec", "transport init", "--exec", f"fpga load-bitstream {bitstream}", "no-op"] + ) + try: + result = run(command, check=True, capture_output=True, text=True) + if "Skip loading bitstream" not in result.stderr: + time.sleep(program_delay) + if print_output: print(f"Info: FPGA programmed with {bitstream}.") - return 1 - - def flash_target(self, firmware, boot_delay=2): - flash_process = Popen( - [ - self.opentitantool, - ] + - self.tool_args + - [ - "--exec", - "transport init", - "--exec", - "bootstrap " + firmware, - "no-op", - ], - stdout=PIPE, stderr=PIPE + except CalledProcessError as e: + print(f"Error: Failed to program the bitstream.\nStderr: {e.stderr}", file=sys.stderr) + raise + + def flash_target(self, firmware, boot_delay=2, print_output=True): + command = ( + [self.opentitantool] + + self.tool_args + + ["--exec", "transport init", "--exec", f"bootstrap {firmware}", "no-op"] ) - flash_process.communicate() - rc = flash_process.returncode - if rc != 0: - raise RuntimeError("Error: Failed to flash chip.") - return 0 - else: - # Wait until chip finished booting. + try: + run(command, check=True, capture_output=True) + # Wait until chip finishes booting. time.sleep(boot_delay) - print(f"Info: Chip flashed with {firmware}.") - return 1 + if print_output: + print(f"Info: Chip flashed with {firmware}.") + except CalledProcessError as e: + print(f"Error: Failed to flash chip.\nStderr: {e.stderr}", file=sys.stderr) + raise def init_communication(self, port, baudrate): """Open the communication interface. @@ -96,6 +97,117 @@ def init_communication(self, port, baudrate): com_interface.timeout = 1 return com_interface + def start_openocd(self, startup_delay=4, print_output=True): + self.close_openocd() + # We set up OpenOCD with the following default ports + # 6666 for tcl connections + # 4444 for telnet connections + # 3333 for gdb connections + # You can adapt those ports, e.g., via adding the config: -c "telnet_port 4444" + OPENOCD_COMMANDS = "adapter speed 500; transport select jtag; reset_config trst_only" + + command = [ + self.openocd, + "-f", + self.openocd_chip_config, + "-c", + OPENOCD_COMMANDS, + "-f", + self.openocd_design_config, + ] + + if print_output: + print("Starting OpenOCD", flush=True) + + self.openocd_process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + # OpenOCD provides its status data via the error output + openocd_stderr = self.openocd_process.stderr.fileno() + os.set_blocking(openocd_stderr, False) + # Wait until the openocd is set up (this process will remain in the background) + time.sleep(startup_delay) + readable, _, _ = select.select([openocd_stderr], [], [], 0.1) + if readable: + try: + data_after_wait = os.read(openocd_stderr, 1024 * 1024).decode( + "utf-8", errors="ignore" + ) + if print_output: + print(data_after_wait, flush=True) + if "Error:" in data_after_wait: + print("Error detected in starting openocd") + + except BlockingIOError: + print("Error reading the openocd output") + pass + + def close_openocd(self): + if not self.openocd_process or self.openocd_process.poll() is not None: + return + + self.openocd_process.send_signal(signal.SIGINT) + try: + self.openocd_process.communicate(timeout=10) + except TimeoutExpired: + self.openocd_process.kill() + self.openocd_process.communicate() + finally: + self.openocd_process = None + + def read_openocd(self): + if self.openocd_process: + openocd_stderr = self.openocd_process.stderr.fileno() + readable, _, _ = select.select([openocd_stderr], [], [], 0.1) + if readable: + try: + data = os.read(openocd_stderr, 1024 * 1024).decode("utf-8", errors="ignore") + return data + + except BlockingIOError: + print("Error reading the OpenOCD output") + pass + return None + + def send_openocd_command(self, command, timeout=1.0, port=6666): + HOST = "127.0.0.1" + + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(timeout) + s.connect((HOST, port)) + except socket.error as e: + print(f"Error connecting to OpenOCD on port {port}: {e}", flush=True) + return f"ERROR: Connection failed: {e}" + + full_command = f"{command}\x1a" + try: + s.sendall(full_command.encode("utf-8")) + + response = b"" + while True: + chunk = s.recv(4096) + if not chunk: + break + response += chunk + if b"\x00" in chunk: + break + + except socket.timeout: + print(f"Warning: Command '{command}' timed out after {timeout}s.", flush=True) + except socket.error as e: + print(f"Error sending/receiving data: {e}", flush=True) + s.close() + return f"ERROR: Send/Receive failed: {e}" + + s.close() + + decoded_response = response.decode("utf-8", errors="ignore").strip() + + if decoded_response.startswith(command): + decoded_response = decoded_response[len(command):].strip() + + return decoded_response.strip() + def find_target_port(self): for port in comports(): if "UART2" in port.description and "HyperDebug" in port.description: @@ -103,14 +215,12 @@ def find_target_port(self): print("Target not found!") return None - def reset_target(self, com_reset: Optional[bool] = False, reset_delay=0.005): + def reset_target(self, com_reset=False, reset_delay=0.005): """Resets the target.""" - reset_process = Popen( - [ - self.opentitantool, - ] + - self.tool_args + - [ + reset_process = ( + [self.opentitantool] + + self.tool_args + + [ "--exec", "transport init", "--exec", @@ -118,14 +228,13 @@ def reset_target(self, com_reset: Optional[bool] = False, reset_delay=0.005): "--exec", "gpio write RESET true", "no-op", - ], - stdout=PIPE, stderr=PIPE + ] ) - reset_process.communicate() - rc = reset_process.returncode - if rc != 0: - raise RuntimeError("Error: Failed to reset chip.") - else: + try: + run(reset_process, check=True, capture_output=True, text=True) time.sleep(reset_delay) + except CalledProcessError: + print("Error: Failed to reset chip.") + raise if com_reset: self.com_interface = self.init_communication(None, 115200) diff --git a/sw/host/penetrationtests/python/util/targets.py b/sw/host/penetrationtests/python/util/targets.py index 72cb70aad6d2d..ae0b8292ea3bf 100644 --- a/sw/host/penetrationtests/python/util/targets.py +++ b/sw/host/penetrationtests/python/util/targets.py @@ -11,9 +11,10 @@ @dataclass class TargetConfig: - """ Target configuration. + """Target configuration. Stores information about the target. """ + target_type: str fw_bin: str interface_type: Optional[str] = None @@ -26,6 +27,9 @@ class TargetConfig: opentitantool: Optional[str] = None usb_serial: Optional[str] = None husky_serial: Optional[str] = None + openocd: Optional[str] = None + openocd_chip_config: Optional[str] = (None,) + openocd_design_config: Optional[str] = None class Target: @@ -43,7 +47,6 @@ class Target: pacing = 10 / baudrate def __init__(self, target_cfg: TargetConfig): - self.target_cfg = target_cfg self.target = None @@ -53,18 +56,21 @@ def __init__(self, target_cfg: TargetConfig): target_cfg.opentitantool, target_cfg.fw_bin, target_cfg.bitstream, - target_cfg.tool_args + target_cfg.tool_args, + target_cfg.openocd, + target_cfg.openocd_chip_config, + target_cfg.openocd_design_config, ) self.com_interface = self.target.init_communication(target_cfg.port, self.baudrate) - def initialize_target(self): - self.target.initialize_target() + def initialize_target(self, print_output=True): + self.target.initialize_target(print_output=print_output) # Clear the UART self.dump_all() - def reset_target(self): - self.target.reset_target() + def reset_target(self, reset_delay=0.005): + self.target.reset_target(reset_delay=reset_delay) def write(self, data): """Write data to the target.""" @@ -84,11 +90,14 @@ def readline(self): def print_all(self, max_tries=50): it = 0 while it != max_tries: - read_line = str(self.readline().decode().strip()) - if len(read_line) > 0: - print(read_line, flush=True) - else: - break + try: + read_line = str(self.readline().decode().strip()) + if len(read_line) > 0: + print(read_line, flush=True) + else: + break + except UnicodeDecodeError: + pass it += 1 def dump_all(self, max_tries=50): @@ -166,3 +175,19 @@ def read_response(self, max_tries: Optional[int] = 50): break it += 1 return "" + + def start_openocd(self): + if self.target_cfg.openocd: + self.target.start_openocd() + + def read_openocd(self): + if self.target_cfg.openocd: + return self.target.read_openocd() + + def close_openocd(self): + if self.target_cfg.openocd: + self.target.close_openocd() + + def send_openocd_command(self, command): + if self.target_cfg.openocd: + return self.target.send_openocd_command(command)