Skip to content

Commit

Permalink
Small error handling tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
msm-code committed Nov 14, 2024
1 parent f45c98e commit c742ccb
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 16 deletions.
3 changes: 3 additions & 0 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

def parse(family: str, config: Dict[str, Any]) -> IocCollection:
iocs = IocCollection()

# Handlers for special-cased families
if family in modules.modules:
iocs = modules.modules[family](config)

# Generic parser for the rest of the fields
modules.parse(config, iocs)

return iocs
19 changes: 13 additions & 6 deletions src/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from base64 import b64encode
from enum import Enum
from typing import List, Optional, Tuple, Union, cast
import logging
from urllib.parse import urlparse

from Cryptodome.PublicKey import RSA # type: ignore
Expand All @@ -10,6 +11,8 @@

from .errors import IocExtractError

log = logging.getLogger(__name__)

PUBKEY_PEM_TEMPLATE = (
"-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----"
)
Expand Down Expand Up @@ -230,21 +233,21 @@ def try_add_rsa_from_pem(self, pem: str) -> None:
if pem:
self.add_rsa_key(RsaKey.parse_pem(pem))
except IocExtractError:
pass
log.warn("Failed to parse a RSA key from PEM")

def try_add_rsa_from_asn1_bytes(self, blob: bytes) -> None:
pem = PUBKEY_PEM_TEMPLATE.format(b64encode(blob).decode())

try:
self.add_rsa_key(RsaKey.parse_pem(pem))
except IocExtractError:
pass
log.warn("Failed to parse a RSA key from ASN1")

def try_add_rsa_from_base64(self, pem: str) -> None:
try:
self.add_rsa_key(RsaKey.parse_base64(pem))
except IocExtractError:
pass
log.warn("Failed to parse a RSA key from base64")

def add_network_location(self, netloc: NetworkLocation) -> None:
self.network_locations.append(netloc)
Expand All @@ -253,13 +256,17 @@ def add_host_port(
self, host: str, port: Union[str, int], schema: str = "unknown"
) -> None:
if isinstance(port, str):
port_val = int(port)
try:
port_val = int(port)
except ValueError:
log.warn("Failed to add URL from host_port")
return
else:
port_val = port
try:
self.try_add_url(f"{schema}://{host}:{port_val}")
except IocExtractError:
pass
log.warn("Failed to add URL from host_port")

def try_add_url(
self, url: str, location_type: LocationType = LocationType.CNC
Expand All @@ -271,7 +278,7 @@ def try_add_url(
NetworkLocation(url, location_type=location_type)
)
except IocExtractError:
pass
log.warn("Failed to add URL directly")

def add_password(self, password: str) -> None:
self.passwords.append(password)
Expand Down
30 changes: 21 additions & 9 deletions src/modules.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import string
from base64 import b64decode
from typing import Any, Dict, List
import logging

from .errors import ModuleAlreadyRegisteredError
from .errors import ModuleAlreadyRegisteredError, IocExtractError
from .model import EcdsaCurve, IocCollection, LocationType, RsaKey

modules: Dict[str, Any] = {}
log = logging.getLogger(__name__)


# Utils
class CantFindAHostForTheDomain(IocExtractError):
"""Can't find a host for the domain when adding url."""
pass

class DomainHasToBeAStringOrADict(IocExtractError):
"""Adding URL from something other than string or a dict."""
pass

class UnknownRsaKeyType(IocExtractError):
"""Can't guess the RSA key format."""
pass

def module(name):
def decorator(func):
Expand Down Expand Up @@ -50,11 +61,9 @@ def add_url(iocs: IocCollection, config: Dict[str, Any], key: str) -> None:
iocs.try_add_url(domain[hostkey])
break
else:
raise NotImplementedError("Can't find a host for the domain")
raise CantFindAHostForTheDomain()
else:
raise NotImplementedError(
"The domain has to be either a string or a list"
)
raise DomainHasToBeAStringOrADict()


def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None:
Expand Down Expand Up @@ -87,7 +96,7 @@ def add_rsa_key(iocs: IocCollection, config: Dict, key: str) -> None:
iocs.try_add_rsa_from_asn1_bytes(enc_bytes.rstrip(b"\x00"))
continue

raise NotImplementedError("Unknown RSA key type")
raise UnknownRsaKeyType()


def add_key(iocs: IocCollection, config: Dict, key: str, keytype: str) -> None:
Expand All @@ -106,7 +115,10 @@ def add_mutex(iocs: IocCollection, config: Dict, key: str) -> None:

def parse(config: Dict[str, Any], iocs: IocCollection) -> None:
for name in ["publickey", "rsapub", "rsakey", "pubkey", "privkey"]:
add_rsa_key(iocs, config, name)
try:
add_rsa_key(iocs, config, name)
except UnknownRsaKeyType:
log.warn("Unknown RSA key type")

for name in [
"urls",
Expand Down Expand Up @@ -372,7 +384,7 @@ def parse_lockbit(config: Dict[str, Any]) -> IocCollection:
iocs.add_rsa_key(RsaKey(n=n, e=e))
del config["rsa_pub"]
except Exception:
pass
log.warn("Failed to parse a lockbit key")

return iocs

Expand Down
12 changes: 11 additions & 1 deletion src/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import logging

from mwdblib import MWDB # type: ignore

Expand All @@ -14,8 +15,16 @@ def main():
parser.add_argument(
"config_id", help="Config to parse", default=None, nargs="?"
)
parser.add_argument(
"-v", "--verbose", help="Print debug logs", action="store_true"
)
args = parser.parse_args()

if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

mwdb = MWDB()
mwdb.login(args.mwdb_user, args.mwdb_pass)

Expand All @@ -28,9 +37,10 @@ def main():
for cfg in mwdb.recent_configs():
if cfg.type != "static":
continue
print(cfg.id)
print(cfg.family, cfg.id)
iocs = parse(cfg.family, cfg.cfg)
print(iocs.prettyprint())
print()
continue


Expand Down

0 comments on commit c742ccb

Please sign in to comment.