diff --git a/initial_conf.py b/initial_conf.py index 1d395de9..55f57eac 100644 --- a/initial_conf.py +++ b/initial_conf.py @@ -1,7 +1,8 @@ -import pandas as pd -from getpass import getpass import argparse +from getpass import getpass + +import pandas as pd def initial_conf(save_user=True, replace_user=False, global_config_flag=True): diff --git a/ruff.toml b/ruff.toml index 6510f6fe..ad76a33e 100644 --- a/ruff.toml +++ b/ruff.toml @@ -18,3 +18,5 @@ ignore = [ "E501" ] +[lint.per-file-ignores] +"__init__.py" = ["F401"] diff --git a/scripts/conf_file_finding.py b/scripts/conf_file_finding.py index 4d1d9fd4..26dc0be0 100644 --- a/scripts/conf_file_finding.py +++ b/scripts/conf_file_finding.py @@ -2,6 +2,7 @@ import os import pathlib + def chdir_to_root(): root_dir_found = 0 diff --git a/scripts/create_schemas.py b/scripts/create_schemas.py index 1e05e21d..e69de29b 100644 --- a/scripts/create_schemas.py +++ b/scripts/create_schemas.py @@ -1 +0,0 @@ -from u19_pipeline import lab, reference, subject, task, action, acquisition, behavior, puffs diff --git a/scripts/drop_schemas.py b/scripts/drop_schemas.py index 2cccf92e..b5047991 100644 --- a/scripts/drop_schemas.py +++ b/scripts/drop_schemas.py @@ -1,6 +1,18 @@ -from u19_pipeline import lab, subject, acquisition, behavior, task, reference, recording, recording_process, ephys_pipeline, imaging_pipeline, ephys_sync -from u19_pipeline.imaging_pipeline import scan_element, imaging_element -from u19_pipeline.ephys_pipeline import probe_element, ephys_element +from u19_pipeline import ( + acquisition, + behavior, + ephys_pipeline, + ephys_sync, + imaging_pipeline, + lab, + recording, + recording_process, + reference, + subject, + task, +) +from u19_pipeline.ephys_pipeline import ephys_element, probe_element +from u19_pipeline.imaging_pipeline import imaging_element, scan_element behavior.schema.drop() recording_process.schema.drop() diff --git a/scripts/ingest_hdf5_to_db.py b/scripts/ingest_hdf5_to_db.py index 4ec21b9e..fba4fa53 100644 --- a/scripts/ingest_hdf5_to_db.py +++ b/scripts/ingest_hdf5_to_db.py @@ -4,17 +4,20 @@ # Folder structure for hdf5 files on bucket will be: # /jukebox/braininit/puffs/{netid}/{project_name}/{cohort}/{rig}/{hdf5_filename} +import glob +import json +import os +import sys +from datetime import timedelta + import datajoint as dj -import os, sys, glob, json -import pandas as pd import numpy as np -import time -from datetime import datetime, timedelta +import pandas as pd dj.config['database.host'] = 'datajoint00.pni.princeton.edu' # load dj creds from file credfile = '/jukebox/wang/ahoag/.djenv' -with open(credfile,'r') as infile: +with open(credfile) as infile: cred_dict = json.load(infile) dj.config['database.user'] = cred_dict.get('DJ_DB_USER') @@ -314,18 +317,9 @@ this_session_notes_key = f'sessions/{session_compressed_str}/notes' if this_session_notes_key in stored_session_keys: session_notes_dict = json.loads(data[this_session_notes_key].iloc[0]) - if 'notes' in session_notes_dict: - session_notes = session_notes_dict['notes'] - else: - session_notes = None - if 'stdout' in session_notes_dict: - session_stdout = session_notes_dict['stdout'] - else: - session_stdout = None - if 'stderr' in session_notes_dict: - session_stderr = session_notes_dict['stderr'] - else: - session_stderr = None + session_notes = session_notes_dict.get('notes', None) + session_stdout = session_notes_dict.get('stdout', None) + session_stderr = session_notes_dict.get('stderr', None) else: session_notes = None session_stdout = None diff --git 
a/scripts/populate_imaging.py b/scripts/populate_imaging.py index 14dead71..ce6e5ae1 100644 --- a/scripts/populate_imaging.py +++ b/scripts/populate_imaging.py @@ -1,5 +1,4 @@ -from u19_pipeline_python import lab, reference, subject, task, action, acquisition, imaging - +from u19_pipeline_python import acquisition kargs = dict(supress_errors=True, display_progress=True) acquisition.Scan.populate(**kargs) diff --git a/setup.py b/setup.py index ef48958a..521bd0fe 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,8 @@ #!/usr/bin/env python -from setuptools import setup, find_packages from os import path +from setuptools import find_packages, setup + here = path.abspath(path.dirname(__file__)) with open(path.join(here, 'requirements.txt')) as f: requirements = f.read().splitlines() diff --git a/u19_pipeline/__init__.py b/u19_pipeline/__init__.py index 79f1ef59..4c64b43a 100644 --- a/u19_pipeline/__init__.py +++ b/u19_pipeline/__init__.py @@ -1,3 +1,4 @@ -from os.path import join, dirname import os -import datajoint as dj \ No newline at end of file +from os.path import dirname, join + +import datajoint as dj diff --git a/u19_pipeline/acquisition.py b/u19_pipeline/acquisition.py index af69bc37..9349ce1b 100644 --- a/u19_pipeline/acquisition.py +++ b/u19_pipeline/acquisition.py @@ -1,5 +1,4 @@ import datajoint as dj -from u19_pipeline import lab, task, subject schema = dj.schema(dj.config['custom']['database.prefix'] + 'acquisition') diff --git a/u19_pipeline/action.py b/u19_pipeline/action.py index c9ecf403..7d25a3cf 100644 --- a/u19_pipeline/action.py +++ b/u19_pipeline/action.py @@ -2,7 +2,6 @@ import datajoint as dj -from . import lab, reference, subject schema = dj.schema(dj.config['custom']['database.prefix'] + 'action') diff --git a/u19_pipeline/alert_system/alert_code_skeleton.py b/u19_pipeline/alert_system/alert_code_skeleton.py index f5d262b5..fab6feb9 100644 --- a/u19_pipeline/alert_system/alert_code_skeleton.py +++ b/u19_pipeline/alert_system/alert_code_skeleton.py @@ -1,11 +1,5 @@ -import pandas as pd -import datajoint as dj -import datetime -import numpy as np -import u19_pipeline.alert_system.behavior_metrics as bm -import u19_pipeline.alert_system.alert_system_utility as asu # Slack Configuration dictionary diff --git a/u19_pipeline/alert_system/alert_system_utility.py b/u19_pipeline/alert_system/alert_system_utility.py index bd65016c..5cd5ea65 100644 --- a/u19_pipeline/alert_system/alert_system_utility.py +++ b/u19_pipeline/alert_system/alert_system_utility.py @@ -1,11 +1,13 @@ -import pandas as pd -import datajoint as dj import datetime +import datajoint as dj +import pandas as pd + import u19_pipeline.utils.dj_shortcuts as djs + def get_acquisition_data_alert_system(type='subject_fullname', data_days=60, min_sessions=20): ''' Get and filter data for alert system diff --git a/u19_pipeline/alert_system/behavior_metrics.py b/u19_pipeline/alert_system/behavior_metrics.py index 4d365702..2c259b5c 100644 --- a/u19_pipeline/alert_system/behavior_metrics.py +++ b/u19_pipeline/alert_system/behavior_metrics.py @@ -1,9 +1,7 @@ -import pandas as pd -import datajoint as dj -class BehaviorMetrics(): +class BehaviorMetrics: @staticmethod def get_bias_from_trial_df(trials_df, return_all_metrics=False): diff --git a/u19_pipeline/alert_system/cronjob_alert.py b/u19_pipeline/alert_system/cronjob_alert.py index 34461ba1..e9089586 100644 --- a/u19_pipeline/alert_system/cronjob_alert.py +++ b/u19_pipeline/alert_system/cronjob_alert.py @@ -1,12 +1,13 @@ import time + +import 
u19_pipeline.alert_system.main_alert_system as mas from scripts.conf_file_finding import try_find_conf_file + try_find_conf_file() time.sleep(1) -import datajoint as dj -import u19_pipeline.alert_system.main_alert_system as mas mas.main_alert_system() diff --git a/u19_pipeline/alert_system/custom_alerts/__init__.py b/u19_pipeline/alert_system/custom_alerts/__init__.py index 2f5c78e4..eb1f6e67 100644 --- a/u19_pipeline/alert_system/custom_alerts/__init__.py +++ b/u19_pipeline/alert_system/custom_alerts/__init__.py @@ -1,2 +1,3 @@ import pkgutil + __path__ = pkgutil.extend_path(__path__, __name__) \ No newline at end of file diff --git a/u19_pipeline/alert_system/custom_alerts/braininit_storage.py b/u19_pipeline/alert_system/custom_alerts/braininit_storage.py index 82c7ea42..609e433b 100644 --- a/u19_pipeline/alert_system/custom_alerts/braininit_storage.py +++ b/u19_pipeline/alert_system/custom_alerts/braininit_storage.py @@ -1,14 +1,9 @@ -import pandas as pd -import datajoint as dj - -import u19_pipeline.lab as lab - -import u19_pipeline.alert_system.behavior_metrics as bm -import u19_pipeline.alert_system.alert_system_utility as asu -import os import subprocess +import datajoint as dj +import pandas as pd + # Slack Configuration dictionary slack_configuration_dictionary = { 'slack_notification_channel': ['custom_alerts'], diff --git a/u19_pipeline/alert_system/custom_alerts/rig_bias.py b/u19_pipeline/alert_system/custom_alerts/rig_bias.py index 2f883542..bffb7404 100644 --- a/u19_pipeline/alert_system/custom_alerts/rig_bias.py +++ b/u19_pipeline/alert_system/custom_alerts/rig_bias.py @@ -1,12 +1,12 @@ -import pandas as pd -import datajoint as dj import datetime + +import datajoint as dj import numpy as np +import pandas as pd -import u19_pipeline.alert_system.behavior_metrics as bm import u19_pipeline.alert_system.alert_system_utility as asu - +import u19_pipeline.alert_system.behavior_metrics as bm # Slack Configuration dictionary slack_configuration_dictionary = { diff --git a/u19_pipeline/alert_system/custom_alerts/rig_trial.py b/u19_pipeline/alert_system/custom_alerts/rig_trial.py index 9f37778d..a662cef0 100644 --- a/u19_pipeline/alert_system/custom_alerts/rig_trial.py +++ b/u19_pipeline/alert_system/custom_alerts/rig_trial.py @@ -1,12 +1,10 @@ -import pandas as pd -import datajoint as dj import datetime + import numpy as np -import u19_pipeline.alert_system.behavior_metrics as bm import u19_pipeline.alert_system.alert_system_utility as asu - +import u19_pipeline.alert_system.behavior_metrics as bm # Slack Configuration dictionary slack_configuration_dictionary = { diff --git a/u19_pipeline/alert_system/custom_alerts/subject_bias.py b/u19_pipeline/alert_system/custom_alerts/subject_bias.py index 43c7a0b3..3a6037da 100644 --- a/u19_pipeline/alert_system/custom_alerts/subject_bias.py +++ b/u19_pipeline/alert_system/custom_alerts/subject_bias.py @@ -1,12 +1,12 @@ -import pandas as pd -import datajoint as dj import datetime + +import datajoint as dj import numpy as np +import pandas as pd -import u19_pipeline.alert_system.behavior_metrics as bm import u19_pipeline.alert_system.alert_system_utility as asu - +import u19_pipeline.alert_system.behavior_metrics as bm # Slack Configuration dictionary slack_configuration_dictionary = { diff --git a/u19_pipeline/alert_system/custom_alerts/subject_trial.py b/u19_pipeline/alert_system/custom_alerts/subject_trial.py index d69c4366..b11620df 100644 --- a/u19_pipeline/alert_system/custom_alerts/subject_trial.py +++ 
b/u19_pipeline/alert_system/custom_alerts/subject_trial.py @@ -1,12 +1,10 @@ -import pandas as pd -import datajoint as dj import datetime + import numpy as np -import u19_pipeline.alert_system.behavior_metrics as bm import u19_pipeline.alert_system.alert_system_utility as asu - +import u19_pipeline.alert_system.behavior_metrics as bm # Slack Configuration dictionary slack_configuration_dictionary = { diff --git a/u19_pipeline/alert_system/main_alert_system.py b/u19_pipeline/alert_system/main_alert_system.py index 4b45b304..5a3a6fcb 100644 --- a/u19_pipeline/alert_system/main_alert_system.py +++ b/u19_pipeline/alert_system/main_alert_system.py @@ -1,17 +1,14 @@ -import pandas as pd -import datajoint as dj -import pkgutil -import importlib import datetime +import importlib +import pkgutil +import time import traceback import u19_pipeline.alert_system.custom_alerts as ca import u19_pipeline.lab as lab import u19_pipeline.utils.slack_utils as su -import time - # Slack Configuration dictionary slack_configuration_dictionary = { 'slack_notification_channel': ['custom_alerts'] @@ -88,7 +85,7 @@ def slack_alert_message_format(alert_dictionaty, alert_module_name): m2_1["type"] = "mrkdwn" m2_1["text"] = '' - for key in alert_dictionaty.keys(): + for key in alert_dictionaty: m2_1["text"] += '*' + key + '* : ' + str(alert_dictionaty[key]) + '\n' m2['text'] = m2_1 diff --git a/u19_pipeline/automatic_job/clusters_paths_and_transfers.py b/u19_pipeline/automatic_job/clusters_paths_and_transfers.py index 0c4a1ae8..5959b2fe 100644 --- a/u19_pipeline/automatic_job/clusters_paths_and_transfers.py +++ b/u19_pipeline/automatic_job/clusters_paths_and_transfers.py @@ -1,15 +1,13 @@ -import datajoint as dj -import pathlib -import subprocess import json -import re import os +import pathlib +import subprocess import time -from datetime import datetime -from element_interface.utils import dict_to_uuid +import datajoint as dj import u19_pipeline.automatic_job.params_config as config + #Functions to transfer files (globus, scp, smbclient) #FOR PNI endpoint @@ -166,7 +164,7 @@ def request_globus_transfer(job_id_str, source_ep, dest_ep, source_filepath, des dict_output = json.loads(p.stdout.decode('UTF-8')) transfer_request['status'] = config.system_process['SUCCESS'] transfer_request['task_id'] = dict_output['task_id'] - except Exception as e: + except Exception: print('stdout is not a valid json, probably an error') transfer_request['status'] = config.system_process['ERROR'] transfer_request['error_info'] = p.stdout.decode('UTF-8') @@ -271,7 +269,7 @@ def get_error_log_str(recording_process_id): log_file_local_path = pathlib.Path(local_log_file_dir,log_filename).as_posix() if os.path.exists(log_file_local_path): - with open(log_file_local_path, 'r') as error_log_file: + with open(log_file_local_path) as error_log_file: error_log_data = ' '.join(error_log_file.readlines()) error_log_data = error_log_data.replace("activate the default environment with 'conda activate' or create a new environment to customize with 'conda create'.\n",'') diff --git a/u19_pipeline/automatic_job/create_missing_lfps_script.py b/u19_pipeline/automatic_job/create_missing_lfps_script.py index 240930fb..a23be741 100644 --- a/u19_pipeline/automatic_job/create_missing_lfps_script.py +++ b/u19_pipeline/automatic_job/create_missing_lfps_script.py @@ -1,11 +1,11 @@ import pathlib import sys -import datajoint as dj -from u19_pipeline import recording, ephys_pipeline, recording, recording_process +import datajoint as dj import 
u19_pipeline.automatic_job.params_config as config +from u19_pipeline import ephys_pipeline, recording, recording_process def main(recording_id): diff --git a/u19_pipeline/automatic_job/cronjob_automatic_job.py b/u19_pipeline/automatic_job/cronjob_automatic_job.py index 46396862..63254415 100644 --- a/u19_pipeline/automatic_job/cronjob_automatic_job.py +++ b/u19_pipeline/automatic_job/cronjob_automatic_job.py @@ -1,12 +1,14 @@ import time + +import u19_pipeline.automatic_job.recording_handler as rec_handler +import u19_pipeline.automatic_job.recording_process_handler as rec_process_handler from scripts.conf_file_finding import try_find_conf_file + try_find_conf_file() time.sleep(1) -import u19_pipeline.automatic_job.recording_handler as rec_handler -import u19_pipeline.automatic_job.recording_process_handler as rec_process_handler # Check recordings and then jobs rec_handler.RecordingHandler.pipeline_handler_main() diff --git a/u19_pipeline/automatic_job/ephys_element_ingest.py b/u19_pipeline/automatic_job/ephys_element_ingest.py index dda0cbf1..1f764b38 100644 --- a/u19_pipeline/automatic_job/ephys_element_ingest.py +++ b/u19_pipeline/automatic_job/ephys_element_ingest.py @@ -1,13 +1,17 @@ -import re import pathlib -from u19_pipeline import ephys_pipeline +import re -from u19_pipeline.ephys_pipeline import (probe_element, ephys_element, - get_ephys_root_data_dir, get_session_directory) from element_array_ephys.readers import spikeglx - from element_interface.utils import find_full_path +from u19_pipeline import ephys_pipeline +from u19_pipeline.ephys_pipeline import ( + ephys_element, + get_ephys_root_data_dir, + get_session_directory, + probe_element, +) + """ The ingestion routine for imaging element includes: diff --git a/u19_pipeline/automatic_job/ephys_element_populate.py b/u19_pipeline/automatic_job/ephys_element_populate.py index 40ae3744..59ef2644 100644 --- a/u19_pipeline/automatic_job/ephys_element_populate.py +++ b/u19_pipeline/automatic_job/ephys_element_populate.py @@ -1,7 +1,7 @@ -from u19_pipeline.ephys_pipeline import probe_element, ephys_element +import u19_pipeline.automatic_job.params_config as config from u19_pipeline import recording, recording_process +from u19_pipeline.ephys_pipeline import ephys_element, probe_element -import u19_pipeline.automatic_job.params_config as config def populate_element_data(job_id, display_progress=True, reserve_jobs=False, suppress_errors=False): @@ -33,10 +33,7 @@ def populate_element_data(job_id, display_progress=True, reserve_jobs=False, sup dict(paramset_idx=paramset_idx)).fetch1( 'clustering_method') - if len(precluster_paramsets)==0: - task_mode = 'none' - else: - task_mode = 'load' + task_mode = 'none' if len(precluster_paramsets) == 0 else 'load' precluster_key = dict(recording_id=process_key['recording_id'], insertion_number=fragment_number, diff --git a/u19_pipeline/automatic_job/imaging_element.py b/u19_pipeline/automatic_job/imaging_element.py index 57095fcc..093c0777 100644 --- a/u19_pipeline/automatic_job/imaging_element.py +++ b/u19_pipeline/automatic_job/imaging_element.py @@ -1,20 +1,19 @@ -from concurrent.futures import process -import os -import datajoint as dj -import numpy as np -from u19_pipeline import lab, imaging_rec, acquisition, subject, recording -from u19_pipeline.imaging_element import imaging_element, scan_element, \ - get_processed_dir, Equipment,\ - get_imaging_root_data_dir, \ - get_scan_image_files -from u19_pipeline.ingest.imaging_element_ingest import process_scan -from 
element_interface.scanimage_utils import get_scanimage_acq_time, parse_scanimage_header -from element_interface.utils import find_full_path -import scanreader +import os import pathlib + +import scanreader import tifffile -import datetime -import h5py +from element_interface.scanimage_utils import ( + parse_scanimage_header, +) + +from u19_pipeline import imaging_rec, lab, recording +from u19_pipeline.imaging_element import ( + get_imaging_root_data_dir, + get_scan_image_files, + imaging_element, + scan_element, +) # subject_fullname = 'koay_K65' # session_date = '2018-02-02' @@ -25,7 +24,7 @@ acq_software = 'ScanImage' #recording_id = os.environ['recording_id'] -recording_process_id = os.environ['recording_process_id'] +recording_process_id = os.environ['RECORDING_PROCESS_ID'] #process_method = os.environ['process_method'] #paramset_idx = os.environ['paramset_idx'] @@ -170,13 +169,13 @@ loaded_scan = scanreader.read_scan(scan_filepaths) header = parse_scanimage_header(loaded_scan) #scanner = header['SI_imagingSystem'].strip('\'') #TODO: If using tiffile, hardcode it to `mesoscope` - except Exception as e: + except Exception: print('LOADED Scan using Tifffile') scan_filepaths = scan_filepaths # TODO load all TIFF files from session possibly using TIFFSequence loaded_scan = tifffile.imread(scan_filepaths) #scanner = 'mesoscope' except: #TODO: Use except instead of else) - print(f'ScanImage loading error') #TODO: Modify the error message + print('ScanImage loading error') #TODO: Modify the error message #Equipment.insert1({'scanner': scanner}, skip_duplicates=True) scan_element.Scan.insert1( @@ -236,7 +235,7 @@ iscell_fp = plane_filepath / 'iscell.npy' if not ops_fp.exists() or not iscell_fp.exists(): raise FileNotFoundError( - 'No "ops.npy" or "iscell.npy" found. Invalid suite2p plane folder: {}'.format(plane_filepath)) + f'No "ops.npy" or "iscell.npy" found. 
Invalid suite2p plane folder: {plane_filepath}') elif processing_method == 'caiman': raise ValueError("caiman not supported yet") diff --git a/u19_pipeline/automatic_job/imaging_element_populate.py b/u19_pipeline/automatic_job/imaging_element_populate.py index 31f88154..60324652 100644 --- a/u19_pipeline/automatic_job/imaging_element_populate.py +++ b/u19_pipeline/automatic_job/imaging_element_populate.py @@ -1,9 +1,10 @@ -from u19_pipeline import recording, recording_process -from u19_pipeline.imaging_pipeline import imaging_element import pathlib import warnings import u19_pipeline.automatic_job.params_config as config +from u19_pipeline import recording, recording_process +from u19_pipeline.imaging_pipeline import imaging_element + def populate_element_data(job_id, display_progress=True, reserve_jobs=False, suppress_errors=False): @@ -38,10 +39,7 @@ def populate_element_data(job_id, display_progress=True, reserve_jobs=False, sup dict(paramset_idx=paramset_idx)).fetch1( 'processing_method') - if len(preprocess_paramsets)==0: - preprocess_task_mode = 'none' - else: - preprocess_task_mode = 'load' + preprocess_task_mode = 'none' if len(preprocess_paramsets) == 0 else 'load' preprocess_key = dict(recording_id=process_key['recording_id'], tiff_split=fragment_number, diff --git a/u19_pipeline/automatic_job/parameter_file_creator.py b/u19_pipeline/automatic_job/parameter_file_creator.py index e05e2ecc..ba0ac75a 100644 --- a/u19_pipeline/automatic_job/parameter_file_creator.py +++ b/u19_pipeline/automatic_job/parameter_file_creator.py @@ -2,14 +2,14 @@ #import os #import pathlib -import subprocess -import pathlib import json -import re +import pathlib + +from scipy.io import savemat + import u19_pipeline.automatic_job.clusters_paths_and_transfers as ft -import u19_pipeline.automatic_job.params_config as config +import u19_pipeline.automatic_job.params_config as config from u19_pipeline.utils.file_utils import write_file -from scipy.io import savemat # Functions to create parameter files and send them diff --git a/u19_pipeline/automatic_job/params_config.py b/u19_pipeline/automatic_job/params_config.py index 1140e898..b60f8d5c 100644 --- a/u19_pipeline/automatic_job/params_config.py +++ b/u19_pipeline/automatic_job/params_config.py @@ -1,11 +1,11 @@ -import pandas as pd -import numpy as np import os import pathlib -from scripts.conf_file_finding import get_root_directory +import pandas as pd + import u19_pipeline.lab as lab +from scripts.conf_file_finding import get_root_directory #Dictionary with main configuration for each modality (ephys or imaging) recording_modality_dict = [ diff --git a/u19_pipeline/automatic_job/pupillometry_check_handler_script.py b/u19_pipeline/automatic_job/pupillometry_check_handler_script.py index 31e2bd61..476dda40 100644 --- a/u19_pipeline/automatic_job/pupillometry_check_handler_script.py +++ b/u19_pipeline/automatic_job/pupillometry_check_handler_script.py @@ -1,11 +1,10 @@ - import time +import u19_pipeline.automatic_job.pupillometry_handler as ph from scripts.conf_file_finding import try_find_conf_file + try_find_conf_file() time.sleep(1) -import datajoint as dj -import u19_pipeline.automatic_job.pupillometry_handler as ph ph.PupillometryProcessingHandler.check_processed_pupillometry_sessions() diff --git a/u19_pipeline/automatic_job/pupillometry_handler.py b/u19_pipeline/automatic_job/pupillometry_handler.py index 36f8d8b7..49d963eb 100644 --- a/u19_pipeline/automatic_job/pupillometry_handler.py +++ b/u19_pipeline/automatic_job/pupillometry_handler.py @@ -1,28 
+1,26 @@ -import datajoint as dj -import pathlib -import deeplabcut -import pandas as pd -import numpy as np +import copy import glob -import subprocess -import re import os +import pathlib +import re +import subprocess import sys -import copy import traceback -from skimage.measure import EllipseModel -from skimage.draw import ellipse_perimeter -from scipy import stats +import datajoint as dj +import deeplabcut +import numpy as np +import pandas as pd +from scipy import stats +from skimage.measure import EllipseModel -import u19_pipeline.utils.slack_utils as slack_utils -import u19_pipeline.automatic_job.slurm_creator as slurmlib import u19_pipeline.acquisition as acquisition -import u19_pipeline.pupillometry as pupillometry import u19_pipeline.automatic_job.params_config as config -from u19_pipeline.automatic_job import recording_handler +import u19_pipeline.automatic_job.slurm_creator as slurmlib +import u19_pipeline.pupillometry as pupillometry +import u19_pipeline.utils.slack_utils as slack_utils from u19_pipeline.utils.file_utils import write_file -import u19_pipeline.automatic_job.clusters_paths_and_transfers as ft + def pupillometry_exception_handler(func): def inner_function(*args, **kwargs): @@ -43,15 +41,15 @@ def inner_function(*args, **kwargs): return (config.RECORDING_STATUS_ERROR_ID, update_value_dict) return inner_function -class PupillometryProcessingHandler(): +class PupillometryProcessingHandler: spock_home_dir = '/mnt/cup/braininit/Shared/repos/U19-pipeline_python/' spock_log_dir = spock_home_dir + "u19_pipeline/automatic_job/OutputLog/" process_script_path = spock_home_dir + "u19_pipeline/automatic_job/pupillometry_handler.py" - spock_error_dir = spock_home_dir + "u19_pipeline/automatic_job/ErrorLog/" + spock_error_dir = spock_home_dir + "u19_pipeline/automatic_job/ErrorLog/" spock_slurm_filepath = spock_home_dir + "u19_pipeline/" spock_system_name = 'spockmk2-loginvm.pni.princeton.edu' - + pupillometry_slurm_filepath = os.path.abspath(os.path.realpath(__file__)+ "/../") #pupillometry_slurm_filepath = 'u19_pipeline/' @@ -88,14 +86,14 @@ def generate_slurm_pupillometry(slurm_dict): python ${process_script_path} ${video_dir} ${model_dir} ${output_dir} #python ${process_script_path} ${recording_process_id} ''' - - return slurm_text + + return slurm_text @staticmethod def create_pupillometry_slurm_params_file(slurm_dict): text_dict = '' - for slurm_param in slurm_dict.keys(): + for slurm_param in slurm_dict: if isinstance(slurm_dict[slurm_param], list): for list_param in slurm_dict[slurm_param]: @@ -104,7 +102,7 @@ def create_pupillometry_slurm_params_file(slurm_dict): text_dict += '#SBATCH --' + str(slurm_param) + '=' + str(slurm_dict[slurm_param]) + '\n' return text_dict - + @staticmethod def generate_slurm_file(video_dir): ''' @@ -127,7 +125,7 @@ def generate_slurm_file(video_dir): print('slurm_dict', slurm_dict) slurm_text = PupillometryProcessingHandler.generate_slurm_pupillometry(slurm_dict) - + slurm_file_local_path = str(pathlib.Path(PupillometryProcessingHandler.pupillometry_slurm_filepath, PupillometryProcessingHandler.pupillometry_slurm_filename)) @@ -144,7 +142,7 @@ def generate_slurm_file(video_dir): print(status) print(slurm_destination) - + return status, slurm_destination @staticmethod @@ -153,7 +151,7 @@ def queue_pupillometry_slurm_file(video_dir, model_dir, repository_dir, output_d id_slurm_job = -1 #Get all associated variables given the selected processing cluster - command = ['ssh', 'u19prod@'+PupillometryProcessingHandler.spock_system_name, 'sbatch', 
+ command = ['ssh', 'u19prod@'+PupillometryProcessingHandler.spock_system_name, 'sbatch', "--export=video_dir='"+str(video_dir)+ "',model_dir='"+str(model_dir)+ "',repository_dir='"+str(repository_dir)+ @@ -198,7 +196,6 @@ def transfer_pupillometry_slurm_file(slurm_file_local_path, slurm_destination): transfer_status = p.wait() return transfer_status - return status @staticmethod def getPupilDiameter(analyzedVideoDataPath): @@ -239,9 +236,9 @@ def getPupilDiameter(analyzedVideoDataPath): outlierFlags = outlierFlags.rename(columns={outlierFlags.columns[0]: "OutlierFlag"}) # Concatenate outlier flags array to remove outliers from pupil diameter array temp = pd.concat([df, outlierFlags], axis=1) - temp.loc[temp['OutlierFlag']==True, 'PupilDiameter'] = None + temp.loc[temp['OutlierFlag'], 'PupilDiameter'] = None pupilDiameter = temp['PupilDiameter'] - + return pupilDiameter.to_numpy() @staticmethod @@ -255,16 +252,16 @@ def analyze_videos_pupillometry(configPath, videoPath, output_dir): @pupillometry_exception_handler def check_pupillometry_sessions_queue(): - status_update = config.status_update_idx['NO_CHANGE'] + config.status_update_idx['NO_CHANGE'] update_value_dict = copy.deepcopy(config.default_update_value_dict) - sessions_missing_process = (acquisition.SessionVideo * + sessions_missing_process = (acquisition.SessionVideo * pupillometry.PupillometrySessionModelData & 'pupillometry_job_id is NULL').fetch(as_dict=True) - + print(sessions_missing_process) for pupillometry_2_process in sessions_missing_process: - + # If error, job id = -1 key_insert = dict((k, pupillometry_2_process[k]) for k in ('subject_fullname', 'session_date', 'session_number', 'model_id')) key_insert['pupillometry_job_id'] = -1 @@ -300,7 +297,7 @@ def check_pupillometry_sessions_queue(): # Error handling (generating slurm file) if status != config.system_process['SUCCESS']: - status_update = config.status_update_idx['ERROR_STATUS'] + config.status_update_idx['ERROR_STATUS'] update_value_dict['error_info']['error_message'] = 'Error while generating/transfering pupillometry slurm file' pupillometry.PupillometrySessionModelData.update1(key_insert) @@ -309,15 +306,15 @@ def check_pupillometry_sessions_queue(): #return (status_update, update_value_dict) continue - + # Queue slurm file in spock status, slurm_jobid, error_message = PupillometryProcessingHandler.queue_pupillometry_slurm_file( - videoPath, model_path, PupillometryProcessingHandler.spock_home_dir, output_dir, + videoPath, model_path, PupillometryProcessingHandler.spock_home_dir, output_dir, PupillometryProcessingHandler.process_script_path, slurm_filepath) - + # Error handling (queuing slurm file) if status != config.system_process['SUCCESS']: - status_update = config.status_update_idx['ERROR_STATUS'] + config.status_update_idx['ERROR_STATUS'] update_value_dict['error_info']['error_message'] = 'Error to queue pupillometry slurm file' pupillometry.PupillometrySessionModelData.update1(key_insert) @@ -326,7 +323,7 @@ def check_pupillometry_sessions_queue(): #return (status_update, update_value_dict) continue - + # If success, store job_id key_insert['pupillometry_job_id'] = slurm_jobid pupillometry.PupillometrySessionModelData.update1(key_insert) @@ -348,7 +345,7 @@ def check_processed_pupillometry_sessions(): print('key_update1', key_update) status_update, message = slurmlib.check_slurm_job('u19prod', PupillometryProcessingHandler.spock_system_name, str(session_check['pupillometry_job_id']), local_user=False) - + #If job finished copy over output and/or
error log if status_update == config.status_update_idx['ERROR_STATUS']: @@ -360,7 +357,7 @@ def check_processed_pupillometry_sessions(): slack_utils.send_slack_error_pupillometry_notification(config.slack_webhooks_dict['automation_pipeline_error_notification'],\ update_value_dict['error_info'] ,session_check) continue - + # Get video location if status_update == config.status_update_idx['NEXT_STATUS']: @@ -388,7 +385,7 @@ def check_processed_pupillometry_sessions(): try: pupil_data = PupillometryProcessingHandler.getPupilDiameter(h5_files) - except Exception as e: + except Exception: update_value_dict['error_info']['error_message'] = 'Could not get pupil diameter (check h5 or video file)' slack_utils.send_slack_error_pupillometry_notification(config.slack_webhooks_dict['automation_pipeline_error_notification'],\ update_value_dict['error_info'] ,session_check) @@ -402,11 +399,12 @@ def check_processed_pupillometry_sessions(): slack_utils.send_slack_pupillometry_update_notification(config.slack_webhooks_dict['automation_pipeline_update_notification'],\ 'Pupillometry job finished', session_check) - + if __name__ == '__main__': - + import time + from scripts.conf_file_finding import try_find_conf_file try_find_conf_file() time.sleep(1) @@ -415,7 +413,7 @@ def check_processed_pupillometry_sessions(): args[1] = args[1] + '/config.yaml' print(args) # - + PupillometryProcessingHandler.analyze_videos_pupillometry(args[1], args[0], args[2]) diff --git a/u19_pipeline/automatic_job/pupillometry_handler_script.py b/u19_pipeline/automatic_job/pupillometry_handler_script.py index 19c02b57..a7097c2e 100644 --- a/u19_pipeline/automatic_job/pupillometry_handler_script.py +++ b/u19_pipeline/automatic_job/pupillometry_handler_script.py @@ -1,11 +1,10 @@ - import time +import u19_pipeline.automatic_job.pupillometry_handler as ph from scripts.conf_file_finding import try_find_conf_file + try_find_conf_file() time.sleep(1) -import datajoint as dj -import u19_pipeline.automatic_job.pupillometry_handler as ph ph.PupillometryProcessingHandler.check_pupillometry_sessions_queue() diff --git a/u19_pipeline/automatic_job/pupillometry_main.py b/u19_pipeline/automatic_job/pupillometry_main.py index f2f08b58..ab882904 100644 --- a/u19_pipeline/automatic_job/pupillometry_main.py +++ b/u19_pipeline/automatic_job/pupillometry_main.py @@ -3,6 +3,7 @@ if __name__ == '__main__': import time + from scripts.conf_file_finding import try_find_conf_file try_find_conf_file() time.sleep(1) diff --git a/u19_pipeline/automatic_job/recording_handler.py b/u19_pipeline/automatic_job/recording_handler.py index bf93a8f6..f89198e0 100644 --- a/u19_pipeline/automatic_job/recording_handler.py +++ b/u19_pipeline/automatic_job/recording_handler.py @@ -1,21 +1,25 @@ +import copy import pathlib import time import traceback -import pandas as pd -import datajoint as dj -import copy - from datetime import datetime -from u19_pipeline import recording, ephys_pipeline, imaging_pipeline, recording, recording_process, lab +import datajoint as dj +import pandas as pd + +import u19_pipeline.automatic_job.params_config as config import u19_pipeline.utils.dj_shortcuts as dj_short -import u19_pipeline.utils.slack_utils as slack_utils import u19_pipeline.utils.scp_transfers as scp_tr - +import u19_pipeline.utils.slack_utils as slack_utils +from u19_pipeline import ( + ephys_pipeline, + imaging_pipeline, + lab, + recording, + recording_process, +) from u19_pipeline.automatic_job import ephys_element_ingest -import u19_pipeline.automatic_job.params_config as 
config - def exception_handler(func): @@ -34,7 +38,7 @@ def inner_function(*args, **kwargs): return (config.RECORDING_STATUS_ERROR_ID, update_value_dict) return inner_function -class RecordingHandler(): +class RecordingHandler: @staticmethod def pipeline_handler_main(): @@ -397,7 +401,6 @@ def imaging_preingestion(rec_series): for this_fov in fovs_ingested: # Scan_id always zero because TIFF splitted (FOVs) already on imaging_pipeline schema - scan_id = 0 # Acquisition type will have Mesoscope or 2Photon scanner = rec_series['acquisition_type'] # Hardcoded acquisition software diff --git a/u19_pipeline/automatic_job/recording_process_handler.py b/u19_pipeline/automatic_job/recording_process_handler.py index 4fbdcb8e..0b285851 100644 --- a/u19_pipeline/automatic_job/recording_process_handler.py +++ b/u19_pipeline/automatic_job/recording_process_handler.py @@ -1,31 +1,33 @@ -import time -import traceback -import pandas as pd -import datajoint as dj import copy import pathlib -import numpy as np +import time +import traceback +from datetime import datetime -from u19_pipeline.automatic_job import recording_handler +import pandas as pd +from ecephys_spike_sorting.common.SGLXMetaToCoords import MetaToCoords -import u19_pipeline.utils.dj_shortcuts as dj_short -import u19_pipeline.utils.slack_utils as slack_utils import u19_pipeline.automatic_job.clusters_paths_and_transfers as ft -import u19_pipeline.automatic_job.slurm_creator as slurmlib -import u19_pipeline.automatic_job.parameter_file_creator as paramfilelib -import u19_pipeline.automatic_job.params_config as config import u19_pipeline.automatic_job.ephys_element_populate as ep import u19_pipeline.automatic_job.imaging_element_populate as ip +import u19_pipeline.automatic_job.parameter_file_creator as paramfilelib +import u19_pipeline.automatic_job.params_config as config +import u19_pipeline.automatic_job.slurm_creator as slurmlib +import u19_pipeline.utils.dj_shortcuts as dj_short +import u19_pipeline.utils.slack_utils as slack_utils +from u19_pipeline import ( + ephys_pipeline, + imaging_pipeline, + recording, + recording_process, + utility, +) +from u19_pipeline.automatic_job import recording_handler +from u19_pipeline.utility import is_this_spock -from datetime import datetime -from u19_pipeline import recording, recording_process, ephys_pipeline, imaging_pipeline, utility -from u19_pipeline.utility import create_str_from_dict, is_this_spock -from u19_pipeline.utils import ephys_utils - -from ecephys_spike_sorting.common.SGLXMetaToCoords import MetaToCoords -class RecProcessHandler(): +class RecProcessHandler: @staticmethod def pipeline_handler_main(): @@ -34,7 +36,7 @@ def pipeline_handler_main(): Update status of each process job accordingly ''' - #Get info from all the possible status for a processjob + #Get info from all the possible status for a processjob df_all_process_job = RecProcessHandler.get_active_process_jobs() #For all active process jobs @@ -45,15 +47,15 @@ def pipeline_handler_main(): #Filter current status info current_status = rec_process_series['status_processing_id'] - current_status_series = config.recording_process_status_df.loc[config.recording_process_status_df['Value'] == current_status, :].squeeze() + config.recording_process_status_df.loc[config.recording_process_status_df['Value'] == current_status, :].squeeze() next_status_series = config.recording_process_status_df.loc[config.recording_process_status_df['Value'] == current_status+1, :].squeeze() - # Get processing function + # Get processing function 
function_status_process = getattr(RecProcessHandler, next_status_series['ProcessFunction']) #Trigger process, if success update recording process record try: - status, update_dict = function_status_process(rec_process_series, next_status_series) + status, update_dict = function_status_process(rec_process_series, next_status_series) print('update_dict', update_dict) #Get dictionary of record process @@ -63,8 +65,8 @@ def pipeline_handler_main(): print(rec_process_series) if status == config.status_update_idx['NEXT_STATUS']: - - + + #Get values to update next_status = next_status_series['Value'] value_update = update_dict['value_update'] @@ -136,7 +138,7 @@ def transfer_request(rec_series, status_series): status_series (pd.Series) = Series with information about the next status of the process (if neeeded) Returns: status_update (int) = 1 if recording process status has to be updated to next step in recording.RecordingProcess - = 0 if recording process status not to be changed + = 0 if recording process status not to be changed = -1 if recording process status has to be updated to ERROR in recording.RecordingProcess update_value_dict (dict) = Dictionary with next keys: {'value_update': value to be updated in this stage (if applicable) @@ -151,7 +153,7 @@ def transfer_request(rec_series, status_series): proc_rel_path = rec_series['recording_process_post_path'] modality = rec_series['recording_modality'] - # If tiger, we trigger globus transfer + # If tiger, we trigger globus transfer if rec_series['program_selection_params']['process_cluster'] == "tiger": if status_series['Key'] == 'RAW_FILE_TRANSFER_REQUEST': @@ -167,17 +169,17 @@ def transfer_request(rec_series, status_series): if transfer_request['status'] == config.system_process['SUCCESS']: status_update = config.status_update_idx['NEXT_STATUS'] update_value_dict['value_update'] = transfer_request['task_id'] - + else: status_update = config.status_update_idx['ERROR_STATUS'] update_value_dict['error_info']['error_message'] = transfer_request['error_info'] - + # If not tiger let's go to next status else: status_update = config.status_update_idx['NEXT_STATUS'] return (status_update, update_value_dict) - + @staticmethod @recording_handler.exception_handler def transfer_check(rec_series, status_series): @@ -188,7 +190,7 @@ def transfer_check(rec_series, status_series): status_series (pd.Series) = Series with information about the next status of the process (if neeeded) Returns: status_update (int) = 1 if recording process status has to be updated to next step in recording.RecordingProcess - = 0 if recording process status not to be changed + = 0 if recording process status not to be changed = -1 if recording process status has to be updated to ERROR in recording.RecordingProcess update_value_dict (dict) = Dictionary with next keys: {'value_update': value to be updated in this stage (if applicable) @@ -199,7 +201,7 @@ def transfer_check(rec_series, status_series): update_value_dict = copy.deepcopy(config.default_update_value_dict) id_task = rec_series[status_series['FunctionField']] - # If tiger, we trigger globus transfer + # If tiger, we trigger globus transfer if rec_series['program_selection_params']['process_cluster'] == "tiger": transfer_request = ft.request_globus_transfer_status(str(id_task)) @@ -214,7 +216,7 @@ def transfer_check(rec_series, status_series): status_update = config.status_update_idx['NEXT_STATUS'] return (status_update, update_value_dict) - + @staticmethod @recording_handler.exception_handler @@ -226,13 +228,13 @@ def 
slurm_job_queue(rec_series, status_series): status_series (pd.Series) = Series with information about the next status of the process (if neeeded) Returns: status_update (int) = 1 if recording process status has to be updated to next step in recording.RecordingProcess - = 0 if recording process status not to be changed + = 0 if recording process status not to be changed = -1 if recording process status has to be updated to ERROR in recording.RecordingProcess update_value_dict (dict) = Dictionary with next keys: {'value_update': value to be updated in this stage (if applicable) 'error_info': error info to be inserted if error occured } """ - + status_update = config.status_update_idx['NO_CHANGE'] update_value_dict = copy.deepcopy(config.default_update_value_dict) @@ -256,11 +258,11 @@ def slurm_job_queue(rec_series, status_series): if status == config.system_process['SUCCESS'] and rec_series['recording_modality'] == 'electrophysiology': recording_key = (recording_process.Processing.proj('recording_id', insertion_number='fragment_number') & rec_series['query_key']).fetch1() del recording_key["job_id"] - + chanmap_filename = config.default_chanmap_filename % (rec_series['job_id']) chanmap_file_local_path = pathlib.Path(config.chanmap_files_filepath,chanmap_filename).as_posix() - raw_directory_for_chanmap = rec_series['recording_process_pre_path'] + rec_series['recording_process_pre_path'] spikeglx_meta_filepath = ephys_pipeline.get_spikeglx_meta_filepath(recording_key) # Chanmap mat file generation @@ -283,7 +285,7 @@ def slurm_job_queue(rec_series, status_series): status_update = config.status_update_idx['ERROR_STATUS'] update_value_dict['error_info']['error_message'] = 'Error while generating/transfering slurm file' return (status_update, update_value_dict) - + #Queue slurm file if status == config.system_process['SUCCESS']: @@ -295,7 +297,7 @@ def slurm_job_queue(rec_series, status_series): status_update = config.status_update_idx['ERROR_STATUS'] update_value_dict['error_info']['error_message'] = 'Error while generating/transfering slurm file' return (status_update, update_value_dict) - + if status == config.system_process['SUCCESS']: status_update = config.status_update_idx['NEXT_STATUS'] update_value_dict['value_update'] = slurm_jobid @@ -321,7 +323,7 @@ def slurm_job_check(rec_series, status_series): status_series (pd.Series) = Series with information about the next status of the process (if neeeded) Returns: status_update (int) = 1 if recording process status has to be updated to next step in recording.RecordingProcess - = 0 if recording process status not to be changed + = 0 if recording process status not to be changed = -1 if recording process status has to be updated to ERROR in recording.RecordingProcess update_value_dict (dict) = Dictionary with next keys: {'value_update': value to be updated in this stage (if applicable) @@ -332,7 +334,7 @@ def slurm_job_check(rec_series, status_series): # Only queue if processing in tiger if rec_series['program_selection_params']['local_or_cluster'] == "cluster": status_update = config.status_update_idx['NO_CHANGE'] - + local_user = False program_selection_params = rec_series['program_selection_params'] if program_selection_params['process_cluster'] == 'spock' and is_this_spock(): @@ -361,7 +363,7 @@ def slurm_job_check(rec_series, status_series): update_value_dict['error_info']['error_exception'] = error_log else: status_update = config.status_update_idx['NEXT_STATUS'] - + return (status_update, update_value_dict) @staticmethod @@ -374,7 +376,7 
@@ def populate_element(rec_series, status_series): status_series (pd.Series) = Series with information about the next status of the process (if neeeded) Returns: status_update (int) = 1 if recording process status has to be updated to next step in recording.RecordingProcess - = 0 if recording process status not to be changed + = 0 if recording process status not to be changed = -1 if recording process status has to be updated to ERROR in recording.RecordingProcess update_value_dict (dict) = Dictionary with next keys: {'value_update': value to be updated in this stage (if applicable) @@ -383,11 +385,11 @@ def populate_element(rec_series, status_series): update_value_dict = copy.deepcopy(config.default_update_value_dict) if rec_series['recording_modality'] == 'electrophysiology': - # Add extra function to create xyz picks file + # Add extra function to create xyz picks file #chanmap_filename = config.default_chanmap_filename % (rec_series['job_id']) #chanmap_file_local_path = pathlib.Path(config.chanmap_files_filepath,chanmap_filename).as_posix() #ephys_utils.xyz_pick_file_creator.main_xyz_pick_file_function(rec_series['recording_id'], rec_series['fragment_number'], chanmap_file_local_path, rec_series['recording_process_post_path']) - + status_update = ep.populate_element_data(rec_series['job_id']) elif rec_series['recording_modality'] == 'imaging': @@ -408,14 +410,12 @@ def get_program_selection_params(modality): # Pack all features in a dictionary this_modality_program_selection_params_dict = this_modality_program_selection_params.to_dict('records') - this_modality_program_selection_params_dict + # Get two columns, (recording_modality & "packed" program_selection_params) this_modality_program_selection_params = this_modality_program_selection_params.loc[:, 'recording_modality'].to_frame().copy() this_modality_program_selection_params['program_selection_params'] = this_modality_program_selection_params_dict - this_modality_program_selection_params - return this_modality_program_selection_params @staticmethod @@ -458,7 +458,7 @@ def get_active_process_jobs(active=True): if this_modality == 'imaging': params_df = RecProcessHandler.get_imaging_params_jobs(these_process_keys) - + this_mod_df = this_mod_df.merge(params_df, on='job_id', how='left') this_mod_df = this_mod_df.merge(this_mod_program_selection_params, on='recording_modality', how='left') @@ -471,7 +471,7 @@ def get_active_process_jobs(active=True): df_process_jobs = pd.concat([df_process_jobs, this_mod_df], ignore_index=True) df_process_jobs = df_process_jobs.reset_index(drop=True) - + print(df_process_jobs) return df_process_jobs @@ -499,7 +499,7 @@ def get_ephys_params_jobs(rec_process_keys): preparams_df = pd.DataFrame((ephys_pipeline.ephys_element.PreClusterParamSteps * \ utility.smart_dj_join(ephys_pipeline.ephys_element.PreClusterParamSteps.Step, ephys_pipeline.ephys_element.PreClusterParamSet.proj('precluster_method', 'params')) * recording_process.Processing.EphysParams.proj('precluster_param_steps_id') & rec_process_keys).fetch(as_dict=True)) - + # Join precluster params for the same recording_process preparams_df['preparams'] = preparams_df.apply(lambda x : {x['precluster_method']: x['params']}, axis=1) preparams_df = preparams_df.sort_values(by=['job_id', 'step_number']) @@ -561,19 +561,19 @@ def update_status_pipeline(recording_process_key_dict, status, update_field=None recording_process_key_dict (dict): key to find recording_process record status (int): value of the status to be updated update_field (str): name of the 
field to be updated as extra (only applicable to some status) - update_value (str|int): field value to be inserted on in task_field + update_value (str|int): field value to be inserted on in task_field """ if update_field is not None: update_task_id_dict = recording_process_key_dict.copy() update_task_id_dict[update_field] = update_value recording_process.Processing.update1(update_task_id_dict) - + update_status_dict = recording_process_key_dict.copy() update_status_dict['status_processing_id'] = status recording_process.Processing.update1(update_status_dict) - + @staticmethod def update_job_id_log(job_id, current_status, next_status, error_info_dict): """ @@ -592,7 +592,7 @@ def update_job_id_log(job_id, current_status, next_status, error_info_dict): key['status_processing_id_old'] = current_status key['status_processing_id_new'] = next_status key['status_timestamp'] = date_time - + if error_info_dict['error_message'] is not None and len(error_info_dict['error_message']) >= 256: error_info_dict['error_message'] =error_info_dict['error_message'][:255] @@ -630,7 +630,7 @@ def check_job_process_deletion(): # Get jobs that were processed in cluster df_inactive_jobs['local_or_cluster'] = df_inactive_jobs['program_selection_params'].map(lambda x: x['local_or_cluster']) df_inactive_jobs['process_cluster'] = df_inactive_jobs['program_selection_params'].map(lambda x: x['process_cluster']) - + #Update all local jobs df_not_cluster = df_inactive_jobs.loc[df_inactive_jobs['local_or_cluster'] == 'local'].reset_index(drop=True) if df_not_cluster.shape[0] > 0: @@ -647,7 +647,7 @@ def check_job_process_deletion(): for i in range(df_not_cluster_post_error.shape[0]): RecProcessHandler.update_status_pipeline(df_not_cluster_post_error.loc[i, 'query_key'], config.JOB_STATUS_ERROR_DELETED) RecProcessHandler.update_job_id_log(df_not_cluster_post_error.loc[i, 'job_id'], config.JOB_STATUS_ERROR_ID, config.JOB_STATUS_ERROR_DELETED, update_value_dict['error_info']) - + #If no inactive job, after locals the end df_inactive_jobs = df_inactive_jobs.loc[df_inactive_jobs['local_or_cluster'] == 'cluster'].reset_index(drop=True) if df_inactive_jobs.shape[0] == 0: @@ -702,7 +702,7 @@ def check_job_process_deletion(): for this_cluster in clusters: ft.delete_empty_data_directory_cluster(this_cluster, type="raw") ft.delete_empty_data_directory_cluster(this_cluster, type="processed") - + # Update status for post-processed jobs df_post_processed = df_inactive_jobs.loc[(df_inactive_jobs['raw_dir_deleted'] == 1) & (df_inactive_jobs['processed_dir_deleted'] == 1) & (df_inactive_jobs['status_processing_id'] == config.JOB_STATUS_PROCESSED)] @@ -717,7 +717,7 @@ def check_job_process_deletion(): for i in range(df_post_error.shape[0]): RecProcessHandler.update_status_pipeline(df_post_error.loc[i, 'query_key'], config.JOB_STATUS_ERROR_DELETED) RecProcessHandler.update_job_id_log(df_post_error.loc[i, 'job_id'], config.JOB_STATUS_ERROR_ID, config.JOB_STATUS_ERROR_DELETED, update_value_dict['error_info']) - + return ''' @@ -726,12 +726,12 @@ def filter_session_status(df_rec_process, status): """ Filter dataframe with rec_process with a given status Args: - df_rec_process (pd.DataFrame): recording process dataframe + df_rec_process (pd.DataFrame): recording process dataframe status (int): value of the status to be filtered with Returns: df_rec_process_status (pd.DataFrame): recording process dataframe filtered with given status """ - + df_rec_process_status = df_rec_process.loc[df_sessions['status_processing_id'] == status, :] 
df_rec_process_status = df_rec_process_status.reset_index(drop=True) ''' diff --git a/u19_pipeline/automatic_job/slurm_creator.py b/u19_pipeline/automatic_job/slurm_creator.py index a801ce96..dd01aa91 100644 --- a/u19_pipeline/automatic_job/slurm_creator.py +++ b/u19_pipeline/automatic_job/slurm_creator.py @@ -1,14 +1,13 @@ import copy -import os -import subprocess import pathlib -import json import re +import subprocess + import u19_pipeline.automatic_job.clusters_paths_and_transfers as ft -from u19_pipeline.utility import create_str_from_dict, is_this_spock -import u19_pipeline.automatic_job.params_config as config +import u19_pipeline.automatic_job.params_config as config +from u19_pipeline.utility import is_this_spock from u19_pipeline.utils.file_utils import write_file # Functions to create slurm jobs @@ -159,7 +158,7 @@ def transfer_slurm_file(slurm_file_local_path, slurm_destination, cluster_vars): def create_slurm_params_file(slurm_dict): text_dict = '' - for slurm_param in slurm_dict.keys(): + for slurm_param in slurm_dict: if isinstance(slurm_dict[slurm_param], list): for list_param in slurm_dict[slurm_param]: diff --git a/u19_pipeline/automatic_job/tigress2pni.py b/u19_pipeline/automatic_job/tigress2pni.py index 2a9b8300..263e666d 100755 --- a/u19_pipeline/automatic_job/tigress2pni.py +++ b/u19_pipeline/automatic_job/tigress2pni.py @@ -13,18 +13,22 @@ previous transfers, so if this script is run twice in quick succession, the second run won't queue a duplicate transfer.""" +import contextlib import json -import sys import os -import six +import sys -from globus_sdk import (NativeAppAuthClient, TransferClient, - RefreshTokenAuthorizer, TransferData) +import six +from fair_research_login import NativeClient +from globus_sdk import ( + NativeAppAuthClient, + RefreshTokenAuthorizer, + TransferClient, + TransferData, +) from globus_sdk.exc import GlobusAPIError from globus_sdk.services.transfer.errors import TransferAPIError -from fair_research_login import NativeClient - # Princeton TIGRESS SOURCE_ENDPOINT = 'a9df83d2-42f0-11e6-80cf-22000b1701d1' # SOURCE_ENDPOINT = 'ef3a4e74-e742-11ec-9912-3b4cfda38030' @@ -74,7 +78,7 @@ def load_data_from_file(filepath): """Load a set of saved tokens.""" if not os.path.exists(filepath): return [] - with open(filepath, 'r') as f: + with open(filepath) as f: tokens = json.load(f) return tokens @@ -104,12 +108,12 @@ def setup_transfer_client(transfer_tokens): try: transfer_client.endpoint_autoactivate(SOURCE_ENDPOINT) - r = transfer_client.endpoint_autoactivate(DESTINATION_ENDPOINT) + transfer_client.endpoint_autoactivate(DESTINATION_ENDPOINT) except GlobusAPIError as ex: if ex.http_status == 401: sys.exit('Refresh token has expired. 
' 'Please delete the `tokens` object from ' - '{} and try again.'.format(DATA_FILE)) + f'{DATA_FILE} and try again.') else: raise ex return transfer_client @@ -120,10 +124,7 @@ def check_endpoint_path(transfer_client, endpoint, path): try: transfer_client.operation_ls(endpoint, path=path) except TransferAPIError as tapie: - print('Failed to query endpoint "{}": {}'.format( - endpoint, - tapie.message - )) + print(f'Failed to query endpoint "{endpoint}": {tapie.message}') sys.exit(1) @@ -134,9 +135,9 @@ def create_destination_directory(transfer_client, dest_ep, dest_path): except TransferAPIError: try: transfer_client.operation_mkdir(dest_ep, dest_path) - print('Created directory: {}'.format(dest_path)) + print(f'Created directory: {dest_path}') except TransferAPIError as tapie: - print('Failed to start transfer: {}'.format(tapie.message)) + print(f'Failed to start transfer: {tapie.message}') sys.exit(1) @@ -155,10 +156,8 @@ def main(): # need to specify that we want refresh tokens print('login in') tokens = client.login(requested_scopes=SCOPES, refresh_tokens=True) - try: + with contextlib.suppress(Exception): client.save_tokens(tokens) - except: - pass transfer = setup_transfer_client(tokens['transfer.api.globus.org']) @@ -177,9 +176,7 @@ def main(): pass if len(sys.argv) < 2: - print('Usage: {} tigress_dir pni_dir'.format( - sys.argv[0] - )) + print(f'Usage: {sys.argv[0]} tigress_dir pni_dir') sys.exit(1) SOURCE_PATH = sys.argv[1] @@ -202,12 +199,7 @@ def main(): task = transfer.submit_transfer(tdata) save_data_to_file(DATA_FILE, 'task', task.data) - print('Transfer has been started from\n {}:{}\nto\n {}:{}'.format( - SOURCE_ENDPOINT, - SOURCE_PATH, - DESTINATION_ENDPOINT, - DESTINATION_PATH - )) + print(f'Transfer has been started from\n {SOURCE_ENDPOINT}:{SOURCE_PATH}\nto\n {DESTINATION_ENDPOINT}:{DESTINATION_PATH}') url_string = 'https://globus.org/app/transfer?' + \ six.moves.urllib.parse.urlencode({ 'origin_id': SOURCE_ENDPOINT, @@ -215,7 +207,7 @@ def main(): 'destination_id': DESTINATION_ENDPOINT, 'destination_path': DESTINATION_PATH }) - print('Visit the link below to see the changes:\n{}'.format(url_string)) + print(f'Visit the link below to see the changes:\n{url_string}') if __name__ == '__main__': diff --git a/u19_pipeline/behavior.py b/u19_pipeline/behavior.py index 9d72bae3..0d718b6d 100644 --- a/u19_pipeline/behavior.py +++ b/u19_pipeline/behavior.py @@ -1,5 +1,4 @@ import datajoint as dj -from . 
import acquisition, task, subject schema = dj.schema(dj.config['custom']['database.prefix'] + 'behavior') diff --git a/u19_pipeline/copy_table.py b/u19_pipeline/copy_table.py index 445938cd..8547530c 100644 --- a/u19_pipeline/copy_table.py +++ b/u19_pipeline/copy_table.py @@ -1,9 +1,10 @@ -import datajoint as dj import traceback -from u19_pipeline.temp import acquisition, behavior, imaging, meso, meso_analysis -from u19_pipeline import subject + +import datajoint as dj from tqdm import tqdm +from u19_pipeline import subject +from u19_pipeline.temp import acquisition, behavior, imaging, meso, meso_analysis acquisition_original = dj.create_virtual_module( 'acquisition_original', 'u19_acquisition' @@ -54,7 +55,7 @@ def copy_table(target_schema, src_schema, table_name, **kwargs): try: target_table.insert1(t, skip_duplicates=True, **kwargs) except Exception: - print("Error when inserting {}".format(t)) + print(f"Error when inserting {t}") traceback.print_exc() @@ -141,8 +142,7 @@ def copy_imaging_tables(): else: temp_table = getattr(imaging, table) - if isinstance(temp_table, dj.Lookup) or \ - isinstance(temp_table, dj.Manual): + if isinstance(temp_table, (dj.Lookup, dj.Manual)): copy_table(imaging, imaging_original, table, skip_duplicates=True) else: @@ -187,8 +187,7 @@ def copy_meso_tables(): else: temp_table = getattr(meso, table) - if isinstance(temp_table, dj.Lookup) or \ - isinstance(temp_table, dj.Manual): + if isinstance(temp_table, (dj.Lookup, dj.Manual)): copy_table(meso, meso_original, table) else: copy_table(meso, meso_original, table, @@ -216,8 +215,7 @@ def copy_meso_analysis_tables(): else: temp_table = getattr(meso_analysis, table) - if isinstance(temp_table, dj.Lookup) or \ - isinstance(temp_table, dj.Manual): + if isinstance(temp_table, (dj.Lookup, dj.Manual)): copy_table(meso_analysis, meso_analysis_original, table) else: copy_table(meso_analysis, meso_analysis_original, table, diff --git a/u19_pipeline/ephys.py b/u19_pipeline/ephys.py index 74686ce0..ab0053d6 100644 --- a/u19_pipeline/ephys.py +++ b/u19_pipeline/ephys.py @@ -1,15 +1,13 @@ -import datajoint as dj import pathlib -import numpy as np -from u19_pipeline import ephys, behavior - -from element_array_ephys import probe as probe_element +import datajoint as dj +import numpy as np from element_array_ephys import ephys as ephys_element +from element_array_ephys import probe as probe_element import u19_pipeline.utils.DemoReadSGLXData.readSGLX as readSGLX import u19_pipeline.utils.ephys_utils as ephys_utils - +from u19_pipeline import behavior, ephys from u19_pipeline.utils.DemoReadSGLXData.readSGLX import readMeta """ @@ -207,14 +205,14 @@ def make(self, key): thissession = behavior.TowersBlock().Trial() & key iterstart = thissession.fetch('vi_start') - first_vr_iteration = iterstart[0] + iterstart[0] # Obtain the precise times when the frames transition. # This is obtained from iteration_index_nidq ls = np.diff(iteration_index_nidq) ls[ls<0] = 1 # These are the trial transitions (see definition above). To get total number of frames, we define this as a transition like all others. 
ls[np.isnan(ls)] = 0 - iteration_transition_indexes = np.where(ls)[0] + np.where(ls)[0] # First iterations captured not in virmen because vr was not started yet #for i in range(first_vr_iteration): diff --git a/u19_pipeline/ephys_pipeline.py b/u19_pipeline/ephys_pipeline.py index bdcd827a..8fd5eefb 100644 --- a/u19_pipeline/ephys_pipeline.py +++ b/u19_pipeline/ephys_pipeline.py @@ -1,20 +1,20 @@ -import datajoint as dj -import pathlib +import datetime import glob +import json +import pathlib import re import subprocess -import json -import datetime -from element_array_ephys import probe as probe_element +import datajoint as dj from element_array_ephys import ephys_precluster as ephys_element +from element_array_ephys import probe as probe_element from element_array_ephys.readers import spikeglx from element_interface.utils import find_full_path -from u19_pipeline import recording -import u19_pipeline.utils.ephys_utils as ephys_utils import u19_pipeline.utils.DemoReadSGLXData.readSGLX as readSGLX +import u19_pipeline.utils.ephys_utils as ephys_utils +from u19_pipeline import recording from u19_pipeline.utils.DemoReadSGLXData.readSGLX import readMeta schema = dj.schema(dj.config['custom']['database.prefix'] + 'ephys_pipeline') @@ -183,7 +183,7 @@ def get_spikeglx_meta_filepath(ephys_recording_key): break else: raise FileNotFoundError( - 'No SpikeGLX data found for probe insertion: {}'.format(ephys_recording_key)) + f'No SpikeGLX data found for probe insertion: {ephys_recording_key}') return spikeglx_meta_filepath diff --git a/u19_pipeline/ephys_sync.py b/u19_pipeline/ephys_sync.py index e454997e..cc7f2d3c 100644 --- a/u19_pipeline/ephys_sync.py +++ b/u19_pipeline/ephys_sync.py @@ -1,14 +1,12 @@ -import datajoint as dj import pathlib -import numpy as np -from u19_pipeline import behavior -from u19_pipeline.ephys_pipeline import ephys_element, get_session_directory +import datajoint as dj +import numpy as np import u19_pipeline.utils.DemoReadSGLXData.readSGLX as readSGLX import u19_pipeline.utils.ephys_utils as ephys_utils -import u19_pipeline.utils.path_utils as pu -import u19_pipeline.automatic_job.params_config as config +from u19_pipeline import behavior +from u19_pipeline.ephys_pipeline import ephys_element, get_session_directory # Tables downstream from `ephys_pipeline` module --------------------------------------- schema = dj.schema(dj.config['custom']['database.prefix'] + 'ephys_sync') @@ -124,14 +122,14 @@ def make(self, key): thissession = behavior.TowersBlock().Trial() & key iterstart = thissession.fetch('vi_start') - first_vr_iteration = iterstart[0] + iterstart[0] # Obtain the precise times when the frames transition. # This is obtained from iteration_index_nidq ls = np.diff(iteration_index_nidq) ls[ls<0] = 1 # These are the trial transitions (see definition above). To get total number of frames, we define this as a transition like all others. 
ls[np.isnan(ls)] = 0 - iteration_transition_indexes = np.where(ls)[0] + np.where(ls)[0] # First iterations captured not in virmen because vr was not started yet #for i in range(first_vr_iteration): diff --git a/u19_pipeline/imaging.py b/u19_pipeline/imaging.py index df5a0f1b..2c3d2e6e 100644 --- a/u19_pipeline/imaging.py +++ b/u19_pipeline/imaging.py @@ -1,6 +1,4 @@ import datajoint as dj -from u19_pipeline import acquisition - schema = dj.schema(dj.config['custom']['database.prefix'] + 'imaging') diff --git a/u19_pipeline/imaging_pipeline.py b/u19_pipeline/imaging_pipeline.py index a113e387..449a5861 100644 --- a/u19_pipeline/imaging_pipeline.py +++ b/u19_pipeline/imaging_pipeline.py @@ -1,16 +1,14 @@ -import datajoint as dj import pathlib import subprocess -from u19_pipeline import lab, acquisition, subject, recording -import u19_pipeline.automatic_job.params_config as config -import u19_pipeline.utils.dj_shortcuts as dj_short - -from element_calcium_imaging import scan as scan_element +import datajoint as dj from element_calcium_imaging import imaging_preprocess as imaging_element from element_interface.utils import find_full_path +import u19_pipeline.automatic_job.params_config as config +import u19_pipeline.utils.dj_shortcuts as dj_short +from u19_pipeline import lab, recording, subject schema = dj.schema(dj.config['custom']['database.prefix'] + 'imaging_pipeline') @@ -153,7 +151,6 @@ class File(dj.Part): imaging_schema_name = dj.config['custom']['database.prefix'] + 'pipeline_imaging_element' # 2. Upstream tables ------------------------------------------------------------------- -from u19_pipeline.reference import BrainArea as Location Session = TiffSplit @@ -167,7 +164,6 @@ class Equipment(dj.Manual): """ # 3. Utility functions ----------------------------------------------------------------- -from u19_pipeline import recording_process def get_imaging_root_data_dir(): return dj.config.get('custom', {}).get('imaging_root_data_dir', None) @@ -190,10 +186,10 @@ def get_processed_dir(processing_task_key, process_method): sess_key = (ImagingPipelineSession & processing_task_key).fetch1('KEY') bucket_scan_dir = (TiffSplit & sess_key & {'tiff_split': processing_task_key['scan_id']}).fetch1('tiff_split_directory') - user_id = (subject.Subject & processing_task_key).fetch1('user_id') + (subject.Subject & processing_task_key).fetch1('user_id') sess_dir = find_full_path(get_imaging_root_data_dir(), bucket_scan_dir) - relative_suite2p_dir = (pathlib.Path(bucket_scan_dir) / process_method).as_posix() + (pathlib.Path(bucket_scan_dir) / process_method).as_posix() if not sess_dir.exists(): raise FileNotFoundError(f'Session directory not found ({sess_dir})') diff --git a/u19_pipeline/lab.py b/u19_pipeline/lab.py index c6d25d0c..b45cce59 100644 --- a/u19_pipeline/lab.py +++ b/u19_pipeline/lab.py @@ -1,12 +1,14 @@ """This module defines tables in the schema U19_lab""" +import os +import pathlib +import sys + import datajoint as dj import numpy as np import pandas as pd -import pathlib -import sys -import os + from u19_pipeline.utility import is_this_spock schema = dj.schema(dj.config['custom']['database.prefix'] + 'lab') @@ -290,9 +292,9 @@ def get_local_path2(self, bucket_path): bucket_base_dir = path_df['bucket_path'] if bucket_path.find('/mnt/cup/') != -1: - extra_bucket_dir = bucket_path.replace(bucket_base_dir + '/', ''); + extra_bucket_dir = bucket_path.replace(bucket_base_dir + '/', '') else: - extra_bucket_dir = bucket_path.replace(path_df['global_path'] + '/', ''); + extra_bucket_dir = 
bucket_path.replace(path_df['global_path'] + '/', '') if extra_bucket_dir[0] == '/': extra_bucket_dir = extra_bucket_dir[1:] @@ -305,7 +307,7 @@ def get_local_path2(self, bucket_path): baseDir = path_df['net_location'] # Correct extra bucket dir to adjust windows filesep - extra_bucket_dir = extra_bucket_dir.replace('/', '\\'); + extra_bucket_dir = extra_bucket_dir.replace('/', '\\') else: # For mac and linux the accesible path is the local_path field diff --git a/u19_pipeline/meso.py b/u19_pipeline/meso.py index 50e346e1..2ded474c 100644 --- a/u19_pipeline/meso.py +++ b/u19_pipeline/meso.py @@ -1,7 +1,5 @@ import datajoint as dj import numpy as np -from u19_pipeline import acquisition - schema = dj.schema('u19_meso') diff --git a/u19_pipeline/meso_analysis.py b/u19_pipeline/meso_analysis.py index be1dd5f9..419423ea 100644 --- a/u19_pipeline/meso_analysis.py +++ b/u19_pipeline/meso_analysis.py @@ -1,5 +1,4 @@ import datajoint as dj -from u19_pipeline import meso schema = dj.schema('u19_meso_analysis') diff --git a/u19_pipeline/puffs.py b/u19_pipeline/puffs.py index ee8fe6f7..3394d129 100644 --- a/u19_pipeline/puffs.py +++ b/u19_pipeline/puffs.py @@ -1,7 +1,6 @@ """This module defines tables in the schema ahoag_puffs_lab_demo""" import datajoint as dj -from . import lab, acquisition, task schema = dj.schema(dj.config['custom']['database.prefix'] + 'puffs') diff --git a/u19_pipeline/recording.py b/u19_pipeline/recording.py index 6748f93d..ddcbdaf6 100644 --- a/u19_pipeline/recording.py +++ b/u19_pipeline/recording.py @@ -1,7 +1,6 @@ + import datajoint as dj -import copy -from u19_pipeline import lab, subject, acquisition import u19_pipeline.automatic_job.params_config as config schema = dj.schema(dj.config['custom']['database.prefix'] + 'recording') diff --git a/u19_pipeline/recording_process.py b/u19_pipeline/recording_process.py index 3efaef30..bb60c46d 100644 --- a/u19_pipeline/recording_process.py +++ b/u19_pipeline/recording_process.py @@ -2,9 +2,7 @@ import pathlib import datajoint as dj -from u19_pipeline import recording -from u19_pipeline.imaging_pipeline import imaging_element -from u19_pipeline.ephys_pipeline import ephys_element + import u19_pipeline.automatic_job.params_config as config schema = dj.schema(dj.config['custom']['database.prefix'] + 'recording_process') diff --git a/u19_pipeline/subject.py b/u19_pipeline/subject.py index 98d0e166..90aa262e 100644 --- a/u19_pipeline/subject.py +++ b/u19_pipeline/subject.py @@ -2,7 +2,6 @@ import datajoint as dj -from . 
import lab schema = dj.schema(dj.config['custom']['database.prefix'] + 'subject') diff --git a/u19_pipeline/task.py b/u19_pipeline/task.py index 14706a9b..ce37d7f2 100644 --- a/u19_pipeline/task.py +++ b/u19_pipeline/task.py @@ -2,7 +2,6 @@ import datajoint as dj - schema = dj.schema(dj.config['custom']['database.prefix'] + 'task') diff --git a/u19_pipeline/temp/acquisition.py b/u19_pipeline/temp/acquisition.py index 87609495..4d14f722 100644 --- a/u19_pipeline/temp/acquisition.py +++ b/u19_pipeline/temp/acquisition.py @@ -1,7 +1,5 @@ import datajoint as dj -from u19_pipeline import lab, task, subject - schema = dj.schema('u19_acquisition') diff --git a/u19_pipeline/utility.py b/u19_pipeline/utility.py index 297c9262..7b98bee2 100644 --- a/u19_pipeline/utility.py +++ b/u19_pipeline/utility.py @@ -1,14 +1,14 @@ -import sys import os -import pandas as pd -from pandas.api.types import is_numeric_dtype +import subprocess +import sys + +import datajoint as dj import numpy as np -from scipy.optimize import curve_fit +import pandas as pd from astropy.stats import binom_conf_interval -import datajoint as dj -import subprocess +from scipy.optimize import curve_fit def is_this_spock(): @@ -100,8 +100,8 @@ def smart_dj_join(t1, t2): fields_t2 = pd.DataFrame.from_dict(t2.heading.attributes, orient='index') # Get only secondary fields and check matches - fields_t1_list = set(fields_t1.loc[fields_t1['in_key'] == False].index.to_list()) - fields_t2_list = set(fields_t2.loc[fields_t2['in_key'] == False].index.to_list()) + fields_t1_list = set(fields_t1.loc[~fields_t1['in_key']].index.to_list()) + fields_t2_list = set(fields_t2.loc[~fields_t2['in_key']].index.to_list()) intersected_fields = fields_t2_list.intersection(fields_t1_list) # If there are: @@ -147,7 +147,7 @@ def psychFit(deltaBins, numR, numL, choices): # Correct deltaBin & trialBin to produce same result as Matlab psychFit deltaBins_search = deltaBins.astype(float) - 1.5 trialBin = np.searchsorted(deltaBins_search, nCues_RminusL, side='right') - trialBin -= 1; + trialBin -= 1 # Put into evidence bins all Trials with corresponding choices for iTrial in range(len(choices)): @@ -158,7 +158,7 @@ def psychFit(deltaBins, numR, numL, choices): trialDelta[trialBin[iTrial]] = trialDelta[trialBin[iTrial]] + nCues_RminusL[iTrial] with np.errstate(divide='ignore', invalid='ignore'): - trialDelta = np.true_divide(trialDelta, numTrials); + trialDelta = np.true_divide(trialDelta, numTrials) # Select only bins with trials idx_zero = numTrials == 0 @@ -203,7 +203,7 @@ def psychFit(deltaBins, numR, numL, choices): delta = np.linspace(deltaBins[0] - 2, deltaBins[-1] + 2, num=50) # Repeat trialDelta 3 times for errorX why ??
- errorX = np.tile(trialDelta[~idx_zero], 3); + errorX = np.tile(trialDelta[~idx_zero], 3) # Confidence intervals are errorY, as a vector errorY = np.stack(pci[:, ~idx_zero]) @@ -212,10 +212,10 @@ def psychFit(deltaBins, numR, numL, choices): # Fill dictionary of results fit_results = dict() fit_results['delta_bins'] = deltaBins[~idx_zero] - fit_results['delta_data'] = trialDelta[~idx_zero]; - fit_results['pright_data'] = 100 * phat[~idx_zero]; - fit_results['delta_error'] = errorX; - fit_results['pright_error'] = 100 * errorY; + fit_results['delta_data'] = trialDelta[~idx_zero] + fit_results['pright_data'] = 100 * phat[~idx_zero] + fit_results['delta_error'] = errorX + fit_results['pright_error'] = 100 * errorY if is_there_psychometric: fit_results['delta_fit'] = delta @@ -281,7 +281,7 @@ def get_cols_rows_plot(num_plots, fig_size): def create_str_from_dict(key_dict): slurm_file_name = '' - for i in key_dict.keys(): + for i in key_dict: slurm_file_name += str(i) + '_' + str(key_dict[i]) return slurm_file_name diff --git a/u19_pipeline/utils/DemoReadSGLXData/readSGLX.py b/u19_pipeline/utils/DemoReadSGLXData/readSGLX.py index 47421a1a..c9e11971 100644 --- a/u19_pipeline/utils/DemoReadSGLXData/readSGLX.py +++ b/u19_pipeline/utils/DemoReadSGLXData/readSGLX.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- """ Requires python 3 @@ -15,11 +14,11 @@ much easier! """ -import numpy as np -import matplotlib.pyplot as plt from pathlib import Path -from tkinter import Tk -from tkinter import filedialog +from tkinter import Tk, filedialog + +import matplotlib.pyplot as plt +import numpy as np # Parse ini file returning a dictionary whose keys are the metadata diff --git a/u19_pipeline/utils/dj_shortcuts.py b/u19_pipeline/utils/dj_shortcuts.py index 0e87490f..312ba1b2 100644 --- a/u19_pipeline/utils/dj_shortcuts.py +++ b/u19_pipeline/utils/dj_shortcuts.py @@ -1,5 +1,6 @@ import pandas as pd + def get_primary_key_fields(t): """ Get list of all fields that compose primary key @@ -10,7 +11,7 @@ def get_primary_key_fields(t): """ fields_t = pd.DataFrame.from_dict(t.heading.attributes, orient='index') - primary_field_list = fields_t.loc[fields_t['in_key'] == True].index.to_list() + primary_field_list = fields_t.loc[fields_t['in_key']].index.to_list() return primary_field_list @@ -27,8 +28,8 @@ def smart_dj_join(t1, t2): fields_t2 = pd.DataFrame.from_dict(t2.heading.attributes, orient='index') # Get only secondary fields and check matches - fields_t1_list = set(fields_t1.loc[fields_t1['in_key'] == False].index.to_list()) - fields_t2_list = set(fields_t2.loc[fields_t2['in_key'] == False].index.to_list()) + fields_t1_list = set(fields_t1.loc[~fields_t1['in_key']].index.to_list()) + fields_t2_list = set(fields_t2.loc[~fields_t2['in_key']].index.to_list()) intersected_fields = fields_t2_list.intersection(fields_t1_list) # If there are: diff --git a/u19_pipeline/utils/dlc_process.py b/u19_pipeline/utils/dlc_process.py index 7c659fe2..3349a003 100644 --- a/u19_pipeline/utils/dlc_process.py +++ b/u19_pipeline/utils/dlc_process.py @@ -1,15 +1,14 @@ import os -import deeplabcut -import pandas as pd -import numpy as np -import pickle import pathlib +import pickle import sys +import deeplabcut +import numpy as np +import pandas as pd from scipy import stats from skimage.measure import EllipseModel -from skimage.draw import ellipse_perimeter import u19_pipeline.utils.path_utils as pu @@ -37,7 +36,7 @@ def getPupilDiameter(destinationFolder=None): An array that contains the pupil diameter (index is the
video frame) [numpy Array] """ # TODO make the function - + # Read the analyzed video data h5 file h5_file = pu.get_filepattern_paths(destinationFolder, "/*.h5") @@ -47,7 +46,7 @@ def getPupilDiameter(destinationFolder=None): raise Exception('No h5 file in directory: '+ destinationFolder) if len(h5_file) > 1: raise Exception('To many h5 files in directory: '+ destinationFolder) - + h5_file = h5_file[0] labels = pd.read_hdf(h5_file) @@ -78,14 +77,13 @@ def getPupilDiameter(destinationFolder=None): outlierFlags = outlierFlags.rename(columns={outlierFlags.columns[0]: "OutlierFlag"}) # Concatenate outlier flags array to remove outliers from pupil diameter array temp = pd.concat([df, outlierFlags], axis=1) - temp.loc[temp['OutlierFlag']==True, 'PupilDiameter'] = None + temp.loc[temp['OutlierFlag'], 'PupilDiameter'] = None pupilDiameter = temp['PupilDiameter'].to_numpy() filename = pathlib.Path(destinationFolder, "pupil_diameter.pickle").as_posix() - file_to_store = open(filename, "wb") - pickle.dump(pupilDiameter, file_to_store) - file_to_store.close() + with open(filename, "wb") as file_to_store: + pickle.dump(pupilDiameter, file_to_store) if __name__ == "__main__": diff --git a/u19_pipeline/utils/ephys_utils.py b/u19_pipeline/utils/ephys_utils.py index ace162a3..7018d0a2 100644 --- a/u19_pipeline/utils/ephys_utils.py +++ b/u19_pipeline/utils/ephys_utils.py @@ -1,26 +1,27 @@ -import numpy as np -import datajoint as dj +import json import pathlib import warnings -import json +import datajoint as dj +import numpy as np +from bitstring import BitArray from scipy import signal as sp from scipy.io import loadmat -from scipy.spatial.transform import Rotation as R - -from bitstring import BitArray -from element_array_ephys import ephys as ephys_element -from u19_pipeline.utils.DemoReadSGLXData.readSGLX import SampRate, makeMemMapRaw, ExtractDigital +from u19_pipeline.utils.DemoReadSGLXData.readSGLX import ( + ExtractDigital, + SampRate, + makeMemMapRaw, +) class spice_glx_utility: - @staticmethod + @staticmethod def load_spice_glx_digital_file(file_path, nidq_meta, d_line_list=None): # Read NIDAQ digital file.
- #Inputs + #Inputs # file_path = path for the spike glx file # nidq_meta = meta file read from readMeta spike glx utility # d_line_list = digital channels to read from the file @@ -55,7 +56,7 @@ def get_idx_trial_start(trial_pulse_signal): #Get index of samples when trial has started based on a pulse signal #Get idx samples trial starts - trial_start_idx = np.where(np.diff(trial_pulse_signal) == 1) + trial_start_idx = np.where(np.diff(trial_pulse_signal) == 1) trial_start_idx = trial_start_idx[0] #Detect fake trial init pulses (a single sample in 1 instead of 5ms signal) @@ -75,7 +76,7 @@ def get_idx_iter_start_pulsesignal(iteration_pulse_signal_trial, trial_start_idx #Get index of iteration starts on a trial based on a pulse start signal #Get idx of iteration start during trial - iter_samples = np.where(np.diff(iteration_pulse_signal_trial) == 1) + iter_samples = np.where(np.diff(iteration_pulse_signal_trial) == 1) iter_samples = iter_samples + trial_start_idx # First iteration is at trial start, just align first trial start iter_samples[0, 0] = trial_start_idx @@ -90,7 +91,7 @@ def get_idx_iter_start_counterbit(iteration_pulse_signal_trial, trial_start_idx) #Get idx of odd iteration during trial - iter_samples = np.where(np.diff(iteration_pulse_signal_trial) == 1) + iter_samples = np.where(np.diff(iteration_pulse_signal_trial) == 1) iter_samples = iter_samples + trial_start_idx if iteration_pulse_signal_trial[0] == 1: @@ -103,12 +104,12 @@ def get_idx_iter_start_counterbit(iteration_pulse_signal_trial, trial_start_idx) iter_samples = np.squeeze(iter_samples) #Get idx of even iteration during trial - iter_samples2 = np.where(np.diff(iteration_pulse_signal_trial) == 255) + iter_samples2 = np.where(np.diff(iteration_pulse_signal_trial) == 255) iter_samples2 = iter_samples2 + trial_start_idx iter_samples2 = np.squeeze(iter_samples2) iter_samples = np.concatenate([iter_samples, iter_samples2]) - iter_samples = np.sort(iter_samples) + iter_samples = np.sort(iter_samples) return iter_samples @@ -173,15 +174,15 @@ def get_iteration_sample_vector_from_digital_lines_pulses(trial_pulse_signal, it iter_samples = get_idx_iter_start_counterbit(iteration_pulse_signal[idx_start:idx_end], trial_start_idx[i]) else: iter_samples = get_idx_iter_start_pulsesignal(iteration_pulse_signal[idx_start:idx_end], trial_start_idx[i]) - + #Append as an array of arrays (each trial is an array with idx of iterations) iter_start_idx.append(iter_samples) #Calculate time for each iteration start times = iter_samples/nidq_sampling_rate times = times - times[0] iter_times_idx.append(times) - - #Fill vector samples + + #Fill vector samples for j in range(iter_samples.shape[0]-1): iteration_vector_output['framenumber_vector_samples'][iter_samples[j]:iter_samples[j+1]] = j+1 @@ -236,7 +237,7 @@ def get_iteration_sample_vector_from_digital_lines_word(digital_array, time, ite trialnumber[idx] = current_trial trial_list = np.array(np.unique(trialnumber[np.isfinite(trialnumber)]), dtype = np.int32) - # Fourth, find and remove the nidaq glitches + # Fourth, find and remove the nidaq glitches # These are single samples where the iteration number is corrupted # ... likely because sampling happened faster than output of the behavior PC. # This is also where skipped frames are detected. @@ -262,7 +263,7 @@ def get_iteration_sample_vector_from_digital_lines_word(digital_array, time, ite # This point we have framenumber_in_trial and trialnumber. 
Now just some refactoring to fit into the usual data structure iteration_vector_output = dict() - + iteration_vector_output['trialnumber_vector_samples'] = trialnumber iteration_vector_output['framenumber_vector_samples'] = framenumber_in_trial @@ -279,12 +280,10 @@ def assert_iteration_samples_count(iteration_sample_idx_output, behavior_time_ve # Count trial count differences trial_count_diff = np.abs(iteration_sample_idx_output.shape[0] - (behavior_time_vector.shape[0])) - count = 0 trials_diff_iteration_small = list() trials_diff_iteration_big = list() for idx_trial, iter_trials in enumerate(iteration_sample_idx_output): - count += 1 - print(count) + print(idx_trial) print(iter_trials.shape[0]) print(behavior_time_vector[idx_trial].shape[0]) #For each trial iteration # should be equal to the behavioral file iterations @@ -364,7 +363,7 @@ def fix_missing_iteration_trials(trials_diff_iteration_small, iteration_dict, be if not status: raise ValueError("Coud not find missing iteration in trial") - + iteration_dict['iter_start_idx'][idx_trial] = new_iter_start # Get new synced time vector for trial @@ -448,7 +447,7 @@ def behavior_sync_frame_counter_method(digital_array, behavior_time_vector, sess # framenumber_in_trial has the length of the number of samples of the NIDAQ card, and every entry is the currently presented iteration number of the VR in the respective trial. # trialnumber has the length of the number of samples of the NIDAQ card, and every entry is the current trial. # - # NOTE: some minor glitches have to be catched, if a NIDAQ sample happenes to be recorded while the VR System updates the iteration number. + # NOTE: some minor glitches have to be catched, if a NIDAQ sample happenes to be recorded while the VR System updates the iteration number. framenumber_in_trial = np.zeros(len(iterations_raw))*np.NaN trialnumber = np.zeros(len(iterations_raw))*np.NaN current_trial = 0 @@ -463,22 +462,22 @@ def behavior_sync_frame_counter_method(digital_array, behavior_time_vector, sess framenumber_in_trial[idx-1] = frame_number + overflow*(max_count+1) - 1 # In case this happened, the previous sample has to be corrected # Keep track of trial number endflag = framenumber_in_trial[idx-1] == (len(behavior_time_vector[current_trial])) # Trial end has been reached. - + transitionflag = frame_number == 2 # Next trial should start at zero again (it starts with two ??) if endflag & transitionflag: # Only at the transitions - + current_trial = current_trial + 1 # Increases trial count overflow = 0 # Reset the 7 bit counter for the next trial if overflow == 0: - framenumber_in_trial[idx] = frame_number - else: + framenumber_in_trial[idx] = frame_number + else: framenumber_in_trial[idx] = frame_number + overflow*(max_count+1) -1 trialnumber[idx] = current_trial trial_list = np.array(np.unique(trialnumber[np.isfinite(trialnumber)]), dtype = np.int64) - # 4: find and remove additional NIDAQ glitches of two types: + # 4: find and remove additional NIDAQ glitches of two types: # a) single samples where the iteration number is corrupted because sampling happened faster than output of the behevior PC. # b) Skipped frames are detected and filled in. # Find the glitches @@ -500,7 +499,7 @@ def behavior_sync_frame_counter_method(digital_array, behavior_time_vector, sess else: # If random number, nidaq sample in the middle of update. 
framenumber_in_trial[g+1] = framenumber_in_trial[g] - # A set of final asserts, making sure that the code worked as intended + # A set of final asserts, making sure that the code worked as intended assert len(trial_list) == len(session_trial_keys) # Make sure the trial number is correct. assert np.sum(np.diff(framenumber_in_trial)>1) == 0 # No frames should be skipped assert np.sum(np.diff(framenumber_in_trial)<0) 0: current_block_trial = current_block_trial.tolist() - valid_block = 1 + valid_block = 1 # One trial blocks are stored as dictionaries if isinstance(current_block_trial, dict): current_block_trial = [current_block_trial] - valid_block = 1 + valid_block = 1 if valid_block: block_trial_df = pd.DataFrame(current_block_trial) @@ -336,13 +336,13 @@ def convert_towers_block_2_df(current_block, num_block): valid_block = 0 # "Normal" blocks are stored as numpy arrays and its length is greater than 0 - if isinstance(current_block, np.ndarray) and current_block_trial.shape[0] > 0: + if isinstance(current_block, np.ndarray) and current_block.shape[0] > 0: current_block = current_block.tolist() - valid_block = 1 + valid_block = 1 # One trial blocks are stored as dictionaries if isinstance(current_block, dict): current_block = [current_block] - valid_block = 1 + valid_block = 1 if valid_block: block_df = pd.DataFrame(current_block) @@ -367,10 +367,7 @@ def convert_behavior_file(mat_file): matin = convert_mat_file_to_dict(mat_file) converted_metadata = convert_function_handle_to_str(mat_file_path=mat_file) - if bool(converted_metadata): - metadata_read = True - else: - metadata_read = False + metadata_read = bool(converted_metadata) session_block_trial_df = pd.DataFrame() @@ -389,10 +386,7 @@ def convert_behavior_file(mat_file): num_blocks_conv = 0 for i in range(length_blocks): - if dict_block: - block = matin['log']['block'] - else: - block = matin['log']['block'][i] + block = matin['log']['block'] if dict_block else matin['log']['block'][i] #Convert trial df and block df valid_block, block_trial_df = convert_towers_block_trial_2_df(block['trial'],i+1) @@ -414,6 +408,6 @@ def convert_behavior_file(mat_file): if metadata_read: session_block_trial_df['choice'] = converted_metadata['trial_choice'] session_block_trial_df['trialType'] = converted_metadata['trial_type'] - + session_block_trial_df = session_block_trial_df.reset_index(drop=True) return session_block_trial_df diff --git a/u19_pipeline/utils/path_utils.py b/u19_pipeline/utils/path_utils.py index 93fc391e..0136683c 100644 --- a/u19_pipeline/utils/path_utils.py +++ b/u19_pipeline/utils/path_utils.py @@ -1,9 +1,7 @@ -import pathlib -import os import glob +import os import subprocess -import sys file_patterns_acq = { "raw_imaging": ['/*.tiff', '/*.tif'], @@ -17,7 +15,6 @@ def check_file_pattern_dir(filepath, file_patterns): """ Check if directory (or its childs) contains some files with specific pattern names """ - dirs_with_session_files = [] child_dirs = [x[0] for x in os.walk(filepath)] patterns_found = 0 for dir in child_dirs: @@ -50,10 +47,7 @@ def get_size_directory(path): command = ["du", path, '-s'] s = subprocess.run(command, capture_output=True) output = s.stdout.decode('UTF-8') - if len(output) != 0: - kbytes = int(output.split('\t')[0]) - else: - kbytes = -1 + kbytes = int(output.split("\t")[0]) if len(output) != 0 else -1 return kbytes diff --git a/u19_pipeline/utils/scp_transfers.py b/u19_pipeline/utils/scp_transfers.py index 3a8798d0..8191f77d 100644 --- a/u19_pipeline/utils/scp_transfers.py +++ 
b/u19_pipeline/utils/scp_transfers.py @@ -1,13 +1,16 @@ import os -import psutil import subprocess import sys -from paramiko import SSHClient, AutoAddPolicy, RSAKey + +import psutil +from paramiko import AutoAddPolicy, RSAKey, SSHClient from paramiko.auth_handler import AuthenticationException, SSHException -from scp import SCPClient, SCPException +from scp import SCPClient -from u19_pipeline.automatic_job.clusters_paths_and_transfers import public_key_location as public_key_location +from u19_pipeline.automatic_job.clusters_paths_and_transfers import ( + public_key_location, +) #Steps on windows machine # https://thesysadminchannel.com/solved-add-windowscapability-failed-error-code-0x800f0954-rsat-fix/ diff --git a/u19_pipeline/utils/slack_utils.py b/u19_pipeline/utils/slack_utils.py index 79f11460..0c6bb570 100644 --- a/u19_pipeline/utils/slack_utils.py +++ b/u19_pipeline/utils/slack_utils.py @@ -2,10 +2,10 @@ import json import sys -import random -import requests from datetime import datetime +import requests + def send_slack_notification(webhook_url, slack_json_message):