-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatch.py
146 lines (116 loc) · 4.67 KB
/
dispatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import sys
import time
import atexit
import select
import tempfile
import subprocess
import classad
import htcondor
"""Runs a lot of short jobs very quickly."""
def dispatch(commands, files, count=1):
    """Run every entry of commands in a sandbox that has received files.

    A default job description that transfers each path in files is built
    first; the commands, that description and count are then handed to
    dispatch_with_job(), whose return value is passed back unchanged.
    """
    return dispatch_with_job(commands, _make_default_jobhash(files), count)
def dispatch_with_job(commands, jobhash, count=1):
    """Submit jobhash to HTCondor count times and run commands in those jobs.

    Each command is run in one of the submitted jobs (HTCondor may transfer
    files for them, depending on jobhash).  Returns whatever the select loop
    returns for this batch of commands.
    """
    # The loop consumes the sequence it is given, so hand it a copy and leave
    # the caller's sequence alone.
    queue = commands[:]
    return _main_select_loop(None, queue, jobhash, count)
def sweep(command, arguments, files, count=1):
    """Run command once per entry of arguments in a sandbox holding files.

    A default job description that transfers each path in files is built
    first; everything is then forwarded to sweep_with_job(), whose return
    value is passed back unchanged.
    """
    return sweep_with_job(command, arguments, _make_default_jobhash(files), count)
def sweep_with_job(command, arguments, jobhash, count=1):
    """Submit jobhash to HTCondor count times and sweep command over arguments.

    For each entry of arguments, command is run with that entry in one of the
    submitted jobs (HTCondor may transfer files for them, depending on
    jobhash).  Returns whatever the select loop returns for this sweep.
    """
    # The loop consumes the sequence it is given, so hand it a copy and leave
    # the caller's sequence alone.
    remaining = arguments[:]
    return _main_select_loop(command, remaining, jobhash, count)
# ----------------------------------------------------------------------------
def _make_default_jobhash(files):
    """Build the stock job description: a 300-second /bin/sleep placeholder.

    A fresh temporary file is created to serve as the job's event log and is
    scheduled for deletion when the interpreter exits.  The entries of files
    are joined with commas to form transfer_input_files.
    """
    # mkstemp() returns an open descriptor as well as the path; only the path
    # is needed here, so the descriptor is closed straight away.
    handle, log_path = tempfile.mkstemp()
    os.close(handle)
    atexit.register(os.unlink, log_path)

    return {
        "executable": "/bin/sleep",
        "arguments": "300",
        "transfer_executable": False,
        "should_transfer_files": True,
        "log": log_path,
        "transfer_input_files": ",".join(files),
    }
def _open_ssh_pipes(cluster, proc):
    """Start condor_ssh_to_job for job cluster.proc and return its pipes.

    Returns a (stdin, stdout) pair of file objects connected to the child
    process.  Both are opened in text mode so that callers can exchange str
    lines with the remote shell on Python 2 and Python 3 alike.
    """
    ssh = subprocess.Popen(
        ["condor_ssh_to_job", "{0}.{1}".format(cluster, proc)],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        # Without text mode Python 3 hands back binary pipes: readline()
        # yields bytes and write() rejects str, so the caller's
        # startswith("Welcome to") / write(message) calls raise TypeError.
        universal_newlines=True,
    )
    # We can't use communicate() because the conversation with the child is
    # made of many separate write/read exchanges, not a single one.
    return ssh.stdin, ssh.stdout
def _remove_job(schedd, cluster, message):
    """Issue a Remove action through schedd for every job in cluster.

    Jobs are selected with the constraint "ClusterId == <cluster>"; message
    is passed along as the final argument of schedd.act() (presumably the
    removal reason -- confirm against the HTCondor bindings).
    """
    constraint = "ClusterId == {0}".format(cluster)
    schedd.act(htcondor.JobAction.Remove, constraint, message)
