From 9665d9d7afadbc1d7e542baaa1c833808876a47d Mon Sep 17 00:00:00 2001 From: Robert Szefler Date: Sun, 5 May 2024 01:10:21 +0200 Subject: [PATCH] Slack notification grouping (#1371) * Slack notification grouping and summarization * post-review fixes * change the way grouping_summary_mode and grouping_enabled are initialized in SinkBase * more post-review fixes * more readable formatting of the interval in group summary messages * more post-review fixes * don't create Slack threads unless in summary mode; summary msg handling simplifications * doc fixes, make ignore_first mandatory, change default interval * fix time window tracking; oop refactor * improve dependency specs --- docs/configuration/sinks/slack.rst | 80 ++++++++- poetry.lock | 2 +- pyproject.toml | 6 +- .../core/sinks/datadog/datadog_sink_params.py | 4 + .../core/sinks/discord/discord_sink_params.py | 4 + .../core/sinks/file/file_sink_params.py | 4 + .../sinks/google_chat/google_chat_params.py | 4 + .../core/sinks/jira/jira_sink_params.py | 4 + .../core/sinks/kafka/kafka_sink_params.py | 4 + .../core/sinks/mail/mail_sink_params.py | 4 + .../mattermost/mattermost_sink_params.py | 4 + .../core/sinks/msteams/msteams_sink_params.py | 4 + .../sinks/opsgenie/opsgenie_sink_params.py | 4 + .../sinks/pagerduty/pagerduty_sink_params.py | 4 + .../sinks/pushover/pushover_sink_params.py | 5 + .../core/sinks/robusta/robusta_sink_params.py | 4 + .../rocketchat/rocketchat_sink_params.py | 4 + .../servicenow/servicenow_sink_params.py | 4 + src/robusta/core/sinks/sink_base.py | 126 +++++++++++++- src/robusta/core/sinks/sink_base_params.py | 63 ++++++- src/robusta/core/sinks/slack/slack_sink.py | 77 ++++++++- .../core/sinks/slack/slack_sink_params.py | 8 + .../sinks/telegram/telegram_sink_params.py | 4 + .../sinks/victorops/victorops_sink_params.py | 4 + .../core/sinks/webex/webex_sink_params.py | 4 + .../core/sinks/webhook/webhook_sink_params.py | 4 + .../yamessenger/yamessenger_sink_params.py | 4 + src/robusta/integrations/slack/sender.py | 159 +++++++++++++++++- tests/test_blocks.py | 41 +++-- tests/test_scope_matching.py | 15 +- tests/test_slack.py | 16 +- 31 files changed, 622 insertions(+), 52 deletions(-) diff --git a/docs/configuration/sinks/slack.rst b/docs/configuration/sinks/slack.rst index 3318b92b2..9bf60fb42 100644 --- a/docs/configuration/sinks/slack.rst +++ b/docs/configuration/sinks/slack.rst @@ -68,7 +68,7 @@ Example: .. code-block:: yaml sinks_config: - # slack integration params + # slack integration params, like slack_channel, api_key etc - slack_sink: name: main_slack_sink api_key: xoxb-112... @@ -115,6 +115,84 @@ If you'd like to automatically tag users on builtin alerts, please `let us know `_. We want to hear requirements. + +Grouping and summarizing messages +------------------------------------------------------------------- + +Some large systems that are being monitored by Robusta could generate +considerable amounts of notifications that are quite similar to each other +(for example, concern one type of a problem occurring over some part of +the cluster). For such cases, Robusta provides a mechanism that will reduce +the amount of clutter in Slack channels by grouping notifications based +on their properties and possibly summarizing the numbers of their +occurrences. + +The mechanism is enabled by the ``grouping`` section in the Slack sink +config. The parameters you can group on (specified in the ``group_by`` +section) are basically any values in the k8s event payload, with one +special addition - ``workload`` that will hold the name of the top-level +entity name for the event. Labels and annotations are supported as +described in the example below. If you specify ``grouping`` without +defining a ``group_by``, the default will be to group by the cluster +name. + +The grouping mechanism supports the ``interval`` setting, which defines +the length of the window over which notifications will be aggregated. +The window starts when the first message belonging to the group arrives, +and ends when the specified interval elapses. If you don't specify the +``interval``, the default value will be 15 minutes. + +There are two modes for this functionality, selected by the +``notification_mode`` subsection. For the ``regular`` mode, you have to +specify the ``ignore_first`` value. This value will determine the +minimum amount of notifications in any group that would have to occur +in the time specified by ``interval`` before they are sent as Slack +messages. This mode works like a false positive filter - it only triggers +the Slack sink if notifications are incoming at a speed above the set +threshold. + +The ``summary`` mode allows summarizing the number of notifications in a +succinct way. The summary will be sent to Slack as a single message and will +include information about the number of all the messages in the group. +The summarization will be formatted as a table and done according +to the attributes listed under ``summary.by``. In case ``summary.threaded`` +is ``true``, all the Slack notifications belonging to this group will be +appended in a thread under this header message. If ``summary.threaded`` is +``false``, the notifications will not be sent to Slack at all, and only the +summary message will appear. + +The information in the summary message will be dynamically updated with +numbers of notifications in the group as they are incoming, regardless +of whether ``summary.threaded`` is enabled or not. + +.. code-block:: + + sinksConfig: + - slack_sink: + # slack integration params, like slack_channel, api_key etc + grouping: + group_by: + - workload + - labels: + - app + - annotations: + - experimental_deployment + interval: 1800 # group time window, seconds + notification_mode: + summary: + threaded: true + by: + - identifier + - severity + +.. note:: + + In the current, initial implementation of this mechanism, the + statistics of notifications are held in memory and not persisted + anywhere, so when the Robusta runner dies/restarts, they are lost + and the counting starts anew. + + Creating Custom Slack Apps ------------------------------------------------------------------- diff --git a/poetry.lock b/poetry.lock index 9f1c0cd9b..344715e44 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3794,4 +3794,4 @@ all = ["CairoSVG", "Flask", "better-exceptions", "datadog-api-client", "dulwich" [metadata] lock-version = "2.0" python-versions = "^3.8, <3.11" -content-hash = "13125594642af04ae1059a52c9205fc0cdd42fec1a5b3c12890ea2f50132c099" +content-hash = "da2bd19191a79a253835d11e4ed03944a13218b3c0ee07872796574606eb2f13" diff --git a/pyproject.toml b/pyproject.toml index 3bbbe05c0..902668089 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,11 @@ watchgod = "^0.7" webexteamssdk = "^1.6.1" bitmath = "^1.3.3.1" croniter = "^1.3.15" +humanize = "^3.13.1" + +# The following are added to speed up poetry dependency resolution +botocore = "1.31.72" +boto3 = "1.28.72" # we're freezing a specific version here because the latest version doesn't have prebuilt wheels on pypi # and therefore requires gcc to install which we'd like to avoid @@ -88,7 +93,6 @@ sphinxcontrib-images = "^0.9.4" jsonref = "^0.2" Pillow = "^10.3.0" sphinxcontrib-mermaid = "^0.7.1" -humanize = "^3.13.1" cssselect = "^1.1.0" pygal = "^3.0.0" tinycss = "^0.4" diff --git a/src/robusta/core/sinks/datadog/datadog_sink_params.py b/src/robusta/core/sinks/datadog/datadog_sink_params.py index 70a1c2411..f4d15d924 100644 --- a/src/robusta/core/sinks/datadog/datadog_sink_params.py +++ b/src/robusta/core/sinks/datadog/datadog_sink_params.py @@ -5,6 +5,10 @@ class DataDogSinkParams(SinkBaseParams): api_key: str + @classmethod + def _get_sink_type(cls): + return "datadog" + class DataDogSinkConfigWrapper(SinkConfigBase): datadog_sink: DataDogSinkParams diff --git a/src/robusta/core/sinks/discord/discord_sink_params.py b/src/robusta/core/sinks/discord/discord_sink_params.py index f65da8947..f58fa14e8 100644 --- a/src/robusta/core/sinks/discord/discord_sink_params.py +++ b/src/robusta/core/sinks/discord/discord_sink_params.py @@ -5,6 +5,10 @@ class DiscordSinkParams(SinkBaseParams): url: str + @classmethod + def _get_sink_type(cls): + return "discord" + class DiscordSinkConfigWrapper(SinkConfigBase): discord_sink: DiscordSinkParams diff --git a/src/robusta/core/sinks/file/file_sink_params.py b/src/robusta/core/sinks/file/file_sink_params.py index 0ba97eabd..706def1c2 100644 --- a/src/robusta/core/sinks/file/file_sink_params.py +++ b/src/robusta/core/sinks/file/file_sink_params.py @@ -7,6 +7,10 @@ class FileSinkParms(SinkBaseParams): file_name: str = None + @classmethod + def _get_sink_type(cls): + return "file" + class FileSinkConfigWrapper(SinkConfigBase): file_sink: FileSinkParms diff --git a/src/robusta/core/sinks/google_chat/google_chat_params.py b/src/robusta/core/sinks/google_chat/google_chat_params.py index 08e2d12d8..781f62cf7 100644 --- a/src/robusta/core/sinks/google_chat/google_chat_params.py +++ b/src/robusta/core/sinks/google_chat/google_chat_params.py @@ -7,6 +7,10 @@ class GoogleChatSinkParams(SinkBaseParams): webhook_url: SecretStr + @classmethod + def _get_sink_type(cls): + return "google_chat" + class GoogleChatSinkConfigWrapper(SinkConfigBase): google_chat_sink: GoogleChatSinkParams diff --git a/src/robusta/core/sinks/jira/jira_sink_params.py b/src/robusta/core/sinks/jira/jira_sink_params.py index 395810e8a..9564d5711 100644 --- a/src/robusta/core/sinks/jira/jira_sink_params.py +++ b/src/robusta/core/sinks/jira/jira_sink_params.py @@ -19,6 +19,10 @@ class JiraSinkParams(SinkBaseParams): reopenStatusName: Optional[str] = "To Do" noReopenResolution: Optional[str] = "" + @classmethod + def _get_sink_type(cls): + return "jira" + class JiraSinkConfigWrapper(SinkConfigBase): jira_sink: JiraSinkParams diff --git a/src/robusta/core/sinks/kafka/kafka_sink_params.py b/src/robusta/core/sinks/kafka/kafka_sink_params.py index c379b36f3..3e5009678 100644 --- a/src/robusta/core/sinks/kafka/kafka_sink_params.py +++ b/src/robusta/core/sinks/kafka/kafka_sink_params.py @@ -7,6 +7,10 @@ class KafkaSinkParams(SinkBaseParams): topic: str auth: dict = {} + @classmethod + def _get_sink_type(cls): + return "kafka" + class KafkaSinkConfigWrapper(SinkConfigBase): kafka_sink: KafkaSinkParams diff --git a/src/robusta/core/sinks/mail/mail_sink_params.py b/src/robusta/core/sinks/mail/mail_sink_params.py index a21079d86..07e5d7016 100644 --- a/src/robusta/core/sinks/mail/mail_sink_params.py +++ b/src/robusta/core/sinks/mail/mail_sink_params.py @@ -7,6 +7,10 @@ class MailSinkParams(SinkBaseParams): mailto: str + @classmethod + def _get_sink_type(cls): + return "mail" + @validator("mailto") def validate_mailto(cls, mailto): # Make sure we only handle emails and exclude other schemes provided by apprise diff --git a/src/robusta/core/sinks/mattermost/mattermost_sink_params.py b/src/robusta/core/sinks/mattermost/mattermost_sink_params.py index 889e63593..7cc4387a1 100644 --- a/src/robusta/core/sinks/mattermost/mattermost_sink_params.py +++ b/src/robusta/core/sinks/mattermost/mattermost_sink_params.py @@ -15,6 +15,10 @@ class MattermostSinkParams(SinkBaseParams): team: Optional[str] team_id: Optional[str] + @classmethod + def _get_sink_type(cls): + return "mattermost" + @validator("url") def set_http_schema(cls, url): parsed_url = urlparse(url) diff --git a/src/robusta/core/sinks/msteams/msteams_sink_params.py b/src/robusta/core/sinks/msteams/msteams_sink_params.py index d45ddeb19..826c0ce27 100644 --- a/src/robusta/core/sinks/msteams/msteams_sink_params.py +++ b/src/robusta/core/sinks/msteams/msteams_sink_params.py @@ -5,6 +5,10 @@ class MsTeamsSinkParams(SinkBaseParams): webhook_url: str + @classmethod + def _get_sink_type(cls): + return "msteams" + class MsTeamsSinkConfigWrapper(SinkConfigBase): ms_teams_sink: MsTeamsSinkParams diff --git a/src/robusta/core/sinks/opsgenie/opsgenie_sink_params.py b/src/robusta/core/sinks/opsgenie/opsgenie_sink_params.py index 09ad33aca..9a69ddc72 100644 --- a/src/robusta/core/sinks/opsgenie/opsgenie_sink_params.py +++ b/src/robusta/core/sinks/opsgenie/opsgenie_sink_params.py @@ -10,6 +10,10 @@ class OpsGenieSinkParams(SinkBaseParams): tags: List[str] = [] host: Optional[str] = None # NOTE: If None, the default value will be used from opsgenie_sdk + @classmethod + def _get_sink_type(cls): + return "opsgenie" + class OpsGenieSinkConfigWrapper(SinkConfigBase): opsgenie_sink: OpsGenieSinkParams diff --git a/src/robusta/core/sinks/pagerduty/pagerduty_sink_params.py b/src/robusta/core/sinks/pagerduty/pagerduty_sink_params.py index 918d9183c..f467067ed 100644 --- a/src/robusta/core/sinks/pagerduty/pagerduty_sink_params.py +++ b/src/robusta/core/sinks/pagerduty/pagerduty_sink_params.py @@ -5,6 +5,10 @@ class PagerdutySinkParams(SinkBaseParams): api_key: str + @classmethod + def _get_sink_type(cls): + return "pagerduty" + class PagerdutyConfigWrapper(SinkConfigBase): pagerduty_sink: PagerdutySinkParams diff --git a/src/robusta/core/sinks/pushover/pushover_sink_params.py b/src/robusta/core/sinks/pushover/pushover_sink_params.py index 0cf31bee2..71222946d 100644 --- a/src/robusta/core/sinks/pushover/pushover_sink_params.py +++ b/src/robusta/core/sinks/pushover/pushover_sink_params.py @@ -10,6 +10,11 @@ class PushoverSinkParams(SinkBaseParams): device: str = None pushover_url: str = "https://api.pushover.net/1/messages.json" + @classmethod + def _get_sink_type(cls): + return "pushover" + + class PushoverSinkConfigWrapper(SinkConfigBase): pushover_sink: PushoverSinkParams diff --git a/src/robusta/core/sinks/robusta/robusta_sink_params.py b/src/robusta/core/sinks/robusta/robusta_sink_params.py index 0f58df315..32f1d0881 100644 --- a/src/robusta/core/sinks/robusta/robusta_sink_params.py +++ b/src/robusta/core/sinks/robusta/robusta_sink_params.py @@ -17,6 +17,10 @@ class RobustaSinkParams(SinkBaseParams): ttl_hours: int = 4380 # Time before unactive cluster data is deleted. 6 Months default. persist_events: bool = False + @classmethod + def _get_sink_type(cls): + return "robusta" + class RobustaSinkConfigWrapper(SinkConfigBase): robusta_sink: RobustaSinkParams diff --git a/src/robusta/core/sinks/rocketchat/rocketchat_sink_params.py b/src/robusta/core/sinks/rocketchat/rocketchat_sink_params.py index 20a1c1143..d82333d4d 100644 --- a/src/robusta/core/sinks/rocketchat/rocketchat_sink_params.py +++ b/src/robusta/core/sinks/rocketchat/rocketchat_sink_params.py @@ -12,6 +12,10 @@ class RocketchatSinkParams(SinkBaseParams): user_id: str server_url: str + @classmethod + def _get_sink_type(cls): + return "rocketchat" + def get_rocketchat_channel(self) -> str: return self.channel diff --git a/src/robusta/core/sinks/servicenow/servicenow_sink_params.py b/src/robusta/core/sinks/servicenow/servicenow_sink_params.py index 50b0a20d7..95b671341 100644 --- a/src/robusta/core/sinks/servicenow/servicenow_sink_params.py +++ b/src/robusta/core/sinks/servicenow/servicenow_sink_params.py @@ -12,6 +12,10 @@ class ServiceNowSinkParams(SinkBaseParams): password: SecretStr caller_id: Optional[str] + @classmethod + def _get_sink_type(cls): + return "servicenow" + class ServiceNowSinkConfigWrapper(SinkConfigBase): service_now_sink: ServiceNowSinkParams diff --git a/src/robusta/core/sinks/sink_base.py b/src/robusta/core/sinks/sink_base.py index 578c67bdf..28d012f77 100644 --- a/src/robusta/core/sinks/sink_base.py +++ b/src/robusta/core/sinks/sink_base.py @@ -1,4 +1,10 @@ -from typing import Any +import threading +import time +from abc import abstractmethod, ABC +from collections import defaultdict +from typing import Any, List, Dict, Tuple, DefaultDict, Optional + +from pydantic import BaseModel, Field from robusta.core.model.k8s_operation_type import K8sOperationType from robusta.core.reporting.base import Finding @@ -6,7 +12,53 @@ from robusta.core.sinks.timing import TimeSlice, TimeSliceAlways -class SinkBase: +KeyT = Tuple[str, ...] + + +class NotificationGroup(BaseModel): + timestamps: List[float] = [] + + def register_notification(self, interval: int, threshold: int) -> bool: + """Record information about an incoming notification. Returns True if this + notification should be emitted by a sink, False otherwise.""" + now_ts = time.time() + + # Prune any events older than the interval + while self.timestamps and now_ts - self.timestamps[0] > interval: + self.timestamps.pop(0) + + self.timestamps.append(now_ts) + return len(self.timestamps) >= threshold + + +class NotificationSummary(BaseModel): + message_id: Optional[str] = None # identifier of the summary message + start_ts: float = Field(default_factory=lambda: time.time()) # Timestamp of the first notification + # Keys for the table are determined by grouping.notification_mode.summary.by + summary_table: DefaultDict[KeyT, List[int]] = None + + def register_notification(self, summary_key: KeyT, resolved: bool, interval: int): + now_ts = time.time() + idx = 1 if resolved else 0 + if now_ts - self.start_ts > interval or not self.summary_table: + # Expired or the first summary ever for this group_key, reset the data + self.summary_table = defaultdict(lambda: [0, 0]) + self.start_ts = now_ts + self.message_id = None + self.summary_table[summary_key][idx] += 1 + + +class SinkBase(ABC): + grouping_enabled: bool + grouping_summary_mode: bool + + # Keys for groups and summaries are determined by grouping.group_by + groups: DefaultDict[KeyT, NotificationGroup] + summaries: DefaultDict[KeyT, NotificationSummary] + + # Notification summaries + summary_header: List[str] # descriptive header for the summary table + def __init__(self, sink_params: SinkBaseParams, registry): self.sink_name = sink_params.name self.params = sink_params @@ -20,6 +72,75 @@ def __init__(self, sink_params: SinkBaseParams, registry): self.time_slices = self._build_time_slices_from_params(self.params.activity) + self.grouping_summary_mode = False + self.grouping_enabled = False + + if sink_params.grouping: + self.finding_group_lock = threading.RLock() + self.grouping_enabled = True + if sink_params.grouping.notification_mode.summary: + self.grouping_summary_mode = True + self.summaries = defaultdict(lambda: NotificationSummary()) + self.summary_header = self.create_summary_header() + else: + self.grouping_summary_mode = False + self.groups = defaultdict(lambda: NotificationGroup()) + + def create_summary_header(self): + summary_header = [] + for attr in self.params.grouping.notification_mode.summary.by: + if isinstance(attr, str): + summary_header.append("notification" if attr == "identifier" else attr) + elif isinstance(attr, dict): + keys = list(attr.keys()) + if len(keys) > 1: + raise ValueError( + "Invalid sink configuration: multiple values for one of the elements in" + "grouping.notification_mode.summary.by" + ) + key = keys[0] + if key not in ["labels", "annotations"]: + raise ValueError( + f"Sink configuration: grouping.notification_mode.summary.by.{key} is invalid " + "(only labels/annotations allowed)" + ) + for label_or_attr_name in attr[key]: + summary_header.append(f"{key[:-1]}:{label_or_attr_name}") + return summary_header + + def get_group_key_and_header(self, finding_data: Dict, attributes: List) -> Tuple[KeyT, List[str]]: + """Generates group key and descriptive header from finding data. + + For example, for finding_data = {"a":1, "b":2, "x": 3, "y": None} and attributes = ["x", "y"] + this method will return ((3, "(undefined)"), [])""" + values = () + descriptions = [] + for attr in attributes: + if isinstance(attr, str): + if attr not in finding_data: + continue + value = finding_data[attr] + values += (value, ) + descriptions.append( + f"{'notification' if attr=='identifier' else attr}: {self.display_value(value)}" + ) + elif isinstance(attr, dict): + # This is typically for labels and annotations + top_level_attr_name = list(attr.keys())[0] + values += tuple( + finding_data.get(top_level_attr_name, {}).get(element_name) + for element_name in sorted(attr[top_level_attr_name]) + ) + subvalues = [] + for subattr_name in sorted(attr[top_level_attr_name]): + subvalues.append((subattr_name, finding_data.get(top_level_attr_name, {}).get(subattr_name))) + subvalues_str = ", ".join(f"{key}={self.display_value(value)}" for key, value in sorted(subvalues)) + descriptions.append(f"{top_level_attr_name}: {subvalues_str}") + return values, descriptions + + def display_value(self, value: Optional[str]) -> str: + return value if value is not None else "(undefined)" + def _build_time_slices_from_params(self, params: ActivityParams): if params is None: return [TimeSliceAlways()] @@ -47,6 +168,7 @@ def accepts(self, finding: Finding) -> bool: and any(time_slice.is_active_now for time_slice in self.time_slices) ) + @abstractmethod def write_finding(self, finding: Finding, platform_enabled: bool): raise NotImplementedError(f"write_finding not implemented for sink {self.sink_name}") diff --git a/src/robusta/core/sinks/sink_base_params.py b/src/robusta/core/sinks/sink_base_params.py index 315503a07..9347ef74f 100644 --- a/src/robusta/core/sinks/sink_base_params.py +++ b/src/robusta/core/sinks/sink_base_params.py @@ -1,8 +1,10 @@ import logging import re +from abc import ABC, abstractmethod from typing import Dict, List, Optional, Union from pydantic import BaseModel, root_validator, validator +from pydantic.types import PositiveInt import pytz from robusta.core.playbooks.playbook_utils import replace_env_vars_values @@ -55,13 +57,57 @@ def check_intervals(cls, intervals: List[ActivityInterval]): return intervals -class SinkBaseParams(BaseModel): +class RegularNotificationModeParams(BaseModel): + # This is mandatory because using the regular mode without setting it + # would make no sense - all the notifications would just pass through + # normally. + ignore_first: PositiveInt + + +# a list of attribute names, which can be strings or dicts of the form +# {"labels": ["app", "label-xyz"]} etc. +GroupingAttributeSelectorListT = List[Union[str, Dict[str, List[str]]]] + + +class SummaryNotificationModeParams(BaseModel): + threaded: bool = True + by: GroupingAttributeSelectorListT = ["identifier"] + + +class NotificationModeParams(BaseModel): + regular: Optional[RegularNotificationModeParams] + summary: Optional[SummaryNotificationModeParams] + # TODO should we enforce that only one of these is set? + + @root_validator + def validate_exactly_one_defined(cls, values: Dict): + if values.get("regular") and values.get("summary"): + raise ValueError('"regular" and "summary" notification grouping modes must not be defined at the same time') + if not (values.get("regular") or values.get("summary")): + raise ValueError('either "regular" or "summary" notification grouping mode must be defined') + return values + + +class GroupingParams(BaseModel): + group_by: GroupingAttributeSelectorListT = ["cluster"] + interval: int = 15*60 # in seconds + notification_mode: Optional[NotificationModeParams] + + @root_validator + def validate_notification_mode(cls, values: Dict): + if values is None: + return {"summary": SummaryNotificationModeParams()} + return values + + +class SinkBaseParams(ABC, BaseModel): name: str send_svg: bool = False default: bool = True match: dict = {} scope: Optional[ScopeParams] activity: Optional[ActivityParams] + grouping: Optional[GroupingParams] stop: bool = False # Stop processing if this sink has been matched @root_validator @@ -77,6 +123,12 @@ def env_values_validation(cls, values: Dict): match["annotations"] = cls.__parse_dict_matchers(annotations) return updated + @root_validator + def validate_grouping(cls, values: Dict): + if values.get("grouping") and not cls._supports_grouping(): + logging.warning(f"Sinks of type {cls._get_sink_type()} do not support notification grouping") + return values + @classmethod def __parse_dict_matchers(cls, matchers) -> Union[Dict, List[Dict]]: if isinstance(matchers, List): @@ -95,3 +147,12 @@ def __selectors_dict(selectors: str) -> Dict: return {} result[kv[0].strip()] = kv[1].strip() return result + + @classmethod + def _supports_grouping(cls): + return False + + @classmethod + @abstractmethod + def _get_sink_type(cls): + raise NotImplementedError diff --git a/src/robusta/core/sinks/slack/slack_sink.py b/src/robusta/core/sinks/slack/slack_sink.py index e9676bc1a..a9aa8c623 100644 --- a/src/robusta/core/sinks/slack/slack_sink.py +++ b/src/robusta/core/sinks/slack/slack_sink.py @@ -1,15 +1,80 @@ -from robusta.core.reporting.base import Finding -from robusta.core.sinks.sink_base import SinkBase -from robusta.core.sinks.slack.slack_sink_params import SlackSinkConfigWrapper +from robusta.core.model.env_vars import ROBUSTA_UI_DOMAIN +from robusta.core.reporting.base import Finding, FindingStatus +from robusta.core.sinks.sink_base import NotificationGroup, NotificationSummary, SinkBase +from robusta.core.sinks.slack.slack_sink_params import SlackSinkConfigWrapper, SlackSinkParams from robusta.integrations import slack as slack_module class SlackSink(SinkBase): + params: SlackSinkParams + def __init__(self, sink_config: SlackSinkConfigWrapper, registry): super().__init__(sink_config.slack_sink, registry) self.slack_channel = sink_config.slack_sink.slack_channel self.api_key = sink_config.slack_sink.api_key - self.slack_sender = slack_module.SlackSender(self.api_key, self.account_id, self.cluster_name, self.signing_key) + self.slack_sender = slack_module.SlackSender( + self.api_key, self.account_id, self.cluster_name, self.signing_key, self.slack_channel + ) + + def write_finding(self, finding: Finding, platform_enabled: bool) -> None: + if self.grouping_enabled: + self.handle_notification_grouping(finding, platform_enabled) + else: + self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled) + + def handle_notification_grouping(self, finding: Finding, platform_enabled: bool) -> None: + # There is a lock over the whole of the method to account: + # 1) to prevent concurrent modifications to group accounting data structures + # 2) to make sure two threads with identical group_key don't create + # two identical messages (of which one would eventually be orphaned and + # the other one used as a thread header). + # TODO: this could probably be refined to a more granular locking strategy. + with self.finding_group_lock: + investigate_uri = self.get_timeline_uri(self.account_id, self.cluster_name) + finding_data = finding.attribute_map + # The top level entity name (the owner of the pod etc) + finding_data["workload"] = finding.service.name if finding.service else None + finding_data["cluster"] = self.cluster_name + resolved = finding.title.startswith("[RESOLVED]") + + # 1. Notification accounting + group_key, group_header = self.get_group_key_and_header( + finding_data, self.params.grouping.group_by + ) + + if self.grouping_summary_mode: + summary_key, _ = self.get_group_key_and_header( + finding_data, self.params.grouping.notification_mode.summary.by + ) + notification_summary = self.summaries[group_key] + notification_summary.register_notification( + summary_key, resolved, self.params.grouping.interval + ) + slack_thread_ts = self.slack_sender.send_or_update_summary_message( + group_header, + self.summary_header, + notification_summary.summary_table, + self.params, + platform_enabled, + summary_start=notification_summary.start_ts, + threaded=self.params.grouping.notification_mode.summary.threaded, + msg_ts=notification_summary.message_id, + investigate_uri=investigate_uri, + grouping_interval=self.params.grouping.interval, + ) + notification_summary.message_id = slack_thread_ts + should_send_notification = self.params.grouping.notification_mode.summary.threaded + else: + should_send_notification = self.groups[group_key].register_notification( + self.params.grouping.interval, + self.params.grouping.notification_mode.regular.ignore_first + ) + slack_thread_ts = None + if should_send_notification: + self.slack_sender.send_finding_to_slack( + finding, self.params, platform_enabled, thread_ts=slack_thread_ts + ) + - def write_finding(self, finding: Finding, platform_enabled: bool): - self.slack_sender.send_finding_to_slack(finding, self.params, platform_enabled) + def get_timeline_uri(self, account_id: str, cluster_name: str) -> str: + return f"{ROBUSTA_UI_DOMAIN}/graphs?account_id={account_id}&cluster={cluster_name}" diff --git a/src/robusta/core/sinks/slack/slack_sink_params.py b/src/robusta/core/sinks/slack/slack_sink_params.py index 24057f185..172500f5d 100644 --- a/src/robusta/core/sinks/slack/slack_sink_params.py +++ b/src/robusta/core/sinks/slack/slack_sink_params.py @@ -12,6 +12,14 @@ class SlackSinkParams(SinkBaseParams): channel_override: Optional[str] = None max_log_file_limit_kb: int = 1000 + @classmethod + def _supports_grouping(cls): + return True + + @classmethod + def _get_sink_type(cls): + return "slack" + @validator("channel_override") def validate_channel_override(cls, v: str): return ChannelTransformer.validate_channel_override(v) diff --git a/src/robusta/core/sinks/telegram/telegram_sink_params.py b/src/robusta/core/sinks/telegram/telegram_sink_params.py index 3b7964ec2..cf1d002b0 100644 --- a/src/robusta/core/sinks/telegram/telegram_sink_params.py +++ b/src/robusta/core/sinks/telegram/telegram_sink_params.py @@ -10,6 +10,10 @@ class TelegramSinkParams(SinkBaseParams): thread_id: int = None send_files: bool = True # Change to False, to omit file attachments + @classmethod + def _get_sink_type(cls): + return "telegram" + class TelegramSinkConfigWrapper(SinkConfigBase): telegram_sink: TelegramSinkParams diff --git a/src/robusta/core/sinks/victorops/victorops_sink_params.py b/src/robusta/core/sinks/victorops/victorops_sink_params.py index 5b1c9ad61..5a2e36f15 100644 --- a/src/robusta/core/sinks/victorops/victorops_sink_params.py +++ b/src/robusta/core/sinks/victorops/victorops_sink_params.py @@ -5,6 +5,10 @@ class VictoropsSinkParams(SinkBaseParams): url: str + @classmethod + def _get_sink_type(cls): + return "victorops" + class VictoropsConfigWrapper(SinkConfigBase): victorops_sink: VictoropsSinkParams diff --git a/src/robusta/core/sinks/webex/webex_sink_params.py b/src/robusta/core/sinks/webex/webex_sink_params.py index 7174e903a..39ea58144 100644 --- a/src/robusta/core/sinks/webex/webex_sink_params.py +++ b/src/robusta/core/sinks/webex/webex_sink_params.py @@ -6,6 +6,10 @@ class WebexSinkParams(SinkBaseParams): bot_access_token: str room_id: str + @classmethod + def _get_sink_type(cls): + return "webex" + class WebexSinkConfigWrapper(SinkConfigBase): webex_sink: WebexSinkParams diff --git a/src/robusta/core/sinks/webhook/webhook_sink_params.py b/src/robusta/core/sinks/webhook/webhook_sink_params.py index 7c29fc778..0b88bc643 100644 --- a/src/robusta/core/sinks/webhook/webhook_sink_params.py +++ b/src/robusta/core/sinks/webhook/webhook_sink_params.py @@ -10,6 +10,10 @@ class WebhookSinkParams(SinkBaseParams): authorization: SecretStr = None format: str = "text" + @classmethod + def _get_sink_type(cls): + return "webhook" + class WebhookSinkConfigWrapper(SinkConfigBase): webhook_sink: WebhookSinkParams diff --git a/src/robusta/core/sinks/yamessenger/yamessenger_sink_params.py b/src/robusta/core/sinks/yamessenger/yamessenger_sink_params.py index 73a6fbb9d..f0eb1bba6 100644 --- a/src/robusta/core/sinks/yamessenger/yamessenger_sink_params.py +++ b/src/robusta/core/sinks/yamessenger/yamessenger_sink_params.py @@ -14,6 +14,10 @@ class YaMessengerSinkParams(SinkBaseParams): mark_important: bool = False # Mark sent messages as important send_files: bool = True # Send files (logs, images) + @classmethod + def _get_sink_type(cls): + return "yamessenger" + @root_validator() def validate_fields(cls, fields): assert fields["chat_id"] is not None or fields["user_name"] is not None, "chat_id or user_name must be defined for the Yandex Messenger sink" diff --git a/src/robusta/integrations/slack/sender.py b/src/robusta/integrations/slack/sender.py index c84a0c1f4..3358cacc6 100644 --- a/src/robusta/integrations/slack/sender.py +++ b/src/robusta/integrations/slack/sender.py @@ -1,9 +1,14 @@ import logging import ssl import tempfile +import time +from datetime import datetime, timedelta +from itertools import chain from typing import Any, Dict, List, Set import certifi +import humanize +from dateutil import tz from slack_sdk import WebClient from slack_sdk.errors import SlackApiError @@ -27,6 +32,7 @@ from robusta.core.reporting.callbacks import ExternalActionRequestBuilder from robusta.core.reporting.consts import EnrichmentAnnotation, FindingSource, SlackAnnotations from robusta.core.reporting.utils import add_pngs_for_all_svgs +from robusta.core.sinks.sink_base import KeyT from robusta.core.sinks.slack.slack_sink_params import SlackSinkParams from robusta.core.sinks.transformer import Transformer from robusta.core.sinks.common import ChannelTransformer @@ -39,8 +45,9 @@ class SlackSender: verified_api_tokens: Set[str] = set() + channel_name_to_id = {} - def __init__(self, slack_token: str, account_id: str, cluster_name: str, signing_key: str): + def __init__(self, slack_token: str, account_id: str, cluster_name: str, signing_key: str, slack_channel: str): """ Connect to Slack and verify that the Slack token is valid. Return True on success, False on failure @@ -160,7 +167,9 @@ def __to_slack_table(self, block: TableBlock): return self.__to_slack_markdown(block.to_markdown()) - def __to_slack(self, block: BaseBlock, sink_name: str) -> List[SlackBlock]: + def __to_slack(self, block: BaseBlock, sink_name: str = None) -> List[SlackBlock]: + # sink_name can be omitted if we are sure that neither KubernetesDiffBlock nor + # CallbackBlock will be processed. if isinstance(block, MarkdownBlock): return self.__to_slack_markdown(block) elif isinstance(block, DividerBlock): @@ -183,13 +192,17 @@ def __to_slack(self, block: BaseBlock, sink_name: str) -> List[SlackBlock]: elif isinstance(block, ListBlock): return self.__to_slack_markdown(block.to_markdown()) elif isinstance(block, KubernetesDiffBlock): + assert sink_name is not None return self.__to_slack_diff(block, sink_name) elif isinstance(block, CallbackBlock): + assert sink_name is not None return self.__get_action_block_for_choices(sink_name, block.choices) elif isinstance(block, LinksBlock): return self.__to_slack_links(block.links) elif isinstance(block, ScanReportBlock): raise AssertionError("to_slack() should never be called on a ScanReportBlock") + elif isinstance(block, dict): + return [block] else: logging.warning(f"cannot convert block of type {type(block)} to slack format block: {block}") return [] # no reason to crash the entire report @@ -235,7 +248,8 @@ def __send_blocks_to_slack( unfurl: bool, status: FindingStatus, channel: str, - ): + thread_ts: str = None, + ) -> str: file_blocks = add_pngs_for_all_svgs([b for b in report_blocks if isinstance(b, FileBlock)]) if not sink_params.send_svg: file_blocks = [b for b in file_blocks if not b.filename.endswith(".svg")] @@ -266,17 +280,25 @@ def __send_blocks_to_slack( ) try: - self.slack_client.chat_postMessage( + if thread_ts: + kwargs = {"thread_ts": thread_ts} + else: + kwargs = {} + resp = self.slack_client.chat_postMessage( channel=channel, text=message, blocks=output_blocks, display_as_bot=True, - attachments=[{"color": status.to_color_hex(), "blocks": attachment_blocks}] - if attachment_blocks - else None, + attachments=( + [{"color": status.to_color_hex(), "blocks": attachment_blocks}] if attachment_blocks else None + ), unfurl_links=unfurl, unfurl_media=unfurl, + **kwargs ) + # We will need channel ids for future message updates + self.channel_name_to_id[channel] = resp["channel"] + return resp["ts"] except Exception as e: logging.error( f"error sending message to slack\ne={e}\ntext={message}\nchannel={channel}\nblocks={*output_blocks,}\nattachment_blocks={*attachment_blocks,}" @@ -329,7 +351,8 @@ def send_finding_to_slack( finding: Finding, sink_params: SlackSinkParams, platform_enabled: bool, - ): + thread_ts: str = None, + ) -> str: blocks: List[BaseBlock] = [] attachment_blocks: List[BaseBlock] = [] @@ -370,7 +393,7 @@ def send_finding_to_slack( if len(attachment_blocks): attachment_blocks.append(DividerBlock()) - self.__send_blocks_to_slack( + return self.__send_blocks_to_slack( blocks, attachment_blocks, finding.title, @@ -384,4 +407,122 @@ def send_finding_to_slack( finding.subject.labels, finding.subject.annotations, ), + thread_ts=thread_ts, + ) + + def send_or_update_summary_message( + self, + group_by_classification_header: List[str], + summary_header: List[str], + summary_table: Dict[KeyT, List[int]], + sink_params: SlackSinkParams, + platform_enabled: bool, + summary_start: float, # timestamp + threaded: bool, + msg_ts: str = None, # message identifier (for updates) + investigate_uri: str = None, + grouping_interval: int = None, # in seconds + ): + """Create or update a summary message with tabular information about the amount of events + fired/resolved and a header describing the event group that this information concerns.""" + now_ts = time.time() + + rows = [] + n_total_alerts = 0 + for key, value in sorted(summary_table.items()): + # key is a tuple of attribute names; value is a 2-element list with + # the number of firing and resolved notifications. + row = list(str(e) for e in chain(key, value)) + rows.append(row) + n_total_alerts += value[0] + value[1] # count firing and resolved notifications + + table_block = TableBlock(headers=summary_header + ["Fired", "Resolved"], rows=rows) + summary_start_utc_dt = datetime.fromtimestamp(summary_start).astimezone(tz.UTC) + formatted_summary_start = summary_start_utc_dt.strftime("%Y-%m-%d %H:%M UTC") + grouping_interval_str = humanize.precisedelta( + timedelta(seconds=grouping_interval), minimum_unit="seconds" + ) + time_text = ( + f"*Time interval:* `{grouping_interval_str}` starting at " + f"``" + ) + group_by_criteria_str = ", ".join(f"`{header}`" for header in group_by_classification_header) + + blocks = [ + MarkdownBlock(f"*Alerts Summary - {n_total_alerts} Notifications*"), + ] + + source_txt = f"*Source:* `{self.cluster_name}`" + if platform_enabled: + blocks.extend([ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": source_txt, + }, + "accessory": { + "type": "button", + "text": { + "type": "plain_text", + "text": "Investigate 🔎", + }, + "url": investigate_uri, + }, + } + ]) + else: + blocks.append(MarkdownBlock(text=source_txt)) + + blocks.extend( + [ + MarkdownBlock(f"*Matching criteria*: {group_by_criteria_str}"), + MarkdownBlock(text=time_text), + table_block, + ] + ) + + if threaded: + blocks.append(MarkdownBlock(text="See thread for individual alerts")) + + output_blocks = [] + for block in blocks: + output_blocks.extend(self.__to_slack(block, sink_params.name)) + + # TODO for the purpose of summary messages, we assume the chanel is determined as if + # labels and annotations were empty, bypassing the elaborate logic in ChannelTransformer. + # Is this acceptable? I don't really see a way around this. + channel = ChannelTransformer.template( + sink_params.channel_override, + sink_params.slack_channel, + self.cluster_name, + {}, + {}, ) + if msg_ts is not None: + method = self.slack_client.chat_update + kwargs = {"ts": msg_ts} + # chat_update calls require channel ids (like "C123456") as opposed to channel names + # for chat_postMessage calls. + if channel not in self.channel_name_to_id: + logging.error( + f"Slack channel id for channel name {channel} could not be determined " + "from previous API calls, message update cannot be performed" + ) + return + channel = self.channel_name_to_id[channel] + else: + method = self.slack_client.chat_postMessage + kwargs = {} + + try: + resp = method( + channel=channel, + text="Summary for: " + ", ".join(group_by_classification_header), + blocks=output_blocks, + display_as_bot=True, + **kwargs, + ) + return resp["ts"] + except Exception as e: + logging.exception(f"error sending message to slack\n{e}\nchannel={channel}\n") diff --git a/tests/test_blocks.py b/tests/test_blocks.py index bd118e866..2cf90a20e 100644 --- a/tests/test_blocks.py +++ b/tests/test_blocks.py @@ -1,11 +1,6 @@ import json -import os -from datetime import datetime -import pytest -from hikaru import DiffDetail, DiffType from hikaru.model.rel_1_26 import HikaruDocumentBase, ObjectMeta, Pod -from prometrix import PrometheusQueryResult from robusta.api import ( # LinkProp,; LinksBlock, CallbackBlock, @@ -17,11 +12,8 @@ HeaderBlock, JsonBlock, KubernetesDiffBlock, - KubernetesFieldsBlock, ListBlock, MarkdownBlock, - PrometheusBlock, - ScanReportBlock, ScanReportRow, SlackSender, TableBlock, @@ -38,7 +30,9 @@ def test_send_to_slack(slack_channel: SlackChannel): - slack_sender = SlackSender(CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY) + slack_sender = SlackSender( + CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY, slack_channel.channel_name + ) msg = "Test123" finding = Finding(title=msg, aggregation_key=msg) finding.add_enrichment([MarkdownBlock("testing")]) @@ -65,8 +59,9 @@ def create_finding_with_all_blocks(): obj = Pod(metadata=ObjectMeta(name="theName", namespace="the-namespace")) obj2 = Pod(metadata=ObjectMeta(name="theName", namespace="the-namespace2")) diff_detail = obj.diff(obj2) - kubernetes_diff_block = KubernetesDiffBlock(diff_detail, obj, obj2, - "sample_kubernetes_diff_block", kind=obj.kind, namespace="default") + kubernetes_diff_block = KubernetesDiffBlock( + diff_detail, obj, obj2, "sample_kubernetes_diff_block", kind=obj.kind, namespace="default" + ) json_block = JsonBlock(json.dumps({"key": "value"})) @@ -97,16 +92,16 @@ def test_callback(event: ExecutionBaseEvent): content=[], priority=1.0, ) - scan_report_block = ScanReportBlock( - title="Test Report", - scan_id="1234", - type=ScanType.POPEYE, - start_time=datetime.now(), - end_time=datetime.now(), - score="1", - results=[scan_report_row], - config="sample_config", - ) + # scan_report_block = ScanReportBlock( + # title="Test Report", + # scan_id="1234", + # type=ScanType.POPEYE, + # start_time=datetime.now(), + # end_time=datetime.now(), + # score="1", + # results=[scan_report_row], + # config="sample_config", + # ) # Now that we have all the blocks, we add them to a finding finding = Finding(title="Sample Finding", aggregation_key="FooBar") # TODO: support default @@ -131,7 +126,9 @@ def test_callback(event: ExecutionBaseEvent): def test_all_block_types(slack_channel: SlackChannel): - slack_sender = SlackSender(CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY) + slack_sender = SlackSender( + CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY, slack_channel.channel_name + ) slack_params = SlackSinkParams(name="test_slack", slack_channel=slack_channel.channel_name, api_key="") finding = create_finding_with_all_blocks() result = slack_sender.send_finding_to_slack(finding, slack_params, False) diff --git a/tests/test_scope_matching.py b/tests/test_scope_matching.py index 1161d4070..34824669a 100644 --- a/tests/test_scope_matching.py +++ b/tests/test_scope_matching.py @@ -118,10 +118,21 @@ def test_scope_params_normalization(self, include_data, exclude_data, expected_i assert params.exclude == expected_exclude_data +class _TestSink(SinkBase): + def write_finding(self, finding: Finding, platform_enabled: bool): + pass + + +class _TestSinkParams(SinkBaseParams): + @classmethod + def _get_sink_type(cls): + return "test" + + class TestSinkBase: @pytest.mark.parametrize("matches_result,expected_result", [(True, True), (False, False)]) def test_accepts(self, matches_result, expected_result): - sink_base = SinkBase(sink_params=SinkBaseParams(name="x"), registry=Mock()) + sink_base = _TestSink(sink_params=_TestSinkParams(name="x"), registry=Mock()) finding = Finding(title="y", aggregation_key="Aaa") finding.matches = Mock(return_value=matches_result) # sink_base.time_slices is [TimeSliceAlways()] here, so the result will depend @@ -204,7 +215,7 @@ def test_sink_scopes(self, finding): test_config = _TestConfig(**yaml.safe_load(test_config_file)) for scope_test in test_config.tests: - sink_base = SinkBase(sink_params=SinkBaseParams(name="x", scope=scope_test.scope), registry=Mock()) + sink_base = _TestSink(sink_params=_TestSinkParams(name="x", scope=scope_test.scope), registry=Mock()) for check in scope_test.checks: finding = check.finding.create_finding() diff --git a/tests/test_slack.py b/tests/test_slack.py index 4ed907fe7..150004dac 100644 --- a/tests/test_slack.py +++ b/tests/test_slack.py @@ -1,7 +1,3 @@ -import os - -import pytest - from robusta.api import Finding, MarkdownBlock, SlackSender, TableBlock from robusta.core.sinks.slack.slack_sink_params import SlackSinkParams from tests.config import CONFIG @@ -13,7 +9,9 @@ def test_send_to_slack(slack_channel: SlackChannel): - slack_sender = SlackSender(CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY) + slack_sender = SlackSender( + CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY, slack_channel.channel_name + ) msg = "Test123" finding = Finding(title=msg, aggregation_key=msg) finding.add_enrichment([MarkdownBlock("testing")]) @@ -23,7 +21,9 @@ def test_send_to_slack(slack_channel: SlackChannel): def test_long_slack_messages(slack_channel: SlackChannel): - slack_sender = SlackSender(CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY) + slack_sender = SlackSender( + CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY, slack_channel.channel_name + ) finding = Finding(title="A" * 151, aggregation_key="A" * 151) finding.add_enrichment([MarkdownBlock("H" * 3001)]) slack_params = SlackSinkParams(name="test_slack", slack_channel=slack_channel.channel_name, api_key="") @@ -31,7 +31,9 @@ def test_long_slack_messages(slack_channel: SlackChannel): def test_long_table_columns(slack_channel: SlackChannel): - slack_sender = SlackSender(CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY) + slack_sender = SlackSender( + CONFIG.PYTEST_IN_CLUSTER_SLACK_TOKEN, TEST_ACCOUNT, TEST_CLUSTER, TEST_KEY, slack_channel.channel_name + ) finding = Finding(title="Testing table blocks", aggregation_key="TestingTableBlocks") finding.add_enrichment( [