Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Lint

on:
pull_request:

jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/ruff-action@v3
with:
args: check tee_gateway
- uses: astral-sh/ruff-action@v3
with:
args: format --check tee_gateway

mypy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
- run: pip install -r requirements.txt mypy
- run: mypy tee_gateway
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ test-local:
# export OPENAI_API_KEY=... ANTHROPIC_API_KEY=... etc.
python3 -m tee_gateway

.PHONY: mypy
mypy:
python3 -m mypy tee_gateway

.PHONY: help
help:
@echo "Available targets:"
Expand All @@ -99,6 +103,7 @@ help:
@echo " make get-tls-cert - Print the nitriding TLS certificate"
@echo ""
@echo " make test-local - Run server locally without TEE (development)"
@echo " make mypy - Run mypy type checker on tee_gateway"
@echo ""
@echo " LLM endpoints (/v1/chat/completions, /v1/completions) require x402"
@echo " payment headers. Use an x402-compatible client to call them."
130 changes: 73 additions & 57 deletions examples/verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,56 @@
nonce = "0123456789abcdef0123456789abcdef01234567"

logging.basicConfig(
filename='verification_logs.log',
filename="verification_logs.log",
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

PCR_tuple = namedtuple("PCRs", ["PCR0", "PCR1", "PCR2"])

# This library is based on richardfan1126: nitro-enclave-python-demo

# This library is based on richardfan1126: nitro-enclave-python-demo
# (https://github.com/richardfan1126/nitro-enclave-python-demo/tree/master)
def get_pcrs() -> PCR_tuple:
"""
Gets expected PCR values from enclave measurements JSON, returns a tuple of the PCR values.
"""
with open(measurement_path, 'r') as file:
with open(measurement_path, "r") as file:
json_measurement_data = file.read()

try:
measurement_data = json.loads(json_measurement_data)
PCRs = PCR_tuple(measurement_data["Measurements"]["PCR0"],
measurement_data["Measurements"]["PCR1"],
measurement_data["Measurements"]["PCR2"])
logging.debug("Given PCR measurements:\n"
"PCR0 %s\n"
"PCR1 %s\n"
"PCR2 %s\n",
PCRs.PCR0,
PCRs.PCR1,
PCRs.PCR2)
PCRs = PCR_tuple(
measurement_data["Measurements"]["PCR0"],
measurement_data["Measurements"]["PCR1"],
measurement_data["Measurements"]["PCR2"],
)
logging.debug(
"Given PCR measurements:\nPCR0 %s\nPCR1 %s\nPCR2 %s\n",
PCRs.PCR0,
PCRs.PCR1,
PCRs.PCR2,
)
except json.JSONDecodeError as e:
raise ValueError("Error reading measurement file for PCRs: %s" % e)

return PCRs


def get_root_cert_pem() -> str:
with open(root_cert_path, 'r') as file:
with open(root_cert_path, "r") as file:
return file.read()


def get_attestation(url: str, nonce: str) -> str:
# Construct curl command
curl_command = [
"curl",
"-k",
"-G",
url,
"--data-urlencode",
f"nonce={nonce}"
]
curl_command = ["curl", "-k", "-G", url, "--data-urlencode", f"nonce={nonce}"]

# Run the curl command and capture the output
result = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
result = subprocess.run(
curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)

# Check if the command was successful
if result.returncode != 0:
Expand All @@ -75,11 +74,12 @@
return None

# Return the output of the curl command
if result.stdout == None:

Check failure on line 77 in examples/verify_attestation.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (E711)

examples/verify_attestation.py:77:25: E711 Comparison to `None` should be `cond is None` help: Replace with `cond is None`
print("Curl command result was None")

return result.stdout


def verify_attestation_doc(attestation_string: str) -> None:
"""
Verify the attestation document
Expand All @@ -104,17 +104,19 @@
logging.debug("Loaded an attestation document")

# Expose timestamp
timestamp_ms = doc_obj['timestamp']
timestamp_ms = doc_obj["timestamp"]
timestamp_s = timestamp_ms / 1000
logging.info("Attestation document timestamp (ms): %d", timestamp_ms)
logging.info("Attestation document timestamp (UTC): %s",
__import__('datetime').datetime.utcfromtimestamp(timestamp_s).isoformat())
logging.info(
"Attestation document timestamp (UTC): %s",
__import__("datetime").datetime.utcfromtimestamp(timestamp_s).isoformat(),
)

# Get PCRs from attestation document
document_pcrs_arr = doc_obj['pcrs']
document_pcrs_arr = doc_obj["pcrs"]

# Expose user data
user_data = doc_obj['user_data']
user_data = doc_obj["user_data"]
logging.debug("Enclave generated user data: %s", user_data)
prefix_length = 2 # Taken from Nitriding documentation
hash_length = hashlib.sha256().digest_size # 32 bytes for a SHA256 hash
Expand All @@ -129,14 +131,18 @@
app_key_end = app_key_start + hash_length
app_key_hash = user_data[app_key_start:app_key_end]

logging.info("Enclave returned TLS key: %s", base64.b64encode(tls_key_hash).decode('utf-8'))
logging.info("Enclave returned app key: %s", base64.b64encode(app_key_hash).decode('utf-8'))
logging.info(
"Enclave returned TLS key: %s", base64.b64encode(tls_key_hash).decode("utf-8")
)
logging.info(
"Enclave returned app key: %s", base64.b64encode(app_key_hash).decode("utf-8")
)

# Expose public key
# TODO (kyle): Write API to expose public key to inference node
# This will be needed for the sequencer to encrypt
# input data.
public_key = doc_obj['public_key']
public_key = doc_obj["public_key"]
logging.debug("Enclave generated public key: %s", public_key)

## Validating Attestation document ##
Expand All @@ -149,55 +155,62 @@

# Get PCR hexcode
doc_pcr = document_pcrs_arr[index].hex()
logging.debug("PCR%s:\n"
"Attestation value: %s\n"
"Expected PCR: %s",
index,
doc_pcr,
expected_pcr)
logging.debug(
"PCR%s:\nAttestation value: %s\nExpected PCR: %s",
index,
doc_pcr,
expected_pcr,
)

# Check if PCR match
if expected_pcr != doc_pcr:
logging.warn("PCRs do not match:\n"
"Attestation PCR%s: %s\n"
"Expected PCR%s: %s",
index, doc_pcr,
index, expected_pcr)
logging.warn(
"PCRs do not match:\nAttestation PCR%s: %s\nExpected PCR%s: %s",
index,
doc_pcr,
index,
expected_pcr,
)
raise Exception("PCR%s does not match" % index)

logging.debug("Validating nonce")
# Check that nonce matches
attestation_nonce = doc_obj['nonce'].hex()
attestation_nonce = doc_obj["nonce"].hex()
logging.info("Received nonce is %s", attestation_nonce)
logging.info("Given nonce is %s", nonce)
if attestation_nonce != nonce:
raise Exception(f"Attestation nonce: {attestation_nonce}, did not match given nonce: {nonce}")
raise Exception(
f"Attestation nonce: {attestation_nonce}, did not match given nonce: {nonce}"
)

# 2. Validate Signature
# 2. Validate Signature
logging.debug("Validating signature of attestation document")
# Get signing certificate from attestation document
logging.debug("Getting signing certificate from attestation document:\n %s", doc_obj['certificate'])
cert = crypto.load_certificate(crypto.FILETYPE_ASN1, doc_obj['certificate'])
logging.debug(
"Getting signing certificate from attestation document:\n %s",
doc_obj["certificate"],
)
cert = crypto.load_certificate(crypto.FILETYPE_ASN1, doc_obj["certificate"])

# Get the key parameters from the cert public key
logging.debug("Creating EC2 key from the signing certificates public key")
cert_public_numbers = cert.get_pubkey().to_cryptography_key().public_numbers()
x = cert_public_numbers.x
y = cert_public_numbers.y
curve = cert_public_numbers.curve

Check failure on line 200 in examples/verify_attestation.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (F841)

examples/verify_attestation.py:200:5: F841 Local variable `curve` is assigned to but never used help: Remove assignment to unused variable `curve`

x = long_to_bytes(x)
y = long_to_bytes(y)

# Create the EC2 key from public key parameters
key = EC2(alg = CoseAlgorithms.ES384, x = x, y = y, crv = CoseEllipticCurves.P_384)
key = EC2(alg=CoseAlgorithms.ES384, x=x, y=y, crv=CoseEllipticCurves.P_384)

# Get the protected header from attestation document
phdr = cbor2.loads(data[0])

# Construct the Sign1 message
logging.debug("Constructing Sign1 message from the attestation document")
msg = cose.Sign1Message(phdr = phdr, uhdr = data[1], payload = doc)
msg = cose.Sign1Message(phdr=phdr, uhdr=data[1], payload=doc)
msg.signature = data[3]

# Verify the signature using the EC2 key
Expand All @@ -207,8 +220,10 @@
logging.debug("Signature of attestation document verified")

# 3. Validate signing certificate PKI
logging.debug("Verifying the certificate of the attestation document "
"is signed by the root certificate of the AWS Nitro Attestation PKI")
logging.debug(
"Verifying the certificate of the attestation document "
"is signed by the root certificate of the AWS Nitro Attestation PKI"
)
if root_cert_pem is not None:
# Create an X509Store object for the CA bundles
store = crypto.X509Store()
Expand All @@ -219,13 +234,13 @@

# Get the CA bundle from attestation document and store into X509Store
# Except the first certificate, which is the root certificate
for _cert_binary in doc_obj['cabundle'][1:]:
for _cert_binary in doc_obj["cabundle"][1:]:
_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, _cert_binary)
store.add_cert(_cert)

# Get the X509Store context
store_ctx = crypto.X509StoreContext(store, cert)

# Validate the certificate
# If the cert is invalid, it will raise exception
store_ctx.verify_certificate()
Expand All @@ -234,10 +249,11 @@
print("Verification successful")
return


if __name__ == "__main__":
print("Starting verification")

attestation_str = get_attestation(enclave_url, nonce)
print("Attestation string:\n", attestation_str)

verify_attestation_doc(attestation_str)
verify_attestation_doc(attestation_str)
Loading
Loading