Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 71 additions & 5 deletions src/sage/parallel/use_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


import sys
import traceback
from shutil import rmtree

if sys.platform != 'win32':
Expand Down Expand Up @@ -157,6 +158,58 @@ def __call__(self, f, inputs):
sage: del unpickle_override[('sage.rings.polynomial.polynomial_rational_flint', 'Polynomial_rational_flint')]
sage: list(Polygen([QQ,QQ]))
[(((Rational Field,), {}), x), (((Rational Field,), {}), x)]

When the function raises an exception, the exception is re-raised in the parent process with the same type.
The traceback is gone, but we print a textual version::

sage: def fff(x): return ggg(x)
sage: def ggg(x): return hhh(x)
sage: def hhh(x): return 1/x
sage: try: list(F(fff, [([0],{}), ([1],{})]))
....: except: import traceback; traceback.print_exc()
RuntimeError: forked subprocess raised:
Traceback (most recent call last):
...in ggg...
ZeroDivisionError: rational division by zero
The above exception was the direct cause of the following exception:
...
ZeroDivisionError: rational division by zero

When the erroneous invocation is not the first one, because of parallelism,
the exception might be raised on the first or second ``next()`` function call.
Both of the following behaviors are possible::

sage: # not tested
sage: it = F(fff, [([1],{}), ([0],{})])
sage: next(it)
(([1], {}), 1)
sage: next(it)
Traceback (most recent call last):
...
ZeroDivisionError: rational division by zero

sage: # not tested
sage: it = F(fff, [([1],{}), ([0],{})])
sage: next(it)
Traceback (most recent call last):
...
ZeroDivisionError: rational division by zero

Other corner cases::

sage: class UnpicklableException(Exception): __reduce__ = None
sage: def fff(x): raise UnpicklableException()
sage: list(F(fff, [([0],{}), ([1],{})]))
Traceback (most recent call last):
...
RuntimeError: cannot pickle exception object

sage: class UnpicklableValue: __reduce__ = None
sage: def fff(x): return UnpicklableValue()
sage: list(F(fff, [([0],{}), ([1],{})]))
Traceback (most recent call last):
...
RuntimeError: cannot pickle return value
"""
import os
import sys
Expand Down Expand Up @@ -222,13 +275,13 @@ def __call__(self, f, inputs):
with open(sobj, "rb") as file:
data = file.read()
except OSError:
answer = "NO DATA" + W.failure
should_raise, answer = False, "NO DATA" + W.failure
else:
os.unlink(sobj)
try:
answer = loads(data, compress=False)
should_raise, answer = loads(data, compress=False)
except Exception as E:
answer = "INVALID DATA {}".format(E)
should_raise, answer = False, "INVALID DATA {}".format(E)

out = os.path.join(dir, '%s.out' % pid)
try:
Expand All @@ -238,6 +291,10 @@ def __call__(self, f, inputs):
except OSError:
pass

if should_raise:
exc, tb_str = answer
raise exc from RuntimeError(f"forked subprocess raised:\n{tb_str}")

yield (W.input, answer)
finally:
# Send SIGKILL signal to workers that are left.
Expand Down Expand Up @@ -325,8 +382,17 @@ def _subprocess(self, f, dir, args, kwds={}):
set_random_seed(self.worker_seed)

# Now evaluate the function f.
value = f(*args, **kwds)
try:
value = False, f(*args, **kwds)
except BaseException as e:
value = True, (e, traceback.format_exc())

# And save the result to disk.
sobj = os.path.join(dir, '%s.sobj' % os.getpid())
save(value, sobj, compress=False)
try:
save(value, sobj, compress=False)
except BaseException as e:
if value[0]:
save((True, (RuntimeError('cannot pickle exception object'), traceback.format_exc())), sobj, compress=False)
else:
save((True, (RuntimeError('cannot pickle return value'), traceback.format_exc())), sobj, compress=False)
Loading