Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#7346: Handle execution on Dask workers to avoid creating conflic… #7347

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
25 changes: 22 additions & 3 deletions modin/core/execution/dask/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@
from dask.distributed import wait
from distributed import Future
from distributed.client import default_client
from distributed.worker import get_worker

Check warning on line 22 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L22

Added line #L22 was not covered by tests


def get_dask_client():

Check warning on line 25 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L25

Added line #L25 was not covered by tests
"""
Get the Dask client, reusing the worker's client if execution is on a Dask worker.

Returns
-------
distributed.Client
The Dask client.
"""
try:
client = default_client()
except ValueError:

Check warning on line 36 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L34-L36

Added lines #L34 - L36 were not covered by tests
# We ought to be in a worker process
worker = get_worker()
client = worker.client
return client

Check warning on line 40 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L38-L40

Added lines #L38 - L40 were not covered by tests


def _deploy_dask_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
Expand Down Expand Up @@ -83,7 +102,7 @@
list
The result of ``func`` split into parts in accordance with ``num_returns``.
"""
client = default_client()
client = get_dask_client()

Check warning on line 105 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L105

Added line #L105 was not covered by tests
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
if callable(func):
Expand Down Expand Up @@ -137,7 +156,7 @@
Any
An object(s) from the distributed memory.
"""
client = default_client()
client = get_dask_client()

Check warning on line 159 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L159

Added line #L159 was not covered by tests
return client.gather(future)

@classmethod
Expand All @@ -164,7 +183,7 @@
# {'sep': <Future: finished, type: pandas._libs.lib._NoDefault, key: sep>, \
# 'delimiter': <Future: finished, type: NoneType, key: delimiter> ...
data = UserDict(data)
client = default_client()
client = get_dask_client()

Check warning on line 186 in modin/core/execution/dask/common/engine_wrapper.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/engine_wrapper.py#L186

Added line #L186 was not covered by tests
return client.scatter(data, **kwargs)

@classmethod
Expand Down
11 changes: 11 additions & 0 deletions modin/core/execution/dask/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
def initialize_dask():
"""Initialize Dask environment."""
from distributed.client import default_client
from distributed.worker import get_worker

Check warning on line 33 in modin/core/execution/dask/common/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/utils.py#L33

Added line #L33 was not covered by tests

try:

Check warning on line 35 in modin/core/execution/dask/common/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/utils.py#L35

Added line #L35 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why initialize_dask is called in a worker? Could you verify if this is the case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely don't fully understand it. I can verify that this PR stops the issue and prevents multiple instances of a Dask cluster from starting under any of the operations where I was observing the behavior. Since it occurs in an apply context on a remote worker, it was beyond my available time (or existing technical skill) to debug exactly what was happening on the Dask worker. It seems possible that there's some other root cause leading to a call to initialize_dask.

I can verify by inference that initialize_dask is being called inside a worker, because it appears to be the only place in Modin 0.31 where the distributed.Client class is ever instantiated, and I can observe in the stdout that multiple Clients are being created as daemonic processes on Dask during the apply operation demonstrated in #7346, but only when working with Modin (not with the equivalent operation in Pandas).

I can hazard a partial guess as to what might be happening that would require further study based on some very confusing behavior I observed: sometimes while attempting to use client.submit(lambda x: x.apply(foo), pandas_df) directly on a Pandas dataframe (not Modin), I saw the same error, but only if Modin had been imported using import modin.pandas as pd. It made me wonder if Dask was calling a pd function while pd had been masked in the worker's namespace by Modin?

I think I can probably create a working example of that if I have enough time later, which might help find the root cause.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have a decent understanding of what is going on, but there's still something weird happening that I can't explain.

Modin is never fully initialized on the workers. Modin's initialization code is never run on the workers, unless a worker runs a task requires subscribing to an engine it will never have this problem. Link:

class FactoryDispatcher(object):
"""
Class that routes IO-work to the factories.
This class is responsible for keeping selected factory up-to-date and dispatching
calls of IO-functions to its actual execution-specific implementations.
"""
__factory: factories.BaseFactory = None
@classmethod
def get_factory(cls) -> factories.BaseFactory:
"""Get current factory."""
if cls.__factory is None:
from modin.pandas import _update_engine
Engine.subscribe(_update_engine)
Engine.subscribe(cls._update_factory)
StorageFormat.subscribe(cls._update_factory)
return cls.__factory

The Series constructor ends up in this path via from_pandas. So when you call pd.Series(...) from within an apply function, cloudpickle will serialize all of the dependencies of that call and then unpack it within a worker. This could potentially explain why @data-makerman is seeing this happening with pandas after using Modin (but that still seems like a bug to me and is worth investigating more).

Now, what I don't understand is that in ipython and jupyter, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster:

Screenshot 2024-07-22 at 4 14 48 PM

I don't really even understand why it's different, because if I use %run issue-7346.py within ipython I get the same issue as before with Dask:

Screenshot 2024-07-22 at 4 20 35 PM

So I think this patch is absolutely correct in detecting that we are in a worker and avoiding initializing a second Dask cluster, but I will follow on with a patch for this weird ipython issue.

Also, the reason this doesn't happen with Ray is because Ray uses a global variable that all workers share to ensure that one and only one Ray cluster are initialized in the same client/worker. We bypass initialization if Ray is initialized here:

if not ray.is_initialized() or override_is_cluster:

Ray will always be initialized from within a worker.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, what I don't understand is that in ipython and jupyter, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster:

This probably has to do with the fact that workers know nothing about Modin configs set in the main process.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YarShev I think that is correct, but do you know why it would be different in ipython vs the python script?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would guess there is a different strategy for importing modules in ipython.

# Check if running within a Dask worker process
get_worker()

Check warning on line 37 in modin/core/execution/dask/common/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/utils.py#L37

Added line #L37 was not covered by tests
# If the above line does not raise an error, we are in a worker process
# and should not create a new client
return
except ValueError:

Check warning on line 41 in modin/core/execution/dask/common/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/common/utils.py#L40-L41

Added lines #L40 - L41 were not covered by tests
# Not in a Dask worker, proceed to check for or create a client
pass

try:
client = default_client()
Expand Down
20 changes: 20 additions & 0 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2784,3 +2784,23 @@
match="<function DataFrame.<property fget:_test_default_property>> is not currently supported",
):
pd.DataFrame([[1]]).dataframe_test_default_property

def test_daemonic_worker_protection():
data-makerman marked this conversation as resolved.
Show resolved Hide resolved
# Test for issue #7346, wherein some operations on Dask cause a second submission of a task to
# the Dask client from the worker scope, which should not cause a new client to be created

def submission_triggering_row_operation(row):
row_to_dict = row.to_dict()
dict_to_row = pd.Series(row_to_dict)
return dict_to_row

df = pd.DataFrame(
{
"A": ["a", "b", "c", "d"],
"B": [1, 2, 3, 4],
"C": [1, 2, 3, 4],
"D": [1, 2, 3, 4],
}
)

df = df.apply(submission_triggering_row_operation, axis=1)
Fixed Show fixed Hide fixed
Loading