From 3daa20715cd6118397bd2372257e7c504d95010c Mon Sep 17 00:00:00 2001
From: Julien Riou
Date: Wed, 4 Sep 2024 17:27:04 +0200
Subject: [PATCH] feat: Add check_clickhouse

Signed-off-by: Julien Riou
---
 bin/check_clickhouse     | 417 ++++++++++++++++++++++++++++++++++++++++
 docs/check_clickhouse.md |  84 ++++++++
 2 files changed, 501 insertions(+)
 create mode 100755 bin/check_clickhouse
 create mode 100644 docs/check_clickhouse.md

diff --git a/bin/check_clickhouse b/bin/check_clickhouse
new file mode 100755
index 0000000..f08f5b6
--- /dev/null
+++ b/bin/check_clickhouse
@@ -0,0 +1,417 @@
+#!/usr/bin/env python3
+
+import argparse
+import time
+
+import nagiosplugin
+import nagiosplugin.state
+
+try:
+    from clickhouse_driver import Client
+
+    HAS_CLICKHOUSE_DRIVER = True
+except ModuleNotFoundError:
+    HAS_CLICKHOUSE_DRIVER = False
+
+
+# Default variables
+# https://clickhouse.com/docs/en/guides/sre/network-ports
+CH_PORT = 9000
+CH_PORT_TLS = 9440
+SLEEP_TIME = 1
+
+
+class BooleanContext(nagiosplugin.Context):
+    """
+    Check if a boolean has the expected value (default True)
+    Return nagios state (default CRITICAL)
+    """
+
+    def __init__(self, name, warning=False, expected=True):
+        super().__init__(name)
+        self.expected = expected
+        self.warning = warning
+
+    def evaluate(self, metric, resource):
+        if metric.value is self.expected:
+            state = nagiosplugin.state.Ok
+        elif self.warning:
+            state = nagiosplugin.state.Warn
+        else:
+            state = nagiosplugin.state.Critical
+        return self.result_cls(state, metric.name, metric)
+
+
+class ScalarContext(nagiosplugin.ScalarContext):
+    """Handle thresholds with zero value"""
+
+    def __init__(
+        self,
+        name,
+        warning=None,
+        critical=None,
+        fmt_metric="{name} is {valueunit}",
+        result_cls=nagiosplugin.Result,
+    ):
+        super(nagiosplugin.ScalarContext, self).__init__(name, fmt_metric, result_cls)
+        self.warning = nagiosplugin.Range(self.parse_zero_scalar(warning))
+        self.critical = nagiosplugin.Range(self.parse_zero_scalar(critical))
+
+    def parse_zero_scalar(self, threshold):
+        return "~:0" if threshold == 0 else threshold
+
+
+class InvertedScalarContext(nagiosplugin.ScalarContext):
+    """Match against an inverted range (for uptime for example)"""
+
+    def __init__(
+        self,
+        name,
+        warning=None,
+        critical=None,
+        fmt_metric="{name} is {valueunit}",
+        result_cls=nagiosplugin.Result,
+    ):
+        super(nagiosplugin.ScalarContext, self).__init__(name, fmt_metric, result_cls)
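+        # In Nagios range syntax a leading "@" inverts the match: "@N" raises
+        # an alert when the value falls inside 0..N, so for uptime a recently
+        # restarted instance is the problem case.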
+ "select value from system.metrics where name = 'TCPConnection'" + ) + max_connections = self.query( + "select value from system.settings where name = 'max_connections'", 1024 + ) + percentage = round(connections * 100 / max_connections, 2) + return [ + nagiosplugin.Metric( + "connections", percentage, uom="%", context="connections" + ) + ] + + def check_cluster_connectivity(self): + metrics = [] + members = self.client.execute("select host_address, port from system.clusters") + for host, port in members: + value = f"{host}:{port}" + try: + client = Client(host=host, port=port, secure=self.secure) + client.execute("select 1") + result = True + except Exception as err: + result = False + value += f" ({str(err)})" + metrics.append( + nagiosplugin.Metric(value, result, context="cluster_connectivity") + ) + return metrics + + def check_detached_parts(self): + parts = self.query("select count() from system.detached_parts") + return [nagiosplugin.Metric("detached parts", parts, context="detached_parts")] + + def check_delayed_inserts(self): + inserts = self.query( + "select value from system.metrics where metric = 'DelayedInserts'" + ) + return [ + nagiosplugin.Metric("delayed inserts", inserts, context="delayed_inserts") + ] + + def check_keeper_health(self): + try: + self.client.execute( + "select count(*) from system.zookeeper where path = '/'" + ) + value = "keeper_health" + result = True + except Exception as err: + value = str(err) + result = False + return [nagiosplugin.Metric(value, result, context="keeper_health")] + + def check_max_part_count_for_partition(self): + query = """ + select value + from system.asynchronous_metrics + where metric = 'MaxPartCountForPartition' + """ + max_count = self.query(query) + return [ + nagiosplugin.Metric( + "MaxPartCountForPartition", + max_count, + context="max_part_count_for_partition", + ) + ] + + def check_pending_files_rate(self): + query = ( + "select value from system.metrics where metric = 'DistributedFilesToInsert'" + ) + before = self.query(query) + time.sleep(SLEEP_TIME) + after = self.query(query) + return [ + nagiosplugin.Metric( + "pending files rate", after - before, context="pending_files_rate" + ) + ] + + def check_queries(self): + queries = self.query("select value from system.metrics where metric = 'Query'") + return [nagiosplugin.Metric("queries", queries, context="queries")] + + def check_replication_queue(self): + query = """ + select count() + from system.replication_queue + where num_tries > 100 or num_postponed > 1000 + """ + queue = self.query(query) + return [ + nagiosplugin.Metric("replication queue", queue, context="replication_queue") + ] + + def check_uptime(self): + uptime = self.query("select uptime()") + return [nagiosplugin.Metric("uptime", uptime, uom="s", context="uptime")] + + +def parse_args(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "-H", + "--host", + type=str, + dest="host", + default="127.0.0.1", + help="Hostname or IP address used to connect to the ClickHouse instance", + ) + parser.add_argument( + "-P", + "--port", + type=int, + dest="port", + help="Port used to connect to the ClickHouse instance", + required=False, + ) + parser.add_argument( + "--secure", + dest="secure", + action="store_true", + help="Enable TLS when connecting to the ClickHouse instance", + ) + + subparsers = parser.add_subparsers(dest="command") + + connections = subparsers.add_parser( + "connections", help="Check the percentage of the number of connections" + ) + connections.add_argument( + "-w", "--warning", 
+        time.sleep(SLEEP_TIME)
+        after = self.query(query)
+        return [
+            nagiosplugin.Metric(
+                "pending files rate", after - before, context="pending_files_rate"
+            )
+        ]
+
+    def check_queries(self):
+        queries = self.query("select value from system.metrics where metric = 'Query'")
+        return [nagiosplugin.Metric("queries", queries, context="queries")]
+
+    def check_replication_queue(self):
+        query = """
+        select count()
+        from system.replication_queue
+        where num_tries > 100 or num_postponed > 1000
+        """
+        queue = self.query(query)
+        return [
+            nagiosplugin.Metric("replication queue", queue, context="replication_queue")
+        ]
+
+    def check_uptime(self):
+        uptime = self.query("select uptime()")
+        return [nagiosplugin.Metric("uptime", uptime, uom="s", context="uptime")]
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "-H",
+        "--host",
+        type=str,
+        dest="host",
+        default="127.0.0.1",
+        help="Hostname or IP address used to connect to the ClickHouse instance",
+    )
+    parser.add_argument(
+        "-P",
+        "--port",
+        type=int,
+        dest="port",
+        help="Port used to connect to the ClickHouse instance",
+        required=False,
+    )
+    parser.add_argument(
+        "--secure",
+        dest="secure",
+        action="store_true",
+        help="Enable TLS when connecting to the ClickHouse instance",
+    )
+
+    subparsers = parser.add_subparsers(dest="command")
+
+    connections = subparsers.add_parser(
+        "connections", help="Check the percentage of used connections"
+    )
+    connections.add_argument(
+        "-w", "--warning", type=int, default=70, help="Warning threshold"
+    )
+    connections.add_argument(
+        "-c", "--critical", type=int, default=90, help="Critical threshold"
+    )
+
+    cluster_connectivity = subparsers.add_parser(
+        "cluster_connectivity", help="Check if all nodes of the cluster are reachable"
+    )
+    cluster_connectivity.add_argument(
+        "-w",
+        "--warning",
+        action="store_true",
+        help="Return warning instead of critical",
+    )
+
+    detached_parts = subparsers.add_parser(
+        "detached_parts", help="Check the number of detached parts"
+    )
+    detached_parts.add_argument(
+        "-w", "--warning", type=int, default=0, help="Warning threshold"
+    )
+    detached_parts.add_argument(
+        "-c", "--critical", type=int, default=1000, help="Critical threshold"
+    )
+
+    delayed_inserts = subparsers.add_parser(
+        "delayed_inserts", help="Check the number of delayed inserts"
+    )
+    delayed_inserts.add_argument(
+        "-w", "--warning", type=int, default=0, help="Warning threshold"
+    )
+    delayed_inserts.add_argument(
+        "-c", "--critical", type=int, default=1000, help="Critical threshold"
+    )
+
+    keeper_health = subparsers.add_parser(
+        "keeper_health", help="Check if the keeper is healthy"
+    )
+    keeper_health.add_argument(
+        "-w",
+        "--warning",
+        action="store_true",
+        help="Return warning instead of critical",
+    )
+
+    max_part_count_for_partition = subparsers.add_parser(
+        "max_part_count_for_partition",
+        help=(
+            "Check the maximum number of parts per partition across all partitions "
+            "of all tables of the MergeTree family"
+        ),
+    )
+    max_part_count_for_partition.add_argument(
+        "-w", "--warning", type=int, default=100, help="Warning threshold"
+    )
+    max_part_count_for_partition.add_argument(
+        "-c", "--critical", type=int, default=300, help="Critical threshold"
+    )
+
+    pending_files_rate = subparsers.add_parser(
+        "pending_files_rate",
+        help=(
+            "Check the rate of the number of pending files to process for "
+            "asynchronous insertion into Distributed tables"
+        ),
+    )
+    pending_files_rate.add_argument(
+        "-w", "--warning", type=int, default=100, help="Warning threshold"
+    )
+    pending_files_rate.add_argument(
+        "-c", "--critical", type=int, default=300, help="Critical threshold"
+    )
+
+    queries = subparsers.add_parser(
+        "queries", help="Check the number of current queries"
+    )
+    queries.add_argument(
+        "-w", "--warning", type=int, default=100, help="Warning threshold"
+    )
+    queries.add_argument(
+        "-c", "--critical", type=int, default=1000, help="Critical threshold"
+    )
+
+    replication_queue = subparsers.add_parser(
+        "replication_queue", help="Check the number of tasks in the replication queue"
+    )
+    replication_queue.add_argument(
+        "-w", "--warning", type=int, default=0, help="Warning threshold"
+    )
+    replication_queue.add_argument(
+        "-c", "--critical", type=int, default=1000, help="Critical threshold"
+    )
+
+    uptime = subparsers.add_parser(
+        "uptime", help="Check the number of seconds since the service has been started"
+    )
+    uptime.add_argument(
+        "-w", "--warning", type=int, default=300, help="Warning threshold"
+    )
+    uptime.add_argument(
+        "-c", "--critical", type=int, default=60, help="Critical threshold"
+    )
+
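+    # The uptime thresholds are deliberately inverted (warning 300 > critical
+    # 60): through InvertedScalarContext, a low uptime, meaning a recent
+    # restart, is what raises an alert.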
+    return parser.parse_args()
+
+
+@nagiosplugin.guarded
+def main():
+    if not HAS_CLICKHOUSE_DRIVER:
+        raise nagiosplugin.CheckError("clickhouse-driver is required")
+
+    args = parse_args()
+
+    if args.command == "connections":
+        context = ScalarContext("connections", args.warning, args.critical)
+    elif args.command == "cluster_connectivity":
+        context = BooleanContext("cluster_connectivity", warning=args.warning)
+    elif args.command == "detached_parts":
+        context = ScalarContext("detached_parts", args.warning, args.critical)
+    elif args.command == "delayed_inserts":
+        context = ScalarContext("delayed_inserts", args.warning, args.critical)
+    elif args.command == "keeper_health":
+        context = BooleanContext("keeper_health", warning=args.warning)
+    elif args.command == "max_part_count_for_partition":
+        context = ScalarContext(
+            "max_part_count_for_partition", args.warning, args.critical
+        )
+    elif args.command == "pending_files_rate":
+        context = ScalarContext("pending_files_rate", args.warning, args.critical)
+    elif args.command == "queries":
+        context = ScalarContext("queries", args.warning, args.critical)
+    elif args.command == "replication_queue":
+        context = ScalarContext("replication_queue", args.warning, args.critical)
+    elif args.command == "uptime":
+        context = InvertedScalarContext("uptime", args.warning, args.critical)
+    else:
+        raise nagiosplugin.CheckError("Invalid command")
+
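+    # Fall back to the default native protocol port (9440 with TLS, 9000
+    # otherwise) when no port is given on the command line.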
+    port = args.port
+    if not port:
+        port = CH_PORT
+        if args.secure:
+            port = CH_PORT_TLS
+
+    resource = Clickhouse(
+        command=args.command,
+        host=args.host,
+        port=port,
+        secure=args.secure,
+    )
+
+    summary = ExtendedSummary()
+
+    check = nagiosplugin.Check(resource, context, summary)
+    check.main()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/docs/check_clickhouse.md b/docs/check_clickhouse.md
new file mode 100644
index 0000000..1c98fa4
--- /dev/null
+++ b/docs/check_clickhouse.md
@@ -0,0 +1,84 @@
+# check_clickhouse
+
+## cluster_connectivity
+
+Check if the cluster members are reachable via the SQL interface.
+
+```
+command[check_clickhouse_cluster_connectivity]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure cluster_connectivity
+```
+
+## connections
+
+Check the percentage of used connections.
+
+```
+command[check_clickhouse_connections]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure connections
+```
+
+## delayed_inserts
+
+Check if insertions are being delayed.
+
+```
+command[check_clickhouse_delayed_inserts]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure delayed_inserts
+```
+
+## detached_parts
+
+Check the number of detached parts.
+
+```
+command[check_clickhouse_detached_parts]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure detached_parts
+```
+
+## keeper_health
+
+Check if the Keeper cluster has been configured and is healthy.
+
+```
+command[check_clickhouse_keeper_health]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure keeper_health
+```
+
+## max_part_count_for_partition
+
+Check the maximum number of parts per partition across all partitions of all
+tables of the MergeTree family. Values larger than 300 indicate
+misconfiguration, overload, or massive data loading (according to the
+[documentation](https://clickhouse.com/docs/en/operations/system-tables/asynchronous_metrics)).
+
+```
+command[check_clickhouse_max_part_count_for_partition]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure max_part_count_for_partition
+```
+
+## pending_files_rate
+
+Check if the number of distributed files in the queue is increasing over time.
+
+```
+command[check_clickhouse_pending_files_rate]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure pending_files_rate
+```
+
+## queries
+
+Check the number of running queries.
+
+```
+command[check_clickhouse_queries]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure queries
+```
+
+## replication_queue
+
+Check if replication tasks are stuck.
+
+```
+command[check_clickhouse_replication_queue]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure replication_queue
+```
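+
+## uptime
+
+Check the number of seconds since the service has been started.
+
+```
+command[check_clickhouse_uptime]=/opt/ovh-nagios-plugins/bin/check_clickhouse --secure uptime
+```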