From 3f4e0cbb308838ff6ce21b0bd844ea01c7613a50 Mon Sep 17 00:00:00 2001
From: JulienPeloton
Date: Wed, 29 Jan 2025 16:05:31 +0100
Subject: [PATCH] Sentinel should work

---
 bin/fink                                   | 33 +++++++++++--------
 bin/fink_test                              | 37 ++++++++++++++--------
 bin/ztf/archive_science.py                 |  2 +-
 bin/ztf/distribute.py                      |  5 +--
 conf/ztf/fink.conf.dev                     |  8 ++---
 conf/ztf/fink.conf.prod                    |  4 +--
 fink_broker/common/distribution_utils.py   |  2 +-
 fink_broker/common/partitioning.py         |  6 ++--
 fink_broker/ztf/hbase_utils.py             |  2 +-
 fink_broker/ztf/mm_utils.py                | 24 +++++++-------
 fink_broker/ztf/science.py                 | 14 ++++----
 fink_broker/ztf/test_schema_converter.py   |  4 +--
 fink_broker/ztf/tracklet_identification.py |  2 +-
 scheduler/ztf/launch_stream.sh             |  4 +--
 14 files changed, 80 insertions(+), 67 deletions(-)

diff --git a/bin/fink b/bin/fink
index 31ffe167..656b0d91 100755
--- a/bin/fink
+++ b/bin/fink
@@ -201,6 +201,20 @@ if [[ $DEPLOY_FINK_PYTHON == "true" ]]; then
     cd -
 fi
 
+# Only if the operator passes an index table
+if [[ ${INDEXTABLE} ]]; then
+    INDEXTABLE="-index_table ${INDEXTABLE}"
+else
+    INDEXTABLE=""
+fi
+
+# Only if the operator passes the location of the TNS catalog
+if [[ ${TNS_RAW_OUTPUT} ]]; then
+    TNS_RAW_OUTPUT="-tns_raw_output ${TNS_RAW_OUTPUT}"
+else
+    TNS_RAW_OUTPUT=""
+fi
+
 DISTRIBUTE_OPTIONS=""
 if [[ $service == "distribute" ]]; then
     if [[ -f $conf_distribution ]]; then
@@ -211,14 +225,16 @@ if [[ $service == "distribute" ]]; then
         source ${FINK_HOME}/conf/${SURVEY}/fink.conf.distribution
     fi
 
-    DISTRIBUTE_OPTIONS="--files ${FINK_HOME}/conf/fink_kafka_producer_jaas.conf --driver-java-options -Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.driver.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executorEnv.KNWEBHOOK=${KNWEBHOOK} --conf spark.executorEnv.KNWEBHOOK_FINK=${KNWEBHOOK_FINK} --conf spark.executorEnv.KNWEBHOOK_AMA_CL=${KNWEBHOOK_AMA_CL} --conf spark.executorEnv.KNWEBHOOK_AMA_GALAXIES=${KNWEBHOOK_AMA_GALAXIES} --conf spark.executorEnv.KNWEBHOOK_DWF=${KNWEBHOOK_DWF} --conf spark.executorEnv.FINK_TG_TOKEN=${FINK_TG_TOKEN}"
+    DISTRIBUTE_OPTIONS_SPARK="--files ${FINK_HOME}/conf/fink_kafka_producer_jaas.conf --driver-java-options -Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.driver.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=${FINK_PRODUCER_JAAS} --conf spark.executorEnv.KNWEBHOOK=${KNWEBHOOK} --conf spark.executorEnv.KNWEBHOOK_FINK=${KNWEBHOOK_FINK} --conf spark.executorEnv.KNWEBHOOK_AMA_CL=${KNWEBHOOK_AMA_CL} --conf spark.executorEnv.KNWEBHOOK_AMA_GALAXIES=${KNWEBHOOK_AMA_GALAXIES} --conf spark.executorEnv.KNWEBHOOK_DWF=${KNWEBHOOK_DWF} --conf spark.executorEnv.FINK_TG_TOKEN=${FINK_TG_TOKEN}"
+    DISTRIBUTE_OPTIONS="-distribution_servers ${DISTRIBUTION_SERVERS} -substream_prefix ${SUBSTREAM_PREFIX} -kafka_sasl_username ${KAFKA_SASL_USERNAME} -kafka_sasl_password ${KAFKA_SASL_PASSWORD} -kafka_buffer_memory ${KAFKA_BUFFER_MEMORY} -kafka_delivery_timeout_ms ${KAFKA_DELIVERY_TIMEOUT_MS}"
 fi
 
 spark-submit --master ${SPARK_MASTER} \
