Commit 7945f5a

create classes for submitting and watching DDP jobs (#70)

* create classes for submitting and watching DDP jobs
* update demo notebook to use new job class
* added job tests (#1)
  * WIP job tests
  * added unit tests for Jobs
  * add more specificity to tests
* add torchx to requirements file

Signed-off-by: Kevin <[email protected]>
Co-authored-by: Michael Clifford <[email protected]>
1 parent 2c563bc commit 7945f5a

File tree: 6 files changed, +417 / -1796 lines

demo-notebooks/batch-job/batch_mnist.ipynb

Lines changed: 78 additions & 1795 deletions (large diff not rendered by default)

requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -1,3 +1,4 @@
 openshift-client==1.0.18
 rich==12.5.1
 ray[default]==2.1.0
+git+https://github.com/project-codeflare/torchx@6517d5b060e4fe32b9ad41019c3bef647095c35f#egg=torchx

src/codeflare_sdk/cluster/cluster.py

Lines changed: 17 additions & 1 deletion

@@ -20,7 +20,7 @@
 
 from os import stat
 from time import sleep
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Dict
 
 import openshift as oc
 from ray.job_submission import JobSubmissionClient
@@ -45,6 +45,8 @@ class Cluster:
     Note that currently, the underlying implementation is a Ray cluster.
     """
 
+    torchx_scheduler = "ray"
+
     def __init__(self, config: ClusterConfiguration):
         """
         Create the resource cluster object by passing in a ClusterConfiguration
@@ -268,6 +270,20 @@ def job_logs(self, job_id: str) -> str:
         client = JobSubmissionClient(dashboard_route)
         return client.get_job_logs(job_id)
 
+    def torchx_config(
+        self, working_dir: str = None, requirements: str = None
+    ) -> Dict[str, str]:
+        dashboard_address = f"{self.cluster_dashboard_uri().lstrip('http://')}"
+        to_return = {
+            "cluster_name": self.config.name,
+            "dashboard_address": dashboard_address,
+        }
+        if working_dir:
+            to_return["working_dir"] = working_dir
+        if requirements:
+            to_return["requirements"] = requirements
+        return to_return
+
 
 def get_current_namespace() -> str:
     """

src/codeflare_sdk/job/__init__.py

Whitespace-only changes.

src/codeflare_sdk/job/jobs.py

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+# Copyright 2023 IBM, Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import abc
+from typing import TYPE_CHECKING, Optional, Dict, List
+from pathlib import Path
+
+from torchx.components.dist import ddp
+from torchx.runner import get_runner
+from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo
+
+if TYPE_CHECKING:
+    from ..cluster.cluster import Cluster
+
+all_jobs: List["Job"] = []
+torchx_runner = get_runner()
+
+
+class JobDefinition(metaclass=abc.ABCMeta):
+    def _dry_run(self, cluster: "Cluster"):
+        pass
+
+    def submit(self, cluster: "Cluster"):
+        pass
+
+
+class Job(metaclass=abc.ABCMeta):
+    def status(self):
+        pass
+
+    def logs(self):
+        pass
+
+
+class DDPJobDefinition(JobDefinition):
+    def __init__(
+        self,
+        script: Optional[str] = None,
+        m: Optional[str] = None,
+        script_args: Optional[List[str]] = None,
+        name: Optional[str] = None,
+        cpu: Optional[int] = None,
+        gpu: Optional[int] = None,
+        memMB: Optional[int] = None,
+        h: Optional[str] = None,
+        j: Optional[str] = None,
+        env: Optional[Dict[str, str]] = None,
+        max_retries: int = 0,
+        mounts: Optional[List[str]] = None,
+        rdzv_port: int = 29500,
+        scheduler_args: Optional[Dict[str, str]] = None,
+    ):
+        if bool(script) == bool(m):  # logical XOR
+            raise ValueError(
+                "Exactly one of the following arguments must be defined: [script, m]."
+            )
+        self.script = script
+        self.m = m
+        self.script_args: List[str] = script_args if script_args is not None else []
+        self.name = name
+        self.cpu = cpu
+        self.gpu = gpu
+        self.memMB = memMB
+        self.h = h
+        self.j = j
+        self.env: Dict[str, str] = env if env is not None else dict()
+        self.max_retries = max_retries
+        self.mounts: List[str] = mounts if mounts is not None else []
+        self.rdzv_port = rdzv_port
+        self.scheduler_args: Dict[str, str] = (
+            scheduler_args if scheduler_args is not None else dict()
+        )
+
+    def _dry_run(self, cluster: "Cluster"):
+        j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}"  # # of proc. = # of gpus
+        return torchx_runner.dryrun(
+            app=ddp(
+                *self.script_args,
+                script=self.script,
+                m=self.m,
+                name=self.name,
+                h=self.h,
+                cpu=self.cpu if self.cpu is not None else cluster.config.max_cpus,
+                gpu=self.gpu if self.gpu is not None else cluster.config.gpu,
+                memMB=self.memMB
+                if self.memMB is not None
+                else cluster.config.max_memory * 1024,
+                j=self.j if self.j is not None else j,
+                env=self.env,
+                max_retries=self.max_retries,
+                rdzv_port=self.rdzv_port,
+                mounts=self.mounts,
+            ),
+            scheduler=cluster.torchx_scheduler,
+            cfg=cluster.torchx_config(**self.scheduler_args),
+            workspace=f"file://{Path.cwd()}",
+        )
+
+    def submit(self, cluster: "Cluster") -> "Job":
+        return DDPJob(self, cluster)
+
+
+class DDPJob(Job):
+    def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"):
+        self.job_definition = job_definition
+        self.cluster = cluster
+        self._app_handle = torchx_runner.schedule(job_definition._dry_run(cluster))
+        all_jobs.append(self)
+
+    def status(self) -> str:
+        return torchx_runner.status(self._app_handle)
+
+    def logs(self) -> str:
+        return "".join(torchx_runner.log_lines(self._app_handle, None))

tests/unit_test.py

Lines changed: 195 additions & 0 deletions

@@ -16,6 +16,7 @@
 import sys
 import filecmp
 import os
+import re
 
 parent = Path(__file__).resolve().parents[1]
 sys.path.append(str(parent) + "/src")
@@ -46,10 +47,20 @@
     RayClusterStatus,
     CodeFlareClusterStatus,
 )
+from codeflare_sdk.job.jobs import (
+    JobDefinition,
+    Job,
+    DDPJobDefinition,
+    DDPJob,
+    torchx_runner,
+)
 import openshift
 from openshift import OpenShiftPythonException
 from openshift.selector import Selector
 import ray
+from torchx.specs import AppDryRunInfo, AppDef
+from torchx.runner import get_runner, Runner
+from torchx.schedulers.ray_scheduler import RayJob
 import pytest
 
 
@@ -1535,6 +1546,7 @@ def test_cluster_status(mocker):
     mocker.patch(
         "codeflare_sdk.cluster.cluster._ray_cluster_status", return_value=fake_ray
     )
+
     status, ready = cf.status()
     assert status == CodeFlareClusterStatus.STARTING
     assert ready == False
@@ -1594,3 +1606,186 @@ def test_cmd_line_generation():
 def test_cleanup():
     os.remove("test.yaml")
     os.remove("raytest2.yaml")
+
+
+def test_jobdefinition_coverage():
+    abstract = JobDefinition()
+    cluster = Cluster(test_config_creation())
+    abstract._dry_run(cluster)
+    abstract.submit(cluster)
+
+
+def test_job_coverage():
+    abstract = Job()
+    abstract.status()
+    abstract.logs()
+
+
+def test_DDPJobDefinition_creation():
+    ddp = DDPJobDefinition(
+        script="test.py",
+        m=None,
+        script_args=["test"],
+        name="test",
+        cpu=1,
+        gpu=0,
+        memMB=1024,
+        h=None,
+        j="2x1",
+        env={"test": "test"},
+        max_retries=0,
+        mounts=[],
+        rdzv_port=29500,
+        scheduler_args={"requirements": "test"},
+    )
+    assert ddp.script == "test.py"
+    assert ddp.m == None
+    assert ddp.script_args == ["test"]
+    assert ddp.name == "test"
+    assert ddp.cpu == 1
+    assert ddp.gpu == 0
+    assert ddp.memMB == 1024
+    assert ddp.h == None
+    assert ddp.j == "2x1"
+    assert ddp.env == {"test": "test"}
+    assert ddp.max_retries == 0
+    assert ddp.mounts == []
+    assert ddp.rdzv_port == 29500
+    assert ddp.scheduler_args == {"requirements": "test"}
+    return ddp
+
+
+def test_DDPJobDefinition_dry_run():
+    """
+    Test that the dry run method returns the correct type: AppDryRunInfo,
+    that the attributes of the returned object are of the correct type,
+    and that the values from cluster and job definition are correctly passed.
+    """
+    ddp = test_DDPJobDefinition_creation()
+    cluster = Cluster(test_config_creation())
+    ddp_job = ddp._dry_run(cluster)
+    assert type(ddp_job) == AppDryRunInfo
+    assert ddp_job._fmt is not None
+    assert type(ddp_job.request) == RayJob
+    assert type(ddp_job._app) == AppDef
+    assert type(ddp_job._cfg) == type(dict())
+    assert type(ddp_job._scheduler) == type(str())
+
+    assert ddp_job.request.app_id.startswith("test")
+    assert ddp_job.request.working_dir.startswith("/tmp/torchx_workspace")
+    assert ddp_job.request.cluster_name == "unit-test-cluster"
+    assert ddp_job.request.requirements == "test"
+
+    assert ddp_job._app.roles[0].resource.cpu == 1
+    assert ddp_job._app.roles[0].resource.gpu == 0
+    assert ddp_job._app.roles[0].resource.memMB == 1024
+
+    assert ddp_job._cfg["cluster_name"] == "unit-test-cluster"
+    assert ddp_job._cfg["requirements"] == "test"
+
+    assert ddp_job._scheduler == "ray"
+
+
+def test_DDPJobDefinition_dry_run_no_resource_args():
+    """
+    Test that the dry run correctly gets resources from the cluster object
+    when the job definition does not specify resources.
+    """
+    cluster = Cluster(test_config_creation())
+    ddp = DDPJobDefinition(
+        script="test.py",
+        m=None,
+        script_args=["test"],
+        name="test",
+        h=None,
+        env={"test": "test"},
+        max_retries=0,
+        mounts=[],
+        rdzv_port=29500,
+        scheduler_args={"requirements": "test"},
+    )
+    ddp_job = ddp._dry_run(cluster)
+
+    assert ddp_job._app.roles[0].resource.cpu == cluster.config.max_cpus
+    assert ddp_job._app.roles[0].resource.gpu == cluster.config.gpu
+    assert ddp_job._app.roles[0].resource.memMB == cluster.config.max_memory * 1024
+    assert (
+        parse_j(ddp_job._app.roles[0].args[1])
+        == f"{cluster.config.max_worker}x{cluster.config.gpu}"
+    )
+
+
+def test_DDPJobDefinition_submit(mocker):
+    """
+    Tests that the submit method returns the correct type: DDPJob
+    And that the attributes of the returned object are of the correct type
+    """
+    ddp_def = test_DDPJobDefinition_creation()
+    cluster = Cluster(test_config_creation())
+    mocker.patch(
+        "codeflare_sdk.job.jobs.torchx_runner.schedule",
+        return_value="fake-dashboard-url",
+    )  # a fake app_handle
+    ddp_job = ddp_def.submit(cluster)
+    assert type(ddp_job) == DDPJob
+    assert type(ddp_job.job_definition) == DDPJobDefinition
+    assert type(ddp_job.cluster) == Cluster
+    assert type(ddp_job._app_handle) == str
+    assert ddp_job._app_handle == "fake-dashboard-url"
+
+
+def test_DDPJob_creation(mocker):
+    ddp_def = test_DDPJobDefinition_creation()
+    cluster = Cluster(test_config_creation())
+    mocker.patch(
+        "codeflare_sdk.job.jobs.torchx_runner.schedule",
+        return_value="fake-dashboard-url",
+    )  # a fake app_handle
+    ddp_job = DDPJob(ddp_def, cluster)
+    assert type(ddp_job) == DDPJob
+    assert type(ddp_job.job_definition) == DDPJobDefinition
+    assert type(ddp_job.cluster) == Cluster
+    assert type(ddp_job._app_handle) == str
+    assert ddp_job._app_handle == "fake-dashboard-url"
+    _, args, kwargs = torchx_runner.schedule.mock_calls[0]
+    assert type(args[0]) == AppDryRunInfo
+    job_info = args[0]
+    assert type(job_info.request) == RayJob
+    assert type(job_info._app) == AppDef
+    assert type(job_info._cfg) == type(dict())
+    assert type(job_info._scheduler) == type(str())
+    return ddp_job
+
+
+def test_DDPJob_status(mocker):
+    ddp_job = test_DDPJob_creation(mocker)
+    mocker.patch(
+        "codeflare_sdk.job.jobs.torchx_runner.status", return_value="fake-status"
+    )
+    assert ddp_job.status() == "fake-status"
+    _, args, kwargs = torchx_runner.status.mock_calls[0]
+    assert args[0] == "fake-dashboard-url"
+
+
+def test_DDPJob_logs(mocker):
+    ddp_job = test_DDPJob_creation(mocker)
+    mocker.patch(
+        "codeflare_sdk.job.jobs.torchx_runner.log_lines", return_value="fake-logs"
+    )
+    assert ddp_job.logs() == "fake-logs"
+    _, args, kwargs = torchx_runner.log_lines.mock_calls[0]
+    assert args[0] == "fake-dashboard-url"
+
+
+def parse_j(cmd):
+
+    pattern = r"--nnodes\s+\d+\s+--nproc_per_node\s+\d+"
+    match = re.search(pattern, cmd)
+    if match:
+        substring = match.group(0)
+    else:
+        return None
+    args = substring.split()
+    max_worker = args[1]
+    gpu = args[3]
+    return f"{max_worker}x{gpu}"
