Add to HBase the fields generated by the mag_rate science module #791

Merged: 4 commits, Jan 30, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -65,7 +65,7 @@ jobs:
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
-fink_test -c conf/fink.conf.dev --unit-tests
+fink_test -c conf/fink.conf.dev --unit-tests --db-integration
- name: Run test suites [prod]
if: matrix.container == 'julienpeloton/fink-ci:prod'
run: |
9 changes: 6 additions & 3 deletions conf/fink.conf.dev
@@ -46,11 +46,14 @@ FINK_PACKAGES=\
org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.4.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,\
org.apache.spark:spark-avro_2.12:3.4.1,\
-org.apache.hbase:hbase-shaded-mapreduce:2.4.9
+org.apache.hbase:hbase-shaded-mapreduce:2.2.7,\
+org.slf4j:slf4j-log4j12:1.7.36,\
+org.slf4j:slf4j-simple:1.7.36

# Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
-${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.4.8_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
+FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
+${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar

# Time interval between 2 trigger updates (second)
# i.e. the timing of streaming data processing.
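For context, FINK_PACKAGES lists Maven coordinates that Spark resolves at startup, and FINK_JARS lists local jars shipped to the executors; this change pins hbase-shaded-mapreduce to 2.2.7, adds two slf4j bindings, and points FINK_JARS at the hbase2.3.0 connector builds plus the Fink Scala jar. A minimal sketch of how such values typically reach a Spark session (a standalone illustration with an assumed app name and jar path; Fink itself wires them through spark-submit in its launch scripts):

```python
# Sketch only: Spark resolves Maven coordinates (cf. FINK_PACKAGES) at startup
# and ships local jars (cf. FINK_JARS) to executors. Names are assumptions.
from pyspark.sql import SparkSession

packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
    "org.apache.hbase:hbase-shaded-mapreduce:2.2.7",
    "org.slf4j:slf4j-log4j12:1.7.36",
])

spark = (
    SparkSession.builder
    .appName("fink-broker-example")                    # assumed app name
    .config("spark.jars.packages", packages)           # like FINK_PACKAGES
    .config("spark.jars", "/path/to/fink-broker.jar")  # like FINK_JARS
    .getOrCreate()
)
```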
58 changes: 21 additions & 37 deletions fink_broker/hbaseUtils.py
@@ -44,7 +44,7 @@ def load_fink_cols():
--------
>>> fink_cols, fink_nested_cols = load_fink_cols()
>>> print(len(fink_cols))
-19
+27

>>> print(len(fink_nested_cols))
18
@@ -69,6 +69,14 @@
'x4lac': {'type': 'string', 'default': 'Unknown'},
'lc_features_g': {'type': 'string', 'default': '[]'},
'lc_features_r': {'type': 'string', 'default': '[]'},
+'jd_first_real_det': {'type': 'double', 'default': 0.0},
+'jdstarthist_dt': {'type': 'double', 'default': 0.0},
+'mag_rate': {'type': 'double', 'default': 0.0},
+'sigma_rate': {'type': 'double', 'default': 0.0},
+'lower_rate': {'type': 'double', 'default': 0.0},
+'upper_rate': {'type': 'double', 'default': 0.0},
+'delta_time': {'type': 'double', 'default': 0.0},
+'from_upper': {'type': 'boolean', 'default': False},
}

fink_nested_cols = {}
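The eight new entries carry the outputs of the mag_rate science module into the HBase schema. As a hypothetical sketch (not the actual Fink code path), a {name: {'type', 'default'}} mapping like fink_cols can be used to guarantee that every declared column exists, with its default value and type, before a DataFrame is written out:

```python
# Hypothetical helper, for illustration only: fill in any declared Fink
# column that is missing from a DataFrame, cast to its declared type.
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def ensure_fink_cols(df: DataFrame, fink_cols: dict) -> DataFrame:
    """Add each missing column with its default, e.g. mag_rate -> 0.0."""
    for name, spec in fink_cols.items():
        if name not in df.columns:
            # e.g. F.lit(0.0).cast('double'), F.lit(False).cast('boolean')
            df = df.withColumn(name, F.lit(spec["default"]).cast(spec["type"]))
    return df
```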
@@ -99,7 +107,7 @@ def load_all_cols():
>>> root_level, candidates, images, fink_cols, fink_nested_cols = load_all_cols()
>>> out = {**root_level, **candidates, **images, **fink_cols, **fink_nested_cols}
>>> print(len(out))
-148
+156
"""
fink_cols, fink_nested_cols = load_fink_cols()

@@ -310,52 +318,28 @@ def load_ztf_index_cols():
--------
>>> out = load_ztf_index_cols()
>>> print(len(out))
-72
+81
"""
# From `root` or `candidates.`
common = [
'objectId', 'candid', 'publisher', 'rcid', 'chipsf', 'distnr',
'ra', 'dec', 'jd', 'fid', 'nid', 'field', 'xpos', 'ypos', 'rb',
'ssdistnr', 'ssmagnr', 'ssnamenr', 'jdstarthist', 'jdendhist', 'tooflag',
'sgscore1', 'distpsnr1', 'neargaia', 'maggaia', 'nmtchps', 'diffmaglim',
'magpsf', 'sigmapsf', 'magnr', 'sigmagnr', 'magzpsci', 'isdiffpos',
-'cdsxmatch',
-'roid',
-'mulens',
-'DR3Name',
-'Plx',
-'e_Plx',
-'gcvs',
-'vsx',
-'snn_snia_vs_nonia', 'snn_sn_vs_all', 'rf_snia_vs_nonia',
-'classtar', 'drb', 'ndethist', 'rf_kn_vs_nonkn', 'tracklet',
-'anomaly_score', 'x4lac', 'x3hsp', 'lc_features_g', 'lc_features_r'
+'classtar', 'drb', 'ndethist'
]

-mangrove = [
-'mangrove_2MASS_name',
-'mangrove_HyperLEDA_name',
-'mangrove_ang_dist',
-'mangrove_lum_dist'
-]
+# Add Fink added values
+fink_cols, fink_nested_cols = load_fink_cols()

-t2 = [
-'t2_AGN',
-'t2_EB',
-'t2_KN',
-'t2_M-dwarf',
-'t2_Mira',
-'t2_RRL',
-'t2_SLSN-I',
-'t2_SNII',
-'t2_SNIa',
-'t2_SNIa-91bg',
-'t2_SNIax',
-'t2_SNIbc',
-'t2_TDE',
-'t2_mu-Lens-Single',
-]
+fink_cols_names = list(fink_cols.keys())
+common += fink_cols_names
+
+fink_nested_cols_names = [i.replace('.', '_') for i in fink_nested_cols.keys()]
+common += fink_nested_cols_names

-return common + mangrove + t2
+return common

def load_ztf_crossmatch_cols():
""" Load columns used for the crossmatch table (casted).
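The updated doctest values are mutually consistent: load_fink_cols() grows from 19 to 27 entries (the 8 mag_rate fields above), load_all_cols() correspondingly from 148 to 156, and load_ztf_index_cols() from 72 to 81, since 36 hard-coded names are dropped (18 Fink science columns from `common`, 4 mangrove, 14 t2) while the 27 + 18 = 45 Fink column names are now derived from load_fink_cols() (72 - 36 + 45 = 81). A quick check, assuming a local fink_broker checkout on the PYTHONPATH:

```python
# Consistency check for the doctest counts in this diff (assumes fink_broker
# is importable, e.g. from a local checkout of the repository).
from fink_broker.hbaseUtils import load_fink_cols, load_ztf_index_cols

fink_cols, fink_nested_cols = load_fink_cols()
assert len(fink_cols) == 27              # 19 previous + 8 mag_rate fields
assert len(fink_nested_cols) == 18

assert len(load_ztf_index_cols()) == 36 + 27 + 18  # = 81
```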