[stream2raw] Parametrise the producer (#654)
* Parametrise the producer

* Elasticc needs auth as well
JulienPeloton authored Oct 25, 2022
1 parent 052d432 commit 87439da
Showing 11 changed files with 42 additions and 13 deletions.
2 changes: 1 addition & 1 deletion bin/distribute.py
@@ -51,7 +51,7 @@ def main():
     args = getargs(parser)
 
     # Initialise Spark session
-    spark = init_sparksession(name="distribute_{}".format(args.night), shuffle_partitions=2)
+    spark = init_sparksession(name="distribute_{}_{}".format(args.producer, args.night), shuffle_partitions=2)
 
     # The level here should be controlled by an argument.
     logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
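With this change the Spark application name carries both the producer and the night, so concurrent runs against different streams are easy to tell apart in the Spark UI. A minimal sketch of the resulting naming, using illustrative values (not taken from the commit):

# Illustration of the new naming scheme shared by the services.
# 'ztf' and '20221025' are example values, not defaults from the code.
producer = "ztf"
night = "20221025"

app_name = "distribute_{}_{}".format(producer, night)
print(app_name)  # -> distribute_ztf_20221025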
4 changes: 3 additions & 1 deletion bin/fink
@@ -251,7 +251,7 @@ elif [[ $service == "stream2raw" ]]; then
   spark-submit --master ${SPARK_MASTER} \
     --packages ${FINK_PACKAGES} ${PYTHON_EXTRA_FILE} \
     ${SECURED_KAFKA_CONFIG} ${EXTRA_SPARK_CONFIG} \
-    ${FINK_HOME}/bin/stream2raw.py ${HELP_ON_SERVICE} -servers ${KAFKA_IPPORT} -topic ${KAFKA_TOPIC} \
+    ${FINK_HOME}/bin/stream2raw.py ${HELP_ON_SERVICE} -producer ${PRODUCER} -servers ${KAFKA_IPPORT} -topic ${KAFKA_TOPIC} \
     -schema ${FINK_ALERT_SCHEMA} -startingoffsets_stream ${KAFKA_STARTING_OFFSET} \
     -online_data_prefix ${ONLINE_DATA_PREFIX} \
     -tinterval ${FINK_TRIGGER_UPDATE} -log_level ${LOG_LEVEL} ${EXIT_AFTER}
@@ -264,6 +264,7 @@ elif [[ $service == "raw2science" ]]; then
     ${PYTHON_EXTRA_FILE} \
     ${SECURED_KAFKA_CONFIG} ${EXTRA_SPARK_CONFIG} \
     ${FINK_HOME}/bin/raw2science.py ${HELP_ON_SERVICE} \
+    -producer ${PRODUCER} \
     -online_data_prefix ${ONLINE_DATA_PREFIX} \
     -tinterval ${FINK_TRIGGER_UPDATE} \
     -night ${NIGHT} \
@@ -285,6 +286,7 @@ elif [[ $service == "distribution" ]]; then
     --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS}" \
     --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS}" \
     $SCRIPT ${HELP_ON_SERVICE} \
+    -producer ${PRODUCER} \
     -online_data_prefix ${ONLINE_DATA_PREFIX} \
     -distribution_servers ${DISTRIBUTION_SERVERS} \
     -distribution_schema ${DISTRIBUTION_SCHEMA} \
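The launcher now passes the producer explicitly to stream2raw, raw2science and the distribution service, instead of each script inferring it from server addresses or topic names. If one wanted to fail fast on a mistyped ${PRODUCER}, a guard like the following could sit on the Python side; this is a hypothetical sketch, not something the commit adds:

# Hypothetical validation sketch; the commit itself does not add this check.
VALID_PRODUCERS = {"ztf", "elasticc", "sims"}

def check_producer(producer: str) -> str:
    """Return the producer name, or raise if it is not a known one."""
    if producer not in VALID_PRODUCERS:
        raise ValueError(
            "Unknown producer '{}'. Expected one of: {}".format(
                producer, ", ".join(sorted(VALID_PRODUCERS))
            )
        )
    return producer

print(check_producer("ztf"))  # -> ztf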
4 changes: 2 additions & 2 deletions bin/raw2science.py
@@ -49,7 +49,7 @@ def main():
     tz = None
 
     # Initialise Spark session
-    spark = init_sparksession(name="raw2science_{}".format(args.night), shuffle_partitions=2, tz=tz)
+    spark = init_sparksession(name="raw2science_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz)
 
     # Logger to print useful debug statements
     logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -62,7 +62,7 @@ def main():
     scitmpdatapath = args.online_data_prefix + '/science'
     checkpointpath_sci_tmp = args.online_data_prefix + '/science_checkpoint'
 
-    if args.night == 'elasticc':
+    if args.producer == 'elasticc':
         df = connect_to_raw_database(
             rawdatapath, rawdatapath, latestfirst=False
         )
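Switching the test from args.night to args.producer decouples the two notions: night keeps its calendar meaning, while the data source is chosen by the producer. The dispatch pattern, sketched with hypothetical placeholder readers (connect_to_raw_database is the real call in the diff; the helpers below are illustrative only):

# Sketch of the producer-keyed dispatch; helper names are hypothetical.
def read_elasticc(rawdatapath: str) -> str:
    # ELAsTiCC alerts are read from the raw database without a night filter.
    return "reader({})".format(rawdatapath)

def read_nightly(rawdatapath: str, night: str) -> str:
    # Other producers keep the per-night reading path.
    return "reader({}, night={})".format(rawdatapath, night)

def select_reader(producer: str, rawdatapath: str, night: str) -> str:
    if producer == 'elasticc':
        return read_elasticc(rawdatapath)
    return read_nightly(rawdatapath, night)

print(select_reader('elasticc', 'online/raw', '20221025'))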
19 changes: 11 additions & 8 deletions bin/stream2raw.py
@@ -51,7 +51,7 @@ def main():
     tz = None
 
     # Initialise Spark session
-    spark = init_sparksession(name="stream2raw", shuffle_partitions=2, tz=tz)
+    spark = init_sparksession(name="stream2raw_{}_{}".format(args.producer, args.night), shuffle_partitions=2, tz=tz)
 
     # The level here should be controlled by an argument.
     logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -64,7 +64,10 @@ def main():
     checkpointpath_raw = args.online_data_prefix + '/raw_checkpoint'
 
     # Create a streaming dataframe pointing to a Kafka stream
-    kerberos = 'public2.alerts.ztf' in args.servers
+    if (args.producer == 'ztf') or (args.producer == 'elasticc'):
+        kerberos = True
+    else:
+        kerberos = False
     df = connect_to_kafka(
         servers=args.servers,
         topic=args.topic,
@@ -73,27 +76,27 @@ def main():
         kerberos=kerberos)
 
     # Get Schema of alerts
-    if 'elasticc' not in args.topic:
+    if args.producer != 'elasticc':
         alert_schema, _, alert_schema_json = get_schemas_from_avro(args.schema)
 
     # Decode the Avro data, and keep only (timestamp, data)
-    if '134.158.' in args.servers or 'localhost' in args.servers:
+    if args.producer == 'sims':
         # using custom from_avro (not available for Spark 2.4.x)
         # it will be available from Spark 3.0 though
         df_decoded = df.select(
             [
                 from_avro(df["value"], alert_schema_json).alias("decoded")
             ]
         )
-    elif 'elasticc' in args.topic:
+    elif args.producer == 'elasticc':
         schema = fastavro.schema.load_schema(args.schema)
         alert_schema_json = fastavro.schema.to_parsing_canonical_form(schema)
         df_decoded = df.select(
             [
                 from_avro(df["value"], alert_schema_json).alias("decoded")
             ]
         )
-    elif 'public2.alerts.ztf' in args.servers:
+    elif args.producer == 'ztf':
         # Decode on-the-fly using fastavro
         f = F.udf(lambda x: next(fastavro.reader(io.BytesIO(x))), alert_schema)
         df_decoded = df.select(
@@ -102,8 +105,8 @@ def main():
             ]
         )
     else:
-        msg = "Data source {} is not known - a decoder must be set".format(
-            args.servers)
+        msg = "Data source {} and producer {} is not known - a decoder must be set".format(
+            args.servers, args.producer)
         logger.warn(msg)
         spark.stop()
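Two decisions in stream2raw are now keyed on the producer: whether the Kafka connection needs Kerberos authentication (ZTF and ELAsTiCC do, per the commit message), and which Avro decoding path to use. A compact, behaviour-equivalent sketch of that logic (the commit itself keeps the explicit if/elif chain shown above; the decoder entries below are descriptive labels, not real functions):

# Compact sketch of the producer-keyed choices; labels are illustrative.
def needs_kerberos(producer: str) -> bool:
    # ZTF and ELAsTiCC streams require authentication; simulations do not.
    return producer in ('ztf', 'elasticc')

DECODERS = {
    'sims': 'custom Spark from_avro with the JSON alert schema',
    'elasticc': 'Spark from_avro with a fastavro parsing-canonical schema',
    'ztf': 'fastavro reader wrapped in a Spark UDF',
}

def pick_decoder(producer: str) -> str:
    if producer not in DECODERS:
        raise ValueError("No decoder registered for producer '{}'".format(producer))
    return DECODERS[producer]

print(needs_kerberos('sims'))  # -> False
print(pick_decoder('ztf'))     # -> fastavro reader wrapped in a Spark UDF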
3 changes: 3 additions & 0 deletions conf/fink.conf
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ######################################
+# Alert producer: ztf, elasticc or sims
+PRODUCER=sims
+
 # Local mode (Kafka cluster is spun up on-the-fly in docker).
 # Must match the ones used for the Producer
 KAFKA_PORT_SIM=29092
3 changes: 3 additions & 0 deletions conf/fink.conf.dev
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ######################################
+# Alert producer: ztf, elasticc or sims
+PRODUCER=sims
+
 # Local mode (Kafka cluster is spun up on-the-fly in docker).
 # Must match the ones used for the Producer
 KAFKA_PORT_SIM=9092
3 changes: 3 additions & 0 deletions conf/fink.conf.prod
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ######################################
+# Alert producer: ztf, elasticc or sims
+PRODUCER=sims
+
 # Local mode (Kafka cluster is spun up on-the-fly in docker).
 # Must match the ones used for the Producer
 KAFKA_PORT_SIM=9092
3 changes: 3 additions & 0 deletions conf_cluster/fink.conf.ztf_distribute
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ######################################
+# Alert producer: ztf, elasticc or sims
+PRODUCER=ztf
+
 # Local mode (Kafka cluster is spun up on-the-fly in docker).
 # Must match the ones used for the Producer
 KAFKA_PORT_SIM=29092
3 changes: 3 additions & 0 deletions conf_cluster/fink.conf.ztf_raw2science
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ######################################
+# Alert producer: ztf, elasticc or sims
+PRODUCER=ztf
+
 # Local mode (Kafka cluster is spun up on-the-fly in docker).
 # Must match the ones used for the Producer
 KAFKA_PORT_SIM=29092
3 changes: 3 additions & 0 deletions conf_cluster/fink.conf.ztf_stream2raw
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ######################################
+# Alert producer: ztf, elasticc or sims
+PRODUCER=ztf
+
 # Local mode (Kafka cluster is spun up on-the-fly in docker).
 # Must match the ones used for the Producer
 KAFKA_PORT_SIM=29092
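All six configuration files gain the same PRODUCER knob, defaulting to sims in the generic configurations and ztf in the cluster ones. The value is picked up by bin/fink when the configuration is sourced and forwarded through the -producer flag; a sketch of how a consumer could read it from the environment instead (the environment lookup is illustrative only, not how the commit passes the value):

# Illustrative lookup; in the commit the value travels on the command line.
import os

producer = os.environ.get("PRODUCER", "sims")
print("Configured alert producer:", producer)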
8 changes: 7 additions & 1 deletion fink_broker/parser.py
@@ -1,4 +1,4 @@
-# Copyright 2019 AstroLab Software
+# Copyright 2019-2022 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -230,6 +230,12 @@ def getargs(parser: argparse.ArgumentParser) -> argparse.Namespace:
         Folder that contains fink-fat output parquet files
         [FINK_FAT_OUTPUT]
         """)
+    parser.add_argument(
+        '-producer', type=str, default='ztf',
+        help="""
+        Name of the alert producer. Currently available: ztf, elasticc, sims
+        [PRODUCER]
+        """)
     args = parser.parse_args(None)
     return args
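The new argument is defined once in fink_broker/parser.py, so it becomes available to every service that builds its arguments through getargs. A minimal usage sketch based on the signature shown above, run e.g. as python example.py -producer elasticc:

# Minimal usage sketch of the new -producer flag via getargs.
import argparse

from fink_broker.parser import getargs

parser = argparse.ArgumentParser(description="example service")
args = getargs(parser)

# args.producer defaults to 'ztf' when -producer is not supplied.
print("producer:", args.producer)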
