Update README
JulienPeloton committed Jan 29, 2025
1 parent b45df8c commit 4aea38f
Showing 5 changed files with 194 additions and 48 deletions.
22 changes: 14 additions & 8 deletions bin/fink_db
Expand Up @@ -34,10 +34,6 @@ while [ "$#" -gt 0 ]; do
shift 2
;;
-c)
if [[ $2 == "" || $2 == "-s" ]]; then
echo "$1 requires an argument. ${message_conf}" >&2
exit 1
fi
conf="$2"
shift 2
;;
Expand All @@ -57,6 +53,10 @@ while [ "$#" -gt 0 ]; do
CLEAN_NIGHT=true
shift 1
;;
--clean_merge)
CLEAN_MERGE=true
shift 1
;;
-night)
NIGHT="$2"
shift 2
Expand Down Expand Up @@ -97,10 +97,9 @@ echo "Processing night ${NIGHT} for survey ${SURVEY}"
# Check if the conf file exists
if [[ -f $conf ]]; then
echo "Reading custom Fink configuration file from " $conf
source $conf
else
echo "Reading default Fink conf from " ${FINK_HOME}/conf/${SURVEY}/fink.conf.prod
source ${FINK_HOME}/conf/${SURVEY}/fink.conf.prod
conf=${FINK_HOME}/conf/${SURVEY}/fink.conf.prod
fi

# Merge streaming data
Expand Down Expand Up @@ -188,11 +187,18 @@ else
echo "Pushing data to tables aborted..."
fi

if [[ $CLEAN_MERGE == true ]]; then
echo "Cleaning merged files..."
# If merge went wrong -- clean
hdfs dfs -rm -r /user/julien.peloton/archive/raw/year=${YEAR}/month=${MONTH}/day=${DAY}
hdfs dfs -rm -r /user/julien.peloton/archive/science/year=${YEAR}/month=${MONTH}/day=${DAY}
fi

if [[ ${CLEAN_NIGHT} == true ]]; then
# Check if data is on the archive
# If yes, delete temp ones
# Delete temporary files after the night
$(hdfs dfs -test -d /user/julien.peloton/archive/science/year=${YEAR}/month=${MONTH}/day=${DAY})
if [[ $? == 0 ]]; then
echo "Cleaning temporary files..."
# Remove data path
hdfs dfs -rm -r /user/julien.peloton/online/raw/${NIGHT}
hdfs dfs -rm -r /user/julien.peloton/online/science/${NIGHT}
Expand Down
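
The option loop in `bin/fink_db` mixes options that take a value (`-c`, `-night`) with boolean flags (`--clean_merge`). The two kinds need to consume a different number of positional parameters: in bash, `shift 2` leaves the parameters unchanged when fewer than two remain, so a trailing boolean flag handled with `shift 2` can keep the loop from terminating. The following is a minimal sketch of the pattern, not the actual `fink_db` parser; `parse_args` is a hypothetical name and only two options are shown.

```bash
#!/bin/bash
# Hypothetical parser for illustration only; not fink_db's implementation.
parse_args() {
  CLEAN_MERGE=false
  NIGHT=""
  while [ "$#" -gt 0 ]; do
    case "$1" in
      --clean_merge)
        # Boolean flag: it consumes only itself.
        CLEAN_MERGE=true
        shift 1
        ;;
      -night)
        # Valued option: fail early if the value is missing.
        if [ "$#" -lt 2 ]; then
          echo "$1 requires an argument" >&2
          return 1
        fi
        NIGHT="$2"
        shift 2
        ;;
      *)
        echo "unknown argument: $1" >&2
        return 1
        ;;
    esac
  done
}

parse_args --clean_merge -night 20250129
echo "CLEAN_MERGE=${CLEAN_MERGE} NIGHT=${NIGHT}"
```

Checking `$#` before reading `$2` also covers the case the removed `-c` guard used to handle, without hard-coding which option may follow.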
18 changes: 9 additions & 9 deletions scheduler/README.md
@@ -1,25 +1,25 @@
# Fink ZTF scheduler

Operations for the night N start at 21.45pm UTC on the night N-1. There are 2 cronjobs scheduled:
Operations for the night N start at 00:01 Paris on the night N. There are 2 cronjobs scheduled:

```bash
# Paris time @ VD

# Fink real-time
45 23 * * * /home/julien.peloton/fink-broker/scheduler/launch_fink.sh
# Fink real-time, finishes at 20:00
01 00 * * * /home/julien.peloton/fink-broker/scheduler/launch_fink.sh

# Database service
35 20 * * * /home/julien.peloton/fink-broker/scheduler/database_service.sh
30 21 * * * /home/julien.peloton/fink-broker/scheduler/database_auxilliary.sh
05 20 * * * /home/julien.peloton/fink-broker/scheduler/ztf/launch_db.sh

# SSOFT - once a month
0 0 1 * * /home/julien.peloton/fink-broker/scheduler/launch_ssoft.sh
0 12 1 * * /home/julien.peloton/fink-broker/scheduler/launch_sso_resolver.sh

# Operation reports three times a day
# Operation reports four times a day
0 07 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram
0 12 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram
0 17 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram
0 23 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram
0 22 * * * /home/julien.peloton/fink-broker/scheduler/check_status.sh --telegram
```

```bash
Expand All @@ -30,11 +30,11 @@ Operations for the night N start at 21.45pm UTC on the night N-1. There are 2 cr

The first script is for live operations:

- `stream2raw`: Wrapped in `launch_fink.sh`. It listens to the ZTF stream, collects alerts, and stores them on disks. Starts at 9pm UTC on night N-1.
- `stream2raw`: Wrapped in `launch_fink.sh`. It listens to the ZTF stream, collects alerts, and stores them on disks. Starts at 00:01 Paris time on night N.
- `raw2science`: Wrapped in `science_service.sh`. It loads collected alerts, processes them, and stores them on disks. It starts automatically when `stream2raw` collects alerts for the first time.
- `distribution`: Wrapped in `distribution_service.sh`. It loads processed alerts, splits into substreams (Kafka topics), and sends substreams to the Kafka cluster which will redirect alerts to users. It starts automatically when `raw2science` collects alerts for the first time.

All jobs stop at 8pm UTC on the night N. In addition there is one other cron job at the end of the night for database management (`database_service.sh`) that starts at 8.05pm UTC (just after streaming jobs end).
All jobs stop at 20:00 Paris time on night N. In addition, one other cron job handles database management at the end of the night; it starts at 20:05 Paris time (just after the streaming jobs end).

![image](schedule_example.png)
_Screenshot from the ZTF Grafana dashboard for the nights 20210116 and 20210117._
Expand Down
8 changes: 8 additions & 0 deletions scheduler/ztf/launch_db.sh
@@ -0,0 +1,8 @@
#!/bin/bash

source ~/.bash_profile

fink_db -s ztf --merge
fink_db -s ztf --main_table
fink_db -s ztf --index_tables
fink_db -s ztf --clean_night
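
`launch_db.sh` runs the four `fink_db` steps unconditionally, so `--clean_night` is still attempted if `--merge` fails (inside `fink_db`, the cleaning step does check that the archive folder exists first). If stopping at the first failing step is preferred, a small wrapper is one option. This is a sketch only: `run_steps` is a hypothetical helper, and it assumes each step signals failure through a non-zero exit code, which is not confirmed for `fink_db`.

```bash
#!/bin/bash
# run_steps is a hypothetical helper, not part of fink-broker.
# It runs each command line in order and stops at the first failure.
run_steps() {
  local step
  for step in "$@"; do
    echo "Running: ${step}"
    # Word splitting of ${step} is intentional: each step must be a
    # simple command line with no quoted or space-containing arguments.
    if ! ${step}; then
      echo "Step failed: ${step}; skipping remaining steps" >&2
      return 1
    fi
  done
}

# Demo with stand-in commands; the third step is never reached.
run_steps "true" "false" "echo never reached" || echo "pipeline aborted"

# Possible use in launch_db.sh (assumes fink_db returns non-zero on error):
# run_steps "fink_db -s ztf --merge" "fink_db -s ztf --main_table" \
#           "fink_db -s ztf --index_tables" "fink_db -s ztf --clean_night"
```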
31 changes: 0 additions & 31 deletions scheduler/ztf/launch_fink.sh

This file was deleted.

163 changes: 163 additions & 0 deletions scheduler/ztf/launch_stream.sh
@@ -0,0 +1,163 @@
#!/bin/bash
# Copyright 2019-2025 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source ~/.bash_profile

# Grab the command line arguments
while [ "$#" -gt 0 ]; do
case "$1" in
-h)
HELP_ON_SERVICE="-h"
shift 1
;;
-c)
conf="$2"
shift 2
;;
--poll_only)
POLL_ONLY=true
shift 1
;;
--enrich_only)
ENRICH_ONLY=true
shift 1
;;
--distribute_only)
DISTRIBUTE_ONLY=true
shift 1
;;
-night)
NIGHT="$2"
shift 2
;;
-stop_at)
STOP_AT="$2"
shift 2
;;
-*)
echo "unknown option: $1" >&2
exit 1
;;
*)
echo "unknown argument: $1" >&2
exit 1
;;
esac
done

if [[ ! $NIGHT ]]; then
# Current night
NIGHT=`date +"%Y%m%d"`
fi
echo "Processing night ${NIGHT}"

# Check if the conf file exists
if [[ -f $conf ]]; then
echo "Reading custom Fink configuration file from " $conf
else
# SURVEY is not set in this script; the survey is always ztf here
echo "Reading default Fink conf from " ${FINK_HOME}/conf/ztf/fink.conf.prod
conf=${FINK_HOME}/conf/ztf/fink.conf.prod
fi

if [[ ! ${STOP_AT} ]]; then
STOP_AT='20:00 today'
fi

# stream2raw
if [[ ! ${ENRICH_ONLY} ]] && [[ ! ${DISTRIBUTE_ONLY} ]]; then
echo "Launching stream2raw"

LEASETIME=$(( `date +'%s' -d "${STOP_AT}"` - `date +'%s' -d 'now'` ))

nohup ${FINK_HOME}/bin/fink start stream2raw \
-s ztf \
-c ${FINK_HOME}/conf/ztf/fink.conf.prod \
-topic "ztf_${NIGHT}.*" \
-night $NIGHT \
-driver-memory 4g -executor-memory 2g \
-spark-cores-max 4 -spark-executor-cores 1 \
-exit_after ${LEASETIME} > ${FINK_HOME}/broker_logs/stream2raw_${NIGHT}.log 2>&1 &
fi

# raw2science
if [[ ! ${POLL_ONLY} ]] && [[ ! ${DISTRIBUTE_ONLY} ]]; then
echo "Launching raw2science"
while : ; do
# check folder exist
$(hdfs dfs -test -d /user/julien.peloton/online/raw/${NIGHT})
if [[ $? == 0 ]]; then
# check folder is not empty
isEmpty=$(hdfs dfs -count /user/julien.peloton/online/raw/${NIGHT} | awk '{print $2}')
if [[ $isEmpty -gt 0 ]]; then
echo "Data detected."
echo "Waiting 60 seconds for one batch to complete before launching..."
sleep 60
echo "Launching service"

# LEASETIME must be computed by taking the
# difference between now and max end (8pm CE(S)T)
LEASETIME=$(( `date +'%s' -d "${STOP_AT}"` - `date +'%s' -d 'now'` ))

nohup ${FINK_HOME}/bin/fink start raw2science \
-s ztf \
-c ${FINK_HOME}/conf/ztf/fink.conf.prod \
-driver-memory 4g -executor-memory 2g \
-spark-cores-max 8 -spark-executor-cores 1 \
-night ${NIGHT} \
-exit_after ${LEASETIME} > ${FINK_HOME}/broker_logs/raw2science_${NIGHT}.log 2>&1 &
break
fi
fi
DDATE=`date`
echo "${DDATE}: no data yet. Sleeping 300 seconds..."
sleep 300
done
fi

# distribution
if [[ ! ${POLL_ONLY} ]] && [[ ! ${ENRICH_ONLY} ]]; then
echo "Launching distribution"
while true; do
# check folder exist
$(hdfs dfs -test -d /user/julien.peloton/online/science/${NIGHT})
if [[ $? == 0 ]]; then
# check folder is not empty
isEmpty=$(hdfs dfs -count /user/julien.peloton/online/science/${NIGHT} | awk '{print $2}')
if [[ $isEmpty -gt 0 ]]; then
echo "Data detected."
echo "Waiting 60 seconds for one batch to complete before launching..."
sleep 60
echo "Launching service..."
# LEASETIME must be computed by taking the
# difference between now and max end (8pm CEST)
LEASETIME=$(( `date +'%s' -d "${STOP_AT}"` - `date +'%s' -d 'now'` ))

nohup ${FINK_HOME}/bin/fink start distribution \
-s ztf \
-c ${FINK_HOME}/conf/ztf/fink.conf.prod \
-conf_distribution ${FINK_HOME}/conf/ztf/fink.conf.distribution_cluster \
-night ${NIGHT} \
-driver-memory 4g -executor-memory 2g \
-spark-cores-max 4 -spark-executor-cores 1 \
-exit_after ${LEASETIME} > ${FINK_HOME}/broker_logs/distribute_${NIGHT}.log 2>&1 &
break
fi
fi
DDATE=`date`
echo "${DDATE}: no data yet. Sleeping 300 seconds..."
sleep 300
done
fi
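
All three services in `launch_stream.sh` derive `-exit_after` from `STOP_AT`, whose default value `'20:00 today'` contains a space. The value therefore has to reach `date -d` as a single quoted argument. A minimal sketch of the computation as a reusable function follows; `lease_seconds` is a hypothetical name, GNU `date` is assumed, and clamping a past stop time to zero is a choice made here, not something the script does.

```bash
#!/bin/bash
# lease_seconds is a hypothetical helper; it assumes GNU date (-d).
# It prints the number of seconds between now and the given stop time,
# or 0 if the stop time is already in the past.
lease_seconds() {
  local stop_at="$1"
  local stop now
  # Quoting keeps '20:00 today' as a single argument to -d.
  stop=$(date +'%s' -d "${stop_at}") || return 1
  now=$(date +'%s')
  if [ "${stop}" -gt "${now}" ]; then
    echo $(( stop - now ))
  else
    echo 0
  fi
}

STOP_AT='20:00 today'
echo "Lease time: $(lease_seconds "${STOP_AT}") seconds"
```

Because `'20:00 today'` is resolved in the machine's local time zone, the result depends on the scheduler host running on Paris time, as the crontab comment in the README indicates.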
