Skip to content

Commit f1883f1

Browse files
committed
refactor: heuristic dependencies handled in main problog logic
Signed-off-by: Carl Flottmann <[email protected]>
1 parent 4fbf160 commit f1883f1

File tree

12 files changed: 104 additions and 78 deletions.

src/macaron/malware_analyzer/pypi_heuristics/base_analyzer.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
1+
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
22
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
33

44
"""Define and initialize the base analyzer."""
@@ -18,13 +18,9 @@ def __init__(
1818
self,
1919
name: str,
2020
heuristic: Heuristics,
21-
depends_on: list[tuple[Heuristics, HeuristicResult]] | None,
2221
) -> None:
2322
self.name: str = name
2423
self.heuristic: Heuristics = heuristic
25-
self.depends_on: list[tuple[Heuristics, HeuristicResult]] | None = (
26-
depends_on # Contains the dependent heuristics and the expected result of each heuristic
27-
)
2824

2925
@abstractmethod
3026
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:

src/macaron/malware_analyzer/pypi_heuristics/heuristics.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ class HeuristicResult(str, Enum):
5353
#: Indicates that suspicious activity was detected.
5454
FAIL = "FAIL"
5555

56-
#: Indicates that the heuristic check could not be performed due to missing metadata.
57-
#: The `SKIP` result occurs when the necessary metadata is not available. This often happens
58-
#: when fetching data through the PyPI API and the relevant data, such as the maintainer's
59-
#: join date or release information, is missing or unavailable.
56+
#: Indicates that this heuristic is not applicable to this package.
57+
#: Please use HeuristicAnalyzerValueError for malformed package data.
6058
SKIP = "SKIP"

src/macaron/malware_analyzer/pypi_heuristics/metadata/anomalous_version.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,7 @@ class AnomalousVersionAnalyzer(BaseHeuristicAnalyzer):
6161
DIGIT_DATE_FORMATS: list[str] = ["%Y%m%d", "%Y%d%m", "%d%m%Y", "%m%d%Y", "%y%m%d", "%y%d%m", "%d%m%y", "%m%d%y"]
6262

6363
def __init__(self) -> None:
64-
super().__init__(
65-
name="anomalous_version_analyzer",
66-
heuristic=Heuristics.ANOMALOUS_VERSION,
67-
depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.FAIL)],
68-
)
64+
super().__init__(name="anomalous_version_analyzer", heuristic=Heuristics.ANOMALOUS_VERSION)
6965
self.major_threshold, self.epoch_threshold, self.day_publish_error = self._load_defaults()
7066

7167
def _load_defaults(self) -> tuple[int, int, int]:
@@ -110,13 +106,8 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
110106
logger.debug(error_msg)
111107
raise HeuristicAnalyzerValueError(error_msg)
112108

113-
if len(releases) != 1:
114-
error_msg = (
115-
"This heuristic depends on a single release, but somehow there are multiple when the one release"
116-
+ " heuristic failed."
117-
)
118-
logger.debug(error_msg)
119-
raise HeuristicAnalyzerValueError(error_msg)
109+
if len(releases) != 1: # We only analyze packages with a single release, this heuristic does not apply.
110+
return HeuristicResult.SKIP, {}
120111

121112
# Since there is only one release, the latest version should be that release
122113
release = pypi_package_json.get_latest_version()

src/macaron/malware_analyzer/pypi_heuristics/metadata/closer_release_join_date.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33

44
"""Analyzer checks whether the maintainers' join date is close to the package's latest release date."""
55

6+
import logging
67
from datetime import datetime, timedelta
78

89
from macaron.config.defaults import defaults
10+
from macaron.errors import HeuristicAnalyzerValueError
911
from macaron.json_tools import JsonType
1012
from macaron.malware_analyzer.datetime_parser import parse_datetime
1113
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
1214
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
1315
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry
1416

17+
logger: logging.Logger = logging.getLogger(__name__)
18+
1519

1620
class CloserReleaseJoinDateAnalyzer(BaseHeuristicAnalyzer):
1721
"""Check whether the maintainers' join date is close to the package's latest release date.
@@ -20,9 +24,7 @@ class CloserReleaseJoinDateAnalyzer(BaseHeuristicAnalyzer):
2024
"""
2125

