SSO index table #774

Merged · 5 commits · Dec 21, 2023
187 changes: 187 additions & 0 deletions bin/index_sso_resolver.py
@@ -0,0 +1,187 @@
#!/usr/bin/env python
# Copyright 2023 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Construct SSO index table, and push to HBase
"""
import os
import argparse
import datetime

import pandas as pd
import numpy as np

from fink_broker.sparkUtils import init_sparksession
from fink_broker.hbaseUtils import push_to_hbase
from fink_broker.loggingUtils import get_fink_logger, inspect_application

from fink_spins.ssoft import rockify


def mark_as_duplicate(series, count=0):
""" Iteratively tag duplicates

Parameters
----------
series: pd.Series
Pandas series containing strings
count: int, optional
Counter, starting at 0.

Returns
----------
series: pd.Series
Input series with duplicate markers of
the form _{level of duplicate}

Examples
----------
>>> pdf = pd.DataFrame({'col': [['a', 'b', 'c'], ['a', 'b'], ['b', 'c']]}).explode('col')
>>> mark_as_duplicate(pdf['col'].copy())
0 a_0
0 b_0
0 c_0
1 a_1
1 b_1
2 b_2
2 c_1
Name: col, dtype: object
"""
if count == 0:
# initialise markers
series = series.apply(lambda x: x + "_{}".format(count))

mask = series.duplicated(keep='first')
if np.sum(mask) > 0:
# duplicates exists
series[mask] = series[mask].apply(lambda x: x.replace('_{}'.format(count), '_{}'.format(count + 1)))
return mark_as_duplicate(series.copy(), count + 1)
else:
return series


def main():
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument(
        '-version', type=str, default=None,
        help="""
        Version to use in the format YYYY.MM
        Default is None, meaning current Year.Month
        """
    )
    parser.add_argument(
        '-science_db_name', type=str, default='ztf',
        help="""
        The prefix of the HBase table
        """
    )
    args = parser.parse_args(None)

    if args.version is None:
        now = datetime.datetime.now()
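        # e.g. 2023.12 for a run in December 2023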
        version = '{}.{:02d}'.format(now.year, now.month)
    else:
        version = args.version

    # Initialise Spark session
    spark = init_sparksession(
        name="index_sso_resolver_{}".format(version),
        shuffle_partitions=2
    )

    logger = get_fink_logger(spark.sparkContext.appName, "INFO")

    # debug statements
    inspect_application(logger)

    # This is generated by generate_ssoft.py
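    # (path resolves to e.g. 'sso_aggregated_2023.12' when version is 2023.12)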
    df = spark.read.format('parquet').load('sso_aggregated_{}'.format(version))

    pdf = df.select('ssnamenr').toPandas()

    sso_name, sso_number = rockify(pdf.ssnamenr.copy())

    # fill None with values from original ssnamenr
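    # (rockify returns None where quaero could not identify the object;
    # those entries are counted in the statistics below)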
    mask1 = np.equal(sso_name, None)
    sso_name[mask1] = pdf.ssnamenr[mask1]

    # Keep only valid numbers
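    # (NaN != NaN, so self-comparison is False exactly where the number is missing)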
    mask2 = np.equal(sso_number, sso_number)
    sso_number_valid = sso_number[mask2]

    # Vector contains (MPC_names, MPC_numbers, ZTF_ssnamenr)
    index_ssodnet = np.concatenate((sso_name, sso_number_valid, pdf.ssnamenr.values))

    # Source identifier
    index_source = np.concatenate(
        (
            ['name'] * len(sso_name),
            ['number'] * len(sso_number_valid),
            ['ssnamenr'] * len(pdf)
        )
    )

    # create index vector for Fink
    index_fink = np.concatenate((pdf.ssnamenr.values, pdf.ssnamenr[mask2].values, pdf.ssnamenr.values))
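    # (blocks align one-to-one with index_ssodnet; the 'number' block keeps
    # only the rows passing mask2, matching sso_number_valid above)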

msg = """
Number of (unique) SSO objects in Fink: {:,}

STATISTIC FROM QUAERO
-------------------------------------
Number of numbered objects: {:,}
--> Number of objects with no number: {:,}
--> Number of objects with only prov. designation: {:,}
--> Number of objects unindentified by quaero: {:,}
""".format(
len(pdf),
len(sso_number_valid),
len(sso_number) - len(sso_number_valid),
np.sum(~mask1 * ~mask2),
np.sum(mask1)
)

logger.info(msg)

    pdf_index = pd.DataFrame(
        {
            'ssodnet': index_ssodnet,
            'ssnamenr': index_fink,
            'source': index_source
        },
        dtype=str
    )

    # HBase row keys must be unique: tag repeated resolver names with _N
    # suffixes so later rows do not silently overwrite earlier ones
    index_col = mark_as_duplicate(pdf_index['ssodnet'].copy())
    pdf_index['ssodnet'] = index_col

    df_index = spark.createDataFrame(pdf_index)

    # assign all columns to the same HBase column family 'i'
    cf = {i: 'i' for i in df_index.columns}

    index_row_key_name = 'ssodnet'
    index_name = '.sso_resolver'
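    # final table name is e.g. 'ztf.sso_resolver' with the default prefix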

    push_to_hbase(
        df=df_index,
        table_name=args.science_db_name + index_name,
        rowkeyname=index_row_key_name,
        cf=cf,
        catfolder=os.environ['FINK_HOME']
    )


if __name__ == "__main__":
    main()