Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: store provenance asset info #975

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,437 changes: 722 additions & 715 deletions docs/source/assets/er-diagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions src/macaron/database/table_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,12 @@ class Provenance(ORMBase):
#: The provenance payload.
provenance_payload: Mapped[InTotoPayload] = mapped_column(ProvenancePayload, nullable=False)

#: The name of the provenance asset.
provenance_asset_name: Mapped[str] = mapped_column(String, nullable=True)

#: The URL of the provenance asset.
provenance_asset_url: Mapped[str] = mapped_column(String, nullable=True)

#: The verified status of the provenance.
verified: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)

Expand Down
48 changes: 32 additions & 16 deletions src/macaron/provenance/provenance_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import tempfile
from dataclasses import dataclass
from functools import partial

from packageurl import PackageURL
Expand All @@ -28,6 +29,15 @@
logger: logging.Logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class ProvenanceAsset:
    """Hold a provenance payload together with the original asset's name and URL.

    Instances are immutable because the dataclass is declared with ``frozen=True``.
    """

    #: The provenance payload.
    payload: InTotoPayload
    #: The name of the original asset.
    name: str
    #: The URL of the original asset.
    url: str


class ProvenanceFinder:
"""This class is used to find and retrieve provenance files from supported registries."""

Expand All @@ -42,7 +52,7 @@ def __init__(self) -> None:
elif isinstance(registry, JFrogMavenRegistry):
self.jfrog_registry = registry

def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
def find_provenance(self, purl: PackageURL) -> list[ProvenanceAsset]:
"""Find the provenance file(s) of the passed PURL.

Parameters
Expand All @@ -52,8 +62,8 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:

Returns
-------
list[InTotoPayload]
The provenance payload, or an empty list if not found.
list[ProvenanceAsset]
The provenance asset(s), or an empty list if not found.
"""
logger.debug("Seeking provenance of: %s", purl)

Expand Down Expand Up @@ -82,7 +92,7 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
logger.debug("Provenance finding not supported for PURL type: %s", purl.type)
return []