2226
def __init__(self) -> None:
23-
super().__init__(
24-
name="closer_release_join_date_analyzer", heuristic=Heuristics.CLOSER_RELEASE_JOIN_DATE, depends_on=None
25-
)
27+
super().__init__(name="closer_release_join_date_analyzer", heuristic=Heuristics.CLOSER_RELEASE_JOIN_DATE)
2628
self.gap_threshold: int = self._load_defaults()
2729

2830
def _load_defaults(self) -> int:
@@ -97,17 +99,24 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
9799
maintainers_join_date: list[datetime] | None = self._get_maintainers_join_date(
98100
pypi_package_json.pypi_registry, pypi_package_json.component_name
99101
)
102+
if not maintainers_join_date:
103+
error_msg = "Metadata has no maintainers or join dates for them"
104+
logger.debug(error_msg)
105+
raise HeuristicAnalyzerValueError(error_msg)
106+
100107
latest_release_date: datetime | None = self._get_latest_release_date(pypi_package_json)
108+
if not latest_release_date:
109+
error_msg = "Unable to parse latest upload time"
110+
logger.debug(error_msg)
111+
raise HeuristicAnalyzerValueError(error_msg)
112+
101113
detail_info: dict[str, JsonType] = {
102114
"maintainers_join_date": (
103115
[date.strftime("%Y-%m-%d %H:%M:%S") for date in maintainers_join_date] if maintainers_join_date else []
104116
),
105117
"latest_release_date": latest_release_date.strftime("%Y-%m-%d %H:%M:%S") if latest_release_date else "",
106118
}
107119

108-
if maintainers_join_date is None or latest_release_date is None:
109-
return HeuristicResult.SKIP, detail_info
110-
111120
for date in maintainers_join_date:
112121
difference = abs(latest_release_date - date)
113122
threshold_delta = timedelta(days=self.gap_threshold)

src/macaron/malware_analyzer/pypi_heuristics/metadata/empty_project_link.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,21 @@
33

44
"""Analyzer checks whether the package has no project links."""
55

6+
import logging
7+
68
from macaron.json_tools import JsonType
79
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
810
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
911
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
1012

13+
logger: logging.Logger = logging.getLogger(__name__)
14+
1115

1216
class EmptyProjectLinkAnalyzer(BaseHeuristicAnalyzer):
1317
"""Check whether the PyPI package has no project links."""
1418

1519
def __init__(self) -> None:
16-
super().__init__(name="empty_project_link_analyzer", heuristic=Heuristics.EMPTY_PROJECT_LINK, depends_on=None)
20+
super().__init__(name="empty_project_link_analyzer", heuristic=Heuristics.EMPTY_PROJECT_LINK)
1721

1822
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
1923
"""Analyze the package.
@@ -30,10 +34,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
3034
"""
3135
project_links = pypi_package_json.get_project_links()
3236

33-
if project_links is None:
34-
return HeuristicResult.FAIL, {}
35-
36-
if len(project_links) == 0: # Total.
37+
if project_links is None or len(project_links) == 0:
3738
return HeuristicResult.FAIL, {}
3839

3940
return HeuristicResult.PASS, {"project_links": project_links}

src/macaron/malware_analyzer/pypi_heuristics/metadata/high_release_frequency.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from datetime import datetime
88

99
from macaron.config.defaults import defaults
10+
from macaron.errors import HeuristicAnalyzerValueError
1011
from macaron.json_tools import JsonType, json_extract
1112
from macaron.malware_analyzer.datetime_parser import parse_datetime
1213
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
@@ -20,11 +21,7 @@ class HighReleaseFrequencyAnalyzer(BaseHeuristicAnalyzer):
2021
"""Check whether the release frequency is high."""
2122

2223
def __init__(self) -> None:
23-
super().__init__(
24-
name="high_release_frequency_analyzer",
25-
heuristic=Heuristics.HIGH_RELEASE_FREQUENCY,
26-
depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.PASS)], # Analyzing when this heuristic pass
27-
)
24+
super().__init__(name="high_release_frequency_analyzer", heuristic=Heuristics.HIGH_RELEASE_FREQUENCY)
2825
self.average_gap_threshold: int = self._load_defaults() # Days
2926

