1111import base64
1212import binascii
1313import ipaddress
14+ import json
1415import logging
1516import os
1617import re
17- from dataclasses import dataclass
18+ import subprocess # nosec
19+ import tempfile
20+ from collections import defaultdict
21+ from typing import Any
1822
1923import yaml
2024
2125from macaron .config .defaults import defaults
2226from macaron .errors import ConfigurationError , HeuristicAnalyzerValueError
23- from macaron .json_tools import JsonType
27+ from macaron .json_tools import JsonType , json_extract
2428from macaron .malware_analyzer .pypi_heuristics .heuristics import HeuristicResult
2529from macaron .slsa_analyzer .package_registry .pypi_registry import PyPIPackageJsonAsset
2630
3135CALLS = "calls"
3236
3337
34- @dataclass (frozen = True )
35- class Import :
36- """Data class to hold information about extracted import statements.
37-
38- Name, alias, and module are referring to the following patterns of python import statements:
39- - [from <module>] import <name> [as <alias>]
40- """
41-
42- name : str
43- alias : str | None
44- module : str | None
45- lineno : int
46- statement : str
47-
48-
4938class PyPISourcecodeAnalyzer :
5039 """This class is used to analyze the source code of python PyPI packages. This analyzer is a work in progress.
5140
@@ -70,6 +59,7 @@ class PyPISourcecodeAnalyzer:
    def __init__(self) -> None:
        """Collect required data for analysing the source code."""
        # Suspicious pattern categories loaded from suspicious_pattern.yaml.
        self.suspicious_patterns = self._load_defaults()
        # Temporary semgrep rule files generated by _create_rules() and
        # closed again by _clear_rules().
        self.rule_files: list = []
7363
7464 def _load_defaults (self ) -> dict [str , dict [str , list ]]:
7565 """Load the suspicious pattern from suspicious_pattern.yaml.
@@ -106,7 +96,7 @@ def _load_defaults(self) -> dict[str, dict[str, list]]:
10696 with open (filename , encoding = "utf-8" ) as file :
10797 configured_patterns : dict [str , JsonType ] = yaml .safe_load (file )
10898 except FileNotFoundError as file_error :
109- error_msg = f"Unable to open locate { filename } "
99+ error_msg = f"Unable to locate { filename } "
110100 logger .debug (error_msg )
111101 raise ConfigurationError (error_msg ) from file_error
112102 except yaml .YAMLError as yaml_error :
@@ -162,44 +152,60 @@ def analyze_patterns(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[Heu
162152 HeuristicAnalyzerValueError
163153 if there is no source code available.
164154 """
165- analysis_result : dict = {}
155+ analysis_result : defaultdict = defaultdict (list )
156+ semgrep_commands : list [str ] = ["semgrep" , "scan" ]
166157 result : HeuristicResult = HeuristicResult .PASS
167158
168- source_code = pypi_package_json .package_sourcecode
169- if not source_code :
170- error_msg = "Unable to retrieve PyPI package source code"
159+ source_code_path = pypi_package_json .package_sourcecode_path
160+ if not source_code_path :
161+ error_msg = "Unable to retrieve PyPI package source code path "
171162 logger .debug (error_msg )
172163 raise HeuristicAnalyzerValueError (error_msg )
173164
174- for filename , content in source_code .items ():
175- detail_info = {}
165+ self ._create_rules ()
166+ for rule_file in self .rule_files :
167+ semgrep_commands .extend (["--config" , rule_file .name ])
168+ semgrep_commands .append (source_code_path )
176169
170+ with tempfile .NamedTemporaryFile (mode = "w+" , delete = True ) as output_json_file :
171+ semgrep_commands .append (f"--json-output={ output_json_file .name } " )
177172 try :
178- _ = ast .parse (content )
179- except (SyntaxError , ValueError ) as ast_parse_error :
180- logger .debug ("File %s cannot be parsed as a python file: %s" , filename , ast_parse_error )
181- continue
173+ process = subprocess .run (semgrep_commands , check = True , capture_output = True ) # nosec
174+ except (subprocess .CalledProcessError , subprocess .TimeoutExpired ) as semgrep_error :
175+ error_msg = (
176+ f"Unable to run semgrep on { source_code_path } with arguments { semgrep_commands } : { semgrep_error } "
177+ )
178+ logger .debug (error_msg )
179+ raise HeuristicAnalyzerValueError (error_msg ) from semgrep_error
182180
183- imports = self ._extract_imports (content )
184- import_names = set ()
185- for i in imports :
186- if i .module :
187- import_names .add ("." .join ([i .module , i .name ]))
188- import_names .add (i .name )
181+ if process .returncode != 0 :
182+ error_msg = f"Error running semgrep on { source_code_path } with arguments" f" { process .args } "
183+ logger .debug (error_msg )
184+ raise HeuristicAnalyzerValueError (error_msg )
189185
190- for category , patterns in self .suspicious_patterns [IMPORTS ].items ():
191- category_info = []
186+ semgrep_output = json .loads (output_json_file .read ())
192187
193- suspicious_imports = set .intersection (import_names , set (patterns ))
194- if suspicious_imports :
195- category_info = [i for i in imports if i .name in suspicious_imports ]
196- result = HeuristicResult .FAIL
188+ if not semgrep_output :
189+ return result , {}
197190
198- detail_info [category ] = category_info
191+ semgrep_findings = json_extract (semgrep_output , ["results" ], list )
192+ if not semgrep_findings :
193+ return result , {}
199194
200- analysis_result [filename ] = {IMPORTS : detail_info }
195+ result = HeuristicResult .FAIL # some semgrep rules were triggered
196+ for finding in semgrep_findings :
197+ category = json_extract (finding , ["check_id" ], str )
198+ if not category :
199+ continue
201200
202- return result , analysis_result
201+ file = json_extract (finding , ["path" ], str )
202+ start = json_extract (finding , ["start" , "line" ], int )
203+ end = json_extract (finding , ["end" , "line" ], int )
204+ analysis_result [category ].append ({"file" : file , "start" : start , "end" : end })
205+
206+ self ._clear_rules ()
207+
208+ return result , dict (analysis_result )
203209
204210 def analyze_dataflow (self , pypi_package_json : PyPIPackageJsonAsset ) -> tuple [HeuristicResult , dict [str , JsonType ]]:
205211 """Analyze the source code of the package for malicious dataflow.
@@ -253,122 +259,43 @@ def analyze_dataflow(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[Heu
253259
254260 return result , analysis_result
255261
256- def _extract_imports (self , content : str ) -> set [Import ]:
257- try :
258- return self ._extract_imports_from_ast (content )
259- except SyntaxError :
260- return self ._extract_imports_from_lines (content )
262+ def _create_rules (self ) -> None :
263+ rule_list : list [dict [str , Any ]] = []
264+ contents : dict = {}
261265
262- def _extract_imports_from_ast ( self , content : str ) -> set [ Import ] :
263- """Extract imports from source code using the parsed AST.
266+ if self . rule_files :
267+ self . _clear_rules ()
264268
265- Parameters
266- ----------
267- source_content: str
268- The source code as a string.
269+ # import rules
270+ for category , patterns in self . suspicious_patterns [ IMPORTS ]. items ():
271+ rule : dict [ str , Any ] = {}
272+ pattern_list : list = []
269273
270- Returns
271- -------
272- set[str ]
273- The set of imports.
274+ rule [ "id" ] = category
275+ rule [ "severity" ] = "ERROR"
276+ rule [ "languages" ] = [ "python" ]
277+ rule [ "message" ] = f"Detected suspicious imports from the ' { category } ' category"
274278
275- Raises
276- ------
277- SyntaxError
278- If the code could not be parsed.
279- """
280- imports = set ()
281- tree = ast .parse (content )
282- for node in ast .walk (tree ):
283- if isinstance (node , ast .Import ):
284- for alias in node .names :
285- imports .add (Import (alias .name , alias .asname , None , alias .lineno , "" ))
286- elif isinstance (node , ast .ImportFrom ):
287- module = node .module
288- if module :
289- _module = "." * node .level + module
290- for name in node .names :
291- imports .add (Import (name .name , name .asname , _module , name .lineno , "" ))
292- return imports
293-
294- def _extract_imports_from_lines (self , content : str ) -> set [Import ]:
295- """Extract imports from source code using per line pattern matching.
279+ for pattern in patterns :
280+ pattern_list .append ({"pattern" : f"import { pattern } " })
281+ pattern_list .append ({"pattern" : f"from { pattern } import $X" })
282+ pattern_list .append ({"pattern" : f'__import__("{ pattern } ")' })
296283
297- Parameters
298- ----------
299- source_content: str
300- The source code as a string.
284+ rule ["pattern-either" ] = pattern_list
285+ rule_list .append (rule )
301286
302- Returns
303- -------
304- set[str]
305- The list of imports.
306- """
307- alias_pattern = r"\s+as\s+\w+(?:\.{0,1}\w+)*"
308- # Pattern for module aliases.
309-
310- module_name = r"\w+(?:\.{0,1}\w+"
311- # <module_name> as described under pattern_import.
312-
313- pattern_import = (
314- r"(?:import\s+)(" + module_name + r")*(?:" + alias_pattern + r")?"
315- r"(?:(?:\s*,\s*)(?:" + module_name + r")*(?:" + alias_pattern + r")?))*)(?:(?:\s|#).*)?"
316- )
317- # Allows for a standard import statement.
318- # E.g.: import <module_name(s)> <other_text>
319- # Where <module_name(s)> consists of one or more <module_name>.
320- # Where <module_name> consists of one or more words (a-z or 0-9 or underscore) separated by periods,
321- # with an optional alias.
322- # Where <other_text> allows any character(s) either after a single space or a hash (#).
323-
324- pattern_from_import = (
325- r"(?:from\s+)([.]*"
326- + module_name
327- + r")*)(?:\s+import\s+(\w+(?:\s+as\s+\w+)?(?:(?:\s*,\s*)(?:\w+(?:\s+as\s+\w+)?))*))"
328- )
329- # Allows for a from import statement.
330- # E.g.: from <module_name> import <module_component(s)> <other_text>
331- # Where <module_name> is as above, but can also be preceded by any number of periods.
332- # (Note only a single module can be placed here.)
333- # Where <module_component(s)> consists of one or more <module_component> with optional aliases.
334- # Where <module_component> is identical to <module_name> except without any periods.
335- # Where <other_text> requires at least one space followed by one or more word characters, plus
336- # any other characters following on from that.
337-
338- combined_pattern = f"^(?:{ pattern_import } )|(?:{ pattern_from_import } )$"
339- # The combined pattern creates two match groups:
340- # 1 - standard import statement.
341- # 2 - from import statement module.
342- # 3 - from import statement module components.
343-
344- imports = set ()
345- for lineno , line in enumerate (content .splitlines ()):
346- line .strip ()
347- match = re .match (combined_pattern , line )
348- if not match :
349- continue
287+ contents = {"rules" : rule_list }
288+
289+ with tempfile .NamedTemporaryFile (
290+ "w" , prefix = f"{ IMPORTS } _" , suffix = ".yaml" , delete = False
291+ ) as import_patterns_file :
292+ yaml .dump (contents , import_patterns_file )
293+ self .rule_files .append (import_patterns_file )
350294
351- if match .group (1 ):
352- # Standard import, handle commas and aliases if present.
353- splits = self ._prune_aliased_lines (match .group (1 ), alias_pattern )
354- for split in splits :
355- imports .add (Import (split , None , None , lineno , "" ))
356- elif match .group (2 ):
357- # From import
358- if match .group (3 ):
359- splits = self ._prune_aliased_lines (match .group (3 ), alias_pattern )
360- for split in splits :
361- imports .add (Import (split , None , match .group (2 ), lineno , "" ))
362- return imports
363-
364- def _prune_aliased_lines (self , text : str , alias_pattern : str ) -> list [str ]:
365- """Split the line on commas and remove any aliases from individual parts."""
366- results = []
367- splits = text .split ("," )
368- for split in splits :
369- split = split .strip ()
370- results .append (re .sub (alias_pattern , "" , split ))
371- return results
295+ def _clear_rules (self ) -> None :
296+ for file in self .rule_files :
297+ file .close ()
298+ self .rule_files .clear ()
372299
373300
374301class DataFlowTracer (ast .NodeVisitor ):
0 commit comments