Change partitioning strategy for online processing (#793)
* Remove dropDuplicates, which is no longer needed

* Remove partitioning for online data

* Add timings in the alert packets

* Fix path for distribution

* Modify database script with new paths

* Enable DB integration tests

* Rename paths for test data

* Fix missing import

* Update configuration file with dependencies

* Update paths

* Check and lint

* Fix typo when importing module

* Remove unused code

* [Fink-MM x Fink-Broker] Add the fink_mm pipeline into raw2science (#811)

* add the fink_mm pipeline into raw2science

* pep8 requirements and add documentation and comments

* fix bugs and problems with fink CI, restore the stream test, preparation for fink-mm test

* pep8

* unit test fixed

* add stream_integration argument

* add echo path in test

* fixed pythonpath

* fixed pythonpath

* install fink-mm dev version, to remove after the test dev phase

* add datatest for all topics

* add datasim for join with gcn

* add gcn data test

* pep8

* integrate fink-mm distribution to the broker

* update fink-mm commit in workflow, pep8

* add mechanism to avoid bad schema inference of spark dataframe with fink-mm

* raw2science too short in CI to generate MM join data

* add tests for fink-mm offline

* review modification, fix the fink_mm offline test conf

* fix distribution CI, drop new timestamp column for fink_mm, convert new timestamp column into string for fink-broker

* pep8

* fix parser default

* Format files

* Remove duplicate NIGHT declaration

* Style

* Fix headers

* Ruff formatting

* Fix module path

* Merge mm utils into a single module mm_utils.py

* Refactor the fink-mm section in raw2science

* Refactor distribute

* Cleaning files (#849)

* Remove the need for SCRAM

* Update fink bin

* Increase the number of shuffle partition for SSO

* Push all alerts at once

* Update science elasticc

* Use subscribePattern instead of subscribe

* Update scheduler

* Format code

* Discard alerts with i band measurements (#839)

* Add new argument in configuration file

* Update conf files

* Add missing argument

* Format raw2science

* Better path management

* Fix bug in path

* Check if files exist -- not just the folder (#851)

* Check if files exist -- not just the folder

* Bump fink-filters to 3.29, and test it on CI

* Bump fink-filters to 3.30

* Improve verbosity when trying to launch fink-mm

* Apply ruff

* Add missing parameter in the conf file

* Check HDFS folder is not empty before launching services

* Wait for one batch to complete before launching

* Switch Docker image

* Use the streaming DF to infer schema (#853)

---------

Co-authored-by: JulienPeloton <[email protected]>

* Do not filter alerts containing i-band only in the history (#855)

* Update ZTF schedule (#857)

* Update the rowkey construction (#859)

* Update the rowkey construction

* Update tester to enable capability to test one file

* Improve CD process

- increase argoCD usage
- use Spark operator
- use Minio operator
- add Helm chart for fink-broker
- Improve logging management
- Use finkctl to create kafka secret
- Bump ciux to v0.0.3-rc4
- Bump ktbx to v1.1.3-rc1
- Increase sync checks in CI
  * Wait for input topic to exist
  * Wait for fink-producer secret to appear

* Add temporary hack to CI

* Improve code formatting and linting

* add delta time (#860)

* Fix typo

* Fix column name when constructing the rowkey (#864)

* Fix column name when constructing the rowkey

* Reformat

* Remove unused (and wrong) row key addition

* PEP8

* Fix bug in column names

* Improve logging message

* Trigger GHA build via cron

* Remove tmate session in ci

* Remove sudo for docker prune in ci

* Improve pip dependencies management

Add Dockerfile to ciux source paths
Improve parameter management
Add separate log level for spark
Improve build script configuration

* Improve fink-broker configuration

* Fix ciux init in CI

* Improve fink startup script

* Use finkctl new release

* Document release management

* Ruff

* clean CI yaml

* Ruff

* Force ipv4 for Kafka

* Change the path to the fink alert simulator

* Restore the path; there is a problem because the schema cannot be read.

* Update the configuration for the schema

* Change get_fink_logger into init_logger

---------

Co-authored-by: Fabrice Jammes <[email protected]>
Co-authored-by: Anais Möller <[email protected]>
Co-authored-by: Fabrice Jammes <[email protected]>

* Update the topic value for helm

* Add night as an argument for stream2raw in the CI

* Update conf

* Increase the default Kafka delivery.timeout.ms

* Change topic name for Sentinel

* Use the night argument for partitioning instead of the alert timestamp (sketched below)

* Relaunch CI

* Add a new argument --noscience to bin/fink

* Cast only if fields exist

* Update distribute with old Kafka setup

* Put the e2e tests on VD on hold until a solution is found; rely only on Sentinel for the science run and e2e-gha for the noscience run

* Check the output topic in Kafka for Sentinel

* Add script to detect hostless candidates (#870)

* Add script to detect hostless candidates

* notes

* updated model (#869)

* Bump fink-science and fink-filters

* Switch to Telegram bot for hostless detection

* Ruff

* Bump requirements for fink-utils

* Fix missing args

* Split database operations

---------

Co-authored-by: Anais Möller <[email protected]>

* Ignore unnecessary rule

---------

Co-authored-by: FusRoman <[email protected]>
Co-authored-by: Fabrice Jammes <[email protected]>
Co-authored-by: Anais Möller <[email protected]>
Co-authored-by: Fabrice Jammes <[email protected]>
5 people authored Aug 26, 2024
1 parent 0ef921a commit dbb8f5c
Showing 626 changed files with 102,720 additions and 395 deletions.
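For illustration, here is a minimal sketch of the partitioning change named in the title and sketched in the bin/distribute.py diff below: the online science data moves from Hive-style year/month/day partitions derived from the alert timestamp to a single directory keyed on the --night argument (YYYYMMDD). The helper name below is illustrative only, not part of the codebase.

def online_science_path(online_data_prefix, night, new_layout=True):
    """Return the temporary science path for a night given as 'YYYYMMDD'."""
    if new_layout:
        # New strategy: one directory per night, keyed on the --night argument
        return "{}/science/{}".format(online_data_prefix, night)
    # Old strategy: Hive-style partitions derived from the alert timestamp
    return "{}/science/year={}/month={}/day={}".format(
        online_data_prefix, night[0:4], night[4:6], night[6:8]
    )

print(online_science_path("online", "20240826"))         # online/science/20240826
print(online_science_path("online", "20240826", False))  # online/science/year=2024/month=08/day=26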
File renamed without changes.
File renamed without changes.
26 changes: 8 additions & 18 deletions .github/workflows/test.yml
@@ -19,7 +19,7 @@

strategy:
matrix:
container: ["julienpeloton/fink-ci:dev"]
container: ["julienpeloton/fink-ci:latest"]

container:
image: ${{ matrix.container }}
@@ -35,11 +35,6 @@
echo "FINK_HOME=$GITHUB_WORKSPACE" >> $GITHUB_ENV
echo "JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))" >> $GITHUB_ENV
echo "${BINPATH}" >> $GITHUB_PATH
#- name: Download test data
# run: |
# cd $FINK_HOME/datasim/basic_alerts
# source $FINK_HOME/datasim/basic_alerts/download_ztf_alert_data.sh
# cd $FINK_HOME
- name: Download simulator
run: |
git clone https://github.com/astrolabsoftware/fink-alert-simulator.git
@@ -50,33 +45,28 @@
echo "FINK_SCHEMA=${FINK_HOME}/fink-alert-schemas" >> $GITHUB_ENV
- name: Set up env [2/2]
run: |
echo "PYTHONPATH=${FINK_HOME}:${FINK_ALERT_SIMULATOR}:${PYTHONPATH}" >> $GITHUB_ENV
echo "PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:${FINK_HOME}:${FINK_ALERT_SIMULATOR}:${PYTHONPATH}" >> $GITHUB_ENV
echo "${FINK_HOME}/bin:${FINK_ALERT_SIMULATOR}/bin" >> $GITHUB_PATH
echo "PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" >> $GITHUB_ENV
- name: Check env
run: |
echo "PATH: $GITHUB_ENV"
echo "FINK_HOME: $FINK_HOME"
echo "SPARK_HOME: $SPARK_HOME"
echo "SPARKLIB: $SPARKLIB"
echo "FINK_ALERT_SIMULATOR: $FINK_ALERT_SIMULATOR"
echo "FINK_SCHEMA: $FINK_SCHEMA"
echo "KAFKA_HOME: $KAFKA_HOME"
echo "PYTHONPATH: $PYTHONPATH"
echo "JAVA_HOME: $JAVA_HOME"
echo `python -V`
- name: Run test suites [dev]
if: matrix.container == 'julienpeloton/fink-ci:dev'
- name: Run test suite
run: |
# to remove
export KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
fink_test -c conf/fink.conf.dev --unit-tests --db-integration
- name: Run test suites [prod]
if: matrix.container == 'julienpeloton/fink-ci:prod'
run: |
cd $USRLIBS
source scripts/start_services.sh --kafka-version ${KAFKA_VERSION} --hbase-version ${HBASE_VERSION}
cd $FINK_HOME
fink_test -c conf/fink.conf.prod --unit-tests --db-integration
fink_test -c conf/fink.conf.dev --stream-integration --db-integration --mm-offline --unit-tests
curl -s https://codecov.io/bash | bash
- uses: act10ns/slack@v1
with:
2 changes: 1 addition & 1 deletion .ruff.toml
@@ -39,7 +39,7 @@
# Rules can be found at https://docs.astral.sh/ruff/rules
select = ["E4", "E7", "E9", "F", "E1", "E2", "W", "N", "D", "B", "C4", "PD", "PERF"]
ignore = [
"C408", "C901",
"C408", "C420", "C901",
"D400", "D401", "D100", "D102", "D103", "D104", "D415", "D419",
"E731",
"N812", "N806", "N803",
1 change: 1 addition & 0 deletions README.md
@@ -40,6 +40,7 @@
- Publications: https://fink-broker.org/papers
- Release notes: https://fink-broker.readthedocs.io/en/latest/release-notes


## Outside academia

Fink has participated to a number of events with the private sector, in particular:
10 changes: 8 additions & 2 deletions bin/active_learning_loop.py
@@ -122,7 +122,7 @@

# model path
curdir = os.path.dirname(os.path.abspath(__file__))
model = curdir + "/data/models/for_al_loop/model_20240422.pkl"
model = curdir + "/data/models/for_al_loop/model_20240729.pkl"

# Run SN classification using AL model
rfscore_args = ["cjd", "cfid", "cmagpsf", "csigmapsf"]
@@ -153,6 +153,8 @@
"al_snia_vs_nonia",
"rf_snia_vs_nonia",
"candidate.ndethist",
"candidate.jdstarthist",
"candidate.jd",
]

pdf = df_filt.select(cols_).toPandas()
@@ -166,8 +168,12 @@

msg_handler_slack(slack_data, "bot_al_loop", init_msg)

# maximum 10 days between initial and final detection
c7 = (pdf["jd"] - pdf["jdstarthist"]) <= 10.0
pdf_early = pdf[c7]

# Filter for high probabilities
pdf_hp = pdf[pdf["al_snia_vs_nonia"] > 0.5]
pdf_hp = pdf_early[pdf_early["al_snia_vs_nonia"] > 0.5]
pdf_hp = pdf_hp.sort_values("al_snia_vs_nonia", ascending=False)

init_msg = f"Number of candidates for the night {args.night} (high probability): {len(pdf_hp)} ({len(np.unique(pdf_hp.objectId))} unique objects)."
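For illustration, a minimal sketch of the updated selection in bin/active_learning_loop.py, run on synthetic data; the column names (jd, jdstarthist, al_snia_vs_nonia) and the 10-day / 0.5 cuts follow the diff above, while the object IDs and values are made up.

import pandas as pd

pdf = pd.DataFrame({
    "objectId": ["ZTF24aaaaaaa", "ZTF24aaaaaab", "ZTF24aaaaaac"],
    "jd": [2460540.5, 2460540.5, 2460540.5],
    "jdstarthist": [2460535.5, 2460520.5, 2460539.5],  # JD of first detection
    "al_snia_vs_nonia": [0.8, 0.9, 0.3],               # Active Learning SN Ia score
})

# maximum 10 days between initial and final detection
c7 = (pdf["jd"] - pdf["jdstarthist"]) <= 10.0
pdf_early = pdf[c7]

# Filter for high probabilities
pdf_hp = pdf_early[pdf_early["al_snia_vs_nonia"] > 0.5]
pdf_hp = pdf_hp.sort_values("al_snia_vs_nonia", ascending=False)

# Only ZTF24aaaaaaa survives: first detected less than 10 days ago and score above 0.5
print(pdf_hp[["objectId", "al_snia_vs_nonia"]])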
5 changes: 1 addition & 4 deletions bin/anomaly_archival.py
@@ -24,7 +24,7 @@

from fink_broker.logging_utils import get_fink_logger, inspect_application

from fink_broker.hbase_utils import push_full_df_to_hbase, add_row_key
from fink_broker.hbase_utils import push_full_df_to_hbase


def main():
@@ -87,9 +87,6 @@

# Row key
row_key_name = "jd_objectId"
df_hbase = add_row_key(
df_hbase, row_key_name=row_key_name, cols=["candidate.jd", "objectId"]
)

# push data to HBase
push_full_df_to_hbase(
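The anomaly script above no longer builds the row key itself; per the "Update the rowkey construction (#859)" change, that now happens inside push_full_df_to_hbase. As a hedged sketch of what a "jd_objectId" row key amounts to — the actual helper in fink_broker.hbase_utils may differ in detail — it is essentially the candidate JD and the objectId concatenated with an underscore:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("rowkey_sketch").getOrCreate()

df = spark.createDataFrame(
    [(2460540.5001, "ZTF24aaaaaaa"), (2460540.6002, "ZTF24aaaaaab")],
    ["jd", "objectId"],
)

# Assumed construction: "<jd>_<objectId>" (the exact helper lives in fink_broker.hbase_utils)
df_hbase = df.withColumn(
    "jd_objectId", F.concat_ws("_", F.col("jd").cast("string"), F.col("objectId"))
)
df_hbase.show(truncate=False)
spark.stop()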
140 changes: 111 additions & 29 deletions bin/distribute.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# Copyright 2019-2023 AstroLab Software
# Copyright 2019-2024 AstroLab Software
# Author: Abhishek Chauhan, Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,18 +23,26 @@
"""

import argparse
import logging
import time
import os

from fink_utils.spark import schema_converter
from fink_broker.parser import getargs
from fink_broker.spark_utils import init_sparksession, connect_to_raw_database
from fink_broker.spark_utils import (
init_sparksession,
connect_to_raw_database,
path_exist,
)
from fink_broker.distribution_utils import get_kafka_df
from fink_broker.logging_utils import get_fink_logger, inspect_application

from fink_utils.spark.utils import concat_col

from fink_utils.spark.utils import apply_user_defined_filter


_LOG = logging.getLogger(__name__)

# User-defined topics
userfilters = [
"fink_filters.filter_early_sn_candidates.filter.early_sn_candidates",
@@ -52,6 +60,64 @@
]


def launch_fink_mm(spark, args: dict):
"""Manage multimessenger operations
Parameters
----------
spark: SparkSession
Spark Session
args: dict
Arguments from Fink configuration file
Returns
-------
time_spent_in_wait: int
Time spent in waiting for GCN to come
before launching the streaming query.
stream_distrib_list: list of StreamingQuery
List of Spark Streaming queries
"""
if args.noscience:
_LOG.info("No science: fink-mm is not applied")
return 0, []
elif args.mmconfigpath != "no-config":
from fink_mm.init import get_config
from fink_broker.mm_utils import mm2distribute

_LOG.info("Fink-MM configuration file: {args.mmconfigpath}")
config = get_config({"--config": args.mmconfigpath})

# Wait for GCN data to arrive
time_spent_in_wait = 0
stream_distrib_list = []
while time_spent_in_wait < args.exit_after:
mm_path_output = config["PATH"]["online_grb_data_prefix"]
mmtmpdatapath = os.path.join(mm_path_output, "online")

# if there is gcn and ztf data
if path_exist(mmtmpdatapath):
t_before = time.time()
_LOG.info("starting mm2distribute ...")
stream_distrib_list = mm2distribute(spark, config, args)
time_spent_in_wait += time.time() - t_before
break

time_spent_in_wait += 1
time.sleep(1.0)
if stream_distrib_list == []:
_LOG.warning(
f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
)
else:
_LOG.info("Time spent in waiting for Fink-MM: {time_spent_in_wait} seconds")
return time_spent_in_wait, stream_distrib_list

_LOG.warning("No configuration found for fink-mm -- not applied")
return 0, []


def main():
parser = argparse.ArgumentParser(description=__doc__)
args = getargs(parser)
Expand All @@ -63,28 +129,29 @@ def main():
log_level=args.spark_log_level,
)

# The level here should be controlled by an argument.
logger = get_fink_logger(spark.sparkContext.appName, args.log_level)

# debug statements
inspect_application(logger)

# data path
scitmpdatapath = args.online_data_prefix + "/science"
checkpointpath_kafka = args.online_data_prefix + "/kafka_checkpoint"

# Connect to the TMP science database
input_sci = scitmpdatapath + "/year={}/month={}/day={}".format(
args.night[0:4], args.night[4:6], args.night[6:8]
scitmpdatapath = args.online_data_prefix + "/science/{}".format(args.night)
checkpointpath_kafka = args.online_data_prefix + "/kafka_checkpoint/{}".format(
args.night
)
df = connect_to_raw_database(input_sci, input_sci, latestfirst=False)

# Drop partitioning columns
df = df.drop("year").drop("month").drop("day")
# Connect to the TMP science database
df = connect_to_raw_database(scitmpdatapath, scitmpdatapath, latestfirst=False)

# Cast fields to ease the distribution
cnames = df.columns
cnames[cnames.index("timestamp")] = "cast(timestamp as string) as timestamp"

if "brokerEndProcessTimestamp" in cnames:
cnames[cnames.index("brokerEndProcessTimestamp")] = (
"cast(brokerEndProcessTimestamp as string) as brokerEndProcessTimestamp"
)
cnames[cnames.index("brokerStartProcessTimestamp")] = (
"cast(brokerStartProcessTimestamp as string) as brokerStartProcessTimestamp"
)
cnames[cnames.index("brokerIngestTimestamp")] = (
"cast(brokerIngestTimestamp as string) as brokerIngestTimestamp"
)

cnames[cnames.index("cutoutScience")] = "struct(cutoutScience.*) as cutoutScience"
cnames[cnames.index("cutoutTemplate")] = (
"struct(cutoutTemplate.*) as cutoutTemplate"
@@ -96,6 +163,7 @@
"explode(array(prv_candidates)) as prv_candidates"
)
cnames[cnames.index("candidate")] = "struct(candidate.*) as candidate"

if not args.noscience:
# This column is added by the science pipeline
cnames[cnames.index("lc_features_g")] = (
@@ -105,12 +173,6 @@
"struct(lc_features_r.*) as lc_features_r"
)

# Extract schema
df_schema = spark.read.format("parquet").load(input_sci)
df_schema = df_schema.selectExpr(cnames)

schema = schema_converter.to_avro(df_schema.coalesce(1).limit(1).schema)

# Retrieve time-series information
to_expand = [
"jd",
@@ -136,6 +198,10 @@
df = df.withColumn("cstampDatac", df["cutoutScience.stampData"])

broker_list = args.distribution_servers
# username = args.kafka_sasl_username
# password = args.kafka_sasl_password
# kafka_buf_mem = args.kafka_buffer_memory
# kafka_timeout_ms = args.kafka_delivery_timeout_ms
for userfilter in userfilters:
# The topic name is the filter name
topicname = args.substream_prefix + userfilter.split(".")[-1] + "_ztf"
@@ -144,11 +210,15 @@
if args.noscience:
df_tmp = df
else:
df_tmp = apply_user_defined_filter(df, userfilter, logger)
df_tmp = apply_user_defined_filter(df, userfilter, _LOG)

# Wrap alert data
df_tmp = df_tmp.selectExpr(cnames)

# get schema from the streaming dataframe to
# avoid non-nullable bug #852
schema = schema_converter.to_avro(df_tmp.schema)

# Get the DataFrame for publishing to Kafka (avro serialized)
df_kafka = get_kafka_df(df_tmp, key=schema, elasticc=False)

@@ -158,17 +228,29 @@
.option("kafka.bootstrap.servers", broker_list)
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
# .option("kafka.sasl.username", username)
# .option("kafka.sasl.password", password)
# .option("kafka.buffer.memory", kafka_buf_mem)
# .option("kafka.delivery.timeout.ms", kafka_timeout_ms)
# .option("kafka.auto.create.topics.enable", True)
.option("topic", topicname)
.option("checkpointLocation", checkpointpath_kafka + topicname)
.option("checkpointLocation", checkpointpath_kafka + "/" + topicname)
.trigger(processingTime="{} seconds".format(args.tinterval))
.start()
)

time_spent_in_wait, stream_distrib_list = launch_fink_mm(spark, args)

# Keep the Streaming running until something or someone ends it!
if args.exit_after is not None:
time.sleep(args.exit_after)
remaining_time = args.exit_after - time_spent_in_wait
remaining_time = remaining_time if remaining_time > 0 else 0
time.sleep(remaining_time)
disquery.stop()
logger.info("Exiting the distribute service normally...")
if stream_distrib_list != []:
for stream in stream_distrib_list:
stream.stop()
_LOG.info("Exiting the distribute service normally...")
else:
# Wait for the end of queries
spark.streams.awaitAnyTermination()
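To make the schema change above concrete: the old code re-read the night's parquet files in batch mode just to derive an Avro schema, while the new code converts the schema of the very DataFrame being distributed — the diff comment attributes this to the non-nullable schema mismatch of bug #852. A minimal sketch follows, with a static DataFrame standing in for the streaming df_tmp; schema_converter is the fink_utils helper used in the diff.

from pyspark.sql import SparkSession
from fink_utils.spark import schema_converter

spark = SparkSession.builder.master("local[1]").appName("schema_sketch").getOrCreate()

# Stand-in for the streaming DataFrame after selectExpr(cnames)
df_tmp = spark.createDataFrame([("ZTF24aaaaaaa", 2460540.5)], ["objectId", "jd"])

# Derive the Avro schema from the DataFrame that will actually be published,
# instead of re-reading parquet files in batch mode (old approach)
schema = schema_converter.to_avro(df_tmp.schema)
print(schema)

spark.stop()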
