Sentinel should work
JulienPeloton committed Jan 29, 2025
1 parent 4aea38f commit 3f4e0cb
Showing 14 changed files with 80 additions and 67 deletions.
33 changes: 20 additions & 13 deletions bin/fink
@@ -201,6 +201,20 @@ if [[ $DEPLOY_FINK_PYTHON == "true" ]]; then
cd -
fi

# Only if the operator passes an index table
if [[ ${INDEXTABLE} ]]; then
INDEXTABLE="-index_table ${INDEXTABLE}"
else
INDEXTABLE=""
fi

# Only if the operator passes the location of the TNS catalog
if [[ ${TNS_RAW_OUTPUT} ]]; then
TNS_RAW_OUTPUT="-tns_raw_output ${TNS_RAW_OUTPUT}"
else
TNS_RAW_OUTPUT=""
fi
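
For context, the two blocks above use a common shell pattern: the optional flag is pre-assembled into a variable that expands to either "-flag value" or to nothing at the call site. A minimal standalone sketch of the pattern (MYOPT, -my_option and some_command are illustrative names, not part of this commit):

#!/bin/bash
# MYOPT becomes "-my_option <value>" when the operator sets it, and "" otherwise.
if [[ ${MYOPT} ]]; then
    MYOPT="-my_option ${MYOPT}"
else
    MYOPT=""
fi
# Deliberately left unquoted so that an empty value contributes no argument at all.
some_command -required_arg foo ${MYOPT}

Leaving the variable unquoted is what makes an unused option disappear entirely from the final spark-submit call below.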

DISTRIBUTE_OPTIONS=""
if [[ $service == "distribute" ]]; then
if [[ -f $conf_distribution ]]; then
@@ -211,14 +225,16 @@ if [[ $service == "distribute" ]]; then
source ${FINK_HOME}/conf/${SURVEY}/fink.conf.distribution
fi

DISTRIBUTE_OPTIONS="--files ${FINK_HOME}/conf/fink_kafka_producer_jaas.conf --driver-java-options -Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.driver.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executorEnv.KNWEBHOOK=${KNWEBHOOK} --conf spark.executorEnv.KNWEBHOOK_FINK=${KNWEBHOOK_FINK} --conf spark.executorEnv.KNWEBHOOK_AMA_CL=${KNWEBHOOK_AMA_CL} --conf spark.executorEnv.KNWEBHOOK_AMA_GALAXIES=${KNWEBHOOK_AMA_GALAXIES} --conf spark.executorEnv.KNWEBHOOK_DWF=${KNWEBHOOK_DWF} --conf spark.executorEnv.FINK_TG_TOKEN=${FINK_TG_TOKEN}"
DISTRIBUTE_OPTIONS_SPARK="--files ${FINK_HOME}/conf/fink_kafka_producer_jaas.conf --driver-java-options -Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.driver.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executorEnv.KNWEBHOOK=${KNWEBHOOK} --conf spark.executorEnv.KNWEBHOOK_FINK=${KNWEBHOOK_FINK} --conf spark.executorEnv.KNWEBHOOK_AMA_CL=${KNWEBHOOK_AMA_CL} --conf spark.executorEnv.KNWEBHOOK_AMA_GALAXIES=${KNWEBHOOK_AMA_GALAXIES} --conf spark.executorEnv.KNWEBHOOK_DWF=${KNWEBHOOK_DWF} --conf spark.executorEnv.FINK_TG_TOKEN=${FINK_TG_TOKEN}"
DISTRIBUTE_OPTIONS="-distribution_servers ${DISTRIBUTION_SERVERS} -substream_prefix ${SUBSTREAM_PREFIX} -kafka_sasl_username ${KAFKA_SASL_USERNAME} -kafka_sasl_password ${KAFKA_SASL_PASSWORD} -kafka_buffer_memory ${KAFKA_BUFFER_MEMORY} -kafka_delivery_timeout_ms ${KAFKA_DELIVERY_TIMEOUT_MS}"
fi

spark-submit --master ${SPARK_MASTER} \
--packages ${FINK_PACKAGES} ${PYTHON_EXTRA_FILE} \
--packages ${FINK_PACKAGES} --jars ${FINK_JARS} ${PYTHON_EXTRA_FILE} \
${EXTRA_SPARK_CONFIG} \
--driver-memory ${DRIVER_MEMORY} --executor-memory ${EXECUTOR_MEMORY} \
--conf spark.cores.max=${SPARK_CORE_MAX} --conf spark.executor.cores=${SPARK_EXECUTOR_CORES} \
${DISTRIBUTE_OPTIONS_SPARK} \
${FINK_HOME}/bin/${SURVEY}/${service}.py ${HELP_ON_SERVICE} \
-producer ${SURVEY} \
-servers ${KAFKA_IPPORT} \
@@ -229,22 +245,13 @@ spark-submit --master ${SPARK_MASTER} \
-max_offsets_per_trigger ${MAX_OFFSETS_PER_TRIGGER} \
-online_data_prefix ${ONLINE_DATA_PREFIX} \
-agg_data_prefix ${AGG_DATA_PREFIX} \
-distribution_servers ${DISTRIBUTION_SERVERS} \
-distribution_schema ${DISTRIBUTION_SCHEMA} \
-kafka_sasl_username ${KAFKA_SASL_USERNAME} \
-kafka_sasl_password ${KAFKA_SASL_PASSWORD} \
-kafka_buffer_memory ${KAFKA_BUFFER_MEMORY} \
-kafka_delivery_timeout_ms ${KAFKA_DELIVERY_TIMEOUT_MS} \
-substream_prefix ${SUBSTREAM_PREFIX} \
-tinterval ${FINK_TRIGGER_UPDATE} \
-mmconfigpath ${FINK_MM_CONFIG} \
-fink_fat_output ${FINK_FAT_OUTPUT} \
-hostless_folder $HOSTLESS_FOLDER \
-hostless_folder ${HOSTLESS_FOLDER} \
-science_db_name ${SCIENCE_DB_NAME} \
-science_db_catalogs ${SCIENCE_DB_CATALOGS} \
-index_table ${INDEXTABLE} \
-tns_folder ${TNS_FOLDER} \
-tns_raw_output ${TNS_RAW_OUTPUT} \
${DISTRIBUTE_OPTIONS} ${NOSCIENCE} ${TNS_SANDBOX} \
${DISTRIBUTE_OPTIONS} ${TNS_RAW_OUTPUT} ${INDEXTABLE} ${NOSCIENCE} \
-log_level ${LOG_LEVEL} ${EXIT_AFTER}

37 changes: 23 additions & 14 deletions bin/fink_test
@@ -110,17 +110,18 @@ fi

# Stream integration tests
if [[ "$WITH_STREAM" = true ]] ; then
RESOURCES="-driver-memory 2g -executor-memory 2g -spark-cores-max 2 -spark-executor-cores 1"
# Fire another stream
fink_simulator -c ${FINK_HOME}/conf/${SURVEY}/fink_alert_simulator.conf

# Connect the service to build the raw database from the stream
fink start stream2raw -s ${SURVEY} --simulator -exit_after 60 -c $conf -topic $KAFKA_TOPIC -night $NIGHT
fink start stream2raw -s ${SURVEY} --simulator -exit_after 60 -c $conf -topic $KAFKA_TOPIC -night $NIGHT ${RESOURCES}

# Connect the service to build the science database from the raw one
fink start raw2science -s ${SURVEY} -exit_after 120 -c $conf -night $NIGHT
fink start raw2science -s ${SURVEY} -exit_after 120 -c $conf -night $NIGHT ${RESOURCES}

# Start the distribution service
fink start distribution -s ${SURVEY} -exit_after 60 -c $conf -night $NIGHT
fink start distribute -s ${SURVEY} -exit_after 60 -c $conf -night $NIGHT ${RESOURCES}

# check we have all topics
echo "TOPICS FOUND:"
@@ -129,22 +130,23 @@ fi

# DB Integration tests
if [[ "$WITH_DB" = true ]] ; then
RESOURCES="-driver-memory 2g -executor-memory 2g -spark-cores-max 2 -spark-executor-cores 1"
# merge data
fink start merge -s ${SURVEY} -c $conf -night $NIGHT
fink start merge -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}

# Update science portal
fink start archive_science -s ${SURVEY} -c $conf -night $NIGHT
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table pixel128_jd_objectId
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table class_jd_objectId
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table upper_objectId_jd
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table uppervalid_objectId_jd
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table ssnamenr_jd
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table tracklet_objectId
fink start archive_science -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table pixel128_jd_objectId ${RESOURCES}
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table class_jd_objectId ${RESOURCES}
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table upper_objectId_jd ${RESOURCES}
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table uppervalid_objectId_jd ${RESOURCES}
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table ssnamenr_jd ${RESOURCES}
fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table tracklet_objectId ${RESOURCES}

# SSO candidates
fink start archive_sso_cand -s ${SURVEY} -c $conf -night $NIGHT
fink start archive_sso_cand -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}

fink start archive_statistics -s ${SURVEY} -c $conf -night $NIGHT
fink start archive_statistics -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}

fi

@@ -157,7 +159,7 @@ fi
if [[ "$WITH_UNITS" = true ]] ; then
# Run the test suite on the modules assuming the integration
# tests have been run (to build the databases)
for i in ${FINK_HOME}/fink_broker/*.py
for i in ${FINK_HOME}/fink_broker/${SURVEY}/*.py
do
if [[ ${i##*/} = 'monitoring' ]] ; then
echo "skip {i}"
@@ -167,6 +169,13 @@ if [[ "$WITH_UNITS" = true ]] ; then
--rcfile ${FINK_HOME}/.coveragerc $i
fi
done

for i in ${FINK_HOME}/fink_broker/common/*.py
do
coverage run \
--source=${FINK_HOME} \
--rcfile ${FINK_HOME}/.coveragerc $i
done
fi
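
A possible consolidation of the two coverage loops above, kept as a hedged sketch rather than a change request; it assumes the parallel option is enabled in .coveragerc, which the combine step further down implies:

# Sketch only: cover survey-specific and common modules in one pass, skipping monitoring.
for i in ${FINK_HOME}/fink_broker/${SURVEY}/*.py ${FINK_HOME}/fink_broker/common/*.py
do
    if [[ $(basename ${i}) == "monitoring.py" ]]; then
        echo "skip ${i}"
        continue
    fi
    coverage run --source=${FINK_HOME} --rcfile ${FINK_HOME}/.coveragerc ${i}
done
# Then merge the per-module data files and print a report.
coverage combine
coverage report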

# Combine individual reports in one
2 changes: 1 addition & 1 deletion bin/ztf/archive_science.py
@@ -30,7 +30,7 @@
from fink_broker.ztf.hbase_utils import push_full_df_to_hbase
from fink_broker.common.spark_utils import list_hdfs_files

from fink_broker.logging_utils import get_fink_logger, inspect_application
from fink_broker.common.logging_utils import get_fink_logger, inspect_application


def main():
5 changes: 3 additions & 2 deletions bin/ztf/distribute.py
@@ -91,8 +91,9 @@ def push_to_kafka(df_in, topicname, cnames, checkpointpath_kafka, tinterval, kaf
disquery = (
df_kafka.writeStream.format("kafka")
.option("kafka.bootstrap.servers", kafka_cfg["bootstrap.servers"])
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("kafka.security.protocol", "PLAINTEXT")
# .option("kafka.security.protocol", "SASL_PLAINTEXT")
# .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
# .option("kafka.sasl.username", kafka_cfg["username"])
# .option("kafka.sasl.password", kafka_cfg["password"])
# .option("kafka.buffer.memory", kafka_cfg["kafka_buf_mem"])
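
For reference, a minimal PySpark sketch of toggling Kafka security on a Structured Streaming sink, which is what the options above (and the commented-out SASL lines) control. This is an illustration under stated assumptions, not the broker's implementation; start_kafka_sink and its arguments are made-up names, and df_kafka is assumed to already carry "key"/"value" columns.

from pyspark.sql import DataFrame

def start_kafka_sink(df_kafka: DataFrame, servers: str, topic: str,
                     checkpoint: str, use_sasl: bool = False,
                     username: str = "", password: str = ""):
    writer = (
        df_kafka.writeStream.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint)
    )
    if use_sasl:
        # SCRAM credentials go through the standard Kafka JAAS entry
        jaas = (
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";'
        )
        writer = (
            writer.option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
            .option("kafka.sasl.jaas.config", jaas)
        )
    else:
        # Matches the plain setup selected by this commit
        writer = writer.option("kafka.security.protocol", "PLAINTEXT")
    return writer.start()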
8 changes: 3 additions & 5 deletions conf/ztf/fink.conf.dev
@@ -20,7 +20,7 @@ KAFKA_IPPORT_SIM="localhost:${KAFKA_PORT_SIM}"

# Cluster mode - you need a Kafka cluster installed with write mode.
# Must match the one used for the Producer
KAFKA_IPPORT=""
KAFKA_IPPORT="${KAFKA_IPPORT_SIM}"

# From which offset you want to start pulling data. Options are:
# latest (only new data), earliest (connect from the oldest
@@ -45,8 +45,7 @@ org.slf4j:slf4j-log4j12:1.7.36,\
org.slf4j:slf4j-simple:1.7.36

# Other dependencies (incl. Scala part of Fink)
FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

# Time interval between 2 trigger updates (second)
@@ -94,8 +93,7 @@ FINK_MM_CONFIG=${FINK_HOME}/conf/ztf/fink_mm.conf

# TNS configuration
TNS_FOLDER=${FINK_HOME}/tns_logs
TNS_RAW_OUTPUT=/spark_mongo_tmp/julien.peloton
TNS_SANDBOX=false
TNS_RAW_OUTPUT=""

# Hostless folder
HOSTLESS_FOLDER=${FINK_HOME}/hostless_ids
4 changes: 1 addition & 3 deletions conf/ztf/fink.conf.prod
@@ -43,8 +43,7 @@ org.apache.spark:spark-avro_2.12:3.4.1,\
org.apache.hbase:hbase-shaded-mapreduce:2.2.7

# Other dependencies (incl. Scala part of Fink)
FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

# Time interval between 2 trigger updates (second)
@@ -93,7 +92,6 @@ FINK_MM_CONFIG=${FINK_HOME}/conf/ztf/fink_mm.conf
# TNS configuration
TNS_FOLDER=${FINK_HOME}/tns_logs
TNS_RAW_OUTPUT=/spark_mongo_tmp/julien.peloton
TNS_SANDBOX=false

# Hostless folder
HOSTLESS_FOLDER=${FINK_HOME}/hostless_ids
2 changes: 1 addition & 1 deletion fink_broker/common/distribution_utils.py
@@ -19,7 +19,7 @@
from pyspark.sql.functions import struct, lit
from pyspark.sql.avro.functions import to_avro as to_avro_native

from fink_broker.tester import spark_unit_tests
from fink_broker.common.tester import spark_unit_tests


def get_kafka_df(df: DataFrame, key: str, elasticc: bool = False) -> DataFrame:
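
The line above only repoints the tester import, but since it sits next to get_kafka_df, here is a rough sketch of the Avro-for-Kafka packaging pattern such a helper follows, using the same imports as the file; to_kafka_frame is an illustrative name, not the broker's API:

# Sketch only: serialize the whole record into an Avro "value" and attach a constant "key".
from pyspark.sql import DataFrame
from pyspark.sql.functions import struct, lit
from pyspark.sql.avro.functions import to_avro

def to_kafka_frame(df: DataFrame, key: str) -> DataFrame:
    value = to_avro(struct(*df.columns))  # one Avro blob per alert
    return df.select(value.alias("value"), lit(key).alias("key"))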
6 changes: 3 additions & 3 deletions fink_broker/common/partitioning.py
@@ -43,7 +43,7 @@ def convert_to_millitime(jd: pd.Series, format=None, now=None):
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> df = load_parquet_files("online/raw/20200101")
>>> df = df.withColumn('millis', convert_to_millitime(df['candidate.jd']))
>>> pdf = df.select('millis').toPandas()
@@ -98,7 +98,7 @@ def convert_to_datetime(jd: pd.Series, format=None) -> pd.Series:
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> df = load_parquet_files("online/raw/20200101")
>>> df = df.withColumn('datetime', convert_to_datetime(df['candidate.jd']))
>>> pdf = df.select('datetime').toPandas()
@@ -129,7 +129,7 @@ def compute_num_part(df, partition_size=128.0):
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> df = load_parquet_files("online/raw/20200101")
>>> numpart = compute_num_part(df, partition_size=128.)
>>> print(numpart)
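
The three docstring fixes above only repoint the example imports. For readers unfamiliar with these helpers, a standalone sketch of the Julian Date conversions they perform, assuming only pandas and astropy (the function names here are illustrative, not the broker's API):

import pandas as pd
from astropy.time import Time

def jd_to_datetime(jd: pd.Series) -> pd.Series:
    """Convert a Series of Julian Dates to timestamps."""
    return pd.Series(Time(jd.to_numpy(), format="jd").to_datetime())

def jd_to_millis(jd: pd.Series) -> pd.Series:
    """Convert a Series of Julian Dates to Unix time in milliseconds."""
    return pd.Series((Time(jd.to_numpy(), format="jd").unix * 1000).astype("int64"))

# Example: jd_to_datetime(pd.Series([2458849.5])) -> 2020-01-01 00:00:00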
2 changes: 1 addition & 1 deletion fink_broker/ztf/hbase_utils.py
@@ -28,7 +28,7 @@
from fink_science.t2.utilities import T2_COLS
from fink_science.xmatch.utils import MANGROVE_COLS

from fink_broker.tester import spark_unit_tests
from fink_broker.common.tester import spark_unit_tests

_LOG = logging.getLogger(__name__)

24 changes: 12 additions & 12 deletions fink_broker/ztf/mm_utils.py
@@ -27,8 +27,8 @@
from astropy.time import Time
from datetime import timedelta

from fink_broker.spark_utils import connect_to_raw_database, path_exist
from fink_broker.logging_utils import init_logger
from fink_broker.common.spark_utils import connect_to_raw_database, path_exist
from fink_broker.common.logging_utils import init_logger

from pyspark.sql.streaming import StreamingQuery

@@ -56,7 +56,7 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
"""
if args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute
from fink_broker.ztf.mm_utils import mm2distribute

_LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})
@@ -71,14 +71,14 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
t_before = time.time()
_LOG.info("starting mm2distribute in 300 seconds...")
time.sleep(300)
_LOG.info("starting mm2distribute in 30 seconds...")
time.sleep(30)
stream_distrib_list = mm2distribute(spark, config, args)
time_spent_in_wait += time.time() - t_before
break

time_spent_in_wait += 300
time.sleep(300)
time_spent_in_wait += 30
time.sleep(30)
if stream_distrib_list == []:
_LOG.warning(
f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
@@ -114,7 +114,7 @@ def raw2science_launch_fink_mm(
"""
if args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import science2mm
from fink_broker.ztf.mm_utils import science2mm

_LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})
@@ -131,15 +131,15 @@
if path_exist(gcn_path) and path_exist(scitmpdatapath):
# Start the GCN x ZTF cross-match stream
t_before = time.time()
_LOG.info("starting science2mm in 300 seconds...")
time.sleep(300)
_LOG.info("starting science2mm in 30 seconds...")
time.sleep(30)
countquery_mm = science2mm(args, config, gcn_path, scitmpdatapath)
time_spent_in_wait += time.time() - t_before
break
else:
# wait for incoming GCN
time_spent_in_wait += 300
time.sleep(300)
time_spent_in_wait += 30
time.sleep(30)

if countquery_mm is None:
_LOG.warning("science2mm could not start before the end of the job.")
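
The edits above shorten the polling interval from 300 s to 30 s. The underlying wait-then-start pattern, reduced to a minimal sketch (path_exists and start_stream are placeholders standing in for the broker's helpers):

import time

def wait_then_start(path, budget_s, path_exists, start_stream, step_s=30.0):
    """Poll for `path` within a time budget; return (stream_or_None, seconds_spent)."""
    spent = 0.0
    while spent < budget_s:
        if path_exists(path):
            return start_stream(), spent
        time.sleep(step_s)
        spent += step_s
    return None, spent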
14 changes: 7 additions & 7 deletions fink_broker/ztf/science.py
@@ -27,7 +27,7 @@

from fink_utils.spark.utils import concat_col

from fink_broker.tester import spark_unit_tests
from fink_broker.common.tester import spark_unit_tests

# Import of science modules
from fink_science.random_forest_snia.processor import rfscore_sigmoid_full
@@ -89,7 +89,7 @@ def ang2pix(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series:
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> df = load_parquet_files(ztf_alert_sample)
>>> df_index = df.withColumn(
@@ -128,7 +128,7 @@ def ang2pix_array(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series:
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> df = load_parquet_files(ztf_alert_sample)
>>> nsides = F.array([F.lit(256), F.lit(4096), F.lit(131072)])
Expand Down Expand Up @@ -202,8 +202,8 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.logging_utils import get_fink_logger
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> from fink_broker.common.logging_utils import get_fink_logger
>>> logger = get_fink_logger('raw2science_test', 'INFO')
>>> _LOG = logging.getLogger(__name__)
>>> df = load_parquet_files(ztf_alert_sample)
@@ -478,8 +478,8 @@ def apply_science_modules_elasticc(df: DataFrame) -> DataFrame:
Examples
--------
>>> from fink_broker.spark_utils import load_parquet_files
>>> from fink_broker.logging_utils import get_fink_logger
>>> from fink_broker.common.spark_utils import load_parquet_files
>>> from fink_broker.common.logging_utils import get_fink_logger
>>> logger = get_fink_logger('raw2science_elasticc_test', 'INFO')
>>> _LOG = logging.getLogger(__name__)
>>> df = load_parquet_files(elasticc_alert_sample)
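
As above, these docstring edits only repoint imports. For context, the HEALPix indexing wrapped by ang2pix / ang2pix_array looks roughly like this outside Spark, assuming healpy and pandas (ra_dec_to_healpix is an illustrative name, not the broker's UDF):

import pandas as pd
import healpy as hp

def ra_dec_to_healpix(ra: pd.Series, dec: pd.Series, nside: int = 256) -> pd.Series:
    """Return the HEALPix pixel index (RING ordering) for (ra, dec) in degrees."""
    return pd.Series(hp.ang2pix(nside, ra.to_numpy(), dec.to_numpy(), lonlat=True))

# Example: ra_dec_to_healpix(pd.Series([10.0]), pd.Series([-30.0]))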
4 changes: 2 additions & 2 deletions fink_broker/ztf/test_schema_converter.py
@@ -20,9 +20,9 @@

from pyspark.sql import DataFrame

from fink_broker.avro_utils import readschemafromavrofile
from fink_broker.common.avro_utils import readschemafromavrofile
from fink_utils.spark.schema_converter import to_avro
from fink_broker.spark_utils import connect_to_raw_database, init_sparksession
from fink_broker.common.spark_utils import connect_to_raw_database, init_sparksession

_LOG = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion fink_broker/ztf/tracklet_identification.py
@@ -25,7 +25,7 @@
from astropy.coordinates import SkyCoord
from astropy.time import Time

from fink_broker.tester import spark_unit_tests
from fink_broker.common.tester import spark_unit_tests


def apply_tracklet_cuts(df: DataFrame) -> DataFrame: