Skip to content

Commit

Permalink
Add new parameter to control Kaffka producer security protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPeloton committed Feb 3, 2025
1 parent 1003928 commit a47657a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 14 deletions.
37 changes: 24 additions & 13 deletions bin/ztf/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,7 @@ def push_to_kafka(df_in, topicname, cnames, checkpointpath_kafka, tinterval, kaf

disquery = (
df_kafka.writeStream.format("kafka")
.option("kafka.bootstrap.servers", kafka_cfg["bootstrap.servers"])
#.option("kafka.security.protocol", "PLAINTEXT")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
# .option("kafka.sasl.username", kafka_cfg["username"])
# .option("kafka.sasl.password", kafka_cfg["password"])
# .option("kafka.buffer.memory", kafka_cfg["kafka_buf_mem"])
# .option("kafka.delivery.timeout.ms", kafka_cfg["kafka_timeout_ms"])
.option("kafka.auto.create.topics.enable", True)
.options(**kafka_cfg)
.option("topic", topicname)
.option("checkpointLocation", checkpointpath_kafka + "/" + topicname)
.trigger(processingTime="{} seconds".format(tinterval))
Expand Down Expand Up @@ -191,11 +183,30 @@ def main():

kafka_cfg = {
"bootstrap.servers": args.distribution_servers,
# "username": args.kafka_sasl_username,
# "password": args.kafka_sasl_password,
# "kafka_buf_mem": args.kafka_buffer_memory,
# "kafka_timeout_ms": args.kafka_delivery_timeout_ms,
}

if args.kafka_security_protocol == "SASL_PLAINTEXT":
# CI - k8s
kafka_cfg.setdefault("kafka.security.protocol", "SASL_PLAINTEXT")
kafka_cfg.setdefault("kafka.sasl.mechanism", "SCRAM-SHA-512")
elif args.kafka_security_protocol == "PLAINTEXT":
# CI - sentinel
kafka_cfg.setdefault("kafka.security.protocol", "PLAINTEXT")
elif args.kafka_security_protocol == "VD":
# VD
kafka_cfg.setdefault("kafka.sasl.username", args.kafka_sasl_username)
kafka_cfg.setdefault("kafka.sasl.password", args.kafka_sasl_password)
kafka_cfg.setdefault("kafka.buffer.memory", args.kafka_buffer_memory)
kafka_cfg.setdefault(
"kafka.delivery.timeout.ms", args.kafka_delivery_timeout_ms
)
else:
msg = " Kafka producer security protocol {} is not known".format(
args.kafka_security_protocol
)
logger.warn(msg)
spark.stop()

for userfilter in userfilters:
if args.noscience:
logger.debug(
Expand Down
2 changes: 2 additions & 0 deletions chart/templates/spark-fink-distribution.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ spec:
- '{{ .Values.distribution.kafka.schema }}'
- '-substream_prefix'
- '{{ .Values.distribution.kafka.substream_prefix }}'
- '-kafka_security_protocol'
- '{{ .Values.distribution.kafka.kafka_security_protocol }}'
- '-night'
- '{{ .Values.night }}'
sparkVersion: "3.4.1"
Expand Down
1 change: 1 addition & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ distribution:
out_sockets: "kafka-cluster-kafka-external-bootstrap.kafka:9094"
schema: "/home/fink/fink-alert-schemas/ztf/distribution_schema_0p2.avsc"
substream_prefix: "fink_"
kafka_security_protocol: "SASL_PLAINTEXT"


storage: hdfs
Expand Down
3 changes: 2 additions & 1 deletion conf/ztf/fink.conf.distribution
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ FINK_PRODUCER_JAAS="${FINK_HOME}/conf/fink_kafka_producer_jaas.conf"
# Fink test Consumer
FINK_TEST_CONSUMER_JAAS="${FINK_HOME}/conf/fink_kafka_consumer_jaas.conf"

KAFKA_SECURITY_PROTOCOL=PLAINTEXT
KAFKA_SASL_USERNAME=toto
KAFKA_SASL_PASSWORD=tata
KAFKA_BUFFER_MEMORY=134217728
KAFKA_DELIVERY_TIMEOUT_MS=240000
KAFKA_DELIVERY_TIMEOUT_MS=240000
9 changes: 9 additions & 0 deletions fink_broker/ztf/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace:
[DISTRIBUTION_SCHEMA]
""",
)
parser.add_argument(
"-kafka_security_protocol",
type=str,
default="",
help="""
the security protocol for kafka producer
[KAFKA_SECURITY_PROTOCOL]
""",
)
parser.add_argument(
"-kafka_sasl_username",
type=str,
Expand Down

0 comments on commit a47657a

Please sign in to comment.