Change partitioning strategy for online processing #793

Merged: 36 commits merged on Aug 26, 2024

Commits (36)
9fcd4bf
Remove dropduplicates which is no longer needed
JulienPeloton Jan 18, 2024
1bf32a0
Remove partitioning for online data
JulienPeloton Jan 18, 2024
ee5ba52
Add timings in the alert packets
JulienPeloton Jan 18, 2024
a5952d4
Fix path for distribution
JulienPeloton Jan 18, 2024
566b757
Modify database script with new paths
JulienPeloton Jan 18, 2024
6c79f12
Enable DB integration tests
JulienPeloton Jan 18, 2024
278db90
Rename paths for test data
JulienPeloton Jan 18, 2024
a8ec0a6
Fix missing import
JulienPeloton Jan 18, 2024
82da46b
Update configuration file with dependencies
JulienPeloton Jan 18, 2024
5cedfac
Update paths
JulienPeloton Jan 18, 2024
7bb3592
Merge remote-tracking branch 'origin' into issue/770/duplicates
JulienPeloton Feb 12, 2024
1202d0f
Merge remote-tracking branch 'origin' into issue/770/duplicates
JulienPeloton Feb 20, 2024
d77d4f8
Fix conflicts
JulienPeloton May 22, 2024
029a0c4
Check and lint
JulienPeloton May 22, 2024
58b0bfc
Fix typo when importing module
JulienPeloton May 22, 2024
1468c2e
Remove unused code
JulienPeloton May 22, 2024
34bb6ac
[Fink-MM x Fink-Broker] Add the fink_mm pipeline into raw2science (#811)
FusRoman Jun 11, 2024
be7430a
Fix conflict with origin
JulienPeloton Jun 11, 2024
41ef904
Do not filter alerts containing i-band only in the history (#855)
JulienPeloton Jun 12, 2024
410c106
Update ZTF schedule (#857)
JulienPeloton Jun 13, 2024
a180a1c
Update the rowkey construction (#859)
JulienPeloton Jul 26, 2024
9710782
Merge branch 'master' into issue/770/duplicates
JulienPeloton Jul 29, 2024
8b699c3
Update the topic value for helm
JulienPeloton Jul 29, 2024
50a1d4a
Add night as argument for stream2raw in the CI
JulienPeloton Jul 29, 2024
7a78ef4
Update conf
JulienPeloton Jul 29, 2024
bd21c56
Increase the default Kafka delivery.timeout.ms
JulienPeloton Jul 29, 2024
227eed1
Change topic name for Sentinel
JulienPeloton Jul 30, 2024
24ad76a
Use the night argument to make the partitioning instead of the alert …
JulienPeloton Jul 30, 2024
1790b18
Relaunch CI
JulienPeloton Jul 31, 2024
ff9e880
Add a new argument --noscience to bin/fink
JulienPeloton Aug 1, 2024
7d8e934
Cast only if fields exist
JulienPeloton Aug 1, 2024
26f3d47
Update distribute with old Kafka setup
JulienPeloton Aug 1, 2024
594b227
Put on hold e2e tests on VD until a solution is found. Only relying o…
JulienPeloton Aug 1, 2024
f7ef099
Check the output topic in Kafka for Sentinel
JulienPeloton Aug 1, 2024
990fbf1
Add script to detect hostless candidates (#870)
JulienPeloton Aug 26, 2024
6d45905
Ignore unnecessary rule
JulienPeloton Aug 26, 2024
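
The commits above converge on a single structural change: intra-night ("online") data is no longer written with Hive-style year/month/day partitioning, but into one directory per observing night keyed by the --night argument. The sketch below contrasts the two layouts; it is an illustration only, with `online_data_prefix` and `night` (YYYYMMDD) borrowed from bin/distribute.py further down.

```python
# Minimal sketch of the path change, not the broker code itself.
# `online_data_prefix` and `night` ("YYYYMMDD") follow the names
# used in bin/distribute.py in the diff below.

def old_science_path(online_data_prefix: str, night: str) -> str:
    """Previous layout: Hive-style partitioning by year/month/day."""
    return "{}/science/year={}/month={}/day={}".format(
        online_data_prefix, night[0:4], night[4:6], night[6:8]
    )

def new_science_path(online_data_prefix: str, night: str) -> str:
    """New layout: one folder per observing night."""
    return "{}/science/{}".format(online_data_prefix, night)

if __name__ == "__main__":
    prefix = "online"  # hypothetical prefix, for illustration only
    print(old_science_path(prefix, "20240826"))  # online/science/year=2024/month=08/day=26
    print(new_science_path(prefix, "20240826"))  # online/science/20240826
```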
26 changes: 8 additions & 18 deletions .github/workflows/test.yml
@@ -19,7 +19,7 @@ jobs:

strategy:
matrix:
container: ["julienpeloton/fink-ci:dev"]
container: ["julienpeloton/fink-ci:latest"]

container:
image: ${{ matrix.container }}
@@ -35,11 +35,6 @@ jobs:
echo "FINK_HOME=$GITHUB_WORKSPACE" >> $GITHUB_ENV
echo "JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))" >> $GITHUB_ENV
echo "${BINPATH}" >> $GITHUB_PATH
#- name: Download test data
# run: |
# cd $FINK_HOME/datasim/basic_alerts
# source $FINK_HOME/datasim/basic_alerts/download_ztf_alert_data.sh
# cd $FINK_HOME
- name: Download simulator
run: |
git clone https://github.com/astrolabsoftware/fink-alert-simulator.git
@@ -50,33 +45,28 @@
echo "FINK_SCHEMA=${FINK_HOME}/fink-alert-schemas" >> $GITHUB_ENV
- name: Set up env [2/2]
run: |
echo "PYTHONPATH=${FINK_HOME}:${FINK_ALERT_SIMULATOR}:${PYTHONPATH}" >> $GITHUB_ENV
echo "PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:${FINK_HOME}:${FINK_ALERT_SIMULATOR}:${PYTHONPATH}" >> $GITHUB_ENV
echo "${FINK_HOME}/bin:${FINK_ALERT_SIMULATOR}/bin" >> $GITHUB_PATH
echo "PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" >> $GITHUB_ENV
- name: Check env
run: |
echo "PATH: $GITHUB_ENV"
echo "FINK_HOME: $FINK_HOME"
echo "SPARK_HOME: $SPARK_HOME"
echo "SPARKLIB: $SPARKLIB"
echo "FINK_ALERT_SIMULATOR: $FINK_ALERT_SIMULATOR"
echo "FINK_SCHEMA: $FINK_SCHEMA"
echo "KAFKA_HOME: $KAFKA_HOME"
echo "PYTHONPATH: $PYTHONPATH"
echo "JAVA_HOME: $JAVA_HOME"
echo `python -V`
- name: Run test suites [dev]
if: matrix.container == 'julienpeloton/fink-ci:dev'
- name: Run test suite
run: |
# to remove
export KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
fink_test -c conf/fink.conf.dev --unit-tests --db-integration
- name: Run test suites [prod]
if: matrix.container == 'julienpeloton/fink-ci:prod'
run: |
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
fink_test -c conf/fink.conf.prod --unit-tests --db-integration
fink_test -c conf/fink.conf.dev --stream-integration --db-integration --mm-offline --unit-tests
curl -s https://codecov.io/bash | bash
- uses: act10ns/slack@v1
with:
2 changes: 1 addition & 1 deletion .ruff.toml
@@ -39,7 +39,7 @@ target-version = "py38"
# Rules can be found at https://docs.astral.sh/ruff/rules
select = ["E4", "E7", "E9", "F", "E1", "E2", "W", "N", "D", "B", "C4", "PD", "PERF"]
ignore = [
"C408", "C901",
"C408", "C420", "C901",
"D400", "D401", "D100", "D102", "D103", "D104", "D415", "D419",
"E731",
"N812", "N806", "N803",
1 change: 1 addition & 0 deletions README.md
@@ -40,6 +40,7 @@ All are open source, and we thank all contributors!
- Publications: https://fink-broker.org/papers
- Release notes: https://fink-broker.readthedocs.io/en/latest/release-notes


## Outside academia

Fink has participated to a number of events with the private sector, in particular:
10 changes: 8 additions & 2 deletions bin/active_learning_loop.py
@@ -122,7 +122,7 @@ def main():

# model path
curdir = os.path.dirname(os.path.abspath(__file__))
model = curdir + "/data/models/for_al_loop/model_20240422.pkl"
model = curdir + "/data/models/for_al_loop/model_20240729.pkl"

# Run SN classification using AL model
rfscore_args = ["cjd", "cfid", "cmagpsf", "csigmapsf"]
@@ -153,6 +153,8 @@ def main():
"al_snia_vs_nonia",
"rf_snia_vs_nonia",
"candidate.ndethist",
"candidate.jdstarthist",
"candidate.jd",
]

pdf = df_filt.select(cols_).toPandas()
@@ -166,8 +168,12 @@

msg_handler_slack(slack_data, "bot_al_loop", init_msg)

# maximum 10 days between initial and final detection
c7 = (pdf["jd"] - pdf["jdstarthist"]) <= 10.0
pdf_early = pdf[c7]

# Filter for high probabilities
pdf_hp = pdf[pdf["al_snia_vs_nonia"] > 0.5]
pdf_hp = pdf_early[pdf_early["al_snia_vs_nonia"] > 0.5]
pdf_hp = pdf_hp.sort_values("al_snia_vs_nonia", ascending=False)

init_msg = f"Number of candidates for the night {args.night} (high probability): {len(pdf_hp)} ({len(np.unique(pdf_hp.objectId))} unique objects)."
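
To make the new cut above concrete: alerts are first required to have at most 10 days between their first and latest detections, and the probability threshold is then applied to that early subset. A self-contained pandas sketch with hypothetical values (column names follow the diff):

```python
import pandas as pd

# Hypothetical alerts; column names follow the diff above
pdf = pd.DataFrame({
    "objectId": ["ZTF24aaa", "ZTF24aab", "ZTF24aac"],
    "jd": [2460520.9, 2460520.9, 2460520.9],
    "jdstarthist": [2460515.5, 2460500.0, 2460519.8],
    "al_snia_vs_nonia": [0.9, 0.8, 0.3],
})

# Maximum 10 days between initial and final detection
c7 = (pdf["jd"] - pdf["jdstarthist"]) <= 10.0
pdf_early = pdf[c7]

# Keep only high-probability candidates, sorted by score
pdf_hp = pdf_early[pdf_early["al_snia_vs_nonia"] > 0.5]
pdf_hp = pdf_hp.sort_values("al_snia_vs_nonia", ascending=False)

print(pdf_hp[["objectId", "al_snia_vs_nonia"]])  # only ZTF24aaa survives both cuts
```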
5 changes: 1 addition & 4 deletions bin/anomaly_archival.py
@@ -24,7 +24,7 @@

from fink_broker.logging_utils import get_fink_logger, inspect_application

from fink_broker.hbase_utils import push_full_df_to_hbase, add_row_key
from fink_broker.hbase_utils import push_full_df_to_hbase


def main():
@@ -87,9 +87,6 @@ def main():

# Row key
row_key_name = "jd_objectId"
df_hbase = add_row_key(
df_hbase, row_key_name=row_key_name, cols=["candidate.jd", "objectId"]
)

# push data to HBase
push_full_df_to_hbase(
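
A side note on the removed add_row_key call: the "jd_objectId" row key is the Julian date concatenated with the ZTF objectId, and its construction is presumably delegated to push_full_df_to_hbase after the rowkey update in #859. A toy illustration only; the underscore separator below is an assumption, not taken from the source:

```python
# Toy illustration of a "jd_objectId" row key; the real construction is
# presumably handled by push_full_df_to_hbase after this PR, and the
# underscore separator here is an assumption, not taken from the source.
def make_row_key(jd: float, object_id: str, sep: str = "_") -> str:
    """Concatenate the Julian date and the ZTF objectId into an HBase row key."""
    return f"{jd}{sep}{object_id}"


print(make_row_key(2460548.5, "ZTF24aaabcde"))  # "2460548.5_ZTF24aaabcde"
```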
140 changes: 111 additions & 29 deletions bin/distribute.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright 2019-2023 AstroLab Software
# Copyright 2019-2024 AstroLab Software
# Author: Abhishek Chauhan, Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,18 +23,26 @@
"""

import argparse
import logging
import time
import os

from fink_utils.spark import schema_converter
from fink_broker.parser import getargs
from fink_broker.spark_utils import init_sparksession, connect_to_raw_database
from fink_broker.spark_utils import (
init_sparksession,
connect_to_raw_database,
path_exist,
)
from fink_broker.distribution_utils import get_kafka_df
from fink_broker.logging_utils import get_fink_logger, inspect_application

from fink_utils.spark.utils import concat_col

from fink_utils.spark.utils import apply_user_defined_filter


_LOG = logging.getLogger(__name__)

# User-defined topics
userfilters = [
"fink_filters.filter_early_sn_candidates.filter.early_sn_candidates",
@@ -52,6 +60,64 @@
]


def launch_fink_mm(spark, args: dict):
"""Manage multimessenger operations

Parameters
----------
spark: SparkSession
Spark Session
args: dict
Arguments from Fink configuration file

Returns
-------
time_spent_in_wait: int
Time spent in waiting for GCN to come
before launching the streaming query.
stream_distrib_list: list of StreamingQuery
List of Spark Streaming queries

"""
if args.noscience:
_LOG.info("No science: fink-mm is not applied")
return 0, []
elif args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute

_LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})

# Wait for GCN data to arrive
time_spent_in_wait = 0
stream_distrib_list = []
while time_spent_in_wait < args.exit_after:
mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
t_before = time.time()
_LOG.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(spark, config, args)
time_spent_in_wait += time.time() - t_before
break

time_spent_in_wait += 1
time.sleep(1.0)
if stream_distrib_list == []:
_LOG.warning(
f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
)
else:
_LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds")
return time_spent_in_wait, stream_distrib_list

_LOG.warning("No configuration found for fink-mm -- not applied")
return 0, []


def main():
parser = argparse.ArgumentParser(description=__doc__)
args = getargs(parser)
@@ -63,28 +129,29 @@ def main():
log_level=args.spark_log_level,
)

# The level here should be controlled by an argument.
logger = get_fink_logger(spark.sparkContext.appName, args.log_level)

# debug statements
inspect_application(logger)

# data path
scitmpdatapath = args.online_data_prefix + "/science"
checkpointpath_kafka = args.online_data_prefix + "/kafka_checkpoint"

# Connect to the TMP science database
input_sci = scitmpdatapath + "/year={}/month={}/day={}".format(
args.night[0:4], args.night[4:6], args.night[6:8]
scitmpdatapath = args.online_data_prefix + "/science/{}".format(args.night)
checkpointpath_kafka = args.online_data_prefix + "/kafka_checkpoint/{}".format(
args.night
)
df = connect_to_raw_database(input_sci, input_sci, latestfirst=False)

# Drop partitioning columns
df = df.drop("year").drop("month").drop("day")
# Connect to the TMP science database
df = connect_to_raw_database(scitmpdatapath, scitmpdatapath, latestfirst=False)

# Cast fields to ease the distribution
cnames = df.columns
cnames[cnames.index("timestamp")] = "cast(timestamp as string) as timestamp"

if "brokerEndProcessTimestamp" in cnames:
cnames[cnames.index("brokerEndProcessTimestamp")] = (
"cast(brokerEndProcessTimestamp as string) as brokerEndProcessTimestamp"
)
cnames[cnames.index("brokerStartProcessTimestamp")] = (
"cast(brokerStartProcessTimestamp as string) as brokerStartProcessTimestamp"
)
cnames[cnames.index("brokerIngestTimestamp")] = (
"cast(brokerIngestTimestamp as string) as brokerIngestTimestamp"
)

cnames[cnames.index("cutoutScience")] = "struct(cutoutScience.*) as cutoutScience"
cnames[cnames.index("cutoutTemplate")] = (
"struct(cutoutTemplate.*) as cutoutTemplate"
@@ -96,6 +163,7 @@ def main():
"explode(array(prv_candidates)) as prv_candidates"
)
cnames[cnames.index("candidate")] = "struct(candidate.*) as candidate"

if not args.noscience:
# This column is added by the science pipeline
cnames[cnames.index("lc_features_g")] = (
@@ -105,12 +173,6 @@
"struct(lc_features_r.*) as lc_features_r"
)

# Extract schema
df_schema = spark.read.format("parquet").load(input_sci)
df_schema = df_schema.selectExpr(cnames)

schema = schema_converter.to_avro(df_schema.coalesce(1).limit(1).schema)

# Retrieve time-series information
to_expand = [
"jd",
@@ -136,6 +198,10 @@
df = df.withColumn("cstampDatac", df["cutoutScience.stampData"])

broker_list = args.distribution_servers
# username = args.kafka_sasl_username
# password = args.kafka_sasl_password
# kafka_buf_mem = args.kafka_buffer_memory
# kafka_timeout_ms = args.kafka_delivery_timeout_ms
for userfilter in userfilters:
# The topic name is the filter name
topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf"
@@ -144,11 +210,15 @@
if args.noscience:
df_tmp = df
else:
df_tmp = apply_user_defined_filter(df, userfilter, logger)
df_tmp = apply_user_defined_filter(df, userfilter, _LOG)

# Wrap alert data
df_tmp = df_tmp.selectExpr(cnames)

# get schema from the streaming dataframe to
# avoid non-nullable bug #852
schema = schema_converter.to_avro(df_tmp.schema)

# Get the DataFrame for publishing to Kafka (avro serialized)
df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False)

@@ -158,17 +228,29 @@
.option("kafka.bootstrap.servers", broker_list)
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
# .option("kafka.sasl.username", username)
# .option("kafka.sasl.password", password)
# .option("kafka.buffer.memory", kafka_buf_mem)
# .option("kafka.delivery.timeout.ms", kafka_timeout_ms)
# .option("kafka.auto.create.topics.enable", True)
.option("topic", topicname)
.option("checkpointLocation", checkpointpath_kafka + topicname)
.option("checkpointLocation", checkpointpath_kafka + "/" + topicname)
.trigger(processingTime="{} seconds".format(args.tinterval))
.start()
)

time_spent_in_wait, stream_distrib_list = launch_fink_mm(spark, args)

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:
time.sleep(args.exit_after)
remaining_time = args.exit_after - time_spent_in_wait
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
disquery.stop()
logger.info("Exiting the distribute service normally...")
if stream_distrib_list != []:
for stream in stream_distrib_list:
stream.stop()
_LOG.info("Exiting the distribute service normally...")
else:
# Wait for the end of queries
spark.streams.awaitAnyTermination()
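
One change worth flagging in this file: the Avro schema used for Kafka serialization is now derived from each filtered streaming DataFrame instead of being read back from the parquet files, which sidesteps the non-nullable schema mismatch tracked in #852. Below is a condensed sketch of the resulting per-topic flow; the helper signatures are taken from the diff, and `df`, `cnames` and `userfilter` are assumed to be prepared exactly as above.

```python
# Condensed sketch of the per-topic serialization flow shown above.
# `df`, `cnames` and `userfilter` are assumed to be prepared exactly
# as in bin/distribute.py; only the ordering of steps is illustrated.
from fink_utils.spark import schema_converter
from fink_utils.spark.utils import apply_user_defined_filter
from fink_broker.distribution_utils import get_kafka_df


def kafka_df_for_topic(df, cnames, userfilter, logger):
    """Apply one user-defined filter and return an Avro-serialized DataFrame."""
    # Keep only alerts passing the user-defined filter
    df_tmp = apply_user_defined_filter(df, userfilter, logger)

    # Wrap alert data with the casted / struct-wrapped columns
    df_tmp = df_tmp.selectExpr(cnames)

    # Derive the Avro schema from the streaming DataFrame itself,
    # avoiding the non-nullable schema issue reported in #852
    schema = schema_converter.to_avro(df_tmp.schema)

    # Avro-serialize for publication to Kafka
    return get_kafka_df(df_tmp, key=schema, elasticc=False)
```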