-  --packages ${FINK_PACKAGES} ${PYTHON_EXTRA_FILE} \
+  --packages ${FINK_PACKAGES} --jars ${FINK_JARS} ${PYTHON_EXTRA_FILE} \
   ${EXTRA_SPARK_CONFIG} \
   --driver-memory ${DRIVER_MEMORY} --executor-memory ${EXECUTOR_MEMORY} \
   --conf spark.cores.max=${SPARK_CORE_MAX} --conf spark.executor.cores=${SPARK_EXECUTOR_CORES} \
+  ${DISTRIBUTE_OPTIONS_SPARK} \
   ${FINK_HOME}/bin/${SURVEY}/${service}.py ${HELP_ON_SERVICE} \
   -producer ${SURVEY} \
   -servers ${KAFKA_IPPORT} \
@@ -229,22 +245,13 @@ spark-submit --master ${SPARK_MASTER} \
   -max_offsets_per_trigger ${MAX_OFFSETS_PER_TRIGGER} \
   -online_data_prefix ${ONLINE_DATA_PREFIX} \
   -agg_data_prefix ${AGG_DATA_PREFIX} \
-  -distribution_servers ${DISTRIBUTION_SERVERS} \
-  -distribution_schema ${DISTRIBUTION_SCHEMA} \
-  -kafka_sasl_username ${KAFKA_SASL_USERNAME} \
-  -kafka_sasl_password ${KAFKA_SASL_PASSWORD} \
-  -kafka_buffer_memory ${KAFKA_BUFFER_MEMORY} \
-  -kafka_delivery_timeout_ms ${KAFKA_DELIVERY_TIMEOUT_MS} \
-  -substream_prefix ${SUBSTREAM_PREFIX} \
   -tinterval ${FINK_TRIGGER_UPDATE} \
   -mmconfigpath ${FINK_MM_CONFIG} \
   -fink_fat_output ${FINK_FAT_OUTPUT} \
-  -hostless_folder $HOSTLESS_FOLDER \
+  -hostless_folder ${HOSTLESS_FOLDER} \
   -science_db_name ${SCIENCE_DB_NAME} \
   -science_db_catalogs ${SCIENCE_DB_CATALOGS} \
-  -index_table ${INDEXTABLE} \
   -tns_folder ${TNS_FOLDER} \
-  -tns_raw_output ${TNS_RAW_OUTPUT} \
-  ${DISTRIBUTE_OPTIONS} ${NOSCIENCE} ${TNS_SANDBOX} \
+  ${DISTRIBUTE_OPTIONS} ${TNS_RAW_OUTPUT} ${INDEXTABLE} ${NOSCIENCE} \
   -log_level ${LOG_LEVEL} ${EXIT_AFTER}
 
diff --git a/bin/fink_test b/bin/fink_test
index a8c692c8..840b28e5 100755
--- a/bin/fink_test
+++ b/bin/fink_test
@@ -110,17 +110,18 @@ fi
 
 # Stream integration tests
 if [[ "$WITH_STREAM" = true ]] ; then
+    RESOURCES="-driver-memory 2g -executor-memory 2g -spark-cores-max 2 -spark-executor-cores 1"
     # Fire another stream
     fink_simulator -c ${FINK_HOME}/conf/${SURVEY}/fink_alert_simulator.conf
 
     # Connect the service to build the raw database from the stream
-    fink start stream2raw -s ${SURVEY} --simulator -exit_after 60 -c $conf -topic $KAFKA_TOPIC -night $NIGHT
+    fink start stream2raw -s ${SURVEY} --simulator -exit_after 60 -c $conf -topic $KAFKA_TOPIC -night $NIGHT ${RESOURCES}
 
     # Connect the service to build the science database from the raw one
-    fink start raw2science -s ${SURVEY} -exit_after 120 -c $conf -night $NIGHT
+    fink start raw2science -s ${SURVEY} -exit_after 120 -c $conf -night $NIGHT ${RESOURCES}
 
     # Start the distribution service
-    fink start distribution -s ${SURVEY} -exit_after 60 -c $conf -night $NIGHT
+    fink start distribute -s ${SURVEY} -exit_after 60 -c $conf -night $NIGHT ${RESOURCES}
 
     # check we have all topics
     echo "TOPICS FOUND:"
