Skip to content
This repository was archived by the owner on May 29, 2024. It is now read-only.

Added mark for sequenced execution. Fixes #80 #107

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pytest --tests-per-worker auto
pytest --workers 2 --tests-per-worker auto
```

## Non parallel runs

Use `@pytest.mark.sequence` to mark tests, that shouldn't run in parallel mode.
Use fixture `sequence` to mark fixtures or tests, that shouldn't run in parallel mode.

## Notice

Beginning with Python 3.8, forking behavior is forced on macOS at the expense of safety.
Expand Down
76 changes: 56 additions & 20 deletions pytest_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,24 @@ def run(self):
pass


@pytest.fixture
def sequence():
"""
Fixture moves test to sequence execution.
Fixture can be used with another fixture.
"""
pass


@pytest.mark.trylast
def pytest_configure(config):
workers = parse_config(config, 'workers')
tests_per_worker = parse_config(config, 'tests_per_worker')
if not config.option.collectonly and (workers or tests_per_worker):
config.pluginmanager.register(ParallelRunner(config), 'parallelrunner')

config.addinivalue_line("markers", "sequence: mark non parallel tests")


class ThreadLocalEnviron(os._Environ):
def __init__(self, env):
Expand Down Expand Up @@ -189,6 +200,31 @@ def __init__(self, *args, **kwargs):
super(ThreadLocalFixtureDef, self).__init__(*args, **kwargs)


def print_info(workers, tests_per_worker, parallel_queue_size, sequence_queue_size):
if workers > 1:
worker_noun, process_noun = ('workers', 'processes')
else:
worker_noun, process_noun = ('worker', 'process')

if tests_per_worker > 1:
test_noun, thread_noun = ('tests', 'threads')
else:
test_noun, thread_noun = ('test', 'thread')

print(
'pytest-parallel: {} {} ({}), {} {} per worker ({})'.format(
workers, worker_noun, process_noun, tests_per_worker, test_noun, thread_noun,
)
)

if sequence_queue_size:
print(
'{} tests will run parallel, {} tests will run in sequence.'.format(
parallel_queue_size, sequence_queue_size,
)
)


class ParallelRunner(object):
def __init__(self, config):
self._config = config
Expand Down Expand Up @@ -257,22 +293,9 @@ def pytest_runtestloop(self, session):
raise ValueError(('tests_per_worker can only be '
'an integer or "auto"'))

if self.workers > 1:
worker_noun, process_noun = ('workers', 'processes')
else:
worker_noun, process_noun = ('worker', 'process')

if tests_per_worker > 1:
test_noun, thread_noun = ('tests', 'threads')
else:
test_noun, thread_noun = ('test', 'thread')

print('pytest-parallel: {} {} ({}), {} {} per worker ({})'
.format(self.workers, worker_noun, process_noun,
tests_per_worker, test_noun, thread_noun))

queue_cls = self._manager.Queue
queue = queue_cls()
parallel_queue = queue_cls()
sequence_queue = queue_cls()
errors = queue_cls()

# Reports about tests will be gathered from workerss
Expand All @@ -282,13 +305,20 @@ def pytest_runtestloop(self, session):
# This way, report generators like JUnitXML will work as expected.
self.responses_queue = queue_cls()

for i in range(len(session.items)):
queue.put(i)
for i, item in enumerate(session.items):
if "sequence" in [mark.name for mark in item.own_markers] or "sequence" in item._fixtureinfo.names_closure:
sequence_queue.put(i)
else:
parallel_queue.put(i)

print_info(self.workers, tests_per_worker, parallel_queue.qsize(), sequence_queue.qsize())

# Now we need to put stopping sentinels, so that worker
# processes will know, there is time to finish the work.
for i in range(self.workers * tests_per_worker):
queue.put('stop')
parallel_queue.put('stop')

sequence_queue.put('stop')

responses_processor = threading.Thread(
target=self.process_responses,
Expand All @@ -307,15 +337,21 @@ def wait_for_responses_processor():
# This flag will be changed after the worker's fork.
self._config.parallel_worker = False

args = (self._config, queue, session, tests_per_worker, errors)
args = (self._config, parallel_queue, session, tests_per_worker, errors)
for _ in range(self.workers):
process = Process(target=process_with_threads, args=args)
process.start()
processes.append(process)

[p.join() for p in processes]

queue.join()
parallel_queue.join()

thread_for_sequence = ThreadWorker(sequence_queue, session, errors)
thread_for_sequence.start()
thread_for_sequence.join()
sequence_queue.join()

wait_for_responses_processor()

if not errors.empty():
Expand Down
36 changes: 35 additions & 1 deletion tests/test_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_1():
raise Exception('Failed to load test file')
""")
result = testdir.runpytest(*cli_args)
result.assert_outcomes(error=1)
result.assert_outcomes(errors=1)
# Expect error code 2 (Interrupted), which is returned on collection error.
assert result.ret == 2

Expand All @@ -258,3 +258,37 @@ def test_collection_collectonly(testdir, cli_args):
])
result.assert_outcomes()
assert result.ret == 0


@pytest.mark.parametrize('cli_args', [
['--workers=4'],
['--tests-per-worker=4']
])
def test_sequenced_call(testdir, cli_args):
"""
Tests, that sequenced tests will be called in sequence.
"""
testdir.makepyfile("""
import pytest
import time

obj = {"a": None}

@pytest.mark.sequence
def test_1():
time.sleep(0.1)
assert obj["a"] == None
obj["a"] = "first"

@pytest.mark.sequence
def test_2():
assert obj["a"] == "first"
obj["a"] = "second"

@pytest.mark.sequence
def test_3():
assert obj["a"] == "second"
""")
result = testdir.runpytest(*cli_args)
result.assert_outcomes(passed=3)
assert result.ret == 0