Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

[![Linting](https://github.com/TourmalineCore/c2pie/actions/workflows/lint-on-pull-request.yml/badge.svg?branch=develop)](https://github.com/TourmalineCore/c2pie/actions/workflows/lint-on-pull-request.yml)
[![c2pa](https://img.shields.io/badge/c2pa-v1.4-seagreen.svg)](https://c2pa.org/)
[![coverage](https://img.shields.io/badge/e2e_coverage-71.13%25-yellow)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml)
[![coverage](https://img.shields.io/badge/units_coverage-79.65%25-yellow)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml)
[![coverage](https://img.shields.io/badge/full_coverage-91.80%25-forestgreen)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml)
[![coverage](https://img.shields.io/badge/e2e_coverage-71.18%25-yellow)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml)
[![coverage](https://img.shields.io/badge/units_coverage-85.02%25-olivedrab)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml)
[![coverage](https://img.shields.io/badge/full_coverage-92.15%25-forestgreen)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml)
[![latest](https://img.shields.io/pypi/v/c2pie?label=latest&colorB=fc8021)](https://pypi.org/project/c2pie/)

<br>
Expand Down
78 changes: 51 additions & 27 deletions c2pie/c2pa/assertion.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ def __init__(
if not content_boxes:
payload = self.get_payload_from_schema()
box_type_hex = get_assertion_content_box_type(self.type)
content_boxes = [ContentBox(box_type=box_type_hex, payload=payload)]
content_boxes = [
ContentBox(
box_type=box_type_hex,
payload=payload,
)
]

super().__init__(
content_type=get_assertion_content_type(self.type),
Expand All @@ -64,56 +69,75 @@ class HashDataAssertion(Assertion):

def __init__(
self,
cai_offset: int,
hashed_data: bytes,
additional_exclusions: list[dict[str, int]] | None = None,
):
exclusions: list[dict[str, int]] = [
{
"start": cai_offset,
"length": 65535,
},
]
exclusions: list[dict[str, int]] = []

if additional_exclusions:
exclusions.extend(additional_exclusions)

schema: dict[str, Any] = {
"name": "jumbf manifest",
"exclusions": exclusions,
"alg": "sha256",
"hash": hashed_data,
"pad": [],
"pad": b""
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
}
super().__init__(C2PA_AssertionTypes.data_hash, schema)

def set_hash_data_length(
super().__init__(
C2PA_AssertionTypes.data_hash,
schema,
)

def add_full_c2pa_structure_exclusion(
self,
offset: int,
length: int,
) -> None:
if self.schema.get("name") != "jumbf manifest":
raise ValueError("c2pa.hash.data: jumbf manifest is missing")
exclusions = self.schema["exclusions"]
previous_exclusion_lenght = len(cbor_to_bytes(exclusions))

self.schema["exclusions"].extend(
[
{
"start": offset,
"length": length,
},
]
)

current_exclusion_lenght = len(cbor_to_bytes(exclusions))

difference = current_exclusion_lenght - previous_exclusion_lenght

exclusions = self.schema.get("exclusions", [])
"""
Important! If the Data Hash Assertion is less than 24 bytes or greater than
255 bytes, the size of the cbor header will change during conversion to cbor
and will occupy less than 2 bytes or more than 2 bytes, correspondingly.
"""

if not exclusions:
raise ValueError("c2pa.hash.data: exclusions are missing")
schema_length = len(cbor_to_bytes(self.schema))
additional_byte = 0 # 2 byte CBOR header case

exclusions[0]["length"] = int(length)
if schema_length < 24: # 1 byte CBOR header case
additional_byte = -1
elif schema_length > 255: # 3 byte CBOR header case
additional_byte = 1

self.schema["pad"] = self.schema["pad"][difference + additional_byte :]

payload = self.get_payload_from_schema()
if self.content_boxes:
self.content_boxes[0] = ContentBox(

self.content_boxes = [
ContentBox(
box_type=get_assertion_content_box_type(self.type),
payload=payload,
)
else:
self.content_boxes = [
ContentBox(
box_type=get_assertion_content_box_type(self.type),
payload=payload,
)
]
]

self.sync_payload()

Expand Down
9 changes: 7 additions & 2 deletions c2pie/c2pa/assertion_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ def __init__(
def get_assertions(self) -> list:
return self.assertions

def set_hash_data_length(
def add_full_c2pa_structure_exclusion(
self,
offset: int,
length: int,
) -> None:
for assertion in self.assertions:
if assertion.type == C2PA_AssertionTypes.data_hash:
assertion.set_hash_data_length(length)
assertion.add_full_c2pa_structure_exclusion(
offset,
length,
)

self.sync_payload()
41 changes: 40 additions & 1 deletion c2pie/c2pa/claim_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def __init__(
self.require_tsa = require_tsa
self.tsa_log_dir = tsa_log_dir

self.serialized_length = 0

content_boxes = self._generate_payload()

super().__init__(
Expand Down Expand Up @@ -129,10 +131,45 @@ def _generate_unprotected_header(self, serialized_sig_structure: bytes) -> bytes
},
],
},
"pad": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
}

return unprotected_header

def serialize_cose_sign1_tagged_with_alignment(
self,
cose_sign1: list,
) -> bytes:
"""
Takes a COSE_Sign1 as an array containing protected_header, unprotected_header,
payload, and signature, and returns a serialized COSE_Sign1_Tagged structure.
"""
cose_sign1_tagged_cbor = cbor2.dumps(
cbor2.CBORTag(18, cose_sign1),
canonical=True,
)

"""
The length of a TSA token can be variable. To ensure that a new token does not exceed
the exclusion boundary for the C2PA structure, we need to align the length of
the Claim Signature using the pad field, similar to the Data Hash Assertion.
"""
if self.serialized_length == 0:
self.serialized_length = len(cose_sign1_tagged_cbor)
elif self.serialized_length != len(cose_sign1_tagged_cbor):
difference = self.serialized_length - len(cose_sign1_tagged_cbor)

if difference > len(cose_sign1[1]["pad"]):
raise ValueError("Difference in length exceeds the predefined pad")

cose_sign1[1]["pad"] = b"\x00" * (len(cose_sign1[1]["pad"]) + difference)
cose_sign1_tagged_cbor = cbor2.dumps(
cbor2.CBORTag(18, cose_sign1),
canonical=True,
)

return cose_sign1_tagged_cbor

def _create_cose_sign1_tagged(self) -> bytes:
"""
COSE_Sign1 = [
Expand Down Expand Up @@ -179,4 +216,6 @@ def _create_cose_sign1_tagged(self) -> bytes:

cose_sign1 = [serialized_protected_header, unprotected_header, None, signature]

return cbor2.dumps(cbor2.CBORTag(18, cose_sign1), canonical=True)
cose_sign1_tagged_cbor = self.serialize_cose_sign1_tagged_with_alignment(cose_sign1)

return cose_sign1_tagged_cbor
8 changes: 6 additions & 2 deletions c2pie/c2pa/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ def get_assertions(self):
return self.assertion_store.get_assertions()
return

def set_hash_data_length(
def add_full_c2pa_structure_exclusion(
self,
offset: int,
length: int,
):
"""
Updates the length of exceptions in HashData, reassembles Claim (assertion hashes)
and ClaimSignature (COSE Sign1 detached over Claim CBOR).
"""
if self.assertion_store and self.claim and self.claim_signature:
self.assertion_store.set_hash_data_length(length)
self.assertion_store.add_full_c2pa_structure_exclusion(
offset,
length,
)
self.claim.set_assertion_store(self.assertion_store)
self.claim_signature.set_claim(self.claim)

Expand Down
8 changes: 6 additions & 2 deletions c2pie/c2pa/manifest_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ def __init__(
def sync_payload(self):
super().sync_payload()

def set_hash_data_length_for_all(
def add_full_c2pa_structure_exclusion(
self,
offset: int,
length: int,
) -> None:
self.manifests[-1].set_hash_data_length(length)
self.manifests[-1].add_full_c2pa_structure_exclusion(
offset,
length,
)

super().sync_payload()

Expand Down
Loading