Skip to content

Commit c6b1888

Browse files
committed
feat: support set metadir as a fold from the cloud
1 parent 442e501 commit c6b1888

23 files changed

+1243
-643
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,6 @@ docs/api/
158158
test.py
159159

160160
/local_tests/
161-
t.ipynb
161+
t.ipynb
162+
163+
gac_key.json

poetry.lock

+499-192
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+10-9
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,23 @@ homepage = "https://github.com/pwwang/xqute"
88
repository = "https://github.com/pwwang/xqute"
99
readme = "README.md"
1010

11+
[tool.poetry.build]
12+
generate-setup-file = true
13+
1114
[tool.poetry.dependencies]
12-
python = "^3.8"
15+
python = "^3.9"
1316
simplug = "^0.4"
1417
diot = "^0.2"
1518
rich = "^13"
1619
uvloop = "^0"
17-
aiopath = [
18-
{version = "0.5.*", python = "<3.10"},
19-
{version = "0.6.*", python = ">=3.10,<3.12"},
20-
{version = "0.7.*", python = ">=3.12"},
21-
]
20+
cloudpathlib = "^0.20"
21+
google-cloud-storage = "^2.19.0"
22+
argx = "^0.2.11"
2223

23-
[tool.poetry.build]
24-
generate-setup-file = true
24+
[tool.poetry.extras]
25+
meta-gs = ["google-cloud-storage"]
2526

26-
[tool.poetry.dev-dependencies]
27+
[tool.poetry.group.dev.dependencies]
2728
pytest = "^8"
2829
pytest-cov = "^5"
2930
pytest-asyncio = "^0"

tests/conftest.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import os
2+
3+
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./gac_key.json"

tests/test_local_scheduler.py

+36-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import asyncio
12
import pytest
23

4+
from xqute.defaults import JobStatus
35
from xqute.schedulers.local_scheduler import LocalJob, LocalScheduler
46

57

@@ -12,11 +14,26 @@ async def test_scheduler():
1214
assert isinstance(pid, int)
1315

1416

17+
@pytest.mark.asyncio
18+
async def test_scheduler_with_meta_gs():
19+
job = LocalJob(
20+
0,
21+
["echo", 1],
22+
metadir="gs://handy-buffer-287000.appspot.com/xqute_local_test",
23+
)
24+
25+
scheduler = LocalScheduler(1)
26+
pid = await scheduler.submit_job(job)
27+
28+
assert isinstance(pid, int)
29+
assert job.stdout_file.read_text() == "1\n"
30+
31+
1532
@pytest.mark.asyncio
1633
async def test_immediate_submission_failure():
1734

1835
class BadLocalJob(LocalJob):
19-
async def wrapped_script(self, scheduler):
36+
def wrapped_script(self, scheduler):
2037
wrapt_script = self.metadir / f"job.wrapped.{scheduler.name}"
2138
wrapt_script.write_text("sleep 1; bad_non_existent_command")
2239
return wrapt_script
@@ -30,3 +47,21 @@ async def wrapped_script(self, scheduler):
3047
RuntimeError, match="bad_non_existent_command: command not found"
3148
):
3249
await scheduler.submit_job(job)
50+
51+
52+
@pytest.mark.asyncio
53+
async def test_killing_running_jobs(caplog):
54+
job1 = LocalJob(0, ["sleep", "10"])
55+
job2 = LocalJob(1, ["sleep", "10"])
56+
57+
scheduler = LocalScheduler(2)
58+
await scheduler.submit_job_and_update_status(job1)
59+
await scheduler.submit_job_and_update_status(job2)
60+
61+
await asyncio.sleep(1)
62+
await scheduler.kill_running_jobs([job1, job2])
63+
64+
assert job1.status == JobStatus.FINISHED
65+
assert job2.status == JobStatus.FINISHED
66+
assert job1.rc != 0
67+
assert job2.rc != 0

tests/test_main.py

