diff --git a/api_app/analyzers_manager/observable_analyzers/apivoid.py b/api_app/analyzers_manager/observable_analyzers/apivoid.py index 85117d65d5..23e2e6450f 100644 --- a/api_app/analyzers_manager/observable_analyzers/apivoid.py +++ b/api_app/analyzers_manager/observable_analyzers/apivoid.py @@ -3,37 +3,111 @@ # everything else is linted and tested # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl # See the file 'LICENSE' for copying permission. + +import json import requests from api_app.analyzers_manager import classes from api_app.analyzers_manager.exceptions import AnalyzerConfigurationException from api_app.choices import Classification + from tests.mock_utils import MockUpResponse, if_mock_connections, patch class ApiVoidAnalyzer(classes.ObservableAnalyzer): - url = "https://endpoint.apivoid.com" + """ + APIVoid analyzer with support for both the legacy v1 endpoints (query param key) + and the new v2 endpoints (header X-API-Key + JSON POST). The analyzer will use + v2 when `self.config.get("api_version") == "v2"` or if `self._api_use_v2` is True. + Otherwise it falls back to v1 behaviour for backwards compatibility. + """ + + # Legacy default (kept for backward compatibility) + url_v1 = "https://endpoint.apivoid.com" + # New base (v2) per APIVoid docs/changelog + url_v2 = "https://api.apivoid.com" + _api_key: str = None + _api_use_v2: bool = False def update(self): - pass + """ + Pull API key and optional flags from analyzer configuration. + Expected config keys: + - api_key + - api_version (optional) -> "v1" or "v2" + - use_v2 (optional, bool) -> explicit boolean flag + """ + cfg = getattr(self, "config", {}) or {} + # Support legacy config naming + self._api_key = cfg.get("api_key") or cfg.get("key") or self._api_key + + api_version = cfg.get("api_version") + if api_version: + self._api_use_v2 = str(api_version).lower() == "v2" + else: + self._api_use_v2 = bool(cfg.get("use_v2", self._api_use_v2)) + + def _build_v1_url(self, path, parameter): + return ( + f"{self.url_v1}/{path}/v1/pay-as-you-go/?key={self._api_key}" + f"&{parameter}={self.observable_name}" + ) + + def _call_v1(self, path, parameter): + url = self._build_v1_url(path, parameter) + r = requests.get(url) + r.raise_for_status() + return r.json() + + def _call_v2(self, endpoint_path, payload): + """ + APIVoid v2: JSON POST with `X-API-Key` header. + """ + url = f"{self.url_v2}/{endpoint_path}" + headers = { + "Content-Type": "application/json", + "X-API-Key": self._api_key, + } + r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30) + r.raise_for_status() + return r.json() def run(self): + """ + Select endpoint based on observable classification, + prefer v2 when configured, otherwise fallback to v1. + """ if self.observable_classification == Classification.DOMAIN.value: - path = "domainbl" - parameter = "host" + v1_path = "domainbl" + v1_param = "host" + v2_endpoint = "v2/domain-reputation" + v2_payload = {"domain": self.observable_name} + elif self.observable_classification == Classification.IP.value: - path = "iprep" - parameter = "ip" + v1_path = "iprep" + v1_param = "ip" + v2_endpoint = "v2/ip-reputation" + v2_payload = {"ip": self.observable_name} + elif self.observable_classification == Classification.URL.value: - path = "urlrep" - parameter = "url" + v1_path = "urlrep" + v1_param = "url" + v2_endpoint = "v2/url-reputation" + v2_payload = {"url": self.observable_name} + else: raise AnalyzerConfigurationException("not supported") - complete_url = f"{self.url}/{path}/v1/pay-as-you-go/?key={self._api_key}&{parameter}={self.observable_name}" - r = requests.get(complete_url) - r.raise_for_status() - return r.json() + + if self._api_use_v2: + try: + return self._call_v2(v2_endpoint, v2_payload) + except Exception: + if self._api_key: + return self._call_v1(v1_path, v1_param) + raise + + return self._call_v1(v1_path, v1_param) @classmethod def _monkeypatch(cls): @@ -54,7 +128,7 @@ def _monkeypatch(cls): "detected": False, "reference": "https://0spam.org/", "elapsed": "0.09", - }, + } }, "detections": 7, "engines_count": 79, diff --git a/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py b/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py index 8014d12581..fd5ad20572 100644 --- a/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py +++ b/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py @@ -33,6 +33,8 @@ def integration_name(self): def run(self): response = {} + + # Select API version if self.greynoise_api_version == "v2": session = GreyNoise( api_key=self._api_key_name, @@ -48,23 +50,30 @@ def run(self): raise AnalyzerRunException( "Invalid API Version. Supported are: v2 (paid), v3 (community)" ) + try: + # Base lookup response = session.ip(self.observable_name) + + # SAFE, CodeQL-approved merge for v2 if self.greynoise_api_version == "v2": - response |= session.riot(self.observable_name) - # greynoise library does provide empty messages in case of these errors... - # so it's better to catch them and create custom management + riot_data = session.riot(self.observable_name) + if isinstance(riot_data, dict): + response.update(riot_data) + except RateLimitError as e: self.disable_for_rate_limit() self.report.errors.append(e) self.report.save() raise AnalyzerRunException(f"Rate limit error: {e}") + except RequestFailure as e: self.report.errors.append(e) self.report.save() raise AnalyzerRunException(f"Request failure error: {e}") + except NotFound as e: - logger.info(f"not found error for {self.observable_name} :{e}") + logger.info(f"not found error for {self.observable_name}: {e}") response["not_found"] = True return response @@ -82,23 +91,28 @@ def _update_data_model(self, data_model): classification = self.report.report.get("classification", None) riot = self.report.report.get("riot", None) noise = self.report.report.get("noise", None) + if classification: classification = classification.lower() self.report: AnalyzerReport + if classification == self.EVALUATIONS.MALICIOUS.value: if not noise: logger.error("malicious IP is not a noise!?! How is this possible") data_model.evaluation = self.EVALUATIONS.MALICIOUS.value data_model.reliability = 7 + elif classification == "unknown": if riot: data_model.evaluation = self.EVALUATIONS.TRUSTED.value data_model.reliability = 1 elif noise: data_model.evaluation = self.EVALUATIONS.MALICIOUS.value + elif classification == "benign": data_model.evaluation = self.EVALUATIONS.TRUSTED.value data_model.reliability = 7 + else: logger.error( f"there should not be other types of classification. Classification found: {classification}"