Skip to content

Commit fdd363c

Browse files
Interactive task via IO change (#30)
* Interactive task changes after ONTAK * Fix missing pass_fds * Bump sioworkers version --------- Co-authored-by: Tomasz Kwiatkowski <[email protected]>
1 parent c508adb commit fdd363c

File tree

4 files changed

+114
-63
lines changed

4 files changed

+114
-63
lines changed

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
setup(
44
name = "sioworkers",
5-
version = '1.5.4',
5+
version = '1.5.5',
66
author = "SIO2 Project Team",
77
author_email = '[email protected]',
88
description = "Programming contest judging infrastructure",

sio/executors/common.py

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def _populate_environ(renv, environ):
1818
environ[key] = renv.get(key, '')
1919
if 'out_file' in renv:
2020
environ['out_file'] = renv['out_file']
21+
environ['result_percentage'] = renv.get('result_percentage', (0, 1))
2122

2223

2324
def _extract_input_if_zipfile(input_name, zipdir):

sio/executors/interactive_common.py

+110-62
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,29 @@ def __init__(self, message, interactor_out, env, renv, irenv):
2727
)
2828

2929

30+
class Pipes:
31+
"""
32+
Class for storing file descriptors for interactor and solution processes.
33+
"""
34+
r_interactor = None
35+
w_interactor = None
36+
r_solution = None
37+
w_solution = None
38+
39+
def __init__(self, r_interactor, w_interactor, r_solution, w_solution):
40+
"""
41+
Constructor for Pipes class.
42+
:param r_interactor: file descriptor from which the interactor reads from the solution
43+
:param w_interactor: file descriptor to which the interactor writes to the solution
44+
:param r_solution: file descriptor from which the solution reads from the interactor
45+
:param w_solution: file descriptor to which the solution writes to the interactor
46+
"""
47+
self.r_interactor = r_interactor
48+
self.w_interactor = w_interactor
49+
self.r_solution = r_solution
50+
self.w_solution = w_solution
51+
52+
3053
def _limit_length(s):
3154
if len(s) > RESULT_STRING_LENGTH_LIMIT:
3255
suffix = b'[...]'
@@ -70,18 +93,7 @@ def _fill_result(env, renv, irenv, interactor_out):
7093
inter_sig = irenv.get('exit_signal', None)
7194
sigpipe = signal.SIGPIPE.value
7295

73-
if irenv['result_code'] != 'OK' and inter_sig != sigpipe:
74-
renv['result_code'] = 'SE'
75-
raise InteractorError(f'Interactor got {irenv["result_code"]}.', interactor_out, env, renv, irenv)
76-
elif renv['result_code'] != 'OK' and sol_sig != sigpipe:
77-
return
78-
elif len(interactor_out) == 0:
79-
renv['result_code'] = 'SE'
80-
raise InteractorError(f'Empty interactor out.', interactor_out, env, renv, irenv)
81-
elif inter_sig == sigpipe:
82-
renv['result_code'] = 'WA'
83-
renv['result_string'] = 'solution exited prematurely'
84-
else:
96+
if six.ensure_binary(interactor_out[0]) != b'':
8597
renv['result_string'] = ''
8698
if six.ensure_binary(interactor_out[0]) == b'OK':
8799
renv['result_code'] = 'OK'
@@ -93,11 +105,25 @@ def _fill_result(env, renv, irenv, interactor_out):
93105
if interactor_out[1]:
94106
renv['result_string'] = _limit_length(interactor_out[1])
95107
renv['result_percentage'] = (0, 1)
108+
elif irenv['result_code'] != 'OK' and irenv['result_code'] != 'TLE' and inter_sig != sigpipe:
109+
renv['result_code'] = 'SE'
110+
raise InteractorError(f'Interactor got {irenv["result_code"]}.', interactor_out, env, renv, irenv)
111+
elif renv['result_code'] != 'OK' and sol_sig != sigpipe:
112+
return
113+
elif inter_sig == sigpipe:
114+
renv['result_code'] = 'WA'
115+
renv['result_string'] = 'solution exited prematurely'
116+
elif irenv.get('real_time_killed', False):
117+
renv['result_code'] = 'TLE'
118+
renv['result_string'] = 'interactor time limit exceeded (user\'s solution or interactor can be the cause)'
119+
else:
120+
raise InteractorError(f'Unexpected interactor error', interactor_out, env, renv, irenv)
96121

97122

98123
def _run(environ, executor, use_sandboxes):
99124
input_name = tempcwd('in')
100125

126+
num_processes = environ.get('num_processes', 1)
101127
file_executor = get_file_runner(executor, environ)
102128
interactor_executor = DetailedUnprotectedExecutor()
103129
exe_filename = file_executor.preferred_filename()
@@ -113,13 +139,18 @@ def _run(environ, executor, use_sandboxes):
113139
os.mkdir(zipdir)
114140
try:
115141
input_name = _extract_input_if_zipfile(input_name, zipdir)
142+
proc_pipes = []
116143

117-
r1, w1 = os.pipe()
118-
r2, w2 = os.pipe()
119-
for fd in (r1, w1, r2, w2):
120-
os.set_inheritable(fd, True)
144+
for i in range(num_processes):
145+
r1, w1 = os.pipe()
146+
r2, w2 = os.pipe()
147+
for fd in (r1, w1, r2, w2):
148+
os.set_inheritable(fd, True)
149+
proc_pipes.append(Pipes(r1, w2, r2, w1))
121150

122-
interactor_args = [os.path.basename(input_name), 'out']
151+
interactor_args = [str(num_processes)]
152+
for pipes in proc_pipes:
153+
interactor_args.extend([str(pipes.r_interactor), str(pipes.w_interactor)])
123154

124155
interactor_time_limit = 2 * environ['exec_time_limit']
125156

@@ -139,51 +170,68 @@ def run(self):
139170
except Exception as e:
140171
self.exception = e
141172

142-
with interactor_executor as ie:
143-
interactor = ExecutionWrapper(
144-
ie,
145-
[tempcwd(interactor_filename)] + interactor_args,
146-
stdin=r2,
147-
stdout=w1,
148-
ignore_errors=True,
149-
environ=environ,
150-
environ_prefix='interactor_',
151-
mem_limit=DEFAULT_INTERACTOR_MEM_LIMIT,
152-
time_limit=interactor_time_limit,
153-
fds_to_close=(r2, w1),
154-
close_passed_fd=True,
155-
cwd=tempcwd(),
156-
in_file=environ['in_file'],
157-
)
158-
159-
with file_executor as fe:
160-
exe = ExecutionWrapper(
161-
fe,
162-
tempcwd(exe_filename),
163-
[],
164-
stdin=r1,
165-
stdout=w2,
166-
ignore_errors=True,
167-
environ=environ,
168-
environ_prefix='exec_',
169-
fds_to_close=(r1, w2),
170-
close_passed_fd=True,
171-
cwd=tempcwd(),
172-
in_file=environ['in_file'],
173-
)
174-
175-
exe.start()
176-
interactor.start()
177-
178-
exe.join()
179-
interactor.join()
180-
181-
for ew in (exe, interactor):
182-
if ew.exception is not None:
183-
raise ew.exception
184-
185-
renv = exe.value
186-
irenv = interactor.value
173+
with open(input_name, 'rb') as infile, open(tempcwd('out'), 'wb') as outfile:
174+
processes = []
175+
interactor_fds = []
176+
for pipes in proc_pipes:
177+
interactor_fds.extend([pipes.r_interactor, pipes.w_interactor])
178+
179+
with interactor_executor as ie:
180+
interactor = ExecutionWrapper(
181+
ie,
182+
[tempcwd(interactor_filename)] + interactor_args,
183+
stdin=infile,
184+
stdout=outfile,
185+
ignore_errors=True,
186+
environ=environ,
187+
environ_prefix='interactor_',
188+
mem_limit=DEFAULT_INTERACTOR_MEM_LIMIT,
189+
time_limit=interactor_time_limit,
190+
fds_to_close=interactor_fds,
191+
pass_fds=interactor_fds,
192+
cwd=tempcwd(),
193+
)
194+
195+
for i in range(num_processes):
196+
pipes = proc_pipes[i]
197+
with file_executor as fe:
198+
exe = ExecutionWrapper(
199+
fe,
200+
tempcwd(exe_filename),
201+
[str(i)],
202+
stdin=pipes.r_solution,
203+
stdout=pipes.w_solution,
204+
ignore_errors=True,
205+
environ=environ,
206+
environ_prefix='exec_',
207+
fds_to_close=[pipes.r_solution, pipes.w_solution],
208+
cwd=tempcwd(),
209+
)
210+
processes.append(exe)
211+
212+
for process in processes:
213+
process.start()
214+
interactor.start()
215+
216+
for process in processes:
217+
process.join()
218+
interactor.join()
219+
220+
if interactor.exception:
221+
raise interactor.exception
222+
for process in processes:
223+
if process.exception:
224+
raise process.exception
225+
226+
renv = processes[0].value
227+
for process in processes:
228+
if process.value['result_code'] != 'OK':
229+
renv = process.value
230+
break
231+
renv['time_used'] = max(renv['time_used'], process.value['time_used'])
232+
renv['mem_used'] = max(renv['mem_used'], process.value['mem_used'])
233+
234+
irenv = interactor.value
187235

188236
try:
189237
with open(tempcwd('out'), 'rb') as result_file:

sio/workers/executors.py

+2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def execute_command(
8080
ignore_errors=False,
8181
extra_ignore_errors=(),
8282
cwd=None,
83+
pass_fds=(),
8384
fds_to_close=(),
8485
**kwargs,
8586
):
@@ -140,6 +141,7 @@ def execute_command(
140141
stderr=forward_stderr and subprocess.STDOUT or stderr,
141142
shell=True,
142143
close_fds=True,
144+
pass_fds=pass_fds,
143145
universal_newlines=True,
144146
env=env,
145147
cwd=cwd,

0 commit comments

Comments
 (0)