Cache only required fields when computing statistics (#792)
* Cache only required fields when computing statistics

* Update conf file with dependencies

* Use filter for SIMBAD statistics (#797)

* Use filter for SIMBAD statistics

* Use fink-utils definition of list of extragalactic sources. Closes #630

* Fix missing import

* Cast numpy array into list for Spark operation
JulienPeloton authored Jan 30, 2024
1 parent 5990e34 commit edffb06
Showing 3 changed files with 50 additions and 54 deletions.
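
The headline change, "Cache only required fields when computing statistics", swaps a cache of the full science DataFrame in bin/daily_stats.py for a cache of only the handful of columns the statistics actually read. Below is a minimal, self-contained PySpark sketch of that pattern, not the broker code itself: the local SparkSession, the toy rows, and the column names are illustrative assumptions.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy stand-in for the science DataFrame (hypothetical schema and values).
df_sci = spark.createDataFrame(
    [("Seyfert_1", 1, 2459000.5), ("Unknown", 2, 2459000.5)],
    ["cdsxmatch", "fid", "jd"],
)

# Instead of caching the full alert schema (df_sci.cache()), project the
# few columns the statistics need, then cache that small DataFrame.
cols = ["cdsxmatch", "fid", "jd"]
df_stats = df_sci.select(cols).cache()

# Every subsequent aggregation reuses the same cached projection.
n_alerts = df_stats.count()
n_g = df_stats.filter(F.col("fid") == 1).count()
n_exposures = df_stats.select("jd").distinct().count()
print(n_alerts, n_g, n_exposures)

Projecting before cache() keeps the cached data small, while the repeated counts still hit the cache rather than re-reading the parquet input.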
.github/workflows/test.yml: 2 changes (1 addition & 1 deletion)
@@ -65,7 +65,7 @@ jobs:
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
-fink_test -c conf/fink.conf.dev --unit-tests
+fink_test -c conf/fink.conf.dev --unit-tests --db-integration
- name: Run test suites [prod]
if: matrix.container == 'julienpeloton/fink-ci:prod'
run: |
bin/daily_stats.py: 91 changes (42 additions & 49 deletions)
@@ -20,12 +20,17 @@
import numpy as np
import pandas as pd

+import pyspark.sql.functions as F

from fink_broker.sparkUtils import init_sparksession
from fink_broker.hbaseUtils import push_to_hbase
from fink_broker.parser import getargs
from fink_broker.loggingUtils import get_fink_logger, inspect_application

from fink_filters.classification import extract_fink_classification
+from fink_filters.filter_simbad_candidates.filter import simbad_candidates

+from fink_utils.xmatch.simbad import return_list_of_eg_host


def main():
@@ -58,50 +63,7 @@ def main():
    df_raw = spark.read.format('parquet').load(input_raw)
    df_sci = spark.read.format('parquet').load(input_science)

-    df_sci = df_sci.cache()
-
-    # Number of alerts
-    n_raw_alert = df_raw.count()
-    n_sci_alert = df_sci.count()
-
-    out_dic = {}
-    out_dic['raw'] = n_raw_alert
-    out_dic['sci'] = n_sci_alert
-
-    # matches with SIMBAD
-    n_simbad = df_sci.select('cdsxmatch')\
-        .filter(df_sci['cdsxmatch'] != 'Unknown')\
-        .count()
-
-    out_dic['simbad_tot'] = n_simbad
-
-    # Alerts with a close-by candidate host-galaxy
-    list_simbad_galaxies = [
-        "galaxy",
-        "Galaxy",
-        "EmG",
-        "Seyfert",
-        "Seyfert_1",
-        "Seyfert_2",
-        "BlueCompG",
-        "StarburstG",
-        "LSB_G",
-        "HII_G",
-        "High_z_G",
-        "GinPair",
-        "GinGroup",
-        "BClG",
-        "GinCl",
-        "PartofG",
-    ]
-
-    n_simbad_gal = df_sci.select('cdsxmatch')\
-        .filter(df_sci['cdsxmatch'].isin(list_simbad_galaxies))\
-        .count()
-
-    out_dic['simbad_gal'] = n_simbad_gal
-
-    df_class = df_sci.withColumn(
+    df_sci = df_sci.withColumn(
        'class',
        extract_fink_classification(
            df_sci['cdsxmatch'],
@@ -120,26 +82,57 @@ def main():
        )
    )

-    out_class = df_class.groupBy('class').count().collect()
+    cols = [
+        'cdsxmatch',
+        'candidate.field',
+        'candidate.fid',
+        'candidate.jd',
+        'class'
+    ]
+    df_sci = df_sci.select(cols).cache()
+
+    # Number of alerts
+    n_raw_alert = df_raw.count()
+    n_sci_alert = df_sci.count()
+
+    out_dic = {}
+    out_dic['raw'] = n_raw_alert
+    out_dic['sci'] = n_sci_alert
+
+    # matches with SIMBAD
+    n_simbad = df_sci.withColumn(
+        "is_simbad",
+        simbad_candidates("cdsxmatch")
+    ).filter(F.col("is_simbad")).count()
+
+    out_dic['simbad_tot'] = n_simbad
+
+    n_simbad_gal = df_sci.select('cdsxmatch')\
+        .filter(df_sci['cdsxmatch'].isin(list(return_list_of_eg_host())))\
+        .count()
+
+    out_dic['simbad_gal'] = n_simbad_gal
+
+    out_class = df_sci.groupBy('class').count().collect()
    out_class_ = [o.asDict() for o in out_class]
    out_class_ = [list(o.values()) for o in out_class_]
    for kv in out_class_:
        out_dic[kv[0]] = kv[1]

    # Number of fields
-    n_field = df_sci.select('candidate.field').distinct().count()
+    n_field = df_sci.select('field').distinct().count()

    out_dic['fields'] = n_field

    # number of measurements per band
-    n_g = df_sci.select('candidate.fid').filter('fid == 1').count()
-    n_r = df_sci.select('candidate.fid').filter('fid == 2').count()
+    n_g = df_sci.select('fid').filter('fid == 1').count()
+    n_r = df_sci.select('fid').filter('fid == 2').count()

    out_dic['n_g'] = n_g
    out_dic['n_r'] = n_r

    # Number of exposures
-    n_exp = df_sci.select('candidate.jd').distinct().count()
+    n_exp = df_sci.select('jd').distinct().count()

    out_dic['exposures'] = n_exp

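One detail in the diff above matches the last commit message item, "Cast numpy array into list for Spark operation": return_list_of_eg_host() is wrapped in list() before being passed to Column.isin. A rough sketch of why such a cast can matter is below; the array contents stand in for the fink-utils list and are illustrative only.

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("Seyfert_1",), ("Unknown",)], ["cdsxmatch"])

# Hypothetical stand-in for return_list_of_eg_host(), which the commit
# message suggests returns a numpy array rather than a plain list.
eg_hosts = np.array(["Seyfert_1", "Galaxy", "AGN"])

# Column.isin only unpacks plain Python lists or sets; a numpy array passed
# directly may be treated as a single literal (or rejected), so cast first.
n_gal = df.filter(F.col("cdsxmatch").isin(list(eg_hosts))).count()
print(n_gal)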
conf/fink.conf.dev: 11 changes (7 additions & 4 deletions)
@@ -1,4 +1,4 @@
-# Copyright 2018-2022 AstroLab Software
+# Copyright 2018-2024 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -46,11 +46,14 @@ FINK_PACKAGES=\
org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
org.apache.spark:spark-avro_2.12:3.4.1,\
-org.apache.hbase:hbase-shaded-mapreduce:2.4.9
+org.apache.hbase:hbase-shaded-mapreduce:2.2.7,\
+org.slf4j:slf4j-log4j12:1.7.36,\
+org.slf4j:slf4j-simple:1.7.36

# Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
+FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
+${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

# Time interval between 2 trigger updates (second)
# i.e. the timing of streaming data processing.