diff --git a/plottr/apps/monitr.py b/plottr/apps/monitr.py
index ee9bc6f6..b6264177 100644
--- a/plottr/apps/monitr.py
+++ b/plottr/apps/monitr.py
@@ -1,77 +1,83 @@
-""" plottr.monitr -- a GUI tool for monitoring data files.
-"""
-import copy
-import sys
-import os
+"""plottr.monitr -- a GUI tool for monitoring data files."""
+
import argparse
-import time
+import copy
import importlib
-
-# Uncomment the next 2 lines if the app suddenly crash with no error.
-# import cgitb
-# cgitb.enable(format = 'text')
-
+import json
import logging
-import re
+import os
import pprint
-import json
+import re
+import sys
+import time
from enum import Enum, auto
-from pathlib import Path
+from functools import partial
+from itertools import cycle
from multiprocessing import Process
+from pathlib import Path
from typing import (
- List,
- Optional,
- Dict,
Any,
- Union,
+ Dict,
Generator,
Iterable,
- Tuple,
+ List,
+ Optional,
Sequence,
+ Tuple,
+ Union,
cast,
)
-from functools import partial
-from itertools import cycle
from watchdog.events import FileSystemEvent, FileSystemMovedEvent
-from .. import log as plottrlog
-from .. import QtCore, QtWidgets, Signal, Slot, QtGui, plottrPath
+from .. import QtCore, QtGui, QtWidgets, Signal, Slot
from .. import config_entry as getcfg
+from .. import log as plottrlog
+from .. import plottrPath
+from ..apps.watchdog_classes import WatcherClient
+from ..data.datadict import DataDict
+from ..data.datadict_storage import all_datadicts_from_hdf5, datadict_from_hdf5
+from ..gui.widgets import Collapsible
+from ..icons import get_completeIcon as get_complete_icon
+from ..icons import get_imageIcon as get_img_icon
+from ..icons import get_interruptedIcon as get_interrupted_icon
+from ..icons import get_jsonIcon as get_json_icon
+from ..icons import get_mdIcon as get_md_icon
+from ..icons import get_starIcon as get_star_icon
+from ..icons import get_trashIcon as get_trash_icon
from ..plot.mpl.autoplot import AutoPlot as MPLAutoPlot
from ..plot.pyqtgraph.autoplot import AutoPlot as PGAutoPlot
-from ..data.datadict_storage import all_datadicts_from_hdf5, datadict_from_hdf5
-from ..data.datadict import DataDict
from ..utils.misc import unwrap_optional
-from ..apps.watchdog_classes import WatcherClient
-from ..gui.widgets import Collapsible
-from .json_viewer import JsonModel, JsonTreeView
-from ..icons import get_starIcon as get_star_icon, get_trashIcon as get_trash_icon, get_completeIcon as get_complete_icon, get_interruptedIcon as get_interrupted_icon, get_imageIcon as get_img_icon, get_jsonIcon as get_json_icon, get_mdIcon as get_md_icon
from .appmanager import AppManager
+from .json_viewer import JsonModel, JsonTreeView
+
+# Uncomment the next 2 lines if the app suddenly crash with no error.
+# import cgitb
+# cgitb.enable(format = 'text')
+
TIMESTRFORMAT = "%Y-%m-%dT%H%M%S"
# Change this variable to change the module of the app that monitr should open.
-AUTOPLOTMODULE = 'plottr.apps.autoplot'
+AUTOPLOTMODULE = "plottr.apps.autoplot"
# Function that the app manager should run to open a new app.
-AUTOPLOTFUNC = 'autoplotDDH5App'
+AUTOPLOTFUNC = "autoplotDDH5App"
-
-LOGGER = logging.getLogger('plottr.apps.monitr')
+LOGGER = logging.getLogger("plottr.apps.monitr")
def html_color_generator() -> Generator[str, None, None]:
"""
Generator that cycles through string colors for use in html code.
"""
- colors = ['red', 'blue', 'green', 'purple', 'orange', 'brown', 'magenta']
+ colors = ["red", "blue", "green", "purple", "orange", "brown", "magenta"]
for color in cycle(colors):
yield color
def is_file_lock(path: Path) -> bool:
- if path.name[0] == '~' and path.suffix == '.lock':
+ if path.name[0] == "~" and path.suffix == ".lock":
return True
return False
@@ -93,6 +99,7 @@ class ContentType(Enum):
Enum class for the types of files that are of interest in the monitored subdirectories. Contains helper methods to
sort files and assign colors to each file type.
"""
+
data = auto()
tag = auto()
json = auto()
@@ -112,17 +119,22 @@ def sort(cls, file: Optional[Union[str, Path]] = None) -> "ContentType":
if not isinstance(file, str):
file = str(file)
extension = file.split(".")[-1].lower()
- if extension == 'ddh5':
+ if extension == "ddh5":
return ContentType.data
- elif extension == 'tag':
+ elif extension == "tag":
return ContentType.tag
- elif extension == 'json':
+ elif extension == "json":
return ContentType.json
- elif extension == 'md':
+ elif extension == "md":
return ContentType.md
- elif extension == 'py':
+ elif extension == "py":
return ContentType.py
- elif extension == 'jpg' or extension == 'jpeg' or extension == 'png' or extension == 'image':
+ elif (
+ extension == "jpg"
+ or extension == "jpeg"
+ or extension == "png"
+ or extension == "image"
+ ):
return ContentType.image
else:
return ContentType.unknown
@@ -133,18 +145,18 @@ def sort_Qcolor(cls, item: Optional["ContentType"] = None) -> QtGui.QBrush:
Returns the Qt color for the specified ContentType
"""
if item == ContentType.data:
- return QtGui.QBrush(QtGui.QColor('red'))
+ return QtGui.QBrush(QtGui.QColor("red"))
if item == ContentType.tag:
- return QtGui.QBrush(QtGui.QColor('blue'))
+ return QtGui.QBrush(QtGui.QColor("blue"))
if item == ContentType.json:
- return QtGui.QBrush(QtGui.QColor('green'))
+ return QtGui.QBrush(QtGui.QColor("green"))
- return QtGui.QBrush(QtGui.QColor('black'))
+ return QtGui.QBrush(QtGui.QColor("black"))
class SupportedDataTypes:
- valid_types = ['.ddh5', '.md', '.json', '.py']
+ valid_types = [".ddh5", ".md", ".json", ".py"]
@classmethod
def check_valid_data(cls, file_names: Sequence[Union[str, Path]]) -> bool:
@@ -189,41 +201,47 @@ def __init__(self, path: Path, files: Dict[Path, ContentType] = {}):
self.show = True
if files is not None:
self.files.update(files)
- self.tags = [file.stem for file, file_type in self.files.items() if file_type == ContentType.tag]
-
- if '__star__' in self.tags and '__trash__' in self.tags:
- star_path = self.path.joinpath('__star__.tag')
- trash_path = self.path.joinpath('__trash__.tag')
+ self.tags = [
+ file.stem
+ for file, file_type in self.files.items()
+ if file_type == ContentType.tag
+ ]
+
+ if "__star__" in self.tags and "__trash__" in self.tags:
+ star_path = self.path.joinpath("__star__.tag")
+ trash_path = self.path.joinpath("__trash__.tag")
if star_path.is_file() and trash_path.is_file():
LOGGER.error(
- f'The folder: {self.path} contains both the star and trash tag. Both tags will be deleted.')
+ f"The folder: {self.path} contains both the star and trash tag. Both tags will be deleted."
+ )
star_path.unlink()
trash_path.unlink()
- self.tags.remove('__star__')
- self.tags.remove('__trash__')
- elif '__star__' in self.tags:
+ self.tags.remove("__star__")
+ self.tags.remove("__trash__")
+ elif "__star__" in self.tags:
self.star = True
- self.tags.remove('__star__')
- elif '__trash__' in self.tags:
+ self.tags.remove("__star__")
+ elif "__trash__" in self.tags:
self.trash = True
- self.tags.remove('__trash__')
+ self.tags.remove("__trash__")
- if '__complete__' in self.tags and '__interrupted__' in self.tags:
- complete_path = self.path.joinpath('__complete__.tag')
- interrupted_path = self.path.joinpath('__interrupted__.tag')
+ if "__complete__" in self.tags and "__interrupted__" in self.tags:
+ complete_path = self.path.joinpath("__complete__.tag")
+ interrupted_path = self.path.joinpath("__interrupted__.tag")
if complete_path.is_file() and interrupted_path.is_file():
LOGGER.error(
- f'The folder: {self.path} contains both the complete and interrupted tag. Both tags will be deleted.')
+ f"The folder: {self.path} contains both the complete and interrupted tag. Both tags will be deleted."
+ )
complete_path.unlink()
interrupted_path.unlink()
- self.tags.remove('__complete__')
- self.tags.remove('__interrupted__')
- elif '__complete__' in self.tags:
+ self.tags.remove("__complete__")
+ self.tags.remove("__interrupted__")
+ elif "__complete__" in self.tags:
self.complete = True
- self.tags.remove('__complete__')
- elif '__interrupted__' in self.tags:
+ self.tags.remove("__complete__")
+ elif "__interrupted__" in self.tags:
self.interrupted = True
- self.tags.remove('__interrupted__')
+ self.tags.remove("__interrupted__")
self.tags_widget = ItemTagLabel(self.tags)
@@ -243,60 +261,65 @@ def add_file(self, path: Path) -> None:
model = self.model()
assert isinstance(model, FileModel)
- if path.name == '__star__.tag':
+ if path.name == "__star__.tag":
# Check if the item is not already trash.
- trash_path = path.parent.joinpath('__trash__.tag')
+ trash_path = path.parent.joinpath("__trash__.tag")
if trash_path.is_file():
path.unlink()
error_msg = QtWidgets.QMessageBox()
- error_msg.setText(f'Folder is already trash. Please do not add both __trash__ and __star__ tags in the same folder. '
- f' \n {path} was deleted ')
- error_msg.setWindowTitle(f'Deleting __star__.tag')
+ error_msg.setText(
+ f"Folder is already trash. Please do not add both __trash__ and __star__ tags in the same folder. "
+ f" \n {path} was deleted "
+ )
+ error_msg.setWindowTitle(f"Deleting __star__.tag")
error_msg.exec_()
return
else:
self.star = True
- elif path.name == '__trash__.tag':
+ elif path.name == "__trash__.tag":
# Check if the item is not already star.
- star_path = path.parent.joinpath('__star__.tag')
+ star_path = path.parent.joinpath("__star__.tag")
if star_path.is_file():
path.unlink()
error_msg = QtWidgets.QMessageBox()
error_msg.setText(
- f'Folder is already star. Please do not add both __trash__ and __star__ tags in the same folder. '
- f' \n {path} was deleted ')
- error_msg.setWindowTitle(f'Deleting __trash__.tag')
+ f"Folder is already star. Please do not add both __trash__ and __star__ tags in the same folder. "
+ f" \n {path} was deleted "
+ )
+ error_msg.setWindowTitle(f"Deleting __trash__.tag")
error_msg.exec_()
return
else:
self.trash = True
- elif path.name == '__complete__.tag':
+ elif path.name == "__complete__.tag":
# Check if the item is already tagged as interrupted.
- interrupted_path = path.parent.joinpath('__interrupted__.tag')
+ interrupted_path = path.parent.joinpath("__interrupted__.tag")
if interrupted_path.is_file():
path.unlink()
error_msg = QtWidgets.QMessageBox()
error_msg.setText(
- f'Folder is already tagged as interrupted. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n'
- f'{path} was deleted.')
- error_msg.setWindowTitle(f'Deleting __complete__.tag')
+ f"Folder is already tagged as interrupted. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n"
+ f"{path} was deleted."
+ )
+ error_msg.setWindowTitle(f"Deleting __complete__.tag")
error_msg.exec_()
return
else:
self.complete = True
- elif path.name == '__interrupted__.tag':
+ elif path.name == "__interrupted__.tag":
# Check if the item is already tagged as complete.
- complete_path = path.parent.joinpath('__complete__.tag')
+ complete_path = path.parent.joinpath("__complete__.tag")
if complete_path.is_file():
path.unlink()
error_msg = QtWidgets.QMessageBox()
error_msg.setText(
- f'Folder is already tagged as complete. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n'
- f'{path} was deleted.')
- error_msg.setWindowTitle(f'Deleting __interrupted__.tag')
+ f"Folder is already tagged as complete. Please do not add both __complete__ and __interrupted__ tags in the same folder.\n"
+ f"{path} was deleted."
+ )
+ error_msg.setWindowTitle(f"Deleting __interrupted__.tag")
error_msg.exec_()
return
else:
@@ -327,13 +350,13 @@ def delete_file(self, path: Path) -> None:
self.tags_widget.delete_tag(path.stem)
model.tag_deleted(path.stem)
- if path.name == '__star__.tag':
+ if path.name == "__star__.tag":
self.star = False
- elif path.name == '__trash__.tag':
+ elif path.name == "__trash__.tag":
self.trash = False
- elif path.name == '__complete__.tag':
+ elif path.name == "__complete__.tag":
self.complete = False
- elif path.name == '__interrupted__.tag':
+ elif path.name == "__interrupted__.tag":
self.interrupted = False
model.item_files_changed(self)
@@ -381,6 +404,7 @@ class FileModel(QtGui.QStandardItemModel):
:param Parent: The parent of the model.
:param watcher_on: If False, the model will not start the watcher.
"""
+
# Signal(Path) -- Emitted when there has been an update to the currently selected folder.
#: Arguments:
#: - The path of the currently selected folder.
@@ -417,11 +441,17 @@ class FileModel(QtGui.QStandardItemModel):
#: - The deleted tag.
tag_deleted_signal = Signal(str)
- def __init__(self, monitor_path: str, rows: int, columns: int, parent: Optional[Any] = None,
- watcher_on: bool = True):
+ def __init__(
+ self,
+ monitor_path: str,
+ rows: int,
+ columns: int,
+ parent: Optional[Any] = None,
+ watcher_on: bool = True,
+ ):
super().__init__(rows, columns, parent=parent)
self.monitor_path = Path(monitor_path)
- self.header_labels = ['File path', 'Tags']
+ self.header_labels = ["File path", "Tags"]
self.currently_selected_folder = None
# The main dictionary has all the datasets (folders) Path as keys, with the actual item as its value.
@@ -429,7 +459,7 @@ def __init__(self, monitor_path: str, rows: int, columns: int, parent: Optional[
self.tags_dict: Dict[str, int] = {}
self.tags_model = QtGui.QStandardItemModel()
self.tags_model.dataChanged.connect(self.on_checked_tag_change)
- first_tag_item = QtGui.QStandardItem('Tag Filter')
+ first_tag_item = QtGui.QStandardItem("Tag Filter")
first_tag_item.setSelectable(False)
self.tags_model.insertRow(0, first_tag_item)
self.load_data()
@@ -507,14 +537,20 @@ def load_data(self) -> None:
# Directory_2: {file_1: file_type
# file_2: file_type}...}
data_dictionary = {
- Path(walk_entry[0]): {Path(walk_entry[0]).joinpath(file): ContentType.sort(file) for file in walk_entry[2]}
- for walk_entry in walk_results if SupportedDataTypes.check_valid_data(file_names=walk_entry[
- 2])}
+ Path(walk_entry[0]): {
+ Path(walk_entry[0]).joinpath(file): ContentType.sort(file)
+ for file in walk_entry[2]
+ }
+ for walk_entry in walk_results
+ if SupportedDataTypes.check_valid_data(file_names=walk_entry[2])
+ }
for folder_path, files_dict in data_dictionary.items():
self.sort_and_add_item(folder_path, files_dict)
- def sort_and_add_item(self, folder_path: Path, files_dict: Optional[Dict] = None) -> Optional[bool]:
+ def sort_and_add_item(
+ self, folder_path: Path, files_dict: Optional[Dict] = None
+ ) -> Optional[bool]:
"""
Adds one or more items into the model. New parent items are created if required.
@@ -527,28 +563,40 @@ def sort_and_add_item(self, folder_path: Path, files_dict: Optional[Dict] = None
if folder_path == self.monitor_path:
if files_dict is not None:
- LOGGER.warning(f'The following files in the monitoring directory will not be displayed: '
- f'\n{[str(file) for file in files_dict if file.is_file()]}\nplease move them to a specific folder')
+ LOGGER.warning(
+ f"The following files in the monitoring directory will not be displayed: "
+ f"\n{[str(file) for file in files_dict if file.is_file()]}\nplease move them to a specific folder"
+ )
else:
- LOGGER.warning(f'Files have been found in the monitoring folder, please remove them')
+ LOGGER.warning(
+ f"Files have been found in the monitoring folder, please remove them"
+ )
return False
# Checks if the item is in a hidden folder, if it is ignore.
- if any(part.startswith('.') for part in folder_path.parts): return None
+ if any(part.startswith(".") for part in folder_path.parts):
+ return None
# Check if the new item should have a parent item. If the new item should have a parent, but this does
# not yet exist, create it.
if folder_path.parent == self.monitor_path:
parent_item, parent_path = None, None
elif folder_path.parent in self.main_dictionary:
- parent_item, parent_path = \
- self.main_dictionary[folder_path.parent], folder_path.parent
+ parent_item, parent_path = (
+ self.main_dictionary[folder_path.parent],
+ folder_path.parent,
+ )
else:
- parent_folder_files = {file: ContentType.sort(file) for file in folder_path.parent.iterdir() if
- file.is_file()}
+ parent_folder_files = {
+ file: ContentType.sort(file)
+ for file in folder_path.parent.iterdir()
+ if file.is_file()
+ }
self.sort_and_add_item(folder_path.parent, parent_folder_files)
- parent_item, parent_path = \
- self.main_dictionary[folder_path.parent], folder_path.parent
+ parent_item, parent_path = (
+ self.main_dictionary[folder_path.parent],
+ folder_path.parent,
+ )
# Create Item and add it to the model
if files_dict is None:
@@ -562,7 +610,9 @@ def sort_and_add_item(self, folder_path: Path, files_dict: Optional[Dict] = None
else:
self.tags_dict[tag] = 1
new_tag_item = QtGui.QStandardItem(tag) # Item for the combox model
- new_tag_item.setFlags(QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled)
+ new_tag_item.setFlags(
+ QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled
+ )
new_tag_item.setData(QtCore.Qt.Unchecked, QtCore.Qt.CheckStateRole)
self.tags_model.setItem(self.tags_model.rowCount(), 0, new_tag_item)
@@ -599,7 +649,7 @@ def on_file_created(self, event: FileSystemEvent) -> None:
path = Path(str(event.src_path))
# If a folder is created, it will be added when a data file will be created.
- if not path.is_dir() and not any(part.startswith('.') for part in path.parts):
+ if not path.is_dir() and not any(part.startswith(".") for part in path.parts):
# If the file created is a lock, we ignore it.
if not is_file_lock(path):
@@ -609,14 +659,21 @@ def on_file_created(self, event: FileSystemEvent) -> None:
parent = self.main_dictionary[path.parent]
if path not in parent.files:
parent.add_file(path)
- if self.currently_selected_folder is not None and _is_relative_to(parent.path,
- self.currently_selected_folder):
+ if (
+ self.currently_selected_folder is not None
+ and _is_relative_to(
+ parent.path, self.currently_selected_folder
+ )
+ ):
self.update_me.emit(parent.path)
# If the parent of the file does not exist, we first need to check that file is valid data.
elif SupportedDataTypes.check_valid_data([path]):
- new_files_dict = {file: ContentType.sort(file) for file in path.parent.iterdir() if
- str(file.suffix) != ''}
+ new_files_dict = {
+ file: ContentType.sort(file)
+ for file in path.parent.iterdir()
+ if str(file.suffix) != ""
+ }
# If sort_and_add_item returns false, it means that it could not add an item because it was
# triggered for files in the monitoring directory.
@@ -624,8 +681,12 @@ def on_file_created(self, event: FileSystemEvent) -> None:
if added_status is None:
item = self.main_dictionary[path.parent]
# Send signal indicating that current folder requires update
- if self.currently_selected_folder is not None and \
- _is_relative_to(item.path, self.currently_selected_folder):
+ if (
+ self.currently_selected_folder is not None
+ and _is_relative_to(
+ item.path, self.currently_selected_folder
+ )
+ ):
self.update_me.emit(item.path)
@Slot(FileSystemEvent)
@@ -663,8 +724,10 @@ def on_file_deleted(self, event: FileSystemEvent) -> None:
all_folder_files = [file for file in parent.path.iterdir()]
# Checks if the folder itself needs to be deleted or only the file
- if SupportedDataTypes.check_valid_data(
- all_folder_files) or parent.hasChildren():
+ if (
+ SupportedDataTypes.check_valid_data(all_folder_files)
+ or parent.hasChildren()
+ ):
parent.delete_file(path)
else:
# If the parent needs to be deleted, removes it from the correct widget.
@@ -676,8 +739,9 @@ def on_file_deleted(self, event: FileSystemEvent) -> None:
else:
parent.delete_file(path)
# Send signal indicating that current folder requires update.
- if self.currently_selected_folder is not None and \
- _is_relative_to(parent.path, self.currently_selected_folder):
+ if self.currently_selected_folder is not None and _is_relative_to(
+ parent.path, self.currently_selected_folder
+ ):
# Checks if the folder still exists. If the user has the folder that is getting deleted at that
# moment, no update should happen.
if self.currently_selected_folder.is_dir():
@@ -690,7 +754,11 @@ def _delete_all_children_from_main_dictionary(self, item: Item) -> None:
:param item: The item whose children should be deleted.
"""
path = item.path
- children_folders = [key for key in self.main_dictionary.keys() if _is_relative_to(key, path) and key != path]
+ children_folders = [
+ key
+ for key in self.main_dictionary.keys()
+ if _is_relative_to(key, path) and key != path
+ ]
for child in children_folders:
if child in self.main_dictionary:
child_item = self.main_dictionary[child]
@@ -709,8 +777,12 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None:
# File moved gets triggered with None and '', for the event paths. From what I can tell, they are not useful,
# so we ignore them.
- if event.src_path is not None and event.src_path != '' \
- and event.dest_path is not None and event.dest_path != '':
+ if (
+ event.src_path is not None
+ and event.src_path != ""
+ and event.dest_path is not None
+ and event.dest_path != ""
+ ):
src_path = Path(str(event.src_path))
dest_path = Path(str(event.dest_path))
@@ -722,8 +794,9 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None:
changed_item.change_path(dest_path)
# Checking for a file becoming a data file.
- elif not SupportedDataTypes.check_valid_data([src_path]) and SupportedDataTypes.check_valid_data(
- [dest_path]):
+ elif not SupportedDataTypes.check_valid_data(
+ [src_path]
+ ) and SupportedDataTypes.check_valid_data([dest_path]):
# If the parent exists in the main dictionary, the model already has all the files and its tracking
# that folder, only updates the file itself.
if src_path.parent in self.main_dictionary:
@@ -737,14 +810,18 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None:
# New folder to keep track.
else:
- new_entry = {file: ContentType.sort(file) for file in dest_path.parent.iterdir() if
- str(file.suffix) != ''}
+ new_entry = {
+ file: ContentType.sort(file)
+ for file in dest_path.parent.iterdir()
+ if str(file.suffix) != ""
+ }
self.sort_and_add_item(dest_path.parent, new_entry)
parent = self.main_dictionary[dest_path.parent]
# Checking if a data file stops being a data file.
- elif SupportedDataTypes.check_valid_data([src_path]) and not SupportedDataTypes.check_valid_data(
- [dest_path]):
+ elif SupportedDataTypes.check_valid_data(
+ [src_path]
+ ) and not SupportedDataTypes.check_valid_data([dest_path]):
if src_path.parent in self.main_dictionary:
parent = self.main_dictionary[src_path.parent]
elif dest_path.parent in self.main_dictionary:
@@ -756,8 +833,10 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None:
# Checks if there are other data files in the parent.
parent_files = [key for key in parent.files.keys()]
- if not SupportedDataTypes.check_valid_data(
- parent_files) and not parent.hasChildren():
+ if (
+ not SupportedDataTypes.check_valid_data(parent_files)
+ and not parent.hasChildren()
+ ):
# If the parent has other children, it means there are more data files down the file tree
# and the model should keep track of these folders.
del self.main_dictionary[parent.path]
@@ -792,17 +871,23 @@ def on_file_moved(self, event: FileSystemMovedEvent) -> None:
if dest_path not in parent.files:
parent.add_file(dest_path)
- if self.currently_selected_folder is not None and _is_relative_to(dest_path, self.currently_selected_folder):
+ if self.currently_selected_folder is not None and _is_relative_to(
+ dest_path, self.currently_selected_folder
+ ):
# This happens when a top level item is changed.
if parent is None:
- check = self.check_all_files_are_valid(self.main_dictionary[dest_path], dest_path)[0]
+ check = self.check_all_files_are_valid(
+ self.main_dictionary[dest_path], dest_path
+ )[0]
else:
check = self.check_all_files_are_valid(parent, parent.path)[0]
if check:
self.update_me.emit(self.currently_selected_folder)
- def check_all_files_are_valid(self, item: Item, first_path: Path) -> Tuple[bool, Path]:
+ def check_all_files_are_valid(
+ self, item: Item, first_path: Path
+ ) -> Tuple[bool, Path]:
"""
Checks that all the files inside of the item have a valid path. This is used when changing the name of currently
selected folders to see if an update to change the folders should be triggered or not.
@@ -843,8 +928,9 @@ def on_file_modified(self, event: FileSystemEvent) -> None:
parent = self.main_dictionary[path.parent]
# If the folder is not currently being selected I don't care about modifications.
- if self.currently_selected_folder is not None and _is_relative_to(parent.path,
- self.currently_selected_folder):
+ if self.currently_selected_folder is not None and _is_relative_to(
+ parent.path, self.currently_selected_folder
+ ):
# If im expecting this update, ignore it.
if path in self.modified_exceptions:
@@ -887,7 +973,9 @@ def update_currently_selected_folder(self, path: Path) -> None:
self.modified_exceptions = self._get_all_files_of_item(item)
self.currently_selected_folder = path
- def _get_all_files_of_item(self, item: Item, partial_list: List[Path] = []) -> List[Path]:
+ def _get_all_files_of_item(
+ self, item: Item, partial_list: List[Path] = []
+ ) -> List[Path]:
"""
Recursively gets a list of all the files that are in item and all of its children.
@@ -897,7 +985,7 @@ def _get_all_files_of_item(self, item: Item, partial_list: List[Path] = []) -> L
partial_list = partial_list + [file for file in item.files.keys()]
if item.hasChildren():
for i in range(item.rowCount()):
- child = item.child(i,0)
+ child = item.child(i, 0)
assert isinstance(child, Item)
partial_list = partial_list + self._get_all_files_of_item(child)
return partial_list
@@ -919,9 +1007,9 @@ def tag_action_triggered(self, item_index: QtCore.QModelIndex, tag: str) -> None
item = self.itemFromIndex(item_index)
assert isinstance(item, Item)
path = item.path
- star_path = path.joinpath('__star__.tag')
- trash_path = path.joinpath('__trash__.tag')
- if tag == 'star':
+ star_path = path.joinpath("__star__.tag")
+ trash_path = path.joinpath("__trash__.tag")
+ if tag == "star":
# If a trash file in the star folder exists, delete it.
if trash_path.is_file():
trash_path.unlink()
@@ -930,9 +1018,9 @@ def tag_action_triggered(self, item_index: QtCore.QModelIndex, tag: str) -> None
if star_path.is_file():
star_path.unlink()
else:
- with open(star_path, 'w') as file:
- file.write('')
- elif tag == 'trash':
+ with open(star_path, "w") as file:
+ file.write("")
+ elif tag == "trash":
# If a star file in the star folder exists, delete it.
if star_path.is_file():
star_path.unlink()
@@ -941,16 +1029,16 @@ def tag_action_triggered(self, item_index: QtCore.QModelIndex, tag: str) -> None
if trash_path.is_file():
trash_path.unlink()
else:
- with open(trash_path, 'w') as file:
- file.write('')
+ with open(trash_path, "w") as file:
+ file.write("")
else:
- tag_path = path.joinpath(tag + '.tag')
+ tag_path = path.joinpath(tag + ".tag")
if tag_path.is_file():
tag_path.unlink()
else:
- with open(tag_path, 'w') as file:
- file.write('')
+ with open(tag_path, "w") as file:
+ file.write("")
def delete_item(self, item_index: QtCore.QModelIndex) -> None:
"""
@@ -1013,8 +1101,11 @@ def tag_deleted(self, tag: str) -> None:
self.tag_deleted_signal.emit(tag)
def currently_selected_tags(self) -> List[str]:
- return [self.tags_model.item(i, 0).text() for i in range(self.tags_model.rowCount()) if
- self.tags_model.item(i, 0).checkState()]
+ return [
+ self.tags_model.item(i, 0).text()
+ for i in range(self.tags_model.rowCount())
+ if self.tags_model.item(i, 0).checkState()
+ ]
@Slot()
def on_checked_tag_change(self) -> None:
@@ -1058,13 +1149,20 @@ def setSourceModel(self, sourceModel: QtCore.QAbstractItemModel) -> None:
self.allowed_items = [item for item in sourceModel.main_dictionary.values()]
super().setSourceModel(sourceModel)
- def filter_requested(self, allowed_items: List[QtGui.QStandardItem], star_status: bool, trash_status: bool) -> None:
+ def filter_requested(
+ self,
+ allowed_items: List[QtGui.QStandardItem],
+ star_status: bool,
+ trash_status: bool,
+ ) -> None:
self.star_status = star_status
self.trash_status = trash_status
self.allowed_items = allowed_items
self.trigger_filter()
- def filterAcceptsRow(self, source_row: int, source_parent: QtCore.QModelIndex) -> bool:
+ def filterAcceptsRow(
+ self, source_row: int, source_parent: QtCore.QModelIndex
+ ) -> bool:
"""
Override of the QSortFilterProxyModel. Our custom filtering needs are implemented here.
Checks whether or not to show the item against its allowed items list.
@@ -1082,7 +1180,11 @@ def filterAcceptsRow(self, source_row: int, source_parent: QtCore.QModelIndex) -
else:
item = parent_item.child(source_row, 0)
- if self.allowed_items is None and self.star_status == False and self.trash_status == False:
+ if (
+ self.allowed_items is None
+ and self.star_status == False
+ and self.trash_status == False
+ ):
if item is not None:
item.show = True
return True
@@ -1145,16 +1247,16 @@ def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any] = No
assert isinstance(model, FileModel)
self.model_ = model
self.collapsed_state: Dict[QtCore.QPersistentModelIndex, bool] = {}
- self.star_text = 'star'
- self.un_star_text = 'un-star'
- self.trash_text = 'trash'
- self.un_trash_text = 'un-trash'
+ self.star_text = "star"
+ self.un_star_text = "un-star"
+ self.trash_text = "trash"
+ self.un_trash_text = "un-trash"
self.context_menu = QtWidgets.QMenu(self)
- self.copy_path_action = QtWidgets.QAction('copy path')
- self.star_action = QtWidgets.QAction('star')
- self.trash_action = QtWidgets.QAction('trash')
- self.delete_action = QtWidgets.QAction('delete')
+ self.copy_path_action = QtWidgets.QAction("copy path")
+ self.star_action = QtWidgets.QAction("star")
+ self.trash_action = QtWidgets.QAction("trash")
+ self.delete_action = QtWidgets.QAction("delete")
self.tag_actions: Dict[str, QtWidgets.QAction] = {}
for tag in self.model_.tags_dict.keys():
if tag not in self.tag_actions:
@@ -1259,7 +1361,7 @@ def on_context_menu_requested(self, pos: QtCore.QPoint) -> None:
self.context_menu.addSeparator()
for tag, action in self.tag_actions.items():
if tag in item.tags:
- action.setText('un-' + tag)
+ action.setText("un-" + tag)
else:
action.setText(tag)
self.context_menu.addAction(action)
@@ -1294,7 +1396,7 @@ def on_delete_tag_action(self, deleted_tag: str) -> None:
@Slot(QtWidgets.QAction)
def on_context_action_triggered(self, action: QtWidgets.QAction) -> None:
tag = action.text()
- if tag[0:3] == 'un-':
+ if tag[0:3] == "un-":
tag = tag[3:]
item_proxy_index = self.currentIndex()
@@ -1308,7 +1410,9 @@ def on_context_action_triggered(self, action: QtWidgets.QAction) -> None:
self.model_.tag_action_triggered(item_index, tag)
- def currentChanged(self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex) -> None:
+ def currentChanged(
+ self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex
+ ) -> None:
"""
Gets called everytime the selection of the tree changes. Emits a signal indicating the current and previous
selected item.
@@ -1331,7 +1435,9 @@ def create_collapsed_state(self, incoming_item: Optional[Item] = None) -> None:
if item.show:
source_index = self.model_.index(i, 0, QtCore.QModelIndex())
index = self.proxy_model.mapFromSource(source_index)
- self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = self.isExpanded(index)
+ self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = (
+ self.isExpanded(index)
+ )
if item.hasChildren():
self.create_collapsed_state(incoming_item=item)
@@ -1343,7 +1449,9 @@ def create_collapsed_state(self, incoming_item: Optional[Item] = None) -> None:
if child.show:
source_index = self.model_.indexFromItem(child)
child_index = self.proxy_model.mapFromSource(source_index)
- self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = self.isExpanded(child_index)
+ self.collapsed_state[QtCore.QPersistentModelIndex(source_index)] = (
+ self.isExpanded(child_index)
+ )
if child.hasChildren():
self.create_collapsed_state(incoming_item=child)
@@ -1353,7 +1461,12 @@ def restore_previous_collapsed_state(self) -> None:
"""
for persistent_index, state in self.collapsed_state.items():
proxy_index = self.proxy_model.mapFromSource(
- self.model_.index(persistent_index.row(), persistent_index.column(), persistent_index.parent()))
+ self.model_.index(
+ persistent_index.row(),
+ persistent_index.column(),
+ persistent_index.parent(),
+ )
+ )
self.setExpanded(proxy_index, state)
@@ -1368,13 +1481,28 @@ class FilterWorker(QtCore.QObject):
# dictionary have passed the filtering. Second item is the queries dictionary.
finished = Signal(tuple)
- def run(self, model: FileModel, star_status: bool, trash_status: bool, filter: str, tag_filter: List[str] = []) -> None:
- filter_dict = self.filter_items(model, star_status, trash_status, filter, tag_filter)
+ def run(
+ self,
+ model: FileModel,
+ star_status: bool,
+ trash_status: bool,
+ filter: str,
+ tag_filter: List[str] = [],
+ ) -> None:
+ filter_dict = self.filter_items(
+ model, star_status, trash_status, filter, tag_filter
+ )
if filter_dict is not None:
self.finished.emit(filter_dict)
- def filter_items(self, model: FileModel, star_status: bool, trash_status: bool, filter: str,
- tag_filter: List[str] = []) -> Optional[Tuple[Dict[Path, Item], Dict[str, List[str]]]]:
+ def filter_items(
+ self,
+ model: FileModel,
+ star_status: bool,
+ trash_status: bool,
+ filter: str,
+ tag_filter: List[str] = [],
+ ) -> Optional[Tuple[Dict[Path, Item], Dict[str, List[str]]]]:
"""
Process the text in filter, separates them into the different queries and filters the items.
@@ -1423,7 +1551,9 @@ def filter_items(self, model: FileModel, star_status: bool, trash_status: bool,
assert trashed_dict is not None
if item.path not in trashed_dict and item.trash:
# When trashing an item, keep a record that it has been trashed and all of its children.
- trashed_dict, current_dict = self._trash_item(item, trashed_dict, current_dict)
+ trashed_dict, current_dict = self._trash_item(
+ item, trashed_dict, current_dict
+ )
if trashed_dict is None:
return None
continue
@@ -1436,22 +1566,31 @@ def filter_items(self, model: FileModel, star_status: bool, trash_status: bool,
for query_type, queries in queries_dict.items():
if self.thread().isInterruptionRequested():
return None
- if query_type == 'name':
+ if query_type == "name":
for query in queries:
if self.thread().isInterruptionRequested():
return None
match_pattern = re.compile(query, flags=re.IGNORECASE)
- new_matches = {path: item for path, item in current_dict.items() if
- match_pattern.search(str(path))}
+ new_matches = {
+ path: item
+ for path, item in current_dict.items()
+ if match_pattern.search(str(path))
+ }
current_dict = new_matches
else:
for query in queries:
if self.thread().isInterruptionRequested():
return None
match_pattern = re.compile(query, flags=re.IGNORECASE)
- new_matches = {path: item for path, item in current_dict.items()
- for file_path, file_type in item.files.items()
- if (file_type == ContentType.sort(query_type) and match_pattern.search(str(file_path.name)))}
+ new_matches = {
+ path: item
+ for path, item in current_dict.items()
+ for file_path, file_type in item.files.items()
+ if (
+ file_type == ContentType.sort(query_type)
+ and match_pattern.search(str(file_path.name))
+ )
+ }
current_dict = new_matches
# Add all the children and parents (if these have not been trashed) of the passed items.
@@ -1476,8 +1615,12 @@ def filter_items(self, model: FileModel, star_status: bool, trash_status: bool,
return current_dict, queries_dict
- def _add_parent(self, item: Item, adding_dict: Optional[Dict[Path, Item]],
- trashed_dictionary: Dict[Path, Item]) -> Optional[Dict[Path, Item]]:
+ def _add_parent(
+ self,
+ item: Item,
+ adding_dict: Optional[Dict[Path, Item]],
+ trashed_dictionary: Dict[Path, Item],
+ ) -> Optional[Dict[Path, Item]]:
"""
Adds all the parents (if these have not been trashed) of item to adding_dict.
@@ -1503,8 +1646,12 @@ def _add_parent(self, item: Item, adding_dict: Optional[Dict[Path, Item]],
adding_dict[parent_item.path] = parent_item
return adding_dict
- def _add_children(self, item: Item, adding_dict: Optional[Dict[Path, Item]],
- trashed_dictionary: Optional[Dict[Path, Item]]) -> Optional[Dict[Path, Item]]:
+ def _add_children(
+ self,
+ item: Item,
+ adding_dict: Optional[Dict[Path, Item]],
+ trashed_dictionary: Optional[Dict[Path, Item]],
+ ) -> Optional[Dict[Path, Item]]:
"""
Adds all the children of an item (if these haven not be trashed) to adding_dict.
@@ -1522,15 +1669,21 @@ def _add_children(self, item: Item, adding_dict: Optional[Dict[Path, Item]],
if child.path in trashed_dictionary:
continue
if child.hasChildren():
- adding_dict = self._add_children(child, adding_dict, trashed_dictionary)
+ adding_dict = self._add_children(
+ child, adding_dict, trashed_dictionary
+ )
if adding_dict is None:
return None
adding_dict[child.path] = child
return adding_dict
- def _trash_item(self, item: Item, trashed_dict: Optional[Dict[Path, Item]], current_dict: Dict[Path, Item]) -> Tuple[
- Optional[Dict[Path, Item]], Dict[Path, Item]]:
+ def _trash_item(
+ self,
+ item: Item,
+ trashed_dict: Optional[Dict[Path, Item]],
+ current_dict: Dict[Path, Item],
+ ) -> Tuple[Optional[Dict[Path, Item]], Dict[Path, Item]]:
"""
Trashes an item and all of its children items. Removes the items trashed from current_dict.
@@ -1544,7 +1697,9 @@ def _trash_item(self, item: Item, trashed_dict: Optional[Dict[Path, Item]], curr
return None, {}
child = item.child(i, 0)
assert isinstance(child, Item)
- trashed_dict, current_dict = self._trash_item(child, trashed_dict, current_dict)
+ trashed_dict, current_dict = self._trash_item(
+ child, trashed_dict, current_dict
+ )
if trashed_dict is None:
return None, {}
@@ -1555,7 +1710,9 @@ def _trash_item(self, item: Item, trashed_dict: Optional[Dict[Path, Item]], curr
return trashed_dict, current_dict
@classmethod
- def parse_queries(cls, filter: str, tag_filter: List[str] = []) -> Dict[str, List[str]]:
+ def parse_queries(
+ cls, filter: str, tag_filter: List[str] = []
+ ) -> Dict[str, List[str]]:
"""
Separates a string of queries into a dictionary where the queries are organized by categories.
@@ -1575,9 +1732,14 @@ def parse_queries(cls, filter: str, tag_filter: List[str] = []) -> Dict[str, Lis
:returns: Dictionary with the keys: tag, md, image, json, and name. Each contains a list with the queries
for each respective category.
"""
- raw_queries = filter.split(',')
- queries_with_empty_spaces = [item[1:] if len(item) >= 1 and item[0] == " " else item for item in raw_queries]
- queries = [item for item in queries_with_empty_spaces if item != '' and item != ' ']
+ raw_queries = filter.split(",")
+ queries_with_empty_spaces = [
+ item[1:] if len(item) >= 1 and item[0] == " " else item
+ for item in raw_queries
+ ]
+ queries = [
+ item for item in queries_with_empty_spaces if item != "" and item != " "
+ ]
queries_dict = {}
if len(queries) > 0 or len(tag_filter) > 0:
@@ -1587,32 +1749,34 @@ def parse_queries(cls, filter: str, tag_filter: List[str] = []) -> Dict[str, Lis
json_queries = []
name_queries = []
for query in queries:
- if query != '':
- if query[:4] == 'tag:':
+ if query != "":
+ if query[:4] == "tag:":
tag_queries.append(cls._remove_whitespace(query[4:]))
- elif query[:2] == 't:' or query[:2] == 'T:':
+ elif query[:2] == "t:" or query[:2] == "T:":
tag_queries.append(cls._remove_whitespace(query[2:]))
- elif query[:3] == 'md:':
+ elif query[:3] == "md:":
md_queries.append(cls._remove_whitespace(query[3:]))
- elif query[:2] == 'm:' or query[:2] == 'M:':
+ elif query[:2] == "m:" or query[:2] == "M:":
md_queries.append(cls._remove_whitespace(query[2:]))
- elif query[:6] == 'image:':
+ elif query[:6] == "image:":
image_queries.append(cls._remove_whitespace(query[6:]))
- elif query[:2] == 'i:' or query[:2] == 'I:':
+ elif query[:2] == "i:" or query[:2] == "I:":
image_queries.append(cls._remove_whitespace(query[2:]))
- elif query[:5] == 'json:':
+ elif query[:5] == "json:":
json_queries.append(cls._remove_whitespace(query[5:]))
- elif query[:2] == 'j:' or query[:2] == 'J:':
+ elif query[:2] == "j:" or query[:2] == "J:":
json_queries.append(cls._remove_whitespace(query[2:]))
else:
name_queries.append(cls._remove_whitespace(query))
tag_queries = list(set(tag_queries + tag_filter))
- queries_dict = {'tag': tag_queries,
- 'md': md_queries,
- 'image': image_queries,
- 'json': json_queries,
- 'name': name_queries, }
+ queries_dict = {
+ "tag": tag_queries,
+ "md": md_queries,
+ "image": image_queries,
+ "json": json_queries,
+ "name": name_queries,
+ }
return queries_dict
@@ -1624,14 +1788,21 @@ def _remove_whitespace(cls, text: str) -> str:
:param text: The string we want to remove the initial or ending whitespace.
"""
if len(text) > 0:
- if text[0] == ' ':
+ if text[0] == " ":
text = text[1:]
- if len(text) > 0 and text[-1] == ' ':
+ if len(text) > 0 and text[-1] == " ":
text = text[0:-1]
return text
@classmethod
- def is_item_shown(cls, item: Item, filter: str, tag_filter: List[str], star_status: bool, trash_status: bool) -> bool:
+ def is_item_shown(
+ cls,
+ item: Item,
+ filter: str,
+ tag_filter: List[str],
+ star_status: bool,
+ trash_status: bool,
+ ) -> bool:
"""
Checks if the item should be currently shown. True if it should, False if it shouldn't.
It takes into account all the rules of normal filtering.
@@ -1659,7 +1830,9 @@ def is_item_shown(cls, item: Item, filter: str, tag_filter: List[str], star_stat
return True
@classmethod
- def _item_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]) -> bool:
+ def _item_check(
+ cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]
+ ) -> bool:
"""
Checks if the item passes the queries in the queries dict, Including if the item is a star when the star status
is activated.
@@ -1674,7 +1847,7 @@ def _item_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List
return False
for query_type, queries in queries_dict.items():
- if query_type == 'name':
+ if query_type == "name":
for query in queries:
match_pattern = re.compile(query, flags=re.IGNORECASE)
if not match_pattern.search(str(item.path)):
@@ -1682,22 +1855,30 @@ def _item_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List
else:
if len(queries) > 0:
sorted_query_type = ContentType.sort(query_type)
- correct_files_type = [file_path for file_path, file_type in item.files.items() if
- file_type == sorted_query_type]
+ correct_files_type = [
+ file_path
+ for file_path, file_type in item.files.items()
+ if file_type == sorted_query_type
+ ]
if not len(correct_files_type) > 0:
return False
for query in queries:
match_pattern = re.compile(query, flags=re.IGNORECASE)
- matches = [match_pattern.search(str(path)) for path in correct_files_type]
+ matches = [
+ match_pattern.search(str(path))
+ for path in correct_files_type
+ ]
if not any(matches):
return False
return True
@classmethod
- def _parents_query_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]) -> bool:
+ def _parents_query_check(
+ cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]
+ ) -> bool:
"""
Checks recursively if any parent of the item passes the query check.
@@ -1711,10 +1892,16 @@ def _parents_query_check(cls, item: Item, star_status: bool, queries_dict: Dict[
if cls._item_check(item, star_status, queries_dict):
return True
- return cls._item_check(parent, star_status, queries_dict, )
+ return cls._item_check(
+ parent,
+ star_status,
+ queries_dict,
+ )
@classmethod
- def _children_query_check(cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]) -> bool:
+ def _children_query_check(
+ cls, item: Item, star_status: bool, queries_dict: Dict[str, List[str]]
+ ) -> bool:
"""
Checks recursively if any child of the item passes the query check.
@@ -1761,8 +1948,13 @@ class FileExplorer(QtWidgets.QWidget):
Helper widget to unify the FileTree with the line edit and status buttons.
"""
- def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any]=None,
- *args: Any, **kwargs: Any):
+ def __init__(
+ self,
+ proxy_model: SortFilterProxyModel,
+ parent: Optional[Any] = None,
+ *args: Any,
+ **kwargs: Any,
+ ):
super().__init__(parent=parent, *args, **kwargs) # type: ignore[misc] # I suspect this error comes from having parent possibly be a kwarg too.
# Holds all the current .ddh5 file paths that are currently being displayed.
@@ -1781,16 +1973,18 @@ def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any]=None
self.bottom_buttons_layout = QtWidgets.QHBoxLayout()
self.filter_line_edit = QtWidgets.QLineEdit()
- self.filter_line_edit.setPlaceholderText('Filter Items')
-
- self.star_button = QtWidgets.QPushButton('Star')
- self.trash_button = QtWidgets.QPushButton('Hide Trash')
- self.refresh_button = QtWidgets.QPushButton('Refresh')
- self.expand_button = QtWidgets.QPushButton('Expand')
- self.collapse_button = QtWidgets.QPushButton('Collapse')
- self.copy_button = QtWidgets.QPushButton('Copy Path')
+ self.filter_line_edit.setPlaceholderText("Filter Items")
+
+ self.star_button = QtWidgets.QPushButton("Star")
+ self.trash_button = QtWidgets.QPushButton("Hide Trash")
+ self.refresh_button = QtWidgets.QPushButton("Refresh")
+ self.expand_button = QtWidgets.QPushButton("Expand")
+ self.collapse_button = QtWidgets.QPushButton("Collapse")
+ self.copy_button = QtWidgets.QPushButton("Copy Path")
self.tag_filter_combobox = QtWidgets.QComboBox()
- self.tag_filter_combobox.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
+ self.tag_filter_combobox.setSizePolicy(
+ QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum
+ )
self.tag_filter_combobox.setModel(self.model.tags_model)
self.selected_tags: List[str] = []
self.queries_dict: Dict[str, List[str]] = {}
@@ -1799,7 +1993,9 @@ def __init__(self, proxy_model: SortFilterProxyModel, parent: Optional[Any]=None
self.trash_button.setCheckable(True)
self.loading_label: Optional[IconLabel] = None
- self.loading_movie = QtGui.QMovie(os.path.join(plottrPath, 'resource', 'gfx', "loading_gif.gif"))
+ self.loading_movie = QtGui.QMovie(
+ os.path.join(plottrPath, "resource", "gfx", "loading_gif.gif")
+ )
self.filter_worker: Optional[FilterWorker] = None
self.filter_thread: Optional[QtCore.QThread] = None
@@ -1857,7 +2053,9 @@ def on_filter_triggered(self, filter: str) -> None:
:param filter: The string of the line edit.
"""
if self.loading_label is None:
- self.loading_label = IconLabel(self.loading_movie, self.star_button.height())
+ self.loading_label = IconLabel(
+ self.loading_movie, self.star_button.height()
+ )
self.filter_and_buttons_layout.insertWidget(1, self.loading_label)
self.loading_label.start_animation()
@@ -1884,7 +2082,9 @@ def on_filter_triggered(self, filter: str) -> None:
self.filter_thread.start()
@Slot(tuple)
- def on_finished_filtering(self, filtering_results: Tuple[Dict[Path, Item], Dict[str, List[str]]]) -> None:
+ def on_finished_filtering(
+ self, filtering_results: Tuple[Dict[Path, Item], Dict[str, List[str]]]
+ ) -> None:
"""
Gets called when the FilterWorker is done filtering. Ends the loading animation and the thread and triggers the
filtering in the proxy model.
@@ -1904,7 +2104,11 @@ def on_finished_filtering(self, filtering_results: Tuple[Dict[Path, Item], Dict[
self.queries_dict = queries_dict
items_list = [item for item in results_dict.values()]
- self.proxy_model.filter_requested(list(items_list), self.star_button.isChecked(), self.trash_button.isChecked())
+ self.proxy_model.filter_requested(
+ list(items_list),
+ self.star_button.isChecked(),
+ self.trash_button.isChecked(),
+ )
self.on_create_path_list()
@@ -1925,7 +2129,7 @@ def on_new_item_created(self, item: Item) -> None:
return
for query_type, queries in self.queries_dict.items():
- if query_type == 'name':
+ if query_type == "name":
for query in queries:
match_pattern = re.compile(query, flags=re.IGNORECASE)
if not match_pattern.search(str(item.path)):
@@ -1951,8 +2155,13 @@ def on_existing_item_files_updated(self, item: Item) -> None:
:param item: The item whose files changed.
"""
- should_item_show = FilterWorker.is_item_shown(item, self.filter_line_edit.text(), self.selected_tags,
- self.star_button.isChecked(), self.trash_button.isChecked())
+ should_item_show = FilterWorker.is_item_shown(
+ item,
+ self.filter_line_edit.text(),
+ self.selected_tags,
+ self.star_button.isChecked(),
+ self.trash_button.isChecked(),
+ )
if should_item_show:
if not item in self.proxy_model.allowed_items:
@@ -1963,7 +2172,9 @@ def on_existing_item_files_updated(self, item: Item) -> None:
self.proxy_model.allowed_items.remove(item)
self.proxy_model.trigger_filter()
- def on_create_path_list(self, item_index: Optional[QtCore.QModelIndex] = None) -> None:
+ def on_create_path_list(
+ self, item_index: Optional[QtCore.QModelIndex] = None
+ ) -> None:
"""
Creates the path list for the copy button. The path list is a list of all the paths of the data files of the
allowed items. If no item_index is passed, it will copy all the items that are currently shown by the view.
@@ -1986,11 +2197,15 @@ def on_create_path_list(self, item_index: Optional[QtCore.QModelIndex] = None) -
self.path_list.append(str(path))
if len(self.path_list) > 10:
- self.copy_button.setToolTip('Copy the paths of the data files for the currently filtered items.')
+ self.copy_button.setToolTip(
+ "Copy the paths of the data files for the currently filtered items."
+ )
else:
if len(self.path_list) == 1:
self.path_list = "'" + str(self.path_list[0]) + "'"
- self.copy_button.setToolTip('Copy the following paths to clipboard:\n' + str(self.path_list))
+ self.copy_button.setToolTip(
+ "Copy the following paths to clipboard:\n" + str(self.path_list)
+ )
@Slot()
def on_copy_button_clicked(self) -> None:
@@ -2044,7 +2259,14 @@ class DataTreeWidget(QtWidgets.QTreeWidget):
plot_requested = Signal(Path)
# incoming_data: Dict[str, Union[Path, str, DataDict]]
- def __init__(self, paths: List[Path], names: List[str], data: DataDict, *args: Any, **kwargs: Any):
+ def __init__(
+ self,
+ paths: List[Path],
+ names: List[str],
+ data: DataDict,
+ *args: Any,
+ **kwargs: Any,
+ ):
super().__init__(*args, **kwargs)
header_item = self.headerItem()
@@ -2057,7 +2279,7 @@ def __init__(self, paths: List[Path], names: List[str], data: DataDict, *args: A
self.data = data
# Popup menu.
- self.plot_popup_action = QtWidgets.QAction('Plot')
+ self.plot_popup_action = QtWidgets.QAction("Plot")
self.popup_menu = QtWidgets.QMenu(self)
self.plot_popup_action.triggered.connect(self.emit_plot_requested_signal)
@@ -2073,25 +2295,31 @@ def set_data(self) -> None:
"""
for index, data in enumerate(self.data):
- parent_tree_widget = DataTreeWidgetItem(self.paths[index], self, [self.names[index]])
+ parent_tree_widget = DataTreeWidgetItem(
+ self.paths[index], self, [self.names[index]]
+ )
- data_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ['Data'])
- meta_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ['Meta'])
+ data_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ["Data"])
+ meta_parent = QtWidgets.QTreeWidgetItem(parent_tree_widget, ["Meta"])
for name, value in data.data_items():
- column_content = [name, str(data.meta_val('shape', name))]
+ column_content = [name, str(data.meta_val("shape", name))]
if name in data.dependents():
- column_content.append(f'Depends on {str(tuple(data.axes(name)))}')
+ column_content.append(f"Depends on {str(tuple(data.axes(name)))}")
else:
- column_content.append(f'Independent')
+ column_content.append(f"Independent")
parameter_item = QtWidgets.QTreeWidgetItem(data_parent, column_content)
for meta_name, meta_value in data.meta_items(name):
- parameter_meta_item = QtWidgets.QTreeWidgetItem(parameter_item, [meta_name, str(meta_value)])
+ parameter_meta_item = QtWidgets.QTreeWidgetItem(
+ parameter_item, [meta_name, str(meta_value)]
+ )
for name, value in data.meta_items():
- parameter_meta_item = QtWidgets.QTreeWidgetItem(meta_parent, [name, str(value)])
+ parameter_meta_item = QtWidgets.QTreeWidgetItem(
+ meta_parent, [name, str(value)]
+ )
parent_tree_widget.setExpanded(True)
data_parent.setExpanded(True)
@@ -2138,7 +2366,7 @@ def sizeHint(self) -> QtCore.QSize:
index = self.indexFromItem(it.value())
height += self.rowHeight(index)
it += 1 # type: ignore[assignment, operator] # Taken from this example:
-# https://riverbankcomputing.com/pipermail/pyqt/2014-May/034315.html
+ # https://riverbankcomputing.com/pipermail/pyqt/2014-May/034315.html
# calculating width:
width = 2 * self.frameWidth()
@@ -2148,6 +2376,26 @@ def sizeHint(self) -> QtCore.QSize:
return QtCore.QSize(width, height)
+class CopyPathSection(QtWidgets.QWidget):
+ def __init__(self, path: str, parent=None):
+ super().__init__(parent)
+ self.path = path
+
+ layout = QtWidgets.QHBoxLayout(self)
+ layout.setContentsMargins(0, 0, 0, 0)
+
+ self.copy_btn = QtWidgets.QPushButton("Copy path")
+ self.copy_btn.setFixedSize(80, 24)
+ self.copy_btn.clicked.connect(self.copy_path_to_clipboard)
+
+ layout.addStretch()
+ layout.addWidget(self.copy_btn)
+
+ def copy_path_to_clipboard(self):
+ clipboard = QtWidgets.QApplication.clipboard()
+ clipboard.setText(self.path)
+
+
class FloatingButtonWidget(QtWidgets.QPushButton):
"""
Floating button inside the textbox showing any md file. Allows editing or saving the file.
@@ -2164,8 +2412,8 @@ class FloatingButtonWidget(QtWidgets.QPushButton):
def __init__(self, parent: QtWidgets.QWidget):
super().__init__(parent)
self.padding_right = 5
- self.edit_text = 'Edit'
- self.save_text = 'Save'
+ self.edit_text = "Edit"
+ self.save_text = "Save"
# Start in save mode (True), since you cannot edit the text. Clicks the edit button to switch to edit mode and
# vice versa.
@@ -2178,7 +2426,7 @@ def update_position(self) -> None:
"""
parent = self.parent()
assert isinstance(parent, QtWidgets.QWidget)
- if hasattr(parent, 'viewport'):
+ if hasattr(parent, "viewport"):
parent_rect = parent.viewport().rect()
else:
parent_rect = parent.rect()
@@ -2221,7 +2469,9 @@ def __init__(self, path: Path, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.path = path
- size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum)
+ size_policy = QtWidgets.QSizePolicy(
+ QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum
+ )
self.setSizePolicy(size_policy)
try:
@@ -2229,7 +2479,7 @@ def __init__(self, path: Path, *args: Any, **kwargs: Any):
self.file_text = file.read()
except FileNotFoundError as e:
LOGGER.error(e)
- self.file_text = 'Comment file could not load. Do not edit as this could rewrite the original comment.'
+ self.file_text = "Comment file could not load. Do not edit as this could rewrite the original comment."
self.setReadOnly(True)
self.setPlainText(self.file_text)
document = QtGui.QTextDocument(self.file_text, parent=self)
@@ -2273,6 +2523,15 @@ class TextEditWidget(TextViewWidget):
def __init__(self, path: Path, *args: Any, **kwargs: Any):
super().__init__(path, *args, **kwargs)
+
+ # Apply monospace font only for files ending with '.mono.md'
+ if path.suffixes[-2:] == [".mono", ".md"]:
+ monospace_font = QtGui.QFont(
+ "Courier New"
+ ) # or your preferred monospace font
+ monospace_font.setPointSize(10) # adjust size if needed
+ self.setFont(monospace_font)
+
self.floating_button = FloatingButtonWidget(parent=self)
self.floating_button.hide()
self.floating_button.save_activated.connect(self.save_activated)
@@ -2304,7 +2563,7 @@ def save_activated(self) -> None:
"""
self.setReadOnly(True)
try:
- with open(self.path, 'w') as file:
+ with open(self.path, "w") as file:
file.write(self.toPlainText())
except Exception as e:
# Set text how it was before
@@ -2312,7 +2571,7 @@ def save_activated(self) -> None:
# Show the error message
error_msg = QtWidgets.QMessageBox()
error_msg.setText(f"{e}")
- error_msg.setWindowTitle(f'Error trying to save markdown edit.')
+ error_msg.setWindowTitle(f"Error trying to save markdown edit.")
error_msg.exec_()
@Slot()
@@ -2335,7 +2594,7 @@ def __init__(self, parent: QtWidgets.QWidget):
super().__init__(parent)
self.paddingLeft = 5
self.paddingTop = 5
- self.save_text = 'Save'
+ self.save_text = "Save"
self.setText(self.save_text)
@@ -2345,7 +2604,7 @@ def update_position(self) -> None:
"""
parent = self.parent()
assert isinstance(parent, QtWidgets.QWidget)
- if hasattr(parent, 'viewport'):
+ if hasattr(parent, "viewport"):
parent_rect = parent.viewport().rect()
else:
parent_rect = parent.rect()
@@ -2374,6 +2633,7 @@ class TextInput(QtWidgets.QTextEdit):
:param path: The Path of the folder where the file should be saved.
"""
+
def __init__(self, path: Path, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.path = path
@@ -2400,31 +2660,37 @@ def create_md_file(self) -> None:
t = time.localtime()
time_str = time.strftime(TIMESTRFORMAT, t)
- dialog_text, response = QtWidgets.QInputDialog.getText(self, "Input comment name", "Name:",)
+ dialog_text, response = QtWidgets.QInputDialog.getText(
+ self,
+ "Input comment name",
+ "Name:",
+ )
if response:
- if dialog_text[-3:] != '.md':
- if dialog_text == '':
- dialog_text = time_str + '.md'
+ if dialog_text[-3:] != ".md":
+ if dialog_text == "":
+ dialog_text = time_str + ".md"
else:
- dialog_text = time_str + '_' + dialog_text + '.md'
+ dialog_text = time_str + "_" + dialog_text + ".md"
try:
comment_path = self.path.joinpath(dialog_text)
if not comment_path.is_file():
- with open(comment_path, 'w') as file:
+ with open(comment_path, "w") as file:
file.write(current_text)
- self.setText('')
+ self.setText("")
else:
error_msg = QtWidgets.QMessageBox()
- error_msg.setText(f"File: {comment_path} already exists, please select a different file name.")
- error_msg.setWindowTitle(f'Error trying to save comment.')
+ error_msg.setText(
+ f"File: {comment_path} already exists, please select a different file name."
+ )
+ error_msg.setWindowTitle(f"Error trying to save comment.")
error_msg.exec_()
except Exception as e:
# Show the error message
error_msg = QtWidgets.QMessageBox()
error_msg.setText(f"{e}")
- error_msg.setWindowTitle(f'Error trying to save comment.')
+ error_msg.setWindowTitle(f"Error trying to save comment.")
error_msg.exec_()
def resizeEvent(self, event: QtGui.QResizeEvent) -> None:
@@ -2469,6 +2735,7 @@ class ImageViewer(QtWidgets.QLabel):
:param path_file: The path of the image.
"""
+
def __init__(self, path_file: Path, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.path = path_file
@@ -2487,13 +2754,13 @@ def __init__(self, path_file: Path, *args: Any, **kwargs: Any):
# except Exception as e:
except FileNotFoundError as e:
- self.setText(f'Image could not be displayed')
+ self.setText(f"Image could not be displayed")
LOGGER.error(e)
self.context_menu = QtWidgets.QMenu(self)
# creating actions
- self.copy_action = QtWidgets.QAction('copy')
+ self.copy_action = QtWidgets.QAction("copy")
self.copy_action.triggered.connect(self.on_copy_action)
self.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
@@ -2531,9 +2798,11 @@ def eventFilter(self, a0: QtCore.QObject, a1: QtCore.QEvent) -> bool:
parent = self.parent()
assert isinstance(parent, QtWidgets.QWidget)
parent_size = parent.size()
- scaled_pixmap = QtGui.QPixmap.fromImage(self.image.copy(QtCore.QRect())).scaled(parent_size.width(),
- parent_size.height(),
- QtCore.Qt.KeepAspectRatio)
+ scaled_pixmap = QtGui.QPixmap.fromImage(
+ self.image.copy(QtCore.QRect())
+ ).scaled(
+ parent_size.width(), parent_size.height(), QtCore.Qt.KeepAspectRatio
+ )
# If a resizing event happen, only update the pixmap if the size of the pixmap changed.
if self.old_pixmap.size() != scaled_pixmap.size():
# Check if the new image is bigger than the original picture size. If it is don't show it.
@@ -2555,6 +2824,7 @@ class VerticalScrollArea(QtWidgets.QScrollArea):
"""
Custom QScrollArea. Allows for only vertical scroll instead of vertical and horizontal.
"""
+
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.first_scroll = False
@@ -2592,16 +2862,18 @@ class TagLabel(QtWidgets.QWidget):
:param tree_item: Indicates if this widget is used on the right side of the app or in the treeWidget.
"""
- def __init__(self, tags: List[str], tree_item: bool = False, *args: Any, **kwargs: Any):
+ def __init__(
+ self, tags: List[str], tree_item: bool = False, *args: Any, **kwargs: Any
+ ):
super().__init__(*args, **kwargs)
self.tags = tags
self.html_tags: List[str] = []
self.tree_item = tree_item
- self.tags_str = ''
+ self.tags_str = ""
if not tags:
- self.tags_str = 'No labels present.'
+ self.tags_str = "No labels present."
else:
self.generate_tag_string()
@@ -2612,7 +2884,7 @@ def __init__(self, tags: List[str], tree_item: bool = False, *args: Any, **kwarg
if not self.tree_item:
self.tags_label.setWordWrap(True)
- self.header_label = QtWidgets.QLabel('This is tagged by:', parent=self)
+ self.header_label = QtWidgets.QLabel("This is tagged by:", parent=self)
self.layout_.addWidget(self.header_label)
self.tags_label.setIndent(30)
@@ -2620,7 +2892,9 @@ def __init__(self, tags: List[str], tree_item: bool = False, *args: Any, **kwarg
self.setLayout(self.layout_)
- size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum)
+ size_policy = QtWidgets.QSizePolicy(
+ QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Minimum
+ )
self.setSizePolicy(size_policy)
def add_tag(self, tag: str) -> None:
@@ -2651,20 +2925,20 @@ def generate_tag_string(self) -> None:
"""
Converts the list of tags into the html formatted string.
"""
- self.tags_str = ''
+ self.tags_str = ""
self.html_tags = []
color_generator = html_color_generator()
# Add every tag followed by a coma, except the last item.
for i in range(len(self.tags) - 1):
- html_str = f'{self.tags[i]}, '
+ html_str = f"{self.tags[i]}, "
self.html_tags.append(html_str)
# Last item is followed by a dot instead of a coma.
- html_str = f'{self.tags[-1]}.'
+ html_str = f"{self.tags[-1]}."
self.html_tags.append(html_str)
- self.tags_str = ''.join(self.html_tags)
+ self.tags_str = "".join(self.html_tags)
class ItemTagLabel(QtWidgets.QLabel):
@@ -2709,7 +2983,7 @@ def generate_tag_string(self) -> None:
"""
Converts the list of tags into the html formatted string.
"""
- self.tags_str = ''
+ self.tags_str = ""
self.html_tags = []
if self.tags:
@@ -2717,14 +2991,16 @@ def generate_tag_string(self) -> None:
# Add every tag followed by a coma, except the last item.
for i in range(len(self.tags) - 1):
- html_str = f'{self.tags[i]}, '
+ html_str = (
+ f"{self.tags[i]}, "
+ )
self.html_tags.append(html_str)
# Last item is followed by a dot instead of a coma.
- html_str = f'{self.tags[-1]}.'
+ html_str = f"{self.tags[-1]}."
self.html_tags.append(html_str)
- self.tags_str = ''.join(self.html_tags)
+ self.tags_str = "".join(self.html_tags)
class TagCreator(QtWidgets.QLineEdit):
@@ -2738,7 +3014,7 @@ def __init__(self, current_folder_path: Path, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.current_folder_path = current_folder_path
- self.setPlaceholderText('Create new tags')
+ self.setPlaceholderText("Create new tags")
self.returnPressed.connect(self.create_new_tags)
@@ -2750,21 +3026,27 @@ def create_new_tags(self) -> None:
"""
text = self.text()
- raw_text = text.split(',')
- text_with_empty_spaces = [item[1:] if len(item) >= 1 and item[0] == " " else item for item in raw_text]
- new_tags = [item for item in text_with_empty_spaces if item != '' and item != ' ']
+ raw_text = text.split(",")
+ text_with_empty_spaces = [
+ item[1:] if len(item) >= 1 and item[0] == " " else item for item in raw_text
+ ]
+ new_tags = [
+ item for item in text_with_empty_spaces if item != "" and item != " "
+ ]
for tag in new_tags:
- tag_path = self.current_folder_path.joinpath(f'{tag}.tag')
+ tag_path = self.current_folder_path.joinpath(f"{tag}.tag")
if not tag_path.exists():
- f = open(tag_path, 'x')
+ f = open(tag_path, "x")
- self.setText('')
+ self.setText("")
class IconLabel(QtWidgets.QLabel):
- def __init__(self, movie: QtGui.QMovie, size: Optional[int] = None, *args: Any, **kwargs: Any):
+ def __init__(
+ self, movie: QtGui.QMovie, size: Optional[int] = None, *args: Any, **kwargs: Any
+ ):
"""
Label used to display loading animations.
@@ -2804,8 +3086,9 @@ def run(self, item: Item, only_data_files: bool = False) -> None:
if data is not None:
self.finished.emit(data)
- def gather_all_right_side_window_data(self, item: Item, only_data_files: bool = False) -> \
- Optional[dict]:
+ def gather_all_right_side_window_data(
+ self, item: Item, only_data_files: bool = False
+ ) -> Optional[dict]:
"""
Method used to create a dictionary with all the necessary information (file names, paths, etc.)
of an item of the model to create the right side window. This function will also go through all the children
@@ -2820,13 +3103,13 @@ def gather_all_right_side_window_data(self, item: Item, only_data_files: bool =
'data': [DataDict]},
'extra_files': [(Path, str, ContentType)]}
"""
- data = {'tag_labels': [],
- 'data_files': {'paths': [],
- 'names': [],
- 'data': []},
- 'extra_files': []}
+ data = {
+ "tag_labels": [],
+ "data_files": {"paths": [], "names": [], "data": []},
+ "extra_files": [],
+ }
- data_ret = self._fill_dict(data, item.files, '', only_data_files)
+ data_ret = self._fill_dict(data, item.files, "", only_data_files)
if data_ret is None:
return None
data = data_ret
@@ -2843,11 +3126,18 @@ def gather_all_right_side_window_data(self, item: Item, only_data_files: bool =
data = data_ret
# Sort the files so that they appear in reverse alphabetical order.
- data['extra_files'] = sorted(data['extra_files'], key=lambda x: str.lower(x[1]), reverse=True)
+ data["extra_files"] = sorted(
+ data["extra_files"], key=lambda x: str.lower(x[1]), reverse=True
+ )
return data
- def _fill_dict(self, data_in: Optional[dict], files_dict: Dict[Path, ContentType], prefix_text: str,
- only_data_files: bool = False) -> Optional[dict]:
+ def _fill_dict(
+ self,
+ data_in: Optional[dict],
+ files_dict: Dict[Path, ContentType],
+ prefix_text: str,
+ only_data_files: bool = False,
+ ) -> Optional[dict]:
"""
Helper method for gather_all_right_side_window_data. Fills in the data dictionary with the files inside of
files_dict and adds prefix text to all tittles.
@@ -2866,23 +3156,35 @@ def _fill_dict(self, data_in: Optional[dict], files_dict: Dict[Path, ContentType
# There might be an error with the ddh5 trying to be loaded.
try:
data_dict = datadict_from_hdf5(str(file), structure_only=True)
- data_in['data_files']['data'].append(data_dict)
- data_in['data_files']['paths'].append(file)
- data_in['data_files']['names'].append(prefix_text + str(file.stem))
+ data_in["data_files"]["data"].append(data_dict)
+ data_in["data_files"]["paths"].append(file)
+ data_in["data_files"]["names"].append(prefix_text + str(file.stem))
except Exception as e:
- LOGGER.error(f'Failed to load the data file: {file} \n {e}')
+ LOGGER.error(f"Failed to load the data file: {file} \n {e}")
if not only_data_files:
if file_type == ContentType.tag:
- data_in['tag_labels'].append(prefix_text + str(file.stem))
- elif file_type in [ContentType.json, ContentType.md, ContentType.py, ContentType.image]:
+ data_in["tag_labels"].append(prefix_text + str(file.stem))
+ elif file_type in [
+ ContentType.json,
+ ContentType.md,
+ ContentType.py,
+ ContentType.image,
+ ]:
# Check if the files exist.
if file.is_file():
- data_in['extra_files'].append((file, prefix_text + str(file.name), file_type))
+ data_in["extra_files"].append(
+ (file, prefix_text + str(file.name), file_type)
+ )
return data_in
- def _check_children_data(self, child_item: Item, data_in: Optional[dict], deepness: int,
- only_data_files: bool = False) -> Optional[dict]:
+ def _check_children_data(
+ self,
+ child_item: Item,
+ data_in: Optional[dict],
+ deepness: int,
+ only_data_files: bool = False,
+ ) -> Optional[dict]:
"""
Helper function for gather_all_right_side_window_data. Fills the data_in dictionary with the files of
child_item and all of its children. Returns the filled dictionary with the information of child_item and all
@@ -2896,27 +3198,32 @@ def _check_children_data(self, child_item: Item, data_in: Optional[dict], deepne
"""
child_path = child_item.path
- prefix_text = ''
+ prefix_text = ""
# Make the prefix text. Should be all the parent folders until the original parent item.
for i in range(deepness):
- prefix_text = child_path.parts[-i - 1] + '/' + prefix_text
+ prefix_text = child_path.parts[-i - 1] + "/" + prefix_text
- data_in = self._fill_dict(data_in, child_item.files, prefix_text, only_data_files)
+ data_in = self._fill_dict(
+ data_in, child_item.files, prefix_text, only_data_files
+ )
for i in range(child_item.rowCount()):
if self.thread().isInterruptionRequested():
return None
child = child_item.child(i, 0)
assert isinstance(child, Item)
- data_in = self._check_children_data(child, data_in, deepness + 1, only_data_files)
+ data_in = self._check_children_data(
+ child, data_in, deepness + 1, only_data_files
+ )
return data_in
# TODO: Instead of saving the currently selected folder, save the currently and previously selected item.
class Monitr(QtWidgets.QMainWindow):
- def __init__(self, monitorPath: str = '.',
- parent: Optional[QtWidgets.QMainWindow] = None):
+ def __init__(
+ self, monitorPath: str = ".", parent: Optional[QtWidgets.QMainWindow] = None
+ ):
super().__init__(parent=parent)
# Instantiating variables.
@@ -2924,9 +3231,11 @@ def __init__(self, monitorPath: str = '.',
self.current_selected_folder = Path()
self.previous_selected_folder = Path()
self.collapsed_state_dictionary: Dict[Path, bool] = {}
- self.setWindowTitle('Monitr')
+ self.setWindowTitle("Monitr")
- self.app_manager = AppManager() # Currently Ids only increase with every new app.
+ self.app_manager = (
+ AppManager()
+ ) # Currently Ids only increase with every new app.
self.current_app_id = 0
self.model = FileModel(self.monitor_path, 0, 2)
@@ -2943,10 +3252,13 @@ def __init__(self, monitorPath: str = '.',
menu_bar = self.menuBar()
menu = menu_bar.addMenu("Backend")
self.backend_group = QtWidgets.QActionGroup(menu)
- for backend, plotWidgetClass in [("matplotlib", MPLAutoPlot), ("pyqtgraph", PGAutoPlot)]:
+ for backend, plotWidgetClass in [
+ ("matplotlib", MPLAutoPlot),
+ ("pyqtgraph", PGAutoPlot),
+ ]:
action = QtWidgets.QAction(backend)
action.setCheckable(True)
- action.setChecked(getcfg('main', 'default-plotwidget') == plotWidgetClass)
+ action.setChecked(getcfg("main", "default-plotwidget") == plotWidgetClass)
self.backend_group.addAction(action)
menu.addAction(action)
@@ -2955,17 +3267,22 @@ def __init__(self, monitorPath: str = '.',
self.left_side_dummy_widget = QtWidgets.QWidget()
self.left_side_dummy_widget.setLayout(self.left_side_layout)
- left_side_dummy_size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Preferred,
- QtWidgets.QSizePolicy.Preferred)
+ left_side_dummy_size_policy = QtWidgets.QSizePolicy(
+ QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Preferred
+ )
left_side_dummy_size_policy.setHorizontalStretch(1)
left_side_dummy_size_policy.setVerticalStretch(0)
self.left_side_dummy_widget.setSizePolicy(left_side_dummy_size_policy)
# Load left side layout
- self.file_explorer = FileExplorer(proxy_model=self.proxy_model, parent=self.left_side_dummy_widget)
+ self.file_explorer = FileExplorer(
+ proxy_model=self.proxy_model, parent=self.left_side_dummy_widget
+ )
self.left_side_layout.addWidget(self.file_explorer)
- self.file_explorer.file_tree.selection_changed.connect(self.on_current_item_selection_changed)
+ self.file_explorer.file_tree.selection_changed.connect(
+ self.on_current_item_selection_changed
+ )
# Right side items
self.right_side_dummy_widget = QtWidgets.QWidget()
@@ -2973,6 +3290,7 @@ def __init__(self, monitorPath: str = '.',
self.right_side_dummy_widget.setLayout(self.right_side_layout)
self.data_window: Optional[Collapsible] = None
+ self.copy_path_widget = None
self.text_input: Optional[Collapsible] = None
self.file_windows: List[Collapsible] = []
self.scroll_area: Optional[VerticalScrollArea] = None
@@ -2981,7 +3299,9 @@ def __init__(self, monitorPath: str = '.',
self.invalid_data_label: Optional[QtWidgets.QLabel] = None
self.header_label: Optional[QtWidgets.QLabel] = None
self.loading_label: Optional[IconLabel] = None
- self.loading_movie = QtGui.QMovie(os.path.join(plottrPath, 'resource', 'gfx', "loading_gif.gif"))
+ self.loading_movie = QtGui.QMovie(
+ os.path.join(plottrPath, "resource", "gfx", "loading_gif.gif")
+ )
self.last_data_window_update_time = time.time()
# Sets the minimum time between updates of the right data_window.
@@ -3005,7 +3325,6 @@ def __init__(self, monitorPath: str = '.',
# self.extra_action_button.clicked.connect(self.extra_action)
# self.left_side_layout.addLayout(self.debug_layout)
-
self.main_partition_splitter.addWidget(self.left_side_dummy_widget)
# Threading stuff
@@ -3027,14 +3346,16 @@ def create_inner_dictionary(item: Item) -> Dict:
child_dictionary = create_inner_dictionary(child)
step_dictionary[child.path.name] = child_dictionary
- step_dictionary['files'] = item.files
- step_dictionary['star'] = item.star
- step_dictionary['trash'] = item.trash
+ step_dictionary["files"] = item.files
+ step_dictionary["star"] = item.star
+ step_dictionary["trash"] = item.trash
return step_dictionary
- print('==================================================================================')
+ print(
+ "=================================================================================="
+ )
n_rows = self.model.rowCount()
- print(f'The model has {n_rows} rows')
+ print(f"The model has {n_rows} rows")
printable_dict = {}
for i in range(n_rows):
@@ -3043,18 +3364,21 @@ def create_inner_dictionary(item: Item) -> Dict:
item_dictionary = create_inner_dictionary(main_item)
assert isinstance(main_item.path, Path)
printable_dict[main_item.path.name] = item_dictionary
- print(f'here comes the dictionary')
+ print(f"here comes the dictionary")
pprint.pprint(printable_dict)
-
def print_model_main_dictionary(self) -> None:
"""
Debug function. Prints the main dictionary.
"""
- print('---------------------------------------------------------------------------------')
- print(f'Here comes the model main dictionary')
+ print(
+ "---------------------------------------------------------------------------------"
+ )
+ print(f"Here comes the model main dictionary")
pprint.pprint(self.model.main_dictionary)
- print(f'the length of the items in the main dictionary is: {len(self.model.main_dictionary)}')
+ print(
+ f"the length of the items in the main dictionary is: {len(self.model.main_dictionary)}"
+ )
def extra_action(self) -> None:
"""
@@ -3064,7 +3388,9 @@ def extra_action(self) -> None:
# print(f'\n \n \n \n \n \n \n \n \n \n \n \n .')
@Slot(QtCore.QModelIndex, QtCore.QModelIndex)
- def on_current_item_selection_changed(self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex) -> None:
+ def on_current_item_selection_changed(
+ self, current: QtCore.QModelIndex, previous: QtCore.QModelIndex
+ ) -> None:
"""
Gets called everytime the selected item gets changed. Converts the model index from the proxy sorting model,
into an index from self.model and gets the current and previous item. Triggers the right side window creation.
@@ -3092,7 +3418,9 @@ def on_current_item_selection_changed(self, current: QtCore.QModelIndex, previou
if current_item != previous_item:
assert isinstance(current_item, Item)
self.current_selected_folder = current_item.path
- self.model.update_currently_selected_folder(self.current_selected_folder)
+ self.model.update_currently_selected_folder(
+ self.current_selected_folder
+ )
# The first time the user clicks on a folder, the previous item is None.
assert isinstance(previous_item, Item)
self.previous_selected_folder = previous_item.path
@@ -3130,7 +3458,10 @@ def generate_right_side_window(self) -> None:
self.loader_thread = QtCore.QThread(self)
self.loader_worker = LoaderWorker()
self.loader_worker.moveToThread(self.loader_thread)
- run_fun = partial(self.loader_worker.run, self.model.main_dictionary[self.current_selected_folder])
+ run_fun = partial(
+ self.loader_worker.run,
+ self.model.main_dictionary[self.current_selected_folder],
+ )
self.loader_thread.started.connect(run_fun)
self.loader_worker.finished.connect(self.populate_right_side_window)
self.loader_thread.start()
@@ -3162,10 +3493,11 @@ def populate_right_side_window(self, files_meta: dict) -> None:
self.loader_thread = None
self.add_folder_header()
- self.add_tag_label(files_meta['tag_labels'])
- self.add_data_window(files_meta['data_files'])
+ self.add_tag_label(files_meta["tag_labels"])
+ self.add_data_window(files_meta["data_files"])
+ self.add_copy_path_buttons()
self.add_text_input(self.current_selected_folder)
- self.add_all_files(files_meta['extra_files'])
+ self.add_all_files(files_meta["extra_files"])
# Sets the stretch factor so when the main window expands, the files get the extra real-state instead
# of the file tree
@@ -3188,7 +3520,9 @@ def clear_right_layout(self) -> None:
if self.previous_selected_folder in self.model.main_dictionary:
bar = self.scroll_area.verticalScrollBar()
if bar is not None:
- previous_item = self.model.main_dictionary[self.previous_selected_folder]
+ previous_item = self.model.main_dictionary[
+ self.previous_selected_folder
+ ]
previous_item.scroll_height = bar.value()
if self.header_label is not None:
@@ -3216,6 +3550,11 @@ def clear_right_layout(self) -> None:
self.invalid_data_label.deleteLater()
self.invalid_data_label = None
+ if self.copy_path_widget is not None:
+ self.right_side_layout.removeWidget(self.copy_path_widget)
+ self.copy_path_widget.deleteLater()
+ self.copy_path_widget = None
+
if self.text_input is not None:
self.right_side_layout.removeWidget(self.text_input)
self.text_input.deleteLater()
@@ -3223,8 +3562,11 @@ def clear_right_layout(self) -> None:
if len(self.file_windows) >= 1:
# Save the collapsed state before deleting them.
- current_collapsed_state = {window.widget.path: window.btn.isChecked() for window in self.file_windows if
- hasattr(window.widget, 'path')}
+ current_collapsed_state = {
+ window.widget.path: window.btn.isChecked()
+ for window in self.file_windows
+ if hasattr(window.widget, "path")
+ }
self.collapsed_state_dictionary.update(current_collapsed_state)
@@ -3277,17 +3619,23 @@ def add_data_window(self, data_files: Dict) -> None:
'values': array([]) ...
"""
# Checks that there is data to display, if not just create a Qlabel indicating that there is no valid data.
- if len(data_files['data']) < 1:
- self.invalid_data_label = QtWidgets.QLabel(f'No data to display.')
- self.right_side_layout.addWidget(self.invalid_data_label)
- return
+ if len(data_files["data"]) < 1:
+ self.invalid_data_label = QtWidgets.QLabel(f"No data to display.")
+ self.right_side_layout.addWidget(self.invalid_data_label)
+ return
- self.data_window = Collapsible(DataTreeWidget(data_files['paths'], data_files['names'], data_files['data']),
- 'Data Display')
+ self.data_window = Collapsible(
+ DataTreeWidget(
+ data_files["paths"], data_files["names"], data_files["data"]
+ ),
+ "Data Display",
+ )
assert isinstance(self.data_window.widget, DataTreeWidget)
self.data_window.widget.plot_requested.connect(self.on_plot_data)
- size_policy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding)
+ size_policy = QtWidgets.QSizePolicy(
+ QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding
+ )
self.data_window.setSizePolicy(size_policy)
self.right_side_layout.addWidget(self.data_window)
@@ -3305,7 +3653,14 @@ def on_plot_data(self, path: Path) -> None:
backend = "default"
else:
backend = self.backend_group.checkedAction().text()
- self.app_manager.launchApp(self.current_app_id, AUTOPLOTMODULE, AUTOPLOTFUNC, str(path), 'data', backend)
+ self.app_manager.launchApp(
+ self.current_app_id,
+ AUTOPLOTMODULE,
+ AUTOPLOTFUNC,
+ str(path),
+ "data",
+ backend,
+ )
self.current_app_id += 1
def add_text_input(self, path: Path) -> None:
@@ -3314,9 +3669,60 @@ def add_text_input(self, path: Path) -> None:
:param path: The path of the folder being selected
"""
- self.text_input = Collapsible(TextInput(path), title='Add Comment:')
+ self.text_input = Collapsible(TextInput(path), title="Add Comment:")
self.right_side_layout.addWidget(self.text_input)
+ def add_copy_path_buttons(self):
+ self.copy_path_widget = QtWidgets.QWidget()
+ layout = QtWidgets.QHBoxLayout(self.copy_path_widget)
+ layout.setContentsMargins(0, 0, 0, 0)
+
+ path_file = self.current_selected_folder / "paths.md"
+ if path_file.exists():
+ with open(path_file, "r", encoding="utf-8") as f:
+ paths = [line.strip() for line in f if line.strip()]
+ else:
+ paths = [str(self.current_selected_folder)]
+
+ local_paths = [p for p in paths if p.startswith("C")]
+ remote_paths = [p for p in paths if not p.startswith("C")]
+
+ for path in local_paths:
+ btn = QtWidgets.QPushButton("Copy local db path")
+ btn.clicked.connect(
+ lambda _, p=path: QtWidgets.QApplication.clipboard().setText(p)
+ )
+ layout.addWidget(btn)
+
+ for path in remote_paths:
+ btn = QtWidgets.QPushButton("Copy remote db path")
+ btn.clicked.connect(
+ lambda _, p=path: QtWidgets.QApplication.clipboard().setText(p)
+ )
+ layout.addWidget(btn)
+
+ layout.addStretch()
+ self.right_side_layout.addWidget(self.copy_path_widget)
+
+ @staticmethod
+ def _sort_right_window_files(x: Tuple[Path, str, ContentType]) -> Tuple[int, str]:
+ file_name, file_type = x[1], x[2]
+ # 1. Images
+ if file_type == ContentType.image:
+ return (0, file_name)
+ # 2. directory path
+ elif file_name in ["paths.md", "directry_path.md"]:
+ return (1, file_name)
+ # 3. Param dict
+ elif file_name in ["qpu_old.json", "qpu_new.json", "param_dict.json"]:
+ return (2, file_name)
+ # Last - python scripts
+ elif file_type == ContentType.py:
+ return (4, file_name)
+ # Everything else in between
+ else:
+ return (3, file_name)
+
def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None:
"""
Adds all other md, json or images files on the right side of the screen.
@@ -3324,14 +3730,21 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None
:param file_dict: List containing 3 items Tuples. The first item should always be the Path of the file.
The second item should be the name of the file. The third item should be the ContentType of it.
"""
+ # Sort files before displaying content
+ files_data.sort(key=self._sort_right_window_files)
+
for file, name, file_type in files_data:
if file_type == ContentType.json:
expand = False
if file in self.collapsed_state_dictionary:
expand = self.collapsed_state_dictionary[file]
- json_view = Collapsible(widget=JsonTreeView(path=file), title=name, expanding=expand,
- icon=get_json_icon())
+ json_view = Collapsible(
+ widget=JsonTreeView(path=file),
+ title=name,
+ expanding=expand,
+ icon=get_json_icon(),
+ )
json_view.widget.setVisible(expand)
json_view.btn.setChecked(expand)
if expand:
@@ -3356,8 +3769,12 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None
expand = True
if file in self.collapsed_state_dictionary:
expand = self.collapsed_state_dictionary[file]
- plain_text_edit = Collapsible(widget=TextEditWidget(path=file),
- title=name, expanding=expand, icon=get_md_icon())
+ plain_text_edit = Collapsible(
+ widget=TextEditWidget(path=file),
+ title=name,
+ expanding=expand,
+ icon=get_md_icon(),
+ )
plain_text_edit.widget.setVisible(expand)
plain_text_edit.btn.setChecked(expand)
@@ -3373,8 +3790,12 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None
expand = True
if file in self.collapsed_state_dictionary:
expand = self.collapsed_state_dictionary[file]
- plain_text_edit = Collapsible(widget=TextViewWidget(path=file),
- title=name, expanding=expand, icon=get_md_icon())
+ plain_text_edit = Collapsible(
+ widget=TextViewWidget(path=file),
+ title=name,
+ expanding=expand,
+ icon=get_md_icon(),
+ )
plain_text_edit.widget.setVisible(expand)
plain_text_edit.btn.setChecked(expand)
@@ -3390,8 +3811,12 @@ def add_all_files(self, files_data: List[Tuple[Path, str, ContentType]]) -> None
expand = True
if file in self.collapsed_state_dictionary:
expand = self.collapsed_state_dictionary[file]
- label = Collapsible(ImageViewer(file, parent=self.right_side_dummy_widget),
- title=name, expanding=expand, icon=get_img_icon())
+ label = Collapsible(
+ ImageViewer(file, parent=self.right_side_dummy_widget),
+ title=name,
+ expanding=expand,
+ icon=get_img_icon(),
+ )
label.widget.setVisible(expand)
label.btn.setChecked(expand)
if expand:
@@ -3426,17 +3851,25 @@ def on_update_data_widget(self, path: Path) -> None:
:param path: The path of the data file that should be updated.
"""
current_time = time.time()
- if current_time - self.last_data_window_update_time > self.data_widget_update_buffer:
- if _is_relative_to(path, self.current_selected_folder) and path.parent in self.model.main_dictionary:
+ if (
+ current_time - self.last_data_window_update_time
+ > self.data_widget_update_buffer
+ ):
+ if (
+ _is_relative_to(path, self.current_selected_folder)
+ and path.parent in self.model.main_dictionary
+ ):
# Always gather the data for the currently selected folder, since a child item might need the update
# but the currently selected item with all of its childs should be shown.
item = self.model.main_dictionary[self.current_selected_folder]
loader_worker = LoaderWorker()
data_dicts = loader_worker.gather_all_right_side_window_data(item, True)
assert data_dicts is not None
- data_window_widget = DataTreeWidget(data_dicts['data_files']['paths'],
- data_dicts['data_files']['names'],
- data_dicts['data_files']['data'])
+ data_window_widget = DataTreeWidget(
+ data_dicts["data_files"]["paths"],
+ data_dicts["data_files"]["names"],
+ data_dicts["data_files"]["data"],
+ )
if self.data_window is not None:
self.data_window.restart_widget(data_window_widget)
data_window_widget.plot_requested.connect(self.on_plot_data)
@@ -3445,7 +3878,10 @@ def on_update_data_widget(self, path: Path) -> None:
if not self.active_timer:
self.data_file_need_update = path
self.active_timer = True
- QtCore.QTimer.singleShot(round(self.data_widget_update_buffer * 1e3), self.on_data_window_timer)
+ QtCore.QTimer.singleShot(
+ round(self.data_widget_update_buffer * 1e3),
+ self.on_data_window_timer,
+ )
@Slot()
def on_data_window_timer(self) -> None:
@@ -3466,16 +3902,21 @@ def closeEvent(self, a0: QtGui.QCloseEvent) -> None:
def script() -> int:
- parser = argparse.ArgumentParser(description='Monitr main application')
+ parser = argparse.ArgumentParser(description="Monitr main application")
parser.add_argument("path", help="path to monitor for data", default=None)
- parser.add_argument("-r", "--refresh_interval", default=2, type=float,
- help="interval at which to look for changes in the "
- "monitored path (in seconds)")
+ parser.add_argument(
+ "-r",
+ "--refresh_interval",
+ default=2,
+ type=float,
+ help="interval at which to look for changes in the "
+ "monitored path (in seconds)",
+ )
args = parser.parse_args()
path = os.path.abspath(args.path)
if not (os.path.exists(path) and os.path.isdir(path)):
- print('Invalid path.')
+ print("Invalid path.")
sys.exit()
app = QtWidgets.QApplication([])