@@ -129,22 +130,23 @@ fi
 
 # DB Integration tests
 if [[ "$WITH_DB" = true ]] ; then
+    RESOURCES="-driver-memory 2g -executor-memory 2g -spark-cores-max 2 -spark-executor-cores 1"
     # merge data
-    fink start merge -s ${SURVEY} -c $conf -night $NIGHT
+    fink start merge -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}
 
     # Update science portal
-    fink start archive_science -s ${SURVEY} -c $conf -night $NIGHT
-    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table pixel128_jd_objectId
-    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table class_jd_objectId
-    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table upper_objectId_jd
-    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table uppervalid_objectId_jd
-    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table ssnamenr_jd
-    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table tracklet_objectId
+    fink start archive_science -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}
+    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table pixel128_jd_objectId ${RESOURCES}
+    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table class_jd_objectId ${RESOURCES}
+    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table upper_objectId_jd ${RESOURCES}
+    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table uppervalid_objectId_jd ${RESOURCES}
+    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table ssnamenr_jd ${RESOURCES}
+    fink start archive_index -s ${SURVEY} -c $conf -night $NIGHT -index_table tracklet_objectId ${RESOURCES}
 
     # SSO candidates
-    fink start archive_sso_cand -s ${SURVEY} -c $conf -night $NIGHT
+    fink start archive_sso_cand -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}
 
-    fink start archive_statistics -s ${SURVEY} -c $conf -night $NIGHT
+    fink start archive_statistics -s ${SURVEY} -c $conf -night $NIGHT ${RESOURCES}
 
 fi
 
@@ -157,7 +159,7 @@ fi
 if [[ "$WITH_UNITS" = true ]] ; then
     # Run the test suite on the modules assuming the integration
     # tests have been run (to build the databases)
-    for i in ${FINK_HOME}/fink_broker/*.py
+    for i in ${FINK_HOME}/fink_broker/${SURVEY}/*.py
     do
         if [[ ${i##*/} = 'monitoring' ]] ; then
            echo "skip {i}"
@@ -167,6 +169,13 @@ if [[ "$WITH_UNITS" = true ]] ; then
            --rcfile ${FINK_HOME}/.coveragerc $i
        fi
     done
