From 4b1e0fd5ba00fec4c8638f4874de4b1d1e09434e Mon Sep 17 00:00:00 2001 From: arik Date: Mon, 21 Jun 2021 15:21:09 +0300 Subject: [PATCH] kafka sink --- playbooks/deployment_babysitter.py | 25 +++++++++++-- playbooks/grafana_enrichment.py | 37 +++++++++++++++++++ src/poetry.lock | 17 ++++++++- src/pyproject.toml | 1 + src/robusta/api/__init__.py | 4 ++ .../integrations/sinks/kafka/__init__.py | 0 .../integrations/sinks/kafka/kafka_sink.py | 14 +++++++ .../sinks/kafka/kafka_sink_config.py | 6 +++ .../sinks/kafka/kafka_sink_manager.py | 23 ++++++++++++ src/robusta/integrations/sinks/sink_base.py | 5 +++ src/robusta/integrations/sinks/sink_config.py | 8 ++++ .../integrations/sinks/sink_factory.py | 15 ++++++++ 12 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 src/robusta/integrations/sinks/kafka/__init__.py create mode 100644 src/robusta/integrations/sinks/kafka/kafka_sink.py create mode 100644 src/robusta/integrations/sinks/kafka/kafka_sink_config.py create mode 100644 src/robusta/integrations/sinks/kafka/kafka_sink_manager.py create mode 100644 src/robusta/integrations/sinks/sink_base.py create mode 100644 src/robusta/integrations/sinks/sink_config.py create mode 100644 src/robusta/integrations/sinks/sink_factory.py diff --git a/playbooks/deployment_babysitter.py b/playbooks/deployment_babysitter.py index 0ebaa5721..ef741a01f 100644 --- a/playbooks/deployment_babysitter.py +++ b/playbooks/deployment_babysitter.py @@ -10,13 +10,14 @@ class DeploymentBabysitterConfig(BaseModel): - slack_channel: str + slack_channel: str = "" fields_to_monitor: Tuple[str] = ( "status.readyReplicas", "message", "reason", "spec" ) + sinks: List[SinkConfigBase] = None # TODO: filter out all the managed fields crap @@ -40,6 +41,7 @@ def babysitter_get_blocks(diffs: List[DiffDetail]): @on_deployment_all_changes def deployment_babysitter(event: DeploymentEvent, config: DeploymentBabysitterConfig): """Track changes to a deployment and send the changes in slack.""" + filtered_diffs = None if event.operation == K8sOperationType.UPDATE: all_diffs = event.obj.diff(event.old_obj) filtered_diffs = list(filter(lambda x: babysitter_should_include_diff(x, config), all_diffs)) @@ -48,5 +50,22 @@ def deployment_babysitter(event: DeploymentEvent, config: DeploymentBabysitterCo event.report_attachment_blocks.extend(babysitter_get_blocks(filtered_diffs)) event.report_title = f"Deployment {event.obj.metadata.name} {event.operation.value}d in namespace {event.obj.metadata.namespace}" - event.slack_channel = config.slack_channel - send_to_slack(event) + if config.slack_channel: + event.slack_channel = config.slack_channel + send_to_slack(event) + + if config.sinks: + data = { + "deployment": event.obj.metadata.name, + "deployment_namespace": event.obj.metadata.namespace, + "message": "Deployment properties change", + "changed_properties": [{ + "property": ".".join(diff.path), + "old": diff.other_value, + "new": diff.value + } for diff in filtered_diffs] + } + for sink_config in config.sinks: + SinkFactory.get_sink(sink_config).write(data) + + diff --git a/playbooks/grafana_enrichment.py b/playbooks/grafana_enrichment.py index 6b09b2278..f6ee928bd 100644 --- a/playbooks/grafana_enrichment.py +++ b/playbooks/grafana_enrichment.py @@ -29,6 +29,43 @@ def add_deployment_lines_to_grafana(event: DeploymentEvent, action_params: Param grafana.add_line_to_dashboard(action_params.grafana_dashboard_uid, msg, tags=[event.obj.metadata.name]) +class ImageChangesParams(BaseModel): + sinks: List[SinkConfigBase] + +@on_deployment_update +def report_image_changes(event: DeploymentEvent, action_params: ImageChangesParams): + """ + Report image changed whenever a new application version is deployed so that you can easily see changes. + """ + new_images = event.obj.get_images() + old_images = event.old_obj.get_images() + if new_images == old_images: + return + + msg = "" + changed_properties = [] + if new_images.keys() != old_images.keys(): + msg = f"number or names of images changed: new - {new_images} old - {old_images}" + else: + for name in new_images: + if new_images[name] != old_images[name]: + msg += f"image name: {name} new tag: {new_images[name]} old tag {old_images[name]}" + changed_properties.append({ + "property": "image", + "old": f"{name}:{old_images[name]}", + "new": f"{name}:{new_images[name]}" + }) + + data = { + "deployment": event.obj.metadata.name, + "deployment_namespace": event.obj.metadata.namespace, + "message": msg, + "changed_properties": changed_properties + } + for sink_config in action_params.sinks: + SinkFactory.get_sink(sink_config).write(data) + + @on_pod_create def test_pod_orm(event : PodEvent): logging.info('running test_pod_orm') diff --git a/src/poetry.lock b/src/poetry.lock index 03434292a..9ed74f6b8 100644 --- a/src/poetry.lock +++ b/src/poetry.lock @@ -310,6 +310,17 @@ MarkupSafe = ">=0.23" [package.extras] i18n = ["Babel (>=0.8)"] +[[package]] +name = "kafka-python" +version = "2.0.2" +description = "Pure Python client for Apache Kafka" +category = "main" +optional = false +python-versions = "*" + +[package.extras] +crc32c = ["crc32c"] + [[package]] name = "kubernetes" version = "12.0.1" @@ -688,7 +699,7 @@ watchdog = ["watchdog"] [metadata] lock-version = "1.1" python-versions = "^3.6.1" -content-hash = "fecd178e958fa000217a13fc97ba68fd7f895826fc0f9b48b5b5a343131326d7" +content-hash = "3174b7f91ff015fb668cc902d63c7111feac0056cfb21980c2795c7fa0b00c43" [metadata.files] appdirs = [ @@ -849,6 +860,10 @@ jinja2 = [ {file = "Jinja2-2.11.3-py2.py3-none-any.whl", hash = "sha256:03e47ad063331dd6a3f04a43eddca8a966a26ba0c5b7207a9a9e4e08f1b29419"}, {file = "Jinja2-2.11.3.tar.gz", hash = "sha256:a6d58433de0ae800347cab1fa3043cebbabe8baa9d29e668f1c768cb87a333c6"}, ] +kafka-python = [ + {file = "kafka-python-2.0.2.tar.gz", hash = "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3"}, + {file = "kafka_python-2.0.2-py2.py3-none-any.whl", hash = "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"}, +] kubernetes = [ {file = "kubernetes-12.0.1-py2.py3-none-any.whl", hash = "sha256:23c85d8571df8f56e773f1a413bc081537536dc47e2b5e8dc2e6262edb2c57ca"}, {file = "kubernetes-12.0.1.tar.gz", hash = "sha256:ec52ea01d52e2ec3da255992f7e859f3a76f2bdb51cf65ba8cd71dfc309d8daa"}, diff --git a/src/pyproject.toml b/src/pyproject.toml index 103c97630..d178ac643 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -27,6 +27,7 @@ dulwich = "^0.20.23" better-exceptions = "^0.3.3" CairoSVG = "^2.5.2" tabulate = "^0.8.9" +kafka-python = "^2.0.2" [tool.poetry.dev-dependencies] hikaru = {git = "https://github.com/aantn/hikaru.git", rev = "fix_datetimes"} diff --git a/src/robusta/api/__init__.py b/src/robusta/api/__init__.py index 152856ebe..84abd5d8d 100644 --- a/src/robusta/api/__init__.py +++ b/src/robusta/api/__init__.py @@ -14,6 +14,10 @@ from ..integrations.manual.triggers import * from ..integrations.scheduled.triggers import * from ..integrations.git.git_repo_manager import * +from ..integrations.sinks.sink_base import * +from ..integrations.sinks.sink_config import * +from ..integrations.sinks.sink_factory import * +from ..integrations.sinks.kafka import * from ..core.persistency.in_memory import get_persistent_data from ..utils.rate_limiter import RateLimiter from ..runner.object_updater import * diff --git a/src/robusta/integrations/sinks/kafka/__init__.py b/src/robusta/integrations/sinks/kafka/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/robusta/integrations/sinks/kafka/kafka_sink.py b/src/robusta/integrations/sinks/kafka/kafka_sink.py new file mode 100644 index 000000000..1a97ddc43 --- /dev/null +++ b/src/robusta/integrations/sinks/kafka/kafka_sink.py @@ -0,0 +1,14 @@ +import json + +from ..sink_base import SinkBase +from kafka import KafkaProducer + + +class KafkaSink(SinkBase): + + def __init__(self, producer: KafkaProducer, topic: str): + self.producer = producer + self.topic = topic + + def write(self, data: dict): + self.producer.send(self.topic, value=json.dumps(data).encode("utf-8")) \ No newline at end of file diff --git a/src/robusta/integrations/sinks/kafka/kafka_sink_config.py b/src/robusta/integrations/sinks/kafka/kafka_sink_config.py new file mode 100644 index 000000000..3b8900338 --- /dev/null +++ b/src/robusta/integrations/sinks/kafka/kafka_sink_config.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class KafkaSinkConfig(BaseModel): + kafka_url: str + topic: str \ No newline at end of file diff --git a/src/robusta/integrations/sinks/kafka/kafka_sink_manager.py b/src/robusta/integrations/sinks/kafka/kafka_sink_manager.py new file mode 100644 index 000000000..d14425a24 --- /dev/null +++ b/src/robusta/integrations/sinks/kafka/kafka_sink_manager.py @@ -0,0 +1,23 @@ +import threading +from collections import defaultdict + +from kafka import KafkaProducer + +from .kafka_sink import KafkaSink +from ..sink_base import SinkBase + + +class KafkaSinkManager: + + manager_lock = threading.Lock() + producers_map = defaultdict(None) + + @staticmethod + def get_kafka_sink(kafka_url : str, topic: str) -> SinkBase: + with KafkaSinkManager.manager_lock: + producer = KafkaSinkManager.producers_map.get(kafka_url) + if producer is not None: + return KafkaSink(producer, topic) + producer = KafkaProducer(bootstrap_servers=kafka_url) + KafkaSinkManager.producers_map[kafka_url] = producer + return KafkaSink(producer, topic) diff --git a/src/robusta/integrations/sinks/sink_base.py b/src/robusta/integrations/sinks/sink_base.py new file mode 100644 index 000000000..587d89880 --- /dev/null +++ b/src/robusta/integrations/sinks/sink_base.py @@ -0,0 +1,5 @@ + +class SinkBase: + + def write(self, data: dict): + pass \ No newline at end of file diff --git a/src/robusta/integrations/sinks/sink_config.py b/src/robusta/integrations/sinks/sink_config.py new file mode 100644 index 000000000..41508316b --- /dev/null +++ b/src/robusta/integrations/sinks/sink_config.py @@ -0,0 +1,8 @@ +from typing import Dict + +from pydantic import BaseModel + + +class SinkConfigBase(BaseModel): + sink_type: str + params: Dict \ No newline at end of file diff --git a/src/robusta/integrations/sinks/sink_factory.py b/src/robusta/integrations/sinks/sink_factory.py new file mode 100644 index 000000000..c4093ec4a --- /dev/null +++ b/src/robusta/integrations/sinks/sink_factory.py @@ -0,0 +1,15 @@ +from .kafka.kafka_sink_config import KafkaSinkConfig +from .kafka.kafka_sink_manager import KafkaSinkManager +from .sink_config import SinkConfigBase +from .sink_base import SinkBase + + +class SinkFactory: + + @staticmethod + def get_sink(sink_config: SinkConfigBase) -> SinkBase: + if sink_config.sink_type == "kafka": + kafka_sink_config = KafkaSinkConfig(**sink_config.params) + return KafkaSinkManager.get_kafka_sink(kafka_sink_config.kafka_url, kafka_sink_config.topic) + + raise Exception(f"Sink not supported {sink_config.sink_type}") \ No newline at end of file