Skip to content

Commit

Permalink
External program mappings reorganization.
Browse files Browse the repository at this point in the history
* Workflow engines and container factories were reading the raw configuration file in order to locate declaration of custom paths for the programs they need for their work. Several method declarations have been changed, in order to minimize what it is passed down to them, so their implementations are not tied to the structure of the general WfExS configuration files.

* CWL workflow engine now is able to install cwltool versions which are older than the requested one, in case that version is incompatible with the python interpreter being used.

* CWL workflow engine has gained the capability to use custom python interpreters (or intepreters in different paths). This is needed to disengage cwltool installation from WfExS one a bit more.

* CWL workflow engine now forces pydot version lower than 3.0.0, due issue common-workflow-language/cwltool#2027
  • Loading branch information
jmfernandez committed Jul 28, 2024
1 parent 1f03d13 commit 04e960c
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 92 deletions.
10 changes: 6 additions & 4 deletions wfexs_backend/container_factories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@
ContainerTaggedName,
Fingerprint,
PathlibLike,
ProgsMapping,
RelPath,
SymbolicName,
URIType,
)

Expand Down Expand Up @@ -547,7 +549,7 @@ def __init__(
simpleFileNameMethod: "ContainerFileNamingMethod",
containersCacheDir: "Optional[pathlib.Path]" = None,
stagedContainersDir: "Optional[pathlib.Path]" = None,
tools_config: "Optional[ContainerLocalConfig]" = None,
progs_mapping: "Optional[ProgsMapping]" = None,
engine_name: "str" = "unset",
tempDir: "Optional[pathlib.Path]" = None,
):
Expand All @@ -560,9 +562,9 @@ def __init__(
# provides its file naming method
self.simpleFileNameMethod = simpleFileNameMethod

if tools_config is None:
tools_config = dict()
self.tools_config = tools_config
if progs_mapping is None:
progs_mapping = dict()
self.progs_mapping = progs_mapping

# Getting a logger focused on specific classes
self.logger = logging.getLogger(
Expand Down
10 changes: 7 additions & 3 deletions wfexs_backend/container_factories/docker_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
AnyPath,
ContainerTaggedName,
Fingerprint,
ProgsMapping,
RelPath,
SymbolicName,
URIType,
)

Expand Down Expand Up @@ -97,19 +99,21 @@ def __init__(
simpleFileNameMethod: "ContainerFileNamingMethod",
containersCacheDir: "Optional[pathlib.Path]" = None,
stagedContainersDir: "Optional[pathlib.Path]" = None,
tools_config: "Optional[ContainerLocalConfig]" = None,
progs_mapping: "Optional[ProgsMapping]" = None,
engine_name: "str" = "unset",
tempDir: "Optional[pathlib.Path]" = None,
):
super().__init__(
simpleFileNameMethod=simpleFileNameMethod,
containersCacheDir=containersCacheDir,
stagedContainersDir=stagedContainersDir,
tools_config=tools_config,
progs_mapping=progs_mapping,
engine_name=engine_name,
tempDir=tempDir,
)
self.runtime_cmd = self.tools_config.get("dockerCommand", DEFAULT_DOCKER_CMD)
self.runtime_cmd = self.progs_mapping.get(
cast("SymbolicName", "docker"), DEFAULT_DOCKER_CMD
)

@classmethod
def ContainerType(cls) -> "ContainerType":
Expand Down
2 changes: 0 additions & 2 deletions wfexs_backend/container_factories/no_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class NoContainerFactory(ContainerFactory):
The 'no container approach', for development and local installed software
"""

# def __init__(self, containersCacheDir=None, tools_config=None, engine_name='unset'):
# super().__init__(containersCacheDir=containersCacheDir, tools_config=tools_config, engine_name=engine_name)
AcceptedContainerTypes = set([common.ContainerType.NoContainer])

@classmethod
Expand Down
10 changes: 7 additions & 3 deletions wfexs_backend/container_factories/podman_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
AnyPath,
ContainerTaggedName,
Fingerprint,
ProgsMapping,
RelPath,
SymbolicName,
URIType,
)

Expand Down Expand Up @@ -100,19 +102,21 @@ def __init__(
simpleFileNameMethod: "ContainerFileNamingMethod",
containersCacheDir: "Optional[pathlib.Path]" = None,
stagedContainersDir: "Optional[pathlib.Path]" = None,
tools_config: "Optional[ContainerLocalConfig]" = None,
progs_mapping: "Optional[ProgsMapping]" = None,
engine_name: "str" = "unset",
tempDir: "Optional[pathlib.Path]" = None,
):
super().__init__(
simpleFileNameMethod=simpleFileNameMethod,
containersCacheDir=containersCacheDir,
stagedContainersDir=stagedContainersDir,
tools_config=tools_config,
progs_mapping=progs_mapping,
engine_name=engine_name,
tempDir=tempDir,
)
self.runtime_cmd = self.tools_config.get("podmanCommand", DEFAULT_PODMAN_CMD)
self.runtime_cmd = self.progs_mapping.get(
cast("SymbolicName", "podman"), DEFAULT_PODMAN_CMD
)

self._environment.update(
{
Expand Down
10 changes: 6 additions & 4 deletions wfexs_backend/container_factories/singularity_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
ContainerTaggedName,
ExitVal,
Fingerprint,
ProgsMapping,
RelPath,
SymbolicName,
URIType,
)

Expand Down Expand Up @@ -132,20 +134,20 @@ def __init__(
simpleFileNameMethod: "ContainerFileNamingMethod",
containersCacheDir: "Optional[pathlib.Path]" = None,
stagedContainersDir: "Optional[pathlib.Path]" = None,
tools_config: "Optional[ContainerLocalConfig]" = None,
progs_mapping: "Optional[ProgsMapping]" = None,
engine_name: "str" = "unset",
tempDir: "Optional[pathlib.Path]" = None,
):
super().__init__(
simpleFileNameMethod=simpleFileNameMethod,
containersCacheDir=containersCacheDir,
stagedContainersDir=stagedContainersDir,
tools_config=tools_config,
progs_mapping=progs_mapping,
engine_name=engine_name,
tempDir=tempDir,
)
self.runtime_cmd = self.tools_config.get(
"singularityCommand", DEFAULT_SINGULARITY_CMD
self.runtime_cmd = self.progs_mapping.get(
cast("SymbolicName", "singularityCommand"), DEFAULT_SINGULARITY_CMD
)

# This is needed due a bug in singularity 3.6, where
Expand Down
1 change: 1 addition & 0 deletions wfexs_backend/wfexs_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,7 @@ def instantiateEngine(
return engineDesc.clazz.FromStagedSetup(
staged_setup=stagedSetup,
container_factory_classes=self.listContainerFactoryClasses(),
progs_mapping=self.progs,
cache_dir=self.cacheDir,
cache_workflow_dir=self.cacheWorkflowDir,
cache_workflow_inputs_dir=self.cacheWorkflowInputsDir,
Expand Down
114 changes: 82 additions & 32 deletions wfexs_backend/workflow_engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# limitations under the License.
from __future__ import absolute_import

import copy
import os
import pathlib
import sys
Expand Down Expand Up @@ -53,6 +54,7 @@
Callable,
Mapping,
MutableSequence,
MutableMapping,
NewType,
Optional,
Pattern,
Expand Down Expand Up @@ -80,9 +82,11 @@
LocalWorkflow,
MaterializedInput,
MaterializedContent,
ProgsMapping,
RelPath,
StagedExecution,
StagedSetup,
SymbolicName,
SymbolicOutputName,
SymbolicParamName,
TRS_Workflow_Descriptor,
Expand Down Expand Up @@ -200,6 +204,22 @@ class MaterializedWorkflowEngine(NamedTuple):
containers: "Optional[Sequence[Container]]" = None
operational_containers: "Optional[Sequence[Container]]" = None

@classmethod
def _mapping_fixes(
cls, orig: "Mapping[str, Any]", workdir: "Optional[pathlib.Path]"
) -> "Mapping[str, Any]":
dest = cast("MutableMapping[str, Any]", copy.copy(orig))
dest["engine_path"] = pathlib.Path(orig["engine_path"])
if workdir is not None and not dest["engine_path"].is_absolute():
dest["engine_path"] = (workdir / dest["engine_path"]).resolve()

if dest.get("containers_path") is not None:
dest["containers_path"] = pathlib.Path(orig["containers_path"])
if workdir is not None and not dest["containers_path"].is_absolute():
dest["containers_path"] = (workdir / dest["containers_path"]).resolve()

return dest


# This skeleton is here only for type mapping reasons
class AbstractWorkflowEngineType(abc.ABC):
Expand Down Expand Up @@ -324,6 +344,7 @@ def FromStagedSetup(
container_factory_classes: "Sequence[Type[ContainerFactory]]" = [
NoContainerFactory
],
progs_mapping: "Optional[ProgsMapping]" = None,
cache_dir: "Optional[pathlib.Path]" = None,
cache_workflow_dir: "Optional[pathlib.Path]" = None,
cache_workflow_inputs_dir: "Optional[pathlib.Path]" = None,
Expand All @@ -350,12 +371,14 @@ class WorkflowEngineInstallException(WorkflowEngineException):


class WorkflowEngine(AbstractWorkflowEngineType):
ENGINE_NAME = "abstract"

def __init__(
self,
container_factory_clazz: "Type[ContainerFactory]" = NoContainerFactory,
cacheDir: "Optional[pathlib.Path]" = None,
workflow_config: "Optional[Mapping[str, Any]]" = None,
local_config: "Optional[EngineLocalConfig]" = None,
engine_config: "Optional[EngineLocalConfig]" = None,
progs_mapping: "Optional[ProgsMapping]" = None,
engineTweaksDir: "Optional[pathlib.Path]" = None,
cacheWorkflowDir: "Optional[pathlib.Path]" = None,
cacheWorkflowInputsDir: "Optional[pathlib.Path]" = None,
Expand All @@ -368,15 +391,14 @@ def __init__(
secure_exec: "bool" = False,
allowOther: "bool" = False,
config_directory: "Optional[pathlib.Path]" = None,
engine_mode: "EngineMode" = DEFAULT_ENGINE_MODE,
writable_containers: "bool" = False,
):
"""
Abstract init method
:param cacheDir:
:param workflow_config:
This one may be needed to identify container overrides
or specific engine versions
:param local_config:
:param engine_config:
:param engineTweaksDir:
:param cacheWorkflowDir:
:param cacheWorkflowInputsDir:
Expand All @@ -386,12 +408,15 @@ def __init__(
:param tempDir:
:param secure_exec:
:param config_directory:
:param writable_containers: Whether the containers of each step are writable
"""
if local_config is None:
local_config = dict()
if workflow_config is None:
workflow_config = dict()
self.local_config = local_config
if engine_config is None:
engine_config = dict()
self.engine_config = engine_config

if progs_mapping is None:
progs_mapping = dict()
self.progs_mapping = progs_mapping

if config_directory is None:
config_directory = pathlib.Path.cwd()
Expand All @@ -400,14 +425,7 @@ def __init__(
# Getting a logger focused on specific classes
self.logger = logging.getLogger(self.__class__.__name__)

# This one may be needed to identify container overrides
# or specific engine versions
self.workflow_config = workflow_config

# cacheDir
if cacheDir is None:
cacheDir = local_config.get("cacheDir")

if cacheDir is None:
cacheDir = pathlib.Path(tempfile.mkdtemp(prefix="WfExS", suffix="backend"))
# Assuring this temporal directory is removed at the end
Expand Down Expand Up @@ -509,13 +527,9 @@ def __init__(
self.stagedContainersDir = stagedContainersDir

# Setting up common properties
tools_config = local_config.get("tools", {})
self.docker_cmd = tools_config.get("dockerCommand", DEFAULT_DOCKER_CMD)
engine_mode = tools_config.get("engineMode")
if engine_mode is None:
engine_mode = DEFAULT_ENGINE_MODE
else:
engine_mode = EngineMode(engine_mode)
self.docker_cmd = self.progs_mapping.get(
cast("SymbolicName", "docker"), DEFAULT_DOCKER_CMD
)
self.engine_mode = engine_mode

container_type = container_factory_clazz.ContainerType()
Expand All @@ -532,14 +546,12 @@ def __init__(
self.logger.debug(f"Instantiating container type {container_type}")
# For materialized containers, we should use common directories
# This for the containers themselves
containersCacheDir = (
pathlib.Path(cacheDir) / "containers" / container_factory_clazz.__name__
)
containersCacheDir = cacheDir / "containers" / container_factory_clazz.__name__
self.container_factory = container_factory_clazz(
simpleFileNameMethod=self.simpleContainerFileName,
containersCacheDir=containersCacheDir,
stagedContainersDir=self.stagedContainersDir,
tools_config=tools_config,
progs_mapping=progs_mapping,
engine_name=self.__class__.__name__,
tempDir=self.tempDir,
)
Expand All @@ -566,7 +578,7 @@ def __init__(
self.payloadsDir = pathlib.Path(os.path.dirname(__file__), "payloads")

# Whether the containers of each step are writable
self.writable_containers = workflow_config.get("writable_containers", False)
self.writable_containers = writable_containers

if (
secure_exec
Expand All @@ -586,6 +598,7 @@ def FromStagedSetup(
container_factory_classes: "Sequence[Type[ContainerFactory]]" = [
NoContainerFactory
],
progs_mapping: "Optional[ProgsMapping]" = None,
cache_dir: "Optional[pathlib.Path]" = None,
cache_workflow_dir: "Optional[pathlib.Path]" = None,
cache_workflow_inputs_dir: "Optional[pathlib.Path]" = None,
Expand Down Expand Up @@ -613,9 +626,45 @@ def FromStagedSetup(
raise WorkflowEngineException(
f"FATAL: No container factory implementation for {staged_setup.container_type}"
)

if local_config is None:
local_config = dict()
tools_config = local_config.get("tools", {})

engineConf = copy.deepcopy(tools_config.get(cls.ENGINE_NAME, {}))
workflowEngineConf = (
staged_setup.workflow_config.get(cls.ENGINE_NAME, {})
if staged_setup.workflow_config
else {}
)
engineConf.update(workflowEngineConf)

if cache_dir is None:
cache_dir_str = local_config.get("cacheDir")
if cache_dir_str is not None:
cache_dir = pathlib.Path(cache_dir_str)

engine_mode = tools_config.get("engineMode")
if engine_mode is None:
engine_mode = DEFAULT_ENGINE_MODE
else:
try:
engine_mode = EngineMode(engine_mode)
except:
raise WorkflowEngineException(
f"Unrecognized engine mode {engine_mode} for {cls.ENGINE_NAME}"
)

# Whether the containers of each step are writable
writable_containers = False
if staged_setup.workflow_config is not None:
writable_containers = staged_setup.workflow_config.get(
"writable_containers", False
)

return cls(
container_factory_clazz=the_container_factory_clazz,
workflow_config=staged_setup.workflow_config,
progs_mapping=progs_mapping,
engineTweaksDir=staged_setup.engine_tweaks_dir,
workDir=staged_setup.work_dir,
outputsDir=staged_setup.outputs_dir,
Expand All @@ -627,8 +676,9 @@ def FromStagedSetup(
cacheWorkflowDir=cache_workflow_dir,
cacheWorkflowInputsDir=cache_workflow_inputs_dir,
stagedContainersDir=staged_setup.containers_dir,
local_config=local_config,
engine_config=engineConf,
config_directory=config_directory,
writable_containers=writable_containers,
)

def getConfiguredContainerType(self) -> "ContainerType":
Expand Down
Loading

0 comments on commit 04e960c

Please sign in to comment.