From c6eb82639fb87c0d4a348460683e9a201f4c45e5 Mon Sep 17 00:00:00 2001 From: Daniel Alejandro Coll Tejeda <62675074+macarronesc@users.noreply.github.com> Date: Fri, 11 Oct 2024 12:43:18 +0200 Subject: [PATCH] [IBM Code Engine] Added local disk variable for scheduler and worker (#440) * [IBM Code Engine] Added local disk storage to scheduler and workers * [IBM Code Engine] Local disk clarification added --- dask_cloudprovider/cloudprovider.yaml | 2 ++ dask_cloudprovider/ibm/code_engine.py | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 2e65be81..19bc7913 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -128,9 +128,11 @@ cloudprovider: project_id: null scheduler_cpu: "1.0" scheduler_mem: 4G + scheduler_disk: 400M scheduler_timeout: 600 # seconds worker_cpu: "2.0" worker_mem: 8G + worker_disk: 400M worker_threads: 1 openstack: diff --git a/dask_cloudprovider/ibm/code_engine.py b/dask_cloudprovider/ibm/code_engine.py index e056de12..eec0366d 100644 --- a/dask_cloudprovider/ibm/code_engine.py +++ b/dask_cloudprovider/ibm/code_engine.py @@ -41,9 +41,11 @@ def __init__( project_id: str = None, scheduler_cpu: str = None, scheduler_mem: str = None, + scheduler_disk: str = None, scheduler_timeout: int = None, worker_cpu: str = None, worker_mem: str = None, + worker_disk: str = None, worker_threads: int = None, api_key: str = None, **kwargs, @@ -56,9 +58,11 @@ def __init__( self.project_id = project_id self.scheduler_cpu = scheduler_cpu self.scheduler_mem = scheduler_mem + self.scheduler_disk = scheduler_disk self.scheduler_timeout = scheduler_timeout self.worker_cpu = worker_cpu self.worker_mem = worker_mem + self.worker_disk = worker_disk self.worker_threads = worker_threads self.api_key = api_key @@ -89,6 +93,7 @@ async def create_vm(self): scale_min_instances=1, scale_concurrency=1000, scale_memory_limit=self.memory, + scale_ephemeral_storage_limit=self.disk, scale_request_timeout=self.cluster.scheduler_timeout, run_env_variables=[ { @@ -144,6 +149,7 @@ def create_job_run_thread(): run_commands=self.command, scale_cpu_limit=self.cpu, scale_memory_limit=self.memory, + scale_ephemeral_storage_limit=self.disk, run_env_variables=[ { "type": "config_map_key_reference", @@ -194,6 +200,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.cpu = self.cluster.scheduler_cpu self.memory = self.cluster.scheduler_mem + self.disk = self.cluster.scheduler_disk self.command = [ "python", @@ -211,9 +218,11 @@ async def start(self): f"\n Project id: {self.project_id} " f"\n Scheduler CPU: {self.cpu} " f"\n Scheduler Memory: {self.memory} " + f"\n Scheduler Disk: {self.disk} " f"\n Scheduler Timeout: {self.cluster.scheduler_timeout} " f"\n Worker CPU: {self.cluster.worker_cpu} " f"\n Worker Memory: {self.cluster.worker_mem} " + f"\n Worker Disk: {self.cluster.worker_disk} " f"\n Worker Threads: {self.cluster.worker_threads} " ) self.cluster._log(f"Creating scheduler instance {self.name}") @@ -244,6 +253,7 @@ def __init__( self.worker_options = worker_options self.cpu = self.cluster.worker_cpu self.memory = self.cluster.worker_mem + self.disk = self.cluster.worker_disk # On this case, the worker must connect to the scheduler internal URL with the "ws" protocol and port 80 internal_scheduler = f"ws://{self.cluster.scheduler_internal_ip}:80" @@ -300,6 +310,8 @@ class IBMCodeEngineCluster(VMCluster): The amount of memory to allocate to the scheduler. See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo + scheduler_disk: str + The amount of ephemeral storage to allocate to the scheduler. This value must be lower than scheduler_mem. scheduler_timeout: int The timeout for the scheduler in seconds. worker_cpu: str @@ -310,6 +322,8 @@ class IBMCodeEngineCluster(VMCluster): The amount of memory to allocate to each worker. See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo + worker_disk: str + The amount of ephemeral storage to allocate to each worker. This value must be lower than worker_mem. worker_threads: int The number of threads to use on each worker. debug: bool, optional @@ -347,9 +361,11 @@ class IBMCodeEngineCluster(VMCluster): Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0 Scheduler CPU: 0.25 Scheduler Memory: 1G + Scheduler Disk: 400M Scheduler Timeout: 600 Worker CPU: 2 Worker Memory: 4G + Worker Disk: 400M Creating scheduler dask-xxxxxxxx-scheduler Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443 Scheduler is running @@ -382,9 +398,12 @@ class IBMCodeEngineCluster(VMCluster): Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0 Scheduler CPU: 0.25 Scheduler Memory: 1G + Scheduler Disk: 400M Scheduler Timeout: 600 Worker CPU: 2 Worker Memory: 4G + Worker Disk: 400M + Worker Threads: 1 Creating scheduler dask-xxxxxxxx-scheduler Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443 Scheduler is running @@ -402,9 +421,11 @@ def __init__( project_id: str = None, scheduler_cpu: str = None, scheduler_mem: str = None, + scheduler_disk: str = None, scheduler_timeout: int = None, worker_cpu: str = None, worker_mem: str = None, + worker_disk: str = None, worker_threads: int = 1, debug: bool = False, **kwargs, @@ -419,11 +440,13 @@ def __init__( api_key = self.config.get("api_key") self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu") self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem") + self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk") self.scheduler_timeout = scheduler_timeout or self.config.get( "scheduler_timeout" ) self.worker_cpu = worker_cpu or self.config.get("worker_cpu") self.worker_mem = worker_mem or self.config.get("worker_mem") + self.worker_disk = worker_disk or self.config.get("worker_disk") self.worker_threads = worker_threads or self.config.get("worker_threads") self.debug = debug @@ -436,9 +459,11 @@ def __init__( "project_id": self.project_id, "scheduler_cpu": self.scheduler_cpu, "scheduler_mem": self.scheduler_mem, + "scheduler_disk": self.scheduler_disk, "scheduler_timeout": self.scheduler_timeout, "worker_cpu": self.worker_cpu, "worker_mem": self.worker_mem, + "worker_disk": self.worker_disk, "worker_threads": self.worker_threads, "api_key": api_key, }