-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SECRES-2420] Support custom verifiers and loggers (#26)
- Loading branch information
Showing
9 changed files
with
246 additions
and
73 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
""" | ||
Users of the supply chain firewall are able to use their own custom | ||
loggers according to their own logging needs. This module contains a | ||
template for writing such a custom logger. | ||
The firewall discovers loggers at runtime via the following simple protocol. | ||
The module implementing the custom logger must contain a function with the | ||
following name and signature: | ||
``` | ||
def load_logger() -> FirewallLogger | ||
``` | ||
This `load_logger` function should return an instance of the custom logger | ||
for the firewall's use. The module may then be placed in the `scfw/loggers` | ||
directory for runtime import, no further modification required. Make sure | ||
to reinstall the package after doing so. | ||
""" | ||
|
||
from scfw.ecosystem import ECOSYSTEM | ||
from scfw.logger import FirewallAction, FirewallLogger | ||
from scfw.target import InstallTarget | ||
|
||
|
||
class CustomFirewallLogger(FirewallLogger): | ||
def log( | ||
self, | ||
action: FirewallAction, | ||
ecosystem: ECOSYSTEM, | ||
command: list[str], | ||
targets: list[InstallTarget] | ||
): | ||
return | ||
|
||
|
||
def load_logger() -> FirewallLogger: | ||
return CustomFirewallLogger() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
""" | ||
Users of the supply chain firewall may provide custom installation target | ||
verifiers representing alternative sources of truth for the firewall to use. | ||
This module contains a template for writing such a custom verifier. | ||
The firewall discovers verifiers at runtime via the following simple protocol. | ||
The module implementing the custom verifier must contain a function with the | ||
following name and signature: | ||
``` | ||
def load_verifier() -> InstallTargetVerifier | ||
``` | ||
This `load_verifier` function should return an instance of the custom verifier | ||
for the firewall's use. The module may then be placed in the scfw/verifiers directory | ||
for runtime import, no further modification required.. Make sure to reinstall the | ||
package after doing so. | ||
""" | ||
|
||
from scfw.target import InstallTarget | ||
from scfw.verifier import FindingSeverity, InstallTargetVerifier | ||
|
||
|
||
class CustomInstallTargetVerifier(InstallTargetVerifier): | ||
def name(self) -> str: | ||
return "CustomInstallTargetVerifier" | ||
|
||
def verify(self, target: InstallTarget) -> list[tuple[FindingSeverity, str]]: | ||
return [] | ||
|
||
|
||
def load_verifier() -> InstallTargetVerifier: | ||
return CustomInstallTargetVerifier() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,51 @@ | ||
""" | ||
Exports a set of client loggers implementing the firewall's logging protocol. | ||
Exports the currently discoverable set of client loggers implementing the | ||
firewall's logging protocol. | ||
One logger ships with the supply chain firewall by default: `DDLogger`, | ||
which sends logs to Datadog. Firewall users may additionally provide | ||
custom loggers according to their own logging needs. | ||
The firewall discovers loggers at runtime via the following simple protocol. | ||
The module implementing the custom logger must contain a function with the | ||
following name and signature: | ||
``` | ||
def load_logger() -> FirewallLogger | ||
``` | ||
This `load_logger` function should return an instance of the custom logger | ||
for the firewall's use. The module may then be placed in the same directory | ||
as this source file for runtime import. Make sure to reinstall the package | ||
after doing so. | ||
""" | ||
|
||
import importlib | ||
import logging | ||
import os | ||
import pkgutil | ||
|
||
from scfw.logger import FirewallLogger | ||
from scfw.loggers.dd_logger import DDLogger | ||
|
||
_log = logging.getLogger(__name__) | ||
|
||
|
||
def get_firewall_loggers() -> list[FirewallLogger]: | ||
""" | ||
Return the current set of available client loggers. | ||
Return the currently discoverable set of client loggers. | ||
Returns: | ||
A `list` of the currently available `FirewallLogger`s. | ||
A `list` of the discovered `FirewallLogger`s. | ||
""" | ||
return [DDLogger()] | ||
loggers = [] | ||
|
||
for _, module, _ in pkgutil.iter_modules([os.path.dirname(__file__)]): | ||
try: | ||
logger = importlib.import_module(f".{module}", package=__name__).load_logger() | ||
loggers.append(logger) | ||
except ModuleNotFoundError: | ||
_log.warning(f"Failed to load module {module} while collecting loggers") | ||
except AttributeError: | ||
_log.warning(f"Module {module} does not export a logger") | ||
|
||
return loggers |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,53 @@ | ||
""" | ||
Exports the current set of installation target verifiers for use in the | ||
supply-chain firewall's main routine. | ||
Exports the currently discoverable set of installation target verifiers | ||
for use in the supply chain firewall's main routine. | ||
Two installation target verifiers ship with the supply chain firewall | ||
by default: one that uses Datadog Security Research's public malicious | ||
packages dataset and one that uses OSV.dev. Users of the supply chain | ||
firewall may additionally provide custom verifiers representing alternative | ||
sources of truth for the firewall to use. | ||
The firewall discovers verifiers at runtime via the following simple protocol. | ||
The module implementing the custom verifier must contain a function with the | ||
following name and signature: | ||
``` | ||
def load_verifier() -> InstallTargetVerifier | ||
``` | ||
This `load_verifier` function should return an instance of the custom verifier | ||
for the firewall's use. The module may then be placed in the same directory | ||
as this source file for runtime import. Make sure to reinstall the package | ||
after doing so. | ||
""" | ||
|
||
import importlib | ||
import logging | ||
import os | ||
import pkgutil | ||
|
||
from scfw.verifier import InstallTargetVerifier | ||
from scfw.verifiers.dd_verifier import DatadogMaliciousPackagesVerifier | ||
from scfw.verifiers.osv_verifier import OsvVerifier | ||
|
||
_log = logging.getLogger(__name__) | ||
|
||
|
||
def get_install_target_verifiers() -> list[InstallTargetVerifier]: | ||
""" | ||
Return the current set of installation target verifiers. | ||
Return the currently discoverable set of installation target verifiers. | ||
Returns: | ||
A list of initialized installation target verifiers, one per currently | ||
available type. | ||
A list of the discovered installation target verifiers. | ||
""" | ||
return [ | ||
DatadogMaliciousPackagesVerifier(), | ||
OsvVerifier() | ||
] | ||
verifiers = [] | ||
|
||
for _, module, _ in pkgutil.iter_modules([os.path.dirname(__file__)]): | ||
try: | ||
verifier = importlib.import_module(f".{module}", package=__name__).load_verifier() | ||
verifiers.append(verifier) | ||
except ModuleNotFoundError: | ||
_log.warning(f"Failed to load module {module} while collecting installation target verifiers") | ||
except AttributeError: | ||
_log.warning(f"Module {module} does not export an installation target verifier") | ||
|
||
return verifiers |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters