From 9dec9066ee5db11961ca389bc801fb06a6e7caf8 Mon Sep 17 00:00:00 2001 From: Eric McGinnis Date: Sat, 29 Mar 2025 15:06:20 -0700 Subject: [PATCH 1/4] Move risk_score and risk_severity from tags into higher level Detection_Abstract object. Make sure risk_severity is consistent and included only in json content that requires it. Updated serialization logic for detections.json --- .../detection_abstract.py | 59 +++++++++++++++++-- contentctl/objects/rba.py | 34 +---------- contentctl/output/api_json_output.py | 8 ++- .../templates/savedsearches_detections.j2 | 7 +-- 4 files changed, 60 insertions(+), 48 deletions(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 036ac5c4..6c77a66a 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -43,10 +43,11 @@ DetectionStatus, NistCategory, ProvidingTechnology, + RiskSeverity, ) from contentctl.objects.integration_test import IntegrationTest from contentctl.objects.manual_test import ManualTest -from contentctl.objects.rba import RBAObject +from contentctl.objects.rba import RBAObject, RiskScoreValue_Type from contentctl.objects.security_content_object import SecurityContentObject from contentctl.objects.test_group import TestGroup from contentctl.objects.unit_test import UnitTest @@ -66,6 +67,54 @@ class Detection_Abstract(SecurityContentObject): how_to_implement: str = Field(..., min_length=4) known_false_positives: str = Field(..., min_length=4) rba: Optional[RBAObject] = Field(default=None) + + @computed_field + @property + def risk_score(self) -> RiskScoreValue_Type: + # First get the maximum score associated with + # a risk object. If there are no objects, then + # we should throw an exception. + if self.rba is None or len(self.rba.risk_objects) == 0: + raise Exception( + "There must be at least one Risk Object present to get Severity." + ) + return max([risk_object.score for risk_object in self.rba.risk_objects]) + + @computed_field + @property + def severity(self) -> RiskSeverity: + """ + Severity is required for notables (but not risk objects). + In the contentctl codebase, instead of requiring an additional + field to be added to the YMLs, we derive the severity from the + HIGHEST risk score of any risk object that is part of this detection. + However, if a detection does not have a risk object but still has a notable, + we will use a default value of high. This only impact Correlation searches. As + TTP searches, which also generate notables, must also have risk object(s) + """ + try: + risk_score = self.risk_score + except Exception: + # This object does not have any RBA objects, + # hence no disk score is returned. So we will + # return the defualt value of high + return RiskSeverity.HIGH + + if 0 <= risk_score <= 20: + return RiskSeverity.INFORMATIONAL + elif 20 < risk_score <= 40: + return RiskSeverity.LOW + elif 40 < risk_score <= 60: + return RiskSeverity.MEDIUM + elif 60 < risk_score <= 80: + return RiskSeverity.HIGH + elif 80 < risk_score <= 100: + return RiskSeverity.CRITICAL + else: + raise Exception( + f"Error getting severity - risk_score must be between 0-100, but was actually {self.risk_score}" + ) + explanation: None | str = Field( default=None, exclude=True, # Don't serialize this value when dumping the object @@ -435,12 +484,10 @@ def serialize_model(self): "datamodel": self.datamodel, "source": self.source, "nes_fields": self.nes_fields, + "rba": self.rba, } - if self.rba is not None: - model["risk_severity"] = self.rba.severity - model["tags"]["risk_score"] = self.rba.risk_score - else: - model["tags"]["risk_score"] = 0 + if self.deployment.alert_action.notable: + model["risk_severity"] = self.severity # Only a subset of macro fields are required: all_macros: list[dict[str, str | list[str]]] = [] diff --git a/contentctl/objects/rba.py b/contentctl/objects/rba.py index a63c043e..4d923c67 100644 --- a/contentctl/objects/rba.py +++ b/contentctl/objects/rba.py @@ -4,9 +4,7 @@ from enum import Enum from typing import Annotated, Set -from pydantic import BaseModel, Field, computed_field, model_serializer - -from contentctl.objects.enums import RiskSeverity +from pydantic import BaseModel, Field, model_serializer RiskScoreValue_Type = Annotated[int, Field(ge=1, le=100)] @@ -108,36 +106,6 @@ class RBAObject(BaseModel, ABC): risk_objects: Annotated[Set[RiskObject], Field(min_length=1)] threat_objects: Set[ThreatObject] - @computed_field - @property - def risk_score(self) -> RiskScoreValue_Type: - # First get the maximum score associated with - # a risk object. If there are no objects, then - # we should throw an exception. - if len(self.risk_objects) == 0: - raise Exception( - "There must be at least one Risk Object present to get Severity." - ) - return max([risk_object.score for risk_object in self.risk_objects]) - - @computed_field - @property - def severity(self) -> RiskSeverity: - if 0 <= self.risk_score <= 20: - return RiskSeverity.INFORMATIONAL - elif 20 < self.risk_score <= 40: - return RiskSeverity.LOW - elif 40 < self.risk_score <= 60: - return RiskSeverity.MEDIUM - elif 60 < self.risk_score <= 80: - return RiskSeverity.HIGH - elif 80 < self.risk_score <= 100: - return RiskSeverity.CRITICAL - else: - raise Exception( - f"Error getting severity - risk_score must be between 0-100, but was actually {self.risk_score}" - ) - @model_serializer def serialize_rba(self) -> dict[str, str | list[dict[str, str | int]]]: return { diff --git a/contentctl/output/api_json_output.py b/contentctl/output/api_json_output.py index 80c66b23..846d44bb 100644 --- a/contentctl/output/api_json_output.py +++ b/contentctl/output/api_json_output.py @@ -1,14 +1,15 @@ from __future__ import annotations + from typing import TYPE_CHECKING if TYPE_CHECKING: + from contentctl.objects.baseline import Baseline + from contentctl.objects.deployment import Deployment from contentctl.objects.detection import Detection + from contentctl.objects.investigation import Investigation from contentctl.objects.lookup import Lookup from contentctl.objects.macro import Macro from contentctl.objects.story import Story - from contentctl.objects.baseline import Baseline - from contentctl.objects.investigation import Investigation - from contentctl.objects.deployment import Deployment import os import pathlib @@ -42,6 +43,7 @@ def writeDetections( "search", "how_to_implement", "known_false_positives", + "rba", "references", "datamodel", "macros", diff --git a/contentctl/output/templates/savedsearches_detections.j2 b/contentctl/output/templates/savedsearches_detections.j2 index 1df24b42..ff71f6c0 100644 --- a/contentctl/output/templates/savedsearches_detections.j2 +++ b/contentctl/output/templates/savedsearches_detections.j2 @@ -65,12 +65,7 @@ action.notable.param.nes_fields = {{ detection.nes_fields }} action.notable.param.rule_description = {{ detection.deployment.alert_action.notable.rule_description | custom_jinja2_enrichment_filter(detection) | escapeNewlines()}} action.notable.param.rule_title = {% if detection.type | lower == "correlation" %}RBA: {{ detection.deployment.alert_action.notable.rule_title | custom_jinja2_enrichment_filter(detection) }}{% else %}{{ detection.deployment.alert_action.notable.rule_title | custom_jinja2_enrichment_filter(detection) }}{% endif +%} action.notable.param.security_domain = {{ detection.tags.security_domain }} -{% if detection.rba %} -action.notable.param.severity = {{ detection.rba.severity }} -{% else %} -{# Correlations do not have detection.rba defined, but should get a default severity #} -action.notable.param.severity = high -{% endif %} +action.notable.param.severity = {{ detection.severity }} {% endif %} {% if detection.deployment.alert_action.email %} action.email.subject.alert = {{ detection.deployment.alert_action.email.subject | custom_jinja2_enrichment_filter(detection) | escapeNewlines() }} From 8011fee58508ddea2c90692965f0ab0c421c2cb4 Mon Sep 17 00:00:00 2001 From: Eric McGinnis Date: Thu, 10 Apr 2025 09:41:49 -0700 Subject: [PATCH 2/4] do not include objects with null values when serializing for api output --- contentctl/output/api_json_output.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/contentctl/output/api_json_output.py b/contentctl/output/api_json_output.py index 846d44bb..bc4747af 100644 --- a/contentctl/output/api_json_output.py +++ b/contentctl/output/api_json_output.py @@ -16,6 +16,8 @@ from contentctl.output.json_writer import JsonWriter +EXCLUDE_NONE: bool = True + class ApiJsonOutput: output_path: pathlib.Path @@ -51,7 +53,8 @@ def writeDetections( "source", "nes_fields", ] - ) + ), + exclude_none=EXCLUDE_NONE, ) for detection in objects ] @@ -78,7 +81,10 @@ def writeMacros( objects: list[Macro], ) -> None: macros = [ - macro.model_dump(include=set(["definition", "description", "name"])) + macro.model_dump( + include=set(["definition", "description", "name"]), + exclude_none=EXCLUDE_NONE, + ) for macro in objects ] for macro in macros: @@ -111,7 +117,8 @@ def writeStories( "baseline_names", "detections", ] - ) + ), + exclude_none=EXCLUDE_NONE, ) for story in objects ] @@ -157,7 +164,8 @@ def writeBaselines( "references", "tags", ] - ) + ), + exclude_none=EXCLUDE_NONE, ) for baseline in objects ] @@ -190,7 +198,8 @@ def writeInvestigations( "tags", "lowercase_name", ] - ) + ), + exclude_none=EXCLUDE_NONE, ) for investigation in objects ] @@ -218,7 +227,8 @@ def writeLookups( "min_matches", "case_sensitive_match", ] - ) + ), + exclude_none=EXCLUDE_NONE, ) for lookup in objects ] @@ -248,7 +258,8 @@ def writeDeployments( "rba", "tags", ] - ) + ), + exclude_none=EXCLUDE_NONE, ) for deployment in objects ] From 79fbb2b3a20f2b644b928e4ed2059e69d829ae7c Mon Sep 17 00:00:00 2001 From: Eric McGinnis Date: Thu, 10 Apr 2025 10:14:04 -0700 Subject: [PATCH 3/4] Revert "do not include objects with" This reverts commit 8011fee58508ddea2c90692965f0ab0c421c2cb4. --- contentctl/output/api_json_output.py | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/contentctl/output/api_json_output.py b/contentctl/output/api_json_output.py index bc4747af..846d44bb 100644 --- a/contentctl/output/api_json_output.py +++ b/contentctl/output/api_json_output.py @@ -16,8 +16,6 @@ from contentctl.output.json_writer import JsonWriter -EXCLUDE_NONE: bool = True - class ApiJsonOutput: output_path: pathlib.Path @@ -53,8 +51,7 @@ def writeDetections( "source", "nes_fields", ] - ), - exclude_none=EXCLUDE_NONE, + ) ) for detection in objects ] @@ -81,10 +78,7 @@ def writeMacros( objects: list[Macro], ) -> None: macros = [ - macro.model_dump( - include=set(["definition", "description", "name"]), - exclude_none=EXCLUDE_NONE, - ) + macro.model_dump(include=set(["definition", "description", "name"])) for macro in objects ] for macro in macros: @@ -117,8 +111,7 @@ def writeStories( "baseline_names", "detections", ] - ), - exclude_none=EXCLUDE_NONE, + ) ) for story in objects ] @@ -164,8 +157,7 @@ def writeBaselines( "references", "tags", ] - ), - exclude_none=EXCLUDE_NONE, + ) ) for baseline in objects ] @@ -198,8 +190,7 @@ def writeInvestigations( "tags", "lowercase_name", ] - ), - exclude_none=EXCLUDE_NONE, + ) ) for investigation in objects ] @@ -227,8 +218,7 @@ def writeLookups( "min_matches", "case_sensitive_match", ] - ), - exclude_none=EXCLUDE_NONE, + ) ) for lookup in objects ] @@ -258,8 +248,7 @@ def writeDeployments( "rba", "tags", ] - ), - exclude_none=EXCLUDE_NONE, + ) ) for deployment in objects ] From ddafd48cfbd19b73144807320afee2afea5201f4 Mon Sep 17 00:00:00 2001 From: Eric McGinnis Date: Thu, 10 Apr 2025 10:14:57 -0700 Subject: [PATCH 4/4] make sure that when serializing if rba was None, it is actually now {} --- .../abstract_security_content_objects/detection_abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 6c77a66a..d938d3d4 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -484,7 +484,7 @@ def serialize_model(self): "datamodel": self.datamodel, "source": self.source, "nes_fields": self.nes_fields, - "rba": self.rba, + "rba": self.rba or {}, } if self.deployment.alert_action.notable: model["risk_severity"] = self.severity