def _main_select_loop(prefix, commands, jobhash, count):
    """Submit the job(s), then hand out commands to them until none remain.

    count procs of jobhash are queued in a single cluster.  Whenever the job
    event log named by jobhash["log"] reports that one of them has started
    executing, a session is opened to it with _open_ssh_pipes().  Each session
    is driven one line at a time: a command is written, one line of output is
    read back and stored as that command's result, then the next command is
    sent.

    prefix, when not None, is prepended (followed by a space) to every line
    sent.  commands must be a mutable list of strings; it is consumed from the
    end.  The pipes are assumed to yield and accept str.

    Returns a dict mapping each entry of commands to the stripped line of
    output read back for it.  The cluster is removed before returning, and
    also at interpreter exit as a safety net.

    If a session reaches end-of-file, its unanswered command is put back on
    the queue (so it may be run a second time) and the loop keeps waiting for
    another job of the cluster to start; if none ever does, this does not
    return.
    """
    logfile = jobhash["log"]
    start = time.time()

    sub = htcondor.Submit(jobhash)
    schedd = htcondor.Schedd()
    with schedd.transaction() as txn:
        cluster = sub.queue(txn, count)
    print("Completed submit to cluster {0}".format(cluster))
    # Make sure the placeholder jobs do not outlive us if we exit early.
    atexit.register(_remove_job, schedd, cluster, "dispatch cleaning up")

    jel = htcondor.JobEventLog(logfile)
    # pipesFro: read ends still being polled.
    # correspondingToPipe: read end -> write end of the same session.
    # froToResultMap: read end -> command currently awaiting its output line.
    pipesFro = []
    results, froToResultMap, correspondingToPipe = {}, {}, {}

    def retire(fro):
        """Stop polling one session and close both of its pipes."""
        to = correspondingToPipe.pop(fro)
        pipesFro.remove(fro)
        to.close()
        fro.close()

    def finish():
        """Report the elapsed time, remove the cluster, return the results."""
        print("Jobs complete in {0} seconds".format(time.time() - start))
        _remove_job(schedd, cluster, "dispatch complete")
        return results

    while True:
        # Open a session to every proc of our cluster that has just started.
        for event in jel.events(0):
            if event.cluster != cluster:
                continue
            if event.type is not htcondor.JobEventType.EXECUTE:
                continue
            print(
                "Submit-to-startup time: {0} seconds".format(time.time() - start)
            )
            to, fro = _open_ssh_pipes(cluster, event.proc)
            pipesFro.append(fro)
            correspondingToPipe[fro] = to

        # Check for pipe events.  Only readability matters: a session is
        # written to solely in response to something read from it.
        ready, _, _ = select.select(pipesFro, [], [], 0)
        for f in ready:
            result = f.readline()
            pending = froToResultMap.pop(f, None)

            if not result:
                # End-of-file: the session is gone.  Do not record a bogus
                # empty result or write into a dead pipe; requeue instead.
                if pending is not None:
                    commands.append(pending)
                retire(f)
                if not pipesFro and not commands:
                    return finish()
                continue

            if pending is not None:
                results[pending] = result.strip()
            elif result.startswith("Welcome to"):
                # First contact: the greeting is followed by a second line,
                # which is read and discarded as well.
                f.readline()
            # Any other line seen before a command was sent is ignored.

            if commands:
                command = commands.pop()
                # Results are keyed by the command exactly as it was given.
                froToResultMap[f] = command
                message = command if command.endswith("\n") else command + "\n"
                if prefix is not None:
                    message = "{0} {1}".format(prefix, message)
                t = correspondingToPipe[f]
                t.write(message)
                t.flush()
            else:
                # Nothing left for this session to do.
                retire(f)
                if not pipesFro:
                    return finish()

        time.sleep(0.01)
if __name__ == "__main__":
    # Running this module directly does nothing: no tests are implemented.
    #
    # We could also permit something like:
    #     $ script | condor_dispatch [submit-file]
    # where the script produces a newline-separated sequence of commands to
    # dispatch, but that may be better suited, and more idiomatic, as its
    # own wrapper script.
    pass