diff --git a/README.md b/README.md index 5a79d5d..2d996e0 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,13 @@ The supply-chain firewall is a command-line tool for preventing the installation The firewall collects all targets that would be installed by a given `pip` or `npm` command and checks them against reputable sources of data on open source malware and vulnerabilities. The command is automatically blocked when any data source finds that any target is malicious. In cases where a data source reports other findings for a target, the findings are presented to the user along with a prompt confirming intent to proceed with the installation. -Current data sources used are: +Default data sources include: - Datadog Security Research's public malicious packages [dataset](https://github.com/DataDog/malicious-software-packages-dataset) - [OSV.dev](https://osv.dev) disclosures +Users may also implement verifiers for alternative data sources. A template for implementating custom verifiers may be found in `examples/verifier.py`. Details may also be found in the API documentation. + The principal goal of the supply-chain firewall is to block 100% of installations of known-malicious packages within the purview of its data sources. ## Getting started @@ -72,6 +74,8 @@ The supply-chain firewall can optionally send logs of blocked and successful ins To opt in, set the environment variable `DD_API_KEY` to your Datadog API key, either directly in your shell environment or in a `.env` file in the current working directory. +Users may also implement custom loggers for use with the firewall. A template for implementating custom loggers may be found in `examples/logger.py`. Details may also be found in the API documentation. + ## Development To set up for testing and development, create a fresh `virtualenv`, activate it and run `make install-dev`. This will install `scfw` and the development dependencies. diff --git a/examples/logger.py b/examples/logger.py new file mode 100644 index 0000000..7780927 --- /dev/null +++ b/examples/logger.py @@ -0,0 +1,37 @@ +""" +Users of the supply chain firewall are able to use their own custom +loggers according to their own logging needs. This module contains a +template for writing such a custom logger. + +The firewall discovers loggers at runtime via the following simple protocol. +The module implementing the custom logger must contain a function with the +following name and signature: + +``` +def load_logger() -> FirewallLogger +``` + +This `load_logger` function should return an instance of the custom logger +for the firewall's use. The module may then be placed in the `scfw/loggers` +directory for runtime import, no further modification required. Make sure +to reinstall the package after doing so. +""" + +from scfw.ecosystem import ECOSYSTEM +from scfw.logger import FirewallAction, FirewallLogger +from scfw.target import InstallTarget + + +class CustomFirewallLogger(FirewallLogger): + def log( + self, + action: FirewallAction, + ecosystem: ECOSYSTEM, + command: list[str], + targets: list[InstallTarget] + ): + return + + +def load_logger() -> FirewallLogger: + return CustomFirewallLogger() diff --git a/examples/verifier.py b/examples/verifier.py new file mode 100644 index 0000000..c6adefa --- /dev/null +++ b/examples/verifier.py @@ -0,0 +1,33 @@ +""" +Users of the supply chain firewall may provide custom installation target +verifiers representing alternative sources of truth for the firewall to use. +This module contains a template for writing such a custom verifier. + +The firewall discovers verifiers at runtime via the following simple protocol. +The module implementing the custom verifier must contain a function with the +following name and signature: + +``` +def load_verifier() -> InstallTargetVerifier +``` + +This `load_verifier` function should return an instance of the custom verifier +for the firewall's use. The module may then be placed in the scfw/verifiers directory +for runtime import, no further modification required.. Make sure to reinstall the +package after doing so. +""" + +from scfw.target import InstallTarget +from scfw.verifier import FindingSeverity, InstallTargetVerifier + + +class CustomInstallTargetVerifier(InstallTargetVerifier): + def name(self) -> str: + return "CustomInstallTargetVerifier" + + def verify(self, target: InstallTarget) -> list[tuple[FindingSeverity, str]]: + return [] + + +def load_verifier() -> InstallTargetVerifier: + return CustomInstallTargetVerifier() diff --git a/scfw/loggers/__init__.py b/scfw/loggers/__init__.py index f6adecc..04b99aa 100644 --- a/scfw/loggers/__init__.py +++ b/scfw/loggers/__init__.py @@ -1,16 +1,51 @@ """ -Exports a set of client loggers implementing the firewall's logging protocol. +Exports the currently discoverable set of client loggers implementing the +firewall's logging protocol. + +One logger ships with the supply chain firewall by default: `DDLogger`, +which sends logs to Datadog. Firewall users may additionally provide +custom loggers according to their own logging needs. + +The firewall discovers loggers at runtime via the following simple protocol. +The module implementing the custom logger must contain a function with the +following name and signature: + +``` +def load_logger() -> FirewallLogger +``` + +This `load_logger` function should return an instance of the custom logger +for the firewall's use. The module may then be placed in the same directory +as this source file for runtime import. Make sure to reinstall the package +after doing so. """ +import importlib +import logging +import os +import pkgutil + from scfw.logger import FirewallLogger -from scfw.loggers.dd_logger import DDLogger + +_log = logging.getLogger(__name__) def get_firewall_loggers() -> list[FirewallLogger]: """ - Return the current set of available client loggers. + Return the currently discoverable set of client loggers. Returns: - A `list` of the currently available `FirewallLogger`s. + A `list` of the discovered `FirewallLogger`s. """ - return [DDLogger()] + loggers = [] + + for _, module, _ in pkgutil.iter_modules([os.path.dirname(__file__)]): + try: + logger = importlib.import_module(f".{module}", package=__name__).load_logger() + loggers.append(logger) + except ModuleNotFoundError: + _log.warning(f"Failed to load module {module} while collecting loggers") + except AttributeError: + _log.warning(f"Module {module} does not export a logger") + + return loggers diff --git a/scfw/loggers/dd_logger.py b/scfw/loggers/dd_logger.py index 8f10a90..1bee669 100644 --- a/scfw/loggers/dd_logger.py +++ b/scfw/loggers/dd_logger.py @@ -17,56 +17,8 @@ from datadog_api_client.v2.model.http_log_item import HTTPLogItem import dotenv - -class DDLogger(FirewallLogger): - """ - An implementation of `FirewallLogger` for sending logs to Datadog. - """ - def __init__(self): - """ - Initialize a new `DDLogger`. - """ - DD_LOG_NAME = "ddlog" - DD_LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] [%(filename)s:%(lineno)d] - %(message)s" - - dotenv.load_dotenv() - handler = _DDLogHandler() if os.getenv("DD_API_KEY") else logging.NullHandler() - handler.setFormatter(logging.Formatter(DD_LOG_FORMAT)) - - ddlog = logging.getLogger(DD_LOG_NAME) - ddlog.setLevel(logging.INFO) - ddlog.addHandler(handler) - - self.logger = ddlog - - def log( - self, - action: FirewallAction, - ecosystem: ECOSYSTEM, - command: list[str], - targets: list[InstallTarget] - ): - """ - Receive and log data about a completed firewall run. - - Args: - action: The action taken by the firewall. - ecosystem: The ecosystem of the inspected package manager command. - command: The package manager command line provided to the firewall. - targets: The installation targets relevant to firewall's action. - """ - match action: - case FirewallAction.Allow: - message = f"Command '{' '.join(command)}' was allowed" - case FirewallAction.Block: - message = f"Command '{' '.join(command)}' was blocked" - case FirewallAction.Abort: - message = f"Command '{' '.join(command)}' was aborted" - - self.logger.info( - message, - extra={"ecosystem": ecosystem.value, "targets": map(str, targets)} - ) +_DD_LOG_NAME = "ddlog" +_DD_LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] [%(filename)s:%(lineno)d] - %(message)s" class _DDLogHandler(logging.Handler): @@ -118,3 +70,63 @@ def emit(self, record): with ApiClient(configuration) as api_client: api_instance = LogsApi(api_client) api_instance.submit_log(content_encoding=ContentEncoding.DEFLATE, body=body) + + +# Configure a single logging handle for all `DDLogger` instances to share +dotenv.load_dotenv() +_handler = _DDLogHandler() if os.getenv("DD_API_KEY") else logging.NullHandler() +_handler.setFormatter(logging.Formatter(_DD_LOG_FORMAT)) + +_ddlog = logging.getLogger(_DD_LOG_NAME) +_ddlog.setLevel(logging.INFO) +_ddlog.addHandler(_handler) + + +class DDLogger(FirewallLogger): + """ + An implementation of `FirewallLogger` for sending logs to Datadog. + """ + def __init__(self): + """ + Initialize a new `DDLogger`. + """ + self.logger = _ddlog + + def log( + self, + action: FirewallAction, + ecosystem: ECOSYSTEM, + command: list[str], + targets: list[InstallTarget] + ): + """ + Receive and log data about a completed firewall run. + + Args: + action: The action taken by the firewall. + ecosystem: The ecosystem of the inspected package manager command. + command: The package manager command line provided to the firewall. + targets: The installation targets relevant to firewall's action. + """ + match action: + case FirewallAction.Allow: + message = f"Command '{' '.join(command)}' was allowed" + case FirewallAction.Block: + message = f"Command '{' '.join(command)}' was blocked" + case FirewallAction.Abort: + message = f"Command '{' '.join(command)}' was aborted" + + self.logger.info( + message, + extra={"ecosystem": ecosystem.value, "targets": map(str, targets)} + ) + + +def load_logger() -> FirewallLogger: + """ + Export `DDLogger` for discovery by the firewall. + + Returns: + A `DDLogger` for use in a run of the supply chain firewall. + """ + return DDLogger() diff --git a/scfw/verifiers/__init__.py b/scfw/verifiers/__init__.py index 6f4a981..f5debae 100644 --- a/scfw/verifiers/__init__.py +++ b/scfw/verifiers/__init__.py @@ -1,22 +1,53 @@ """ -Exports the current set of installation target verifiers for use in the -supply-chain firewall's main routine. +Exports the currently discoverable set of installation target verifiers +for use in the supply chain firewall's main routine. + +Two installation target verifiers ship with the supply chain firewall +by default: one that uses Datadog Security Research's public malicious +packages dataset and one that uses OSV.dev. Users of the supply chain +firewall may additionally provide custom verifiers representing alternative +sources of truth for the firewall to use. + +The firewall discovers verifiers at runtime via the following simple protocol. +The module implementing the custom verifier must contain a function with the +following name and signature: + +``` +def load_verifier() -> InstallTargetVerifier +``` + +This `load_verifier` function should return an instance of the custom verifier +for the firewall's use. The module may then be placed in the same directory +as this source file for runtime import. Make sure to reinstall the package +after doing so. """ +import importlib +import logging +import os +import pkgutil + from scfw.verifier import InstallTargetVerifier -from scfw.verifiers.dd_verifier import DatadogMaliciousPackagesVerifier -from scfw.verifiers.osv_verifier import OsvVerifier + +_log = logging.getLogger(__name__) def get_install_target_verifiers() -> list[InstallTargetVerifier]: """ - Return the current set of installation target verifiers. + Return the currently discoverable set of installation target verifiers. Returns: - A list of initialized installation target verifiers, one per currently - available type. + A list of the discovered installation target verifiers. """ - return [ - DatadogMaliciousPackagesVerifier(), - OsvVerifier() - ] + verifiers = [] + + for _, module, _ in pkgutil.iter_modules([os.path.dirname(__file__)]): + try: + verifier = importlib.import_module(f".{module}", package=__name__).load_verifier() + verifiers.append(verifier) + except ModuleNotFoundError: + _log.warning(f"Failed to load module {module} while collecting installation target verifiers") + except AttributeError: + _log.warning(f"Module {module} does not export an installation target verifier") + + return verifiers diff --git a/scfw/verifiers/dd_verifier.py b/scfw/verifiers/dd_verifier.py index 3ea9293..d606e14 100644 --- a/scfw/verifiers/dd_verifier.py +++ b/scfw/verifiers/dd_verifier.py @@ -71,3 +71,13 @@ def verify(self, target: InstallTarget) -> list[tuple[FindingSeverity, str]]: ] else: return [] + + +def load_verifier() -> InstallTargetVerifier: + """ + Export `DatadogMaliciousPackagesVerifier` for discovery by the firewall. + + Returns: + A `DatadogMaliciousPackagesVerifier` for use in a run of the supply chain firewall. + """ + return DatadogMaliciousPackagesVerifier() diff --git a/scfw/verifiers/osv_verifier.py b/scfw/verifiers/osv_verifier.py index 151ae7e..b106a5c 100644 --- a/scfw/verifiers/osv_verifier.py +++ b/scfw/verifiers/osv_verifier.py @@ -102,3 +102,13 @@ def error_message(e: str) -> str: except requests.exceptions.RequestException as e: _log.warning(f"Failed to query OSV.dev API: returning WARNING finding for target {target}") return [(FindingSeverity.WARNING, error_message(str(e)))] + + +def load_verifier() -> InstallTargetVerifier: + """ + Export `OsvVerifier` for discovery by the firewall. + + Returns: + An `OsvVerifier` for use in a run of the supply chain firewall. + """ + return OsvVerifier() diff --git a/tests/commands/test_pip_command.py b/tests/commands/test_pip_command.py index 43da23d..4e94b87 100644 --- a/tests/commands/test_pip_command.py +++ b/tests/commands/test_pip_command.py @@ -47,16 +47,17 @@ def test_pip_command_would_install_exact(): map( lambda p: InstallTarget(ECOSYSTEM.PIP, p[0], p[1]), [ - ("certifi", "2024.8.30"), - ("charset-normalizer", "3.3.2"), - ("idna", "3.10"), - ("requests", "2.32.3"), - ("urllib3", "2.2.3") + ("botocore", "1.15.0"), + ("docutils", "0.15.2"), + ("jmespath", "0.10.0"), + ("python-dateutil", "2.9.0.post0"), + ("six", "1.16.0"), + ("urllib3", "1.25.11") ] ) ) - command_line = ["pip", "install", "--ignore-installed", "requests==2.32.3"] + command_line = ["pip", "install", "--ignore-installed", "botocore==1.15.0"] command = PipCommand(command_line, executable=sys.executable) targets = command.would_install() assert len(targets) == len(true_targets)