Skip to content

Commit

Permalink
Elasticc round two! (#705)
Browse files Browse the repository at this point in the history
* Install fink-science from URL

* Update deps [skip-ci]

* Update args for active learning in SNN

* Remove T2 for elasticc

* Remove T2 from the elasticc distribution

* Test fink science v4 installation

* Update CATS syntax

* Add SLSN

* Add new args when extracting features (#704)

* Update taxonomy

* Well string literals can be duplicated in my ideal world

* Well string literals can be duplicated in my ideal world

* Remove hardcoded installation of fink-science

* Fake t2 for test purposes (t2 is not compatible with tensorflow 2.9)

* Add key in the payload when writing elasticc data

* Replace key by topic

* PEP8

* Bump to 3.0

* Update requirements for tensorflow (fink-science 4.2+)
  • Loading branch information
JulienPeloton authored Jun 19, 2023
1 parent 7ae58e5 commit 01e6d90
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 48 deletions.
1 change: 1 addition & 0 deletions .github/workflows/sonarqube.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
-Dsonar.organization=astrolabsoftware
-Dsonar.projectKey=finkbroker
-Dsonar.sources=fink_broker/,bin/
-Dsonar.issue.ignore.multicriteria.j1.ruleKey=python:S107,python:S1192
-Dsonar.test.exclusions=fink_broker/htmlcov,fink_broker/slackUtils.py
-Dsonar.verbose=true
-Dsonar.coverage.exclusions=**/**
Expand Down
8 changes: 0 additions & 8 deletions bin/distribute_elasticc.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def format_df_to_elasticc(df):
df['cats_fine_max_prob'].astype('float'),
df['rf_snia_vs_nonia'].astype('float'),
1.0 - df['rf_snia_vs_nonia'].astype('float'),
df['t2_broad_max_prob'].astype('float'),
)
).withColumn(
'classes',
Expand All @@ -96,7 +95,6 @@ def format_df_to_elasticc(df):
df['cats_fine_class'].astype('int'),
F.lit(111), # EarlySN
F.lit(0), # EarlySN Others
df['t2_broad_class'].astype('int')
)
).withColumn(
'classifications',
Expand Down Expand Up @@ -149,12 +147,6 @@ def format_df_to_elasticc(df):
F.col("classes").getItem(7),
F.col("scores").getItem(7)
),
F.struct(
F.lit('T2 classifier'),
F.lit('version 1.0'),
F.col("classes").getItem(8),
F.col("scores").getItem(8)
),
).cast(classifications_schema)
).drop("scores").drop("classes")

Expand Down
3 changes: 2 additions & 1 deletion bin/stream2raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def main():
alert_schema_json = fastavro.schema.to_parsing_canonical_form(schema)
df_decoded = df.select(
[
from_avro(df["value"], alert_schema_json).alias("decoded")
from_avro(df["value"], alert_schema_json).alias("decoded"),
df["topic"]
]
)
elif args.producer == 'ztf':
Expand Down
2 changes: 1 addition & 1 deletion deps/requirements-science-no-deps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ torch==1.12.0+cpu
george
imbalanced-learn==0.7.0
optuna==2.3.0
tensorflow==2.8.0
tensorflow==2.9.2
2 changes: 1 addition & 1 deletion deps/requirements-science.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ torch==1.12.0+cpu
george
imbalanced-learn==0.7.0
optuna==2.3.0
tensorflow==2.8.0
tensorflow==2.9.2
2 changes: 1 addition & 1 deletion fink_broker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "2.9"
__version__ = "3.0"
86 changes: 51 additions & 35 deletions fink_broker/science.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType, LongType
from pyspark.sql.types import StringType, LongType, MapType, FloatType

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -44,7 +44,8 @@
from fink_science.snn.processor import snn_ia_elasticc, snn_broad_elasticc
from fink_science.cats.processor import predict_nn
from fink_science.agn.processor import agn_elasticc
from fink_science.t2.processor import t2
from fink_science.slsn.processor import slsn_elasticc
# from fink_science.t2.processor import t2

from fink_broker.tester import spark_unit_tests

Expand Down Expand Up @@ -144,6 +145,22 @@ def ang2pix_array(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series:

return pd.Series(to_return)

@pandas_udf(MapType(StringType(), FloatType()), PandasUDFType.SCALAR)
def fake_t2(incol):
    """Return a placeholder T2 output where every class probability is zero

    Stand-in for the T2 processor, for test purposes only.

    Parameters
    ----------
    incol: pd.Series
        Any column (presumably a pandas Series, as handed over by Spark
        to a scalar pandas UDF). Only its length is used.

    Returns
    -------
    pd.Series
        One entry per element of `incol`; each entry maps every class
        name below to 0.0.
    """
    class_names = (
        'M-dwarf', 'KN', 'AGN', 'SLSN-I',
        'RRL', 'Mira', 'SNIax', 'TDE',
        'SNIa', 'SNIbc', 'SNIa-91bg',
        'mu-Lens-Single', 'EB', 'SNII',
    )

    # Same key order as the class list, all probabilities set to zero
    zero_probabilities = dict.fromkeys(class_names, 0.0)

    # The very same mapping is repeated for each input row
    n_rows = len(incol)
    return pd.Series([zero_probabilities for _ in range(n_rows)])

def apply_science_modules(df: DataFrame, logger: Logger) -> DataFrame:
"""Load and apply Fink science modules to enrich alert content
Expand Down Expand Up @@ -317,9 +334,10 @@ def apply_science_modules(df: DataFrame, logger: Logger) -> DataFrame:
df = df.withColumn('rf_kn_vs_nonkn', knscore(*knscore_args))

logger.info("New processor: T2")
t2_args = ['candid', 'cjd', 'cfid', 'cmagpsf', 'csigmapsf']
t2_args += [F.col('roid'), F.col('cdsxmatch'), F.col('candidate.jdstarthist')]
df = df.withColumn('t2', t2(*t2_args))
# t2_args = ['candid', 'cjd', 'cfid', 'cmagpsf', 'csigmapsf']
# t2_args += [F.col('roid'), F.col('cdsxmatch'), F.col('candidate.jdstarthist')]
# df = df.withColumn('t2', t2(*t2_args))
df = df.withColumn('t2', fake_t2('objectId'))

# Apply level one processor: snad (light curve features)
logger.info("New processor: ad_features")
Expand Down Expand Up @@ -406,9 +424,18 @@ def apply_science_modules_elasticc(df: DataFrame, logger: Logger) -> DataFrame:
df = df.withColumn('redshift_err', F.col('diaObject.z_final_err'))

logger.info("New processor: EarlySN")

args = ['cmidPointTai', 'cfilterName', 'cpsFlux', 'cpsFluxErr']
# fake args
args += [F.col('cdsxmatch'), F.lit(20), F.lit(40)]

# fake cdsxmatch and nobs
args += [F.col('cdsxmatch'), F.lit(20)]
args += [F.col('diaObject.ra'), F.col('diaObject.decl')]
args += [F.col('diaObject.hostgal_ra'), F.col('diaObject.hostgal_dec')]
args += [F.col('diaObject.hostgal_zphot')]
args += [F.col('diaObject.hostgal_zphot_err'), F.col('diaObject.mwebv')]

# maxduration
args += [F.lit(40)]
df = df.withColumn('rf_snia_vs_nonia', rfscore_sigmoid_elasticc(*args))

# Apply level one processor: superNNova
Expand All @@ -429,7 +456,6 @@ def apply_science_modules_elasticc(df: DataFrame, logger: Logger) -> DataFrame:
df = df.withColumn('preds_snn', snn_broad_elasticc(*args))

mapping_snn = {
-1: 0,
0: 11,
1: 13,
2: 12,
Expand All @@ -449,32 +475,17 @@ def apply_science_modules_elasticc(df: DataFrame, logger: Logger) -> DataFrame:
df = df.withColumn('cbpf_preds', predict_nn(*args))

mapping_cats_general = {
-1: 0,
0: 111,
1: 112,
2: 113,
3: 114,
4: 115,
5: 121,
6: 122,
7: 123,
8: 124,
9: 131,
10: 132,
11: 133,
12: 134,
13: 135,
14: 211,
15: 212,
16: 213,
17: 214,
18: 221
0: 11,
1: 12,
2: 13,
3: 21,
4: 22,
}
mapping_cats_general_expr = F.create_map([F.lit(x) for x in chain(*mapping_cats_general.items())])

col_fine_class = F.col('cbpf_preds').getItem(0).astype('int')
df = df.withColumn('cats_fine_class', mapping_cats_general_expr[col_fine_class])
df = df.withColumn('cats_fine_max_prob', F.col('cbpf_preds').getItem(1))
df = df.withColumn('argmax', F.expr('array_position(cbpf_preds, array_max(cbpf_preds)) - 1'))
df = df.withColumn('cats_broad_class', mapping_cats_general_expr[df['argmax']])
df = df.withColumn('cats_broad_max_prob', F.array_max(df['cbpf_preds']))

# AGN
args_forced = [
Expand All @@ -485,13 +496,18 @@ def apply_science_modules_elasticc(df: DataFrame, logger: Logger) -> DataFrame:
]
df = df.withColumn('rf_agn_vs_nonagn', agn_elasticc(*args_forced))

# T2
df = df.withColumn('t2_broad_class', F.lit(0))
df = df.withColumn('t2_broad_max_prob', F.lit(0.0))
# SLSN
args_forced = [
'diaObject.diaObjectId', 'cmidPointTai', 'cpsFlux', 'cpsFluxErr', 'cfilterName',
'diaSource.ra', 'diaSource.decl',
'diaObject.hostgal_zphot', 'diaObject.hostgal_zphot_err',
'diaObject.hostgal_ra', 'diaObject.hostgal_dec'
]
df = df.withColumn('rf_slsn_vs_nonslsn', slsn_elasticc(*args_forced))

# Drop temp columns
df = df.drop(*expanded)
df = df.drop(*['preds_snn', 'cbpf_preds', 'redshift', 'redshift_err', 'cdsxmatch', 'roid'])
df = df.drop(*['preds_snn', 'cbpf_preds', 'redshift', 'redshift_err', 'cdsxmatch', 'roid', 'argmax'])

return df

Expand Down
2 changes: 1 addition & 1 deletion sonar-project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ sonar.exclusions=fink_broker/htmlcov,fink_broker/slackUtils.py

# Functions, methods and lambdas should not have too many parameters
# Well, they shouldn't.
sonar.issue.ignore.multicriteria.j1.ruleKey=python:S107
sonar.issue.ignore.multicriteria.j1.ruleKey=python:S107,python:S1192
sonar.issue.ignore.multicriteria.j1.resourceKey=**/*.py

# Path to coverage file (need xml)
Expand Down

0 comments on commit 01e6d90

Please sign in to comment.