diff --git a/example_graphs/example.json b/example_workflows/example.json similarity index 95% rename from example_graphs/example.json rename to example_workflows/example.json index 8f0b865..7c53951 100644 --- a/example_graphs/example.json +++ b/example_workflows/example.json @@ -1,4 +1,5 @@ { + "name": "Example workflow", "external_objects" : [ "example-object" ], @@ -7,6 +8,7 @@ "registers" : 3, "tasks" : [ { + "name": "Example script", "input_registers" : [ 1 ], @@ -19,6 +21,7 @@ "type" : "script" }, { + "name": "Example execution", "exclusive" : true, "filesystems" : [ { diff --git a/example_graphs/run.json b/example_workflows/run.json similarity index 96% rename from example_graphs/run.json rename to example_workflows/run.json index b17a2a1..10d4da4 100644 --- a/example_graphs/run.json +++ b/example_workflows/run.json @@ -1,4 +1,5 @@ { + "name": "Run the tests and grade the results", "external_objects": [ "abc0a.in", "abc0b.in", @@ -14,7 +15,7 @@ "sol.e" ], "observable_objects": [], - "observable_registers": 0, + "observable_registers": 1, "registers": 19, "tasks": [ { @@ -35,7 +36,7 @@ } ], "hard_time_limit": 2137, - "mount_namespace": [ + "mount_namespaces": [ { "mountpoints": [ { @@ -67,6 +68,7 @@ "/exe", "" }, { "type": "script", diff --git a/setup.cfg b/setup.cfg index b2890a1..a9668f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ tests = pytest pytest-cov pytest-xdist + deepdiff django_tests = pytest-django django = diff --git a/src/sio3pack/graph/__init__.py b/src/sio3pack/graph/__init__.py deleted file mode 100644 index 79efe46..0000000 --- a/src/sio3pack/graph/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from sio3pack.graph.graph import Graph -from sio3pack.graph.graph_manager import GraphManager -from sio3pack.graph.graph_op import GraphOperation diff --git a/src/sio3pack/graph/graph.py b/src/sio3pack/graph/graph.py deleted file mode 100644 index 370cbb8..0000000 --- a/src/sio3pack/graph/graph.py +++ /dev/null @@ -1,17 +0,0 @@ -class Graph: 
- """ - A class to represent a job graph. - """ - - @classmethod - def from_dict(cls, data: dict): - raise NotImplementedError() - - def __init__(self, name: str): - self.name = name - - def get_prog_files(self) -> list[str]: - """ - Get all program files in the graph. - """ - raise NotImplementedError() diff --git a/src/sio3pack/graph/node.py b/src/sio3pack/graph/node.py deleted file mode 100644 index 51fc2ef..0000000 --- a/src/sio3pack/graph/node.py +++ /dev/null @@ -1,7 +0,0 @@ -class Node: - """ - A class to represent a node in a graph. - """ - - def __init__(self, name: str): - self.name = name diff --git a/src/sio3pack/packages/package/model.py b/src/sio3pack/packages/package/model.py index cd78727..eca39ea 100644 --- a/src/sio3pack/packages/package/model.py +++ b/src/sio3pack/packages/package/model.py @@ -2,15 +2,14 @@ from typing import Any from sio3pack import LocalFile +from sio3pack.exceptions import SIO3PackException from sio3pack.files import File -from sio3pack.graph import Graph, GraphOperation from sio3pack.packages.exceptions import UnknownPackageType from sio3pack.packages.package.handler import NoDjangoHandler from sio3pack.test import Test from sio3pack.utils.archive import Archive from sio3pack.utils.classinit import RegisteredSubclassesBase - -from sio3pack.exceptions import SIO3PackException +from sio3pack.workflow import Workflow, WorkflowOperation def wrap_exceptions(func): @@ -20,7 +19,7 @@ def decorator(*args, **kwargs): try: return func(*args, **kwargs) except SIO3PackException: - raise # Do not wrap SIO3PackExceptions again + raise # Do not wrap SIO3PackExceptions again except Exception as e: raise SIO3PackException(f"SIO3Pack raised an exception in {func.__name__} function.", e) @@ -135,15 +134,15 @@ def get_test(self, test_id: str) -> Test: pass @wrap_exceptions - def get_unpack_graph(self) -> GraphOperation | None: + def get_unpack_graph(self) -> WorkflowOperation | None: pass @wrap_exceptions - def get_run_graph(self, file: File, 
tests: list[Test] | None = None) -> GraphOperation | None: + def get_run_graph(self, file: File, tests: list[Test] | None = None) -> WorkflowOperation | None: pass @wrap_exceptions - def get_save_outs_graph(self, tests: list[Test] | None = None) -> GraphOperation | None: + def get_save_outs_graph(self, tests: list[Test] | None = None) -> WorkflowOperation | None: pass @wrap_exceptions diff --git a/src/sio3pack/packages/sinolpack/model.py b/src/sio3pack/packages/sinolpack/model.py index 9920377..3382d84 100644 --- a/src/sio3pack/packages/sinolpack/model.py +++ b/src/sio3pack/packages/sinolpack/model.py @@ -5,12 +5,12 @@ import yaml from sio3pack.files import File, LocalFile -from sio3pack.graph import Graph, GraphManager, GraphOperation from sio3pack.packages.exceptions import ImproperlyConfigured from sio3pack.packages.package import Package from sio3pack.packages.sinolpack.enums import ModelSolutionKind from sio3pack.util import naturalsort_key from sio3pack.utils.archive import Archive, UnrecognizedArchiveFormat +from sio3pack.workflow import Workflow, WorkflowManager, WorkflowOperation class Sinolpack(Package): @@ -101,8 +101,8 @@ def _from_file(self, file: LocalFile, django_settings=None): self.rootdir = file.path try: - graph_file = self.get_in_root("graph.json") - self.graph_manager = GraphManager.from_file(graph_file) + graph_file = self.get_in_root("workflow.json") + self.graph_manager = WorkflowManager.from_file(graph_file) except FileNotFoundError: self.has_custom_graph = False @@ -116,10 +116,10 @@ def _from_db(self, problem_id: int): if not self.django_enabled: raise ImproperlyConfigured("sio3pack is not installed with Django support.") - def _default_graph_manager(self) -> GraphManager: - return GraphManager( + def _default_graph_manager(self) -> WorkflowManager: + return WorkflowManager( { - "unpack": Graph.from_dict( + "unpack": Workflow.from_json( { "name": "unpack", # ... 
@@ -178,7 +178,7 @@ def _process_package(self): self._process_attachments() if not self.has_custom_graph: - # Create the graph with processed files. + # Create the workflow with processed files. # TODO: Uncomment this line when Graph will work. # self.graph_manager = self._default_graph_manager() pass @@ -283,8 +283,8 @@ def sort_key(model_solution): def _process_prog_files(self): """ Process all files in the problem's program directory that are used. - Saves all models solution files. If the problem has a custom graph file, - takes the files that are used in the graph. Otherwise, ingen, inwer and + Saves all models solution files. If the problem has a custom workflow file, + takes the files that are used in the workflow. Otherwise, ingen, inwer and files in `extra_compilation_files` and `extra_execution_files` from config are saved. """ @@ -374,9 +374,9 @@ def _process_attachments(self): if os.path.isfile(os.path.join(attachments_dir, attachment)) ] - def get_unpack_graph(self) -> GraphOperation | None: + def get_unpack_graph(self) -> WorkflowOperation | None: try: - return GraphOperation( + return WorkflowOperation( self.graph_manager.get("unpack"), True, self._unpack_return_data, diff --git a/src/sio3pack/visualizer/__init__.py b/src/sio3pack/visualizer/__init__.py index 8d44c80..9c488e6 100644 --- a/src/sio3pack/visualizer/__init__.py +++ b/src/sio3pack/visualizer/__init__.py @@ -1,21 +1,18 @@ -from django.core.files.locks import kernel32 -from django.shortcuts import render - try: import dash import dash_cytoscape as cyto - from dash import html, Output, Input + from dash import Input, Output, html except ImportError: raise ImportError("Please install the 'dash' and 'dash-cytoscape' packages to use the visualizer.") +import json import os import sys -import json def main(): if len(sys.argv) != 2: - print("Usage: python -m sio3pack.visualizer ") + print("Usage: python -m sio3pack.visualizer ") sys.exit(1) file_path = sys.argv[1] if not 
file_path.endswith(".json"): @@ -32,32 +29,28 @@ def main(): # Create nodes for observable registers. for register in range(graph["observable_registers"]): - elements.append({ - "data": { - "id": f"obs_register_{register}", - "label": f"Observable register {register}", - "info": "This is an observable register. It's an output of a workflow." - }, - "classes": "register" - }) + elements.append( + { + "data": { + "id": f"obs_register_{register}", + "label": f"Observable register {register}", + "info": "This is an observable register. It's an output of a workflow.", + }, + "classes": "register", + } + ) ins[register] = [f"obs_register_{register}"] rendered_registers.add(register) - script_i = 0 execution_i = 0 # First pass to create nodes and mark input registers. for task in graph["tasks"]: if task["type"] == "script": id = f"script_{script_i}" - elements.append({ - "data": { - "id": id, - "label": task.get("name", f"Script {script_i}"), - "info": task - }, - "classes": "script" - }) + elements.append( + {"data": {"id": id, "label": task.get("name", f"Script {script_i}"), "info": task}, "classes": "script"} + ) if task["reactive"]: elements[-1]["classes"] += " reactive" script_i += 1 @@ -67,14 +60,12 @@ def main(): ins[register].append(id) elif task["type"] == "execution": id = f"execution_{execution_i}" - elements.append({ - "data": { - "id": id, - "label": task.get("name", f"Execution {execution_i}"), - "info": task - }, - "classes": "execution" - }) + elements.append( + { + "data": {"id": id, "label": task.get("name", f"Execution {execution_i}"), "info": task}, + "classes": "execution", + } + ) if task["exclusive"]: elements[-1]["classes"] += " exclusive" @@ -99,31 +90,37 @@ def main(): for register in registers: if register not in ins: - elements.append({ - "data": { - "id": f"register_{register}", - "label": f"Register {register}", - "info": f"This is a register. It's an intermediate value in a workflow." 
- }, - "classes": "register" - }) + elements.append( + { + "data": { + "id": f"register_{register}", + "label": f"Register {register}", + "info": f"This is a register. It's an intermediate value in a workflow.", + }, + "classes": "register", + } + ) ins[register] = [f"register_{register}"] rendered_registers.add(register) for id in ins[register]: if task["type"] == "script": - elements.append({ - "data": { - "source": f"script_{script_i}", - "target": id, + elements.append( + { + "data": { + "source": f"script_{script_i}", + "target": id, + } } - }) + ) elif task["type"] == "execution": - elements.append({ - "data": { - "source": f"execution_{execution_i}", - "target": id, + elements.append( + { + "data": { + "source": f"execution_{execution_i}", + "target": id, + } } - }) + ) if register not in rendered_registers: elements[-1]["data"]["label"] = f"via register {register}" @@ -133,70 +130,99 @@ def main(): execution_i += 1 app = dash.Dash(__name__) - app.layout = html.Div([ - html.Div([ - cyto.Cytoscape( - id='cytoscape', - layout={"name": "breadthfirst", "directed": True}, - style={'width': '100%', 'height': '100vh'}, - elements=elements, - stylesheet=[ - {"selector": "node", "style": { - "label": "data(label)", - "text-valign": "center", - "text-margin-y": "-20px", - }}, - {"selector": "edge", "style": { - "curve-style": "bezier", # Makes edges curved for better readability - "target-arrow-shape": "triangle", # Adds an arrowhead to indicate direction - "arrow-scale": 1.5, # Makes the arrow larger - "line-color": "#0074D9", # Edge color - "target-arrow-color": "#0074D9", # Arrow color - "width": 2, # Line thickness - "content": "data(label)", # Show edge label on hover - "font-size": "12px", - "color": "#ff4136", - "text-background-opacity": 1, - "text-background-color": "white", - "text-background-shape": "roundrectangle", - "text-border-opacity": 1, - "text-border-width": 1, - "text-border-color": "#ff4136", - }}, - {"selector": ".register", "style": { - 
"shape": "rectangle", - }}, - {"selector": ".script", "style": { - "shape": "roundrectangle", - }}, - {"selector": ".execution", "style": { - "shape": "ellipse", - }}, - {"selector": ".reactive", "style": { - "background-color": "#ff851b", - }}, - {"selector": ".exclusive", "style": { - "background-color": "#ff4136", - }}, + app.layout = html.Div( + [ + html.Div( + [ + cyto.Cytoscape( + id="cytoscape", + layout={"name": "breadthfirst", "directed": True}, + style={"width": "100%", "height": "100vh"}, + elements=elements, + stylesheet=[ + { + "selector": "node", + "style": { + "label": "data(label)", + "text-valign": "center", + "text-margin-y": "-20px", + }, + }, + { + "selector": "edge", + "style": { + "curve-style": "bezier", # Makes edges curved for better readability + "target-arrow-shape": "triangle", # Adds an arrowhead to indicate direction + "arrow-scale": 1.5, # Makes the arrow larger + "line-color": "#0074D9", # Edge color + "target-arrow-color": "#0074D9", # Arrow color + "width": 2, # Line thickness + "content": "data(label)", # Show edge label on hover + "font-size": "12px", + "color": "#ff4136", + "text-background-opacity": 1, + "text-background-color": "white", + "text-background-shape": "roundrectangle", + "text-border-opacity": 1, + "text-border-width": 1, + "text-border-color": "#ff4136", + }, + }, + { + "selector": ".register", + "style": { + "shape": "rectangle", + }, + }, + { + "selector": ".script", + "style": { + "shape": "roundrectangle", + }, + }, + { + "selector": ".execution", + "style": { + "shape": "ellipse", + }, + }, + { + "selector": ".reactive", + "style": { + "background-color": "#ff851b", + }, + }, + { + "selector": ".exclusive", + "style": { + "background-color": "#ff4136", + }, + }, + ], + ), ], + style={"flex": "3", "height": "100vh"}, ), - ], style={"flex": "3", "height": "100vh"}), - - html.Div([ - html.Pre(id='node-data', style={ - "padding": "10px", - "white-space": "pre", - "overflow": "auto", - "max-height": "95vh", - 
"max-width": "100%" - }) - ], style={"flex": "1", "height": "100vh", "background-color": "#f7f7f7"}) - ], style={"display": "flex", "flex-direction": "row", "height": "100vh"}) - - @app.callback( - Output('node-data', 'children'), - Input('cytoscape', 'tapNodeData') + html.Div( + [ + html.Pre( + id="node-data", + style={ + "padding": "10px", + "white-space": "pre", + "overflow": "auto", + "max-height": "95vh", + "max-width": "100%", + }, + ) + ], + style={"flex": "1", "height": "100vh", "background-color": "#f7f7f7"}, + ), + ], + style={"display": "flex", "flex-direction": "row", "height": "100vh"}, ) + + @app.callback(Output("node-data", "children"), Input("cytoscape", "tapNodeData")) def display_task_info(data): if data is None: return "Click on a node to see its info." diff --git a/src/sio3pack/workflow/__init__.py b/src/sio3pack/workflow/__init__.py new file mode 100644 index 0000000..518f717 --- /dev/null +++ b/src/sio3pack/workflow/__init__.py @@ -0,0 +1,3 @@ +from sio3pack.workflow.workflow import Workflow +from sio3pack.workflow.workflow_manager import WorkflowManager +from sio3pack.workflow.workflow_op import WorkflowOperation diff --git a/src/sio3pack/workflow/execution/__init__.py b/src/sio3pack/workflow/execution/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/sio3pack/workflow/execution/filesystems.py b/src/sio3pack/workflow/execution/filesystems.py new file mode 100644 index 0000000..c91b954 --- /dev/null +++ b/src/sio3pack/workflow/execution/filesystems.py @@ -0,0 +1,167 @@ +from sio3pack.workflow.object import Object + + +class Filesystem: + def __init__(self, id: int): + """ + Represent a filesystem. + :param id: The id of the filesystem. + """ + self.id = id + + @classmethod + def from_json(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new filesystem from a dictionary. + :param data: The dictionary to create the filesystem from. + :param id: The id of the filesystem. 
+ :param workflow: The workflow the filesystem belongs to. + """ + return NotImplementedError() + + def to_json(self) -> dict: + """ + Convert the filesystem to a dictionary. + """ + return NotImplementedError() + + +class ImageFilesystem(Filesystem): + def __init__(self, id: int, image: str, path: str = None): + """ + Represent an image filesystem. + :param id: The id of the image filesystem. + :param image: The image to use. + :param path: The path to the image. If None, the path is "" + """ + super().__init__(id) + self.image = image + self.path = path or "" + + def __str__(self): + return f'' + + @classmethod + def from_json(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new image filesystem from a dictionary. + :param data: The dictionary to create the image filesystem from. + :param id: The id of the image filesystem. + :param workflow: The workflow the image filesystem belongs to. + """ + return cls(id, data["image"], data["path"]) + + def to_json(self) -> dict: + """ + Convert the image filesystem to a dictionary. + """ + return {"type": "image", "image": self.image, "path": self.path} + + +class EmptyFilesystem(Filesystem): + def __init__(self, id: int): + """ + Represent an empty filesystem. Can be used as tmpfs. + :param id: The id of the empty filesystem. + """ + super().__init__(id) + + def __str__(self): + return "" + + @classmethod + def from_json(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new empty filesystem from a dictionary. + :param data: The dictionary to create the empty filesystem from. + :param id: The id of the empty filesystem. + :param workflow: The workflow the empty filesystem belongs to. + """ + return cls(id) + + def to_json(self) -> dict: + """ + Convert the empty filesystem to a dictionary. + """ + return {"type": "empty"} + + +class ObjectFilesystem(Filesystem): + def __init__(self, id: int, object: Object, workflow: "Workflow"): + """ + Represent an object filesystem. 
+ :param id: The id of the object filesystem. + :param object: The object to use. + :param workflow: The workflow the object belongs to. + """ + super().__init__(id) + self.object = object + + def __str__(self): + return f"" + + @classmethod + def from_json(cls, data: dict, id: int, workflow: "Workflow"): + """ + Create a new object filesystem from a dictionary. + :param data: The dictionary to create the object filesystem from. + :param id: The id of the object filesystem. + :param workflow: The workflow the object filesystem belongs to. + """ + return cls(id, workflow.objects_manager.get_or_create_object(data["handle"]), workflow) + + def to_json(self) -> dict: + """ + Convert the object filesystem to a dictionary. + """ + return { + "type": "object", + "handle": self.object.handle, + } + + +class FilesystemManager: + def __init__(self, task: "Task"): + """ + Create a new filesystem manager. + :param task: The task the filesystem manager belongs to. + """ + self.filesystems: list[Filesystem] = [] + self.id = 0 + self.task = task + + def from_json(self, data: list[dict], workflow: "Workflow"): + """ + Create filesystems from a list of dictionaries. + :param data: The list of dictionaries to create the filesystems from. + :param workflow: The workflow the filesystems belong to. + """ + for fs in data: + if fs["type"] == "image": + self.filesystems.append(ImageFilesystem.from_json(fs, self.id, workflow)) + elif fs["type"] == "empty": + self.filesystems.append(EmptyFilesystem.from_json(fs, self.id, workflow)) + elif fs["type"] == "object": + self.filesystems.append(ObjectFilesystem.from_json(fs, self.id, workflow)) + self.id += 1 + + def to_json(self) -> list[dict]: + """ + Convert the filesystems to a list of dictionaries. + """ + return [fs.to_json() for fs in self.filesystems] + + def get_by_id(self, id: int) -> Filesystem: + """ + Get a filesystem by id. + :param id: The id of the filesystem. 
+ """ + return self.filesystems[id] + + def add(self, filesystem: Filesystem): + """ + Add a filesystem to the manager. + :param filesystem: The filesystem to add. + """ + self.filesystems.append(filesystem) + self.id += 1 diff --git a/src/sio3pack/workflow/execution/mount_namespace.py b/src/sio3pack/workflow/execution/mount_namespace.py new file mode 100644 index 0000000..e71e3be --- /dev/null +++ b/src/sio3pack/workflow/execution/mount_namespace.py @@ -0,0 +1,96 @@ +from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager + + +class Mountpoint: + def __init__(self, source: Filesystem, target: str, writable: bool = False): + self.source = source + self.target = target + self.writable = writable + + @classmethod + def from_json(cls, data: dict, filesystem_manager: FilesystemManager): + """ + Create a new mountpoint from a dictionary. + :param data: The dictionary to create the mountpoint from. + :param filesystem_manager: The filesystem manager to use. + """ + if isinstance(data["source"], str): # TODO: delete this + return cls(data["source"], data["target"], data["writable"]) + return cls(filesystem_manager.get_by_id(int(data["source"])), data["target"], data["writable"]) + + def to_json(self) -> dict: + """ + Convert the mountpoint to a dictionary. + """ + if isinstance(self.source, str): # TODO: delete this + return {"source": self.source, "target": self.target, "writable": self.writable} + return {"source": self.source.id, "target": self.target, "writable": self.writable} + + +class MountNamespace: + def __init__(self, id: int, mountpoints: list[Mountpoint] = None, root: int = 0): + self.mountpoints = mountpoints or [] + self.root = root + self.id = id + + @classmethod + def from_json(cls, data: dict, id: int, filesystem_manager: FilesystemManager): + """ + Create a new mount namespace from a dictionary. + :param data: The dictionary to create the mount namespace from. + :param id: The id of the mount namespace. 
+ :param filesystem_manager: The filesystem manager to use. + """ + return cls( + id, + [Mountpoint.from_json(mountpoint, filesystem_manager) for mountpoint in data["mountpoints"]], + data["root"], + ) + + def to_json(self) -> dict: + """ + Convert the mount namespace to a dictionary. + """ + return {"mountpoints": [mountpoint.to_json() for mountpoint in self.mountpoints], "root": self.root} + + +class MountNamespaceManager: + def __init__(self, task: "Task", filesystem_manager: FilesystemManager): + """ + Create a new mount namespace manager. + :param task: The task the mount namespace manager belongs to. + """ + self.mount_namespaces: list[MountNamespace] = [] + self.id = 0 + self.task = task + self.filesystem_manager = filesystem_manager + + def from_json(self, data: list[dict]): + """ + Create a new mount namespace manager from a list of dictionaries. + :param data: The list of dictionaries to create the mount namespace manager from. + :param filesystem_manager: The filesystem manager to use. + """ + for mount_namespace in data: + self.add(MountNamespace.from_json(mount_namespace, self.id, self.filesystem_manager)) + self.id += 1 + + def add(self, mount_namespace: MountNamespace): + """ + Add a mount namespace to the manager. + :param mount_namespace: The mount namespace to add. + """ + self.mount_namespaces.append(mount_namespace) + + def get_by_id(self, id: int) -> MountNamespace: + """ + Get a mount namespace by its id. + :param id: The id of the mount namespace. + """ + return self.mount_namespaces[id] + + def to_json(self) -> list[dict]: + """ + Convert the mount namespace manager to a dictionary. 
+ """ + return [mount_namespace.to_json() for mount_namespace in self.mount_namespaces] diff --git a/src/sio3pack/workflow/execution/pipe.py b/src/sio3pack/workflow/execution/pipe.py new file mode 100644 index 0000000..0b2e78a --- /dev/null +++ b/src/sio3pack/workflow/execution/pipe.py @@ -0,0 +1,25 @@ +class Pipe: + def __init__(self, buffer_size: int = 1048576, file_buffer_size: int = 1073741824, limit: int = 2147483648): + """ + Create a new pipe config. + :param buffer_size: The buffer size for the pipe. + :param file_buffer_size: The buffer size for files. + :param limit: The limit for the pipe. + """ + self.buffer_size = buffer_size + self.file_buffer_size = file_buffer_size + self.limit = limit + + @classmethod + def from_json(cls, data: dict): + """ + Create a new pipe config from a dictionary. + :param data: The dictionary to create the pipe config from. + """ + return cls(data["buffer_size"], data["file_buffer_size"], data["limit"]) + + def to_json(self) -> dict: + """ + Convert the pipe config to a dictionary. + """ + return {"buffer_size": self.buffer_size, "file_buffer_size": self.file_buffer_size, "limit": self.limit} diff --git a/src/sio3pack/workflow/execution/process.py b/src/sio3pack/workflow/execution/process.py new file mode 100644 index 0000000..506ed26 --- /dev/null +++ b/src/sio3pack/workflow/execution/process.py @@ -0,0 +1,60 @@ +from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager +from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager + + +class Process: + def __init__( + self, + arguments: list[str] = None, + environment: dict = None, + image: str = "", + mount_namespace: MountNamespace = None, + resource_group: ResourceGroup = None, + pid_namespace: int = 0, + working_directory: str = "/", + ): + """ + Represent a process. + :param arguments: The arguments of the process. + :param environment: The environment of the process. 
+ :param image: The image of the process. + :param mount_namespace: The mount namespace of the process. + :param resource_group: The resource group of the process. + :param working_directory: The working directory of the process. + """ + self.arguments = arguments or [] + self.environment = environment or {} + self.image = image + self.mount_namespace = mount_namespace + self.resource_group = resource_group + self.pid_namespace = pid_namespace + self.working_directory = working_directory + + def to_json(self) -> dict: + return { + "arguments": self.arguments, + "environment": [f"{key}={value}" for key, value in self.environment.items()], + "image": self.image, + "mount_namespace": self.mount_namespace.id, + "resource_group": self.resource_group.id, + "pid_namespace": self.pid_namespace, + "working_directory": self.working_directory, + } + + @classmethod + def from_json( + cls, data: dict, mountnamespace_manager: MountNamespaceManager, resource_group_manager: ResourceGroupManager + ): + env = {} + for var in data["environment"]: + key, value = var.split("=", 1) + env[key] = value + return cls( + data["arguments"], + env, + data["image"], + mountnamespace_manager.get_by_id(data["mount_namespace"]), + resource_group_manager.get_by_id(data["resource_group"]), + data["pid_namespace"], + data["working_directory"], + ) diff --git a/src/sio3pack/workflow/execution/resource_group.py b/src/sio3pack/workflow/execution/resource_group.py new file mode 100644 index 0000000..fbeb72d --- /dev/null +++ b/src/sio3pack/workflow/execution/resource_group.py @@ -0,0 +1,109 @@ +class ResourceGroup: + def __init__( + self, + id: int, + cpu_usage_limit: int = 100.0, + instruction_limit: int = 1e9, + memory_limit: int = 2147483648, + oom_terminate_all_tasks: bool = False, + pid_limit: int = 2, + swap_limit: int = 0, + time_limit: int = 1e9, + ): + """ + Create a new resource group. + :param id: The id of the resource group. + :param cpu_usage_limit: The CPU usage limit. 
+ :param instruction_limit: The instruction usage limit. + :param memory_limit: The memory limit. + :param oom_terminate_all_tasks: Whether to terminate all tasks on OOM. + :param pid_limit: The PID limit. + :param swap_limit: The swap limit. + :param time_limit: The time limit. + """ + self.id = id + self.cpu_usage_limit = cpu_usage_limit + self.instruction_limit = instruction_limit + self.memory_limit = memory_limit + self.oom_terminate_all_tasks = oom_terminate_all_tasks + self.pid_limit = pid_limit + self.swap_limit = swap_limit + self.time_limit = time_limit + + @classmethod + def from_json(cls, data: dict, id: int): + """ + Create a new resource group from a dictionary. + :param data: The dictionary to create the resource group from. + :param id: The id of the resource group. + """ + return cls( + id, + data["cpu_usage_limit"], + data["instruction_limit"], + data["memory_limit"], + data["oom_terminate_all_tasks"], + data["pid_limit"], + data["swap_limit"], + data["time_limit"], + ) + + def to_json(self) -> dict: + """ + Convert the resource group to a dictionary. + """ + return { + "cpu_usage_limit": self.cpu_usage_limit, + "instruction_limit": self.instruction_limit, + "memory_limit": self.memory_limit, + "oom_terminate_all_tasks": self.oom_terminate_all_tasks, + "pid_limit": self.pid_limit, + "swap_limit": self.swap_limit, + "time_limit": self.time_limit, + } + + +class ResourceGroupManager: + def __init__(self, task: "Task"): + """ + Create a new resource group manager. + :param task: The task the resource group manager belongs to. + """ + self.resource_groups: list[ResourceGroup] = [] + self.id = 0 + + def add(self, resource_group: ResourceGroup): + """ + Add a resource group to the resource group manager. + :param resource_group: The resource group to add. + """ + self.resource_groups.append(resource_group) + self.id += 1 + + def get_by_id(self, id: int) -> ResourceGroup: + """ + Get a resource group by its id. 
+ :param id: The id of the resource group to get. + """ + return self.resource_groups[id] + + def to_json(self) -> list[dict]: + """ + Convert the resource group manager to a dictionary. + """ + return [resource_group.to_json() for resource_group in self.resource_groups] + + def from_json(self, data: list[dict]): + """ + Create a new resource group manager from a list of dictionaries. + :param data: The list of dictionaries to create the resource group manager from. + """ + for resource_group in data: + self.add(ResourceGroup.from_json(resource_group, self.id)) + self.id += 1 + + def all(self) -> list[ResourceGroup]: + """ + Get all resource groups. + """ + return self.resource_groups diff --git a/src/sio3pack/workflow/object.py b/src/sio3pack/workflow/object.py new file mode 100644 index 0000000..2372126 --- /dev/null +++ b/src/sio3pack/workflow/object.py @@ -0,0 +1,52 @@ +class Object: + """ + A class to represent an object in a workflow. + """ + + def __init__(self, handle: str): + """ + Create a new object. + :param handle: The handle of the object. + """ + self.handle = handle + + def __str__(self): + return f"" + + +class ObjectsManager: + def __init__(self): + self.objects = {} + + def create_object(self, handle: str) -> Object: + """ + Create and return a new object. + :param handle: The handle of the object. + :return: The created object. + """ + obj = Object(handle) + self.objects[handle] = obj + return obj + + def add_object(self, obj: Object): + """ + Add an object to the manager. + :param obj: The object to add. + """ + self.objects[obj.handle] = obj + + def get_object(self, handle: str) -> Object: + """ + Get an object by its handle. + :param handle: The handle of the object. + """ + return self.objects[handle] + + def get_or_create_object(self, handle: str) -> Object: + """ + Get an object by its handle, creating it if it does not exist. + :param handle: The handle of the object. 
+ """ + if handle not in self.objects: + return self.create_object(handle) + return self.get_object(handle) diff --git a/src/sio3pack/workflow/tasks.py b/src/sio3pack/workflow/tasks.py new file mode 100644 index 0000000..fed25cd --- /dev/null +++ b/src/sio3pack/workflow/tasks.py @@ -0,0 +1,188 @@ +from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager +from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager +from sio3pack.workflow.execution.pipe import Pipe +from sio3pack.workflow.execution.process import Process +from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager + + +class Task: + @classmethod + def from_json(cls, data: dict, workflow: "Workflow"): + """ + Create a new task from a dictionary. + :param data: The dictionary to create the task from. + :param workflow: The workflow the task belongs to. + """ + if data["type"] == "execution": + return ExecutionTask.from_json(data, workflow) + elif data["type"] == "script": + return ScriptTask.from_json(data, workflow) + else: + raise ValueError(f"Unknown task type: {data['type']}") + + +class ExecutionTask(Task): + def __init__( + self, + name: str, + workflow: "Workflow", + exclusive: bool = False, + hard_time_limit: int = None, + extra_limit: int = None, + output_register: int = None, + input_register: int = None, + pid_namespaces: int = 1, + processes: list[Process] = None, + pipes: list[Pipe] = None, + system_pipes: int = 3, + ): + """ + Create a new execution task. + :param name: The name of the task. + :param workflow: The workflow the task belongs to. + :param exclusive: Whether the task is exclusive. + :param hard_time_limit: The hard time limit. + :param extra_limit: If set, the hard_time_limit for the task will be the maximum time limit of all resource groups + plus this value. + :param output_register: The output register of the task. + :param input_register: The input register of the task. 
TODO delete + :param pid_namespaces: The number of PID namespaces. + :param processes: The processes of the task. + :param pipes: The pipes of the task. + :param system_pipes: The number of system pipes. + """ + self.name = name + self.workflow = workflow + self.exclusive = exclusive + if hard_time_limit is not None: + self.hard_time_limit = hard_time_limit + self.output_register = output_register + self.input_register = input_register + self.pid_namespaces = pid_namespaces + self.processes = processes or [] + self.system_pipes = system_pipes + self.pipes = [Pipe.from_json(pipe) for pipe in pipes] if pipes else [] + self.extra_limit = extra_limit + + self.filesystem_manager = FilesystemManager(self) + self.mountnamespace_manager = MountNamespaceManager(self, self.filesystem_manager) + self.resource_group_manager = ResourceGroupManager(self) + + @classmethod + def from_json(cls, data: dict, workflow: "Workflow"): + """ + Create a new execution task from a dictionary. + :param data: The dictionary to create the task from. + :param workflow: The workflow the task belongs to. + """ + task = cls( + data["name"], + workflow, + data["exclusive"], + data.get("hard_time_limit"), + output_register=data.get("output_register"), + input_register=data.get("input_register"), + pid_namespaces=data["pid_namespaces"], + pipes=data["pipes"], + system_pipes=data.get("system_pipes", 3), + ) + task.filesystem_manager.from_json(data["filesystems"], workflow) + task.mountnamespace_manager.from_json(data["mount_namespaces"]) + task.resource_group_manager.from_json(data["resource_groups"]) + task.processes = [ + Process.from_json(process, task.mountnamespace_manager, task.resource_group_manager) + for process in data["processes"] + ] + return task + + def to_json(self): + """ + Convert the task to a dictionary. 
+ """ + hard_time_limit = self.hard_time_limit + if self.extra_limit is not None: + hard_time_limit = 0 + for rg in self.resource_group_manager.all(): + hard_time_limit = max(hard_time_limit, rg.time_limit) + hard_time_limit += self.extra_limit + + res = { + "name": self.name, + "type": "execution", + "exclusive": self.exclusive, + "hard_time_limit": hard_time_limit, + "output_register": self.output_register, + "pid_namespaces": self.pid_namespaces, + "filesystems": self.filesystem_manager.to_json(), + "mount_namespaces": self.mountnamespace_manager.to_json(), + "pipes": [pipe.to_json() for pipe in self.pipes], + "system_pipes": self.system_pipes, + "resource_groups": self.resource_group_manager.to_json(), + "processes": [process.to_json() for process in self.processes], + } + if self.input_register is not None: + res["input_register"] = self.input_register + return res + + def add_filesystem(self, filesystem: Filesystem): + self.filesystem_manager.add(filesystem) + + def add_mount_namespace(self, mount_namespace: MountNamespace): + self.mountnamespace_manager.add(mount_namespace) + + def add_resource_group(self, resource_group: ResourceGroup): + self.resource_group_manager.add(resource_group) + + +class ScriptTask(Task): + def __init__( + self, + name: str, + workflow: "Workflow", + reactive: bool = False, + input_registers: list[int] = None, + output_registers: list[int] = None, + script: str = None, + ): + """ + Create a new script task. + :param name: The name of the task. + :param workflow: The workflow the task belongs to. + :param reactive: Whether the task is reactive. + :param input_registers: The input registers of the task. + :param output_registers: The output registers of the task. + :param script: The script to run. 
+ """ + self.name = name + self.workflow = workflow + self.reactive = reactive + self.input_registers = input_registers or [] + self.output_registers = output_registers or [] + self.script = script + + def __str__(self): + return f"" + + @classmethod + def from_json(cls, data: dict, workflow: "Workflow"): + """ + Create a new script task from a dictionary. + :param data: The dictionary to create the task from. + :param workflow: The workflow the task belongs to. + """ + return cls( + data["name"], workflow, data["reactive"], data["input_registers"], data["output_registers"], data["script"] + ) + + def to_json(self) -> dict: + """ + Convert the task to a dictionary. + """ + return { + "name": self.name, + "type": "script", + "reactive": self.reactive, + "input_registers": self.input_registers, + "output_registers": self.output_registers, + "script": self.script, + } diff --git a/src/sio3pack/workflow/workflow.py b/src/sio3pack/workflow/workflow.py new file mode 100644 index 0000000..5c10bb6 --- /dev/null +++ b/src/sio3pack/workflow/workflow.py @@ -0,0 +1,86 @@ +from sio3pack.workflow.object import ObjectsManager +from sio3pack.workflow.tasks import Task + + +class Workflow: + """ + A class to represent a job workflow. + """ + + @classmethod + def from_json(cls, data: dict): + """ + Create a new workflow from a dictionary. + :param data: The dictionary to create the workflow from. + """ + workflow = cls(data["name"], data["external_objects"], data["observable_objects"], data["observable_registers"]) + for task in data["tasks"]: + workflow.add_task(Task.from_json(task, workflow)) + return workflow + + def __init__( + self, + name: str, + external_objects: list[str] = None, + observable_objects: list[str] = None, + observable_registers: int = 0, + tasks: list[Task] = None, + ): + """ + Create a new workflow. Number of required registers is calculated automatically. + :param name: The name of the workflow. 
class Workflow:
    """
    A class to represent a job workflow.
    """

    @classmethod
    def from_json(cls, data: dict):
        """
        Create a new workflow from a dictionary.

        :param data: The dictionary to create the workflow from.
        """
        workflow = cls(data["name"], data["external_objects"], data["observable_objects"], data["observable_registers"])
        for task in data["tasks"]:
            workflow.add_task(Task.from_json(task, workflow))
        return workflow

    def __init__(
        self,
        name: str,
        external_objects: list[str] = None,
        observable_objects: list[str] = None,
        observable_registers: int = 0,
        tasks: list[Task] = None,
    ):
        """
        Create a new workflow. Number of required registers is calculated automatically.

        :param name: The name of the workflow.
        :param external_objects: The external objects used in the workflow.
        :param observable_objects: The observable objects used in the workflow.
        :param observable_registers: The number of observable registers used in the workflow.
        :param tasks: The tasks in the workflow.
        """
        self.name = name
        self.observable_registers = observable_registers
        self.tasks = tasks or []
        self.objects_manager = ObjectsManager()

        # Resolve handles through the manager so the same handle always maps
        # to the same Object instance.
        self.external_objects = [self.objects_manager.get_or_create_object(obj) for obj in external_objects or []]
        self.observable_objects = [self.objects_manager.get_or_create_object(obj) for obj in observable_objects or []]

    def __str__(self):
        # Bug fix: this previously returned an empty f-string.
        return f"<Workflow {self.name}>"

    def to_json(self) -> dict:
        """
        Convert the workflow to a dictionary.

        The ``registers`` field is computed as one more than the highest
        register index referenced by any task.
        """
        res = {
            "name": self.name,
            "external_objects": [obj.handle for obj in self.external_objects],
            "observable_objects": [obj.handle for obj in self.observable_objects],
            "observable_registers": self.observable_registers,
            "tasks": [task.to_json() for task in self.tasks],
        }
        num_registers = 0
        for task in res["tasks"]:
            # Bug fix: max() over an empty register list raised ValueError,
            # and "output_register" present-but-None raised TypeError
            # (ExecutionTask.to_json can emit output_register = None).
            for reg in task.get("input_registers") or []:
                num_registers = max(num_registers, reg)
            for reg in task.get("output_registers") or []:
                num_registers = max(num_registers, reg)
            output_register = task.get("output_register")
            if output_register is not None:
                num_registers = max(num_registers, output_register)
        res["registers"] = num_registers + 1
        return res

    def add_task(self, task: Task):
        """
        Add a task to the workflow.

        :param task: The task to add.
        """
        self.tasks.append(task)

    def get_prog_files(self) -> list[str]:
        """
        Get all program files in the workflow.

        :raises NotImplementedError: Always; specializations must override.
        """
        raise NotImplementedError()
+ """ + raise NotImplementedError() diff --git a/src/sio3pack/graph/graph_manager.py b/src/sio3pack/workflow/workflow_manager.py similarity index 71% rename from src/sio3pack/graph/graph_manager.py rename to src/sio3pack/workflow/workflow_manager.py index 4a8e4f8..1757e1c 100644 --- a/src/sio3pack/graph/graph_manager.py +++ b/src/sio3pack/workflow/workflow_manager.py @@ -1,19 +1,19 @@ import json from sio3pack.files import File -from sio3pack.graph.graph import Graph +from sio3pack.workflow.workflow import Workflow -class GraphManager: +class WorkflowManager: @classmethod def from_file(cls, file: File): graphs = {} content = json.loads(file.read()) for name, graph in content.items(): - graphs[name] = Graph.from_dict(graph) + graphs[name] = Workflow.from_json(graph) return cls(graphs) - def __init__(self, graphs: dict[str, Graph]): + def __init__(self, graphs: dict[str, Workflow]): self.graphs = graphs def get_prog_files(self) -> list[str]: @@ -25,5 +25,5 @@ def get_prog_files(self) -> list[str]: files.update(graph.get_prog_files()) return list(files) - def get(self, name: str) -> Graph: + def get(self, name: str) -> Workflow: return self.graphs[name] diff --git a/src/sio3pack/graph/graph_op.py b/src/sio3pack/workflow/workflow_op.py similarity index 63% rename from src/sio3pack/graph/graph_op.py rename to src/sio3pack/workflow/workflow_op.py index 3ef4257..d8867e9 100644 --- a/src/sio3pack/graph/graph_op.py +++ b/src/sio3pack/workflow/workflow_op.py @@ -1,15 +1,15 @@ -from sio3pack.graph.graph import Graph +from sio3pack.workflow.workflow import Workflow -class GraphOperation: +class WorkflowOperation: """ - A class to represent a graph that should be run on workers. + A class to represent a workflow that should be run on workers. Allows for returning results. 
""" - def __init__(self, graph: Graph, return_results: bool = False, return_func: callable = None): + def __init__(self, graph: Workflow, return_results: bool = False, return_func: callable = None): """ - :param graph: The graph to run on workers. + :param graph: The workflow to run on workers. :param return_results: Whether to return the results. :param return_func: The function to use to return the results, if return_results is True. diff --git a/tests/workflow/__init__.py b/tests/workflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/workflow/test_workflow.py b/tests/workflow/test_workflow.py new file mode 100644 index 0000000..8f8cf7c --- /dev/null +++ b/tests/workflow/test_workflow.py @@ -0,0 +1,17 @@ +import json +import os + +from deepdiff import DeepDiff + +from sio3pack.workflow import Workflow + + +def test_workflow_parsing(): + workflows_dir = os.path.join(os.path.dirname(__file__), "..", "..", "example_workflows") + for file in os.listdir(workflows_dir): + if not os.path.splitext(file)[1] == ".json": + continue + print(f"Parsing {file}") + data = json.load(open(os.path.join(workflows_dir, file))) + workflow = Workflow.from_json(data) + assert workflow.to_json() == data, f"Failed for {file}. Diff: {DeepDiff(data, workflow.to_json())}"