Skip to content

Commit f680e04

Browse files
authored
Merge pull request #3601 from jsiirola/capture-output-deadlock
Resolve intermittent `capture_output` deadlock
2 parents 5bd2849 + 4028582 commit f680e04

File tree

5 files changed

+191
-90
lines changed

5 files changed

+191
-90
lines changed

pyomo/common/tee.py

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import threading
2525
import time
2626

27+
from pyomo.common.errors import DeveloperError
2728
from pyomo.common.log import LoggingIntercept, LogStream
2829

2930
_poll_interval = 0.0001
@@ -34,8 +35,8 @@
3435
# polling timeout when waiting to close threads. This will bail on
3536
# closing threast after a minimum of 13.1 seconds and a worst case of
3637
# ~(13.1 * #threads) seconds
37-
_poll_timeout = 3 # 15 rounds: 0.0001 * 2**15 == 3.2768
38-
_poll_timeout_deadlock = 200 # 21 rounds: 0.0001 * 2**21 == 209.7152
38+
_poll_timeout = 1 # 14 rounds: 0.0001 * 2**14 == 1.6384
39+
_poll_timeout_deadlock = 100 # seconds
3940

4041
_noop = lambda: None
4142
_mswindows = sys.platform.startswith('win')
@@ -240,10 +241,11 @@ class capture_output(object):
240241
241242
"""
242243

244+
startup_shutdown = threading.Lock()
245+
243246
def __init__(self, output=None, capture_fd=False):
244-
if output is None:
245-
output = io.StringIO()
246247
self.output = output
248+
self.output_stream = None
247249
self.old = None
248250
self.tee = None
249251
self.capture_fd = capture_fd
@@ -285,13 +287,45 @@ def _exit_context_stack(self, et, ev, tb):
285287
return FAIL
286288

287289
def __enter__(self):
290+
if not capture_output.startup_shutdown.acquire(timeout=_poll_timeout_deadlock):
291+
# This situation *shouldn't* happen. If it does, it is
292+
# unlikely that the user can fix it (or even debug it).
293+
# Instead they should report it back to us.
294+
#
295+
# Breadcrumbs:
296+
#
297+
# - The last time we hit this [5/2025], it was because we
298+
# were using capture_output in a solver's __del__. This
299+
# led to the GC deleting the solver while another solver
300+
# was trying to start up / run (so the other solver held
301+
# the lock, but the GC interrupted that thread and
302+
# wouldn't let go).
303+
raise DeveloperError("Deadlock starting capture_output")
304+
try:
305+
return self._enter_impl()
306+
finally:
307+
capture_output.startup_shutdown.release()
308+
309+
def __exit__(self, et, ev, tb):
310+
if not capture_output.startup_shutdown.acquire(timeout=_poll_timeout_deadlock):
311+
# See comments & breadcrumbs in __enter__() above.
312+
raise DeveloperError("Deadlock closing capture_output")
313+
try:
314+
return self._exit_impl(et, ev, tb)
315+
finally:
316+
capture_output.startup_shutdown.release()
317+
318+
def _enter_impl(self):
288319
self.old = (sys.stdout, sys.stderr)
289320
old_fd = []
290321
for stream in self.old:
291-
stream.flush()
292322
try:
293-
old_fd.append(stream.fileno())
294-
except (AttributeError, OSError):
323+
stream.flush()
324+
try:
325+
old_fd.append(stream.fileno())
326+
except (AttributeError, OSError):
327+
old_fd.append(None)
328+
except (ValueError, OSError):
295329
old_fd.append(None)
296330
try:
297331
# We have an issue where we are (very aggressively)
@@ -325,6 +359,8 @@ def __enter__(self):
325359

326360
if isinstance(self.output, str):
327361
self.output_stream = self._enter_context(open(self.output, 'w'))
362+
elif self.output is None:
363+
self.output_stream = io.StringIO()
328364
else:
329365
self.output_stream = self.output
330366
if isinstance(self.output, TeeStream):
@@ -399,26 +435,28 @@ def __enter__(self):
399435
buf = buf[0]
400436
return buf
401437

402-
def __exit__(self, et, ev, tb):
438+
def _exit_impl(self, et, ev, tb):
403439
# Check that we were nested correctly
404440
FAIL = []
405-
if self.tee._stdout is not None and self.tee.STDOUT is not sys.stdout:
406-
FAIL.append(
407-
'Captured output (%s) does not match sys.stdout (%s).'
408-
% (self.tee._stdout, sys.stdout)
409-
)
410-
if self.tee._stderr is not None and self.tee.STDERR is not sys.stderr:
411-
FAIL.append(
412-
'Captured output (%s) does not match sys.stderr (%s).'
413-
% (self.tee._stdout, sys.stdout)
414-
)
441+
if self.tee is not None:
442+
if self.tee._stdout is not None and self.tee.STDOUT is not sys.stdout:
443+
FAIL.append(
444+
'Captured output (%s) does not match sys.stdout (%s).'
445+
% (self.tee._stdout, sys.stdout)
446+
)
447+
if self.tee._stderr is not None and self.tee.STDERR is not sys.stderr:
448+
FAIL.append(
449+
'Captured output (%s) does not match sys.stderr (%s).'
450+
% (self.tee._stdout, sys.stdout)
451+
)
415452
# Exit all context managers. This includes
416453
# - Restore any file descriptors we commandeered
417454
# - Close / join the TeeStream
418455
# - Close any opened files
419456
FAIL.extend(self._exit_context_stack(et, ev, tb))
420-
sys.stdout, sys.stderr = self.old
421-
self.old = None
457+
if self.old is not None:
458+
sys.stdout, sys.stderr = self.old
459+
self.old = None
422460
self.tee = None
423461
self.output_stream = None
424462
if FAIL:
@@ -667,15 +705,18 @@ def close(self, in_exception=False):
667705

668706
# Join all stream processing threads
669707
_poll = _poll_interval
708+
_timeout = 0.0
670709
FAIL = False
671710
while True:
672711
for th in self._threads:
673712
th.join(_poll)
713+
_timeout += _poll
674714
self._threads[:] = [th for th in self._threads if th.is_alive()]
675715
if not self._threads:
676716
break
677-
_poll *= 2
678-
if _poll_timeout <= _poll < 2 * _poll_timeout:
717+
if _poll < _poll_timeout:
718+
_poll *= 2.0
719+
if _poll_timeout * 0.5 <= _poll < _poll_timeout:
679720
if in_exception:
680721
# We are already processing an exception: no reason
681722
# to trigger another, nor to deadlock for an
@@ -687,7 +728,7 @@ def close(self, in_exception=False):
687728
"Significant delay observed waiting to join reader "
688729
"threads, possible output stream deadlock"
689730
)
690-
elif _poll >= _poll_timeout_deadlock:
731+
elif _timeout > _poll_timeout_deadlock:
691732
logger.error("TeeStream: deadlock observed joining reader threads")
692733
# Defer raising the exception until after we have
693734
# cleaned things up
@@ -713,7 +754,8 @@ def __exit__(self, et, ev, tb):
713754
if not self._enter_count:
714755
raise RuntimeError("TeeStream: exiting a context that was not entered")
715756
self._enter_count -= 1
716-
self.close(et is not None)
757+
if not self._enter_count:
758+
self.close(et is not None)
717759

718760
def __del__(self):
719761
# Implement __del__ to guarantee that file descriptors are closed
@@ -785,19 +827,18 @@ def _mergedReader(self):
785827
if _mswindows:
786828
for handle in list(handles):
787829
try:
788-
if handle.flush:
789-
flush = True
790-
handle.flush = False
791830
pipe = get_osfhandle(handle.read_pipe)
792831
numAvail = PeekNamedPipe(pipe, 0)[1]
793832
if numAvail:
794833
result, new_data = ReadFile(pipe, numAvail, None)
795834
handle.decoder_buffer += new_data
796835
break
836+
elif handle.flush:
837+
break
797838
except:
798839
handles.remove(handle)
799840
new_data = '' # not None so the poll interval doesn't increase
800-
if new_data is None and not flush:
841+
if new_data is None and not handle.flush:
801842
# PeekNamedPipe is non-blocking; to avoid swamping
802843
# the core, sleep for a "short" amount of time
803844
time.sleep(_poll)
@@ -822,15 +863,14 @@ def _mergedReader(self):
822863
else:
823864
for handle in handles:
824865
if handle.flush:
825-
new_data = ''
826866
break
827867
else:
828-
new_data = None
829868
continue
830869

831-
if handle.flush:
832-
flush = True
833-
handle.flush = False
870+
if handle.flush:
871+
new_data = ''
872+
flush = True
873+
handle.flush = False
834874

835875
# At this point, we have new data sitting in the
836876
# handle.decoder_buffer

pyomo/common/tests/test_tee.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020

2121
from io import StringIO, BytesIO
2222

23+
from pyomo.common.errors import DeveloperError
2324
from pyomo.common.log import LoggingIntercept, LogStream
24-
import pyomo.common.unittest as unittest
2525
from pyomo.common.tempfiles import TempfileManager
2626
import pyomo.common.tee as tee
27+
import pyomo.common.unittest as unittest
2728

2829

2930
class timestamper:
@@ -335,6 +336,17 @@ def test_duplicate_capture_output(self):
335336
finally:
336337
capture.reset()
337338

339+
def test_reset_capture_output_twice(self):
340+
capture = tee.capture_output()
341+
with capture as OUT1:
342+
print("test1")
343+
capture.reset()
344+
capture.reset()
345+
with capture as OUT2:
346+
print("test2")
347+
self.assertEqual(OUT1.getvalue(), "test1\n")
348+
self.assertEqual(OUT2.getvalue(), "test2\n")
349+
338350
def test_capture_output_logfile_string(self):
339351
with TempfileManager.new_context() as tempfile:
340352
logfile = tempfile.create_tempfile()
@@ -537,6 +549,14 @@ def test_no_fileno_stdout(self):
537549
with T:
538550
self.assertEqual(len(T.context_stack), 6)
539551

552+
def test_closed_stdout(self):
553+
with tee.capture_output() as T_outer:
554+
sys.stdout.close()
555+
with tee.capture_output() as T_inner:
556+
print("test")
557+
self.assertEqual(T_outer.getvalue(), "")
558+
self.assertEqual(T_inner.getvalue(), "test\n")
559+
540560
def test_capture_output_stack_error(self):
541561
OUT1 = StringIO()
542562
OUT2 = StringIO()
@@ -559,6 +579,31 @@ def test_capture_output_stack_error(self):
559579
sys.stdout, sys.stderr = old
560580
logging.getLogger('pyomo.common.tee').handlers.clear()
561581

582+
def test_atomic_deadlock(self):
583+
save_poll = tee._poll_timeout_deadlock
584+
tee._poll_timeout_deadlock = 0.01
585+
586+
co = tee.capture_output()
587+
try:
588+
tee.capture_output.startup_shutdown.acquire()
589+
with self.assertRaisesRegex(
590+
DeveloperError, "Deadlock starting capture_output"
591+
):
592+
with tee.capture_output():
593+
pass
594+
tee.capture_output.startup_shutdown.release()
595+
596+
with self.assertRaisesRegex(
597+
DeveloperError, "Deadlock closing capture_output"
598+
):
599+
with co:
600+
tee.capture_output.startup_shutdown.acquire()
601+
finally:
602+
tee._poll_timeout_deadlock = save_poll
603+
if tee.capture_output.startup_shutdown.locked():
604+
tee.capture_output.startup_shutdown.release()
605+
co.reset()
606+
562607
def test_capture_output_invalid_ostream(self):
563608
# Test that capture_output does not suppress errors from the tee
564609
# module

0 commit comments

Comments
 (0)