def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]]]) -> list[InTotoPayload]:
def _find_provenance(self, discovery_functions: list[partial[list[ProvenanceAsset]]]) -> list[ProvenanceAsset]:
"""Find the provenance file(s) using the passed discovery functions.

Parameters
Expand All @@ -93,7 +103,7 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
Returns
-------
list[ProvenanceAsset]
The provenance payload(s) from the first successful function, or an empty list if none were.
The provenance asset(s) from the first successful function, or an empty list if none were.
"""
if not discovery_functions:
return []
Expand All @@ -108,7 +118,7 @@ def _find_provenance(self, discovery_functions: list[partial[list[InTotoPayload]
return []


def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoPayload]:
def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[ProvenanceAsset]:
"""Find and download the NPM based provenance for the passed PURL.

Two kinds of attestation can be retrieved from npm: "Provenance" and "Publish". The "Provenance" attestation
Expand All @@ -125,8 +135,8 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP

Returns
-------
list[InTotoPayload]
The provenance payload(s), or an empty list if not found.
list[ProvenanceAsset]
The provenance asset(s), or an empty list if not found.
"""
if not registry.enabled:
logger.debug("The npm registry is not enabled.")
Expand Down Expand Up @@ -172,16 +182,19 @@ def find_npm_provenance(purl: PackageURL, registry: NPMRegistry) -> list[InTotoP
publish_payload = load_provenance_payload(signed_download_path)
except LoadIntotoAttestationError as error:
logger.error("Error while loading publish attestation: %s", error)
return [provenance_payload]
return [ProvenanceAsset(provenance_payload, npm_provenance_asset.name, npm_provenance_asset.url)]

return [provenance_payload, publish_payload]
return [
ProvenanceAsset(provenance_payload, npm_provenance_asset.name, npm_provenance_asset.url),
ProvenanceAsset(publish_payload, npm_provenance_asset.name, npm_provenance_asset.url),
]

except OSError as error:
logger.error("Error while storing provenance in the temporary directory: %s", error)
return []


def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[InTotoPayload]:
def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[ProvenanceAsset]:
"""Find and download the GAV based provenance for the passed PURL.

Parameters
Expand All @@ -193,8 +206,8 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[

Returns
-------
list[InTotoPayload] | None
The provenance payload if found, or an empty list otherwise.
list[ProvenanceAsset]
The provenance asset if found, or an empty list otherwise.

Raises
------
Expand Down Expand Up @@ -263,7 +276,7 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
if not is_witness_provenance_payload(provenance_payload, witness_verifier_config.predicate_types):
continue

provenances.append(provenance_payload)
provenances.append(ProvenanceAsset(provenance_payload, provenance_asset.name, provenance_asset.url))
except OSError as error:
logger.error("Error while storing provenance in the temporary directory: %s", error)

Expand All @@ -277,7 +290,7 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[

def find_provenance_from_ci(
analyze_ctx: AnalyzeContext, git_obj: Git | None, download_path: str
) -> InTotoPayload | None:
) -> ProvenanceAsset | None:
"""Try to find provenance from CI services of the repository.

Note that we stop going through the CI services once we encounter a CI service
Expand Down Expand Up @@ -372,7 +385,10 @@ def find_provenance_from_ci(
download_provenances_from_ci_service(ci_info, download_path)

# TODO consider how to handle multiple payloads here.
return ci_info["provenances"][0].payload if ci_info["provenances"] else None
if ci_info["provenances"]:
provenance = ci_info["provenances"][0]
return ProvenanceAsset(provenance.payload, provenance.asset.name, provenance.asset.url)
return None

else:
logger.debug("CI service not supported for provenance finding: %s", ci_service.name)
Expand Down
26 changes: 15 additions & 11 deletions src/macaron/provenance/provenance_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from macaron.config.defaults import defaults
from macaron.config.global_config import global_config
from macaron.provenance.provenance_extractor import ProvenancePredicate, SLSAGithubGenericBuildDefinitionV01
from macaron.provenance.provenance_finder import ProvenanceAsset
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.asset import AssetLocator
Expand All @@ -28,15 +29,15 @@
logger: logging.Logger = logging.getLogger(__name__)


def verify_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool:
def verify_provenance(purl: PackageURL, provenance_assets: list[ProvenanceAsset]) -> bool:
"""Verify the passed provenance.

Parameters
----------
purl: PackageURL
The PURL of the analysis target.
provenance: list[InTotoPayload]
The list of provenance.
provenance_assets: list[ProvenanceAsset]
The list of provenance assets.

Returns
-------
Expand All @@ -50,7 +51,7 @@ def verify_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool
verification_function = None

if purl.type == "npm":
verification_function = partial(verify_npm_provenance, purl, provenance)
verification_function = partial(verify_npm_provenance, purl, provenance_assets)

# TODO other verification functions go here.

Expand All @@ -61,30 +62,33 @@ def verify_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool
return False


def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> bool:
def verify_npm_provenance(purl: PackageURL, provenance_assets: list[ProvenanceAsset]) -> bool:
"""Compare the unsigned payload subject digest with the signed payload digest, if available.

Parameters
----------
purl: PackageURL
The PURL of the analysis target.
provenance: list[InTotoPayload]
The provenances to verify.
provenance_assets: list[ProvenanceAsset]
The provenance assets to verify.

Returns
-------
bool
True if the provenance was verified, or False otherwise.
"""
if len(provenance) != 2:
logger.debug("Expected unsigned and signed provenance.")
if len(provenance_assets) != 2:
logger.debug("Expected unsigned and signed provenance assets.")
return False

signed_subjects = provenance[1].statement.get("subject")
signed_provenance = provenance_assets[1].payload
unsigned_provenance = provenance_assets[0].payload

signed_subjects = signed_provenance.statement.get("subject")
if not signed_subjects:
return False

unsigned_subjects = provenance[0].statement.get("subject")
unsigned_subjects = unsigned_provenance.statement.get("subject")
if not unsigned_subjects:
return False

Expand Down
11 changes: 8 additions & 3 deletions src/macaron/slsa_analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,14 @@ def run_single(
)

provenance_is_verified = False
provenance_asset = None
if not provenance_payload and parsed_purl:
# Try to find the provenance file for the parsed PURL.
provenance_finder = ProvenanceFinder()
provenances = provenance_finder.find_provenance(parsed_purl)
if provenances:
provenance_payload = provenances[0]
provenance_asset = provenances[0]
provenance_payload = provenance_asset.payload
if verify_provenance:
provenance_is_verified = provenance_verifier.verify_provenance(parsed_purl, provenances)

Expand Down Expand Up @@ -480,10 +482,11 @@ def run_single(
if not provenance_payload:
# Look for provenance using the CI.
with tempfile.TemporaryDirectory() as temp_dir:
provenance_payload = find_provenance_from_ci(analyze_ctx, git_obj, temp_dir)
provenance_asset = find_provenance_from_ci(analyze_ctx, git_obj, temp_dir)
# If found, validate analysis target against new provenance.
if provenance_payload:
if provenance_asset:
# If repository URL was not provided as input, check the one found during analysis.
provenance_payload = provenance_asset.payload
if not repo_path_input and component.repository:
repo_path_input = component.repository.remote_path
provenance_repo_url = provenance_commit_digest = None
Expand Down Expand Up @@ -528,6 +531,8 @@ def run_single(
provenance_payload=provenance_payload,
slsa_level=slsa_level,
slsa_version=slsa_version,
provenance_asset_name=provenance_asset.name if provenance_asset else None,
provenance_asset_url=provenance_asset.url if provenance_asset else None,
# TODO Add release tag, release digest.
)

Expand Down
25 changes: 19 additions & 6 deletions src/macaron/slsa_analyzer/checks/provenance_available_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,31 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
CheckResultData
The result of the check.
"""
available = (
ctx.dynamic_data["provenance_info"]
and ctx.dynamic_data["provenance_info"].provenance_payload
and not ctx.dynamic_data["is_inferred_prov"]
)
provenance_info = None
inferred = False
if ctx.dynamic_data["provenance_info"]:
provenance_info = ctx.dynamic_data["provenance_info"]
inferred = ctx.dynamic_data["is_inferred_prov"]

if not provenance_info or not provenance_info.provenance_payload or inferred:
return CheckResultData(
result_tables=[
ProvenanceAvailableFacts(
confidence=Confidence.HIGH,
)
],
result_type=CheckResultType.FAILED,
)

return CheckResultData(
result_tables=[
ProvenanceAvailableFacts(
confidence=Confidence.HIGH,
asset_name=provenance_info.provenance_asset_name,
asset_url=provenance_info.provenance_asset_url,
)
],
result_type=CheckResultType.PASSED if available else CheckResultType.FAILED,
result_type=CheckResultType.PASSED,
)


Expand Down
Loading