diff --git a/plottr/apps/monitr.py b/plottr/apps/monitr.py index ee9bc6f6..56baa7e8 100644 --- a/plottr/apps/monitr.py +++ b/plottr/apps/monitr.py @@ -1,5 +1,5 @@ -""" plottr.monitr -- a GUI tool for monitoring data files. -""" +"""plottr.monitr -- a GUI tool for monitoring data files.""" + import copy import sys import os @@ -46,32 +46,40 @@ from ..apps.watchdog_classes import WatcherClient from ..gui.widgets import Collapsible from .json_viewer import JsonModel, JsonTreeView -from ..icons import get_starIcon as get_star_icon, get_trashIcon as get_trash_icon, get_completeIcon as get_complete_icon, get_interruptedIcon as get_interrupted_icon, get_imageIcon as get_img_icon, get_jsonIcon as get_json_icon, get_mdIcon as get_md_icon +from ..icons import ( + get_starIcon as get_star_icon, + get_trashIcon as get_trash_icon, + get_completeIcon as get_complete_icon, + get_interruptedIcon as get_interrupted_icon, + get_imageIcon as get_img_icon, + get_jsonIcon as get_json_icon, + get_mdIcon as get_md_icon, +) from .appmanager import AppManager TIMESTRFORMAT = "%Y-%m-%dT%H%M%S" # Change this variable to change the module of the app that monitr should open. -AUTOPLOTMODULE = 'plottr.apps.autoplot' +AUTOPLOTMODULE = "plottr.apps.autoplot" # Function that the app manager should run to open a new app. -AUTOPLOTFUNC = 'autoplotDDH5App' +AUTOPLOTFUNC = "autoplotDDH5App" -LOGGER = logging.getLogger('plottr.apps.monitr') +LOGGER = logging.getLogger("plottr.apps.monitr") def html_color_generator() -> Generator[str, None, None]: """ Generator that cycles through string colors for use in html code. """ - colors = ['red', 'blue', 'green', 'purple', 'orange', 'brown', 'magenta'] + colors = ["red", "blue", "green", "purple", "orange", "brown", "magenta"] for color in cycle(colors): yield color def is_file_lock(path: Path) -> bool: - if path.name[0] == '~' and path.suffix == '.lock': + if path.name[0] == "~" and path.suffix == ".lock": return True return False @@ -93,6 +101,7 @@ class ContentType(Enum): Enum class for the types of files that are of interest in the monitored subdirectories. Contains helper methods to sort files and assign colors to each file type. """ + data = auto() tag = auto() json = auto() @@ -112,17 +121,22 @@ def sort(cls, file: Optional[Union[str, Path]] = None) -> "ContentType": if not isinstance(file, str): file = str(file) extension = file.split(".")[-1].lower() - if extension == 'ddh5': + if extension == "ddh5": return ContentType.data - elif extension == 'tag': + elif extension == "tag": return ContentType.tag - elif extension == 'json': + elif extension == "json": return ContentType.json - elif extension == 'md': + elif extension == "md": return ContentType.md - elif extension == 'py': + elif extension == "py": return ContentType.py - elif extension == 'jpg' or extension == 'jpeg' or extension == 'png' or extension == 'image': + elif ( + extension == "jpg" + or extension == "jpeg" + or extension == "png" + or extension == "image" + ): return ContentType.image else: return ContentType.unknown @@ -133,18 +147,17 @@ def sort_Qcolor(cls, item: Optional["ContentType"] = None) -> QtGui.QBrush: Returns the Qt color for the specified ContentType """ if item == ContentType.data: - return QtGui.QBrush(QtGui.QColor('red')) + return QtGui.QBrush(QtGui.QColor("red")) if item == ContentType.tag: - return QtGui.QBrush(QtGui.QColor('blue')) + return QtGui.QBrush(QtGui.QColor("blue")) if item == ContentType.json: - return QtGui.QBrush(QtGui.QColor('green')) + return QtGui.QBrush(QtGui.QColor("green")) - return QtGui.QBrush(QtGui.QColor('black')) + return QtGui.QBrush(QtGui.QColor("black")) class SupportedDataTypes: - - valid_types = ['.ddh5', '.md', '.json', '.py'] + valid_types = [".ddh5", ".md", ".json", ".py"] @classmethod def check_valid_data(cls, file_names: Sequence[Union[str, Path]]) -> bool: @@ -189,41 +202,47 @@ def __init__(self, path: Path, files: Dict[Path, ContentType] = {}): self.show = True if files is not None: self.files.update(files) - self.tags = [file.stem for file, file_type in self.files.items() if file_type == ContentType.tag] - - if '__star__' in self.tags and '__trash__' in self.tags: - star_path = self.path.joinpath('__star__.tag') - trash_path = self.path.joinpath('__trash__.tag') + self.tags = [ + file.stem + for file, file_type in self.files.items() + if file_type == ContentType.tag + ] + + if "__star__" in self.tags and "__trash__" in self.tags: + star_path = self.path.joinpath("__star__.tag") + trash_path = self.path.joinpath("__trash__.tag") if star_path.is_file() and trash_path.is_file(): LOGGER.error( - f'The folder: {self.path} contains both the star and trash tag. Both tags will be deleted.') + f"The folder: {self.path} contains both the star and trash tag. Both tags will be deleted." + ) star_path.unlink() trash_path.unlink() - self.tags.remove('__star__') - self.tags.remove('__trash__') - elif '__star__' in self.tags: + self.tags.remove("__star__") + self.tags.remove("__trash__") + elif "__star__" in self.tags: self.star = True - self.tags.remove('__star__') - elif '__trash__' in self.tags: + self.tags.remove("__star__") + elif "__trash__" in self.tags: self.trash = True - self.tags.remove('__trash__') + self.tags.remove("__trash__") - if '__complete__' in self.tags and '__interrupted__' in self.tags: - complete_path = self.path.joinpath('__complete__.tag') - interrupted_path = self.path.joinpath('__interrupted__.tag') + if "__complete__" in self.tags and "__interrupted__" in self.tags: + complete_path = self.path.joinpath("__complete__.tag") + interrupted_path = self.path.joinpath("__interrupted__.tag") if complete_path.is_file() and interrupted_path.is_file(): LOGGER.error( - f'The folder: {self.path} contains both the complete and interrupted tag. Both tags will be deleted.') + f"The folder: {self.path} contains both the complete and interrupted tag. Both tags will be deleted." + ) complete_path.unlink() interrupted_path.unlink() - self.tags.remove('__complete__') - self.tags.remove('__interrupted__') - elif '__complete__' in self.tags: + self.tags.remove("__complete__") + self.tags.remove("__interrupted__") + elif "__complete__" in self.tags: self.complete = True - self.tags.remove('__complete__') - elif '__interrupted__' in self.tags: + self.tags.remove("__complete__") + elif "__interrupted__" in self.tags: self.interrupted = True - self.tags.remove('__interrupted__') + self.tags.remove("__interrupted__") self.tags_widget = ItemTagLabel(self.tags) @@ -243,60 +262,65 @@ def add_file(self, path: Path) -> None: model = self.model() assert isinstance(model, FileModel) - if path.name == '__star__.tag': + if path.name == "__star__.tag": # Check if the item is not already trash. - trash_path = path.parent.joinpath('__trash__.tag') + trash_path = path.parent.joinpath("__trash__.tag") if trash_path.is_file(): path.unlink() error_msg = QtWidgets.QMessageBox() - error_msg.setText(f'Folder is already trash. Please do not add both __trash__ and __star__ tags in the same folder. ' - f' \n {path} was deleted ') - error_msg.setWindowTitle(f'Deleting __star__.tag') + error_msg.setText( + f"Folder is already trash. Please do not add both __trash__ and __star__ tags in the same folder. " + f" \n {path} was deleted " + ) + error_msg.setWindowTitle(f"Deleting __star__.tag") error_msg.exec_() return else: self.star = True - elif path.name == '__trash__.tag': + elif path.name == "__trash__.tag": # Check if the item is not already star. - star_path = path.parent.joinpath('__star__.tag') + star_path = path.parent.joinpath("__star__.tag") if star_path.is_file(): path.unlink() error_msg = QtWidgets.QMessageBox() error_msg.setText( - f'Folder is already star. Please do not add both __trash__ and __star__ tags in the same folder. ' - f' \n {path} was deleted ') - error_msg.setWindowTitle(f'Deleting __trash__.tag') + f"Folder is already star. Please do not add both __trash__ and __star__ tags in the same folder. " + f" \n {path} was deleted " + ) + error_msg.setWindowTitle(f"Deleting __trash__.tag") error_msg.exec_() return else: self.trash = True - elif path.name == '__complete__.tag': + elif path.name == "__complete__.tag": # Check if the item is already tagged as interrupted. - interrupted_path = path.parent.joinpath('__interrupted__.tag') + interrupted_path = path.parent.joinpath("__interrupted__.tag") if interrupted_path.is_file(): path.unlink() error_msg = QtWidgets.QMessageBox() error_msg.setText( - f'Folder is already tagged as interrupted. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n' - f'{path} was deleted.') - error_msg.setWindowTitle(f'Deleting __complete__.tag') + f"Folder is already tagged as interrupted. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n" + f"{path} was deleted." + ) + error_msg.setWindowTitle(f"Deleting __complete__.tag") error_msg.exec_() return else: self.complete = True - elif path.name == '__interrupted__.tag': + elif path.name == "__interrupted__.tag": # Check if the item is already tagged as complete. - complete_path = path.parent.joinpath('__complete__.tag') + complete_path = path.parent.joinpath("__complete__.tag") if complete_path.is_file(): path.unlink() error_msg = QtWidgets.QMessageBox() error_msg.setText( - f'Folder is already tagged as complete. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n' - f'{path} was deleted.') - error_msg.setWindowTitle(f'Deleting __interrupted__.tag') + f"Folder is already tagged as complete. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n" + f"{path} was deleted." + ) + error_msg.setWindowTitle(f"Deleting __interrupted__.tag") error_msg.exec_() return else: @@ -327,13 +351,13 @@ def delete_file(self, path: Path) -> None: self.tags_widget.delete_tag(path.stem) model.tag_deleted(path.stem) - if path.name == '__star__.tag': + if path.name == "__star__.tag": self.star = False - elif path.name == '__trash__.tag': + elif path.name == "__trash__.tag": self.trash = False - elif path.name == '__complete__.tag': + elif path.name == "__complete__.tag": self.complete = False - elif path.name == '__interrupted__.tag': + elif path.name == "__interrupted__.tag": self.interrupted = False model.item_files_changed(self) @@ -381,6 +405,7 @@ class FileModel(QtGui.QStandardItemModel): :param Parent: The parent of the model. :param watcher_on: If False, the model will not start the watcher. """ + # Signal(Path) -- Emitted when there has been an update to the currently selected folder. #: Arguments: #: - The path of the currently selected folder. @@ -417,11 +442,17 @@ class FileModel(QtGui.QStandardItemModel): #: - The deleted tag. tag_deleted_signal = Signal(str) - def __init__(self, monitor_path: str, rows: int, columns: int, parent: Optional[Any] = None, - watcher_on: bool = True): + def __init__( + self, + monitor_path: str, + rows: int, + columns: int, + parent: Optional[Any] = None, + watcher_on: bool = True, + ): super().__init__(rows, columns, parent=parent) self.monitor_path = Path(monitor_path) - self.header_labels = ['File path', 'Tags'] + self.header_labels = ["File path", "Tags"] self.currently_selected_folder = None # The main dictionary has all the datasets (folders) Path as keys, with the actual item as its value. @@ -429,7 +460,7 @@ def __init__(self, monitor_path: str, rows: int, columns: int, parent: Optional[ self.tags_dict: Dict[str, int] = {} self.tags_model = QtGui.QStandardItemModel() self.tags_model.dataChanged.connect(self.on_checked_tag_change) - first_tag_item = QtGui.QStandardItem('Tag Filter') + first_tag_item = QtGui.QStandardItem("Tag Filter") first_tag_item.setSelectable(False) self.tags_model.insertRow(0, first_tag_item) self.load_data() @@ -507,14 +538,20 @@ def load_data(self) -> None: # Directory_2: {file_1: file_type # file_2: file_type}...} data_dictionary = { - Path(walk_entry[0]): {Path(walk_entry[0]).joinpath(file): ContentType.sort(file) for file in walk_entry[2]} - for walk_entry in walk_results if SupportedDataTypes.check_valid_data(file_names=walk_entry[ - 2])} + Path(walk_entry[0]): { + Path(walk_entry[0]).joinpath(file): ContentType.sort(file) + for file in walk_entry[2] + } + for walk_entry in walk_results + if SupportedDataTypes.check_valid_data(file_names=walk_entry[2]) + } for folder_path, files_dict in data_dictionary.items(): self.sort_and_add_item(folder_path, files_dict) - def sort_and_add_item(self, folder_path: Path, files_dict: Optional[Dict] = None) -> Optional[bool]: + def sort_and_add_item( + self, folder_path: Path, files_dict: Optional[Dict] = None + ) -> Optional[bool]: """ Adds one or more items into the model. New parent items are created if required. @@ -527,28 +564,40 @@ def sort_and_add_item(self, folder_path: Path, files_dict: Optional[Dict] = None if folder_path == self.monitor_path: if files_dict is not None: - LOGGER.warning(f'The following files in the monitoring directory will not be displayed: ' - f'\n{[str(file) for file in files_dict if file.is_file()]}\nplease move them to a specific folder') + LOGGER.warning( + f"The following files in the monitoring directory will not be displayed: " + f"\n{[str(file) for file in files_dict if file.is_file()]}\nplease move them to a specific folder" + ) else: - LOGGER.warning(f'Files have been found in the monitoring folder, please remove them') + LOGGER.warning( + f"Files have been found in the monitoring folder, please remove them" + ) return False # Checks if the item is in a hidden folder, if it is ignore. - if any(part.startswith('.') for part in folder_path.parts): return None + if any(part.startswith(".") for part in folder_path.parts): + return None # Check if the new item should have a parent item. If the new item should have a parent, but this does # not yet exist, create it. if folder_path.parent == self.monitor_path: parent_item, parent_path = None, None elif folder_path.parent in self.main_dictionary: - parent_item, parent_path = \ - self.main_dictionary[folder_path.parent], folder_path.parent + parent_item, parent_path = ( + self.main_dictionary[folder_path.parent], + folder_path.parent, + ) else: - parent_folder_files = {file: ContentType.sort(file) for file in folder_path.parent.iterdir() if - file.is_file()} + parent_folder_files = { + file: ContentType.sort(file) + for file in folder_path.parent.iterdir() + if file.is_file() + } self.sort_and_add_item(folder_path.parent, parent_folder_files) - parent_item, parent_path = \ - self.main_dictionary[folder_path.parent], folder_path.parent + parent_item, parent_path = ( + self.main_dictionary[folder_path.parent], + folder_path.parent, + ) # Create Item and add it to the model if files_dict is None: @@ -562,7 +611,9 @@ def sort_and_add_item(self, folder_path: Path, files_dict: Optional[Dict] = None else: self.tags_dict[tag] = 1 new_tag_item = QtGui.QStandardItem(tag) # Item for the combox model - new_tag_item.setFlags(QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled) + new_tag_item.setFlags( + QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled + ) new_tag_item.setData(QtCore.Qt.Unchecked, QtCore.Qt.CheckStateRole) self.tags_model.setItem(self.tags_model.rowCount(), 0, new_tag_item) @@ -599,24 +650,29 @@ def on_file_created(self, event: FileSystemEvent) -> None: path = Path(str(event.src_path)) # If a folder is created, it will be added when a data file will be created. - if not path.is_dir() and not any(part.startswith('.') for part in path.parts): - + if not path.is_dir() and not any(part.startswith(".") for part in path.parts): # If the file created is a lock, we ignore it. if not is_file_lock(path): - # Every folder that is currently in the tree will be in the main dictionary. if path.parent in self.main_dictionary: parent = self.main_dictionary[path.parent] if path not in parent.files: parent.add_file(path) - if self.currently_selected_folder is not None and _is_relative_to(parent.path, - self.currently_selected_folder): + if ( + self.currently_selected_folder is not None + and _is_relative_to( + parent.path, self.currently_selected_folder + ) + ): self.update_me.emit(parent.path) # If the parent of the file does not exist, we first need to check that file is valid data. elif SupportedDataTypes.check_valid_data([path]): - new_files_dict = {file: ContentType.sort(file) for file in path.parent.iterdir() if - str(file.suffix) != ''} + new_files_dict = { + file: ContentType.sort(file) + for file in path.parent.iterdir() + if str(file.suffix) != "" + } # If sort_and_add_item returns false, it means that it could not add an item because it was # triggered for files in the monitoring directory. @@ -624,8 +680,12 @@ def on_file_created(self, event: FileSystemEvent) -> None: if added_status is None: item = self.main_dictionary[path.parent] # Send signal indicating that current folder requires update - if self.currently_selected_folder is not None and \ - _is_relative_to(item.path, self.currently_selected_folder): + if ( + self.currently_selected_folder is not None + and _is_relative_to( + item.path, self.currently_selected_folder + ) + ): self.update_me.emit(item.path) @Slot(FileSystemEvent) @@ -654,7 +714,6 @@ def on_file_deleted(self, event: FileSystemEvent) -> None: if path.parent in self.main_dictionary: # If the file created is a lock, we ignore it. if not is_file_lock(path): - parent = self.main_dictionary[path.parent] if path in parent.files: # Checks if the file is a data file. @@ -663,8 +722,10 @@ def on_file_deleted(self, event: FileSystemEvent) -> None: all_folder_files = [file for file in parent.path.iterdir()] # Checks if the folder itself needs to be deleted or only the file - if SupportedDataTypes.check_valid_data( - all_folder_files) or parent.hasChildren(): + if ( + SupportedDataTypes.check_valid_data(all_folder_files) + or parent.hasChildren() + ): parent.delete_file(path) else: # If the parent needs to be deleted, removes it from the correct widget. @@ -676,8 +737,9 @@ def on_file_deleted(self, event: FileSystemEvent) -> None: else: parent.delete_file(path) # Send signal indicating that current folder requires update. - if self.currently_selected_folder is not None and \ - _is_relative_to(parent.path, self.currently_selected_folder): + if self.currently_selected_folder is not None and _is_relative_to( + parent.path, self.currently_selected_folder + ): # Checks if the folder still exists. If the user has the folder that is getting deleted at that # moment, no update should happen. if self.currently_selected_folder.is_dir(): @@ -690,7 +752,11 @@ def _delete_all_children_from_main_dictionary(self, item: Item) -> None: :param item: The item whose children should be deleted. """ path = item.path - children_folders = [key for key in self.main_dictionary.keys() if _is_relative_to(key, path) and key != path] + children_folders = [ + key + for key in self.main_dictionary.keys() + if _is_relative_to(key, path) and key != path + ] for child in children_folders: if child in self.main_dictionary: child_item = self.main_dictionary[child] @@ -709,8 +775,12 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None: # File moved gets triggered with None and '', for the event paths. From what I can tell, they are not useful, # so we ignore them. - if event.src_path is not None and event.src_path != '' \ - and event.dest_path is not None and event.dest_path != '': + if ( + event.src_path is not None + and event.src_path != "" + and event.dest_path is not None + and event.dest_path != "" + ): src_path = Path(str(event.src_path)) dest_path = Path(str(event.dest_path)) @@ -722,8 +792,9 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None: changed_item.change_path(dest_path) # Checking for a file becoming a data file. - elif not SupportedDataTypes.check_valid_data([src_path]) and SupportedDataTypes.check_valid_data( - [dest_path]): + elif not SupportedDataTypes.check_valid_data( + [src_path] + ) and SupportedDataTypes.check_valid_data([dest_path]): # If the parent exists in the main dictionary, the model already has all the files and its tracking # that folder, only updates the file itself. if src_path.parent in self.main_dictionary: @@ -737,14 +808,18 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None: # New folder to keep track. else: - new_entry = {file: ContentType.sort(file) for file in dest_path.parent.iterdir() if - str(file.suffix) != ''} + new_entry = { + file: ContentType.sort(file) + for file in dest_path.parent.iterdir() + if str(file.suffix) != "" + } self.sort_and_add_item(dest_path.parent, new_entry) parent = self.main_dictionary[dest_path.parent] # Checking if a data file stops being a data file. - elif SupportedDataTypes.check_valid_data([src_path]) and not SupportedDataTypes.check_valid_data( - [dest_path]): + elif SupportedDataTypes.check_valid_data( + [src_path] + ) and not SupportedDataTypes.check_valid_data([dest_path]): if src_path.parent in self.main_dictionary: parent = self.main_dictionary[src_path.parent] elif dest_path.parent in self.main_dictionary: @@ -756,8 +831,10 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None: # Checks if there are other data files in the parent. parent_files = [key for key in parent.files.keys()] - if not SupportedDataTypes.check_valid_data( - parent_files) and not parent.hasChildren(): + if ( + not SupportedDataTypes.check_valid_data(parent_files) + and not parent.hasChildren() + ): # If the parent has other children, it means there are more data files down the file tree # and the model should keep track of these folders. del self.main_dictionary[parent.path] @@ -792,17 +869,23 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None: if dest_path not in parent.files: parent.add_file(dest_path) - if self.currently_selected_folder is not None and _is_relative_to(dest_path, self.currently_selected_folder): + if self.currently_selected_folder is not None and _is_relative_to( + dest_path, self.currently_selected_folder + ): # This happens when a top level item is changed. if parent is None: - check = self.check_all_files_are_valid(self.main_dictionary[dest_path], dest_path)[0] + check = self.check_all_files_are_valid( + self.main_dictionary[dest_path], dest_path + )[0] else: check = self.check_all_files_are_valid(parent, parent.path)[0] if check: self.update_me.emit(self.currently_selected_folder) - def check_all_files_are_valid(self, item: Item, first_path: Path) -> Tuple[bool, Path]: + def check_all_files_are_valid( + self, item: Item, first_path: Path + ) -> Tuple[bool, Path]: """ Checks that all the files inside of the item have a valid path. This is used when changing the name of currently selected folders to see if an update to change the folders should be triggered or not. @@ -837,15 +920,13 @@ def on_file_modified(self, event: FileSystemEvent) -> None: path = Path(str(event.src_path)) if path.parent in self.main_dictionary: - # If the file created is a lock, we ignore it. if not is_file_lock(path): - parent = self.main_dictionary[path.parent] # If the folder is not currently being selected I don't care about modifications. - if self.currently_selected_folder is not None and _is_relative_to(parent.path, - self.currently_selected_folder): - + if self.currently_selected_folder is not None and _is_relative_to( + parent.path, self.currently_selected_folder + ): # If im expecting this update, ignore it. if path in self.modified_exceptions: self.modified_exceptions.remove(path) @@ -887,7 +968,9 @@ def update_currently_selected_folder(self, path: Path) -> None: self.modified_exceptions = self._get_all_files_of_item(item) self.currently_selected_folder = path - def _get_all_files_of_item(self, item: Item, partial_list: List[Path] = []) -> List[Path]: + def _get_all_files_of_item( + self, item: Item, partial_list: List[Path] = [] + ) -> List[Path]: """ Recursively gets a list of all the files that are in item and all of its children. @@ -897,7 +980,7 @@ def _get_all_files_of_item(self, item: Item, partial_list: List[Path] = []) -> L partial_list = partial_list + [file for file in item.files.keys()] if item.hasChildren(): for i in range(item.rowCount()): - child = item.child(i,0) + child = item.child(i, 0) assert isinstance(child, Item) partial_list = partial_list + self._get_all_files_of_item(child) return partial_list @@ -919,9 +1002,9 @@ def tag_action_triggered(self, item_index: QtCore.QModelIndex, tag: str) -> None item = self.itemFromIndex(item_index) assert isinstance(item, Item) path = item.path - star_path = path.joinpath('__star__.tag') - trash_path = path.joinpath('__trash__.tag') - if tag == 'star': + star_path = path.joinpath("__star__.tag") + trash_path = path.joinpath("__trash__.tag") + if tag == "star": # If a trash file in the star folder exists, delete it. if trash_path.is_file(): trash_path.unlink() @@ -930,9 +1013,9 @@ def tag_action_triggered(self, item_index: QtCore.QModelIndex, tag: str) -> None if star_path.is_file(): star_path.unlink() else: - with open(star_path, 'w') as file: - file.write('') - elif tag == 'trash': + with open(star_path, "w") as file: + file.write("") + elif tag == "trash": # If a star file in the star folder exists, delete it. if star_path.is_file(): star_path.unlink() @@ -941,16 +1024,16 @@ def tag_action_triggered(self, item_index: QtCore.QModelIndex, tag: str) -> None if trash_path.is_file(): trash_path.unlink() else: - with open(trash_path, 'w') as file: - file.write('') + with open(trash_path, "w") as file: + file.write("") else: - tag_path = path.joinpath(tag + '.tag') + tag_path = path.joinpath(tag + ".tag") if tag_path.is_file(): tag_path.unlink() else: - with open(tag_path, 'w') as file: - file.write('') + with open(tag_path, "w") as file: + file.write("") def delete_item(self, item_index: QtCore.QModelIndex) -> None: """ @@ -1013,8 +1096,11 @@ def tag_deleted(self, tag: str) -> None: self.tag_deleted_signal.emit(tag) def currently_selected_tags(self) -> List[str]: - return [self.tags_model.item(i, 0).text() for i in range(self.tags_model.rowCount()) if - self.tags_model.item(i, 0).checkState()] + return [ + self.tags_model.item(i, 0).text() + for i in range(self.tags_model.rowCount()) + if self.tags_model.item(i, 0).checkState() + ] @Slot() def on_checked_tag_change(self) -> None: @@ -1033,7 +1119,6 @@ def quit(self) -> None: class SortFilterProxyModel(QtCore.QSortFilterProxyModel): - # Signal() -- Emitted before filtering is going to happen. filter_incoming = Signal() @@ -1058,13 +1143,20 @@ def setSourceModel(self, sourceModel: QtCore.QAbstractItemModel) -> None: self.allowed_items = [item for item in sourceModel.main_dictionary.values()] super().setSourceModel(sourceModel) - def filter_requested(self, allowed_items: List[QtGui.QStandardItem], star_status: bool, trash_status: bool) -> None: + def filter_requested( + self, + allowed_items: List[QtGui.QStandardItem], + star_status: bool, + trash_status: bool, + ) -> None: self.star_status = star_status self.trash_status = trash_status self.allowed_items = allowed_items self.trigger_filter() - def filterAcceptsRow(self, source_row: int, source_parent: QtCore.QModelIndex) -> bool: + def filterAcceptsRow( + self, source_row: int, source_parent: QtCore.QModelIndex + ) -> bool: """ Override of the QSortFilterProxyModel. Our custom filtering needs are implemented here. Checks whether or not to show the item against its allowed items list. @@ -1082,7 +1174,11 @@ def filterAcceptsRow(self, source_row: int, source_parent: QtCore.QModelIndex) - else: item = parent_item.child(source_row, 0) - if self.allowed_items is None and self.star_status == False and self.trash_status == False: + if ( + self.allowed_items is None + and self.star_status == False + and self.trash_status == False + ): if item is not None: item.show = True return True @@ -1145,16 +1241,16 @@ def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any] = No assert isinstance(model, FileModel) self.model_ = model self.collapsed_state: Dict[QtCore.QPersistentModelIndex, bool] = {} - self.star_text = 'star' - self.un_star_text = 'un-star' - self.trash_text = 'trash' - self.un_trash_text = 'un-trash' + self.star_text = "star" + self.un_star_text = "un-star" + self.trash_text = "trash" + self.un_trash_text = "un-trash" self.context_menu = QtWidgets.QMenu(self) - self.copy_path_action = QtWidgets.QAction('copy path') - self.star_action = QtWidgets.QAction('star') - self.trash_action = QtWidgets.QAction('trash') - self.delete_action = QtWidgets.QAction('delete') + self.copy_path_action = QtWidgets.QAction("copy path") + self.star_action = QtWidgets.QAction("star") + self.trash_action = QtWidgets.QAction("trash") + self.delete_action = QtWidgets.QAction("delete") self.tag_actions: Dict[str, QtWidgets.QAction] = {} for tag in self.model_.tags_dict.keys(): if tag not in self.tag_actions: @@ -1259,7 +1355,7 @@ def on_context_menu_requested(self, pos: QtCore.QPoint) -> None: self.context_menu.addSeparator() for tag, action in self.tag_actions.items(): if tag in item.tags: - action.setText('un-' + tag) + action.setText("un-" + tag) else: action.setText(tag) self.context_menu.addAction(action) @@ -1294,7 +1390,7 @@ def on_delete_tag_action(self, deleted_tag: str) -> None: @Slot(QtWidgets.QAction) def on_context_action_triggered(self, action: QtWidgets.QAction) -> None: tag = action.text() - if tag[0:3] == 'un-': + if tag[0:3] == "un-": tag = tag[3:] item_proxy_index = self.currentIndex() @@ -1308,7 +1404,9 @@ def on_context_action_triggered(self, action: QtWidgets.QAction) -> None: self.model_.tag_action_triggered(item_index, tag) - def currentChanged(self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex) -> None: + def currentChanged( + self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex + ) -> None: """ Gets called everytime the selection of the tree changes. Emits a signal indicating the current and previous selected item. @@ -1331,7 +1429,9 @@ def create_collapsed_state(self, incoming_item: Optional[Item] = None) -> None: if item.show: source_index = self.model_.index(i, 0, QtCore.QModelIndex()) index = self.proxy_model.mapFromSource(source_index) - self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = self.isExpanded(index) + self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = ( + self.isExpanded(index) + ) if item.hasChildren(): self.create_collapsed_state(incoming_item=item) @@ -1343,7 +1443,9 @@ def create_collapsed_state(self, incoming_item: Optional[Item] = None) -> None: if child.show: source_index = self.model_.indexFromItem(child) child_index = self.proxy_model.mapFromSource(source_index) - self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = self.isExpanded(child_index) + self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = ( + self.isExpanded(child_index) + ) if child.hasChildren(): self.create_collapsed_state(incoming_item=child) @@ -1353,7 +1455,12 @@ def restore_previous_collapsed_state(self) -> None: """ for persistent_index, state in self.collapsed_state.items(): proxy_index = self.proxy_model.mapFromSource( - self.model_.index(persistent_index.row(), persistent_index.column(), persistent_index.parent())) + self.model_.index( + persistent_index.row(), + persistent_index.column(), + persistent_index.parent(), + ) + ) self.setExpanded(proxy_index, state) @@ -1368,13 +1475,28 @@ class FilterWorker(QtCore.QObject): # dictionary have passed the filtering. Second item is the queries dictionary. finished = Signal(tuple) - def run(self, model: FileModel, star_status: bool, trash_status: bool, filter: str, tag_filter: List[str] = []) -> None: - filter_dict = self.filter_items(model, star_status, trash_status, filter, tag_filter) + def run( + self, + model: FileModel, + star_status: bool, + trash_status: bool, + filter: str, + tag_filter: List[str] = [], + ) -> None: + filter_dict = self.filter_items( + model, star_status, trash_status, filter, tag_filter + ) if filter_dict is not None: self.finished.emit(filter_dict) - def filter_items(self, model: FileModel, star_status: bool, trash_status: bool, filter: str, - tag_filter: List[str] = []) -> Optional[Tuple[Dict[Path, Item], Dict[str, List[str]]]]: + def filter_items( + self, + model: FileModel, + star_status: bool, + trash_status: bool, + filter: str, + tag_filter: List[str] = [], + ) -> Optional[Tuple[Dict[Path, Item], Dict[str, List[str]]]]: """ Process the text in filter, separates them into the different queries and filters the items. @@ -1423,7 +1545,9 @@ def filter_items(self, model: FileModel, star_status: bool, trash_status: bool, assert trashed_dict is not None if item.path not in trashed_dict and item.trash: # When trashing an item, keep a record that it has been trashed and all of its children. - trashed_dict, current_dict = self._trash_item(item, trashed_dict, current_dict) + trashed_dict, current_dict = self._trash_item( + item, trashed_dict, current_dict + ) if trashed_dict is None: return None continue @@ -1436,22 +1560,31 @@ def filter_items(self, model: FileModel, star_status: bool, trash_status: bool, for query_type, queries in queries_dict.items(): if self.thread().isInterruptionRequested(): return None - if query_type == 'name': + if query_type == "name": for query in queries: if self.thread().isInterruptionRequested(): return None match_pattern = re.compile(query, flags=re.IGNORECASE) - new_matches = {path: item for path, item in current_dict.items() if - match_pattern.search(str(path))} + new_matches = { + path: item + for path, item in current_dict.items() + if match_pattern.search(str(path)) + } current_dict = new_matches else: for query in queries: if self.thread().isInterruptionRequested(): return None match_pattern = re.compile(query, flags=re.IGNORECASE) - new_matches = {path: item for path, item in current_dict.items() - for file_path, file_type in item.files.items() - if (file_type == ContentType.sort(query_type) and match_pattern.search(str(file_path.name)))} + new_matches = { + path: item + for path, item in current_dict.items() + for file_path, file_type in item.files.items() + if ( + file_type == ContentType.sort(query_type) + and match_pattern.search(str(file_path.name)) + ) + } current_dict = new_matches # Add all the children and parents (if these have not been trashed) of the passed items. @@ -1476,8 +1609,12 @@ def filter_items(self, model: FileModel, star_status: bool, trash_status: bool, return current_dict, queries_dict - def _add_parent(self, item: Item, adding_dict: Optional[Dict[Path, Item]], - trashed_dictionary: Dict[Path, Item]) -> Optional[Dict[Path, Item]]: + def _add_parent( + self, + item: Item, + adding_dict: Optional[Dict[Path, Item]], + trashed_dictionary: Dict[Path, Item], + ) -> Optional[Dict[Path, Item]]: """ Adds all the parents (if these have not been trashed) of item to adding_dict. @@ -1503,8 +1640,12 @@ def _add_parent(self, item: Item, adding_dict: Optional[Dict[Path, Item]], adding_dict[parent_item.path] = parent_item return adding_dict - def _add_children(self, item: Item, adding_dict: Optional[Dict[Path, Item]], - trashed_dictionary: Optional[Dict[Path, Item]]) -> Optional[Dict[Path, Item]]: + def _add_children( + self, + item: Item, + adding_dict: Optional[Dict[Path, Item]], + trashed_dictionary: Optional[Dict[Path, Item]], + ) -> Optional[Dict[Path, Item]]: """ Adds all the children of an item (if these haven not be trashed) to adding_dict. @@ -1522,15 +1663,21 @@ def _add_children(self, item: Item, adding_dict: Optional[Dict[Path, Item]], if child.path in trashed_dictionary: continue if child.hasChildren(): - adding_dict = self._add_children(child, adding_dict, trashed_dictionary) + adding_dict = self._add_children( + child, adding_dict, trashed_dictionary + ) if adding_dict is None: return None adding_dict[child.path] = child return adding_dict - def _trash_item(self, item: Item, trashed_dict: Optional[Dict[Path, Item]], current_dict: Dict[Path, Item]) -> Tuple[ - Optional[Dict[Path, Item]], Dict[Path, Item]]: + def _trash_item( + self, + item: Item, + trashed_dict: Optional[Dict[Path, Item]], + current_dict: Dict[Path, Item], + ) -> Tuple[Optional[Dict[Path, Item]], Dict[Path, Item]]: """ Trashes an item and all of its children items. Removes the items trashed from current_dict. @@ -1544,7 +1691,9 @@ def _trash_item(self, item: Item, trashed_dict: Optional[Dict[Path, Item]], curr return None, {} child = item.child(i, 0) assert isinstance(child, Item) - trashed_dict, current_dict = self._trash_item(child, trashed_dict, current_dict) + trashed_dict, current_dict = self._trash_item( + child, trashed_dict, current_dict + ) if trashed_dict is None: return None, {} @@ -1555,7 +1704,9 @@ def _trash_item(self, item: Item, trashed_dict: Optional[Dict[Path, Item]], curr return trashed_dict, current_dict @classmethod - def parse_queries(cls, filter: str, tag_filter: List[str] = []) -> Dict[str, List[str]]: + def parse_queries( + cls, filter: str, tag_filter: List[str] = [] + ) -> Dict[str, List[str]]: """ Separates a string of queries into a dictionary where the queries are organized by categories. @@ -1575,9 +1726,14 @@ def parse_queries(cls, filter: str, tag_filter: List[str] = []) -> Dict[str, Lis :returns: Dictionary with the keys: tag, md, image, json, and name. Each contains a list with the queries for each respective category. """ - raw_queries = filter.split(',') - queries_with_empty_spaces = [item[1:] if len(item) >= 1 and item[0] == " " else item for item in raw_queries] - queries = [item for item in queries_with_empty_spaces if item != '' and item != ' '] + raw_queries = filter.split(",") + queries_with_empty_spaces = [ + item[1:] if len(item) >= 1 and item[0] == " " else item + for item in raw_queries + ] + queries = [ + item for item in queries_with_empty_spaces if item != "" and item != " " + ] queries_dict = {} if len(queries) > 0 or len(tag_filter) > 0: @@ -1587,32 +1743,34 @@ def parse_queries(cls, filter: str, tag_filter: List[str] = []) -> Dict[str, Lis json_queries = [] name_queries = [] for query in queries: - if query != '': - if query[:4] == 'tag:': + if query != "": + if query[:4] == "tag:": tag_queries.append(cls._remove_whitespace(query[4:])) - elif query[:2] == 't:' or query[:2] == 'T:': + elif query[:2] == "t:" or query[:2] == "T:": tag_queries.append(cls._remove_whitespace(query[2:])) - elif query[:3] == 'md:': + elif query[:3] == "md:": md_queries.append(cls._remove_whitespace(query[3:])) - elif query[:2] == 'm:' or query[:2] == 'M:': + elif query[:2] == "m:" or query[:2] == "M:": md_queries.append(cls._remove_whitespace(query[2:])) - elif query[:6] == 'image:': + elif query[:6] == "image:": image_queries.append(cls._remove_whitespace(query[6:])) - elif query[:2] == 'i:' or query[:2] == 'I:': + elif query[:2] == "i:" or query[:2] == "I:": image_queries.append(cls._remove_whitespace(query[2:])) - elif query[:5] == 'json:': + elif query[:5] == "json:": json_queries.append(cls._remove_whitespace(query[5:])) - elif query[:2] == 'j:' or query[:2] == 'J:': + elif query[:2] == "j:" or query[:2] == "J:": json_queries.append(cls._remove_whitespace(query[2:])) else: name_queries.append(cls._remove_whitespace(query)) tag_queries = list(set(tag_queries + tag_filter)) - queries_dict = {'tag': tag_queries, - 'md': md_queries, - 'image': image_queries, - 'json': json_queries, - 'name': name_queries, } + queries_dict = { + "tag": tag_queries, + "md": md_queries, + "image": image_queries, + "json": json_queries, + "name": name_queries, + } return queries_dict @@ -1624,14 +1782,21 @@ def _remove_whitespace(cls, text: str) -> str: :param text: The string we want to remove the initial or ending whitespace. """ if len(text) > 0: - if text[0] == ' ': + if text[0] == " ": text = text[1:] - if len(text) > 0 and text[-1] == ' ': + if len(text) > 0 and text[-1] == " ": text = text[0:-1] return text @classmethod - def is_item_shown(cls, item: Item, filter: str, tag_filter: List[str], star_status: bool, trash_status: bool) -> bool: + def is_item_shown( + cls, + item: Item, + filter: str, + tag_filter: List[str], + star_status: bool, + trash_status: bool, + ) -> bool: """ Checks if the item should be currently shown. True if it should, False if it shouldn't. It takes into account all the rules of normal filtering. @@ -1659,7 +1824,9 @@ def is_item_shown(cls, item: Item, filter: str, tag_filter: List[str], star_stat return True @classmethod - def _item_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]) -> bool: + def _item_check( + cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]] + ) -> bool: """ Checks if the item passes the queries in the queries dict, Including if the item is a star when the star status is activated. @@ -1674,7 +1841,7 @@ def _item_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List return False for query_type, queries in queries_dict.items(): - if query_type == 'name': + if query_type == "name": for query in queries: match_pattern = re.compile(query, flags=re.IGNORECASE) if not match_pattern.search(str(item.path)): @@ -1682,22 +1849,30 @@ def _item_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List else: if len(queries) > 0: sorted_query_type = ContentType.sort(query_type) - correct_files_type = [file_path for file_path, file_type in item.files.items() if - file_type == sorted_query_type] + correct_files_type = [ + file_path + for file_path, file_type in item.files.items() + if file_type == sorted_query_type + ] if not len(correct_files_type) > 0: return False for query in queries: match_pattern = re.compile(query, flags=re.IGNORECASE) - matches = [match_pattern.search(str(path)) for path in correct_files_type] + matches = [ + match_pattern.search(str(path)) + for path in correct_files_type + ] if not any(matches): return False return True @classmethod - def _parents_query_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]) -> bool: + def _parents_query_check( + cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]] + ) -> bool: """ Checks recursively if any parent of the item passes the query check. @@ -1711,10 +1886,16 @@ def _parents_query_check(cls, item: Item, star_status: bool, queries_dict: Dict[ if cls._item_check(item, star_status, queries_dict): return True - return cls._item_check(parent, star_status, queries_dict, ) + return cls._item_check( + parent, + star_status, + queries_dict, + ) @classmethod - def _children_query_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]) -> bool: + def _children_query_check( + cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]] + ) -> bool: """ Checks recursively if any child of the item passes the query check. @@ -1761,8 +1942,13 @@ class FileExplorer(QtWidgets.QWidget): Helper widget to unify the FileTree with the line edit and status buttons. """ - def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any]=None, - *args: Any, **kwargs: Any): + def __init__( + self, + proxy_model: SortFilterProxyModel, + parent: Optional[Any] = None, + *args: Any, + **kwargs: Any, + ): super().__init__(parent=parent, *args, **kwargs) # type: ignore[misc] # I suspect this error comes from having parent possibly be a kwarg too. # Holds all the current .ddh5 file paths that are currently being displayed. @@ -1781,16 +1967,18 @@ def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any]=None self.bottom_buttons_layout = QtWidgets.QHBoxLayout() self.filter_line_edit = QtWidgets.QLineEdit() - self.filter_line_edit.setPlaceholderText('Filter Items') - - self.star_button = QtWidgets.QPushButton('Star') - self.trash_button = QtWidgets.QPushButton('Hide Trash') - self.refresh_button = QtWidgets.QPushButton('Refresh') - self.expand_button = QtWidgets.QPushButton('Expand') - self.collapse_button = QtWidgets.QPushButton('Collapse') - self.copy_button = QtWidgets.QPushButton('Copy Path') + self.filter_line_edit.setPlaceholderText("Filter Items") + + self.star_button = QtWidgets.QPushButton("Star") + self.trash_button = QtWidgets.QPushButton("Hide Trash") + self.refresh_button = QtWidgets.QPushButton("Refresh") + self.expand_button = QtWidgets.QPushButton("Expand") + self.collapse_button = QtWidgets.QPushButton("Collapse") + self.copy_button = QtWidgets.QPushButton("Copy Path") self.tag_filter_combobox = QtWidgets.QComboBox() - self.tag_filter_combobox.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum) + self.tag_filter_combobox.setSizePolicy( + QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum + ) self.tag_filter_combobox.setModel(self.model.tags_model) self.selected_tags: List[str] = [] self.queries_dict: Dict[str, List[str]] = {} @@ -1799,7 +1987,9 @@ def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any]=None self.trash_button.setCheckable(True) self.loading_label: Optional[IconLabel] = None - self.loading_movie = QtGui.QMovie(os.path.join(plottrPath, 'resource', 'gfx', "loading_gif.gif")) + self.loading_movie = QtGui.QMovie( + os.path.join(plottrPath, "resource", "gfx", "loading_gif.gif") + ) self.filter_worker: Optional[FilterWorker] = None self.filter_thread: Optional[QtCore.QThread] = None @@ -1857,7 +2047,9 @@ def on_filter_triggered(self, filter: str) -> None: :param filter: The string of the line edit. """ if self.loading_label is None: - self.loading_label = IconLabel(self.loading_movie, self.star_button.height()) + self.loading_label = IconLabel( + self.loading_movie, self.star_button.height() + ) self.filter_and_buttons_layout.insertWidget(1, self.loading_label) self.loading_label.start_animation() @@ -1884,7 +2076,9 @@ def on_filter_triggered(self, filter: str) -> None: self.filter_thread.start() @Slot(tuple) - def on_finished_filtering(self, filtering_results: Tuple[Dict[Path, Item], Dict[str, List[str]]]) -> None: + def on_finished_filtering( + self, filtering_results: Tuple[Dict[Path, Item], Dict[str, List[str]]] + ) -> None: """ Gets called when the FilterWorker is done filtering. Ends the loading animation and the thread and triggers the filtering in the proxy model. @@ -1904,7 +2098,11 @@ def on_finished_filtering(self, filtering_results: Tuple[Dict[Path, Item], Dict[ self.queries_dict = queries_dict items_list = [item for item in results_dict.values()] - self.proxy_model.filter_requested(list(items_list), self.star_button.isChecked(), self.trash_button.isChecked()) + self.proxy_model.filter_requested( + list(items_list), + self.star_button.isChecked(), + self.trash_button.isChecked(), + ) self.on_create_path_list() @@ -1925,7 +2123,7 @@ def on_new_item_created(self, item: Item) -> None: return for query_type, queries in self.queries_dict.items(): - if query_type == 'name': + if query_type == "name": for query in queries: match_pattern = re.compile(query, flags=re.IGNORECASE) if not match_pattern.search(str(item.path)): @@ -1951,8 +2149,13 @@ def on_existing_item_files_updated(self, item: Item) -> None: :param item: The item whose files changed. """ - should_item_show = FilterWorker.is_item_shown(item, self.filter_line_edit.text(), self.selected_tags, - self.star_button.isChecked(), self.trash_button.isChecked()) + should_item_show = FilterWorker.is_item_shown( + item, + self.filter_line_edit.text(), + self.selected_tags, + self.star_button.isChecked(), + self.trash_button.isChecked(), + ) if should_item_show: if not item in self.proxy_model.allowed_items: @@ -1963,7 +2166,9 @@ def on_existing_item_files_updated(self, item: Item) -> None: self.proxy_model.allowed_items.remove(item) self.proxy_model.trigger_filter() - def on_create_path_list(self, item_index: Optional[QtCore.QModelIndex] = None) -> None: + def on_create_path_list( + self, item_index: Optional[QtCore.QModelIndex] = None + ) -> None: """ Creates the path list for the copy button. The path list is a list of all the paths of the data files of the allowed items. If no item_index is passed, it will copy all the items that are currently shown by the view. @@ -1986,11 +2191,15 @@ def on_create_path_list(self, item_index: Optional[QtCore.QModelIndex] = None) - self.path_list.append(str(path)) if len(self.path_list) > 10: - self.copy_button.setToolTip('Copy the paths of the data files for the currently filtered items.') + self.copy_button.setToolTip( + "Copy the paths of the data files for the currently filtered items." + ) else: if len(self.path_list) == 1: self.path_list = "'" + str(self.path_list[0]) + "'" - self.copy_button.setToolTip('Copy the following paths to clipboard:\n' + str(self.path_list)) + self.copy_button.setToolTip( + "Copy the following paths to clipboard:\n" + str(self.path_list) + ) @Slot() def on_copy_button_clicked(self) -> None: @@ -2044,7 +2253,14 @@ class DataTreeWidget(QtWidgets.QTreeWidget): plot_requested = Signal(Path) # incoming_data: Dict[str, Union[Path, str, DataDict]] - def __init__(self, paths: List[Path], names: List[str], data: DataDict, *args: Any, **kwargs: Any): + def __init__( + self, + paths: List[Path], + names: List[str], + data: DataDict, + *args: Any, + **kwargs: Any, + ): super().__init__(*args, **kwargs) header_item = self.headerItem() @@ -2057,7 +2273,7 @@ def __init__(self, paths: List[Path], names: List[str], data: DataDict, *args: A self.data = data # Popup menu. - self.plot_popup_action = QtWidgets.QAction('Plot') + self.plot_popup_action = QtWidgets.QAction("Plot") self.popup_menu = QtWidgets.QMenu(self) self.plot_popup_action.triggered.connect(self.emit_plot_requested_signal) @@ -2073,25 +2289,31 @@ def set_data(self) -> None: """ for index, data in enumerate(self.data): - parent_tree_widget = DataTreeWidgetItem(self.paths[index], self, [self.names[index]]) + parent_tree_widget = DataTreeWidgetItem( + self.paths[index], self, [self.names[index]] + ) - data_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ['Data']) - meta_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ['Meta']) + data_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ["Data"]) + meta_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ["Meta"]) for name, value in data.data_items(): - column_content = [name, str(data.meta_val('shape', name))] + column_content = [name, str(data.meta_val("shape", name))] if name in data.dependents(): - column_content.append(f'Depends on {str(tuple(data.axes(name)))}') + column_content.append(f"Depends on {str(tuple(data.axes(name)))}") else: - column_content.append(f'Independent') + column_content.append(f"Independent") parameter_item = QtWidgets.QTreeWidgetItem(data_parent, column_content) for meta_name, meta_value in data.meta_items(name): - parameter_meta_item = QtWidgets.QTreeWidgetItem(parameter_item, [meta_name, str(meta_value)]) + parameter_meta_item = QtWidgets.QTreeWidgetItem( + parameter_item, [meta_name, str(meta_value)] + ) for name, value in data.meta_items(): - parameter_meta_item = QtWidgets.QTreeWidgetItem(meta_parent, [name, str(value)]) + parameter_meta_item = QtWidgets.QTreeWidgetItem( + meta_parent, [name, str(value)] + ) parent_tree_widget.setExpanded(True) data_parent.setExpanded(True) @@ -2138,7 +2360,7 @@ def sizeHint(self) -> QtCore.QSize: index = self.indexFromItem(it.value()) height += self.rowHeight(index) it += 1 # type: ignore[assignment, operator] # Taken from this example: -# https://riverbankcomputing.com/pipermail/pyqt/2014-May/034315.html + # https://riverbankcomputing.com/pipermail/pyqt/2014-May/034315.html # calculating width: width = 2 * self.frameWidth() @@ -2164,8 +2386,8 @@ class FloatingButtonWidget(QtWidgets.QPushButton): def __init__(self, parent: QtWidgets.QWidget): super().__init__(parent) self.padding_right = 5 - self.edit_text = 'Edit' - self.save_text = 'Save' + self.edit_text = "Edit" + self.save_text = "Save" # Start in save mode (True), since you cannot edit the text. Clicks the edit button to switch to edit mode and # vice versa. @@ -2178,7 +2400,7 @@ def update_position(self) -> None: """ parent = self.parent() assert isinstance(parent, QtWidgets.QWidget) - if hasattr(parent, 'viewport'): + if hasattr(parent, "viewport"): parent_rect = parent.viewport().rect() else: parent_rect = parent.rect() @@ -2221,7 +2443,9 @@ def __init__(self, path: Path, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.path = path - size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum) + size_policy = QtWidgets.QSizePolicy( + QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum + ) self.setSizePolicy(size_policy) try: @@ -2229,7 +2453,7 @@ def __init__(self, path: Path, *args: Any, **kwargs: Any): self.file_text = file.read() except FileNotFoundError as e: LOGGER.error(e) - self.file_text = 'Comment file could not load. Do not edit as this could rewrite the original comment.' + self.file_text = "Comment file could not load. Do not edit as this could rewrite the original comment." self.setReadOnly(True) self.setPlainText(self.file_text) document = QtGui.QTextDocument(self.file_text, parent=self) @@ -2304,7 +2528,7 @@ def save_activated(self) -> None: """ self.setReadOnly(True) try: - with open(self.path, 'w') as file: + with open(self.path, "w") as file: file.write(self.toPlainText()) except Exception as e: # Set text how it was before @@ -2312,7 +2536,7 @@ def save_activated(self) -> None: # Show the error message error_msg = QtWidgets.QMessageBox() error_msg.setText(f"{e}") - error_msg.setWindowTitle(f'Error trying to save markdown edit.') + error_msg.setWindowTitle(f"Error trying to save markdown edit.") error_msg.exec_() @Slot() @@ -2335,7 +2559,7 @@ def __init__(self, parent: QtWidgets.QWidget): super().__init__(parent) self.paddingLeft = 5 self.paddingTop = 5 - self.save_text = 'Save' + self.save_text = "Save" self.setText(self.save_text) @@ -2345,7 +2569,7 @@ def update_position(self) -> None: """ parent = self.parent() assert isinstance(parent, QtWidgets.QWidget) - if hasattr(parent, 'viewport'): + if hasattr(parent, "viewport"): parent_rect = parent.viewport().rect() else: parent_rect = parent.rect() @@ -2374,6 +2598,7 @@ class TextInput(QtWidgets.QTextEdit): :param path: The Path of the folder where the file should be saved. """ + def __init__(self, path: Path, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.path = path @@ -2400,31 +2625,37 @@ def create_md_file(self) -> None: t = time.localtime() time_str = time.strftime(TIMESTRFORMAT, t) - dialog_text, response = QtWidgets.QInputDialog.getText(self, "Input comment name", "Name:",) + dialog_text, response = QtWidgets.QInputDialog.getText( + self, + "Input comment name", + "Name:", + ) if response: - if dialog_text[-3:] != '.md': - if dialog_text == '': - dialog_text = time_str + '.md' + if dialog_text[-3:] != ".md": + if dialog_text == "": + dialog_text = time_str + ".md" else: - dialog_text = time_str + '_' + dialog_text + '.md' + dialog_text = time_str + "_" + dialog_text + ".md" try: comment_path = self.path.joinpath(dialog_text) if not comment_path.is_file(): - with open(comment_path, 'w') as file: + with open(comment_path, "w") as file: file.write(current_text) - self.setText('') + self.setText("") else: error_msg = QtWidgets.QMessageBox() - error_msg.setText(f"File: {comment_path} already exists, please select a different file name.") - error_msg.setWindowTitle(f'Error trying to save comment.') + error_msg.setText( + f"File: {comment_path} already exists, please select a different file name." + ) + error_msg.setWindowTitle(f"Error trying to save comment.") error_msg.exec_() except Exception as e: # Show the error message error_msg = QtWidgets.QMessageBox() error_msg.setText(f"{e}") - error_msg.setWindowTitle(f'Error trying to save comment.') + error_msg.setWindowTitle(f"Error trying to save comment.") error_msg.exec_() def resizeEvent(self, event: QtGui.QResizeEvent) -> None: @@ -2469,6 +2700,7 @@ class ImageViewer(QtWidgets.QLabel): :param path_file: The path of the image. """ + def __init__(self, path_file: Path, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.path = path_file @@ -2487,13 +2719,13 @@ def __init__(self, path_file: Path, *args: Any, **kwargs: Any): # except Exception as e: except FileNotFoundError as e: - self.setText(f'Image could not be displayed') + self.setText(f"Image could not be displayed") LOGGER.error(e) self.context_menu = QtWidgets.QMenu(self) # creating actions - self.copy_action = QtWidgets.QAction('copy') + self.copy_action = QtWidgets.QAction("copy") self.copy_action.triggered.connect(self.on_copy_action) self.setContextMenuPolicy(QtCore.Qt.CustomContextMenu) @@ -2531,9 +2763,11 @@ def eventFilter(self, a0: QtCore.QObject, a1: QtCore.QEvent) -> bool: parent = self.parent() assert isinstance(parent, QtWidgets.QWidget) parent_size = parent.size() - scaled_pixmap = QtGui.QPixmap.fromImage(self.image.copy(QtCore.QRect())).scaled(parent_size.width(), - parent_size.height(), - QtCore.Qt.KeepAspectRatio) + scaled_pixmap = QtGui.QPixmap.fromImage( + self.image.copy(QtCore.QRect()) + ).scaled( + parent_size.width(), parent_size.height(), QtCore.Qt.KeepAspectRatio + ) # If a resizing event happen, only update the pixmap if the size of the pixmap changed. if self.old_pixmap.size() != scaled_pixmap.size(): # Check if the new image is bigger than the original picture size. If it is don't show it. @@ -2555,6 +2789,7 @@ class VerticalScrollArea(QtWidgets.QScrollArea): """ Custom QScrollArea. Allows for only vertical scroll instead of vertical and horizontal. """ + def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.first_scroll = False @@ -2592,16 +2827,18 @@ class TagLabel(QtWidgets.QWidget): :param tree_item: Indicates if this widget is used on the right side of the app or in the treeWidget. """ - def __init__(self, tags: List[str], tree_item: bool = False, *args: Any, **kwargs: Any): + def __init__( + self, tags: List[str], tree_item: bool = False, *args: Any, **kwargs: Any + ): super().__init__(*args, **kwargs) self.tags = tags self.html_tags: List[str] = [] self.tree_item = tree_item - self.tags_str = '' + self.tags_str = "" if not tags: - self.tags_str = 'No labels present.' + self.tags_str = "No labels present." else: self.generate_tag_string() @@ -2612,7 +2849,7 @@ def __init__(self, tags: List[str], tree_item: bool = False, *args: Any, **kwarg if not self.tree_item: self.tags_label.setWordWrap(True) - self.header_label = QtWidgets.QLabel('This is tagged by:', parent=self) + self.header_label = QtWidgets.QLabel("This is tagged by:", parent=self) self.layout_.addWidget(self.header_label) self.tags_label.setIndent(30) @@ -2620,7 +2857,9 @@ def __init__(self, tags: List[str], tree_item: bool = False, *args: Any, **kwarg self.setLayout(self.layout_) - size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum) + size_policy = QtWidgets.QSizePolicy( + QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum + ) self.setSizePolicy(size_policy) def add_tag(self, tag: str) -> None: @@ -2651,20 +2890,20 @@ def generate_tag_string(self) -> None: """ Converts the list of tags into the html formatted string. """ - self.tags_str = '' + self.tags_str = "" self.html_tags = [] color_generator = html_color_generator() # Add every tag followed by a coma, except the last item. for i in range(len(self.tags) - 1): - html_str = f'{self.tags[i]}, ' + html_str = f"{self.tags[i]}, " self.html_tags.append(html_str) # Last item is followed by a dot instead of a coma. - html_str = f'{self.tags[-1]}.' + html_str = f"{self.tags[-1]}." self.html_tags.append(html_str) - self.tags_str = ''.join(self.html_tags) + self.tags_str = "".join(self.html_tags) class ItemTagLabel(QtWidgets.QLabel): @@ -2709,7 +2948,7 @@ def generate_tag_string(self) -> None: """ Converts the list of tags into the html formatted string. """ - self.tags_str = '' + self.tags_str = "" self.html_tags = [] if self.tags: @@ -2717,14 +2956,16 @@ def generate_tag_string(self) -> None: # Add every tag followed by a coma, except the last item. for i in range(len(self.tags) - 1): - html_str = f'{self.tags[i]}, ' + html_str = ( + f"{self.tags[i]}, " + ) self.html_tags.append(html_str) # Last item is followed by a dot instead of a coma. - html_str = f'{self.tags[-1]}.' + html_str = f"{self.tags[-1]}." self.html_tags.append(html_str) - self.tags_str = ''.join(self.html_tags) + self.tags_str = "".join(self.html_tags) class TagCreator(QtWidgets.QLineEdit): @@ -2738,7 +2979,7 @@ def __init__(self, current_folder_path: Path, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.current_folder_path = current_folder_path - self.setPlaceholderText('Create new tags') + self.setPlaceholderText("Create new tags") self.returnPressed.connect(self.create_new_tags) @@ -2750,21 +2991,26 @@ def create_new_tags(self) -> None: """ text = self.text() - raw_text = text.split(',') - text_with_empty_spaces = [item[1:] if len(item) >= 1 and item[0] == " " else item for item in raw_text] - new_tags = [item for item in text_with_empty_spaces if item != '' and item != ' '] + raw_text = text.split(",") + text_with_empty_spaces = [ + item[1:] if len(item) >= 1 and item[0] == " " else item for item in raw_text + ] + new_tags = [ + item for item in text_with_empty_spaces if item != "" and item != " " + ] for tag in new_tags: - tag_path = self.current_folder_path.joinpath(f'{tag}.tag') + tag_path = self.current_folder_path.joinpath(f"{tag}.tag") if not tag_path.exists(): - f = open(tag_path, 'x') + f = open(tag_path, "x") - self.setText('') + self.setText("") class IconLabel(QtWidgets.QLabel): - - def __init__(self, movie: QtGui.QMovie, size: Optional[int] = None, *args: Any, **kwargs: Any): + def __init__( + self, movie: QtGui.QMovie, size: Optional[int] = None, *args: Any, **kwargs: Any + ): """ Label used to display loading animations. @@ -2804,8 +3050,9 @@ def run(self, item: Item, only_data_files: bool = False) -> None: if data is not None: self.finished.emit(data) - def gather_all_right_side_window_data(self, item: Item, only_data_files: bool = False) -> \ - Optional[dict]: + def gather_all_right_side_window_data( + self, item: Item, only_data_files: bool = False + ) -> Optional[dict]: """ Method used to create a dictionary with all the necessary information (file names, paths, etc.) of an item of the model to create the right side window. This function will also go through all the children @@ -2820,13 +3067,13 @@ def gather_all_right_side_window_data(self, item: Item, only_data_files: bool = 'data': [DataDict]}, 'extra_files': [(Path, str, ContentType)]} """ - data = {'tag_labels': [], - 'data_files': {'paths': [], - 'names': [], - 'data': []}, - 'extra_files': []} + data = { + "tag_labels": [], + "data_files": {"paths": [], "names": [], "data": []}, + "extra_files": [], + } - data_ret = self._fill_dict(data, item.files, '', only_data_files) + data_ret = self._fill_dict(data, item.files, "", only_data_files) if data_ret is None: return None data = data_ret @@ -2843,11 +3090,18 @@ def gather_all_right_side_window_data(self, item: Item, only_data_files: bool = data = data_ret # Sort the files so that they appear in reverse alphabetical order. - data['extra_files'] = sorted(data['extra_files'], key=lambda x: str.lower(x[1]), reverse=True) + data["extra_files"] = sorted( + data["extra_files"], key=lambda x: str.lower(x[1]), reverse=True + ) return data - def _fill_dict(self, data_in: Optional[dict], files_dict: Dict[Path, ContentType], prefix_text: str, - only_data_files: bool = False) -> Optional[dict]: + def _fill_dict( + self, + data_in: Optional[dict], + files_dict: Dict[Path, ContentType], + prefix_text: str, + only_data_files: bool = False, + ) -> Optional[dict]: """ Helper method for gather_all_right_side_window_data. Fills in the data dictionary with the files inside of files_dict and adds prefix text to all tittles. @@ -2866,23 +3120,35 @@ def _fill_dict(self, data_in: Optional[dict], files_dict: Dict[Path, ContentType # There might be an error with the ddh5 trying to be loaded. try: data_dict = datadict_from_hdf5(str(file), structure_only=True) - data_in['data_files']['data'].append(data_dict) - data_in['data_files']['paths'].append(file) - data_in['data_files']['names'].append(prefix_text + str(file.stem)) + data_in["data_files"]["data"].append(data_dict) + data_in["data_files"]["paths"].append(file) + data_in["data_files"]["names"].append(prefix_text + str(file.stem)) except Exception as e: - LOGGER.error(f'Failed to load the data file: {file} \n {e}') + LOGGER.error(f"Failed to load the data file: {file} \n {e}") if not only_data_files: if file_type == ContentType.tag: - data_in['tag_labels'].append(prefix_text + str(file.stem)) - elif file_type in [ContentType.json, ContentType.md, ContentType.py, ContentType.image]: + data_in["tag_labels"].append(prefix_text + str(file.stem)) + elif file_type in [ + ContentType.json, + ContentType.md, + ContentType.py, + ContentType.image, + ]: # Check if the files exist. if file.is_file(): - data_in['extra_files'].append((file, prefix_text + str(file.name), file_type)) + data_in["extra_files"].append( + (file, prefix_text + str(file.name), file_type) + ) return data_in - def _check_children_data(self, child_item: Item, data_in: Optional[dict], deepness: int, - only_data_files: bool = False) -> Optional[dict]: + def _check_children_data( + self, + child_item: Item, + data_in: Optional[dict], + deepness: int, + only_data_files: bool = False, + ) -> Optional[dict]: """ Helper function for gather_all_right_side_window_data. Fills the data_in dictionary with the files of child_item and all of its children. Returns the filled dictionary with the information of child_item and all @@ -2896,27 +3162,32 @@ def _check_children_data(self, child_item: Item, data_in: Optional[dict], deepne """ child_path = child_item.path - prefix_text = '' + prefix_text = "" # Make the prefix text. Should be all the parent folders until the original parent item. for i in range(deepness): - prefix_text = child_path.parts[-i - 1] + '/' + prefix_text + prefix_text = child_path.parts[-i - 1] + "/" + prefix_text - data_in = self._fill_dict(data_in, child_item.files, prefix_text, only_data_files) + data_in = self._fill_dict( + data_in, child_item.files, prefix_text, only_data_files + ) for i in range(child_item.rowCount()): if self.thread().isInterruptionRequested(): return None child = child_item.child(i, 0) assert isinstance(child, Item) - data_in = self._check_children_data(child, data_in, deepness + 1, only_data_files) + data_in = self._check_children_data( + child, data_in, deepness + 1, only_data_files + ) return data_in # TODO: Instead of saving the currently selected folder, save the currently and previously selected item. class Monitr(QtWidgets.QMainWindow): - def __init__(self, monitorPath: str = '.', - parent: Optional[QtWidgets.QMainWindow] = None): + def __init__( + self, monitorPath: str = ".", parent: Optional[QtWidgets.QMainWindow] = None + ): super().__init__(parent=parent) # Instantiating variables. @@ -2924,9 +3195,11 @@ def __init__(self, monitorPath: str = '.', self.current_selected_folder = Path() self.previous_selected_folder = Path() self.collapsed_state_dictionary: Dict[Path, bool] = {} - self.setWindowTitle('Monitr') + self.setWindowTitle("Monitr") - self.app_manager = AppManager() # Currently Ids only increase with every new app. + self.app_manager = ( + AppManager() + ) # Currently Ids only increase with every new app. self.current_app_id = 0 self.model = FileModel(self.monitor_path, 0, 2) @@ -2943,10 +3216,13 @@ def __init__(self, monitorPath: str = '.', menu_bar = self.menuBar() menu = menu_bar.addMenu("Backend") self.backend_group = QtWidgets.QActionGroup(menu) - for backend, plotWidgetClass in [("matplotlib", MPLAutoPlot), ("pyqtgraph", PGAutoPlot)]: + for backend, plotWidgetClass in [ + ("matplotlib", MPLAutoPlot), + ("pyqtgraph", PGAutoPlot), + ]: action = QtWidgets.QAction(backend) action.setCheckable(True) - action.setChecked(getcfg('main', 'default-plotwidget') == plotWidgetClass) + action.setChecked(getcfg("main", "default-plotwidget") == plotWidgetClass) self.backend_group.addAction(action) menu.addAction(action) @@ -2955,17 +3231,22 @@ def __init__(self, monitorPath: str = '.', self.left_side_dummy_widget = QtWidgets.QWidget() self.left_side_dummy_widget.setLayout(self.left_side_layout) - left_side_dummy_size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Preferred, - QtWidgets.QSizePolicy.Preferred) + left_side_dummy_size_policy = QtWidgets.QSizePolicy( + QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Preferred + ) left_side_dummy_size_policy.setHorizontalStretch(1) left_side_dummy_size_policy.setVerticalStretch(0) self.left_side_dummy_widget.setSizePolicy(left_side_dummy_size_policy) # Load left side layout - self.file_explorer = FileExplorer(proxy_model=self.proxy_model, parent=self.left_side_dummy_widget) + self.file_explorer = FileExplorer( + proxy_model=self.proxy_model, parent=self.left_side_dummy_widget + ) self.left_side_layout.addWidget(self.file_explorer) - self.file_explorer.file_tree.selection_changed.connect(self.on_current_item_selection_changed) + self.file_explorer.file_tree.selection_changed.connect( + self.on_current_item_selection_changed + ) # Right side items self.right_side_dummy_widget = QtWidgets.QWidget() @@ -2981,7 +3262,9 @@ def __init__(self, monitorPath: str = '.', self.invalid_data_label: Optional[QtWidgets.QLabel] = None self.header_label: Optional[QtWidgets.QLabel] = None self.loading_label: Optional[IconLabel] = None - self.loading_movie = QtGui.QMovie(os.path.join(plottrPath, 'resource', 'gfx', "loading_gif.gif")) + self.loading_movie = QtGui.QMovie( + os.path.join(plottrPath, "resource", "gfx", "loading_gif.gif") + ) self.last_data_window_update_time = time.time() # Sets the minimum time between updates of the right data_window. @@ -3005,7 +3288,6 @@ def __init__(self, monitorPath: str = '.', # self.extra_action_button.clicked.connect(self.extra_action) # self.left_side_layout.addLayout(self.debug_layout) - self.main_partition_splitter.addWidget(self.left_side_dummy_widget) # Threading stuff @@ -3027,14 +3309,16 @@ def create_inner_dictionary(item: Item) -> Dict: child_dictionary = create_inner_dictionary(child) step_dictionary[child.path.name] = child_dictionary - step_dictionary['files'] = item.files - step_dictionary['star'] = item.star - step_dictionary['trash'] = item.trash + step_dictionary["files"] = item.files + step_dictionary["star"] = item.star + step_dictionary["trash"] = item.trash return step_dictionary - print('==================================================================================') + print( + "==================================================================================" + ) n_rows = self.model.rowCount() - print(f'The model has {n_rows} rows') + print(f"The model has {n_rows} rows") printable_dict = {} for i in range(n_rows): @@ -3043,18 +3327,21 @@ def create_inner_dictionary(item: Item) -> Dict: item_dictionary = create_inner_dictionary(main_item) assert isinstance(main_item.path, Path) printable_dict[main_item.path.name] = item_dictionary - print(f'here comes the dictionary') + print(f"here comes the dictionary") pprint.pprint(printable_dict) - def print_model_main_dictionary(self) -> None: """ Debug function. Prints the main dictionary. """ - print('---------------------------------------------------------------------------------') - print(f'Here comes the model main dictionary') + print( + "---------------------------------------------------------------------------------" + ) + print(f"Here comes the model main dictionary") pprint.pprint(self.model.main_dictionary) - print(f'the length of the items in the main dictionary is: {len(self.model.main_dictionary)}') + print( + f"the length of the items in the main dictionary is: {len(self.model.main_dictionary)}" + ) def extra_action(self) -> None: """ @@ -3064,7 +3351,9 @@ def extra_action(self) -> None: # print(f'\n \n \n \n \n \n \n \n \n \n \n \n .') @Slot(QtCore.QModelIndex, QtCore.QModelIndex) - def on_current_item_selection_changed(self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex) -> None: + def on_current_item_selection_changed( + self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex + ) -> None: """ Gets called everytime the selected item gets changed. Converts the model index from the proxy sorting model, into an index from self.model and gets the current and previous item. Triggers the right side window creation. @@ -3092,7 +3381,9 @@ def on_current_item_selection_changed(self, current: QtCore.QModelIndex, previou if current_item != previous_item: assert isinstance(current_item, Item) self.current_selected_folder = current_item.path - self.model.update_currently_selected_folder(self.current_selected_folder) + self.model.update_currently_selected_folder( + self.current_selected_folder + ) # The first time the user clicks on a folder, the previous item is None. assert isinstance(previous_item, Item) self.previous_selected_folder = previous_item.path @@ -3106,7 +3397,6 @@ def generate_right_side_window(self) -> None: """ # Check that the folder passed is a dataset. if self.current_selected_folder in self.model.main_dictionary: - # If it's the first time, create the right side scroll area and add it to the splitter. if self.scroll_area is None: self.scroll_area = VerticalScrollArea() @@ -3130,7 +3420,10 @@ def generate_right_side_window(self) -> None: self.loader_thread = QtCore.QThread(self) self.loader_worker = LoaderWorker() self.loader_worker.moveToThread(self.loader_thread) - run_fun = partial(self.loader_worker.run, self.model.main_dictionary[self.current_selected_folder]) + run_fun = partial( + self.loader_worker.run, + self.model.main_dictionary[self.current_selected_folder], + ) self.loader_thread.started.connect(run_fun) self.loader_worker.finished.connect(self.populate_right_side_window) self.loader_thread.start() @@ -3162,10 +3455,10 @@ def populate_right_side_window(self, files_meta: dict) -> None: self.loader_thread = None self.add_folder_header() - self.add_tag_label(files_meta['tag_labels']) - self.add_data_window(files_meta['data_files']) + self.add_tag_label(files_meta["tag_labels"]) + self.add_data_window(files_meta["data_files"]) self.add_text_input(self.current_selected_folder) - self.add_all_files(files_meta['extra_files']) + self.add_all_files(files_meta["extra_files"]) # Sets the stretch factor so when the main window expands, the files get the extra real-state instead # of the file tree @@ -3188,7 +3481,9 @@ def clear_right_layout(self) -> None: if self.previous_selected_folder in self.model.main_dictionary: bar = self.scroll_area.verticalScrollBar() if bar is not None: - previous_item = self.model.main_dictionary[self.previous_selected_folder] + previous_item = self.model.main_dictionary[ + self.previous_selected_folder + ] previous_item.scroll_height = bar.value() if self.header_label is not None: @@ -3223,8 +3518,11 @@ def clear_right_layout(self) -> None: if len(self.file_windows) >= 1: # Save the collapsed state before deleting them. - current_collapsed_state = {window.widget.path: window.btn.isChecked() for window in self.file_windows if - hasattr(window.widget, 'path')} + current_collapsed_state = { + window.widget.path: window.btn.isChecked() + for window in self.file_windows + if hasattr(window.widget, "path") + } self.collapsed_state_dictionary.update(current_collapsed_state) @@ -3239,7 +3537,7 @@ def add_folder_header(self) -> None: """ self.header_label = QtWidgets.QLabel(parent=self.right_side_dummy_widget) self.header_label.setWordWrap(True) - text = f'

