diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 50215476..378b3510 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -48,6 +48,7 @@ jobs:
         run: |
           echo "PYTHONPATH=${FINK_HOME}:${FINK_ALERT_SIMULATOR}:${PYTHONPATH}" >> $GITHUB_ENV
           echo "${FINK_HOME}/bin:${FINK_ALERT_SIMULATOR}/bin" >> $GITHUB_PATH
+          echo "PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" >> $GITHUB_ENV
       - name: Check env
         run: |
           echo "FINK_HOME: $FINK_HOME"
diff --git a/bin/generate_ssoft.py b/bin/generate_ssoft.py
new file mode 100644
index 00000000..07d099ac
--- /dev/null
+++ b/bin/generate_ssoft.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+# Copyright 2023 AstroLab Software
+# Author: Julien Peloton
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Construct the Solar System Object Fink Table (SSOFT)
+"""
+import argparse
+import datetime
+
+from fink_broker.loggingUtils import get_fink_logger, inspect_application
+from fink_broker.sparkUtils import init_sparksession
+
+from fink_spins.ssoft import build_the_ssoft
+
+def main():
+    parser = argparse.ArgumentParser(description=__doc__)
+
+    # Add specific arguments
+    parser.add_argument(
+        '-model', type=str, default='SHG1G2',
+        help="""
+        Lightcurve model: SHG1G2, HG1G2, HG
+        """
+    )
+    parser.add_argument(
+        '-version', type=str, default=None,
+        help="""
+        Version to use in the format YYYY.MM
+        Default is None, meaning current Year.Month
+        """
+    )
+    parser.add_argument(
+        '-frac', type=float, default=None,
+        help="""
+        Use only fraction (between 0 and 1) of the input dataset to build the SSOFT
+        Default is None, meaning all available data is considered.
+        """
+    )
+    parser.add_argument(
+        '-nmin', type=int, default=50,
+        help="""
+        Minimum number of points in the lightcurve of an
+        object to be considered for the SSOFT. Default is 50
+        """
+    )
+    parser.add_argument(
+        '--pre_aggregate_data', action="store_true",
+        help="""
+        If specified, aggregate and save data on HDFS before computing the SSOFT (slower).
+        Otherwise, read pre-aggregated data on HDFS to compute the SSOFT (faster).
+        """
+    )
+    args = parser.parse_args(None)
+
+    if args.version is None:
+        now = datetime.datetime.now()
+        version = '{}.{:02d}'.format(now.year, now.month)
+    else:
+        version = args.version
+
+    # Initialise Spark session
+    spark = init_sparksession(
+        name="ssoft_{}_{}".format(args.model, version),
+        shuffle_partitions=2
+    )
+
+    # The level here should be controlled by an argument.
+    logger = get_fink_logger(spark.sparkContext.appName, "INFO")
+
+    # debug statements
+    inspect_application(logger)
+
+    # We map processing 1:1 with the cores
+    ncores = int(spark.sparkContext.getConf().get("spark.cores.max"))
+    print("NCORES: {}".format(ncores))
+
+    if args.pre_aggregate_data:
+        filename = None
+    else:
+        filename = 'sso_aggregated_{}'.format(version)
+
+    pdf = build_the_ssoft(
+        aggregated_filename=filename,
+        nproc=ncores, nmin=args.nmin,
+        frac=args.frac, model=args.model,
+        version=version
+    )
+
+    pdf.to_parquet('ssoft_{}_{}.parquet'.format(args.model, version))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/bin/index_archival.py b/bin/index_archival.py
index 59a3c7b3..5063c6b5 100644
--- a/bin/index_archival.py
+++ b/bin/index_archival.py
@@ -91,12 +91,12 @@ def main():
     # Restrict the input DataFrame to the subset of wanted columns.
     if 'upper' in args.index_table:
         df = data.select(
-            F.col('objectId').cast('string'),
-            F.col('prv_candidates.jd').cast('array<double>'),
-            F.col('prv_candidates.fid').cast('array<int>'),
-            F.col('prv_candidates.magpsf').cast('array<float>'),
-            F.col('prv_candidates.sigmapsf').cast('array<float>'),
-            F.col('prv_candidates.diffmaglim').cast('array<float>')
+            F.col('objectId').cast('string').alias('objectId'),
+            F.col('prv_candidates.jd').cast('array<double>').alias('jd'),
+            F.col('prv_candidates.fid').cast('array<int>').alias('fid'),
+            F.col('prv_candidates.magpsf').cast('array<float>').alias('magpsf'),
+            F.col('prv_candidates.sigmapsf').cast('array<float>').alias('sigmapsf'),
+            F.col('prv_candidates.diffmaglim').cast('array<float>').alias('diffmaglim')
         )
     else:
         df = df_flat
diff --git a/conf.sh b/conf.sh
index a62578e5..cf1d48ca 100755
--- a/conf.sh
+++ b/conf.sh
@@ -30,17 +30,16 @@ fi
 
 # Spark parameters
 # ----------------
 
-# Assuming Scala 2.11
 # Spark image tag
 # Spark image is built here: https://github.com/astrolabsoftware/k8s-spark-py/
-SPARK_IMAGE_TAG="k8s-3.2.3"
+SPARK_IMAGE_TAG="k8s-3.4.1"
 
 # Spark version
-SPARK_VERSION="3.2.3"
+SPARK_VERSION="3.4.1"
 
 # Name for the Spark archive
-SPARK_NAME="spark-${SPARK_VERSION}-bin-hadoop3.2"
+SPARK_NAME="spark-${SPARK_VERSION}-bin-hadoop3"
 
 # Spark install location
 SPARK_INSTALL_DIR="${HOME}/fink-k8s-tmp"
diff --git a/conf/fink.conf.dev b/conf/fink.conf.dev
index 172fecd2..96d72957 100644
--- a/conf/fink.conf.dev
+++ b/conf/fink.conf.dev
@@ -43,14 +43,14 @@ SECURED_KAFKA_CONFIG=''
 # Change the version according to your Spark version.
 # NOTE: HBase packages are not required for Parquet archiving
 FINK_PACKAGES=\
-org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.1.3,\
-org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,\
-org.apache.spark:spark-avro_2.12:3.1.3,\
+org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
+org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
+org.apache.spark:spark-avro_2.12:3.4.1,\
 org.apache.hbase:hbase-shaded-mapreduce:2.4.9
 
 # Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4_spark3_scala2.12_hadoop3.2.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4_spark3_scala2.12_hadoop3.2.jar
+FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
 
 # Time interval between 2 trigger updates (second)
 # i.e. the timing of streaming data processing.
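Aside on the aliasing changes in bin/index_archival.py above: each nested field is cast and then explicitly aliased to its leaf name, so the flattened DataFrame exposes plain column names (jd, fid, ...) rather than dotted ones, which is what the downstream HBase indexing code uses as column qualifiers. A minimal, self-contained PySpark sketch of that pattern, using a toy schema and a local session rather than the broker's actual alert data:

    from pyspark.sql import SparkSession, functions as F

    # Toy alert-like data: a struct column holding per-history arrays.
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame(
        [("ZTF23toyobject", ([2460000.1, 2460001.2], [1, 2]))],
        "objectId string, prv_candidates struct<jd: array<double>, fid: array<int>>"
    )

    # Cast each nested field and pin its name to the leaf field, as in index_archival.py.
    flat = df.select(
        F.col("objectId").cast("string").alias("objectId"),
        F.col("prv_candidates.jd").cast("array<double>").alias("jd"),
        F.col("prv_candidates.fid").cast("array<int>").alias("fid"),
    )
    print(flat.columns)  # ['objectId', 'jd', 'fid']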
diff --git a/conf/fink.conf.prod b/conf/fink.conf.prod
index 42fe7bc2..b27674cb 100644
--- a/conf/fink.conf.prod
+++ b/conf/fink.conf.prod
@@ -43,14 +43,16 @@ SECURED_KAFKA_CONFIG=''
 # Change the version according to your Spark version.
 # NOTE: HBase packages are not required for Parquet archiving
 FINK_PACKAGES=\
-org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.1.3,\
-org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,\
-org.apache.spark:spark-avro_2.12:3.1.3,\
-org.apache.hbase:hbase-shaded-mapreduce:2.2.7
+org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
+org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
+org.apache.spark:spark-avro_2.12:3.4.1,\
+org.apache.hbase:hbase-shaded-mapreduce:2.2.7,\
+org.slf4j:slf4j-log4j12:1.7.36,\
+org.slf4j:slf4j-simple:1.7.36
 
 # Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.2_spark3_scala2.11_hadoop2.7.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.2_spark3_scala2.11_hadoop2.7.jar
+FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
 
 # Time interval between 2 trigger updates (second)
 # i.e. the timing of streaming data processing.
diff --git a/fink_broker/hbaseUtils.py b/fink_broker/hbaseUtils.py
index 3879910a..d8f92f2b 100644
--- a/fink_broker/hbaseUtils.py
+++ b/fink_broker/hbaseUtils.py
@@ -252,7 +252,7 @@ def bring_to_current_schema(df):
 
     # assuming no missing columns
     for colname, coltype in candidates.items():
-        tmp_i.append(F.col(colname).cast(coltype))
+        tmp_i.append(F.col(colname).cast(coltype).alias(colname.split('.')[-1]))
 
     cols_i = df.select(tmp_i).columns
 
@@ -601,9 +601,9 @@ def assign_column_family_names(df, cols_i, cols_d, cols_b):
         column qualifiers), and the corresponding column family.
 
     """
-    cf = {i: 'i' for i in df.select(cols_i).columns}
-    cf.update({i: 'd' for i in df.select(cols_d).columns})
-    cf.update({i: 'b' for i in df.select(cols_b).columns})
+    cf = {i: 'i' for i in df.select(['`{}`'.format(k) for k in cols_i]).columns}
+    cf.update({i: 'd' for i in df.select(['`{}`'.format(k) for k in cols_d]).columns})
+    cf.update({i: 'b' for i in df.select(['`{}`'.format(k) for k in cols_b]).columns})
 
     return cf
 
diff --git a/fink_broker/partitioning.py b/fink_broker/partitioning.py
index 0eb0b95c..1ebf0f22 100644
--- a/fink_broker/partitioning.py
+++ b/fink_broker/partitioning.py
@@ -142,9 +142,11 @@ def numPart(df, partition_size=128.):
         .builder \
         .getOrCreate()
 
+    logical_plan = df._jdf.queryExecution().logical()
+    mode = df._jdf.queryExecution().mode()
     b = spark._jsparkSession\
         .sessionState()\
-        .executePlan(df._jdf.queryExecution().logical())\
+        .executePlan(logical_plan, mode)\
         .optimizedPlan()\
         .stats()\
         .sizeInBytes()
diff --git a/fink_broker/science.py b/fink_broker/science.py
index b0a846d2..07d52a49 100644
--- a/fink_broker/science.py
+++ b/fink_broker/science.py
@@ -56,6 +56,8 @@
     from fink_science.cats.processor import predict_nn
     from fink_science.agn.processor import agn_elasticc
     from fink_science.slsn.processor import slsn_elasticc
+    from fink_science.fast_transient_rate.processor import magnitude_rate
+    from fink_science.fast_transient_rate import rate_module_output_schema
     # from fink_science.t2.processor import t2
 except ImportError:
     _LOG.warning("Fink science modules are not available. ")
@@ -208,7 +210,8 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame:
     # Retrieve time-series information
     to_expand = [
         'jd', 'fid', 'magpsf', 'sigmapsf',
-        'magnr', 'sigmagnr', 'isdiffpos', 'distnr'
+        'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
+        'diffmaglim'
     ]
 
     # Append temp columns with historical + current measurements
@@ -373,6 +376,19 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame:
         .withColumn("lc_features_r", df['lc_features'].getItem("2"))\
         .drop('lc_features')
 
+    # Apply level one processor: fast transient
+    _LOG.info("New processor: magnitude rate for fast transient")
+    mag_rate_args = [
+        "candidate.magpsf", "candidate.sigmapsf", "candidate.jd",
+        "candidate.jdstarthist", "candidate.fid", "cmagpsf", "csigmapsf",
+        "cjd", "cfid", "cdiffmaglim", F.lit(10000), F.lit(None)
+    ]
+    cols_before = df.columns
+    df = df.withColumn("ft_module", magnitude_rate(*mag_rate_args))
+    df = df.select(
+        cols_before + [df["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()]
+    )
+
     # Drop temp columns
     df = df.drop(*expanded)
 
diff --git a/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar b/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
new file mode 100644
index 00000000..1d6f5f3c
Binary files /dev/null and b/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar differ
diff --git a/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar b/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
new file mode 100644
index 00000000..6ada7f41
Binary files /dev/null and b/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar differ
diff --git a/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar b/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
new file mode 100644
index 00000000..824fd7e7
Binary files /dev/null and b/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar differ
diff --git a/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar b/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
new file mode 100644
index 00000000..48f483f9
Binary files /dev/null and b/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar differ