Update to Spark 3.4.1 (#759)
* Explicitly pass the mode in the executePlan args

* Fix typo

* Use ticks for names when using SQL commands

* Expand PYTHONPATH

* Fix syntax

* hardcode py4j version for test

* Missing dollar when updating ENV

* Update CAST with ALIAS

* Try hadoop3 compiled jar in prod (warning: we still have hbase 2.2 whereas the jar is compiled for hbase 2.4)

* Try hadoop3 compiled jar in prod (warning: we still have hbase 2.2 whereas the jar is compiled for hbase 2.4)

* Bump shaded jar

* Disable PROD workflow to test the hypothesis that HBase 2.2 is not compatible with Hadoop 3

* Manually add deps on Json4s

* Manually add deps on Json4s

* Manually add deps on scala JSON

* Manually add deps on scala JSON

* New HBase connectors

* Add slf4j dependency explicitly

* Add slf4j-simple dependency explicitly

* Downgrade slf4j deps

* Use ALIAS when selecting casted columns

* Upgrade to Spark 3.4.1 on kubernetes

* Automatic generation of the SSOFT (#760)

* New bin to generate the SSOFT

* Update args

* Update type of frac

* Missing deps

* Missing deps

* Fix bug in the argument name

* Fix typo

* Take the number of cores differently

* PEP8

* add fast transient module to the science pipeline (#757)

* add fast transient module to the science pipeline

* fix DataFrame

---------

Co-authored-by: FusRoman <[email protected]>
JulienPeloton and FusRoman authored Nov 28, 2023
1 parent 47e4b0f commit 3a058ca
Showing 13 changed files with 152 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -48,6 +48,7 @@ jobs:
run: |
echo "PYTHONPATH=${FINK_HOME}:${FINK_ALERT_SIMULATOR}:${PYTHONPATH}" >> $GITHUB_ENV
echo "${FINK_HOME}/bin:${FINK_ALERT_SIMULATOR}/bin" >> $GITHUB_PATH
echo "PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" >> $GITHUB_ENV
- name: Check env
run: |
echo "FINK_HOME: $FINK_HOME"
105 changes: 105 additions & 0 deletions bin/generate_ssoft.py
@@ -0,0 +1,105 @@
#!/usr/bin/env python
# Copyright 2023 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Construct the Solar System Object Fink Table (SSOFT)
"""
import argparse
import datetime

from fink_broker.loggingUtils import get_fink_logger, inspect_application
from fink_broker.sparkUtils import init_sparksession

from fink_spins.ssoft import build_the_ssoft

def main():
    parser = argparse.ArgumentParser(description=__doc__)

    # Add specific arguments
    parser.add_argument(
        '-model', type=str, default='SHG1G2',
        help="""
        Lightcurve model: SHG1G2, HG1G2, HG
        """
    )
    parser.add_argument(
        '-version', type=str, default=None,
        help="""
        Version to use in the format YYYY.MM
        Default is None, meaning current Year.Month
        """
    )
    parser.add_argument(
        '-frac', type=float, default=None,
        help="""
        Use only fraction (between 0 and 1) of the input dataset to build the SSOFT
        Default is None, meaning all available data is considered.
        """
    )
    parser.add_argument(
        '-nmin', type=int, default=50,
        help="""
        Minimum number of points in the lightcurve of an
        object to be considered for the SSOFT. Default is 50
        """
    )
    parser.add_argument(
        '--pre_aggregate_data', action="store_true",
        help="""
        If specified, aggregate and save data on HDFS before computing the SSOFT (slower).
        Otherwise, read pre-aggregated data on HDFS to compute the SSOFT (faster).
        """
    )
    args = parser.parse_args(None)

    if args.version is None:
        now = datetime.datetime.now()
        version = '{}.{:02d}'.format(now.year, now.month)
    else:
        version = args.version

    # Initialise Spark session
    spark = init_sparksession(
        name="ssoft_{}_{}".format(args.model, version),
        shuffle_partitions=2
    )

    # The level here should be controlled by an argument.
    logger = get_fink_logger(spark.sparkContext.appName, "INFO")

    # debug statements
    inspect_application(logger)

    # We map processing 1:1 with the cores
    ncores = int(spark.sparkContext.getConf().get("spark.cores.max"))
    print("NCORES: {}".format(ncores))

    if args.pre_aggregate_data:
        filename = None
    else:
        filename = 'sso_aggregated_{}'.format(version)

    pdf = build_the_ssoft(
        aggregated_filename=filename,
        nproc=ncores, nmin=args.nmin,
        frac=args.frac, model=args.model,
        version=version
    )

    pdf.to_parquet('ssoft_{}_{}.parquet'.format(args.model, version))


if __name__ == "__main__":
    main()
12 changes: 6 additions & 6 deletions bin/index_archival.py
@@ -91,12 +91,12 @@ def main():
# Restrict the input DataFrame to the subset of wanted columns.
if 'upper' in args.index_table:
df = data.select(
-F.col('objectId').cast('string'),
-F.col('prv_candidates.jd').cast('array<double>'),
-F.col('prv_candidates.fid').cast('array<int>'),
-F.col('prv_candidates.magpsf').cast('array<float>'),
-F.col('prv_candidates.sigmapsf').cast('array<float>'),
-F.col('prv_candidates.diffmaglim').cast('array<float>')
+F.col('objectId').cast('string').alias('objectId'),
+F.col('prv_candidates.jd').cast('array<double>').alias('jd'),
+F.col('prv_candidates.fid').cast('array<int>').alias('fid'),
+F.col('prv_candidates.magpsf').cast('array<float>').alias('magpsf'),
+F.col('prv_candidates.sigmapsf').cast('array<float>').alias('sigmapsf'),
+F.col('prv_candidates.diffmaglim').cast('array<float>').alias('diffmaglim')
)
else:
df = df_flat
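For context, the explicit .alias() calls pin the output column names: when a nested field is cast without one, Spark auto-generates a name from the expression, and that generated name is version-dependent, which presumably broke the downstream consumers after the Spark upgrade. A minimal sketch of the pattern on a toy schema (not the actual alert schema):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy stand-in for an alert with a nested 'prv_candidates' struct
data = spark.createDataFrame(
    [("ZTF21toy", ([2460000.5, 2460001.5], [1, 2]))],
    "objectId string, prv_candidates struct<jd: array<double>, fid: array<int>>",
)

# Explicit aliases guarantee short, stable column names regardless of how
# a given Spark version names the CAST(...) expression internally.
df = data.select(
    F.col("objectId").cast("string").alias("objectId"),
    F.col("prv_candidates.jd").cast("array<double>").alias("jd"),
    F.col("prv_candidates.fid").cast("array<int>").alias("fid"),
)
print(df.columns)  # ['objectId', 'jd', 'fid']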
7 changes: 3 additions & 4 deletions conf.sh
@@ -30,17 +30,16 @@ fi

# Spark parameters
# ----------------
-# Assuming Scala 2.11

# Spark image tag
# Spark image is built here: https://github.com/astrolabsoftware/k8s-spark-py/
SPARK_IMAGE_TAG="k8s-3.2.3"
SPARK_IMAGE_TAG="k8s-3.4.1"

# Spark version
SPARK_VERSION="3.2.3"
SPARK_VERSION="3.4.1"

# Name for the Spark archive
SPARK_NAME="spark-${SPARK_VERSION}-bin-hadoop3.2"
SPARK_NAME="spark-${SPARK_VERSION}-bin-hadoop3"

# Spark install location
SPARK_INSTALL_DIR="${HOME}/fink-k8s-tmp"
10 changes: 5 additions & 5 deletions conf/fink.conf.dev
@@ -43,14 +43,14 @@ SECURED_KAFKA_CONFIG=''
# Change the version according to your Spark version.
# NOTE: HBase packages are not required for Parquet archiving
FINK_PACKAGES=\
-org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.1.3,\
-org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,\
-org.apache.spark:spark-avro_2.12:3.1.3,\
+org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
+org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
+org.apache.spark:spark-avro_2.12:3.4.1,\
org.apache.hbase:hbase-shaded-mapreduce:2.4.9

# Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4_spark3_scala2.12_hadoop3.2.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4_spark3_scala2.12_hadoop3.2.jar
+FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

# Time interval between 2 trigger updates (second)
# i.e. the timing of streaming data processing.
14 changes: 8 additions & 6 deletions conf/fink.conf.prod
@@ -43,14 +43,16 @@ SECURED_KAFKA_CONFIG=''
# Change the version according to your Spark version.
# NOTE: HBase packages are not required for Parquet archiving
FINK_PACKAGES=\
-org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.1.3,\
-org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,\
-org.apache.spark:spark-avro_2.12:3.1.3,\
-org.apache.hbase:hbase-shaded-mapreduce:2.2.7
+org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
+org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
+org.apache.spark:spark-avro_2.12:3.4.1,\
+org.apache.hbase:hbase-shaded-mapreduce:2.2.7,\
+org.slf4j:slf4j-log4j12:1.7.36,\
+org.slf4j:slf4j-simple:1.7.36

# Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.2_spark3_scala2.11_hadoop2.7.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.2_spark3_scala2.11_hadoop2.7.jar
+FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

# Time interval between 2 trigger updates (second)
# i.e. the timing of streaming data processing.
8 changes: 4 additions & 4 deletions fink_broker/hbaseUtils.py
@@ -252,7 +252,7 @@ def bring_to_current_schema(df):

# assuming no missing columns
for colname, coltype in candidates.items():
-tmp_i.append(F.col(colname).cast(coltype))
+tmp_i.append(F.col(colname).cast(coltype).alias(colname.split('.')[-1]))

cols_i = df.select(tmp_i).columns

@@ -601,9 +601,9 @@ def assign_column_family_names(df, cols_i, cols_d, cols_b):
column qualifiers), and the corresponding column family.
"""
-cf = {i: 'i' for i in df.select(cols_i).columns}
-cf.update({i: 'd' for i in df.select(cols_d).columns})
-cf.update({i: 'b' for i in df.select(cols_b).columns})
+cf = {i: 'i' for i in df.select(['`{}`'.format(k) for k in cols_i]).columns}
+cf.update({i: 'd' for i in df.select(['`{}`'.format(k) for k in cols_d]).columns})
+cf.update({i: 'b' for i in df.select(['`{}`'.format(k) for k in cols_b]).columns})

return cf
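The added backticks matter because some of the flattened column names literally contain dots (fields coming from nested structs): without quoting, Spark parses the dot as a struct-field access and cannot resolve the column. A small illustration on a made-up DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# A DataFrame with a column whose name literally contains a dot
df = spark.createDataFrame([(1, 2.5)], ["objectId", "prv_candidates.jd"])

# df.select("prv_candidates.jd") would fail: Spark looks for a struct
# 'prv_candidates' with a field 'jd'. Backticks select the literal name.
cols = ["objectId", "prv_candidates.jd"]
out = df.select(["`{}`".format(k) for k in cols])
print(out.columns)  # ['objectId', 'prv_candidates.jd']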

4 changes: 3 additions & 1 deletion fink_broker/partitioning.py
@@ -142,9 +142,11 @@ def numPart(df, partition_size=128.):
.builder \
.getOrCreate()

+logical_plan = df._jdf.queryExecution().logical()
+mode = df._jdf.queryExecution().mode()
b = spark._jsparkSession\
.sessionState()\
-.executePlan(df._jdf.queryExecution().logical())\
+.executePlan(logical_plan, mode)\
.optimizedPlan()\
.stats()\
.sizeInBytes()
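For reference, Spark 3.4 changed the internal SessionState.executePlan call to also take the command-execution mode, hence the two new arguments taken from the DataFrame's own QueryExecution. Below is a hedged sketch of how such a size estimate can feed a partition count; the helper names, the BigInt-to-int conversion, and the rounding are ours and may differ from the actual numPart implementation:

import math

from pyspark.sql import DataFrame, SparkSession


def estimated_size_mb(df: DataFrame) -> float:
    """Estimate the size of a DataFrame from its optimized logical plan (internal API)."""
    spark = SparkSession.builder.getOrCreate()
    qe = df._jdf.queryExecution()
    size_in_bytes = spark._jsparkSession \
        .sessionState() \
        .executePlan(qe.logical(), qe.mode()) \
        .optimizedPlan() \
        .stats() \
        .sizeInBytes()
    # sizeInBytes() comes back as a Scala BigInt through py4j
    return int(size_in_bytes.toString()) / 1024. ** 2


def num_partitions(df: DataFrame, partition_size: float = 128.) -> int:
    """Partitions needed to keep roughly partition_size MB per partition."""
    return max(1, math.ceil(estimated_size_mb(df) / partition_size))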
18 changes: 17 additions & 1 deletion fink_broker/science.py
@@ -56,6 +56,8 @@
from fink_science.cats.processor import predict_nn
from fink_science.agn.processor import agn_elasticc
from fink_science.slsn.processor import slsn_elasticc
+from fink_science.fast_transient_rate.processor import magnitude_rate
+from fink_science.fast_transient_rate import rate_module_output_schema
# from fink_science.t2.processor import t2
except ImportError:
_LOG.warning("Fink science modules are not available. ")
@@ -208,7 +210,8 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame:
# Retrieve time-series information
to_expand = [
'jd', 'fid', 'magpsf', 'sigmapsf',
-'magnr', 'sigmagnr', 'isdiffpos', 'distnr'
+'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
+'diffmaglim'
]

# Append temp columns with historical + current measurements
@@ -373,6 +376,19 @@ def apply_science_modules(df: DataFrame, noscience: bool = False) -> DataFrame:
.withColumn("lc_features_r", df['lc_features'].getItem("2"))\
.drop('lc_features')

+# Apply level one processor: fast transient
+_LOG.info("New processor: magnitude rate for fast transient")
+mag_rate_args = [
+    "candidate.magpsf", "candidate.sigmapsf", "candidate.jd",
+    "candidate.jdstarthist", "candidate.fid", "cmagpsf", "csigmapsf",
+    "cjd", "cfid", "cdiffmaglim", F.lit(10000), F.lit(None)
+]
+cols_before = df.columns
+df = df.withColumn("ft_module", magnitude_rate(*mag_rate_args))
+df = df.select(
+    cols_before + [df["ft_module"][k].alias(k) for k in rate_module_output_schema.keys()]
+)

# Drop temp columns
df = df.drop(*expanded)
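The new block returns several quantities from a single UDF call: magnitude_rate produces one struct column ('ft_module'), whose fields are then promoted to top-level columns named after rate_module_output_schema. A toy sketch of that expansion pattern, where the schema and the inline struct stand in for the real UDF output:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(18.2, 17.9)], ["magpsf_last", "magpsf_prev"])

# Stand-in for rate_module_output_schema: output field name -> type
output_schema = {"delta_mag": "double", "abs_rate": "double"}

# A UDF would normally fill this struct; here we build it inline
df = df.withColumn(
    "ft_module",
    F.struct(
        (F.col("magpsf_last") - F.col("magpsf_prev")).alias("delta_mag"),
        F.abs(F.col("magpsf_last") - F.col("magpsf_prev")).alias("abs_rate"),
    ),
)

# Promote each struct field to a top-level column, as in the science pipeline
cols_before = [c for c in df.columns if c != "ft_module"]
df = df.select(cols_before + [df["ft_module"][k].alias(k) for k in output_schema])
df.show()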

4 binary files changed (contents not shown).
