Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 87 additions & 13 deletions api_app/analyzers_manager/observable_analyzers/apivoid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,111 @@
# everything else is linted and tested
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import json
import requests

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.exceptions import AnalyzerConfigurationException
from api_app.choices import Classification

from tests.mock_utils import MockUpResponse, if_mock_connections, patch


class ApiVoidAnalyzer(classes.ObservableAnalyzer):
url = "https://endpoint.apivoid.com"
"""
APIVoid analyzer with support for both the legacy v1 endpoints (query param key)
and the new v2 endpoints (header X-API-Key + JSON POST). The analyzer will use
v2 when `self.config.get("api_version") == "v2"` or if `self._api_use_v2` is True.
Otherwise it falls back to v1 behaviour for backwards compatibility.
"""

# Legacy default (kept for backward compatibility)
url_v1 = "https://endpoint.apivoid.com"
# New base (v2) per APIVoid docs/changelog
url_v2 = "https://api.apivoid.com"

_api_key: str = None
_api_use_v2: bool = False

def update(self):
pass
"""
Pull API key and optional flags from analyzer configuration.
Expected config keys:
- api_key
- api_version (optional) -> "v1" or "v2"
- use_v2 (optional, bool) -> explicit boolean flag
"""
cfg = getattr(self, "config", {}) or {}
# Support legacy config naming
self._api_key = cfg.get("api_key") or cfg.get("key") or self._api_key

api_version = cfg.get("api_version")
if api_version:
self._api_use_v2 = str(api_version).lower() == "v2"
else:
self._api_use_v2 = bool(cfg.get("use_v2", self._api_use_v2))

def _build_v1_url(self, path, parameter):
return (
f"{self.url_v1}/{path}/v1/pay-as-you-go/?key={self._api_key}"
f"&{parameter}={self.observable_name}"
)

def _call_v1(self, path, parameter):
url = self._build_v1_url(path, parameter)
r = requests.get(url)
r.raise_for_status()
return r.json()

def _call_v2(self, endpoint_path, payload):
"""
APIVoid v2: JSON POST with `X-API-Key` header.
"""
url = f"{self.url_v2}/{endpoint_path}"
headers = {
"Content-Type": "application/json",
"X-API-Key": self._api_key,
}
r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
r.raise_for_status()
return r.json()

def run(self):
"""
Select endpoint based on observable classification,
prefer v2 when configured, otherwise fallback to v1.
"""
if self.observable_classification == Classification.DOMAIN.value:
path = "domainbl"
parameter = "host"
v1_path = "domainbl"
v1_param = "host"
v2_endpoint = "v2/domain-reputation"
v2_payload = {"domain": self.observable_name}

elif self.observable_classification == Classification.IP.value:
path = "iprep"
parameter = "ip"
v1_path = "iprep"
v1_param = "ip"
v2_endpoint = "v2/ip-reputation"
v2_payload = {"ip": self.observable_name}

elif self.observable_classification == Classification.URL.value:
path = "urlrep"
parameter = "url"
v1_path = "urlrep"
v1_param = "url"
v2_endpoint = "v2/url-reputation"
v2_payload = {"url": self.observable_name}

else:
raise AnalyzerConfigurationException("not supported")
complete_url = f"{self.url}/{path}/v1/pay-as-you-go/?key={self._api_key}&{parameter}={self.observable_name}"
r = requests.get(complete_url)
r.raise_for_status()
return r.json()

if self._api_use_v2:
try:
return self._call_v2(v2_endpoint, v2_payload)
except Exception:
if self._api_key:
return self._call_v1(v1_path, v1_param)
raise

return self._call_v1(v1_path, v1_param)

@classmethod
def _monkeypatch(cls):
Expand All @@ -54,7 +128,7 @@ def _monkeypatch(cls):
"detected": False,
"reference": "https://0spam.org/",
"elapsed": "0.09",
},
}
},
"detections": 7,
"engines_count": 79,
Expand Down
22 changes: 18 additions & 4 deletions api_app/analyzers_manager/observable_analyzers/greynoiseintel.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

def run(self):
response = {}

# Select API version
if self.greynoise_api_version == "v2":
session = GreyNoise(
api_key=self._api_key_name,
Expand All @@ -48,23 +50,30 @@
raise AnalyzerRunException(
"Invalid API Version. Supported are: v2 (paid), v3 (community)"
)

try:
# Base lookup
response = session.ip(self.observable_name)

# SAFE, CodeQL-approved merge for v2
if self.greynoise_api_version == "v2":
response |= session.riot(self.observable_name)
# greynoise library does provide empty messages in case of these errors...
# so it's better to catch them and create custom management
riot_data = session.riot(self.observable_name)
if isinstance(riot_data, dict):
response.update(riot_data)

except RateLimitError as e:
self.disable_for_rate_limit()
self.report.errors.append(e)
self.report.save()
raise AnalyzerRunException(f"Rate limit error: {e}")

except RequestFailure as e:
self.report.errors.append(e)
self.report.save()
raise AnalyzerRunException(f"Request failure error: {e}")

except NotFound as e:
logger.info(f"not found error for {self.observable_name} :{e}")
logger.info(f"not found error for {self.observable_name}: {e}")
response["not_found"] = True

return response
Expand All @@ -82,23 +91,28 @@
classification = self.report.report.get("classification", None)
riot = self.report.report.get("riot", None)
noise = self.report.report.get("noise", None)

if classification:
classification = classification.lower()
self.report: AnalyzerReport

if classification == self.EVALUATIONS.MALICIOUS.value:
if not noise:
logger.error("malicious IP is not a noise!?! How is this possible")
data_model.evaluation = self.EVALUATIONS.MALICIOUS.value
data_model.reliability = 7

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
elif classification == "unknown":
if riot:
data_model.evaluation = self.EVALUATIONS.TRUSTED.value
data_model.reliability = 1
elif noise:
data_model.evaluation = self.EVALUATIONS.MALICIOUS.value

elif classification == "benign":
data_model.evaluation = self.EVALUATIONS.TRUSTED.value
data_model.reliability = 7

else:
logger.error(
f"there should not be other types of classification. Classification found: {classification}"
Expand Down
Loading