diff --git a/bin/fink_db b/bin/fink_db index 81b677f5..367ebdfd 100755 --- a/bin/fink_db +++ b/bin/fink_db @@ -34,10 +34,6 @@ while [ "$#" -gt 0 ]; do shift 2 ;; -c) - if [[ $2 == "" || $2 == "-s" ]]; then - echo "$1 requires an argument. ${message_conf}" >&2 - exit 1 - fi conf="$2" shift 2 ;; @@ -57,6 +53,10 @@ while [ "$#" -gt 0 ]; do CLEAN_NIGHT=true shift 2 ;; + --clean_merge) + CLEAN_MERGE=true + shift 2 + ;; -night) NIGHT="$2" shift 2 @@ -97,10 +97,9 @@ echo "Processing night ${NIGHT} for survey ${SURVEY}" # Check if the conf file exists if [[ -f $conf ]]; then echo "Reading custom Fink configuration file from " $conf - source $conf else echo "Reading default Fink conf from " ${FINK_HOME}/conf/${SURVEY}/fink.conf.prod - source ${FINK_HOME}/conf/${SURVEY}/fink.conf.prod + conf=${FINK_HOME}/conf/${SURVEY}/fink.conf.prod fi # Merge streaming data @@ -188,11 +187,18 @@ else echo "Pushing data to tables aborted..." fi +if [[ $CLEAN_MERGE == true ]]; then + echo "Cleaning merged files..." + # If merge went wrong -- clean + hdfs dfs -rm -r /user/julien.peloton/archive/raw/year=${YEAR}/month=${MONTH}/day=${DAY} + hdfs dfs -rm -r /user/julien.peloton/archive/science/year=${YEAR}/month=${MONTH}/day=${DAY} +fi + if [[ ${CLEAN_NIGHT} == true ]]; then - # Check if data is on the archive - # If yes, delete temp ones + # Delete temporary files after the night $(hdfs dfs -test -d /user/julien.peloton/archive/science/year=${YEAR}/month=${MONTH}/day=${DAY}) if [[ $? == 0 ]]; then + echo "Cleaning temporary files..." # Remove data path hdfs dfs -rm -r /user/julien.peloton/online/raw/${NIGHT} hdfs dfs -rm -r /user/julien.peloton/online/science/${NIGHT} diff --git a/scheduler/README.md b/scheduler/README.md index a84e8bd5..b08384df 100644 --- a/scheduler/README.md +++ b/scheduler/README.md @@ -1,25 +1,25 @@ # Fink ZTF scheduler -Operations for the night N start at 21.45pm UTC on the night N-1. 
There are 2 cronjobs scheduled: +Operations for the night N start at 00:01 Paris on the night N. There are 2 cronjobs scheduled: ```bash # Paris time @ VD -# Fink real-time -45 23 * * * /home/julien.peloton/fink-broker/scheduler/launch_fink.sh +# Fink real-time, finishes at 20:00 +01 00 * * * /home/julien.peloton/fink-broker/scheduler/launch_fink.sh # Database service -35 20 * * * /home/julien.peloton/fink-broker/scheduler/database_service.sh -30 21 * * * /home/julien.peloton/fink-broker/scheduler/database_auxilliary.sh +05 20 * * * /home/julien.peloton/fink-broker/scheduler/ztf/launch_db.sh # SSOFT - once a month 0 0 1 * * /home/julien.peloton/fink-broker/scheduler/launch_ssoft.sh 0 12 1 * * /home/julien.peloton/fink-broker/scheduler/launch_sso_resolver.sh -# Operation reports three times a day +# Operation reports four times a day +0 07 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram 0 12 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram 0 17 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram -0 23 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram +0 22 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram ``` ```bash @@ -30,11 +30,11 @@ Operations for the night N start at 21.45pm UTC on the night N-1. There are 2 cr The first script is for live operations: -- `stream2raw`: Wrapped in `launch_fink.sh`. It listens to the ZTF stream, collect alerts, and store them on disks. Starts at 9pm UTC on night N-1. +- `stream2raw`: Wrapped in `launch_fink.sh`. It listens to the ZTF stream, collect alerts, and store them on disks. Starts at 00:01 Paris on night N. - `raw2science`: Wrapped in `science_service.sh`. It loads collected alerts, processes them, and stores them on disks. It starts automatically when `stream2raw` collects alerts for the first time. - `distribution`: Wrapped in `distribution_service.sh`. 
It loads processed alerts, splits into substreams (Kafka topics), and sends substreams to the Kafka cluster which will redirect alerts to users. It starts automatically when `raw2science` collects alerts for the first time. -All jobs stop at 8pm UTC on the night N. In addition there is one other cron job at the end of the night for database management (`database_service.sh`) that starts at 8.05pm UTC (just after streaming jobs end). +All jobs stop at 8pm UTC on the night N. In addition there is one other cron job at the end of the night for database management that starts at 8.05pm UTC (just after streaming jobs end). ![image](schedule_example.png) _Screenshot from the ZTF Grafana dashboard for the nights 20210116 and 20210117._ diff --git a/scheduler/ztf/launch_db.sh b/scheduler/ztf/launch_db.sh new file mode 100755 index 00000000..6d082531 --- /dev/null +++ b/scheduler/ztf/launch_db.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +source ~/.bash_profile + +fink_db -s ztf --merge +fink_db -s ztf --main_table +fink_db -s ztf --index_tables +fink_db -s ztf --clean_night diff --git a/scheduler/ztf/launch_fink.sh b/scheduler/ztf/launch_fink.sh deleted file mode 100755 index 7680ea8c..00000000 --- a/scheduler/ztf/launch_fink.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash - -# user:julien.peloton -# Launch the broker at 11.45pm CE(S)T (9.45pm UTC) on day D -# to collect and process the night D+1. -# Usually, ZTF observations start around 3am CE(S)T on D+1 (2am UTC). -# Broker will be up from 11.45pm day D to 8pm day D+1 CE(S)T. -# Then database operations take place between 8pm and 9pm CE(S)T, and another night starts. 
- -source ~/.bash_profile - -NIGHT=`date +"%Y%m%d" -d "now + 1 days"` - -# 20 hours lease -LEASETIME=72000 - -# stream2raw -nohup fink start stream2raw \ - -s ztf \ - -c ${FINK_HOME}/conf/ztf/fink.conf.prod \ - -topic "ztf_${NIGHT}.*" \ - -night $NIGHT \ - -driver-memory 4g -executor-memory 2g \ - -spark-cores-max 4 -spark-executor-cores 1 \ - -exit_after ${LEASETIME} > ${FINK_HOME}/broker_logs/stream2raw_${NIGHT}.log 2>&1 & - -# raw2science -nohup ${FINK_HOME}/scheduler/science_service.sh > ${FINK_HOME}/broker_logs/raw2science_${NIGHT}.log 2>&1 & - -# disrtribute -nohup ${FINK_HOME}/scheduler/distribution_service.sh > ${FINK_HOME}/broker_logs/distribute_${NIGHT}.log 2>&1 & diff --git a/scheduler/ztf/launch_stream.sh b/scheduler/ztf/launch_stream.sh new file mode 100755 index 00000000..699dba08 --- /dev/null +++ b/scheduler/ztf/launch_stream.sh @@ -0,0 +1,163 @@ +#!/bin/bash +# Copyright 2019-2025 AstroLab Software +# Author: Julien Peloton +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Launch the Fink ZTF streaming services for one night.
#
# Three services are started, in this order:
#   1. stream2raw   - started immediately, in the background
#   2. raw2science  - started once stream2raw has written data on HDFS
#   3. distribution - started once raw2science has written data on HDFS
#
# The script itself stays in the foreground while it waits for data, and
# gives up waiting when the stop time is reached.
#
# Usage:
#   launch_stream.sh [-h] [-c CONF] [-night YYYYMMDD] [-stop_at DATE]
#                    [--poll_only | --enrich_only | --distribute_only]
source ~/.bash_profile

# This launcher is ZTF specific.
SURVEY="ztf"

# HDFS folder in which the streaming services write their output.
HDFS_ONLINE="/user/julien.peloton/online"

# Print a message on stderr and abort.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Print the command line help.
usage() {
  cat <<EOF
Usage: ${0##*/} [-h] [-c CONF] [-night YYYYMMDD] [-stop_at DATE]
       [--poll_only | --enrich_only | --distribute_only]

  -h                 Print this help and exit.
  -c CONF            Fink configuration file
                     (default: \${FINK_HOME}/conf/${SURVEY}/fink.conf.prod).
  -night YYYYMMDD    Night to process (default: today).
  -stop_at DATE      Time at which services must stop, in any format
                     understood by 'date -d' (default: '20:00 today').
  --poll_only        Only launch stream2raw.
  --enrich_only      Only launch raw2science.
  --distribute_only  Only launch distribution.
EOF
}

# Grab the command line arguments
while [ "$#" -gt 0 ]; do
  case "$1" in
    -h)
      usage
      exit 0
      ;;
    -c)
      # A failing `shift 2` leaves the arguments untouched and would make
      # this loop spin forever, hence the explicit check.
      [[ $# -ge 2 ]] || die "$1 requires an argument"
      conf="$2"
      shift 2
      ;;
    --poll_only)
      POLL_ONLY=true
      shift
      ;;
    --enrich_only)
      ENRICH_ONLY=true
      shift
      ;;
    --distribute_only)
      DISTRIBUTE_ONLY=true
      shift
      ;;
    -night)
      [[ $# -ge 2 ]] || die "$1 requires an argument"
      NIGHT="$2"
      shift 2
      ;;
    -stop_at)
      [[ $# -ge 2 ]] || die "$1 requires an argument"
      STOP_AT="$2"
      shift 2
      ;;
    -*)
      echo "unknown option: $1" >&2
      exit 1
      ;;
    *)
      echo "unknown argument: $1" >&2
      exit 1
      ;;
  esac
done

if [[ ! ${NIGHT} ]]; then
  # Current night
  NIGHT=$(date +"%Y%m%d")
fi
echo "Processing night ${NIGHT}"

# Check if the conf file exists, otherwise fall back to the default one
if [[ -f "${conf}" ]]; then
  echo "Reading custom Fink configuration file from " "${conf}"
else
  echo "Reading default Fink conf from " "${FINK_HOME}/conf/${SURVEY}/fink.conf.prod"
  conf="${FINK_HOME}/conf/${SURVEY}/fink.conf.prod"
fi

if [[ ! ${STOP_AT} ]]; then
  STOP_AT='20:00 today'
fi

# Resolve the stop time once: a relative value such as '20:00 today' must
# keep pointing at the same instant for the whole life of this script.
# The quotes matter: the default value contains a space.
STOP_EPOCH=$(date +'%s' -d "${STOP_AT}") \
  || die "cannot parse stop time: ${STOP_AT}"

# Print the number of seconds left before the stop time (may be <= 0).
remaining_lease() {
  echo $(( STOP_EPOCH - $(date +'%s') ))
}

if (( $(remaining_lease) <= 0 )); then
  die "stop time '${STOP_AT}' is already in the past, nothing to launch"
fi

#######################################
# Start one Fink service in the background, with a lease that ends at the
# stop time.
# Globals:   FINK_HOME, SURVEY, conf, NIGHT, STOP_AT (read)
# Arguments: $1 - service name as understood by `fink start`
#            $2 - prefix of the log file in ${FINK_HOME}/broker_logs
#            $3 - value for -spark-cores-max
#            $4... - extra options forwarded to `fink start`
# Returns:   1 if no time is left before the stop time, 0 otherwise
#######################################
start_service() {
  local service="$1"
  local log_prefix="$2"
  local cores_max="$3"
  shift 3

  # LEASETIME is the difference between now and the stop time
  local lease
  lease=$(remaining_lease)
  if (( lease <= 0 )); then
    echo "Stop time '${STOP_AT}' reached: ${service} not launched" >&2
    return 1
  fi

  nohup "${FINK_HOME}/bin/fink" start "${service}" \
    -s "${SURVEY}" \
    -c "${conf}" \
    -night "${NIGHT}" \
    -driver-memory 4g -executor-memory 2g \
    -spark-cores-max "${cores_max}" -spark-executor-cores 1 \
    "$@" \
    -exit_after "${lease}" \
    > "${FINK_HOME}/broker_logs/${log_prefix}_${NIGHT}.log" 2>&1 &
}

#######################################
# Block until an HDFS folder exists and contains at least one file, then
# wait 60 more seconds to let one batch complete.
# Arguments: $1 - HDFS folder to watch
# Returns:   0 when data has been detected,
#            1 if the stop time is reached first
#######################################
wait_for_data() {
  local hdfs_dir="$1"
  local nfiles

  while (( $(remaining_lease) > 0 )); do
    # check folder exist
    if hdfs dfs -test -d "${hdfs_dir}"; then
      # check folder is not empty: second column of `hdfs dfs -count`
      # is the number of files
      nfiles=$(hdfs dfs -count "${hdfs_dir}" | awk '{print $2}')
      if [[ "${nfiles}" =~ ^[0-9]+$ ]] && (( 10#${nfiles} > 0 )); then
        echo "Data detected."
        echo "Waiting 60 seconds for one batch to complete before launching..."
        sleep 60
        return 0
      fi
    fi
    echo "$(date): no data yet. Sleeping 300 seconds..."
    sleep 300
  done

  echo "Stop time '${STOP_AT}' reached while waiting for ${hdfs_dir}" >&2
  return 1
}

# stream2raw
if [[ ! ${ENRICH_ONLY} && ! ${DISTRIBUTE_ONLY} ]]; then
  echo "Launching stream2raw"
  start_service stream2raw stream2raw 4 \
    -topic "ztf_${NIGHT}.*"
fi

# raw2science
if [[ ! ${POLL_ONLY} && ! ${DISTRIBUTE_ONLY} ]]; then
  echo "Launching raw2science"
  if wait_for_data "${HDFS_ONLINE}/raw/${NIGHT}"; then
    echo "Launching service"
    start_service raw2science raw2science 8
  fi
fi

# distribution
if [[ ! ${POLL_ONLY} && ! ${ENRICH_ONLY} ]]; then
  echo "Launching distribution"
  if wait_for_data "${HDFS_ONLINE}/science/${NIGHT}"; then
    echo "Launching service..."
    start_service distribution distribute 4 \
      -conf_distribution "${FINK_HOME}/conf/${SURVEY}/fink.conf.distribution_cluster"
  fi
fi