From 23d5e5403bf94a0037b5eb57ad0b360667154759 Mon Sep 17 00:00:00 2001
From: Parsa Yousefi
Date: Sun, 30 Jun 2024 01:11:43 +0330
Subject: [PATCH] output/kafka: support other compression types (#104)
---
README.md | 3 +++
config-sample.ini | 3 +++
docs/content/en/docs/Outputs/kafka.md | 3 +++
internal/output/kafka.go | 16 +++++++++++++++-
4 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index f7cf3b3d..761e81b5 100644
--- a/README.md
+++ b/README.md
@@ -449,6 +449,9 @@ Note that command line arguments are case-insensitive as of v0.9.5
# Compress Kafka connection
--kafkacompress
+# Compression Type[gzip, snappy, lz4, zstd] default is snappy
+--kafkacompressiontype=snappy
+
# Use TLS for kafka connection
--kafkasecure
diff --git a/config-sample.ini b/config-sample.ini
index d508b1e0..1714ba87 100644
--- a/config-sample.ini
+++ b/config-sample.ini
@@ -206,6 +206,9 @@ kafkabatchdelay = 1s
; Compress Kafka connection
kafkacompress = false
+; Compression Type[gzip, snappy, lz4, zstd] default is snappy
+kafkacompressiontype = snappy
+
; Use TLS for kafka connection
kafkasecure = false
diff --git a/docs/content/en/docs/Outputs/kafka.md b/docs/content/en/docs/Outputs/kafka.md
index 35d11991..e94fb181 100644
--- a/docs/content/en/docs/Outputs/kafka.md
+++ b/docs/content/en/docs/Outputs/kafka.md
@@ -35,6 +35,9 @@ KafkaBatchDelay = 1s
; Compress Kafka connection
KafkaCompress = false
+; Compression Type[gzip, snappy, lz4, zstd] default is snappy
+KafkaCompressiontype = snappy
+
; Use TLS for kafka connection
KafkaSecure = false
diff --git a/internal/output/kafka.go b/internal/output/kafka.go
index 81d50821..16442e6a 100644
--- a/internal/output/kafka.go
+++ b/internal/output/kafka.go
@@ -44,6 +44,7 @@ type kafkaConfig struct {
KafkaTimeout uint `long:"kafkatimeout" ini-name:"kafkatimeout" env:"DNSMONSTER_KAFKATIMEOUT" default:"3" description:"Kafka connection timeout in seconds"`
KafkaBatchDelay time.Duration `long:"kafkabatchdelay" ini-name:"kafkabatchdelay" env:"DNSMONSTER_KAFKABATCHDELAY" default:"1s" description:"Interval between sending results to Kafka if Batch size is not filled"`
KafkaCompress bool `long:"kafkacompress" ini-name:"kafkacompress" env:"DNSMONSTER_KAFKACOMPRESS" description:"Compress Kafka connection"`
+ KafkaCompressionType string `long:"kafkacompressiontype" ini-name:"kafkacompressiontype" env:"DNSMONSTER_KAFKACOMPRESSIONTYPE" default:"snappy" description:"Compression Type Kafka connection [snappy gzip lz4 zstd]; default(snappy)." choice:"snappy" choice:"gzip" choice:"lz4" choice:"zstd"`
KafkaSecure bool `long:"kafkasecure" ini-name:"kafkasecure" env:"DNSMONSTER_KAFKASECURE" description:"Use TLS for kafka connection"`
KafkaCACertificatePath string `long:"kafkacacertificatepath" ini-name:"kafkacacertificatepath" env:"DNSMONSTER_KAFKACACERTIFICATEPATH" default:"" description:"Path of CA certificate that signs Kafka broker certificate"`
KafkaTLSCertificatePath string `long:"kafkatlscertificatepath" ini-name:"kafkatlscertificatepath" env:"DNSMONSTER_KAFKATLSCERTIFICATEPATH" default:"" description:"Path of TLS certificate to present to broker"`
@@ -148,7 +149,20 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer {
}
if kafConfig.KafkaCompress {
- kWriter.Compression = kafka.Snappy
+ switch kafConfig.KafkaCompressionType {
+ case "gzip":
+ kWriter.Compression = kafka.Gzip
+ log.Info("Kafka using compression: gzip")
+ case "snappy":
+ kWriter.Compression = kafka.Snappy
+ log.Info("Kafka using compression: snappy")
+ case "lz4":
+ kWriter.Compression = kafka.Lz4
+ log.Info("Kafka using compression: lz4")
+ case "zstd":
+ kWriter.Compression = kafka.Zstd
+ log.Info("Kafka using compression: zstd")
+ }
}
return kWriter