From ea497a5bd3696b50b76c0a5bdb09f34c8bd1dbee Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 11:55:12 +0100 Subject: [PATCH 01/18] Parallelize and optimize tests --- .github/workflows/run-tests.yml | 6 +++--- requirements-dev.txt | 1 + tests/test_dht_experts.py | 5 +++++ tests/test_moe.py | 27 ++++++++++++++------------- tests/test_util_modules.py | 10 +++++----- 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 74d778bd0..8d6f1a4fe 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -39,7 +39,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest --durations=0 --durations-min=1.0 -v + pytest --durations=0 --durations-min=1.0 -v -n auto build_and_test_p2pd: runs-on: ubuntu-latest timeout-minutes: 10 @@ -70,7 +70,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest -k "p2p" -v + pytest -k "p2p" -v -n auto codecov_in_develop_mode: runs-on: ubuntu-latest @@ -100,6 +100,6 @@ jobs: - name: Test run: | export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest --cov hivemind --cov-config=pyproject.toml -v tests + pytest --cov hivemind --cov-config=pyproject.toml -v -n auto tests - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/requirements-dev.txt b/requirements-dev.txt index 8398751aa..6c63e416f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,6 +2,7 @@ pytest==6.2.5 # see https://github.com/pytest-dev/pytest/issues/9621 pytest-forked pytest-asyncio==0.16.0 pytest-cov +pytest-xdist coverage==6.0.2 # see https://github.com/pytest-dev/pytest-cov/issues/520 tqdm scikit-learn diff --git a/tests/test_dht_experts.py b/tests/test_dht_experts.py index 0332a1a59..60e82d07c 100644 --- a/tests/test_dht_experts.py +++ b/tests/test_dht_experts.py @@ -46,6 +46,11 @@ def test_store_get_experts(n_peers=10): remaining_peer2 = random.choice([peer for peer in peers if peer.is_alive()]) assert all(declare_experts(remaining_peer1, ["new_expert.1"], expiration_time=get_dht_time() + 30)) assert get_experts(remaining_peer2, ["new_expert.1"])[0].peer_id == remaining_peer1.peer_id + + for peer in peers: + if peer.is_alive(): + peer.shutdown() + @pytest.mark.forked diff --git a/tests/test_moe.py b/tests/test_moe.py index f62c2159d..080d0d7ed 100644 --- a/tests/test_moe.py +++ b/tests/test_moe.py @@ -21,48 +21,49 @@ @pytest.mark.forked -def test_moe(): +def test_moe(batch_size=2, hid_dim=4): all_expert_uids = [ f"ffn.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}" for _ in range(10) ] with background_server( - expert_uids=all_expert_uids, device="cpu", expert_cls="ffn", num_handlers=1, hidden_dim=16 + expert_uids=all_expert_uids, device="cpu", expert_cls="ffn", num_handlers=1, hidden_dim=hid_dim ) as server_peer_info: dht = DHT(start=True, initial_peers=server_peer_info.addrs) - dmoe = RemoteMixtureOfExperts(in_features=16, grid_size=(4, 4, 4), dht=dht, k_best=3, uid_prefix="ffn.") + dmoe = RemoteMixtureOfExperts(in_features=hid_dim, grid_size=(4, 4, 4), dht=dht, k_best=3, uid_prefix="ffn.") for i in range(3): - out = dmoe(torch.randn(10, 16)) + out = dmoe(torch.randn(batch_size, hid_dim)) out.sum().backward() @pytest.mark.forked -def test_no_experts(): +def test_no_experts(batch_size=2, hid_dim=4): all_expert_uids = [ f"expert.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}" for _ in range(10) ] with background_server( - expert_uids=all_expert_uids, device="cpu", expert_cls="nop_delay", num_handlers=1, hidden_dim=16 + expert_uids=all_expert_uids, device="cpu", expert_cls="nop_delay", num_handlers=1, hidden_dim=hid_dim ) as server_peer_info: dht = DHT(start=True, initial_peers=server_peer_info.addrs) dmoe = RemoteSwitchMixtureOfExperts( - in_features=16, + in_features=hid_dim, grid_size=(4, 4, 4), dht=dht, uid_prefix="expert.", - forward_timeout=0.1, - backward_timeout=0.1, + forward_timeout=0.01, + backward_timeout=0.01, allow_zero_outputs=True, ) for i in range(3): - out, balancing_loss = dmoe(torch.randn(10, 16)) + out, balancing_loss = dmoe(torch.randn(batch_size, hid_dim)) out.sum().backward() + dht.shutdown() @pytest.mark.forked -def test_call_many(hidden_dim=16): +def test_call_many(hidden_dim=4): k_min = 1 timeout_after_k_min = None backward_k_min = 1 @@ -88,7 +89,7 @@ def test_call_many(hidden_dim=16): [ExpertInfo(uid=f"expert.{i}", peer_id=server_peer_info.peer_id) for i in range(5)], dht, ) - e5 = RemoteExpert(ExpertInfo(f"thisshouldnotexist", server_peer_info), None) + e5 = RemoteExpert(ExpertInfo("thisshouldnotexist", server_peer_info), None) mask, expert_outputs = _RemoteCallMany.apply( DUMMY, @@ -133,7 +134,7 @@ def test_call_many(hidden_dim=16): @pytest.mark.forked -def test_remote_module_call(hidden_dim=16): +def test_remote_module_call(hidden_dim=4): with background_server( num_experts=1, device="cpu", diff --git a/tests/test_util_modules.py b/tests/test_util_modules.py index f245b777e..f65055889 100644 --- a/tests/test_util_modules.py +++ b/tests/test_util_modules.py @@ -3,7 +3,7 @@ import multiprocessing as mp import random import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed import numpy as np import pytest @@ -524,9 +524,9 @@ async def test_async_context_flooding(): async def coro(): async with enter_asynchronously(lock1): - await asyncio.sleep(1e-2) + await asyncio.sleep(1e-6) async with enter_asynchronously(lock2): - await asyncio.sleep(1e-2) + await asyncio.sleep(1e-6) num_coros = max(33, mp.cpu_count() * 5 + 1) await asyncio.wait({asyncio.create_task(coro()) for _ in range(num_coros)}) @@ -565,8 +565,8 @@ def run_task(ema): with ThreadPoolExecutor(max_workers) as pool: ema = PerformanceEMA(alpha=alpha) start_time = time.perf_counter() - futures = [pool.submit(run_task, ema) for i in range(num_updates)] - total_size = sum(future.result() for future in futures) + futures = [pool.submit(run_task, ema) for _ in range(num_updates)] + total_size = sum(future.result() for future in as_completed(futures)) end_time = time.perf_counter() target = total_size / (end_time - start_time) assert ema.samples_per_second >= (1 - tolerance) * target * max_workers ** (bias_power - 1) From 685adeda0cb1bd8be1eb5f9fb5ea7c86c6573c05 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 15:25:32 +0100 Subject: [PATCH 02/18] Optimize and harden additional tests --- tests/test_cli_scripts.py | 2 +- tests/test_moe.py | 11 +++++---- tests/test_optimizer.py | 49 +++++++++++++++++++++++--------------- tests/test_util_modules.py | 19 +++++++++++---- 4 files changed, 52 insertions(+), 29 deletions(-) diff --git a/tests/test_cli_scripts.py b/tests/test_cli_scripts.py index 97c674000..02084e0ed 100644 --- a/tests/test_cli_scripts.py +++ b/tests/test_cli_scripts.py @@ -7,7 +7,7 @@ def test_dht_connection_successful(): - dht_refresh_period = 1 + dht_refresh_period = 3 cloned_env = os.environ.copy() # overriding the loglevel to prevent debug print statements diff --git a/tests/test_moe.py b/tests/test_moe.py index 080d0d7ed..7fc735843 100644 --- a/tests/test_moe.py +++ b/tests/test_moe.py @@ -316,9 +316,9 @@ def test_client_anomaly_detection(): server.shutdown() -def _measure_coro_running_time(n_coros, elapsed_fut, counter): +def _measure_coro_running_time(n_coros, elapsed_fut, counter, coroutine_time): async def coro(): - await asyncio.sleep(0.1) + await asyncio.sleep(coroutine_time) counter.value += 1 try: @@ -337,20 +337,21 @@ async def coro(): @pytest.mark.forked -def test_remote_expert_worker_runs_coros_concurrently(n_processes=4, n_coros=10): +def test_remote_expert_worker_runs_coros_concurrently(n_processes=4, n_coros=10, coroutine_time=0.1): processes = [] counter = mp.Value(ctypes.c_int64) for i in range(n_processes): elapsed_fut = MPFuture() factory = threading.Thread if i % 2 == 0 else mp.Process # Test both threads and processes - proc = factory(target=_measure_coro_running_time, args=(n_coros, elapsed_fut, counter)) + proc = factory(target=_measure_coro_running_time, args=(n_coros, elapsed_fut, counter, coroutine_time)) proc.start() processes.append((proc, elapsed_fut)) for proc, elapsed_fut in processes: # Ensure that the coroutines were run concurrently, not sequentially - assert elapsed_fut.result() < 0.2 + expected_time = coroutine_time * 3 # from non-blocking calls + blocking call + some overhead + assert elapsed_fut.result() < expected_time proc.join() assert counter.value == n_processes * n_coros # Ensure all couroutines have finished diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index c859e3879..b37ade86a 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -2,12 +2,14 @@ import multiprocessing as mp import time from functools import partial +from typing import List import numpy as np import pytest import torch import torch.nn as nn import torch.nn.functional as F +from multiaddr import Multiaddr import hivemind from hivemind.averaging.control import AveragingStage @@ -227,8 +229,10 @@ def test_progress_tracker(): finished_evt = mp.Event() emas = mp.Array(ctypes.c_double, 5) - def run_worker(index: int, batch_size: int, period: float, **kwargs): - dht = hivemind.DHT(initial_peers=dht_root.get_visible_maddrs(), start=True) + root_maddrs = dht_root.get_visible_maddrs() + + def run_worker(index: int, batch_size: int, step_time: float, initial_peers: List[Multiaddr]): + dht = hivemind.DHT(initial_peers=initial_peers, start=True) tracker = ProgressTracker( dht, prefix, @@ -238,18 +242,17 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs): default_refresh_period=0.2, max_refresh_period=0.5, private_key=RSAPrivateKey(), - **kwargs, ) + with tracker.pause_updates(): + barrier.wait() + if index == 4: + delayed_start_evt.wait() - barrier.wait() - if index == 4: - delayed_start_evt.wait() - - local_epoch = 2 if index == 4 else 0 - samples_accumulated = 0 + local_epoch = 2 if index == 4 else 0 + samples_accumulated = 0 while True: - time.sleep(period) + time.sleep(step_time) if finished_evt.is_set(): break @@ -270,10 +273,10 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs): dht.shutdown() workers = [ - mp.Process(target=run_worker, kwargs=dict(index=1, batch_size=12, period=0.6)), - mp.Process(target=run_worker, kwargs=dict(index=2, batch_size=16, period=0.5)), - mp.Process(target=run_worker, kwargs=dict(index=3, batch_size=24, period=0.4)), - mp.Process(target=run_worker, kwargs=dict(index=4, batch_size=64, period=0.4)), + mp.Process(target=run_worker, kwargs=dict(index=1, batch_size=12, step_time=0.6, initial_peers=root_maddrs)), + mp.Process(target=run_worker, kwargs=dict(index=2, batch_size=16, step_time=0.5, initial_peers=root_maddrs)), + mp.Process(target=run_worker, kwargs=dict(index=3, batch_size=24, step_time=0.2, initial_peers=root_maddrs)), + mp.Process(target=run_worker, kwargs=dict(index=4, batch_size=64, step_time=0.2, initial_peers=root_maddrs)), ] for worker in workers: worker.start() @@ -336,7 +339,7 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs): (False, True, True, True, True), (False, True, True, False, True), (True, False, False, False, False), - (True, True, False, False, False,), + (True, True, False, False, False), ], # fmt: on ) @@ -359,6 +362,8 @@ def test_optimizer( def _test_optimizer( num_peers: int = 1, num_clients: int = 0, + default_batch_size: int = 4, + default_batch_time: int = 0.1, target_batch_size: int = 32, total_epochs: int = 3, use_local_updates: bool = False, @@ -422,20 +427,21 @@ def run_trainer(batch_size: int, batch_time: float, client_mode: bool): prev_time = time.perf_counter() - time.sleep(1.0) optimizer.shutdown() return optimizer peers = [] for index in range(num_peers): + peer_batch_size = default_batch_size + index + peer_batch_time = default_batch_time + 0.01 * index peers.append( mp.Process( target=run_trainer, name=f"trainer-{index}", kwargs=dict( - batch_size=4 + index, - batch_time=0.3 + 0.2 * index, + batch_size=peer_batch_size, + batch_time=peer_batch_time, client_mode=(index >= num_peers - num_clients), ), ) @@ -451,7 +457,12 @@ def run_trainer(batch_size: int, batch_time: float, client_mode: bool): assert optimizer.local_epoch == optimizer.tracker.global_epoch == total_epochs expected_samples_accumulated = target_batch_size * total_epochs assert expected_samples_accumulated <= total_samples_accumulated.value <= expected_samples_accumulated * 1.2 - assert 4 / 0.3 * 0.8 <= optimizer.tracker.performance_ema.samples_per_second <= 4 / 0.3 * 1.2 + expected_performance = default_batch_size / default_batch_time + assert ( + expected_performance * 0.8 + <= optimizer.tracker.performance_ema.samples_per_second + <= expected_performance * 1.2 + ) assert not optimizer.state_averager.is_alive() assert not optimizer.tracker.is_alive() diff --git a/tests/test_util_modules.py b/tests/test_util_modules.py index f65055889..bde2ef006 100644 --- a/tests/test_util_modules.py +++ b/tests/test_util_modules.py @@ -4,6 +4,7 @@ import random import time from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Event import numpy as np import pytest @@ -266,9 +267,10 @@ def _check_result_and_set(future): with pytest.raises(RuntimeError): future1.add_done_callback(lambda future: (1, 2, 3)) + events[0].wait() assert future1.done() and not future1.cancelled() assert future2.done() and future2.cancelled() - for i in 0, 1, 4: + for i in 1, 4: events[i].wait(1) assert events[0].is_set() and events[1].is_set() and events[2].is_set() and events[4].is_set() assert not events[3].is_set() @@ -556,16 +558,25 @@ def test_performance_ema_threadsafe( bias_power: float = 0.7, tolerance: float = 0.05, ): - def run_task(ema): - task_size = random.randint(1, 4) + def run_task(ema, start_event, task_size): + start_event.wait() with ema.update_threadsafe(task_size): time.sleep(task_size * interval * (0.9 + 0.2 * random.random())) return task_size with ThreadPoolExecutor(max_workers) as pool: ema = PerformanceEMA(alpha=alpha) + start_event = Event() start_time = time.perf_counter() - futures = [pool.submit(run_task, ema) for _ in range(num_updates)] + + futures = [] + for _ in range(num_updates): + task_size = random.randint(1, 4) + future = pool.submit(run_task, ema, start_event, task_size) + futures.append(future) + + ema.reset_timer() + start_event.set() total_size = sum(future.result() for future in as_completed(futures)) end_time = time.perf_counter() target = total_size / (end_time - start_time) From 549f3e5b83db39b588da2d7b4c6f726d3d3990be Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 15:27:51 +0100 Subject: [PATCH 03/18] Fix formatting --- tests/test_dht_experts.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_dht_experts.py b/tests/test_dht_experts.py index 60e82d07c..4210ca6ff 100644 --- a/tests/test_dht_experts.py +++ b/tests/test_dht_experts.py @@ -46,11 +46,10 @@ def test_store_get_experts(n_peers=10): remaining_peer2 = random.choice([peer for peer in peers if peer.is_alive()]) assert all(declare_experts(remaining_peer1, ["new_expert.1"], expiration_time=get_dht_time() + 30)) assert get_experts(remaining_peer2, ["new_expert.1"])[0].peer_id == remaining_peer1.peer_id - + for peer in peers: if peer.is_alive(): peer.shutdown() - @pytest.mark.forked From b1ad99cca610237b6d1cf86b82b66b8bd83d70a7 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 15:40:40 +0100 Subject: [PATCH 04/18] Revert changes to test_progress_tracker --- tests/test_optimizer.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index b37ade86a..b4ae501a3 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -241,15 +241,13 @@ def run_worker(index: int, batch_size: int, step_time: float, initial_peers: Lis min_refresh_period=0.1, default_refresh_period=0.2, max_refresh_period=0.5, - private_key=RSAPrivateKey(), ) - with tracker.pause_updates(): - barrier.wait() - if index == 4: - delayed_start_evt.wait() + barrier.wait() + if index == 4: + delayed_start_evt.wait() - local_epoch = 2 if index == 4 else 0 - samples_accumulated = 0 + local_epoch = 2 if index == 4 else 0 + samples_accumulated = 0 while True: time.sleep(step_time) From 558891e84a1392dfe8e215bf46fc07ff2811891e Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 16:35:26 +0100 Subject: [PATCH 05/18] Increase polling resolution for test_progress_tracker --- tests/test_optimizer.py | 45 ++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index b4ae501a3..fa7e3eeca 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -238,16 +238,17 @@ def run_worker(index: int, batch_size: int, step_time: float, initial_peers: Lis prefix, target_batch_size, start=True, - min_refresh_period=0.1, - default_refresh_period=0.2, - max_refresh_period=0.5, + min_refresh_period=0.01, + default_refresh_period=0.02, + max_refresh_period=0.05, ) - barrier.wait() - if index == 4: - delayed_start_evt.wait() + with tracker.pause_updates(): + barrier.wait() + if index == 4: + delayed_start_evt.wait() - local_epoch = 2 if index == 4 else 0 - samples_accumulated = 0 + local_epoch = 2 if index == 4 else 0 + samples_accumulated = 0 while True: time.sleep(step_time) @@ -270,23 +271,29 @@ def run_worker(index: int, batch_size: int, step_time: float, initial_peers: Lis tracker.shutdown() dht.shutdown() - workers = [ - mp.Process(target=run_worker, kwargs=dict(index=1, batch_size=12, step_time=0.6, initial_peers=root_maddrs)), - mp.Process(target=run_worker, kwargs=dict(index=2, batch_size=16, step_time=0.5, initial_peers=root_maddrs)), - mp.Process(target=run_worker, kwargs=dict(index=3, batch_size=24, step_time=0.2, initial_peers=root_maddrs)), - mp.Process(target=run_worker, kwargs=dict(index=4, batch_size=64, step_time=0.2, initial_peers=root_maddrs)), - ] - for worker in workers: + worker_batch_sizes = [12, 16, 24, 64] + worker_step_times = [0.6, 0.5, 0.2, 0.2] + + workers = [] + for i, (peer_batch_size, peer_step_time) in enumerate(zip(worker_batch_sizes, worker_step_times), start=1): + peer_kwargs = { + "index": i, + "batch_size": peer_batch_size, + "step_time": peer_step_time, + "initial_peers": root_maddrs, + } + worker = mp.Process(target=run_worker, kwargs=peer_kwargs) worker.start() + workers.append(worker) tracker = ProgressTracker( dht_root, prefix, target_batch_size, start=True, - min_refresh_period=0.1, - default_refresh_period=0.2, - max_refresh_period=0.5, + min_refresh_period=0.01, + default_refresh_period=0.02, + max_refresh_period=0.05, ) barrier.wait() @@ -295,7 +302,7 @@ def run_worker(index: int, batch_size: int, step_time: float, initial_peers: Lis step_time_deltas = [] while local_epoch < 6: - time.sleep(0.1) + time.sleep(0.01) if tracker.ready_to_update_epoch: with tracker.pause_updates(): From cccd904a9cddd0ec7c052e6b7cbfd3fa1d6008f3 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 16:43:30 +0100 Subject: [PATCH 06/18] Revert step times --- tests/test_optimizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index fa7e3eeca..eca182d81 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -272,7 +272,7 @@ def run_worker(index: int, batch_size: int, step_time: float, initial_peers: Lis dht.shutdown() worker_batch_sizes = [12, 16, 24, 64] - worker_step_times = [0.6, 0.5, 0.2, 0.2] + worker_step_times = [0.6, 0.5, 0.4, 0.4] workers = [] for i, (peer_batch_size, peer_step_time) in enumerate(zip(worker_batch_sizes, worker_step_times), start=1): From 8d4962ab380377546819524990a459a230e2e83a Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 17:37:29 +0100 Subject: [PATCH 07/18] Update timeouts in test_averaging --- tests/test_averaging.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_averaging.py b/tests/test_averaging.py index 1059e321b..732a329ce 100644 --- a/tests/test_averaging.py +++ b/tests/test_averaging.py @@ -82,7 +82,7 @@ def _test_allreduce_once(n_clients, n_aux): tensors, dht=dht, target_group_size=4, - min_matchmaking_time=15, + min_matchmaking_time=30, prefix="mygroup", client_mode=mode == AveragingMode.CLIENT, auxiliary=mode == AveragingMode.AUX, @@ -139,7 +139,7 @@ def test_allreduce_weighted(n_client_mode_peers: int = 2): tensors, dht=dht, target_group_size=4, - min_matchmaking_time=15, + min_matchmaking_time=30, prefix="mygroup", client_mode=client_mode, start=True, @@ -225,7 +225,7 @@ def test_allgather(n_averagers=8, target_group_size=4): [torch.ones(1)], dht=dht, target_group_size=target_group_size, - min_matchmaking_time=15, + min_matchmaking_time=30, prefix="mygroup", initial_group_bits="000", start=True, From 9c4a36f2931e3fe73e1ae77d1e91224881cf561c Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 17:37:48 +0100 Subject: [PATCH 08/18] Change the position of start_time in test_performance_ema_threadsafe --- tests/test_util_modules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_util_modules.py b/tests/test_util_modules.py index bde2ef006..f4371372f 100644 --- a/tests/test_util_modules.py +++ b/tests/test_util_modules.py @@ -567,7 +567,6 @@ def run_task(ema, start_event, task_size): with ThreadPoolExecutor(max_workers) as pool: ema = PerformanceEMA(alpha=alpha) start_event = Event() - start_time = time.perf_counter() futures = [] for _ in range(num_updates): @@ -577,6 +576,7 @@ def run_task(ema, start_event, task_size): ema.reset_timer() start_event.set() + start_time = time.perf_counter() total_size = sum(future.result() for future in as_completed(futures)) end_time = time.perf_counter() target = total_size / (end_time - start_time) From b37eac38dc318e4e52c5bbcc305ec102b3c9b99f Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 17:49:45 +0100 Subject: [PATCH 09/18] Add torch.manual_seed for test_fault_tolerance --- tests/test_allreduce_fault_tolerance.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_allreduce_fault_tolerance.py b/tests/test_allreduce_fault_tolerance.py index 12e310eba..a26f86e5e 100644 --- a/tests/test_allreduce_fault_tolerance.py +++ b/tests/test_allreduce_fault_tolerance.py @@ -1,16 +1,18 @@ from __future__ import annotations +import asyncio from enum import Enum, auto import pytest +import torch import hivemind -from hivemind.averaging.averager import * +from hivemind.averaging.averager import AllReduceRunner, AveragingMode, GatheredData from hivemind.averaging.group_info import GroupInfo from hivemind.averaging.load_balancing import load_balance_peers from hivemind.averaging.matchmaking import MatchmakingException from hivemind.proto import averaging_pb2 -from hivemind.utils.asyncio import aenumerate, as_aiter, azip, enter_asynchronously +from hivemind.utils.asyncio import AsyncIterator, aenumerate, as_aiter, azip, enter_asynchronously from hivemind.utils.logging import get_logger logger = get_logger(__name__) @@ -138,6 +140,8 @@ async def _generate_input_for_peer(self, peer_index: int) -> AsyncIterator[avera ], ) def test_fault_tolerance(fault0: Fault, fault1: Fault): + torch.manual_seed(0) + def _make_tensors(): return [torch.rand(16, 1024), -torch.rand(3, 8192), 2 * torch.randn(4, 4, 4), torch.randn(1024, 1024)] From 5d8859b94211feb03f6103f485ef45e0c2762120 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 14 Jul 2024 18:47:40 +0100 Subject: [PATCH 10/18] Disable pytest-xdist --- .github/workflows/run-tests.yml | 6 +++--- requirements-dev.txt | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 8d6f1a4fe..74d778bd0 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -39,7 +39,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest --durations=0 --durations-min=1.0 -v -n auto + pytest --durations=0 --durations-min=1.0 -v build_and_test_p2pd: runs-on: ubuntu-latest timeout-minutes: 10 @@ -70,7 +70,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest -k "p2p" -v -n auto + pytest -k "p2p" -v codecov_in_develop_mode: runs-on: ubuntu-latest @@ -100,6 +100,6 @@ jobs: - name: Test run: | export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest --cov hivemind --cov-config=pyproject.toml -v -n auto tests + pytest --cov hivemind --cov-config=pyproject.toml -v tests - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/requirements-dev.txt b/requirements-dev.txt index 6c63e416f..8398751aa 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,7 +2,6 @@ pytest==6.2.5 # see https://github.com/pytest-dev/pytest/issues/9621 pytest-forked pytest-asyncio==0.16.0 pytest-cov -pytest-xdist coverage==6.0.2 # see https://github.com/pytest-dev/pytest-cov/issues/520 tqdm scikit-learn From d654ff9c7de52f813fb0504cd1cb02191af8af58 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sat, 19 Oct 2024 11:39:47 +0200 Subject: [PATCH 11/18] Run tests in parallel --- .github/workflows/run-tests.yml | 2 +- requirements-dev.txt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 74d778bd0..f3fcfe1c5 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -39,7 +39,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest --durations=0 --durations-min=1.0 -v + pytest --durations=0 --durations-min=1.0 -n auto -v build_and_test_p2pd: runs-on: ubuntu-latest timeout-minutes: 10 diff --git a/requirements-dev.txt b/requirements-dev.txt index 8398751aa..04030bfb9 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,6 +2,7 @@ pytest==6.2.5 # see https://github.com/pytest-dev/pytest/issues/9621 pytest-forked pytest-asyncio==0.16.0 pytest-cov +pytest-xdist>=3.0.2 coverage==6.0.2 # see https://github.com/pytest-dev/pytest-cov/issues/520 tqdm scikit-learn From 647224d2f5b12c1441f875b865dc5d4c61d492d1 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sat, 19 Oct 2024 12:03:43 +0200 Subject: [PATCH 12/18] Restrict Protobuf version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 5a39ffbcd..f32fc94c8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ msgpack>=0.5.6 sortedcontainers uvloop>=0.14.0 grpcio-tools>=1.33.2 -protobuf>=3.12.2 +protobuf>=3.12.2,<5.28.0 configargparse>=1.2.3 py-multihash>=0.2.3 multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@e01dbd38f2c0464c0f78b556691d655265018cce From d789ff6e35c53a74755f695628f4a24b84da2f2b Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sat, 19 Oct 2024 12:13:24 +0200 Subject: [PATCH 13/18] Increase timeouts in test_fault_tolerance --- tests/test_allreduce_fault_tolerance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_allreduce_fault_tolerance.py b/tests/test_allreduce_fault_tolerance.py index a26f86e5e..c2dcd5594 100644 --- a/tests/test_allreduce_fault_tolerance.py +++ b/tests/test_allreduce_fault_tolerance.py @@ -153,10 +153,10 @@ def _make_tensors(): _make_tensors(), hivemind.DHT(initial_peers=dht.get_visible_maddrs(), start=True), prefix="test", - request_timeout=0.3, + request_timeout=1.0, min_matchmaking_time=1.0, next_chunk_timeout=0.5, - allreduce_timeout=5, + allreduce_timeout=10, part_size_bytes=2**16, client_mode=(i == 1), start=True, From f104cd6be1ccc33b44f14b4b46ee919323637433 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sat, 19 Oct 2024 12:22:50 +0200 Subject: [PATCH 14/18] Increase timeouts and number of processes --- .github/workflows/run-tests.yml | 2 +- tests/test_allreduce_fault_tolerance.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index f3fcfe1c5..1223a4121 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -39,7 +39,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor - pytest --durations=0 --durations-min=1.0 -n auto -v + pytest --durations=0 --durations-min=1.0 -n 4 -v build_and_test_p2pd: runs-on: ubuntu-latest timeout-minutes: 10 diff --git a/tests/test_allreduce_fault_tolerance.py b/tests/test_allreduce_fault_tolerance.py index c2dcd5594..e122e9b01 100644 --- a/tests/test_allreduce_fault_tolerance.py +++ b/tests/test_allreduce_fault_tolerance.py @@ -153,10 +153,10 @@ def _make_tensors(): _make_tensors(), hivemind.DHT(initial_peers=dht.get_visible_maddrs(), start=True), prefix="test", - request_timeout=1.0, - min_matchmaking_time=1.0, - next_chunk_timeout=0.5, - allreduce_timeout=10, + request_timeout=1.5, + min_matchmaking_time=2.0, + next_chunk_timeout=1.0, + allreduce_timeout=15, part_size_bytes=2**16, client_mode=(i == 1), start=True, From 80a29eb28c3c5c90d3f90b249f0e3a0cb4054c0a Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sat, 19 Oct 2024 12:31:28 +0200 Subject: [PATCH 15/18] Decrease the lower bound for test_progress_tracker --- tests/test_optimizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index eca182d81..9c91fbd11 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -327,7 +327,7 @@ def run_worker(index: int, batch_size: int, step_time: float, initial_peers: Lis for i in (0, 1, 5): # Without the 4th worker (the fastest one) assert 1.05 * mean_step_time < step_time_deltas[i] < 2.0 * mean_step_time for i in (2, 3, 4): # With the 4th worker - assert 0.5 * mean_step_time < step_time_deltas[i] < 0.95 * mean_step_time + assert 0.3 * mean_step_time < step_time_deltas[i] < 0.95 * mean_step_time assert emas[1] < emas[2] < emas[3] < emas[4] assert tracker.performance_ema.samples_per_second < 1e-9 From aee46bdf5630bd96cf5a06690fa390d3f914c3ed Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sat, 19 Oct 2024 12:43:28 +0200 Subject: [PATCH 16/18] Increase timeouts --- .github/workflows/run-tests.yml | 2 ++ tests/test_allreduce_fault_tolerance.py | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 1223a4121..126f87c74 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -39,6 +39,7 @@ jobs: run: | cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor + export HIVEMIND_DHT_NUM_WORKERS=1 pytest --durations=0 --durations-min=1.0 -n 4 -v build_and_test_p2pd: runs-on: ubuntu-latest @@ -100,6 +101,7 @@ jobs: - name: Test run: | export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor + export HIVEMIND_DHT_NUM_WORKERS=1 pytest --cov hivemind --cov-config=pyproject.toml -v tests - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/tests/test_allreduce_fault_tolerance.py b/tests/test_allreduce_fault_tolerance.py index e122e9b01..92c45d47f 100644 --- a/tests/test_allreduce_fault_tolerance.py +++ b/tests/test_allreduce_fault_tolerance.py @@ -154,9 +154,9 @@ def _make_tensors(): hivemind.DHT(initial_peers=dht.get_visible_maddrs(), start=True), prefix="test", request_timeout=1.5, - min_matchmaking_time=2.0, - next_chunk_timeout=1.0, - allreduce_timeout=15, + min_matchmaking_time=3.0, + next_chunk_timeout=2.0, + allreduce_timeout=30, part_size_bytes=2**16, client_mode=(i == 1), start=True, From 30ea4c1006a3d502184d62e8f99729fe29dde674 Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 3 Nov 2024 09:59:08 +0000 Subject: [PATCH 17/18] Remove parallel testing for now --- .github/workflows/run-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 126f87c74..34f1125c7 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -40,7 +40,7 @@ jobs: cd tests export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor export HIVEMIND_DHT_NUM_WORKERS=1 - pytest --durations=0 --durations-min=1.0 -n 4 -v + pytest --durations=0 --durations-min=1.0 -v build_and_test_p2pd: runs-on: ubuntu-latest timeout-minutes: 10 From fcb62321961dc3315e2c3df9e3bc523a86b1d10a Mon Sep 17 00:00:00 2001 From: Max Ryabinin Date: Sun, 3 Nov 2024 10:13:22 +0000 Subject: [PATCH 18/18] Remove pytest-xdist from deps --- requirements-dev.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 04030bfb9..8398751aa 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,7 +2,6 @@ pytest==6.2.5 # see https://github.com/pytest-dev/pytest/issues/9621 pytest-forked pytest-asyncio==0.16.0 pytest-cov -pytest-xdist>=3.0.2 coverage==6.0.2 # see https://github.com/pytest-dev/pytest-cov/issues/520 tqdm scikit-learn