Skip to content

Commit

Permalink
[IBM Code Engine] Added local disk variable for scheduler and worker (#…
Browse files Browse the repository at this point in the history
…440)

* [IBM Code Engine] Added local disk storage to scheduler and workers

* [IBM Code Engine] Local disk clarification added
  • Loading branch information
macarronesc authored Oct 11, 2024
1 parent 6eaf2db commit c6eb826
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dask_cloudprovider/cloudprovider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ cloudprovider:
project_id: null
scheduler_cpu: "1.0"
scheduler_mem: 4G
scheduler_disk: 400M
scheduler_timeout: 600 # seconds
worker_cpu: "2.0"
worker_mem: 8G
worker_disk: 400M
worker_threads: 1

openstack:
Expand Down
25 changes: 25 additions & 0 deletions dask_cloudprovider/ibm/code_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ def __init__(
project_id: str = None,
scheduler_cpu: str = None,
scheduler_mem: str = None,
scheduler_disk: str = None,
scheduler_timeout: int = None,
worker_cpu: str = None,
worker_mem: str = None,
worker_disk: str = None,
worker_threads: int = None,
api_key: str = None,
**kwargs,
Expand All @@ -56,9 +58,11 @@ def __init__(
self.project_id = project_id
self.scheduler_cpu = scheduler_cpu
self.scheduler_mem = scheduler_mem
self.scheduler_disk = scheduler_disk
self.scheduler_timeout = scheduler_timeout
self.worker_cpu = worker_cpu
self.worker_mem = worker_mem
self.worker_disk = worker_disk
self.worker_threads = worker_threads
self.api_key = api_key

Expand Down Expand Up @@ -89,6 +93,7 @@ async def create_vm(self):
scale_min_instances=1,
scale_concurrency=1000,
scale_memory_limit=self.memory,
scale_ephemeral_storage_limit=self.disk,
scale_request_timeout=self.cluster.scheduler_timeout,
run_env_variables=[
{
Expand Down Expand Up @@ -144,6 +149,7 @@ def create_job_run_thread():
run_commands=self.command,
scale_cpu_limit=self.cpu,
scale_memory_limit=self.memory,
scale_ephemeral_storage_limit=self.disk,
run_env_variables=[
{
"type": "config_map_key_reference",
Expand Down Expand Up @@ -194,6 +200,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cpu = self.cluster.scheduler_cpu
self.memory = self.cluster.scheduler_mem
self.disk = self.cluster.scheduler_disk

self.command = [
"python",
Expand All @@ -211,9 +218,11 @@ async def start(self):
f"\n Project id: {self.project_id} "
f"\n Scheduler CPU: {self.cpu} "
f"\n Scheduler Memory: {self.memory} "
f"\n Scheduler Disk: {self.disk} "
f"\n Scheduler Timeout: {self.cluster.scheduler_timeout} "
f"\n Worker CPU: {self.cluster.worker_cpu} "
f"\n Worker Memory: {self.cluster.worker_mem} "
f"\n Worker Disk: {self.cluster.worker_disk} "
f"\n Worker Threads: {self.cluster.worker_threads} "
)
self.cluster._log(f"Creating scheduler instance {self.name}")
Expand Down Expand Up @@ -244,6 +253,7 @@ def __init__(
self.worker_options = worker_options
self.cpu = self.cluster.worker_cpu
self.memory = self.cluster.worker_mem
self.disk = self.cluster.worker_disk

# On this case, the worker must connect to the scheduler internal URL with the "ws" protocol and port 80
internal_scheduler = f"ws://{self.cluster.scheduler_internal_ip}:80"
Expand Down Expand Up @@ -300,6 +310,8 @@ class IBMCodeEngineCluster(VMCluster):
The amount of memory to allocate to the scheduler.
See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
scheduler_disk: str
The amount of ephemeral storage to allocate to the scheduler. This value must be lower than scheduler_mem.
scheduler_timeout: int
The timeout for the scheduler in seconds.
worker_cpu: str
Expand All @@ -310,6 +322,8 @@ class IBMCodeEngineCluster(VMCluster):
The amount of memory to allocate to each worker.
See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
worker_disk: str
The amount of ephemeral storage to allocate to each worker. This value must be lower than worker_mem.
worker_threads: int
The number of threads to use on each worker.
debug: bool, optional
Expand Down Expand Up @@ -347,9 +361,11 @@ class IBMCodeEngineCluster(VMCluster):
Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0
Scheduler CPU: 0.25
Scheduler Memory: 1G
Scheduler Disk: 400M
Scheduler Timeout: 600
Worker CPU: 2
Worker Memory: 4G
Worker Disk: 400M
Creating scheduler dask-xxxxxxxx-scheduler
Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443
Scheduler is running
Expand Down Expand Up @@ -382,9 +398,12 @@ class IBMCodeEngineCluster(VMCluster):
Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0
Scheduler CPU: 0.25
Scheduler Memory: 1G
Scheduler Disk: 400M
Scheduler Timeout: 600
Worker CPU: 2
Worker Memory: 4G
Worker Disk: 400M
Worker Threads: 1
Creating scheduler dask-xxxxxxxx-scheduler
Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443
Scheduler is running
Expand All @@ -402,9 +421,11 @@ def __init__(
project_id: str = None,
scheduler_cpu: str = None,
scheduler_mem: str = None,
scheduler_disk: str = None,
scheduler_timeout: int = None,
worker_cpu: str = None,
worker_mem: str = None,
worker_disk: str = None,
worker_threads: int = 1,
debug: bool = False,
**kwargs,
Expand All @@ -419,11 +440,13 @@ def __init__(
api_key = self.config.get("api_key")
self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu")
self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem")
self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk")
self.scheduler_timeout = scheduler_timeout or self.config.get(
"scheduler_timeout"
)
self.worker_cpu = worker_cpu or self.config.get("worker_cpu")
self.worker_mem = worker_mem or self.config.get("worker_mem")
self.worker_disk = worker_disk or self.config.get("worker_disk")
self.worker_threads = worker_threads or self.config.get("worker_threads")

self.debug = debug
Expand All @@ -436,9 +459,11 @@ def __init__(
"project_id": self.project_id,
"scheduler_cpu": self.scheduler_cpu,
"scheduler_mem": self.scheduler_mem,
"scheduler_disk": self.scheduler_disk,
"scheduler_timeout": self.scheduler_timeout,
"worker_cpu": self.worker_cpu,
"worker_mem": self.worker_mem,
"worker_disk": self.worker_disk,
"worker_threads": self.worker_threads,
"api_key": api_key,
}
Expand Down

0 comments on commit c6eb826

Please sign in to comment.