+
+    for i in ${FINK_HOME}/fink_broker/common/*.py
+    do
+        coverage run \
+            --source=${FINK_HOME} \
+            --rcfile ${FINK_HOME}/.coveragerc $i
+    done
 fi
 
 # Combine individual reports in one
diff --git a/bin/ztf/archive_science.py b/bin/ztf/archive_science.py
index 12111587..fe7c9d91 100644
--- a/bin/ztf/archive_science.py
+++ b/bin/ztf/archive_science.py
@@ -30,7 +30,7 @@
 from fink_broker.ztf.hbase_utils import push_full_df_to_hbase
 from fink_broker.common.spark_utils import list_hdfs_files
 
-from fink_broker.logging_utils import get_fink_logger, inspect_application
+from fink_broker.common.logging_utils import get_fink_logger, inspect_application
 
 
 def main():
diff --git a/bin/ztf/distribute.py b/bin/ztf/distribute.py
index 49aa3f69..98d7e266 100644
--- a/bin/ztf/distribute.py
+++ b/bin/ztf/distribute.py
@@ -91,8 +91,9 @@ def push_to_kafka(df_in, topicname, cnames, checkpointpath_kafka, tinterval, kaf
     disquery = (
         df_kafka.writeStream.format("kafka")
         .option("kafka.bootstrap.servers", kafka_cfg["bootstrap.servers"])
-        .option("kafka.security.protocol", "SASL_PLAINTEXT")
-        .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+        .option("kafka.security.protocol", "PLAINTEXT")
+        # .option("kafka.security.protocol", "SASL_PLAINTEXT")
+        # .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
         # .option("kafka.sasl.username", kafka_cfg["username"])
         # .option("kafka.sasl.password", kafka_cfg["password"])
         # .option("kafka.buffer.memory", kafka_cfg["kafka_buf_mem"])
diff --git a/conf/ztf/fink.conf.dev b/conf/ztf/fink.conf.dev
index e683739e..461d1ba6 100644
--- a/conf/ztf/fink.conf.dev
+++ b/conf/ztf/fink.conf.dev
@@ -20,7 +20,7 @@ KAFKA_IPPORT_SIM="localhost:${KAFKA_PORT_SIM}"
 
 # Cluster mode - you need a Kafka cluster installed with write mode.
 # Must match the one used for the Producer
-KAFKA_IPPORT=""
+KAFKA_IPPORT="${KAFKA_IPPORT_SIM}"
 
 # From which offset you want to start pulling data. Options are:
 # latest (only new data), earliest (connect from the oldest
@@ -45,8 +45,7 @@ org.slf4j:slf4j-log4j12:1.7.36,\
 org.slf4j:slf4j-simple:1.7.36
 
 # Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
-${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
 ${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
 
 # Time interval between 2 trigger updates (second)
@@ -94,8 +93,7 @@ FINK_MM_CONFIG=${FINK_HOME}/conf/ztf/fink_mm.conf
 
 # TNS configuration
 TNS_FOLDER=${FINK_HOME}/tns_logs
-TNS_RAW_OUTPUT=/spark_mongo_tmp/julien.peloton
-TNS_SANDBOX=false
+TNS_RAW_OUTPUT=""
 
 # Hostless folder
 HOSTLESS_FOLDER=${FINK_HOME}/hostless_ids
diff --git a/conf/ztf/fink.conf.prod b/conf/ztf/fink.conf.prod
index b3635a3e..c758a99d 100644
--- a/conf/ztf/fink.conf.prod
+++ b/conf/ztf/fink.conf.prod
@@ -43,8 +43,7 @@ org.apache.spark:spark-avro_2.12:3.4.1,\
 org.apache.hbase:hbase-shaded-mapreduce:2.2.7
 
 # Other dependencies (incl. Scala part of Fink)
-FINK_JARS=${FINK_HOME}/libs/fink-broker_2.11-1.2.jar,\
-${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
+FINK_JARS=${FINK_HOME}/libs/hbase-spark-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar,\
 ${FINK_HOME}/libs/hbase-spark-protocol-shaded-hbase2.3.0_spark3.4.1_scala2.12.0_hadoop3.3.6.jar
 
 # Time interval between 2 trigger updates (second)
@@ -93,7 +92,6 @@ FINK_MM_CONFIG=${FINK_HOME}/conf/ztf/fink_mm.conf
 # TNS configuration
 TNS_FOLDER=${FINK_HOME}/tns_logs
 TNS_RAW_OUTPUT=/spark_mongo_tmp/julien.peloton
-TNS_SANDBOX=false
 
 # Hostless folder
 HOSTLESS_FOLDER=${FINK_HOME}/hostless_ids
\ No newline at end of file
diff --git a/fink_broker/common/distribution_utils.py b/fink_broker/common/distribution_utils.py
index afb6cb94..7bf8e5b1 100644
--- a/fink_broker/common/distribution_utils.py
+++ b/fink_broker/common/distribution_utils.py
@@ -19,7 +19,7 @@
 from pyspark.sql.functions import struct, lit
 from pyspark.sql.avro.functions import to_avro as to_avro_native
 
-from fink_broker.tester import spark_unit_tests
+from fink_broker.common.tester import spark_unit_tests
 
 
 def get_kafka_df(df: DataFrame, key: str, elasticc: bool = False) -> DataFrame:
diff --git a/fink_broker/common/partitioning.py b/fink_broker/common/partitioning.py
index 94bf21b0..9b9ecef6 100644
--- a/fink_broker/common/partitioning.py
+++ b/fink_broker/common/partitioning.py
@@ -43,7 +43,7 @@ def convert_to_millitime(jd: pd.Series, format=None, now=None):
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
+    >>> from fink_broker.common.spark_utils import load_parquet_files
     >>> df = load_parquet_files("online/raw/20200101")
     >>> df = df.withColumn('millis', convert_to_millitime(df['candidate.jd']))
     >>> pdf = df.select('millis').toPandas()
@@ -98,7 +98,7 @@ def convert_to_datetime(jd: pd.Series, format=None) -> pd.Series:
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
+    >>> from fink_broker.common.spark_utils import load_parquet_files
     >>> df = load_parquet_files("online/raw/20200101")
     >>> df = df.withColumn('datetime', convert_to_datetime(df['candidate.jd']))
     >>> pdf = df.select('datetime').toPandas()
@@ -129,7 +129,7 @@ def compute_num_part(df, partition_size=128.0):
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
+    >>> from fink_broker.common.spark_utils import load_parquet_files
     >>> df = load_parquet_files("online/raw/20200101")
     >>> numpart = compute_num_part(df, partition_size=128.)
     >>> print(numpart)
diff --git a/fink_broker/ztf/hbase_utils.py b/fink_broker/ztf/hbase_utils.py
index 60fc27e7..3c1a1b06 100644
--- a/fink_broker/ztf/hbase_utils.py
+++ b/fink_broker/ztf/hbase_utils.py
@@ -28,7 +28,7 @@
 from fink_science.t2.utilities import T2_COLS
 from fink_science.xmatch.utils import MANGROVE_COLS
 
-from fink_broker.tester import spark_unit_tests
+from fink_broker.common.tester import spark_unit_tests
 
 _LOG = logging.getLogger(__name__)
 
diff --git a/fink_broker/ztf/mm_utils.py b/fink_broker/ztf/mm_utils.py
index b83238fa..c39d493d 100644
--- a/fink_broker/ztf/mm_utils.py
+++ b/fink_broker/ztf/mm_utils.py
@@ -27,8 +27,8 @@
 from astropy.time import Time
 from datetime import timedelta
 
-from fink_broker.spark_utils import connect_to_raw_database, path_exist
-from fink_broker.logging_utils import init_logger
+from fink_broker.common.spark_utils import connect_to_raw_database, path_exist
+from fink_broker.common.logging_utils import init_logger
 
 from pyspark.sql.streaming import StreamingQuery
 
@@ -56,7 +56,7 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
     """
     if args.mmconfigpath != "no-config":
         from fink_mm.init import get_config
-        from fink_broker.mm_utils import mm2distribute
+        from fink_broker.ztf.mm_utils import mm2distribute
 
         _LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
         config = get_config({"--config": args.mmconfigpath})
@@ -71,14 +71,14 @@ def distribute_launch_fink_mm(spark, args: dict) -> Tuple[int, List]:
         # if there is gcn and ztf data
         if path_exist(mmtmpdatapath):
             t_before = time.time()
-            _LOG.info("starting mm2distribute in 300 seconds...")
-            time.sleep(300)
+            _LOG.info("starting mm2distribute in 30 seconds...")
+            time.sleep(30)
             stream_distrib_list = mm2distribute(spark, config, args)
             time_spent_in_wait += time.time() - t_before
             break
 
-        time_spent_in_wait += 300
-        time.sleep(300)
+        time_spent_in_wait += 30
+        time.sleep(30)
     if stream_distrib_list == []:
         _LOG.warning(
             f"{mmtmpdatapath} does not exist. mm2distribute could not start before the end of the job."
@@ -114,7 +114,7 @@ def raw2science_launch_fink_mm(
     """
     if args.mmconfigpath != "no-config":
         from fink_mm.init import get_config
-        from fink_broker.mm_utils import science2mm
+        from fink_broker.ztf.mm_utils import science2mm
 
         _LOG.info(f"Fink-MM configuration file: {args.mmconfigpath}")
         config = get_config({"--config": args.mmconfigpath})
@@ -131,15 +131,15 @@
         if path_exist(gcn_path) and path_exist(scitmpdatapath):
             # Start the GCN x ZTF cross-match stream
             t_before = time.time()
-            _LOG.info("starting science2mm in 300 seconds...")
-            time.sleep(300)
+            _LOG.info("starting science2mm in 30 seconds...")
+            time.sleep(30)
             countquery_mm = science2mm(args, config, gcn_path, scitmpdatapath)
             time_spent_in_wait += time.time() - t_before
             break
         else:
             # wait for comming GCN
-            time_spent_in_wait += 300
-            time.sleep(300)
+            time_spent_in_wait += 30
+            time.sleep(30)
 
     if countquery_mm is None:
         _LOG.warning("science2mm could not start before the end of the job.")
diff --git a/fink_broker/ztf/science.py b/fink_broker/ztf/science.py
index a1e36eb5..b95c3fc3 100644
--- a/fink_broker/ztf/science.py
+++ b/fink_broker/ztf/science.py
@@ -27,7 +27,7 @@
 
 from fink_utils.spark.utils import concat_col
 
-from fink_broker.tester import spark_unit_tests
+from fink_broker.common.tester import spark_unit_tests
 
 # Import of science modules
 from fink_science.random_forest_snia.processor import rfscore_sigmoid_full
@@ -89,7 +89,7 @@ def ang2pix(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series:
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
+    >>> from fink_broker.common.spark_utils import load_parquet_files
     >>> df = load_parquet_files(ztf_alert_sample)
 
     >>> df_index = df.withColumn(
@@ -128,7 +128,7 @@ def ang2pix_array(ra: pd.Series, dec: pd.Series, nside: pd.Series) -> pd.Series:
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
+    >>> from fink_broker.common.spark_utils import load_parquet_files
     >>> df = load_parquet_files(ztf_alert_sample)
 
     >>> nsides = F.array([F.lit(256), F.lit(4096), F.lit(131072)])
@@ -202,8 +202,8 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame:
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
-    >>> from fink_broker.logging_utils import get_fink_logger
+    >>> from fink_broker.common.spark_utils import load_parquet_files
+    >>> from fink_broker.common.logging_utils import get_fink_logger
     >>> logger = get_fink_logger('raw2cience_test', 'INFO')
     >>> _LOG = logging.getLogger(__name__)
     >>> df = load_parquet_files(ztf_alert_sample)
@@ -478,8 +478,8 @@ def apply_science_modules_elasticc(df: DataFrame) -> DataFrame:
 
     Examples
     --------
-    >>> from fink_broker.spark_utils import load_parquet_files
-    >>> from fink_broker.logging_utils import get_fink_logger
+    >>> from fink_broker.common.spark_utils import load_parquet_files
+    >>> from fink_broker.common.logging_utils import get_fink_logger
     >>> logger = get_fink_logger('raw2cience_elasticc_test', 'INFO')
     >>> _LOG = logging.getLogger(__name__)
     >>> df = load_parquet_files(elasticc_alert_sample)
diff --git a/fink_broker/ztf/test_schema_converter.py b/fink_broker/ztf/test_schema_converter.py
index b8c589ab..8c8eec26 100644
--- a/fink_broker/ztf/test_schema_converter.py
+++ b/fink_broker/ztf/test_schema_converter.py
@@ -20,9 +20,9 @@
 
 from pyspark.sql import DataFrame
 
-from fink_broker.avro_utils import readschemafromavrofile
+from fink_broker.common.avro_utils import readschemafromavrofile
 from fink_utils.spark.schema_converter import to_avro
-from fink_broker.spark_utils import connect_to_raw_database, init_sparksession
+from fink_broker.common.spark_utils import connect_to_raw_database, init_sparksession
 
 _LOG = logging.getLogger(__name__)
 
diff --git a/fink_broker/ztf/tracklet_identification.py b/fink_broker/ztf/tracklet_identification.py
index 7c29acd9..6e6ee1ac 100644
--- a/fink_broker/ztf/tracklet_identification.py
+++ b/fink_broker/ztf/tracklet_identification.py
@@ -25,7 +25,7 @@
 from astropy.coordinates import SkyCoord
 from astropy.time import Time
 
-from fink_broker.tester import spark_unit_tests
+from fink_broker.common.tester import spark_unit_tests
 
 
 def apply_tracklet_cuts(df: DataFrame) -> DataFrame:
diff --git a/scheduler/ztf/launch_stream.sh b/scheduler/ztf/launch_stream.sh
index 699dba08..308dd5b3 100755
--- a/scheduler/ztf/launch_stream.sh
+++ b/scheduler/ztf/launch_stream.sh
@@ -144,10 +144,10 @@ if [[ ! ${POLL_ONLY} ]] && [[ ! ${ENRICH_ONLY} ]]; then
     # difference between now and max end (8pm CEST)
     LEASETIME=$(( `date +'%s' -d ${STOP_AT}` - `date +'%s' -d 'now'` ))
 
-    nohup ${FINK_HOME}/bin/fink start distribution \
+    nohup ${FINK_HOME}/bin/fink start distribute \
         -s ztf \
         -c ${FINK_HOME}/conf/fink.conf.prod \
-        -conf_distribution ${FINK_HOME}/conf/ztf/fink.conf.distribution_cluster \
+        -conf_distribution ${FINK_HOME}/conf/ztf/fink.conf.distribution \
         -night ${NIGHT} \
         -driver-memory 4g -executor-memory 2g \
         -spark-cores-max 4 -spark-executor-cores 1 \