-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
291 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,102 +1,322 @@ | ||
from typing import Any, Dict | ||
from typing import Any, Dict, List, Optional | ||
|
||
from maco.model import CategoryEnum, ExtractorModel | ||
from maco.model import CategoryEnum, ConnUsageEnum, ExtractorModel | ||
|
||
from .extractor import Extractor | ||
|
||
|
||
class ConfigBuilder: | ||
def __init__(self, parent: Extractor): | ||
""" | ||
Allows to build configuration imperatively, using | ||
Maco model https://github.com/CybercentreCanada/Maco | ||
.. versionadded:: 4.4.0 | ||
""" | ||
def __init__(self, parent: Extractor) -> None: | ||
self.parent = parent | ||
|
||
def push_config(self, config: Dict[str, Any]): | ||
def push_config(self, config: Dict[str, Any]) -> None: | ||
# 'family' is required field in ExtractorModel | ||
# In our case 'family' is indicator that family has been actually matched | ||
# which is determined by returned value from extractor method | ||
config = ExtractorModel.parse_obj( | ||
{**config, "family": self.parent.family} | ||
).dict( | ||
exclude_unset=True, | ||
exclude_defaults=True, | ||
exclude={ | ||
"family", | ||
}, | ||
) | ||
self.parent.push_config(config) | ||
self.parent.push_config(config, jsonable=False) | ||
|
||
def add_others(self, others: Dict[str, Any]): | ||
def add_other(self, others: Dict[str, Any]) -> None: | ||
return self.push_config(dict(others=others)) | ||
|
||
def set_version(self, version: str): | ||
def set_version(self, version: str) -> None: | ||
return self.push_config(dict(version=version)) | ||
|
||
def add_category(self, *category: CategoryEnum): | ||
def add_category(self, *category: CategoryEnum) -> None: | ||
return self.push_config(dict(category=category)) | ||
|
||
def add_attack_ref(self, *attack_refs: str): | ||
return self.push_config(dict(attack=attack_refs)) | ||
def add_attack_ref(self, *attack_ref: str) -> None: | ||
return self.push_config(dict(attack=attack_ref)) | ||
|
||
def add_capability_enabled(self, *capability_enabled: str): | ||
def add_capability_enabled(self, *capability_enabled: str) -> None: | ||
return self.push_config(dict(capability_enabled=capability_enabled)) | ||
|
||
def add_capability_disabled(self, *capability_disabled: str): | ||
def add_capability_disabled(self, *capability_disabled: str) -> None: | ||
return self.push_config(dict(capability_disabled=capability_disabled)) | ||
|
||
def add_campaign_id(self, *campaign_id: str): | ||
def add_campaign_id(self, *campaign_id: str) -> None: | ||
return self.push_config(dict(campaign_id=campaign_id)) | ||
|
||
def add_identifier(self, *identifier: str): | ||
def add_decoded_strings(self, *string: str) -> None: | ||
return self.push_config(dict(decoded_strings=string)) | ||
|
||
def add_identifier(self, *identifier: str) -> None: | ||
return self.push_config(dict(identifier=identifier)) | ||
|
||
def add_password(self, *password: str): | ||
def add_password(self, *password: str) -> None: | ||
return self.push_config(dict(password=password)) | ||
|
||
def add_mutex(self, *mutex: str): | ||
def add_mutex(self, *mutex: str) -> None: | ||
return self.push_config(dict(mutex=mutex)) | ||
|
||
def add_pipe(self, *pipe: str): | ||
def add_pipe(self, *pipe: str) -> None: | ||
return self.push_config(dict(pipe=pipe)) | ||
|
||
def set_sleep_delay(self, sleep_delay: int): | ||
def set_sleep_delay(self, sleep_delay: int) -> None: | ||
return self.push_config(dict(sleep_delay=sleep_delay)) | ||
|
||
def add_inject_exe(self, *inject_exe: str): | ||
def add_inject_exe(self, *inject_exe: str) -> None: | ||
return self.push_config(dict(inject_exe=inject_exe)) | ||
|
||
def add_binary(self, *binary: ExtractorModel.Binary): | ||
return self.push_config(dict(binaries=binary)) | ||
def add_binary( | ||
self, | ||
filename: str, | ||
data: bytes, | ||
datatype: Optional[ExtractorModel.Binary.TypeEnum] = None, | ||
encryption: Optional[ExtractorModel.Binary.Encryption] = None, | ||
other: Optional[Dict[str, Any]] = None, | ||
) -> None: | ||
binary = ExtractorModel.Binary( | ||
data=data, | ||
datatype=datatype, | ||
encryption=encryption, | ||
other={**(other or {}), "filename": filename}, | ||
) | ||
return self.push_config(dict(binaries=[binary])) | ||
|
||
def add_ftp( | ||
self, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
hostname: Optional[str] = None, | ||
port: Optional[int] = None, | ||
path: Optional[str] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
ftp = ExtractorModel.FTP( | ||
username=username, | ||
password=password, | ||
hostname=hostname, | ||
port=port, | ||
path=path, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(ftp=[ftp])) | ||
|
||
def add_ftp(self, *ftp: ExtractorModel.FTP): | ||
return self.push_config(dict(ftp=ftp)) | ||
def add_smtp( | ||
self, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
hostname: Optional[str] = None, | ||
port: Optional[int] = None, | ||
mail_to: Optional[List[str]] = None, | ||
mail_from: Optional[str] = None, | ||
subject: Optional[str] = None, | ||
) -> None: | ||
smtp = ExtractorModel.SMTP( | ||
username=username, | ||
password=password, | ||
hostname=hostname, | ||
port=port, | ||
mail_to=mail_to or [], | ||
mail_from=mail_from, | ||
subject=subject, | ||
) | ||
return self.push_config(dict(smtp=[smtp])) | ||
|
||
def add_smtp(self, *smtp: ExtractorModel.SMTP): | ||
return self.push_config(dict(smtp=smtp)) | ||
def add_http( | ||
self, | ||
uri: Optional[str] = None, | ||
protocol: Optional[str] = None, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
hostname: Optional[str] = None, | ||
port: Optional[int] = None, | ||
path: Optional[str] = None, | ||
query: Optional[str] = None, | ||
fragment: Optional[str] = None, | ||
user_agent: Optional[str] = None, | ||
method: Optional[str] = None, | ||
headers: Optional[Dict[str, str]] = None, | ||
max_size: Optional[int] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
http = ExtractorModel.Http( | ||
uri=uri, | ||
protocol=protocol, | ||
username=username, | ||
password=password, | ||
hostname=hostname, | ||
port=port, | ||
path=path, | ||
query=query, | ||
fragment=fragment, | ||
user_agent=user_agent, | ||
method=method, | ||
headers=headers, | ||
max_size=max_size, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(http=[http])) | ||
|
||
def add_http(self, *http: ExtractorModel.Http): | ||
return self.push_config(dict(http=http)) | ||
def add_ssh( | ||
self, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
hostname: Optional[str] = None, | ||
port: Optional[int] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
ssh = ExtractorModel.SSH( | ||
username=username, | ||
password=password, | ||
hostname=hostname, | ||
port=port, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(ssh=[ssh])) | ||
|
||
def add_ssh(self, *ssh: ExtractorModel.SSH): | ||
return self.push_config(dict(ssh=ssh)) | ||
def add_proxy( | ||
self, | ||
protocol: Optional[str] = None, | ||
username: Optional[str] = None, | ||
password: Optional[str] = None, | ||
hostname: Optional[str] = None, | ||
port: Optional[int] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
proxy = ExtractorModel.Proxy( | ||
protocol=protocol, | ||
username=username, | ||
password=password, | ||
hostname=hostname, | ||
port=port, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(proxy=[proxy])) | ||
|
||
def add_proxy(self, *proxy: ExtractorModel.Proxy): | ||
return self.push_config(dict(proxy=proxy)) | ||
def add_dns( | ||
self, | ||
ip: Optional[str] = None, | ||
port: Optional[int] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
dns = ExtractorModel.DNS( | ||
ip=ip, | ||
port=port, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(dns=[dns])) | ||
|
||
def add_dns(self, *dns: ExtractorModel.DNS): | ||
return self.push_config(dict(proxy=dns)) | ||
def add_tcp( | ||
self, | ||
client_ip: Optional[str] = None, | ||
client_port: Optional[int] = None, | ||
server_ip: Optional[str] = None, | ||
server_domain: Optional[str] = None, | ||
server_port: Optional[int] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
tcp = ExtractorModel.Connection( | ||
client_ip=client_ip, | ||
client_port=client_port, | ||
server_ip=server_ip, | ||
server_domain=server_domain, | ||
server_port=server_port, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(tcp=[tcp])) | ||
|
||
def add_tcp(self, *tcp: ExtractorModel.Connection): | ||
return self.push_config(dict(tcp=tcp)) | ||
def add_udp( | ||
self, | ||
client_ip: Optional[str] = None, | ||
client_port: Optional[int] = None, | ||
server_ip: Optional[str] = None, | ||
server_domain: Optional[str] = None, | ||
server_port: Optional[int] = None, | ||
usage: Optional[ConnUsageEnum] = None, | ||
) -> None: | ||
udp = ExtractorModel.Connection( | ||
client_ip=client_ip, | ||
client_port=client_port, | ||
server_ip=server_ip, | ||
server_domain=server_domain, | ||
server_port=server_port, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(udp=[udp])) | ||
|
||
def add_udp(self, *udp: ExtractorModel.Connection): | ||
return self.push_config(dict(udp=udp)) | ||
def add_encryption( | ||
self, | ||
algorithm: Optional[str] = None, | ||
public_key: Optional[str] = None, | ||
key: Optional[str] = None, | ||
provider: Optional[str] = None, | ||
mode: Optional[str] = None, | ||
iv: Optional[str] = None, | ||
seed: Optional[str] = None, | ||
nonce: Optional[str] = None, | ||
constants: List[str] = None, | ||
usage: Optional[ExtractorModel.Encryption.UsageEnum] = None, | ||
) -> None: | ||
encryption = ExtractorModel.Encryption( | ||
algorithm=algorithm, | ||
public_key=public_key, | ||
key=key, | ||
provider=provider, | ||
mode=mode, | ||
iv=iv, | ||
seed=seed, | ||
nonce=nonce, | ||
constants=constants or [], | ||
usage=usage, | ||
) | ||
return self.push_config(dict(encryption=[encryption])) | ||
|
||
def add_encryption(self, *encryption: ExtractorModel.Encryption): | ||
return self.push_config(dict(encryption=encryption)) | ||
def add_service( | ||
self, | ||
dll: Optional[str] = None, # dll that the service is loaded from | ||
name: Optional[str] = None, # service/driver name for persistence | ||
display_name: Optional[str] = None, # display name for service | ||
description: Optional[str] = None, # description for service | ||
): | ||
service = ExtractorModel.Service( | ||
dll=dll, name=name, display_name=display_name, description=description | ||
) | ||
return self.push_config(dict(service=[service])) | ||
|
||
def add_cryptocurrency(self, *cryptocurrency: ExtractorModel.Cryptocurrency): | ||
return self.push_config(dict(cryptocurrency=cryptocurrency)) | ||
def add_cryptocurrency( | ||
self, | ||
coin: Optional[str] = None, # BTC,ETH,USDT,BNB, etc | ||
address: Optional[str] = None, | ||
ransom_amount: Optional[ | ||
float | ||
] = None, # number of coins required (if hardcoded) | ||
usage: Optional[ExtractorModel.Cryptocurrency.UsageEnum] = None, | ||
) -> None: | ||
cryptocurrency = ExtractorModel.Service( | ||
coin=coin, address=address, ransom_amount=ransom_amount, usage=usage | ||
) | ||
return self.push_config(dict(cryptocurrency=[cryptocurrency])) | ||
|
||
def add_path(self, *path: ExtractorModel.Path): | ||
return self.push_config(dict(path=path)) | ||
def add_path( | ||
self, | ||
path: str, | ||
usage: Optional[ExtractorModel.Path.UsageEnum] = None, | ||
) -> None: | ||
path_obj = ExtractorModel.Path(path=path, usage=usage) | ||
return self.push_config(dict(path=[path_obj])) | ||
|
||
def add_registry(self, *registry: ExtractorModel.Registry): | ||
return self.push_config(dict(registry=registry)) | ||
def add_registry( | ||
self, | ||
key: str, | ||
usage: Optional[ExtractorModel.Registry.UsageEnum] = None, | ||
) -> None: | ||
registry = ExtractorModel.Registry( | ||
key=key, | ||
usage=usage, | ||
) | ||
return self.push_config(dict(registry=[registry])) |
Oops, something went wrong.