From d8b9cda02d1a8ce6975703345f7cfd809cc817a7 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 8 Mar 2025 14:01:27 +0100 Subject: [PATCH 1/7] Add workflow parsing --- src/sio3pack/graph/__init__.py | 3 - src/sio3pack/graph/graph.py | 17 -- src/sio3pack/graph/node.py | 7 - src/sio3pack/packages/package/model.py | 8 +- src/sio3pack/packages/sinolpack/model.py | 22 +-- src/sio3pack/visualizer/__init__.py | 2 +- src/sio3pack/workflow/__init__.py | 3 + src/sio3pack/workflow/execution/__init__.py | 0 .../workflow/execution/filesystems.py | 157 +++++++++++++++++ .../workflow/execution/mount_namespace.py | 104 ++++++++++++ src/sio3pack/workflow/execution/pipe.py | 29 ++++ src/sio3pack/workflow/execution/process.py | 44 +++++ .../workflow/execution/resource_group.py | 93 ++++++++++ src/sio3pack/workflow/object.py | 51 ++++++ src/sio3pack/workflow/tasks.py | 160 ++++++++++++++++++ src/sio3pack/workflow/workflow.py | 81 +++++++++ .../workflow_manager.py} | 10 +- .../graph_op.py => workflow/workflow_op.py} | 10 +- 18 files changed, 748 insertions(+), 53 deletions(-) delete mode 100644 src/sio3pack/graph/__init__.py delete mode 100644 src/sio3pack/graph/graph.py delete mode 100644 src/sio3pack/graph/node.py create mode 100644 src/sio3pack/workflow/__init__.py create mode 100644 src/sio3pack/workflow/execution/__init__.py create mode 100644 src/sio3pack/workflow/execution/filesystems.py create mode 100644 src/sio3pack/workflow/execution/mount_namespace.py create mode 100644 src/sio3pack/workflow/execution/pipe.py create mode 100644 src/sio3pack/workflow/execution/process.py create mode 100644 src/sio3pack/workflow/execution/resource_group.py create mode 100644 src/sio3pack/workflow/object.py create mode 100644 src/sio3pack/workflow/tasks.py create mode 100644 src/sio3pack/workflow/workflow.py rename src/sio3pack/{graph/graph_manager.py => workflow/workflow_manager.py} (71%) rename src/sio3pack/{graph/graph_op.py => workflow/workflow_op.py} (63%) diff --git 
a/src/sio3pack/graph/__init__.py b/src/sio3pack/graph/__init__.py deleted file mode 100644 index 79efe46..0000000 --- a/src/sio3pack/graph/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from sio3pack.graph.graph import Graph -from sio3pack.graph.graph_manager import GraphManager -from sio3pack.graph.graph_op import GraphOperation diff --git a/src/sio3pack/graph/graph.py b/src/sio3pack/graph/graph.py deleted file mode 100644 index 370cbb8..0000000 --- a/src/sio3pack/graph/graph.py +++ /dev/null @@ -1,17 +0,0 @@ -class Graph: - """ - A class to represent a job graph. - """ - - @classmethod - def from_dict(cls, data: dict): - raise NotImplementedError() - - def __init__(self, name: str): - self.name = name - - def get_prog_files(self) -> list[str]: - """ - Get all program files in the graph. - """ - raise NotImplementedError() diff --git a/src/sio3pack/graph/node.py b/src/sio3pack/graph/node.py deleted file mode 100644 index 51fc2ef..0000000 --- a/src/sio3pack/graph/node.py +++ /dev/null @@ -1,7 +0,0 @@ -class Node: - """ - A class to represent a node in a graph. 
- """ - - def __init__(self, name: str): - self.name = name diff --git a/src/sio3pack/packages/package/model.py b/src/sio3pack/packages/package/model.py index cd78727..318c4d5 100644 --- a/src/sio3pack/packages/package/model.py +++ b/src/sio3pack/packages/package/model.py @@ -3,7 +3,7 @@ from sio3pack import LocalFile from sio3pack.files import File -from sio3pack.graph import Graph, GraphOperation +from sio3pack.workflow import Workflow, WorkflowOperation from sio3pack.packages.exceptions import UnknownPackageType from sio3pack.packages.package.handler import NoDjangoHandler from sio3pack.test import Test @@ -135,15 +135,15 @@ def get_test(self, test_id: str) -> Test: pass @wrap_exceptions - def get_unpack_graph(self) -> GraphOperation | None: + def get_unpack_graph(self) -> WorkflowOperation | None: pass @wrap_exceptions - def get_run_graph(self, file: File, tests: list[Test] | None = None) -> GraphOperation | None: + def get_run_graph(self, file: File, tests: list[Test] | None = None) -> WorkflowOperation | None: pass @wrap_exceptions - def get_save_outs_graph(self, tests: list[Test] | None = None) -> GraphOperation | None: + def get_save_outs_graph(self, tests: list[Test] | None = None) -> WorkflowOperation | None: pass @wrap_exceptions diff --git a/src/sio3pack/packages/sinolpack/model.py b/src/sio3pack/packages/sinolpack/model.py index 9920377..86e0cc4 100644 --- a/src/sio3pack/packages/sinolpack/model.py +++ b/src/sio3pack/packages/sinolpack/model.py @@ -5,7 +5,7 @@ import yaml from sio3pack.files import File, LocalFile -from sio3pack.graph import Graph, GraphManager, GraphOperation +from sio3pack.workflow import Workflow, WorkflowManager, WorkflowOperation from sio3pack.packages.exceptions import ImproperlyConfigured from sio3pack.packages.package import Package from sio3pack.packages.sinolpack.enums import ModelSolutionKind @@ -101,8 +101,8 @@ def _from_file(self, file: LocalFile, django_settings=None): self.rootdir = file.path try: - graph_file = 
self.get_in_root("graph.json") - self.graph_manager = GraphManager.from_file(graph_file) + graph_file = self.get_in_root("workflow.json") + self.graph_manager = WorkflowManager.from_file(graph_file) except FileNotFoundError: self.has_custom_graph = False @@ -116,10 +116,10 @@ def _from_db(self, problem_id: int): if not self.django_enabled: raise ImproperlyConfigured("sio3pack is not installed with Django support.") - def _default_graph_manager(self) -> GraphManager: - return GraphManager( + def _default_graph_manager(self) -> WorkflowManager: + return WorkflowManager( { - "unpack": Graph.from_dict( + "unpack": Workflow.from_dict( { "name": "unpack", # ... @@ -178,7 +178,7 @@ def _process_package(self): self._process_attachments() if not self.has_custom_graph: - # Create the graph with processed files. + # Create the workflow with processed files. # TODO: Uncomment this line when Graph will work. # self.graph_manager = self._default_graph_manager() pass @@ -283,8 +283,8 @@ def sort_key(model_solution): def _process_prog_files(self): """ Process all files in the problem's program directory that are used. - Saves all models solution files. If the problem has a custom graph file, - takes the files that are used in the graph. Otherwise, ingen, inwer and + Saves all models solution files. If the problem has a custom workflow file, + takes the files that are used in the workflow. Otherwise, ingen, inwer and files in `extra_compilation_files` and `extra_execution_files` from config are saved. 
""" @@ -374,9 +374,9 @@ def _process_attachments(self): if os.path.isfile(os.path.join(attachments_dir, attachment)) ] - def get_unpack_graph(self) -> GraphOperation | None: + def get_unpack_graph(self) -> WorkflowOperation | None: try: - return GraphOperation( + return WorkflowOperation( self.graph_manager.get("unpack"), True, self._unpack_return_data, diff --git a/src/sio3pack/visualizer/__init__.py b/src/sio3pack/visualizer/__init__.py index 8d44c80..f138d1c 100644 --- a/src/sio3pack/visualizer/__init__.py +++ b/src/sio3pack/visualizer/__init__.py @@ -15,7 +15,7 @@ def main(): if len(sys.argv) != 2: - print("Usage: python -m sio3pack.visualizer ") + print("Usage: python -m sio3pack.visualizer ") sys.exit(1) file_path = sys.argv[1] if not file_path.endswith(".json"): diff --git a/src/sio3pack/workflow/__init__.py b/src/sio3pack/workflow/__init__.py new file mode 100644 index 0000000..518f717 --- /dev/null +++ b/src/sio3pack/workflow/__init__.py @@ -0,0 +1,3 @@ +from sio3pack.workflow.workflow import Workflow +from sio3pack.workflow.workflow_manager import WorkflowManager +from sio3pack.workflow.workflow_op import WorkflowOperation diff --git a/src/sio3pack/workflow/execution/__init__.py b/src/sio3pack/workflow/execution/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/sio3pack/workflow/execution/filesystems.py b/src/sio3pack/workflow/execution/filesystems.py new file mode 100644 index 0000000..a4ea916 --- /dev/null +++ b/src/sio3pack/workflow/execution/filesystems.py @@ -0,0 +1,157 @@ +from sio3pack.workflow.object import Object + + +class Filesystem: + def __init__(self, id: int): + """ + Represent a filesystem. + :param id: The id of the filesystem. + """ + self.id = id + + +class ImageFilesystem(Filesystem): + def __init__(self, id: int, image: str, path: str = None): + """ + Represent an image filesystem. + :param id: The id of the image filesystem. + :param image: The image to use. + :param path: The path to the image. 
If None, the path is "" + """ + super().__init__(id) + self.image = image + self.path = path or "" + + def __str__(self): + return f'' + + @classmethod + def from_dict(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new image filesystem from a dictionary. + :param data: The dictionary to create the image filesystem from. + :param id: The id of the image filesystem. + :param workflow: The workflow the image filesystem belongs to. + """ + return cls(id, data['image'], data['path']) + + def to_dict(self) -> dict: + """ + Convert the image filesystem to a dictionary. + """ + return { + 'type': 'image', + 'image': self.image, + 'path': self.path + } + + +class EmptyFilesystem(Filesystem): + def __init__(self, id: int): + """ + Represent an empty filesystem. Can be used as tmpfs. + :param id: The id of the empty filesystem. + """ + super().__init__(id) + + def __str__(self): + return '' + + @classmethod + def from_dict(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new empty filesystem from a dictionary. + :param data: The dictionary to create the empty filesystem from. + :param id: The id of the empty filesystem. + :param workflow: The workflow the empty filesystem belongs to. + """ + return cls(id) + + def to_dict(self) -> dict: + """ + Convert the empty filesystem to a dictionary. + """ + return { + 'type': 'empty' + } + + +class ObjectFilesystem(Filesystem): + def __init__(self, id: int, object: Object, workflow: "Workflow"): + """ + Represent an object filesystem. + :param id: The id of the object filesystem. + :param object: The object to use. + :param workflow: The workflow the object belongs to. + """ + super().__init__(id) + self.object = object + + def __str__(self): + return f'' + + @classmethod + def from_dict(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new object filesystem from a dictionary. + :param data: The dictionary to create the object filesystem from. 
+ :param id: The id of the object filesystem. + :param workflow: The workflow the object filesystem belongs to. + """ + return cls(id, workflow.objects_manager.get_or_create_object(data['handle']), workflow) + + def to_dict(self) -> dict: + """ + Convert the object filesystem to a dictionary. + """ + return { + 'type': 'object', + 'handle': self.object.handle, + } + + +class FilesystemManager: + def __init__(self, task: "Task"): + """ + Create a new filesystem manager. + :param task: The task the filesystem manager belongs to. + """ + self.filesystems: [Filesystem] = [] + self.id = 0 + self.task = task + + def from_dict(self, data: [dict], workflow: "Workflow"): + """ + Create filesystems from a list of dictionaries. + :param data: The list of dictionaries to create the filesystems from. + :param workflow: The workflow the filesystems belong to. + """ + for fs in data: + if fs['type'] == 'image': + self.filesystems.append(ImageFilesystem.from_dict(fs, self.id, workflow)) + elif fs['type'] == 'empty': + self.filesystems.append(EmptyFilesystem.from_dict(fs, self.id, workflow)) + elif fs['type'] == 'object': + self.filesystems.append(ObjectFilesystem.from_dict(fs, self.id, workflow)) + self.id += 1 + + def to_dict(self) -> [dict]: + """ + Convert the filesystems to a list of dictionaries. + """ + return [fs.to_dict() for fs in self.filesystems] + + def get_by_id(self, id: int) -> Filesystem: + """ + Get a filesystem by id. + :param id: The id of the filesystem. + """ + return self.filesystems[id] + + def add(self, filesystem: Filesystem): + """ + Add a filesystem to the manager. + :param filesystem: The filesystem to add. 
+ """ + self.filesystems.append(filesystem) + self.id += 1 diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py new file mode 100644 index 0000000..3564c4b --- /dev/null +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -0,0 +1,104 @@ +from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager + + +class Mountpoint: + def __init__(self, source: Filesystem, target: str, writable: bool = False): + self.source = source + self.target = target + self.writable = writable + + @classmethod + def from_dict(cls, data: dict, filesystem_manager: FilesystemManager): + """ + Create a new mountpoint from a dictionary. + :param data: The dictionary to create the mountpoint from. + :param filesystem_manager: The filesystem manager to use. + """ + if isinstance(data['source'], str): # TODO: delete this + return cls(data['source'], data['target'], data['writable']) + return cls(filesystem_manager.get_by_id(int(data['source'])), data['target'], data['writable']) + + def to_dict(self) -> dict: + """ + Convert the mountpoint to a dictionary. + """ + if isinstance(self.source, str): # TODO: delete this + return { + 'source': self.source, + 'target': self.target, + 'writable': self.writable + } + return { + 'source': self.source.id, + 'target': self.target, + 'writable': self.writable + } + + +class MountNamespace: + def __init__(self, id: int, mountpoints: [Mountpoint] = None, root: int = 0): + self.mountpoints = mountpoints or [] + self.root = root + self.id = id + + @classmethod + def from_dict(cls, data: dict, id: int, filesystem_manager: FilesystemManager): + """ + Create a new mount namespace from a dictionary. + :param data: The dictionary to create the mount namespace from. + :param id: The id of the mount namespace. + :param filesystem_manager: The filesystem manager to use. 
+ """ + return cls(id, [Mountpoint.from_dict(mountpoint, filesystem_manager) for mountpoint in data['mountpoints']], + data['root']) + + def to_dict(self) -> dict: + """ + Convert the mount namespace to a dictionary. + """ + return { + 'mountpoints': [mountpoint.to_dict() for mountpoint in self.mountpoints], + 'root': self.root + } + + +class MountNamespaceManager: + def __init__(self, task: "Task", filesystem_manager: FilesystemManager): + """ + Create a new mount namespace manager. + :param task: The task the mount namespace manager belongs to. + """ + self.mount_namespaces: [MountNamespace] = [] + self.id = 0 + self.task = task + self.filesystem_manager = filesystem_manager + + def from_dict(self, data: [dict]): + """ + Create a new mount namespace manager from a list of dictionaries. + :param data: The list of dictionaries to create the mount namespace manager from. + :param filesystem_manager: The filesystem manager to use. + """ + for mount_namespace in data: + self.add(MountNamespace.from_dict(mount_namespace, self.id, self.filesystem_manager)) + self.id += 1 + + def add(self, mount_namespace: MountNamespace): + """ + Add a mount namespace to the manager. + :param mount_namespace: The mount namespace to add. + """ + self.mount_namespaces.append(mount_namespace) + + def get_by_id(self, id: int) -> MountNamespace: + """ + Get a mount namespace by its id. + :param id: The id of the mount namespace. + """ + return self.mount_namespaces[id] + + def to_dict(self) -> [dict]: + """ + Convert the mount namespace manager to a dictionary. 
+ """ + return [mount_namespace.to_dict() for mount_namespace in self.mount_namespaces] diff --git a/src/sio3pack/workflow/execution/pipe.py b/src/sio3pack/workflow/execution/pipe.py new file mode 100644 index 0000000..0ba4701 --- /dev/null +++ b/src/sio3pack/workflow/execution/pipe.py @@ -0,0 +1,29 @@ +class Pipe: + def __init__(self, buffer_size: int = 1048576, file_buffer_size: int = 1073741824, limit: int = 2147483648): + """ + Create a new pipe config. + :param buffer_size: The buffer size for the pipe. + :param file_buffer_size: The buffer size for files. + :param limit: The limit for the pipe. + """ + self.buffer_size = buffer_size + self.file_buffer_size = file_buffer_size + self.limit = limit + + @classmethod + def from_dict(cls, data: dict): + """ + Create a new pipe config from a dictionary. + :param data: The dictionary to create the pipe config from. + """ + return cls(data['buffer_size'], data['file_buffer_size'], data['limit']) + + def to_dict(self) -> dict: + """ + Convert the pipe config to a dictionary. + """ + return { + 'buffer_size': self.buffer_size, + 'file_buffer_size': self.file_buffer_size, + 'limit': self.limit + } diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py new file mode 100644 index 0000000..092838e --- /dev/null +++ b/src/sio3pack/workflow/execution/process.py @@ -0,0 +1,44 @@ +from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager +from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager + + +class Process: + def __init__(self, arguments: [str] = None, environment: dict = None, image: str = "", + mount_namespace: MountNamespace = None, resource_group: ResourceGroup = None, pid_namespace: int = 0, + working_directory: str = "/"): + """ + Represent a process. + :param arguments: The arguments of the process. + :param environment: The environment of the process. + :param image: The image of the process. 
+ :param mount_namespace: The mount namespace of the process. + :param resource_group: The resource group of the process. + :param working_directory: The working directory of the process. + """ + self.arguments = arguments or [] + self.environment = environment or {} + self.image = image + self.mount_namespace = mount_namespace + self.resource_group = resource_group + self.pid_namespace = pid_namespace + self.working_directory = working_directory + + def to_dict(self): + return { + 'arguments': self.arguments, + 'environment': [f'{key}={value}' for key, value in self.environment.items()], + 'image': self.image, + 'mount_namespace': self.mount_namespace.id, + 'resource_group': self.resource_group.id, + 'pid_namespace': self.pid_namespace, + 'working_directory': self.working_directory + } + + @classmethod + def from_dict(cls, data: dict, mountnamespace_manager: MountNamespaceManager, resource_group_manager: ResourceGroupManager): + env = {} + for var in data['environment']: + key, value = var.split('=', 1) + env[key] = value + return cls(data['arguments'], env, data['image'], mountnamespace_manager.get_by_id(data['mount_namespace']), + resource_group_manager.get_by_id(data['resource_group']), data['pid_namespace'], data['working_directory']) diff --git a/src/sio3pack/workflow/execution/resource_group.py b/src/sio3pack/workflow/execution/resource_group.py new file mode 100644 index 0000000..690a943 --- /dev/null +++ b/src/sio3pack/workflow/execution/resource_group.py @@ -0,0 +1,93 @@ +class ResourceGroup: + def __init__(self, id: int, cpu_usage_limit: int = 100., instruction_limit: int = 1e9, + memory_limit: int = 2147483648, oom_terminate_all_tasks: bool = False, pid_limit: int = 2, + swap_limit: int = 0, time_limit: int = 1e9): + """ + Create a new resource group. + :param id: The id of the resource group. + :param cpu_usage_limit: The CPU usage limit. + :param instruction_limit: The instruction usage limit. + :param memory_limit: The memory limit. 
+ :param oom_terminate_all_tasks: Whether to terminate all tasks on OOM. + :param pid_limit: The PID limit. + :param swap_limit: The swap limit. + :param time_limit: The time limit. + """ + self.id = id + self.cpu_usage_limit = cpu_usage_limit + self.instruction_limit = instruction_limit + self.memory_limit = memory_limit + self.oom_terminate_all_tasks = oom_terminate_all_tasks + self.pid_limit = pid_limit + self.swap_limit = swap_limit + self.time_limit = time_limit + + @classmethod + def from_dict(cls, data: dict, id: int): + """ + Create a new resource group from a dictionary. + :param data: The dictionary to create the resource group from. + :param id: The id of the resource group. + """ + return cls(id, data['cpu_usage_limit'], data['instruction_limit'], data['memory_limit'], + data['oom_terminate_all_tasks'], data['pid_limit'], data['swap_limit'], data['time_limit']) + + def to_dict(self) -> dict: + """ + Convert the resource group to a dictionary. + """ + return { + 'cpu_usage_limit': self.cpu_usage_limit, + 'instruction_limit': self.instruction_limit, + 'memory_limit': self.memory_limit, + 'oom_terminate_all_tasks': self.oom_terminate_all_tasks, + 'pid_limit': self.pid_limit, + 'swap_limit': self.swap_limit, + 'time_limit': self.time_limit + } + + +class ResourceGroupManager: + def __init__(self, task: "Task"): + """ + Create a new resource group manager. + :param task: The task the resource group manager belongs to. + """ + self.resource_groups: [ResourceGroup] = [] + self.id = 0 + + def add(self, resource_group: ResourceGroup): + """ + Add a resource group to the resource group manager. + :param resource_group: The resource group to add. + """ + self.resource_groups.append(resource_group) + self.id += 1 + + def get_by_id(self, id: int) -> ResourceGroup: + """ + Get a resource group by its id. + :param id: The id of the resource group to get. 
+ """ + return self.resource_groups[id] + + def to_dict(self) -> [dict]: + """ + Convert the resource group manager to a dictionary. + """ + return [resource_group.to_dict() for resource_group in self.resource_groups] + + def from_dict(self, data: [dict]): + """ + Create a new resource group manager from a list of dictionaries. + :param data: The list of dictionaries to create the resource group manager from. + """ + for resource_group in data: + self.add(ResourceGroup.from_dict(resource_group, self.id)) + self.id += 1 + + def all(self): + """ + Get all resource groups. + """ + return self.resource_groups diff --git a/src/sio3pack/workflow/object.py b/src/sio3pack/workflow/object.py new file mode 100644 index 0000000..171958b --- /dev/null +++ b/src/sio3pack/workflow/object.py @@ -0,0 +1,51 @@ +class Object: + """ + A class to represent an object in a workflow. + """ + def __init__(self, handle: str): + """ + Create a new object. + :param handle: The handle of the object. + """ + self.handle = handle + + def __str__(self): + return f'' + + +class ObjectsManager: + def __init__(self): + self.objects = {} + + def create_object(self, handle: str) -> Object: + """ + Create and return a new object. + :param handle: The handle of the object. + :return: The created object. + """ + obj = Object(handle) + self.objects[handle] = obj + return obj + + def add_object(self, obj: Object): + """ + Add an object to the manager. + :param obj: The object to add. + """ + self.objects[obj.handle] = obj + + def get_object(self, handle: str) -> Object: + """ + Get an object by its handle. + :param handle: The handle of the object. + """ + return self.objects[handle] + + def get_or_create_object(self, handle: str) -> Object: + """ + Get an object by its handle, creating it if it does not exist. + :param handle: The handle of the object. 
+ """ + if handle not in self.objects: + return self.create_object(handle) + return self.get_object(handle) diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py new file mode 100644 index 0000000..02f4911 --- /dev/null +++ b/src/sio3pack/workflow/tasks.py @@ -0,0 +1,160 @@ +from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager +from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager +from sio3pack.workflow.execution.pipe import Pipe +from sio3pack.workflow.execution.process import Process +from sio3pack.workflow.execution.resource_group import ResourceGroupManager, ResourceGroup + + +class Task: + @classmethod + def from_dict(cls, data: dict, workflow: "Workflow"): + """ + Create a new task from a dictionary. + :param data: The dictionary to create the task from. + :param workflow: The workflow the task belongs to. + """ + if data['type'] == 'execution': + return ExecutionTask.from_dict(data, workflow) + elif data['type'] == 'script': + return ScriptTask.from_dict(data, workflow) + else: + raise ValueError(f"Unknown task type: {data['type']}") + + +class ExecutionTask(Task): + def __init__(self, name: str, workflow: "Workflow", exclusive: bool = False, hard_time_limit: int = None, + extra_limit: int = None, output_register: int = None, input_register: int = None, pid_namespaces: int = 1, + processes: [Process] = None, pipes: [Pipe] = None, system_pipes: int = 3): + """ + Create a new execution task. + :param name: The name of the task. + :param workflow: The workflow the task belongs to. + :param exclusive: Whether the task is exclusive. + :param hard_time_limit: The hard time limit. + :param extra_limit: If set, the hard_time_limit for the task will be the maximum time limit of all resource groups + plus this value. + :param output_register: The output register of the task. + :param input_register: The input register of the task. 
TODO delete + :param pid_namespaces: The number of PID namespaces. + :param processes: The processes of the task. + :param pipes: The pipes of the task. + :param system_pipes: The number of system pipes. + """ + self.name = name + self.workflow = workflow + self.exclusive = exclusive + if hard_time_limit is not None: + self.hard_time_limit = hard_time_limit + self.output_register = output_register + self.input_register = input_register + self.pid_namespaces = pid_namespaces + self.processes = processes or [] + self.system_pipes = system_pipes + self.pipes = [Pipe.from_dict(pipe) for pipe in pipes] if pipes else [] + self.extra_limit = extra_limit + + self.filesystem_manager = FilesystemManager(self) + self.mountnamespace_manager = MountNamespaceManager(self, self.filesystem_manager) + self.resource_group_manager = ResourceGroupManager(self) + + @classmethod + def from_dict(cls, data: dict, workflow: "Workflow"): + """ + Create a new execution task from a dictionary. + :param data: The dictionary to create the task from. + :param workflow: The workflow the task belongs to. + """ + task = cls(data['name'], workflow, data['exclusive'], data.get('hard_time_limit'), + output_register=data.get('output_register'), input_register=data.get('input_register'), + pid_namespaces=data['pid_namespaces'], pipes=data['pipes'], system_pipes=data.get('system_pipes', 3)) + task.filesystem_manager.from_dict(data['filesystems'], workflow) + task.mountnamespace_manager.from_dict(data['mount_namespaces']) + task.resource_group_manager.from_dict(data['resource_groups']) + task.processes = [Process.from_dict(process, task.mountnamespace_manager, task.resource_group_manager) + for process in data['processes']] + return task + + def to_dict(self): + """ + Convert the task to a dictionary. 
+ """ + hard_time_limit = self.hard_time_limit + if self.extra_limit is not None: + hard_time_limit = 0 + for rg in self.resource_group_manager.all(): + hard_time_limit = max(hard_time_limit, rg.time_limit) + hard_time_limit += self.extra_limit + + res = { + 'name': self.name, + 'type': 'execution', + 'exclusive': self.exclusive, + 'hard_time_limit': hard_time_limit, + 'output_register': self.output_register, + 'pid_namespaces': self.pid_namespaces, + 'filesystems': self.filesystem_manager.to_dict(), + 'mount_namespaces': self.mountnamespace_manager.to_dict(), + 'pipes': [pipe.to_dict() for pipe in self.pipes], + 'system_pipes': self.system_pipes, + 'resource_groups': self.resource_group_manager.to_dict(), + 'processes': [process.to_dict() for process in self.processes], + } + if self.input_register is not None: + res['input_register'] = self.input_register + return res + + def add_filesystem(self, filesystem: Filesystem): + self.filesystem_manager.add(filesystem) + + def add_mount_namespace(self, mount_namespace: MountNamespace): + self.mountnamespace_manager.add(mount_namespace) + + def add_resource_group(self, resource_group: ResourceGroup): + self.resource_group_manager.add(resource_group) + + + +class ScriptTask(Task): + def __init__(self, name: str, workflow: "Workflow", reactive: bool = False, input_registers: [int] = None, + output_registers: [int] = None, script: str = None): + """ + Create a new script task. + :param name: The name of the task. + :param workflow: The workflow the task belongs to. + :param reactive: Whether the task is reactive. + :param input_registers: The input registers of the task. + :param output_registers: The output registers of the task. + :param script: The script to run. 
+ """ + self.name = name + self.workflow = workflow + self.reactive = reactive + self.input_registers = input_registers or [] + self.output_registers = output_registers or [] + self.script = script + + def __str__(self): + return f"" + + @classmethod + def from_dict(cls, data: dict, workflow: "Workflow"): + """ + Create a new script task from a dictionary. + :param data: The dictionary to create the task from. + :param workflow: The workflow the task belongs to. + """ + return cls(data['name'], workflow, data['reactive'], data['input_registers'], data['output_registers'], + data['script']) + + def to_dict(self) -> dict: + """ + Convert the task to a dictionary. + """ + return { + 'name': self.name, + 'type': 'script', + 'reactive': self.reactive, + 'input_registers': self.input_registers, + 'output_registers': self.output_registers, + 'script': self.script + } diff --git a/src/sio3pack/workflow/workflow.py b/src/sio3pack/workflow/workflow.py new file mode 100644 index 0000000..4e16a3a --- /dev/null +++ b/src/sio3pack/workflow/workflow.py @@ -0,0 +1,81 @@ +from sio3pack.workflow.object import ObjectsManager +from sio3pack.workflow.tasks import Task + + +class Workflow: + """ + A class to represent a job workflow. + """ + + @classmethod + def from_dict(cls, data: dict): + """ + Create a new workflow from a dictionary. + :param data: The dictionary to create the workflow from. + """ + workflow = cls(data['name'], data['external_objects'], data['observable_objects'], data['observable_registers']) + for task in data['tasks']: + workflow.add_task(Task.from_dict(task, workflow)) + return workflow + + def __init__(self, name: str, external_objects: [str] = None, observable_objects: [str] = None, + observable_registers: int = 0, tasks: [Task] = None): + """ + Create a new workflow. Number of required registers is calculated automatically. + :param name: The name of the workflow. + :param external_objects: The external objects used in the workflow. 
+ :param observable_objects: The observable objects used in the workflow. + :param observable_registers: The number of observable registers used in the workflow. + :param tasks: The tasks in the workflow. + """ + self.name = name + self.observable_registers = observable_registers + self.tasks = tasks or [] + self.objects_manager = ObjectsManager() + + self.external_objects = [] + for obj in external_objects or []: + self.external_objects.append(self.objects_manager.get_or_create_object(obj)) + + self.observable_objects = [] + for obj in observable_objects or []: + self.observable_objects.append(self.objects_manager.get_or_create_object(obj)) + + + def __str__(self): + return f"" + + def to_dict(self) -> dict: + """ + Convert the workflow to a dictionary. + """ + res = { + 'name': self.name, + 'external_objects': [obj.handle for obj in self.external_objects], + 'observable_objects': [obj.handle for obj in self.observable_objects], + 'observable_registers': self.observable_registers, + 'tasks': [task.to_dict() for task in self.tasks] + } + num_registers = 0 + for task in res["tasks"]: + if "input_registers" in task: + num_registers = max(num_registers, max(task["input_registers"])) + if "output_registers" in task: + num_registers = max(num_registers, max(task["output_registers"])) + if "output_register" in task: + num_registers = max(num_registers, task["output_register"]) + res["registers"] = num_registers + 1 + return res + + def add_task(self, task: Task): + """ + Add a task to the workflow. + :param task: The task to add. + """ + self.tasks.append(task) + + def get_prog_files(self) -> list[str]: + """ + Get all program files in the workflow. 
+ """ + raise NotImplementedError() diff --git a/src/sio3pack/graph/graph_manager.py b/src/sio3pack/workflow/workflow_manager.py similarity index 71% rename from src/sio3pack/graph/graph_manager.py rename to src/sio3pack/workflow/workflow_manager.py index 4a8e4f8..5ba9d1c 100644 --- a/src/sio3pack/graph/graph_manager.py +++ b/src/sio3pack/workflow/workflow_manager.py @@ -1,19 +1,19 @@ import json from sio3pack.files import File -from sio3pack.graph.graph import Graph +from sio3pack.workflow.workflow import Workflow -class GraphManager: +class WorkflowManager: @classmethod def from_file(cls, file: File): graphs = {} content = json.loads(file.read()) for name, graph in content.items(): - graphs[name] = Graph.from_dict(graph) + graphs[name] = Workflow.from_dict(graph) return cls(graphs) - def __init__(self, graphs: dict[str, Graph]): + def __init__(self, graphs: dict[str, Workflow]): self.graphs = graphs def get_prog_files(self) -> list[str]: @@ -25,5 +25,5 @@ def get_prog_files(self) -> list[str]: files.update(graph.get_prog_files()) return list(files) - def get(self, name: str) -> Graph: + def get(self, name: str) -> Workflow: return self.graphs[name] diff --git a/src/sio3pack/graph/graph_op.py b/src/sio3pack/workflow/workflow_op.py similarity index 63% rename from src/sio3pack/graph/graph_op.py rename to src/sio3pack/workflow/workflow_op.py index 3ef4257..d8867e9 100644 --- a/src/sio3pack/graph/graph_op.py +++ b/src/sio3pack/workflow/workflow_op.py @@ -1,15 +1,15 @@ -from sio3pack.graph.graph import Graph +from sio3pack.workflow.workflow import Workflow -class GraphOperation: +class WorkflowOperation: """ - A class to represent a graph that should be run on workers. + A class to represent a workflow that should be run on workers. Allows for returning results. 
""" - def __init__(self, graph: Graph, return_results: bool = False, return_func: callable = None): + def __init__(self, graph: Workflow, return_results: bool = False, return_func: callable = None): """ - :param graph: The graph to run on workers. + :param graph: The workflow to run on workers. :param return_results: Whether to return the results. :param return_func: The function to use to return the results, if return_results is True. From 6a46dbccce042815a533ec30df1b43baf025c7b0 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 8 Mar 2025 14:01:35 +0100 Subject: [PATCH 2/7] Add a test --- .../example.json | 3 ++ .../run.json | 34 +++++++++++++------ setup.cfg | 1 + tests/workflow/__init__.py | 0 tests/workflow/test_workflow.py | 16 +++++++++ 5 files changed, 43 insertions(+), 11 deletions(-) rename {example_graphs => example_workflows}/example.json (95%) rename {example_graphs => example_workflows}/run.json (96%) create mode 100644 tests/workflow/__init__.py create mode 100644 tests/workflow/test_workflow.py diff --git a/example_graphs/example.json b/example_workflows/example.json similarity index 95% rename from example_graphs/example.json rename to example_workflows/example.json index 8f0b865..7c53951 100644 --- a/example_graphs/example.json +++ b/example_workflows/example.json @@ -1,4 +1,5 @@ { + "name": "Example workflow", "external_objects" : [ "example-object" ], @@ -7,6 +8,7 @@ "registers" : 3, "tasks" : [ { + "name": "Example script", "input_registers" : [ 1 ], @@ -19,6 +21,7 @@ "type" : "script" }, { + "name": "Example execution", "exclusive" : true, "filesystems" : [ { diff --git a/example_graphs/run.json b/example_workflows/run.json similarity index 96% rename from example_graphs/run.json rename to example_workflows/run.json index b17a2a1..b1e0878 100644 --- a/example_graphs/run.json +++ b/example_workflows/run.json @@ -1,4 +1,5 @@ { + "name": "Run the tests and grade the results", "external_objects": [ "abc0a.in", "abc0b.in", @@ -35,7 +36,7 @@ } 
], "hard_time_limit": 2137, - "mount_namespace": [ + "mount_namespaces": [ { "mountpoints": [ { @@ -67,6 +68,7 @@ "/exe", "" }, { "type": "script", diff --git a/setup.cfg b/setup.cfg index b2890a1..a9668f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ tests = pytest pytest-cov pytest-xdist + deepdiff django_tests = pytest-django django = diff --git a/tests/workflow/__init__.py b/tests/workflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/workflow/test_workflow.py b/tests/workflow/test_workflow.py new file mode 100644 index 0000000..385fd3c --- /dev/null +++ b/tests/workflow/test_workflow.py @@ -0,0 +1,16 @@ +import os +import json +from deepdiff import DeepDiff + +from sio3pack.workflow import Workflow + + +def test_workflow_parsing(): + workflows_dir = os.path.join(os.path.dirname(__file__), "..", "..", "example_workflows") + for file in os.listdir(workflows_dir): + if not os.path.splitext(file)[1] == ".json": + continue + print(f'Parsing {file}') + data = json.load(open(os.path.join(workflows_dir, file))) + workflow = Workflow.from_dict(data) + assert workflow.to_dict() == data, f"Failed for {file}. 
Diff: {DeepDiff(data, workflow.to_dict())}" From e31d16a54231ecea5a959b7e8deb115d4e40cb0d Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Sat, 8 Mar 2025 14:04:45 +0100 Subject: [PATCH 3/7] Run formatters --- src/sio3pack/packages/package/model.py | 7 +- src/sio3pack/packages/sinolpack/model.py | 2 +- src/sio3pack/visualizer/__init__.py | 241 ++++++++++-------- .../workflow/execution/filesystems.py | 28 +- .../workflow/execution/mount_namespace.py | 32 +-- src/sio3pack/workflow/execution/pipe.py | 8 +- src/sio3pack/workflow/execution/process.py | 46 ++-- .../workflow/execution/resource_group.py | 40 ++- src/sio3pack/workflow/object.py | 3 +- src/sio3pack/workflow/tasks.py | 104 +++++--- src/sio3pack/workflow/workflow.py | 25 +- tests/workflow/test_workflow.py | 5 +- 12 files changed, 309 insertions(+), 232 deletions(-) diff --git a/src/sio3pack/packages/package/model.py b/src/sio3pack/packages/package/model.py index 318c4d5..eca39ea 100644 --- a/src/sio3pack/packages/package/model.py +++ b/src/sio3pack/packages/package/model.py @@ -2,15 +2,14 @@ from typing import Any from sio3pack import LocalFile +from sio3pack.exceptions import SIO3PackException from sio3pack.files import File -from sio3pack.workflow import Workflow, WorkflowOperation from sio3pack.packages.exceptions import UnknownPackageType from sio3pack.packages.package.handler import NoDjangoHandler from sio3pack.test import Test from sio3pack.utils.archive import Archive from sio3pack.utils.classinit import RegisteredSubclassesBase - -from sio3pack.exceptions import SIO3PackException +from sio3pack.workflow import Workflow, WorkflowOperation def wrap_exceptions(func): @@ -20,7 +19,7 @@ def decorator(*args, **kwargs): try: return func(*args, **kwargs) except SIO3PackException: - raise # Do not wrap SIO3PackExceptions again + raise # Do not wrap SIO3PackExceptions again except Exception as e: raise SIO3PackException(f"SIO3Pack raised an exception in {func.__name__} function.", e) diff --git 
a/src/sio3pack/packages/sinolpack/model.py b/src/sio3pack/packages/sinolpack/model.py index 86e0cc4..5fde23c 100644 --- a/src/sio3pack/packages/sinolpack/model.py +++ b/src/sio3pack/packages/sinolpack/model.py @@ -5,12 +5,12 @@ import yaml from sio3pack.files import File, LocalFile -from sio3pack.workflow import Workflow, WorkflowManager, WorkflowOperation from sio3pack.packages.exceptions import ImproperlyConfigured from sio3pack.packages.package import Package from sio3pack.packages.sinolpack.enums import ModelSolutionKind from sio3pack.util import naturalsort_key from sio3pack.utils.archive import Archive, UnrecognizedArchiveFormat +from sio3pack.workflow import Workflow, WorkflowManager, WorkflowOperation class Sinolpack(Package): diff --git a/src/sio3pack/visualizer/__init__.py b/src/sio3pack/visualizer/__init__.py index f138d1c..46338fe 100644 --- a/src/sio3pack/visualizer/__init__.py +++ b/src/sio3pack/visualizer/__init__.py @@ -4,13 +4,13 @@ try: import dash import dash_cytoscape as cyto - from dash import html, Output, Input + from dash import Input, Output, html except ImportError: raise ImportError("Please install the 'dash' and 'dash-cytoscape' packages to use the visualizer.") +import json import os import sys -import json def main(): @@ -32,32 +32,28 @@ def main(): # Create nodes for observable registers. for register in range(graph["observable_registers"]): - elements.append({ - "data": { - "id": f"obs_register_{register}", - "label": f"Observable register {register}", - "info": "This is an observable register. It's an output of a workflow." - }, - "classes": "register" - }) + elements.append( + { + "data": { + "id": f"obs_register_{register}", + "label": f"Observable register {register}", + "info": "This is an observable register. 
It's an output of a workflow.", + }, + "classes": "register", + } + ) ins[register] = [f"obs_register_{register}"] rendered_registers.add(register) - script_i = 0 execution_i = 0 # First pass to create nodes and mark input registers. for task in graph["tasks"]: if task["type"] == "script": id = f"script_{script_i}" - elements.append({ - "data": { - "id": id, - "label": task.get("name", f"Script {script_i}"), - "info": task - }, - "classes": "script" - }) + elements.append( + {"data": {"id": id, "label": task.get("name", f"Script {script_i}"), "info": task}, "classes": "script"} + ) if task["reactive"]: elements[-1]["classes"] += " reactive" script_i += 1 @@ -67,14 +63,12 @@ def main(): ins[register].append(id) elif task["type"] == "execution": id = f"execution_{execution_i}" - elements.append({ - "data": { - "id": id, - "label": task.get("name", f"Execution {execution_i}"), - "info": task - }, - "classes": "execution" - }) + elements.append( + { + "data": {"id": id, "label": task.get("name", f"Execution {execution_i}"), "info": task}, + "classes": "execution", + } + ) if task["exclusive"]: elements[-1]["classes"] += " exclusive" @@ -99,31 +93,37 @@ def main(): for register in registers: if register not in ins: - elements.append({ - "data": { - "id": f"register_{register}", - "label": f"Register {register}", - "info": f"This is a register. It's an intermediate value in a workflow." - }, - "classes": "register" - }) + elements.append( + { + "data": { + "id": f"register_{register}", + "label": f"Register {register}", + "info": f"This is a register. 
It's an intermediate value in a workflow.", + }, + "classes": "register", + } + ) ins[register] = [f"register_{register}"] rendered_registers.add(register) for id in ins[register]: if task["type"] == "script": - elements.append({ - "data": { - "source": f"script_{script_i}", - "target": id, + elements.append( + { + "data": { + "source": f"script_{script_i}", + "target": id, + } } - }) + ) elif task["type"] == "execution": - elements.append({ - "data": { - "source": f"execution_{execution_i}", - "target": id, + elements.append( + { + "data": { + "source": f"execution_{execution_i}", + "target": id, + } } - }) + ) if register not in rendered_registers: elements[-1]["data"]["label"] = f"via register {register}" @@ -133,70 +133,99 @@ def main(): execution_i += 1 app = dash.Dash(__name__) - app.layout = html.Div([ - html.Div([ - cyto.Cytoscape( - id='cytoscape', - layout={"name": "breadthfirst", "directed": True}, - style={'width': '100%', 'height': '100vh'}, - elements=elements, - stylesheet=[ - {"selector": "node", "style": { - "label": "data(label)", - "text-valign": "center", - "text-margin-y": "-20px", - }}, - {"selector": "edge", "style": { - "curve-style": "bezier", # Makes edges curved for better readability - "target-arrow-shape": "triangle", # Adds an arrowhead to indicate direction - "arrow-scale": 1.5, # Makes the arrow larger - "line-color": "#0074D9", # Edge color - "target-arrow-color": "#0074D9", # Arrow color - "width": 2, # Line thickness - "content": "data(label)", # Show edge label on hover - "font-size": "12px", - "color": "#ff4136", - "text-background-opacity": 1, - "text-background-color": "white", - "text-background-shape": "roundrectangle", - "text-border-opacity": 1, - "text-border-width": 1, - "text-border-color": "#ff4136", - }}, - {"selector": ".register", "style": { - "shape": "rectangle", - }}, - {"selector": ".script", "style": { - "shape": "roundrectangle", - }}, - {"selector": ".execution", "style": { - "shape": "ellipse", - }}, - 
{"selector": ".reactive", "style": { - "background-color": "#ff851b", - }}, - {"selector": ".exclusive", "style": { - "background-color": "#ff4136", - }}, + app.layout = html.Div( + [ + html.Div( + [ + cyto.Cytoscape( + id="cytoscape", + layout={"name": "breadthfirst", "directed": True}, + style={"width": "100%", "height": "100vh"}, + elements=elements, + stylesheet=[ + { + "selector": "node", + "style": { + "label": "data(label)", + "text-valign": "center", + "text-margin-y": "-20px", + }, + }, + { + "selector": "edge", + "style": { + "curve-style": "bezier", # Makes edges curved for better readability + "target-arrow-shape": "triangle", # Adds an arrowhead to indicate direction + "arrow-scale": 1.5, # Makes the arrow larger + "line-color": "#0074D9", # Edge color + "target-arrow-color": "#0074D9", # Arrow color + "width": 2, # Line thickness + "content": "data(label)", # Show edge label on hover + "font-size": "12px", + "color": "#ff4136", + "text-background-opacity": 1, + "text-background-color": "white", + "text-background-shape": "roundrectangle", + "text-border-opacity": 1, + "text-border-width": 1, + "text-border-color": "#ff4136", + }, + }, + { + "selector": ".register", + "style": { + "shape": "rectangle", + }, + }, + { + "selector": ".script", + "style": { + "shape": "roundrectangle", + }, + }, + { + "selector": ".execution", + "style": { + "shape": "ellipse", + }, + }, + { + "selector": ".reactive", + "style": { + "background-color": "#ff851b", + }, + }, + { + "selector": ".exclusive", + "style": { + "background-color": "#ff4136", + }, + }, + ], + ), ], + style={"flex": "3", "height": "100vh"}, ), - ], style={"flex": "3", "height": "100vh"}), - - html.Div([ - html.Pre(id='node-data', style={ - "padding": "10px", - "white-space": "pre", - "overflow": "auto", - "max-height": "95vh", - "max-width": "100%" - }) - ], style={"flex": "1", "height": "100vh", "background-color": "#f7f7f7"}) - ], style={"display": "flex", "flex-direction": "row", "height": 
"100vh"}) - - @app.callback( - Output('node-data', 'children'), - Input('cytoscape', 'tapNodeData') + html.Div( + [ + html.Pre( + id="node-data", + style={ + "padding": "10px", + "white-space": "pre", + "overflow": "auto", + "max-height": "95vh", + "max-width": "100%", + }, + ) + ], + style={"flex": "1", "height": "100vh", "background-color": "#f7f7f7"}, + ), + ], + style={"display": "flex", "flex-direction": "row", "height": "100vh"}, ) + + @app.callback(Output("node-data", "children"), Input("cytoscape", "tapNodeData")) def display_task_info(data): if data is None: return "Click on a node to see its info." diff --git a/src/sio3pack/workflow/execution/filesystems.py b/src/sio3pack/workflow/execution/filesystems.py index a4ea916..5d5c18c 100644 --- a/src/sio3pack/workflow/execution/filesystems.py +++ b/src/sio3pack/workflow/execution/filesystems.py @@ -33,17 +33,13 @@ def from_dict(cls, data: dict, id: int, workflow: "Workflow"): :param id: The id of the image filesystem. :param workflow: The workflow the image filesystem belongs to. """ - return cls(id, data['image'], data['path']) + return cls(id, data["image"], data["path"]) def to_dict(self) -> dict: """ Convert the image filesystem to a dictionary. """ - return { - 'type': 'image', - 'image': self.image, - 'path': self.path - } + return {"type": "image", "image": self.image, "path": self.path} class EmptyFilesystem(Filesystem): @@ -55,7 +51,7 @@ def __init__(self, id: int): super().__init__(id) def __str__(self): - return '' + return "" @classmethod def from_dict(cls, data: dict, id: int, workflow: "Workflow"): @@ -71,9 +67,7 @@ def to_dict(self) -> dict: """ Convert the empty filesystem to a dictionary. 
""" - return { - 'type': 'empty' - } + return {"type": "empty"} class ObjectFilesystem(Filesystem): @@ -88,7 +82,7 @@ def __init__(self, id: int, object: Object, workflow: "Workflow"): self.object = object def __str__(self): - return f'' + return f"" @classmethod def from_dict(cls, data: dict, id: int, workflow: "Workflow"): @@ -98,15 +92,15 @@ def from_dict(cls, data: dict, id: int, workflow: "Workflow"): :param id: The id of the object filesystem. :param workflow: The workflow the object filesystem belongs to. """ - return cls(id, workflow.objects_manager.get_or_create_object(data['handle']), workflow) + return cls(id, workflow.objects_manager.get_or_create_object(data["handle"]), workflow) def to_dict(self) -> dict: """ Convert the object filesystem to a dictionary. """ return { - 'type': 'object', - 'handle': self.object.handle, + "type": "object", + "handle": self.object.handle, } @@ -127,11 +121,11 @@ def from_dict(self, data: [dict], workflow: "Workflow"): :param workflow: The workflow the filesystems belong to. """ for fs in data: - if fs['type'] == 'image': + if fs["type"] == "image": self.filesystems.append(ImageFilesystem.from_dict(fs, self.id, workflow)) - elif fs['type'] == 'empty': + elif fs["type"] == "empty": self.filesystems.append(EmptyFilesystem.from_dict(fs, self.id, workflow)) - elif fs['type'] == 'object': + elif fs["type"] == "object": self.filesystems.append(ObjectFilesystem.from_dict(fs, self.id, workflow)) self.id += 1 diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py index 3564c4b..fe02318 100644 --- a/src/sio3pack/workflow/execution/mount_namespace.py +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -14,25 +14,17 @@ def from_dict(cls, data: dict, filesystem_manager: FilesystemManager): :param data: The dictionary to create the mountpoint from. :param filesystem_manager: The filesystem manager to use. 
""" - if isinstance(data['source'], str): # TODO: delete this - return cls(data['source'], data['target'], data['writable']) - return cls(filesystem_manager.get_by_id(int(data['source'])), data['target'], data['writable']) + if isinstance(data["source"], str): # TODO: delete this + return cls(data["source"], data["target"], data["writable"]) + return cls(filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"]) def to_dict(self) -> dict: """ Convert the mountpoint to a dictionary. """ - if isinstance(self.source, str): # TODO: delete this - return { - 'source': self.source, - 'target': self.target, - 'writable': self.writable - } - return { - 'source': self.source.id, - 'target': self.target, - 'writable': self.writable - } + if isinstance(self.source, str): # TODO: delete this + return {"source": self.source, "target": self.target, "writable": self.writable} + return {"source": self.source.id, "target": self.target, "writable": self.writable} class MountNamespace: @@ -49,17 +41,17 @@ def from_dict(cls, data: dict, id: int, filesystem_manager: FilesystemManager): :param id: The id of the mount namespace. :param filesystem_manager: The filesystem manager to use. """ - return cls(id, [Mountpoint.from_dict(mountpoint, filesystem_manager) for mountpoint in data['mountpoints']], - data['root']) + return cls( + id, + [Mountpoint.from_dict(mountpoint, filesystem_manager) for mountpoint in data["mountpoints"]], + data["root"], + ) def to_dict(self) -> dict: """ Convert the mount namespace to a dictionary. 
""" - return { - 'mountpoints': [mountpoint.to_dict() for mountpoint in self.mountpoints], - 'root': self.root - } + return {"mountpoints": [mountpoint.to_dict() for mountpoint in self.mountpoints], "root": self.root} class MountNamespaceManager: diff --git a/src/sio3pack/workflow/execution/pipe.py b/src/sio3pack/workflow/execution/pipe.py index 0ba4701..e74df5e 100644 --- a/src/sio3pack/workflow/execution/pipe.py +++ b/src/sio3pack/workflow/execution/pipe.py @@ -16,14 +16,10 @@ def from_dict(cls, data: dict): Create a new pipe config from a dictionary. :param data: The dictionary to create the pipe config from. """ - return cls(data['buffer_size'], data['file_buffer_size'], data['limit']) + return cls(data["buffer_size"], data["file_buffer_size"], data["limit"]) def to_dict(self) -> dict: """ Convert the pipe config to a dictionary. """ - return { - 'buffer_size': self.buffer_size, - 'file_buffer_size': self.file_buffer_size, - 'limit': self.limit - } + return {"buffer_size": self.buffer_size, "file_buffer_size": self.file_buffer_size, "limit": self.limit} diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py index 092838e..b871b1f 100644 --- a/src/sio3pack/workflow/execution/process.py +++ b/src/sio3pack/workflow/execution/process.py @@ -3,9 +3,16 @@ class Process: - def __init__(self, arguments: [str] = None, environment: dict = None, image: str = "", - mount_namespace: MountNamespace = None, resource_group: ResourceGroup = None, pid_namespace: int = 0, - working_directory: str = "/"): + def __init__( + self, + arguments: [str] = None, + environment: dict = None, + image: str = "", + mount_namespace: MountNamespace = None, + resource_group: ResourceGroup = None, + pid_namespace: int = 0, + working_directory: str = "/", + ): """ Represent a process. :param arguments: The arguments of the process. 
@@ -25,20 +32,29 @@ def __init__(self, arguments: [str] = None, environment: dict = None, image: str def to_dict(self): return { - 'arguments': self.arguments, - 'environment': [f'{key}={value}' for key, value in self.environment.items()], - 'image': self.image, - 'mount_namespace': self.mount_namespace.id, - 'resource_group': self.resource_group.id, - 'pid_namespace': self.pid_namespace, - 'working_directory': self.working_directory + "arguments": self.arguments, + "environment": [f"{key}={value}" for key, value in self.environment.items()], + "image": self.image, + "mount_namespace": self.mount_namespace.id, + "resource_group": self.resource_group.id, + "pid_namespace": self.pid_namespace, + "working_directory": self.working_directory, } @classmethod - def from_dict(cls, data: dict, mountnamespace_manager: MountNamespaceManager, resource_group_manager: ResourceGroupManager): + def from_dict( + cls, data: dict, mountnamespace_manager: MountNamespaceManager, resource_group_manager: ResourceGroupManager + ): env = {} - for var in data['environment']: - key, value = var.split('=', 1) + for var in data["environment"]: + key, value = var.split("=", 1) env[key] = value - return cls(data['arguments'], env, data['image'], mountnamespace_manager.get_by_id(data['mount_namespace']), - resource_group_manager.get_by_id(data['resource_group']), data['pid_namespace'], data['working_directory']) + return cls( + data["arguments"], + env, + data["image"], + mountnamespace_manager.get_by_id(data["mount_namespace"]), + resource_group_manager.get_by_id(data["resource_group"]), + data["pid_namespace"], + data["working_directory"], + ) diff --git a/src/sio3pack/workflow/execution/resource_group.py b/src/sio3pack/workflow/execution/resource_group.py index 690a943..12f2b41 100644 --- a/src/sio3pack/workflow/execution/resource_group.py +++ b/src/sio3pack/workflow/execution/resource_group.py @@ -1,7 +1,15 @@ class ResourceGroup: - def __init__(self, id: int, cpu_usage_limit: int = 100., 
instruction_limit: int = 1e9, - memory_limit: int = 2147483648, oom_terminate_all_tasks: bool = False, pid_limit: int = 2, - swap_limit: int = 0, time_limit: int = 1e9): + def __init__( + self, + id: int, + cpu_usage_limit: int = 100.0, + instruction_limit: int = 1e9, + memory_limit: int = 2147483648, + oom_terminate_all_tasks: bool = False, + pid_limit: int = 2, + swap_limit: int = 0, + time_limit: int = 1e9, + ): """ Create a new resource group. :param id: The id of the resource group. @@ -29,21 +37,29 @@ def from_dict(cls, data: dict, id: int): :param data: The dictionary to create the resource group from. :param id: The id of the resource group. """ - return cls(id, data['cpu_usage_limit'], data['instruction_limit'], data['memory_limit'], - data['oom_terminate_all_tasks'], data['pid_limit'], data['swap_limit'], data['time_limit']) + return cls( + id, + data["cpu_usage_limit"], + data["instruction_limit"], + data["memory_limit"], + data["oom_terminate_all_tasks"], + data["pid_limit"], + data["swap_limit"], + data["time_limit"], + ) def to_dict(self) -> dict: """ Convert the resource group to a dictionary. """ return { - 'cpu_usage_limit': self.cpu_usage_limit, - 'instruction_limit': self.instruction_limit, - 'memory_limit': self.memory_limit, - 'oom_terminate_all_tasks': self.oom_terminate_all_tasks, - 'pid_limit': self.pid_limit, - 'swap_limit': self.swap_limit, - 'time_limit': self.time_limit + "cpu_usage_limit": self.cpu_usage_limit, + "instruction_limit": self.instruction_limit, + "memory_limit": self.memory_limit, + "oom_terminate_all_tasks": self.oom_terminate_all_tasks, + "pid_limit": self.pid_limit, + "swap_limit": self.swap_limit, + "time_limit": self.time_limit, } diff --git a/src/sio3pack/workflow/object.py b/src/sio3pack/workflow/object.py index 171958b..2372126 100644 --- a/src/sio3pack/workflow/object.py +++ b/src/sio3pack/workflow/object.py @@ -2,6 +2,7 @@ class Object: """ A class to represent an object in a workflow. 
""" + def __init__(self, handle: str): """ Create a new object. @@ -10,7 +11,7 @@ def __init__(self, handle: str): self.handle = handle def __str__(self): - return f'' + return f"" class ObjectsManager: diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py index 02f4911..7121d3a 100644 --- a/src/sio3pack/workflow/tasks.py +++ b/src/sio3pack/workflow/tasks.py @@ -2,7 +2,7 @@ from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager from sio3pack.workflow.execution.pipe import Pipe from sio3pack.workflow.execution.process import Process -from sio3pack.workflow.execution.resource_group import ResourceGroupManager, ResourceGroup +from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager class Task: @@ -13,18 +13,29 @@ def from_dict(cls, data: dict, workflow: "Workflow"): :param data: The dictionary to create the task from. :param workflow: The workflow the task belongs to. """ - if data['type'] == 'execution': + if data["type"] == "execution": return ExecutionTask.from_dict(data, workflow) - elif data['type'] == 'script': + elif data["type"] == "script": return ScriptTask.from_dict(data, workflow) else: raise ValueError(f"Unknown task type: {data['type']}") class ExecutionTask(Task): - def __init__(self, name: str, workflow: "Workflow", exclusive: bool = False, hard_time_limit: int = None, - extra_limit: int = None, output_register: int = None, input_register: int = None, pid_namespaces: int = 1, - processes: [Process] = None, pipes: [Pipe] = None, system_pipes: int = 3): + def __init__( + self, + name: str, + workflow: "Workflow", + exclusive: bool = False, + hard_time_limit: int = None, + extra_limit: int = None, + output_register: int = None, + input_register: int = None, + pid_namespaces: int = 1, + processes: [Process] = None, + pipes: [Pipe] = None, + system_pipes: int = 3, + ): """ Create a new execution task. :param name: The name of the task. 
@@ -64,14 +75,24 @@ def from_dict(cls, data: dict, workflow: "Workflow"): :param data: The dictionary to create the task from. :param workflow: The workflow the task belongs to. """ - task = cls(data['name'], workflow, data['exclusive'], data.get('hard_time_limit'), - output_register=data.get('output_register'), input_register=data.get('input_register'), - pid_namespaces=data['pid_namespaces'], pipes=data['pipes'], system_pipes=data.get('system_pipes', 3)) - task.filesystem_manager.from_dict(data['filesystems'], workflow) - task.mountnamespace_manager.from_dict(data['mount_namespaces']) - task.resource_group_manager.from_dict(data['resource_groups']) - task.processes = [Process.from_dict(process, task.mountnamespace_manager, task.resource_group_manager) - for process in data['processes']] + task = cls( + data["name"], + workflow, + data["exclusive"], + data.get("hard_time_limit"), + output_register=data.get("output_register"), + input_register=data.get("input_register"), + pid_namespaces=data["pid_namespaces"], + pipes=data["pipes"], + system_pipes=data.get("system_pipes", 3), + ) + task.filesystem_manager.from_dict(data["filesystems"], workflow) + task.mountnamespace_manager.from_dict(data["mount_namespaces"]) + task.resource_group_manager.from_dict(data["resource_groups"]) + task.processes = [ + Process.from_dict(process, task.mountnamespace_manager, task.resource_group_manager) + for process in data["processes"] + ] return task def to_dict(self): @@ -86,21 +107,21 @@ def to_dict(self): hard_time_limit += self.extra_limit res = { - 'name': self.name, - 'type': 'execution', - 'exclusive': self.exclusive, - 'hard_time_limit': hard_time_limit, - 'output_register': self.output_register, - 'pid_namespaces': self.pid_namespaces, - 'filesystems': self.filesystem_manager.to_dict(), - 'mount_namespaces': self.mountnamespace_manager.to_dict(), - 'pipes': [pipe.to_dict() for pipe in self.pipes], - 'system_pipes': self.system_pipes, - 'resource_groups': 
self.resource_group_manager.to_dict(), - 'processes': [process.to_dict() for process in self.processes], + "name": self.name, + "type": "execution", + "exclusive": self.exclusive, + "hard_time_limit": hard_time_limit, + "output_register": self.output_register, + "pid_namespaces": self.pid_namespaces, + "filesystems": self.filesystem_manager.to_dict(), + "mount_namespaces": self.mountnamespace_manager.to_dict(), + "pipes": [pipe.to_dict() for pipe in self.pipes], + "system_pipes": self.system_pipes, + "resource_groups": self.resource_group_manager.to_dict(), + "processes": [process.to_dict() for process in self.processes], } if self.input_register is not None: - res['input_register'] = self.input_register + res["input_register"] = self.input_register return res def add_filesystem(self, filesystem: Filesystem): @@ -113,10 +134,16 @@ def add_resource_group(self, resource_group: ResourceGroup): self.resource_group_manager.add(resource_group) - class ScriptTask(Task): - def __init__(self, name: str, workflow: "Workflow", reactive: bool = False, input_registers: [int] = None, - output_registers: [int] = None, script: str = None): + def __init__( + self, + name: str, + workflow: "Workflow", + reactive: bool = False, + input_registers: [int] = None, + output_registers: [int] = None, + script: str = None, + ): """ Create a new script task. :param name: The name of the task. @@ -143,18 +170,19 @@ def from_dict(cls, data: dict, workflow: "Workflow"): :param data: The dictionary to create the task from. :param workflow: The workflow the task belongs to. """ - return cls(data['name'], workflow, data['reactive'], data['input_registers'], data['output_registers'], - data['script']) + return cls( + data["name"], workflow, data["reactive"], data["input_registers"], data["output_registers"], data["script"] + ) def to_dict(self) -> dict: """ Convert the task to a dictionary. 
""" return { - 'name': self.name, - 'type': 'script', - 'reactive': self.reactive, - 'input_registers': self.input_registers, - 'output_registers': self.output_registers, - 'script': self.script + "name": self.name, + "type": "script", + "reactive": self.reactive, + "input_registers": self.input_registers, + "output_registers": self.output_registers, + "script": self.script, } diff --git a/src/sio3pack/workflow/workflow.py b/src/sio3pack/workflow/workflow.py index 4e16a3a..9f84778 100644 --- a/src/sio3pack/workflow/workflow.py +++ b/src/sio3pack/workflow/workflow.py @@ -13,13 +13,19 @@ def from_dict(cls, data: dict): Create a new workflow from a dictionary. :param data: The dictionary to create the workflow from. """ - workflow = cls(data['name'], data['external_objects'], data['observable_objects'], data['observable_registers']) - for task in data['tasks']: + workflow = cls(data["name"], data["external_objects"], data["observable_objects"], data["observable_registers"]) + for task in data["tasks"]: workflow.add_task(Task.from_dict(task, workflow)) return workflow - def __init__(self, name: str, external_objects: [str] = None, observable_objects: [str] = None, - observable_registers: int = 0, tasks: [Task] = None): + def __init__( + self, + name: str, + external_objects: [str] = None, + observable_objects: [str] = None, + observable_registers: int = 0, + tasks: [Task] = None, + ): """ Create a new workflow. Number of required registers is calculated automatically. :param name: The name of the workflow. @@ -41,7 +47,6 @@ def __init__(self, name: str, external_objects: [str] = None, observable_objects for obj in observable_objects or []: self.observable_objects.append(self.objects_manager.get_or_create_object(obj)) - def __str__(self): return f"" @@ -50,11 +55,11 @@ def to_dict(self) -> dict: Convert the workflow to a dictionary. 
""" res = { - 'name': self.name, - 'external_objects': [obj.handle for obj in self.external_objects], - 'observable_objects': [obj.handle for obj in self.observable_objects], - 'observable_registers': self.observable_registers, - 'tasks': [task.to_dict() for task in self.tasks] + "name": self.name, + "external_objects": [obj.handle for obj in self.external_objects], + "observable_objects": [obj.handle for obj in self.observable_objects], + "observable_registers": self.observable_registers, + "tasks": [task.to_dict() for task in self.tasks], } num_registers = 0 for task in res["tasks"]: diff --git a/tests/workflow/test_workflow.py b/tests/workflow/test_workflow.py index 385fd3c..3ea0bc6 100644 --- a/tests/workflow/test_workflow.py +++ b/tests/workflow/test_workflow.py @@ -1,5 +1,6 @@ -import os import json +import os + from deepdiff import DeepDiff from sio3pack.workflow import Workflow @@ -10,7 +11,7 @@ def test_workflow_parsing(): for file in os.listdir(workflows_dir): if not os.path.splitext(file)[1] == ".json": continue - print(f'Parsing {file}') + print(f"Parsing {file}") data = json.load(open(os.path.join(workflows_dir, file))) workflow = Workflow.from_dict(data) assert workflow.to_dict() == data, f"Failed for {file}. 
Diff: {DeepDiff(data, workflow.to_dict())}" From c0a4434ca1eb3698120dc3705f47bece2a217a39 Mon Sep 17 00:00:00 2001 From: Tomasz Kwiatkowski Date: Mon, 17 Mar 2025 10:45:26 +0100 Subject: [PATCH 4/7] Rename to/from_json --- src/sio3pack/packages/sinolpack/model.py | 2 +- .../workflow/execution/filesystems.py | 24 ++++++------- .../workflow/execution/mount_namespace.py | 20 +++++------ src/sio3pack/workflow/execution/pipe.py | 4 +-- src/sio3pack/workflow/execution/process.py | 4 +-- .../workflow/execution/resource_group.py | 12 +++---- src/sio3pack/workflow/tasks.py | 34 +++++++++---------- src/sio3pack/workflow/workflow.py | 8 ++--- src/sio3pack/workflow/workflow_manager.py | 2 +- tests/workflow/test_workflow.py | 4 +-- 10 files changed, 57 insertions(+), 57 deletions(-) diff --git a/src/sio3pack/packages/sinolpack/model.py b/src/sio3pack/packages/sinolpack/model.py index 5fde23c..3382d84 100644 --- a/src/sio3pack/packages/sinolpack/model.py +++ b/src/sio3pack/packages/sinolpack/model.py @@ -119,7 +119,7 @@ def _from_db(self, problem_id: int): def _default_graph_manager(self) -> WorkflowManager: return WorkflowManager( { - "unpack": Workflow.from_dict( + "unpack": Workflow.from_json( { "name": "unpack", # ... diff --git a/src/sio3pack/workflow/execution/filesystems.py b/src/sio3pack/workflow/execution/filesystems.py index 5d5c18c..2ed893b 100644 --- a/src/sio3pack/workflow/execution/filesystems.py +++ b/src/sio3pack/workflow/execution/filesystems.py @@ -26,7 +26,7 @@ def __str__(self): return f'' @classmethod - def from_dict(cls, data: dict, id: int, workflow: "Workflow"): + def from_json(cls, data: dict, id: int, workflow: "Workflow"): """ Create a new image filesystem from a dictionary. :param data: The dictionary to create the image filesystem from. 
@@ -35,7 +35,7 @@ def from_dict(cls, data: dict, id: int, workflow: "Workflow"): """ return cls(id, data["image"], data["path"]) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the image filesystem to a dictionary. """ @@ -54,7 +54,7 @@ def __str__(self): return "" @classmethod - def from_dict(cls, data: dict, id: int, workflow: "Workflow"): + def from_json(cls, data: dict, id: int, workflow: "Workflow"): """ Create a new empty filesystem from a dictionary. :param data: The dictionary to create the empty filesystem from. @@ -63,7 +63,7 @@ def from_dict(cls, data: dict, id: int, workflow: "Workflow"): """ return cls(id) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the empty filesystem to a dictionary. """ @@ -85,7 +85,7 @@ def __str__(self): return f"" @classmethod - def from_dict(cls, data: dict, id: int, workflow: "Workflow"): + def from_json(cls, data: dict, id: int, workflow: "Workflow"): """ Create a new object filesystem from a dictionary. :param data: The dictionary to create the object filesystem from. @@ -94,7 +94,7 @@ def from_dict(cls, data: dict, id: int, workflow: "Workflow"): """ return cls(id, workflow.objects_manager.get_or_create_object(data["handle"]), workflow) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the object filesystem to a dictionary. """ @@ -114,7 +114,7 @@ def __init__(self, task: "Task"): self.id = 0 self.task = task - def from_dict(self, data: [dict], workflow: "Workflow"): + def from_json(self, data: [dict], workflow: "Workflow"): """ Create filesystems from a list of dictionaries. :param data: The list of dictionaries to create the filesystems from. 
@@ -122,18 +122,18 @@ def from_dict(self, data: [dict], workflow: "Workflow"): """ for fs in data: if fs["type"] == "image": - self.filesystems.append(ImageFilesystem.from_dict(fs, self.id, workflow)) + self.filesystems.append(ImageFilesystem.from_json(fs, self.id, workflow)) elif fs["type"] == "empty": - self.filesystems.append(EmptyFilesystem.from_dict(fs, self.id, workflow)) + self.filesystems.append(EmptyFilesystem.from_json(fs, self.id, workflow)) elif fs["type"] == "object": - self.filesystems.append(ObjectFilesystem.from_dict(fs, self.id, workflow)) + self.filesystems.append(ObjectFilesystem.from_json(fs, self.id, workflow)) self.id += 1 - def to_dict(self) -> [dict]: + def to_json(self) -> [dict]: """ Convert the filesystems to a list of dictionaries. """ - return [fs.to_dict() for fs in self.filesystems] + return [fs.to_json() for fs in self.filesystems] def get_by_id(self, id: int) -> Filesystem: """ diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py index fe02318..3fc8dc6 100644 --- a/src/sio3pack/workflow/execution/mount_namespace.py +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -8,7 +8,7 @@ def __init__(self, source: Filesystem, target: str, writable: bool = False): self.writable = writable @classmethod - def from_dict(cls, data: dict, filesystem_manager: FilesystemManager): + def from_json(cls, data: dict, filesystem_manager: FilesystemManager): """ Create a new mountpoint from a dictionary. :param data: The dictionary to create the mountpoint from. @@ -18,7 +18,7 @@ def from_dict(cls, data: dict, filesystem_manager: FilesystemManager): return cls(data["source"], data["target"], data["writable"]) return cls(filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"]) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the mountpoint to a dictionary. 
""" @@ -34,7 +34,7 @@ def __init__(self, id: int, mountpoints: [Mountpoint] = None, root: int = 0): self.id = id @classmethod - def from_dict(cls, data: dict, id: int, filesystem_manager: FilesystemManager): + def from_json(cls, data: dict, id: int, filesystem_manager: FilesystemManager): """ Create a new mount namespace from a dictionary. :param data: The dictionary to create the mount namespace from. @@ -43,15 +43,15 @@ def from_dict(cls, data: dict, id: int, filesystem_manager: FilesystemManager): """ return cls( id, - [Mountpoint.from_dict(mountpoint, filesystem_manager) for mountpoint in data["mountpoints"]], + [Mountpoint.from_json(mountpoint, filesystem_manager) for mountpoint in data["mountpoints"]], data["root"], ) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the mount namespace to a dictionary. """ - return {"mountpoints": [mountpoint.to_dict() for mountpoint in self.mountpoints], "root": self.root} + return {"mountpoints": [mountpoint.to_json() for mountpoint in self.mountpoints], "root": self.root} class MountNamespaceManager: @@ -65,14 +65,14 @@ def __init__(self, task: "Task", filesystem_manager: FilesystemManager): self.task = task self.filesystem_manager = filesystem_manager - def from_dict(self, data: [dict]): + def from_json(self, data: [dict]): """ Create a new mount namespace manager from a list of dictionaries. :param data: The list of dictionaries to create the mount namespace manager from. :param filesystem_manager: The filesystem manager to use. 
""" for mount_namespace in data: - self.add(MountNamespace.from_dict(mount_namespace, self.id, self.filesystem_manager)) + self.add(MountNamespace.from_json(mount_namespace, self.id, self.filesystem_manager)) self.id += 1 def add(self, mount_namespace: MountNamespace): @@ -89,8 +89,8 @@ def get_by_id(self, id: int) -> MountNamespace: """ return self.mount_namespaces[id] - def to_dict(self) -> [dict]: + def to_json(self) -> [dict]: """ Convert the mount namespace manager to a dictionary. """ - return [mount_namespace.to_dict() for mount_namespace in self.mount_namespaces] + return [mount_namespace.to_json() for mount_namespace in self.mount_namespaces] diff --git a/src/sio3pack/workflow/execution/pipe.py b/src/sio3pack/workflow/execution/pipe.py index e74df5e..0b2e78a 100644 --- a/src/sio3pack/workflow/execution/pipe.py +++ b/src/sio3pack/workflow/execution/pipe.py @@ -11,14 +11,14 @@ def __init__(self, buffer_size: int = 1048576, file_buffer_size: int = 107374182 self.limit = limit @classmethod - def from_dict(cls, data: dict): + def from_json(cls, data: dict): """ Create a new pipe config from a dictionary. :param data: The dictionary to create the pipe config from. """ return cls(data["buffer_size"], data["file_buffer_size"], data["limit"]) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the pipe config to a dictionary. 
""" diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py index b871b1f..209f390 100644 --- a/src/sio3pack/workflow/execution/process.py +++ b/src/sio3pack/workflow/execution/process.py @@ -30,7 +30,7 @@ def __init__( self.pid_namespace = pid_namespace self.working_directory = working_directory - def to_dict(self): + def to_json(self): return { "arguments": self.arguments, "environment": [f"{key}={value}" for key, value in self.environment.items()], @@ -42,7 +42,7 @@ def to_dict(self): } @classmethod - def from_dict( + def from_json( cls, data: dict, mountnamespace_manager: MountNamespaceManager, resource_group_manager: ResourceGroupManager ): env = {} diff --git a/src/sio3pack/workflow/execution/resource_group.py b/src/sio3pack/workflow/execution/resource_group.py index 12f2b41..f32c658 100644 --- a/src/sio3pack/workflow/execution/resource_group.py +++ b/src/sio3pack/workflow/execution/resource_group.py @@ -31,7 +31,7 @@ def __init__( self.time_limit = time_limit @classmethod - def from_dict(cls, data: dict, id: int): + def from_json(cls, data: dict, id: int): """ Create a new resource group from a dictionary. :param data: The dictionary to create the resource group from. @@ -48,7 +48,7 @@ def from_dict(cls, data: dict, id: int): data["time_limit"], ) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the resource group to a dictionary. """ @@ -87,19 +87,19 @@ def get_by_id(self, id: int) -> ResourceGroup: """ return self.resource_groups[id] - def to_dict(self) -> [dict]: + def to_json(self) -> [dict]: """ Convert the resource group manager to a dictionary. """ - return [resource_group.to_dict() for resource_group in self.resource_groups] + return [resource_group.to_json() for resource_group in self.resource_groups] - def from_dict(self, data: [dict]): + def from_json(self, data: [dict]): """ Create a new resource group manager from a list of dictionaries. 
:param data: The list of dictionaries to create the resource group manager from. """ for resource_group in data: - self.add(ResourceGroup.from_dict(resource_group, self.id)) + self.add(ResourceGroup.from_json(resource_group, self.id)) self.id += 1 def all(self): diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py index 7121d3a..8046cf5 100644 --- a/src/sio3pack/workflow/tasks.py +++ b/src/sio3pack/workflow/tasks.py @@ -7,16 +7,16 @@ class Task: @classmethod - def from_dict(cls, data: dict, workflow: "Workflow"): + def from_json(cls, data: dict, workflow: "Workflow"): """ Create a new task from a dictionary. :param data: The dictionary to create the task from. :param workflow: The workflow the task belongs to. """ if data["type"] == "execution": - return ExecutionTask.from_dict(data, workflow) + return ExecutionTask.from_json(data, workflow) elif data["type"] == "script": - return ScriptTask.from_dict(data, workflow) + return ScriptTask.from_json(data, workflow) else: raise ValueError(f"Unknown task type: {data['type']}") @@ -61,7 +61,7 @@ def __init__( self.pid_namespaces = pid_namespaces self.processes = processes or [] self.system_pipes = system_pipes - self.pipes = [Pipe.from_dict(pipe) for pipe in pipes] if pipes else [] + self.pipes = [Pipe.from_json(pipe) for pipe in pipes] if pipes else [] self.extra_limit = extra_limit self.filesystem_manager = FilesystemManager(self) @@ -69,7 +69,7 @@ def __init__( self.resource_group_manager = ResourceGroupManager(self) @classmethod - def from_dict(cls, data: dict, workflow: "Workflow"): + def from_json(cls, data: dict, workflow: "Workflow"): """ Create a new execution task from a dictionary. :param data: The dictionary to create the task from. 
@@ -86,16 +86,16 @@ def from_dict(cls, data: dict, workflow: "Workflow"): pipes=data["pipes"], system_pipes=data.get("system_pipes", 3), ) - task.filesystem_manager.from_dict(data["filesystems"], workflow) - task.mountnamespace_manager.from_dict(data["mount_namespaces"]) - task.resource_group_manager.from_dict(data["resource_groups"]) + task.filesystem_manager.from_json(data["filesystems"], workflow) + task.mountnamespace_manager.from_json(data["mount_namespaces"]) + task.resource_group_manager.from_json(data["resource_groups"]) task.processes = [ - Process.from_dict(process, task.mountnamespace_manager, task.resource_group_manager) + Process.from_json(process, task.mountnamespace_manager, task.resource_group_manager) for process in data["processes"] ] return task - def to_dict(self): + def to_json(self): """ Convert the task to a dictionary. """ @@ -113,12 +113,12 @@ def to_dict(self): "hard_time_limit": hard_time_limit, "output_register": self.output_register, "pid_namespaces": self.pid_namespaces, - "filesystems": self.filesystem_manager.to_dict(), - "mount_namespaces": self.mountnamespace_manager.to_dict(), - "pipes": [pipe.to_dict() for pipe in self.pipes], + "filesystems": self.filesystem_manager.to_json(), + "mount_namespaces": self.mountnamespace_manager.to_json(), + "pipes": [pipe.to_json() for pipe in self.pipes], "system_pipes": self.system_pipes, - "resource_groups": self.resource_group_manager.to_dict(), - "processes": [process.to_dict() for process in self.processes], + "resource_groups": self.resource_group_manager.to_json(), + "processes": [process.to_json() for process in self.processes], } if self.input_register is not None: res["input_register"] = self.input_register @@ -164,7 +164,7 @@ def __str__(self): return f"" @classmethod - def from_dict(cls, data: dict, workflow: "Workflow"): + def from_json(cls, data: dict, workflow: "Workflow"): """ Create a new script task from a dictionary. :param data: The dictionary to create the task from. 
@@ -174,7 +174,7 @@ def from_dict(cls, data: dict, workflow: "Workflow"): data["name"], workflow, data["reactive"], data["input_registers"], data["output_registers"], data["script"] ) - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the task to a dictionary. """ diff --git a/src/sio3pack/workflow/workflow.py b/src/sio3pack/workflow/workflow.py index 9f84778..01bbd55 100644 --- a/src/sio3pack/workflow/workflow.py +++ b/src/sio3pack/workflow/workflow.py @@ -8,14 +8,14 @@ class Workflow: """ @classmethod - def from_dict(cls, data: dict): + def from_json(cls, data: dict): """ Create a new workflow from a dictionary. :param data: The dictionary to create the workflow from. """ workflow = cls(data["name"], data["external_objects"], data["observable_objects"], data["observable_registers"]) for task in data["tasks"]: - workflow.add_task(Task.from_dict(task, workflow)) + workflow.add_task(Task.from_json(task, workflow)) return workflow def __init__( @@ -50,7 +50,7 @@ def __init__( def __str__(self): return f"" - def to_dict(self) -> dict: + def to_json(self) -> dict: """ Convert the workflow to a dictionary. 
""" @@ -59,7 +59,7 @@ def to_dict(self) -> dict: "external_objects": [obj.handle for obj in self.external_objects], "observable_objects": [obj.handle for obj in self.observable_objects], "observable_registers": self.observable_registers, - "tasks": [task.to_dict() for task in self.tasks], + "tasks": [task.to_json() for task in self.tasks], } num_registers = 0 for task in res["tasks"]: diff --git a/src/sio3pack/workflow/workflow_manager.py b/src/sio3pack/workflow/workflow_manager.py index 5ba9d1c..1757e1c 100644 --- a/src/sio3pack/workflow/workflow_manager.py +++ b/src/sio3pack/workflow/workflow_manager.py @@ -10,7 +10,7 @@ def from_file(cls, file: File): graphs = {} content = json.loads(file.read()) for name, graph in content.items(): - graphs[name] = Workflow.from_dict(graph) + graphs[name] = Workflow.from_json(graph) return cls(graphs) def __init__(self, graphs: dict[str, Workflow]): diff --git a/tests/workflow/test_workflow.py b/tests/workflow/test_workflow.py index 3ea0bc6..8f8cf7c 100644 --- a/tests/workflow/test_workflow.py +++ b/tests/workflow/test_workflow.py @@ -13,5 +13,5 @@ def test_workflow_parsing(): continue print(f"Parsing {file}") data = json.load(open(os.path.join(workflows_dir, file))) - workflow = Workflow.from_dict(data) - assert workflow.to_dict() == data, f"Failed for {file}. Diff: {DeepDiff(data, workflow.to_dict())}" + workflow = Workflow.from_json(data) + assert workflow.to_json() == data, f"Failed for {file}. 
Diff: {DeepDiff(data, workflow.to_json())}" From 25491998b8e0802138cdcaa26f37ea677d2554a8 Mon Sep 17 00:00:00 2001 From: Tomasz Kwiatkowski Date: Mon, 17 Mar 2025 10:46:09 +0100 Subject: [PATCH 5/7] Unused imports --- src/sio3pack/visualizer/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/sio3pack/visualizer/__init__.py b/src/sio3pack/visualizer/__init__.py index 46338fe..9c488e6 100644 --- a/src/sio3pack/visualizer/__init__.py +++ b/src/sio3pack/visualizer/__init__.py @@ -1,6 +1,3 @@ -from django.core.files.locks import kernel32 -from django.shortcuts import render - try: import dash import dash_cytoscape as cyto From 19a45f57f020f07c552a05b0340254a4fe21bcaa Mon Sep 17 00:00:00 2001 From: Tomasz Kwiatkowski Date: Mon, 17 Mar 2025 10:46:43 +0100 Subject: [PATCH 6/7] Fix example run workflow --- example_workflows/run.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example_workflows/run.json b/example_workflows/run.json index b1e0878..10d4da4 100644 --- a/example_workflows/run.json +++ b/example_workflows/run.json @@ -15,7 +15,7 @@ "sol.e" ], "observable_objects": [], - "observable_registers": 0, + "observable_registers": 1, "registers": 19, "tasks": [ { From 06f218b8579f976d103598fefda8f8a40a8e1441 Mon Sep 17 00:00:00 2001 From: Tomasz Kwiatkowski Date: Mon, 17 Mar 2025 11:17:22 +0100 Subject: [PATCH 7/7] Improve typing --- .../workflow/execution/filesystems.py | 22 ++++++++++++++++--- .../workflow/execution/mount_namespace.py | 8 +++---- src/sio3pack/workflow/execution/process.py | 4 ++-- .../workflow/execution/resource_group.py | 8 +++---- src/sio3pack/workflow/tasks.py | 8 +++---- src/sio3pack/workflow/workflow.py | 6 ++--- 6 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/sio3pack/workflow/execution/filesystems.py b/src/sio3pack/workflow/execution/filesystems.py index 2ed893b..c91b954 100644 --- a/src/sio3pack/workflow/execution/filesystems.py +++ b/src/sio3pack/workflow/execution/filesystems.py @@ 
-9,6 +9,22 @@ def __init__(self, id: int): """ self.id = id + @classmethod + def from_json(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new filesystem from a dictionary. + :param data: The dictionary to create the filesystem from. + :param id: The id of the filesystem. + :param workflow: The workflow the filesystem belongs to. + """ + raise NotImplementedError() + + def to_json(self) -> dict: + """ + Convert the filesystem to a dictionary. + """ + raise NotImplementedError() + class ImageFilesystem(Filesystem): def __init__(self, id: int, image: str, path: str = None): @@ -110,11 +126,11 @@ def __init__(self, task: "Task"): Create a new filesystem manager. :param task: The task the filesystem manager belongs to. """ - self.filesystems: [Filesystem] = [] + self.filesystems: list[Filesystem] = [] self.id = 0 self.task = task - def from_json(self, data: [dict], workflow: "Workflow"): + def from_json(self, data: list[dict], workflow: "Workflow"): """ Create filesystems from a list of dictionaries. :param data: The list of dictionaries to create the filesystems from. @@ -129,7 +145,7 @@ def from_json(self, data: [dict], workflow: "Workflow"): self.filesystems.append(ObjectFilesystem.from_json(fs, self.id, workflow)) self.id += 1 - def to_json(self) -> [dict]: + def to_json(self) -> list[dict]: """ Convert the filesystems to a list of dictionaries.
""" diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py index 3fc8dc6..e71e3be 100644 --- a/src/sio3pack/workflow/execution/mount_namespace.py +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -28,7 +28,7 @@ def to_json(self) -> dict: class MountNamespace: - def __init__(self, id: int, mountpoints: [Mountpoint] = None, root: int = 0): + def __init__(self, id: int, mountpoints: list[Mountpoint] = None, root: int = 0): self.mountpoints = mountpoints or [] self.root = root self.id = id @@ -60,12 +60,12 @@ def __init__(self, task: "Task", filesystem_manager: FilesystemManager): Create a new mount namespace manager. :param task: The task the mount namespace manager belongs to. """ - self.mount_namespaces: [MountNamespace] = [] + self.mount_namespaces: list[MountNamespace] = [] self.id = 0 self.task = task self.filesystem_manager = filesystem_manager - def from_json(self, data: [dict]): + def from_json(self, data: list[dict]): """ Create a new mount namespace manager from a list of dictionaries. :param data: The list of dictionaries to create the mount namespace manager from. @@ -89,7 +89,7 @@ def get_by_id(self, id: int) -> MountNamespace: """ return self.mount_namespaces[id] - def to_json(self) -> [dict]: + def to_json(self) -> list[dict]: """ Convert the mount namespace manager to a dictionary. 
""" diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py index 209f390..506ed26 100644 --- a/src/sio3pack/workflow/execution/process.py +++ b/src/sio3pack/workflow/execution/process.py @@ -5,7 +5,7 @@ class Process: def __init__( self, - arguments: [str] = None, + arguments: list[str] = None, environment: dict = None, image: str = "", mount_namespace: MountNamespace = None, @@ -30,7 +30,7 @@ def __init__( self.pid_namespace = pid_namespace self.working_directory = working_directory - def to_json(self): + def to_json(self) -> dict: return { "arguments": self.arguments, "environment": [f"{key}={value}" for key, value in self.environment.items()], diff --git a/src/sio3pack/workflow/execution/resource_group.py b/src/sio3pack/workflow/execution/resource_group.py index f32c658..fbeb72d 100644 --- a/src/sio3pack/workflow/execution/resource_group.py +++ b/src/sio3pack/workflow/execution/resource_group.py @@ -69,7 +69,7 @@ def __init__(self, task: "Task"): Create a new resource group manager. :param task: The task the resource group manager belongs to. """ - self.resource_groups: [ResourceGroup] = [] + self.resource_groups: list[ResourceGroup] = [] self.id = 0 def add(self, resource_group: ResourceGroup): @@ -87,13 +87,13 @@ def get_by_id(self, id: int) -> ResourceGroup: """ return self.resource_groups[id] - def to_json(self) -> [dict]: + def to_json(self) -> list[dict]: """ Convert the resource group manager to a dictionary. """ return [resource_group.to_json() for resource_group in self.resource_groups] - def from_json(self, data: [dict]): + def from_json(self, data: list[dict]): """ Create a new resource group manager from a list of dictionaries. :param data: The list of dictionaries to create the resource group manager from. 
@@ -102,7 +102,7 @@ def from_json(self, data: [dict]): self.add(ResourceGroup.from_json(resource_group, self.id)) self.id += 1 - def all(self): + def all(self) -> list[ResourceGroup]: """ Get all resource groups. """ diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py index 8046cf5..fed25cd 100644 --- a/src/sio3pack/workflow/tasks.py +++ b/src/sio3pack/workflow/tasks.py @@ -32,8 +32,8 @@ def __init__( output_register: int = None, input_register: int = None, pid_namespaces: int = 1, - processes: [Process] = None, - pipes: [Pipe] = None, + processes: list[Process] = None, + pipes: list[Pipe] = None, system_pipes: int = 3, ): """ @@ -140,8 +140,8 @@ def __init__( name: str, workflow: "Workflow", reactive: bool = False, - input_registers: [int] = None, - output_registers: [int] = None, + input_registers: list[int] = None, + output_registers: list[int] = None, script: str = None, ): """ diff --git a/src/sio3pack/workflow/workflow.py b/src/sio3pack/workflow/workflow.py index 01bbd55..5c10bb6 100644 --- a/src/sio3pack/workflow/workflow.py +++ b/src/sio3pack/workflow/workflow.py @@ -21,10 +21,10 @@ def from_json(cls, data: dict): def __init__( self, name: str, - external_objects: [str] = None, - observable_objects: [str] = None, + external_objects: list[str] = None, + observable_objects: list[str] = None, observable_registers: int = 0, - tasks: [Task] = None, + tasks: list[Task] = None, ): """ Create a new workflow. Number of required registers is calculated automatically.