Skip to content

Resolve intermittent capture_output deadlock #3601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 73 additions & 33 deletions pyomo/common/tee.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import threading
import time

from pyomo.common.errors import DeveloperError
from pyomo.common.log import LoggingIntercept, LogStream

_poll_interval = 0.0001
Expand All @@ -34,8 +35,8 @@
# polling timeout when waiting to close threads. This will bail on
# closing threads after a minimum of 13.1 seconds and a worst case of
# ~(13.1 * #threads) seconds
_poll_timeout = 3 # 15 rounds: 0.0001 * 2**15 == 3.2768
_poll_timeout_deadlock = 200 # 21 rounds: 0.0001 * 2**21 == 209.7152
_poll_timeout = 1 # 14 rounds: 0.0001 * 2**14 == 1.6384
_poll_timeout_deadlock = 100 # seconds

_noop = lambda: None
_mswindows = sys.platform.startswith('win')
Expand Down Expand Up @@ -240,10 +241,11 @@ class capture_output(object):

"""

startup_shutdown = threading.Lock()

def __init__(self, output=None, capture_fd=False):
if output is None:
output = io.StringIO()
self.output = output
self.output_stream = None
self.old = None
self.tee = None
self.capture_fd = capture_fd
Expand Down Expand Up @@ -285,13 +287,45 @@ def _exit_context_stack(self, et, ev, tb):
return FAIL

def __enter__(self):
    """Enter the capture context while holding the startup/shutdown lock.

    The class-level ``startup_shutdown`` lock is acquired (waiting at
    most ``_poll_timeout_deadlock`` seconds), ``_enter_impl()`` is run,
    and the lock is released again whether or not ``_enter_impl()``
    raised.

    Returns:
        Whatever ``_enter_impl()`` returns.

    Raises:
        DeveloperError: if the lock could not be acquired before the
            timeout expired.
    """
    acquired = capture_output.startup_shutdown.acquire(
        timeout=_poll_timeout_deadlock
    )
    if acquired:
        try:
            return self._enter_impl()
        finally:
            capture_output.startup_shutdown.release()

    # Failing to get the lock *shouldn't* happen.  When it does, the
    # user is unlikely to be able to fix it (or even debug it); they
    # should report it back to us instead.
    #
    # Breadcrumbs:
    #
    # - The last time we hit this [5/2025], capture_output was being
    #   used in a solver's __del__.  The GC deleted that solver while
    #   another solver was trying to start up / run: the other solver
    #   held the lock, but the GC had interrupted that thread and
    #   wouldn't let go.
    raise DeveloperError("Deadlock starting capture_output")

def __exit__(self, et, ev, tb):
    """Leave the capture context while holding the startup/shutdown lock.

    The class-level ``startup_shutdown`` lock is acquired (waiting at
    most ``_poll_timeout_deadlock`` seconds), ``_exit_impl(et, ev, tb)``
    is run, and the lock is released again whether or not
    ``_exit_impl()`` raised.

    Args:
        et: exception type passed in by the ``with`` statement (or None).
        ev: exception value passed in by the ``with`` statement (or None).
        tb: traceback passed in by the ``with`` statement (or None).

    Returns:
        Whatever ``_exit_impl()`` returns.

    Raises:
        DeveloperError: if the lock could not be acquired before the
            timeout expired.
    """
    acquired = capture_output.startup_shutdown.acquire(
        timeout=_poll_timeout_deadlock
    )
    if acquired:
        try:
            return self._exit_impl(et, ev, tb)
        finally:
            capture_output.startup_shutdown.release()

    # See the comments & breadcrumbs in __enter__() for how this can
    # come about.
    raise DeveloperError("Deadlock closing capture_output")

def _enter_impl(self):
self.old = (sys.stdout, sys.stderr)
old_fd = []
for stream in self.old:
stream.flush()
try:
old_fd.append(stream.fileno())
except (AttributeError, OSError):
stream.flush()
try:
old_fd.append(stream.fileno())
except (AttributeError, OSError):
old_fd.append(None)
except (ValueError, OSError):
old_fd.append(None)
try:
# We have an issue where we are (very aggressively)
Expand Down Expand Up @@ -325,6 +359,8 @@ def __enter__(self):

if isinstance(self.output, str):
self.output_stream = self._enter_context(open(self.output, 'w'))
elif self.output is None:
self.output_stream = io.StringIO()
else:
self.output_stream = self.output
if isinstance(self.output, TeeStream):
Expand Down Expand Up @@ -399,26 +435,28 @@ def __enter__(self):
buf = buf[0]
return buf

def __exit__(self, et, ev, tb):
def _exit_impl(self, et, ev, tb):
# Check that we were nested correctly
FAIL = []
if self.tee._stdout is not None and self.tee.STDOUT is not sys.stdout:
FAIL.append(
'Captured output (%s) does not match sys.stdout (%s).'
% (self.tee._stdout, sys.stdout)
)
if self.tee._stderr is not None and self.tee.STDERR is not sys.stderr:
FAIL.append(
'Captured output (%s) does not match sys.stderr (%s).'
% (self.tee._stdout, sys.stdout)
)
if self.tee is not None:
if self.tee._stdout is not None and self.tee.STDOUT is not sys.stdout:
FAIL.append(
'Captured output (%s) does not match sys.stdout (%s).'
% (self.tee._stdout, sys.stdout)
)
if self.tee._stderr is not None and self.tee.STDERR is not sys.stderr:
FAIL.append(
'Captured output (%s) does not match sys.stderr (%s).'
% (self.tee._stdout, sys.stdout)
)
# Exit all context managers. This includes
# - Restore any file descriptors we commandeered
# - Close / join the TeeStream
# - Close any opened files
FAIL.extend(self._exit_context_stack(et, ev, tb))
sys.stdout, sys.stderr = self.old
self.old = None
if self.old is not None:
sys.stdout, sys.stderr = self.old
self.old = None
self.tee = None
self.output_stream = None
if FAIL:
Expand Down Expand Up @@ -667,15 +705,18 @@ def close(self, in_exception=False):

# Join all stream processing threads
_poll = _poll_interval
_timeout = 0.0
FAIL = False
while True:
for th in self._threads:
th.join(_poll)
_timeout += _poll
self._threads[:] = [th for th in self._threads if th.is_alive()]
if not self._threads:
break
_poll *= 2
if _poll_timeout <= _poll < 2 * _poll_timeout:
if _poll < _poll_timeout:
_poll *= 2.0
if _poll_timeout * 0.5 <= _poll < _poll_timeout:
if in_exception:
# We are already processing an exception: no reason
# to trigger another, nor to deadlock for an
Expand All @@ -687,7 +728,7 @@ def close(self, in_exception=False):
"Significant delay observed waiting to join reader "
"threads, possible output stream deadlock"
)
elif _poll >= _poll_timeout_deadlock:
elif _timeout > _poll_timeout_deadlock:
logger.error("TeeStream: deadlock observed joining reader threads")
# Defer raising the exception until after we have
# cleaned things up
Expand All @@ -713,7 +754,8 @@ def __exit__(self, et, ev, tb):
if not self._enter_count:
raise RuntimeError("TeeStream: exiting a context that was not entered")
self._enter_count -= 1
self.close(et is not None)
if not self._enter_count:
self.close(et is not None)

def __del__(self):
# Implement __del__ to guarantee that file descriptors are closed
Expand Down Expand Up @@ -785,19 +827,18 @@ def _mergedReader(self):
if _mswindows:
for handle in list(handles):
try:
if handle.flush:
flush = True
handle.flush = False
pipe = get_osfhandle(handle.read_pipe)
numAvail = PeekNamedPipe(pipe, 0)[1]
if numAvail:
result, new_data = ReadFile(pipe, numAvail, None)
handle.decoder_buffer += new_data
break
elif handle.flush:
break
except:
handles.remove(handle)
new_data = '' # not None so the poll interval doesn't increase
if new_data is None and not flush:
if new_data is None and not handle.flush:
# PeekNamedPipe is non-blocking; to avoid swamping
# the core, sleep for a "short" amount of time
time.sleep(_poll)
Expand All @@ -822,15 +863,14 @@ def _mergedReader(self):
else:
for handle in handles:
if handle.flush:
new_data = ''
break
else:
new_data = None
continue

if handle.flush:
flush = True
handle.flush = False
if handle.flush:
new_data = ''
flush = True
handle.flush = False

# At this point, we have new data sitting in the
# handle.decoder_buffer
Expand Down
47 changes: 46 additions & 1 deletion pyomo/common/tests/test_tee.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

from io import StringIO, BytesIO

from pyomo.common.errors import DeveloperError
from pyomo.common.log import LoggingIntercept, LogStream
import pyomo.common.unittest as unittest
from pyomo.common.tempfiles import TempfileManager
import pyomo.common.tee as tee
import pyomo.common.unittest as unittest


class timestamper:
Expand Down Expand Up @@ -335,6 +336,17 @@ def test_duplicate_capture_output(self):
finally:
capture.reset()

def test_reset_capture_output_twice(self):
# Exercise calling reset() twice in a row on one capture_output object
# and then entering that same object a second time.
# NOTE(review): indentation is not visible in this view, so the exact
# nesting of the statements below should be confirmed against the file.
capture = tee.capture_output()
with capture as OUT1:
print("test1")
# Two back-to-back reset() calls on the same object
capture.reset()
capture.reset()
# Second entry of the same capture_output object
with capture as OUT2:
print("test2")
# Each entry is expected to have captured exactly its own print() output
self.assertEqual(OUT1.getvalue(), "test1\n")
self.assertEqual(OUT2.getvalue(), "test2\n")

def test_capture_output_logfile_string(self):
with TempfileManager.new_context() as tempfile:
logfile = tempfile.create_tempfile()
Expand Down Expand Up @@ -537,6 +549,14 @@ def test_no_fileno_stdout(self):
with T:
self.assertEqual(len(T.context_stack), 6)

def test_closed_stdout(self):
# Exercise capture_output when sys.stdout is closed before a second
# capture_output is entered.
# NOTE(review): indentation is not visible in this view; the second
# capture_output is presumably nested inside the first -- confirm.
with tee.capture_output() as T_outer:
# Close whatever sys.stdout currently refers to
sys.stdout.close()
with tee.capture_output() as T_inner:
print("test")
# The first buffer is expected to stay empty; the second should hold
# the printed line
self.assertEqual(T_outer.getvalue(), "")
self.assertEqual(T_inner.getvalue(), "test\n")

def test_capture_output_stack_error(self):
OUT1 = StringIO()
OUT2 = StringIO()
Expand All @@ -559,6 +579,31 @@ def test_capture_output_stack_error(self):
sys.stdout, sys.stderr = old
logging.getLogger('pyomo.common.tee').handlers.clear()

def test_atomic_deadlock(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nifty test, I like it.

save_poll = tee._poll_timeout_deadlock
tee._poll_timeout_deadlock = 0.01

co = tee.capture_output()
try:
tee.capture_output.startup_shutdown.acquire()
with self.assertRaisesRegex(
DeveloperError, "Deadlock starting capture_output"
):
with tee.capture_output():
pass
tee.capture_output.startup_shutdown.release()

with self.assertRaisesRegex(
DeveloperError, "Deadlock closing capture_output"
):
with co:
tee.capture_output.startup_shutdown.acquire()
finally:
tee._poll_timeout_deadlock = save_poll
if tee.capture_output.startup_shutdown.locked():
tee.capture_output.startup_shutdown.release()
co.reset()

def test_capture_output_invalid_ostream(self):
# Test that capture_output does not suppress errors from the tee
# module
Expand Down
Loading
Loading