{self.current_selected_folder.name.replace("_", " ")}

' + text = f"

{self.current_selected_folder.name.replace('_', ' ')}

" self.header_label.setText(text) self.right_side_layout.addWidget(self.header_label) @@ -3277,17 +3575,23 @@ def add_data_window(self, data_files: Dict) -> None: 'values': array([]) ... """ # Checks that there is data to display, if not just create a Qlabel indicating that there is no valid data. - if len(data_files['data']) < 1: - self.invalid_data_label = QtWidgets.QLabel(f'No data to display.') - self.right_side_layout.addWidget(self.invalid_data_label) - return + if len(data_files["data"]) < 1: + self.invalid_data_label = QtWidgets.QLabel(f"No data to display.") + self.right_side_layout.addWidget(self.invalid_data_label) + return - self.data_window = Collapsible(DataTreeWidget(data_files['paths'], data_files['names'], data_files['data']), - 'Data Display') + self.data_window = Collapsible( + DataTreeWidget( + data_files["paths"], data_files["names"], data_files["data"] + ), + "Data Display", + ) assert isinstance(self.data_window.widget, DataTreeWidget) self.data_window.widget.plot_requested.connect(self.on_plot_data) - size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding) + size_policy = QtWidgets.QSizePolicy( + QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding + ) self.data_window.setSizePolicy(size_policy) self.right_side_layout.addWidget(self.data_window) @@ -3305,7 +3609,14 @@ def on_plot_data(self, path: Path) -> None: backend = "default" else: backend = self.backend_group.checkedAction().text() - self.app_manager.launchApp(self.current_app_id, AUTOPLOTMODULE, AUTOPLOTFUNC, str(path), 'data', backend) + self.app_manager.launchApp( + self.current_app_id, + AUTOPLOTMODULE, + AUTOPLOTFUNC, + str(path), + "data", + backend, + ) self.current_app_id += 1 def add_text_input(self, path: Path) -> None: @@ -3314,7 +3625,7 @@ def add_text_input(self, path: Path) -> None: :param path: The path of the folder being selected """ - self.text_input = Collapsible(TextInput(path), title='Add Comment:') + self.text_input = Collapsible(TextInput(path), title="Add Comment:") self.right_side_layout.addWidget(self.text_input) def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None: @@ -3326,12 +3637,15 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None """ for file, name, file_type in files_data: if file_type == ContentType.json: - expand = False if file in self.collapsed_state_dictionary: expand = self.collapsed_state_dictionary[file] - json_view = Collapsible(widget=JsonTreeView(path=file), title=name, expanding=expand, - icon=get_json_icon()) + json_view = Collapsible( + widget=JsonTreeView(path=file), + title=name, + expanding=expand, + icon=get_json_icon(), + ) json_view.widget.setVisible(expand) json_view.btn.setChecked(expand) if expand: @@ -3356,8 +3670,12 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None expand = True if file in self.collapsed_state_dictionary: expand = self.collapsed_state_dictionary[file] - plain_text_edit = Collapsible(widget=TextEditWidget(path=file), - title=name, expanding=expand, icon=get_md_icon()) + plain_text_edit = Collapsible( + widget=TextEditWidget(path=file), + title=name, + expanding=expand, + icon=get_md_icon(), + ) plain_text_edit.widget.setVisible(expand) plain_text_edit.btn.setChecked(expand) @@ -3373,8 +3691,12 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None expand = True if file in self.collapsed_state_dictionary: expand = self.collapsed_state_dictionary[file] - plain_text_edit = Collapsible(widget=TextViewWidget(path=file), - title=name, expanding=expand, icon=get_md_icon()) + plain_text_edit = Collapsible( + widget=TextViewWidget(path=file), + title=name, + expanding=expand, + icon=get_md_icon(), + ) plain_text_edit.widget.setVisible(expand) plain_text_edit.btn.setChecked(expand) @@ -3390,8 +3712,12 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None expand = True if file in self.collapsed_state_dictionary: expand = self.collapsed_state_dictionary[file] - label = Collapsible(ImageViewer(file, parent=self.right_side_dummy_widget), - title=name, expanding=expand, icon=get_img_icon()) + label = Collapsible( + ImageViewer(file, parent=self.right_side_dummy_widget), + title=name, + expanding=expand, + icon=get_img_icon(), + ) label.widget.setVisible(expand) label.btn.setChecked(expand) if expand: @@ -3426,17 +3752,25 @@ def on_update_data_widget(self, path: Path) -> None: :param path: The path of the data file that should be updated. """ current_time = time.time() - if current_time - self.last_data_window_update_time > self.data_widget_update_buffer: - if _is_relative_to(path, self.current_selected_folder) and path.parent in self.model.main_dictionary: + if ( + current_time - self.last_data_window_update_time + > self.data_widget_update_buffer + ): + if ( + _is_relative_to(path, self.current_selected_folder) + and path.parent in self.model.main_dictionary + ): # Always gather the data for the currently selected folder, since a child item might need the update # but the currently selected item with all of its childs should be shown. item = self.model.main_dictionary[self.current_selected_folder] loader_worker = LoaderWorker() data_dicts = loader_worker.gather_all_right_side_window_data(item, True) assert data_dicts is not None - data_window_widget = DataTreeWidget(data_dicts['data_files']['paths'], - data_dicts['data_files']['names'], - data_dicts['data_files']['data']) + data_window_widget = DataTreeWidget( + data_dicts["data_files"]["paths"], + data_dicts["data_files"]["names"], + data_dicts["data_files"]["data"], + ) if self.data_window is not None: self.data_window.restart_widget(data_window_widget) data_window_widget.plot_requested.connect(self.on_plot_data) @@ -3445,7 +3779,10 @@ def on_update_data_widget(self, path: Path) -> None: if not self.active_timer: self.data_file_need_update = path self.active_timer = True - QtCore.QTimer.singleShot(round(self.data_widget_update_buffer * 1e3), self.on_data_window_timer) + QtCore.QTimer.singleShot( + round(self.data_widget_update_buffer * 1e3), + self.on_data_window_timer, + ) @Slot() def on_data_window_timer(self) -> None: @@ -3466,16 +3803,20 @@ def closeEvent(self, a0: QtGui.QCloseEvent) -> None: def script() -> int: - parser = argparse.ArgumentParser(description='Monitr main application') + parser = argparse.ArgumentParser(description="Monitr main application") parser.add_argument("path", help="path to monitor for data", default=None) - parser.add_argument("-r", "--refresh_interval", default=2, type=float, - help="interval at which to look for changes in the " - "monitored path (in seconds)") + parser.add_argument( + "-r", + "--refresh_interval", + default=2, + type=float, + help="interval at which to look for changes in the monitored path (in seconds)", + ) args = parser.parse_args() path = os.path.abspath(args.path) if not (os.path.exists(path) and os.path.isdir(path)): - print('Invalid path.') + print("Invalid path.") sys.exit() app = QtWidgets.QApplication([])