From 87439da8072925d1106999d95c5ebcd6af4843bd Mon Sep 17 00:00:00 2001 From: Julien Date: Tue, 25 Oct 2022 09:50:09 +0200 Subject: [PATCH] [stream2raw] Parametrise the producer (#654) * Parametrise the producer * Elasticc needs auth as well --- bin/distribute.py | 2 +- bin/fink | 4 +++- bin/raw2science.py | 4 ++-- bin/stream2raw.py | 19 +++++++++++-------- conf/fink.conf | 3 +++ conf/fink.conf.dev | 3 +++ conf/fink.conf.prod | 3 +++ conf_cluster/fink.conf.ztf_distribute | 3 +++ conf_cluster/fink.conf.ztf_raw2science | 3 +++ conf_cluster/fink.conf.ztf_stream2raw | 3 +++ fink_broker/parser.py | 8 +++++++- 11 files changed, 42 insertions(+), 13 deletions(-) diff --git a/bin/distribute.py b/bin/distribute.py index 89d75ad3..1d8b9f02 100644 --- a/bin/distribute.py +++ b/bin/distribute.py @@ -51,7 +51,7 @@ def main(): args = getargs(parser) # Initialise Spark session - spark = init_sparksession(name="distribute_{}".format(args.night), shuffle_partitions=2) + spark = init_sparksession(name="distribute_{}_{}".format(args.producer, args.night), shuffle_partitions=2) # The level here should be controlled by an argument. 
logger = get_fink_logger(spark.sparkContext.appName, args.log_level) diff --git a/bin/fink b/bin/fink index 457945d0..afe8d612 100755 --- a/bin/fink +++ b/bin/fink @@ -251,7 +251,7 @@ elif [[ $service == "stream2raw" ]]; then spark-submit --master ${SPARK_MASTER} \ --packages ${FINK_PACKAGES} ${PYTHON_EXTRA_FILE} \ ${SECURED_KAFKA_CONFIG} ${EXTRA_SPARK_CONFIG} \ - ${FINK_HOME}/bin/stream2raw.py ${HELP_ON_SERVICE} -servers ${KAFKA_IPPORT} -topic ${KAFKA_TOPIC} \ + ${FINK_HOME}/bin/stream2raw.py ${HELP_ON_SERVICE} -producer ${PRODUCER} -servers ${KAFKA_IPPORT} -topic ${KAFKA_TOPIC} \ -schema ${FINK_ALERT_SCHEMA} -startingoffsets_stream ${KAFKA_STARTING_OFFSET} \ -online_data_prefix ${ONLINE_DATA_PREFIX} \ -tinterval ${FINK_TRIGGER_UPDATE} -log_level ${LOG_LEVEL} ${EXIT_AFTER} @@ -264,6 +264,7 @@ elif [[ $service == "raw2science" ]]; then ${PYTHON_EXTRA_FILE} \ ${SECURED_KAFKA_CONFIG} ${EXTRA_SPARK_CONFIG} \ ${FINK_HOME}/bin/raw2science.py ${HELP_ON_SERVICE} \ + -producer ${PRODUCER} \ -online_data_prefix ${ONLINE_DATA_PREFIX} \ -tinterval ${FINK_TRIGGER_UPDATE} \ -night ${NIGHT} \ @@ -285,6 +286,7 @@ elif [[ $service == "distribution" ]]; then --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS}" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS}" \ $SCRIPT ${HELP_ON_SERVICE} \ + -producer ${PRODUCER} \ -online_data_prefix ${ONLINE_DATA_PREFIX} \ -distribution_servers ${DISTRIBUTION_SERVERS} \ -distribution_schema ${DISTRIBUTION_SCHEMA} \ diff --git a/bin/raw2science.py b/bin/raw2science.py index 169db7aa..53191797 100644 --- a/bin/raw2science.py +++ b/bin/raw2science.py @@ -49,7 +49,7 @@ def main(): tz = None # Initialise Spark session - spark = init_sparksession(name="raw2science_{}".format(args.night), shuffle_partitions=2, tz=tz) + spark = init_sparksession(name="raw2science_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz) # Logger to print useful debug 
statements logger = get_fink_logger(spark.sparkContext.appName, args.log_level) @@ -62,7 +62,7 @@ def main(): scitmpdatapath = args.online_data_prefix + '/science' checkpointpath_sci_tmp = args.online_data_prefix + '/science_checkpoint' - if args.night == 'elasticc': + if args.producer == 'elasticc': df = connect_to_raw_database( rawdatapath, rawdatapath, latestfirst=False ) diff --git a/bin/stream2raw.py b/bin/stream2raw.py index 8328e811..ccbcd925 100644 --- a/bin/stream2raw.py +++ b/bin/stream2raw.py @@ -51,7 +51,7 @@ def main(): tz = None # Initialise Spark session - spark = init_sparksession(name="stream2raw", shuffle_partitions=2, tz=tz) + spark = init_sparksession(name="stream2raw_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz) # The level here should be controlled by an argument. logger = get_fink_logger(spark.sparkContext.appName, args.log_level) @@ -64,7 +64,10 @@ def main(): checkpointpath_raw = args.online_data_prefix + '/raw_checkpoint' # Create a streaming dataframe pointing to a Kafka stream - kerberos = 'public2.alerts.ztf' in args.servers + if (args.producer == 'ztf') or (args.producer == 'elasticc'): + kerberos = True + else: + kerberos = False df = connect_to_kafka( servers=args.servers, topic=args.topic, @@ -73,11 +76,11 @@ def main(): kerberos=kerberos) # Get Schema of alerts - if 'elasticc' not in args.topic: + if args.producer != 'elasticc': alert_schema, _, alert_schema_json = get_schemas_from_avro(args.schema) # Decode the Avro data, and keep only (timestamp, data) - if '134.158.' 
in args.servers or 'localhost' in args.servers: + if args.producer == 'sims': # using custom from_avro (not available for Spark 2.4.x) # it will be available from Spark 3.0 though df_decoded = df.select( @@ -85,7 +88,7 @@ def main(): from_avro(df["value"], alert_schema_json).alias("decoded") ] ) - elif 'elasticc' in args.topic: + elif args.producer == 'elasticc': schema = fastavro.schema.load_schema(args.schema) alert_schema_json = fastavro.schema.to_parsing_canonical_form(schema) df_decoded = df.select( @@ -93,7 +96,7 @@ def main(): from_avro(df["value"], alert_schema_json).alias("decoded") ] ) - elif 'public2.alerts.ztf' in args.servers: + elif args.producer == 'ztf': # Decode on-the-fly using fastavro f = F.udf(lambda x: next(fastavro.reader(io.BytesIO(x))), alert_schema) df_decoded = df.select( @@ -102,8 +105,8 @@ def main(): ] ) else: - msg = "Data source {} is not known - a decoder must be set".format( - args.servers) + msg = "Data source {} and producer {} is not known - a decoder must be set".format( + args.servers, args.producer) logger.warn(msg) spark.stop() diff --git a/conf/fink.conf b/conf/fink.conf index d59c6ffe..0720ad10 100644 --- a/conf/fink.conf +++ b/conf/fink.conf @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. ###################################### +# Alert producer: ztf, elasticc or sims +PRODUCER=sims + # Local mode (Kafka cluster is spun up on-the-fly in docker). # Must match the ones used for the Producer KAFKA_PORT_SIM=29092 diff --git a/conf/fink.conf.dev b/conf/fink.conf.dev index ede22165..2eae9751 100644 --- a/conf/fink.conf.dev +++ b/conf/fink.conf.dev @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. ###################################### +# Alert producer: ztf, elasticc or sims +PRODUCER=sims + # Local mode (Kafka cluster is spun up on-the-fly in docker). 
# Must match the ones used for the Producer KAFKA_PORT_SIM=9092 diff --git a/conf/fink.conf.prod b/conf/fink.conf.prod index a37359dd..2ea87ab2 100644 --- a/conf/fink.conf.prod +++ b/conf/fink.conf.prod @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. ###################################### +# Alert producer: ztf, elasticc or sims +PRODUCER=sims + # Local mode (Kafka cluster is spun up on-the-fly in docker). # Must match the ones used for the Producer KAFKA_PORT_SIM=9092 diff --git a/conf_cluster/fink.conf.ztf_distribute b/conf_cluster/fink.conf.ztf_distribute index 97e1d387..cd589b20 100644 --- a/conf_cluster/fink.conf.ztf_distribute +++ b/conf_cluster/fink.conf.ztf_distribute @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. ###################################### +# Alert producer: ztf, elasticc or sims +PRODUCER=ztf + # Local mode (Kafka cluster is spun up on-the-fly in docker). # Must match the ones used for the Producer KAFKA_PORT_SIM=29092 diff --git a/conf_cluster/fink.conf.ztf_raw2science b/conf_cluster/fink.conf.ztf_raw2science index a3838570..3d490902 100644 --- a/conf_cluster/fink.conf.ztf_raw2science +++ b/conf_cluster/fink.conf.ztf_raw2science @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. ###################################### +# Alert producer: ztf, elasticc or sims +PRODUCER=ztf + # Local mode (Kafka cluster is spun up on-the-fly in docker). # Must match the ones used for the Producer KAFKA_PORT_SIM=29092 diff --git a/conf_cluster/fink.conf.ztf_stream2raw b/conf_cluster/fink.conf.ztf_stream2raw index 25d0e692..8cd2031e 100644 --- a/conf_cluster/fink.conf.ztf_stream2raw +++ b/conf_cluster/fink.conf.ztf_stream2raw @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
###################################### +# Alert producer: ztf, elasticc or sims +PRODUCER=ztf + # Local mode (Kafka cluster is spun up on-the-fly in docker). # Must match the ones used for the Producer KAFKA_PORT_SIM=29092 diff --git a/fink_broker/parser.py b/fink_broker/parser.py index 4847e767..926da385 100644 --- a/fink_broker/parser.py +++ b/fink_broker/parser.py @@ -1,4 +1,4 @@ -# Copyright 2019 AstroLab Software +# Copyright 2019-2022 AstroLab Software # Author: Julien Peloton # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -230,6 +230,12 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace: Folder that contains fink-fat output parquet files [FINK_FAT_OUTPUT] """) + parser.add_argument( + '-producer', type=str, default='ztf', + help=""" + Name of the alert producer. Currently available: ztf, elasticc, sims + [PRODUCER] + """) args = parser.parse_args(None) return args