3027
def _load_defaults(self) -> int:
@@ -49,7 +46,13 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
4946
The result and related information collected during the analysis.
5047
"""
5148
version_to_releases: dict | None = pypi_package_json.get_releases()
52-
if version_to_releases is None or len(version_to_releases) == 1:
49+
if version_to_releases is None:
50+
error_msg = "Metadata has no release information"
51+
logger.debug(error_msg)
52+
raise HeuristicAnalyzerValueError(error_msg)
53+
54+
if len(version_to_releases) == 1:
55+
# This heuristic only applies to packages with multiple releases, so single-release packages are skipped.
5356
return HeuristicResult.SKIP, {}
5457

5558
extract_data: dict[str, datetime] = {}

src/macaron/malware_analyzer/pypi_heuristics/metadata/one_release.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44

55
"""Analyzer checks whether the package contains only one release."""
66

7+
import logging
8+
9+
from macaron.errors import HeuristicAnalyzerValueError
710
from macaron.json_tools import JsonType
811
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
912
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
1013
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
1114

15+
logger: logging.Logger = logging.getLogger(__name__)
16+
1217

1318
class OneReleaseAnalyzer(BaseHeuristicAnalyzer):
1419
"""Determine if there is only one release of the package."""
1520

1621
def __init__(self) -> None:
17-
super().__init__(name="one_release_analyzer", heuristic=Heuristics.ONE_RELEASE, depends_on=None)
22+
super().__init__(name="one_release_analyzer", heuristic=Heuristics.ONE_RELEASE)
1823

1924
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
2025
"""Analyze the package.
@@ -31,7 +36,9 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
3136
"""
3237
releases: dict | None = pypi_package_json.get_releases()
3338
if releases is None:
34-
return HeuristicResult.SKIP, {"releases": {}}
39+
error_msg = "Metadata has no release information"
40+
logger.debug(error_msg)
41+
raise HeuristicAnalyzerValueError(error_msg)
3542

3643
if len(releases) == 1:
3744
return HeuristicResult.FAIL, {"releases": releases} # Higher false positive, so we keep it MEDIUM

src/macaron/malware_analyzer/pypi_heuristics/metadata/source_code_repo.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ def __init__(self) -> None:
2424
super().__init__(
2525
name="source_code_repo_analyzer",
2626
heuristic=Heuristics.SOURCE_CODE_REPO,
27-
depends_on=[(Heuristics.EMPTY_PROJECT_LINK, HeuristicResult.PASS)],
2827
)
2928

3029
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:

src/macaron/malware_analyzer/pypi_heuristics/metadata/unchanged_release.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
1+
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
22
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
33

44
"""Heuristics analyzer to check unchanged content in multiple releases."""
55
import logging
66
from collections import Counter
77

8+
from macaron.errors import HeuristicAnalyzerValueError
89
from macaron.json_tools import JsonType, json_extract
910
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
1011
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
@@ -17,11 +18,7 @@ class UnchangedReleaseAnalyzer(BaseHeuristicAnalyzer):
1718
"""Analyze whether the content of the package is updated by the maintainer."""
1819

1920
def __init__(self) -> None:
20-
super().__init__(
21-
name="unchanged_release_analyzer",
22-
heuristic=Heuristics.UNCHANGED_RELEASE,
23-
depends_on=[(Heuristics.HIGH_RELEASE_FREQUENCY, HeuristicResult.FAIL)],
24-
)
21+
super().__init__(name="unchanged_release_analyzer", heuristic=Heuristics.UNCHANGED_RELEASE)
2522
self.hash_algo: str = "sha256"
2623

2724
def _get_digests(self, pypi_package_json: PyPIPackageJsonAsset) -> list[str] | None:
@@ -68,6 +65,12 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
6865
"""
6966
digests: list[str] | None = self._get_digests(pypi_package_json)
7067
if digests is None:
68+
error_msg = "Metadata has no digest information"
69+
logger.debug(error_msg)
70+
raise HeuristicAnalyzerValueError(error_msg)
71+
72+
if len(digests) == 1:
73+
# This heuristic only applies to packages with multiple releases, so single-release packages are skipped.
7174
return HeuristicResult.SKIP, {}
7275

7376
frequency = Counter(digests)

src/macaron/malware_analyzer/pypi_heuristics/metadata/wheel_absence.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def __init__(self) -> None:
3434
super().__init__(
3535
name="wheel_absence_analyzer",
3636
heuristic=Heuristics.WHEEL_ABSENCE,
37-
depends_on=None,
3837
)
3938

4039
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:

0 commit comments

Comments
 (0)