Skip to content

Add Live Evaluation API endpoint and PyPa live pipeline importer #1969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions vulnerabilities/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#


from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

from django.db.models import Prefetch
from django_filters import rest_framework as filters
from drf_spectacular.utils import OpenApiParameter
Expand All @@ -25,6 +28,7 @@
from rest_framework.reverse import reverse
from rest_framework.throttling import AnonRateThrottle

from vulnerabilities.importers import LIVE_IMPORTERS_REGISTRY
from vulnerabilities.models import AdvisoryReference
from vulnerabilities.models import AdvisorySeverity
from vulnerabilities.models import AdvisoryV2
Expand Down Expand Up @@ -1293,3 +1297,83 @@ def lookup(self, request):
return Response(
AdvisoryPackageV2Serializer(qs, many=True, context={"request": request}).data
)


class LiveEvaluationSerializer(serializers.Serializer):
    # Request payload accepted by the live evaluation endpoint.

    # Required PackageURL string naming the package to evaluate.
    purl_string = serializers.CharField(
        help_text="PackageURL to evaluate",
    )

    # Optional switch, false when omitted; when true the matching importers are run
    # one after another instead of in a thread pool.
    no_threading = serializers.BooleanField(
        default=False,
        required=False,
    )


class LiveEvaluationViewSet(viewsets.GenericViewSet):
    """
    Run the registered live importers on demand for a single PackageURL.
    """

    serializer_class = LiveEvaluationSerializer

    # Upper bound on worker threads used for one request, so that a growing registry
    # cannot make a single request spawn an arbitrary number of threads.
    max_importer_workers = 8

    @extend_schema(
        request=LiveEvaluationSerializer,
        responses={
            202: {"description": "Live evaluation done successfully"},
            400: {"description": "Invalid request"},
            500: {"description": "Internal server error"},
        },
    )
    @action(detail=False, methods=["post"])
    def evaluate(self, request):
        """
        Run every live importer that supports the type of the given PackageURL.

        Return 400 when the payload is invalid, when ``purl_string`` is not a valid
        PackageURL or when no live importer supports its type. Otherwise return 202
        with one result entry per importer; an importer that failed reports its
        failure under the ``error`` key of its own entry.
        """
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                serializer.errors,
                status=status.HTTP_400_BAD_REQUEST,
            )

        purl_string = serializer.validated_data.get("purl_string")
        no_threading = serializer.validated_data.get("no_threading", False)

        # Only the parsing itself is guarded: any parsing failure is a client error.
        try:
            purl = PackageURL.from_string(purl_string) if purl_string else None
        except Exception as e:
            return Response(
                {"error": f"Invalid PackageURL: {str(e)}"}, status=status.HTTP_400_BAD_REQUEST
            )
        if not purl:
            return Response({"error": "Invalid PackageURL"}, status=status.HTTP_400_BAD_REQUEST)

        importers = self._get_importers_for_type(purl.type)
        if not importers:
            return Response(
                {"error": f"No live importers found for purl type '{purl.type}'"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        def run(importer):
            return self._run_importer(importer, purl, purl_string)

        if no_threading or len(importers) <= 1:
            results = [run(importer) for importer in importers]
        else:
            # NOTE(review): importers running in worker threads presumably use their
            # own database connections - confirm these are released as expected.
            worker_count = min(len(importers), self.max_importer_workers)
            with ThreadPoolExecutor(max_workers=worker_count) as executor:
                futures = [executor.submit(run, importer) for importer in importers]
                # Results are listed in completion order, not registry order.
                results = [future.result() for future in as_completed(futures)]

        return Response(results, status=status.HTTP_202_ACCEPTED)

    @staticmethod
    def _get_importers_for_type(purl_type):
        """
        Return the live importers whose ``supported_types`` contain ``purl_type``.
        """
        return [
            importer
            for importer in LIVE_IMPORTERS_REGISTRY.values()
            # Importers without (or with an empty) ``supported_types`` never match.
            if purl_type in (getattr(importer, "supported_types", None) or ())
        ]

    @staticmethod
    def _run_importer(importer, purl, purl_string):
        """
        Execute one ``importer`` for ``purl`` and return its result entry.

        Never raise: any failure is reported under the ``error`` key of the entry so
        that one failing importer does not hide the results of the others.
        """
        importer_name = getattr(importer, "pipeline_id", importer.__name__)
        response_data = {"importer": importer_name, "purl": purl_string, "steps_completed": []}
        try:
            pipeline_instance = importer(purl=purl)
            status_code, error = pipeline_instance.execute()
            if status_code != 0:
                response_data["error"] = f"Importer {importer_name} failed: {error}"
            else:
                response_data["steps_completed"].append("import")
        except Exception as e:
            response_data["error"] = f"Error running importer {importer_name}: {str(e)}"
        return response_data
7 changes: 7 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
from vulnerabilities.pipelines.v2_importers import pypa_live_importer as pypa_live_importer_v2
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
Expand Down Expand Up @@ -113,3 +114,9 @@
oss_fuzz.OSSFuzzImporter,
]
)

# Importer pipelines that can be run on demand for a single PURL ("live" importers),
# registered through create_registry.
LIVE_IMPORTERS_REGISTRY = create_registry([pypa_live_importer_v2.PyPaLiveImporterPipeline])
150 changes: 150 additions & 0 deletions vulnerabilities/pipelines/v2_importers/pypa_live_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


from typing import Iterable

import requests
import saneyaml
from packageurl import PackageURL
from univers.versions import PypiVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2


class PyPaLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Pypa Live Importer Pipeline

    Collect advisories from PyPA GitHub repository for a single PURL.
    """

    pipeline_id = "pypa_live_importer_v2"
    supported_types = ["pypi"]
    spdx_license_expression = "CC-BY-4.0"
    license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"

    # Seconds to wait on each HTTP request before giving up, so that an unresponsive
    # server cannot block the pipeline forever.
    request_timeout = 30

    @classmethod
    def steps(cls):
        return (
            cls.get_purl_inputs,
            cls.fetch_package_advisories,
            cls.collect_and_store_advisories,
        )

    def get_purl_inputs(self):
        """
        Validate the "purl" pipeline input and store it as ``self.purl``.

        Accept a PackageURL instance or a PackageURL string. Raise a ValueError when
        the input is missing or empty, is not a PackageURL, has an unsupported
        package type or has no version.
        """
        # A missing key is reported with the same error as an empty value.
        try:
            purl = self.inputs["purl"]
        except KeyError:
            purl = None

        if not purl:
            raise ValueError("PURL is required for PyPaLiveImporterPipeline")

        if isinstance(purl, str):
            purl = PackageURL.from_string(purl)

        if not isinstance(purl, PackageURL):
            raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

        if purl.type not in self.supported_types:
            raise ValueError(
                f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
            )

        if not purl.version:
            raise ValueError(f"PURL: {purl!s} is expected to have a version")

        self.purl = purl

    def _is_version_affected(self, advisory_dict, version):
        """
        Return True if ``version`` is inside any affected range of ``advisory_dict``.

        ``advisory_dict`` is the parsed advisory mapping; its "affected" entries and
        their "ranges" and "events" are inspected. Return False when ``version`` is
        not a valid PyPI version. Ranges whose events cannot be parsed as PyPI
        versions (for instance commit hashes) are ignored.
        """
        if not isinstance(advisory_dict, dict):
            return False

        try:
            candidate = PypiVersion(version)
        except Exception:
            return False

        for entry in advisory_dict.get("affected") or []:
            for version_range in entry.get("ranges") or []:
                try:
                    if self._is_in_range(candidate, version_range.get("events") or []):
                        return True
                except Exception:
                    # Best effort: a range that cannot be interpreted matches nothing.
                    continue
        return False

    @staticmethod
    def _is_in_range(candidate, events):
        """
        Return True if the ``candidate`` version is affected according to ``events``.

        Events are replayed in ascending version order: an "introduced" event at or
        below the candidate marks it affected, a later "fixed" event at or below the
        candidate, or a "last_affected" event below it, marks it unaffected again.
        A range holding several introduced/fixed pairs is therefore handled pair by
        pair instead of only through its last pair.
        """
        timeline = []
        for event in events:
            for kind in ("introduced", "fixed", "last_affected"):
                if kind in event:
                    timeline.append((PypiVersion(event[kind]), kind))
        timeline.sort(key=lambda pair: pair[0])

        affected = False
        for event_version, kind in timeline:
            if kind == "introduced":
                if candidate >= event_version:
                    affected = True
            elif kind == "fixed":
                if candidate >= event_version:
                    affected = False
            elif candidate > event_version:
                # "last_affected" is inclusive: only later versions are unaffected.
                affected = False
        return affected

    def fetch_package_advisories(self):
        """
        Fetch the advisories of the package of ``self.purl`` from GitHub.

        Store in ``self.package_advisories`` one mapping per advisory affecting the
        PURL version, with the advisory "text", parsed "dict" and HTML "url". The
        list stays empty when the listing is missing or cannot be fetched. Network
        errors and timeouts raised by ``requests`` are not caught here.
        """
        # Always start from an empty list so later steps can rely on the attribute.
        self.package_advisories = []

        if self.purl.type not in self.supported_types:
            return

        search_path = f"vulns/{self.purl.name}"
        api_url = f"https://api.github.com/repos/pypa/advisory-database/contents/{search_path}"
        response = requests.get(api_url, timeout=self.request_timeout)

        if response.status_code == 404:
            self.log(f"No advisories found for package {self.purl.name}")
            return

        if response.status_code != 200:
            self.log(f"Failed to fetch advisories: {response.status_code} {response.text}")
            return

        listing = response.json()
        if not isinstance(listing, list):
            # A directory listing is expected to be a JSON array of entries.
            self.log(f"Unexpected advisories listing for package {self.purl.name}")
            return

        for item in listing:
            if item["type"] != "file" or not item["name"].endswith(".yaml"):
                continue

            self.log("Fetching advisory file: " + item["name"])
            file_response = requests.get(item["download_url"], timeout=self.request_timeout)
            if file_response.status_code != 200:
                self.log(
                    f"Failed to fetch advisory file {item['name']}: {file_response.status_code}"
                )
                continue

            advisory_text = file_response.text
            advisory_dict = saneyaml.load(advisory_text)
            if not isinstance(advisory_dict, dict):
                # Empty or malformed documents cannot describe an advisory.
                self.log(f"Skipping invalid advisory file: {item['name']}")
                continue

            if self.purl.version and not self._is_version_affected(
                advisory_dict, self.purl.version
            ):
                continue

            self.package_advisories.append(
                {"text": advisory_text, "dict": advisory_dict, "url": item["html_url"]}
            )

    def advisories_count(self):
        """
        Return the number of advisories fetched so far, 0 if none was fetched yet.
        """
        return len(getattr(self, "package_advisories", []))

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield the parsed advisory data of each fetched advisory.
        """
        # Imported here as in the original code; presumably to avoid a circular
        # import with the importers package - TODO confirm.
        from vulnerabilities.importers.osv import parse_advisory_data_v2

        for advisory in getattr(self, "package_advisories", []):
            yield parse_advisory_data_v2(
                raw_data=advisory["dict"],
                supported_ecosystems=self.supported_types,
                advisory_url=advisory["url"],
                advisory_text=advisory["text"],
            )
Loading