Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51591][PYTHON][CONNECT] Fix ThreadPoolExecutor failure in python 3.13 daily test #50332

Closed
wants to merge 5 commits into from

Conversation

zhengruifeng
Copy link
Contributor

@zhengruifeng zhengruifeng commented Mar 20, 2025

What changes were proposed in this pull request?

Fix ThreadPoolExecutor failure in python 3.13 daily build

Why are the changes needed?

in the last 3 weeks, python 3.13 daily build keeps failing with

  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1823, in _handle_error
    raise error
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1534, in _execute_and_fetch_as_iterator
    for b in generator:
             ^^^^^^^^^
  File "<frozen _collections_abc>", line 360, in __next__
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 146, in send
    self._release_all()
    ~~~~~~~~~~~~~~~~~^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 241, in _release_all
    self._release_thread_pool.submit(target)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^
  File "/usr/lib/python3.13/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

see https://github.com/apache/spark/actions/runs/13955602888/job/39065942222

In ThreadPoolExecutor.submit, this only happens when the pool is already shutdown:

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f

Does this PR introduce any user-facing change?

no

How was this patch tested?

PR builder with

default: '{"PYSPARK_IMAGE_TO_TEST": "python-313", "PYTHON_TO_TEST": "python3.13"}'

Was this patch authored or co-authored using generative AI tooling?

no

fix

Verified

This commit was signed with the committer’s verified signature.
zhengruifeng Ruifeng Zheng

Verified

This commit was signed with the committer’s verified signature.
zhengruifeng Ruifeng Zheng

Verified

This commit was signed with the committer’s verified signature.
zhengruifeng Ruifeng Zheng

Verified

This commit was signed with the committer’s verified signature.
zhengruifeng Ruifeng Zheng

Verified

This commit was signed with the committer’s verified signature.
zhengruifeng Ruifeng Zheng
@github-actions github-actions bot removed the INFRA label Mar 24, 2025
@zhengruifeng zhengruifeng changed the title [WIP][PYTHON] Fix ThreadPoolExecutor failure in python 3.13 daily test [SPARK-51591][PYTHON][CONNECT] Fix ThreadPoolExecutor failure in python 3.13 daily test Mar 24, 2025
@zhengruifeng zhengruifeng marked this pull request as ready for review March 24, 2025 05:32
@zhengruifeng
Copy link
Contributor Author

merged to master

@zhengruifeng zhengruifeng deleted the py_pool_313 branch March 24, 2025 05:34
@zhengruifeng
Copy link
Contributor Author

post-merge check: https://github.com/apache/spark/actions/runs/14028046790

3.13 daily test is green now

SauronShepherd pushed a commit to SauronShepherd/spark that referenced this pull request Mar 25, 2025

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
…on 3.13 daily test

### What changes were proposed in this pull request?
Fix ThreadPoolExecutor failure in python 3.13 daily build

### Why are the changes needed?
in the last 3 weeks, python 3.13 daily build keeps failing with
```
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1823, in _handle_error
    raise error
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1534, in _execute_and_fetch_as_iterator
    for b in generator:
             ^^^^^^^^^
  File "<frozen _collections_abc>", line 360, in __next__
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 146, in send
    self._release_all()
    ~~~~~~~~~~~~~~~~~^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 241, in _release_all
    self._release_thread_pool.submit(target)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^
  File "/usr/lib/python3.13/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```

see https://github.com/apache/spark/actions/runs/13955602888/job/39065942222

In `ThreadPoolExecutor.submit`, this only happens when the pool is already shutdown:
```
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
PR builder with
```
default: '{"PYSPARK_IMAGE_TO_TEST": "python-313", "PYTHON_TO_TEST": "python3.13"}'
```

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#50332 from zhengruifeng/py_pool_313.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
kazemaksOG pushed a commit to kazemaksOG/spark-custom-scheduler that referenced this pull request Mar 27, 2025

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
…on 3.13 daily test

### What changes were proposed in this pull request?
Fix ThreadPoolExecutor failure in python 3.13 daily build

### Why are the changes needed?
in the last 3 weeks, python 3.13 daily build keeps failing with
```
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1823, in _handle_error
    raise error
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1534, in _execute_and_fetch_as_iterator
    for b in generator:
             ^^^^^^^^^
  File "<frozen _collections_abc>", line 360, in __next__
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 146, in send
    self._release_all()
    ~~~~~~~~~~~~~~~~~^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 241, in _release_all
    self._release_thread_pool.submit(target)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^
  File "/usr/lib/python3.13/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```

see https://github.com/apache/spark/actions/runs/13955602888/job/39065942222

In `ThreadPoolExecutor.submit`, this only happens when the pool is already shutdown:
```
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
PR builder with
```
default: '{"PYSPARK_IMAGE_TO_TEST": "python-313", "PYTHON_TO_TEST": "python3.13"}'
```

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#50332 from zhengruifeng/py_pool_313.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants