diff --git a/distributed/client.py b/distributed/client.py
index ad283a352ac..661fbdb50d5 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -2779,6 +2779,17 @@ def scatter(
                 "Consider using a normal for loop and Client.submit"
             )

+        if broadcast and dask.config.get(
+            "distributed.scheduler.active-memory-manager.start"
+        ):
+            raise RuntimeError(
+                "Scattering data with broadcast=True is incompatible "
+                "with the Active Memory Manager’s ReduceReplicas "
+                "policy. Please disable the AMM plugin by setting "
+                "the following config to False: "
+                "'distributed.scheduler.active-memory-manager.start'"
+            )
+
         try:
             local_worker = get_worker()
         except ValueError:
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index c3c40519299..b7a5a86d8ff 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -2353,6 +2353,12 @@ async def test__broadcast(c, s, a, b):
     assert a.data == b.data == {x.key: 1, y.key: 2}


+@gen_cluster(client=True)
+async def test__broadcast_raises(c, s, a, b):
+    with pytest.raises(RuntimeError):
+        await c.scatter([1, 2], broadcast=True)
+
+
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4, config=NO_AMM)
 async def test__broadcast_integer(c, s, *workers):
     x, y = await c.scatter([1, 2], broadcast=2)