+193
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import asyncio
2+
import pytest
3+
import subprocess
4+
from argx import Namespace
5+
from unittest.mock import patch, MagicMock, AsyncMock
6+
from xqute import __version__
7+
from xqute.__main__ import (
8+
_update_status,
9+
check_version_and_plugins,
10+
launch_job,
11+
main,
12+
SchedulerXquteInconsistencyWarning,
13+
)
14+
from xqute.defaults import JobStatus
15+
16+
# FILE: xqute/test___main__.py
17+
18+
19+
def test_update_status(tmp_path):
20+
metadir = tmp_path / "metadir"
21+
metadir.mkdir()
22+
status = JobStatus.RUNNING
23+
_update_status(str(metadir), status)
24+
assert (metadir / "job.status").read_text() == str(status)
25+
26+
27+
def test_check_version_and_plugins():
28+
args = MagicMock()
29+
args.version = "1.0.0"
30+
args.plugin = ["plugin1:1.0.0", "plugin2:2.0.0"]
31+
plugin1 = Namespace(name="plugin1", version="1.0.0")
32+
plugin2 = Namespace(name="plugin2", version="2.0.0")
33+
34+
with patch("xqute.__main__.__version__", "1.0.0"):
35+
with patch(
36+
"xqute.__main__.plugin.get_enabled_plugins",
37+
return_value={"plugin1": "1.0.0", "plugin2": "2.0.0"},
38+
):
39+
check_version_and_plugins(args)
40+
41+
with patch("xqute.__main__.__version__", "1.0.0"):
42+
with patch(
43+
"xqute.__main__.plugin.get_enabled_plugins",
44+
return_value={"plugin1": "1.0.0"},
45+
):
46+
with pytest.warns(SchedulerXquteInconsistencyWarning):
47+
check_version_and_plugins(args)
48+
49+
with patch("xqute.__main__.__version__", "1.0.0"):
50+
with patch(
51+
"xqute.__main__.plugin.get_enabled_plugins",
52+
return_value={"plugin1": plugin1, "plugin2": plugin2},
53+
):
54+
with pytest.warns(SchedulerXquteInconsistencyWarning):
55+
check_version_and_plugins(args)
56+
57+
58+
def test_check_version_and_plugins_version_mismatch():
59+
args = MagicMock()
60+
args.version = "1.0.0"
61+
62+
with patch("xqute.__main__.__version__", "2.0.0"):
63+
with pytest.warns(
64+
SchedulerXquteInconsistencyWarning,
65+
match="The xqute on the scheduler system is 2.0.0",
66+
):
67+
check_version_and_plugins(args)
68+
69+
args.plugin = ["plugin1:1.0.0"]
70+
with patch("xqute.__main__.__version__", "1.0.0"):
71+
with patch(
72+
"xqute.__main__.plugin.get_enabled_plugins",
73+
return_value={"plugin1": "2.0.0"},
74+
):
75+
with pytest.warns(
76+
SchedulerXquteInconsistencyWarning,
77+
match="Xqute plugin version mismatch: plugin1 1.0.0 != 2.0.0",
78+
):
79+
check_version_and_plugins(args)
80+
81+
82+
@pytest.mark.asyncio
83+
async def test_launch_job(tmp_path):
84+
args = MagicMock()
85+
args.metadir = str(tmp_path / "metadir")
86+
args.cmd = ["echo", "hello"]
87+
(tmp_path / "metadir").mkdir()
88+
89+
with patch(
90+
"xqute.__main__.plugin.hooks.on_jobsched_started", new_callable=AsyncMock
91+
):
92+
with patch(
93+
"xqute.__main__.plugin.hooks.on_jobsched_ended", new_callable=AsyncMock
94+
):
95+
await launch_job(args)
96+
97+
assert (tmp_path / "metadir" / "job.stdout").read_text() == "hello\n"
98+
assert (tmp_path / "metadir" / "job.rc").read_text() == "0"
99+
assert (tmp_path / "metadir" / "job.status").read_text() == str(JobStatus.FINISHED)
100+
101+
102+
@pytest.mark.asyncio
103+
async def test_launch_job_failed(tmp_path):
104+
metadir = tmp_path / "metadir"
105+
metadir.mkdir()
106+
args = Namespace(
107+
metadir=str(metadir),
108+
cmd=["bash", "-c", "exit 1"],
109+
scheduler="local",
110+
version=__version__,
111+
plugin=[],
112+
)
113+
114+
with pytest.raises(SystemExit):
115+
await launch_job(args)
116+
assert (metadir / "job.rc").read_text() != "0"
117+
118+
args = Namespace(
119+
metadir=str(metadir),
120+
cmd=["__NoNeXisT__"],
121+
scheduler="local",
122+
version=__version__,
123+
plugin=[],
124+
)
125+
126+
with pytest.raises(SystemExit):
127+
await launch_job(args)
128+
assert (metadir / "job.rc").read_text() != "0"
129+
130+
131+
@pytest.mark.asyncio
132+
async def test_launch_job_cancelled(tmp_path):
133+
args = MagicMock()
134+
args.metadir = str(tmp_path / "metadir")
135+
args.cmd = ["echo", "hello"]
136+
(tmp_path / "metadir").mkdir()
137+
138+
with patch(
139+
"xqute.__main__.plugin.hooks.on_jobsched_started", new_callable=AsyncMock
140+
):
141+
with patch(
142+
"xqute.__main__.plugin.hooks.on_jobsched_ended", new_callable=AsyncMock
143+
):
144+
with patch(
145+
"asyncio.create_subprocess_exec", side_effect=asyncio.CancelledError
146+
):
147+
with pytest.raises(asyncio.CancelledError):
148+
await launch_job(args)
149+
150+
assert (tmp_path / "metadir" / "job.rc").read_text() == "1"
151+
assert (tmp_path / "metadir" / "job.status").read_text() == str(JobStatus.FAILED)
152+
153+
154+
@pytest.mark.asyncio
155+
async def test_main(tmp_path):
156+
metadir = tmp_path / "metadir"
157+
metadir.mkdir()
158+
args = Namespace(
159+
metadir=str(metadir),
160+
cmd=["echo", "hello"],
161+
scheduler="local",
162+
version=__version__,
163+
plugin=[],
164+
)
165+
166+
await main(args)
167+
assert (metadir / "job.stdout").read_text() == "hello\n"
168+
169+
170+
@pytest.mark.asyncio
171+
async def test_cli(tmp_path):
172+
metadir = tmp_path / "metadir"
173+
metadir.mkdir()
174+
175+
# Use the real command line arguments
176+
p = subprocess.run(
177+
[
178+
"python",
179+
"-m",
180+
"xqute",
181+
"++scheduler",
182+
"local",
183+
"++version",
184+
__version__,
185+
"++metadir",
186+
str(metadir),
187+
"++cmd",
188+
"echo",
189+
"hello",
190+
]
191+
)
192+
assert p.returncode == 0
193+
assert (metadir / "job.stdout").read_text() == "hello\n"

