From 0293c7d1976d0a999a74afb4f1bd7f51ee6b3f4c Mon Sep 17 00:00:00 2001 From: Diego Russo Date: Mon, 23 Sep 2024 17:18:01 +0100 Subject: [PATCH 1/6] Implement --timeout when running benchmarks If the benchmark execution is exceeding the timeout execution, pyperf exits with an error 124. This error can be caught by pyperformance or other tool and report it back to the user. --- doc/runner.rst | 4 ++++ pyperf/_manager.py | 15 ++++++++++----- pyperf/_runner.py | 4 ++++ pyperf/_utils.py | 34 +++++++++++++++++++++++++++++++--- pyperf/tests/test_runner.py | 18 ++++++++++++++++++ 5 files changed, 67 insertions(+), 8 deletions(-) diff --git a/doc/runner.rst b/doc/runner.rst index ff705f0b..da724864 100644 --- a/doc/runner.rst +++ b/doc/runner.rst @@ -98,6 +98,7 @@ Option:: --inherit-environ=VARS --copy-env --no-locale + --timeout TIMEOUT --track-memory --tracemalloc @@ -140,6 +141,9 @@ Option:: - ``LC_TELEPHONE`` - ``LC_TIME`` +* ``--timeout``: set a timeout in seconds for an execution of the benchmark. + If the benchmark execution times out, pyperf exits with error code 124. + There is no time out by default. * ``--tracemalloc``: Use the ``tracemalloc`` module to track Python memory allocation and get the peak of memory usage in metadata (``tracemalloc_peak``). The module is only available on Python 3.4 and newer. diff --git a/pyperf/_manager.py b/pyperf/_manager.py index d45ab8d0..195df2fb 100644 --- a/pyperf/_manager.py +++ b/pyperf/_manager.py @@ -69,6 +69,9 @@ def worker_cmd(self, calibrate_loops, calibrate_warmups, wpipe): if args.profile: cmd.extend(['--profile', args.profile]) + if args.timeout: + cmd.extend(['--timeout', str(args.timeout)]) + if args.hook: for hook in args.hook: cmd.extend(['--hook', hook]) @@ -83,7 +86,7 @@ def spawn_worker(self, calibrate_loops, calibrate_warmups): self.args.locale, self.args.copy_env) - rpipe, wpipe = create_pipe() + rpipe, wpipe = create_pipe(timeout=self.args.timeout) with rpipe: with wpipe: warg = wpipe.to_subprocess() @@ -102,10 +105,12 @@ def spawn_worker(self, calibrate_loops, calibrate_warmups): proc = subprocess.Popen(cmd, env=env, **kw) with popen_killer(proc): - with rpipe.open_text() as rfile: - bench_json = rfile.read() - - exitcode = proc.wait() + try: + bench_json = rpipe.read_text() + exitcode = proc.wait() + except TimeoutError as exc: + print(exc) + sys.exit(124) if exitcode: raise RuntimeError("%s failed with exit code %s" diff --git a/pyperf/_runner.py b/pyperf/_runner.py index c43b9e30..0cd79de0 100644 --- a/pyperf/_runner.py +++ b/pyperf/_runner.py @@ -183,6 +183,10 @@ def __init__(self, values=None, processes=None, 'value, used to calibrate the number of ' 'loops (default: %s)' % format_timedelta(min_time)) + parser.add_argument('--timeout', + help='Specify a timeout in seconds for a single ' + 'benchmark execution (default: disabled)', + type=strictly_positive) parser.add_argument('--worker', action='store_true', help='Worker process, run the benchmark.') parser.add_argument('--worker-task', type=positive_or_nul, metavar='TASK_ID', diff --git a/pyperf/_utils.py b/pyperf/_utils.py index e834089c..38325bc2 100644 --- a/pyperf/_utils.py +++ b/pyperf/_utils.py @@ -1,9 +1,11 @@ import contextlib import math import os +import select import statistics import sys import sysconfig +import time from shlex import quote as shell_quote # noqa from shutil import which @@ -286,8 +288,9 @@ def create_environ(inherit_environ, locale, copy_all): class _Pipe: _OPEN_MODE = "r" - def __init__(self, fd): + def __init__(self, fd, timeout=None): self._fd = fd + self._timeout = timeout self._file = None if MS_WINDOWS: self._handle = msvcrt.get_osfhandle(fd) @@ -317,9 +320,34 @@ def __exit__(self, *args): class ReadPipe(_Pipe): def open_text(self): file = open(self._fd, "r", encoding="utf8") + if self._timeout: + os.set_blocking(file.fileno(), False) self._file = file return file + def read_text(self): + with self.open_text() as rfile: + if self._timeout: + return self._read_text_timeout(rfile, self._timeout) + else: + return rfile.read() + + def _read_text_timeout(self, rfile, timeout): + start_time = time.time() + while True: + if time.time() - start_time > timeout: + raise TimeoutError(f"Timed out after {timeout} seconds") + ready, _, _ = select.select([rfile], [], [], timeout) + if ready: + data = rfile.read() + if data: + return data + else: + break + else: + pass + + class WritePipe(_Pipe): def to_subprocess(self): @@ -346,9 +374,9 @@ def open_text(self): return file -def create_pipe(): +def create_pipe(timeout=None): rfd, wfd = os.pipe() - rpipe = ReadPipe(rfd) + rpipe = ReadPipe(rfd, timeout) wpipe = WritePipe(wfd) return (rpipe, wpipe) diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index afc66d8d..50fd9a67 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -149,6 +149,24 @@ def test_pipe(self): self.assertEqual(bench_json, tests.benchmark_as_json(result.bench)) + def test_pipe_with_timeout(self): + rpipe, wpipe = create_pipe(timeout=1) + with rpipe: + with wpipe: + arg = wpipe.to_subprocess() + # Don't close the file descriptor, it is closed by + # the Runner class + wpipe._fd = None + + self.exec_runner('--pipe', str(arg), '--worker', '-l1', '-w1') + + # Mock the select to make the read pipeline not ready + with mock.patch('pyperf._utils.select.select', return_value=(False, False, False)): + with self.assertRaises(TimeoutError) as cm: + rpipe.read_text() + self.assertEqual(str(cm.exception), + 'Timed out after 1 seconds') + def test_json_exists(self): with tempfile.NamedTemporaryFile('wb+') as tmp: From 4e892b4ea89f9e02dbecea8ee6cfbff8fc9d05f7 Mon Sep 17 00:00:00 2001 From: Diego Russo Date: Tue, 24 Sep 2024 16:26:39 +0100 Subject: [PATCH 2/6] Address Victor's comments. --- pyperf/_manager.py | 4 +++- pyperf/_utils.py | 25 ++++++++++++------------- pyperf/tests/test_runner.py | 4 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/pyperf/_manager.py b/pyperf/_manager.py index 195df2fb..09077db6 100644 --- a/pyperf/_manager.py +++ b/pyperf/_manager.py @@ -7,6 +7,8 @@ from pyperf._utils import MS_WINDOWS, create_environ, create_pipe, popen_killer +EXIT_TIMEOUT = 60 + # Limit to 5 calibration processes # (10 if calibration is needed for loops and warmups) MAX_CALIBRATION = 5 @@ -107,7 +109,7 @@ def spawn_worker(self, calibrate_loops, calibrate_warmups): with popen_killer(proc): try: bench_json = rpipe.read_text() - exitcode = proc.wait() + exitcode = proc.wait(timeout=EXIT_TIMEOUT) except TimeoutError as exc: print(exc) sys.exit(124) diff --git a/pyperf/_utils.py b/pyperf/_utils.py index 38325bc2..3ee0b0d2 100644 --- a/pyperf/_utils.py +++ b/pyperf/_utils.py @@ -327,26 +327,25 @@ def open_text(self): def read_text(self): with self.open_text() as rfile: - if self._timeout: - return self._read_text_timeout(rfile, self._timeout) + if self._timeout is not None: + return self._read_text_timeout(rfile, self._timeout) else: return rfile.read() def _read_text_timeout(self, rfile, timeout): - start_time = time.time() + start_time = time.monotonic() + output = [] while True: - if time.time() - start_time > timeout: + if time.monotonic() - start_time > timeout: raise TimeoutError(f"Timed out after {timeout} seconds") ready, _, _ = select.select([rfile], [], [], timeout) - if ready: - data = rfile.read() - if data: - return data - else: - break - else: - pass - + if not ready: + continue + data = rfile.read(1024) + if not data: + break + output.append(data) + return "".join(output) class WritePipe(_Pipe): diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index 50fd9a67..38873b95 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -150,7 +150,7 @@ def test_pipe(self): tests.benchmark_as_json(result.bench)) def test_pipe_with_timeout(self): - rpipe, wpipe = create_pipe(timeout=1) + rpipe, wpipe = create_pipe(timeout=0.1) with rpipe: with wpipe: arg = wpipe.to_subprocess() @@ -165,7 +165,7 @@ def test_pipe_with_timeout(self): with self.assertRaises(TimeoutError) as cm: rpipe.read_text() self.assertEqual(str(cm.exception), - 'Timed out after 1 seconds') + 'Timed out after 0.1 seconds') def test_json_exists(self): with tempfile.NamedTemporaryFile('wb+') as tmp: From fdd3ce1a64535b4a254c8eac418efe82de6e4a18 Mon Sep 17 00:00:00 2001 From: Diego Russo Date: Wed, 25 Sep 2024 18:06:11 +0100 Subject: [PATCH 3/6] Address further comments. --- pyperf/_manager.py | 4 ++-- pyperf/_utils.py | 35 ++++++++++++++++++++--------------- pyperf/tests/test_runner.py | 17 +++++++++++++---- 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/pyperf/_manager.py b/pyperf/_manager.py index 09077db6..a55f7a5a 100644 --- a/pyperf/_manager.py +++ b/pyperf/_manager.py @@ -88,7 +88,7 @@ def spawn_worker(self, calibrate_loops, calibrate_warmups): self.args.locale, self.args.copy_env) - rpipe, wpipe = create_pipe(timeout=self.args.timeout) + rpipe, wpipe = create_pipe() with rpipe: with wpipe: warg = wpipe.to_subprocess() @@ -108,7 +108,7 @@ def spawn_worker(self, calibrate_loops, calibrate_warmups): with popen_killer(proc): try: - bench_json = rpipe.read_text() + bench_json = rpipe.read_text(timeout=self.args.timeout) exitcode = proc.wait(timeout=EXIT_TIMEOUT) except TimeoutError as exc: print(exc) diff --git a/pyperf/_utils.py b/pyperf/_utils.py index 3ee0b0d2..05e68ff9 100644 --- a/pyperf/_utils.py +++ b/pyperf/_utils.py @@ -288,9 +288,8 @@ def create_environ(inherit_environ, locale, copy_all): class _Pipe: _OPEN_MODE = "r" - def __init__(self, fd, timeout=None): + def __init__(self, fd): self._fd = fd - self._timeout = timeout self._file = None if MS_WINDOWS: self._handle = msvcrt.get_osfhandle(fd) @@ -320,32 +319,38 @@ def __exit__(self, *args): class ReadPipe(_Pipe): def open_text(self): file = open(self._fd, "r", encoding="utf8") - if self._timeout: - os.set_blocking(file.fileno(), False) self._file = file return file - def read_text(self): - with self.open_text() as rfile: - if self._timeout is not None: - return self._read_text_timeout(rfile, self._timeout) - else: + def read_text(self, timeout=None): + if timeout is not None: + return self._read_text_timeout(timeout) + else: + with self.open_text() as rfile: return rfile.read() - def _read_text_timeout(self, rfile, timeout): + def _read_text_timeout(self, timeout): + fd = self.fd + os.set_blocking(fd, False) + start_time = time.monotonic() output = [] while True: if time.monotonic() - start_time > timeout: raise TimeoutError(f"Timed out after {timeout} seconds") - ready, _, _ = select.select([rfile], [], [], timeout) + ready, _, _ = select.select([fd], [], [], timeout) if not ready: continue - data = rfile.read(1024) + try: + data = os.read(fd, 1024) + except BlockingIOError: + continue if not data: break output.append(data) - return "".join(output) + + data = b"".join(output) + return data.decode("utf8") class WritePipe(_Pipe): @@ -373,9 +378,9 @@ def open_text(self): return file -def create_pipe(timeout=None): +def create_pipe(): rfd, wfd = os.pipe() - rpipe = ReadPipe(rfd, timeout) + rpipe = ReadPipe(rfd) wpipe = WritePipe(wfd) return (rpipe, wpipe) diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index 38873b95..20da0935 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -150,7 +150,7 @@ def test_pipe(self): tests.benchmark_as_json(result.bench)) def test_pipe_with_timeout(self): - rpipe, wpipe = create_pipe(timeout=0.1) + rpipe, wpipe = create_pipe() with rpipe: with wpipe: arg = wpipe.to_subprocess() @@ -158,15 +158,24 @@ def test_pipe_with_timeout(self): # the Runner class wpipe._fd = None - self.exec_runner('--pipe', str(arg), '--worker', '-l1', '-w1') + result = self.exec_runner('--pipe', str(arg), + '--worker', '-l1', '-w1') # Mock the select to make the read pipeline not ready - with mock.patch('pyperf._utils.select.select', return_value=(False, False, False)): + with mock.patch('pyperf._utils.select.select', + return_value=(False, False, False)): with self.assertRaises(TimeoutError) as cm: - rpipe.read_text() + rpipe.read_text(timeout=0.1) self.assertEqual(str(cm.exception), 'Timed out after 0.1 seconds') + # Mock the select to make the read pipeline ready + with mock.patch('pyperf._utils.select.select', + return_value=(True, False, False)): + bench_json = rpipe.read_text(timeout=0.1) + self.assertEqual(bench_json, + tests.benchmark_as_json(result.bench)) + def test_json_exists(self): with tempfile.NamedTemporaryFile('wb+') as tmp: From 6f49f86f843ee78aca8f1371662066ae5f740036 Mon Sep 17 00:00:00 2001 From: Diego Russo Date: Wed, 25 Sep 2024 18:20:54 +0100 Subject: [PATCH 4/6] Set maxDiff = None --- pyperf/tests/test_runner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index 20da0935..c8f706d7 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -26,6 +26,9 @@ def check_args(loops, a, b): class TestRunner(unittest.TestCase): + + maxDiff = None + def create_runner(self, args, **kwargs): # hack to be able to create multiple instances per process pyperf.Runner._created.clear() From 876783d0f105d854c2c93ef6ca62ce67c7504c34 Mon Sep 17 00:00:00 2001 From: Diego Russo Date: Thu, 26 Sep 2024 10:02:29 +0100 Subject: [PATCH 5/6] Fix test on Windows --- pyperf/tests/test_runner.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index c8f706d7..df0b371b 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -26,9 +26,6 @@ def check_args(loops, a, b): class TestRunner(unittest.TestCase): - - maxDiff = None - def create_runner(self, args, **kwargs): # hack to be able to create multiple instances per process pyperf.Runner._created.clear() @@ -176,8 +173,8 @@ def test_pipe_with_timeout(self): with mock.patch('pyperf._utils.select.select', return_value=(True, False, False)): bench_json = rpipe.read_text(timeout=0.1) - self.assertEqual(bench_json, - tests.benchmark_as_json(result.bench)) + self.assertEqual(bench_json.rstrip(), + tests.benchmark_as_json(result.bench).rstrip()) def test_json_exists(self): with tempfile.NamedTemporaryFile('wb+') as tmp: From 0e202ddf7199139a28966e668ab6d8ef0aaed31b Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 26 Sep 2024 11:15:05 +0200 Subject: [PATCH 6/6] Update pyperf/tests/test_runner.py --- pyperf/tests/test_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyperf/tests/test_runner.py b/pyperf/tests/test_runner.py index df0b371b..154c220b 100644 --- a/pyperf/tests/test_runner.py +++ b/pyperf/tests/test_runner.py @@ -174,7 +174,7 @@ def test_pipe_with_timeout(self): return_value=(True, False, False)): bench_json = rpipe.read_text(timeout=0.1) self.assertEqual(bench_json.rstrip(), - tests.benchmark_as_json(result.bench).rstrip()) + tests.benchmark_as_json(result.bench).rstrip()) def test_json_exists(self): with tempfile.NamedTemporaryFile('wb+') as tmp: