From a47657a1944a21b6fe9ce262028b48324630722b Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Mon, 3 Feb 2025 13:58:10 +0100 Subject: [PATCH] Add new parameter to control Kafka producer security protocol --- bin/ztf/distribute.py | 37 +++++++++++++------- chart/templates/spark-fink-distribution.yaml | 2 ++ chart/values.yaml | 1 + conf/ztf/fink.conf.distribution | 3 +- fink_broker/ztf/parser.py | 9 +++++ 5 files changed, 38 insertions(+), 14 deletions(-) diff --git a/bin/ztf/distribute.py b/bin/ztf/distribute.py index 75c7df92..f2e9a5e0 100644 --- a/bin/ztf/distribute.py +++ b/bin/ztf/distribute.py @@ -90,15 +90,7 @@ def push_to_kafka(df_in, topicname, cnames, checkpointpath_kafka, tinterval, kaf disquery = ( df_kafka.writeStream.format("kafka") - .option("kafka.bootstrap.servers", kafka_cfg["bootstrap.servers"]) - #.option("kafka.security.protocol", "PLAINTEXT") - .option("kafka.security.protocol", "SASL_PLAINTEXT") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - # .option("kafka.sasl.username", kafka_cfg["username"]) - # .option("kafka.sasl.password", kafka_cfg["password"]) - # .option("kafka.buffer.memory", kafka_cfg["kafka_buf_mem"]) - # .option("kafka.delivery.timeout.ms", kafka_cfg["kafka_timeout_ms"]) - .option("kafka.auto.create.topics.enable", True) + .options(**kafka_cfg) .option("topic", topicname) .option("checkpointLocation", checkpointpath_kafka + "/" + topicname) .trigger(processingTime="{} seconds".format(tinterval)) @@ -191,11 +183,30 @@ def main(): kafka_cfg = { "bootstrap.servers": args.distribution_servers, - # "username": args.kafka_sasl_username, - # "password": args.kafka_sasl_password, - # "kafka_buf_mem": args.kafka_buffer_memory, - # "kafka_timeout_ms": args.kafka_delivery_timeout_ms, } + + if args.kafka_security_protocol == "SASL_PLAINTEXT": + # CI - k8s + kafka_cfg.setdefault("kafka.security.protocol", "SASL_PLAINTEXT") + kafka_cfg.setdefault("kafka.sasl.mechanism", "SCRAM-SHA-512") + elif args.kafka_security_protocol == 
"PLAINTEXT": + # CI - sentinel + kafka_cfg.setdefault("kafka.security.protocol", "PLAINTEXT") + elif args.kafka_security_protocol == "VD": + # VD + kafka_cfg.setdefault("kafka.sasl.username", args.kafka_sasl_username) + kafka_cfg.setdefault("kafka.sasl.password", args.kafka_sasl_password) + kafka_cfg.setdefault("kafka.buffer.memory", args.kafka_buffer_memory) + kafka_cfg.setdefault( + "kafka.delivery.timeout.ms", args.kafka_delivery_timeout_ms + ) + else: + msg = " Kafka producer security protocol {} is not known".format( + args.kafka_security_protocol + ) + logger.warn(msg) + spark.stop() + for userfilter in userfilters: if args.noscience: logger.debug( diff --git a/chart/templates/spark-fink-distribution.yaml b/chart/templates/spark-fink-distribution.yaml index 5e48fc68..448690ff 100644 --- a/chart/templates/spark-fink-distribution.yaml +++ b/chart/templates/spark-fink-distribution.yaml @@ -12,6 +12,8 @@ spec: - '{{ .Values.distribution.kafka.schema }}' - '-substream_prefix' - '{{ .Values.distribution.kafka.substream_prefix }}' + - '-kafka_security_protocol' + - '{{ .Values.distribution.kafka.kafka_security_protocol }}' - '-night' - '{{ .Values.night }}' sparkVersion: "3.4.1" diff --git a/chart/values.yaml b/chart/values.yaml index bda4e683..dda8ff45 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -61,6 +61,7 @@ distribution: out_sockets: "kafka-cluster-kafka-external-bootstrap.kafka:9094" schema: "/home/fink/fink-alert-schemas/ztf/distribution_schema_0p2.avsc" substream_prefix: "fink_" + kafka_security_protocol: "SASL_PLAINTEXT" storage: hdfs diff --git a/conf/ztf/fink.conf.distribution b/conf/ztf/fink.conf.distribution index a514f480..1f0016d2 100644 --- a/conf/ztf/fink.conf.distribution +++ b/conf/ztf/fink.conf.distribution @@ -77,7 +77,8 @@ FINK_PRODUCER_JAAS="${FINK_HOME}/conf/fink_kafka_producer_jaas.conf" # Fink test Consumer FINK_TEST_CONSUMER_JAAS="${FINK_HOME}/conf/fink_kafka_consumer_jaas.conf" +KAFKA_SECURITY_PROTOCOL=PLAINTEXT 
KAFKA_SASL_USERNAME=toto KAFKA_SASL_PASSWORD=tata KAFKA_BUFFER_MEMORY=134217728 -KAFKA_DELIVERY_TIMEOUT_MS=240000 \ No newline at end of file +KAFKA_DELIVERY_TIMEOUT_MS=240000 diff --git a/fink_broker/ztf/parser.py b/fink_broker/ztf/parser.py index 568d81b5..f2ec1e76 100644 --- a/fink_broker/ztf/parser.py +++ b/fink_broker/ztf/parser.py @@ -181,6 +181,15 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace: [DISTRIBUTION_SCHEMA] """, ) + parser.add_argument( + "-kafka_security_protocol", + type=str, + default="", + help=""" + the security protocol for kafka producer + [KAFKA_SECURITY_PROTOCOL] + """, + ) parser.add_argument( "-kafka_sasl_username", type=str,