tests/test_sge_scheduler.py

+31-23
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,28 @@
99
MOCKS = Path(__file__).parent / "mocks"
1010

1111

12-
def setup_module():
13-
qsub = str(MOCKS / "qsub")
14-
qdel = str(MOCKS / "qdel")
15-
qstat = str(MOCKS / "qstat")
12+
@pytest.fixture
13+
def qsub():
14+
cmd = str(MOCKS / "qsub")
15+
st = os.stat(cmd)
16+
os.chmod(cmd, st.st_mode | stat.S_IEXEC)
17+
return cmd
1618

17-
for qcmd in (qsub, qdel, qstat):
18-
st = os.stat(str(qcmd))
19-
os.chmod(str(qcmd), st.st_mode | stat.S_IEXEC)
19+
20+
@pytest.fixture
21+
def qdel():
22+
cmd = str(MOCKS / "qdel")
23+
st = os.stat(cmd)
24+
os.chmod(cmd, st.st_mode | stat.S_IEXEC)
25+
return cmd
26+
27+
28+
@pytest.fixture
29+
def qstat():
30+
cmd = str(MOCKS / "qstat")
31+
st = os.stat(cmd)
32+
os.chmod(cmd, st.st_mode | stat.S_IEXEC)
33+
return cmd
2034

2135

2236
@pytest.mark.asyncio
@@ -26,23 +40,20 @@ async def test_job():
2640
forks=1, qsub_notify=True, sge_l=["vmem=2G", "gpu=1"], sge_m="abe"
2741
)
2842
assert (
29-
await job.wrapped_script(scheduler)
30-
== DEFAULT_JOB_METADIR / "0" / "job.wrapped.sge"
43+
job.wrapped_script(scheduler)
44+
== Path(DEFAULT_JOB_METADIR) / "0" / "job.wrapped.sge"
3145
)
3246

33-
shebang = job.shebang(scheduler)
34-
assert "#$ -notify" in shebang
35-
assert "#$ -l vmem=2G" in shebang
36-
assert "#$ -l gpu=1" in shebang
37-
assert "#$ -m abe" in shebang
47+
script = job.wrap_script(scheduler)
48+
assert "#$ -notify" in script
49+
assert "#$ -l vmem=2G" in script
50+
assert "#$ -l gpu=1" in script
51+
assert "#$ -m abe" in script
3852

3953

4054
@pytest.mark.asyncio
41-
async def test_scheduler(capsys):
55+
async def test_scheduler(capsys, qsub, qdel, qstat):
4256
job = SgeJob(0, ["echo", 1])
43-
qsub = str(MOCKS / "qsub")
44-
qdel = str(MOCKS / "qdel")
45-
qstat = str(MOCKS / "qstat")
4657

4758
scheduler = SgeScheduler(1, qsub=qsub, qdel=qdel, qstat=qstat)
4859
assert await scheduler.submit_job(job) == "613815"
@@ -61,13 +72,10 @@ async def test_scheduler(capsys):
6172

6273

6374
@pytest.mark.asyncio
64-
async def test_submission_failure(capsys):
75+
async def test_submission_failure(capsys, qdel, qstat):
6576
job = SgeJob(0, ["echo", 1])
66-
qsub = str(MOCKS / "no_such_qsub")
67-
qdel = str(MOCKS / "qdel")
68-
qstat = str(MOCKS / "qstat")
6977

70-
scheduler = SgeScheduler(1, qsub=qsub, qdel=qdel, qstat=qstat)
78+
scheduler = SgeScheduler(1, qsub="no_such_qsub", qdel=qdel, qstat=qstat)
7179

7280
assert await scheduler.submit_job_and_update_status(job) is None
7381
assert await scheduler.job_is_running(job) is False

0 commit comments

Comments
 (0)