From 5392714c775fae661dbe4f6beba7c154902ad284 Mon Sep 17 00:00:00 2001 From: Julien Date: Tue, 30 Jan 2024 13:11:12 +0100 Subject: [PATCH] Add more columns for uppervalid alerts (#785) * Update dependencies * Update HBase deps in the config file * Keep updating the conf file * Limit concurrency * Fix typo * Add distnr for uppervalid * Fix typo --- .github/workflows/test.yml | 6 +++ bin/index_archival.py | 80 ++++++++++++++++++++++++++++---------- deps/requirements.txt | 6 +-- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e75d3a68..2861c1bf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,6 +9,10 @@ on: - master pull_request: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: test-suite: runs-on: ubuntu-latest @@ -62,6 +66,8 @@ jobs: - name: Run test suites [dev] if: matrix.container == 'julienpeloton/fink-ci:dev' run: | + # TODO: remove me + pip install fink-utils==0.13.11 cd $USRLIBS source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION} cd $FINK_HOME diff --git a/bin/index_archival.py b/bin/index_archival.py index 5063c6b5..5919a855 100644 --- a/bin/index_archival.py +++ b/bin/index_archival.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2020-2023 AstroLab Software +# Copyright 2020-2024 AstroLab Software # Author: Julien Peloton # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,6 +42,7 @@ from fink_broker.loggingUtils import get_fink_logger, inspect_application from fink_filters.classification import extract_fink_classification +from fink_utils.spark.utils import check_status_last_prv_candidates from fink_tns.utils import download_catalog @@ -88,17 +89,9 @@ def main(): # This is independent from the final structure cf = assign_column_family_names(df_flat, cols_i, cols_d, cols_b) - # Restrict the input DataFrame to the subset of wanted columns. - if 'upper' in args.index_table: - df = data.select( - F.col('objectId').cast('string').alias('objectId'), - F.col('prv_candidates.jd').cast('array').alias('jd'), - F.col('prv_candidates.fid').cast('array').alias('fid'), - F.col('prv_candidates.magpsf').cast('array').alias('magpsf'), - F.col('prv_candidates.sigmapsf').cast('array').alias('sigmapsf'), - F.col('prv_candidates.diffmaglim').cast('array').alias('diffmaglim') - ) - else: + # Restrict the input DataFrame to the subset of wanted columns, + # except for tables containing uppervalid & upper limit data + if 'upper' not in args.index_table: df = df_flat # Load common cols (casted) @@ -212,9 +205,21 @@ def main(): row_key_name=index_row_key_name ) elif columns[0] == 'upper': - # This case is the same as the main table - # but we keep only upper limit measurements. + # select only data for which there are new recent upper limits + data = check_status_last_prv_candidates(data, status='upper') + data = data.filter('upper') + + df = data.select( + F.col('objectId').cast('string').alias('objectId'), + F.col('prv_candidates.jd').cast('array').alias('jd'), + F.col('prv_candidates.fid').cast('array').alias('fid'), + F.col('prv_candidates.magpsf').cast('array').alias('magpsf'), + F.col('prv_candidates.sigmapsf').cast('array').alias('sigmapsf'), + F.col('prv_candidates.diffmaglim').cast('array').alias('diffmaglim') + ) + index_row_key_name = 'objectId_jd' + # explode df_ex = df.withColumn( "tmp", @@ -229,18 +234,40 @@ def main(): F.col("tmp.diffmaglim") ) - # take only upper limits + # take only upper limits within remaining historical data df_index = df_ex.filter(~df_ex['magpsf'].isNotNull()) # drop NaN columns df_index = df_index.drop(*['magpsf', 'sigmapsf']) elif columns[0] == 'uppervalid': - # This case is the same as the main table - # but we keep only upper limit measurements. + # select only data for which there are new recent low quality data + data = check_status_last_prv_candidates(data, status='uppervalid') + data = data.filter('uppervalid') + + df = data.select( + F.col('objectId').cast('string').alias('objectId'), + F.col('prv_candidates.jd').cast('array').alias('jd'), + F.col('prv_candidates.fid').cast('array').alias('fid'), + F.col('prv_candidates.magpsf').cast('array').alias('magpsf'), + F.col('prv_candidates.sigmapsf').cast('array').alias('sigmapsf'), + F.col('prv_candidates.diffmaglim').cast('array').alias('diffmaglim'), + F.col('prv_candidates.magnr').cast('array').alias('magnr'), + F.col('prv_candidates.sigmagnr').cast('array').alias('sigmagnr'), + F.col('prv_candidates.isdiffpos').cast('array').alias('isdiffpos'), + F.col('prv_candidates.distnr').cast('array').alias('distnr'), + F.col('prv_candidates.rb').cast('array').alias('rb'), + F.col('prv_candidates.nbad').cast('array').alias('nbad') + ) + index_row_key_name = 'objectId_jd' + # explode df_ex = df.withColumn( "tmp", - F.arrays_zip("magpsf", "sigmapsf", "diffmaglim", "jd", "fid") + F.arrays_zip( + "magpsf", "sigmapsf", "diffmaglim", "jd", "fid", + "magnr", "sigmagnr", "isdiffpos", "distnr", + "rb", "nbad" + ) ).withColumn("tmp", F.explode("tmp")).select( F.concat_ws('_', 'objectId', 'tmp.jd').alias(index_row_key_name), "objectId", @@ -248,11 +275,22 @@ def main(): F.col("tmp.fid"), F.col("tmp.magpsf"), F.col("tmp.sigmapsf"), - F.col("tmp.diffmaglim") + F.col("tmp.diffmaglim"), + F.col("tmp.magnr"), + F.col("tmp.sigmagnr"), + F.col("tmp.isdiffpos"), + F.col("tmp.distnr"), + F.col("tmp.rb"), + F.col("tmp.nbad") ) - # take only valid measurements from the history - df_index = df_ex.filter(df_ex['magpsf'].isNotNull()) + # take only noisy measurements from the history + f1 = (df_ex["rb"] >= 0.55) & (df_ex["nbad"] == 0) + cond = ~f1 & df_ex['magpsf'].isNotNull() + df_index = df_ex.filter(cond) + + # Remove unused columns + df_index = df_index.drop(*["rb", "nbad"]) elif columns[0] == 'tns': with open('{}/tns_marker.txt'.format(args.tns_folder)) as f: tns_marker = f.read().replace('\n', '') diff --git a/deps/requirements.txt b/deps/requirements.txt index 7867ad43..6449663c 100644 --- a/deps/requirements.txt +++ b/deps/requirements.txt @@ -19,9 +19,9 @@ fastavro==1.6.0 # Fink core fink_filters>=3.24 -git+https://github.com/astrolabsoftware/fink-science@5.6.1 -fink-utils>=0.13.8 -fink-spins>=0.3.5 +git+https://github.com/astrolabsoftware/fink-science@5.6.2 +fink-utils>=0.13.11 +fink-spins>=0.3.7 fink-tns>=0.9 # Misc