
Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather #3207

Open · wants to merge 2 commits into base: main
15 changes: 15 additions & 0 deletions changes/3207.bugfix.rst
@@ -0,0 +1,15 @@
- This pull request fixes deadlocks and indefinite hangs when opening Zarr v3
arrays on synchronous fsspec filesystems by falling back to sequential reads
for filesystems that are not concurrency-safe, keeping metadata retrieval
robust without sacrificing performance on concurrency-safe filesystems.
Furthermore, ``Store._get_many`` was modified to retrieve objects
concurrently from storage; the previous implementation was sequential,
awaiting each ``self.get(*req)`` before proceeding, contrary to its
docstring.
- Introduced ``StorePath.get_many``, mimicking the behaviour of ``StorePath.get``.
- Use ``Store._get_many`` and ``StorePath.get_many`` in ``get_array_metadata``.
- Implemented ``FsspecStore._get_many`` to issue requests concurrently only
when the underlying file system is concurrency-safe, falling back to
sequential reads otherwise, so that synchronous file systems no longer
deadlock when metadata is accessed concurrently (see the sketch after this
list). Added ``LockableFileSystem`` tests covering both async and sync
behaviour.
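
For orientation, a condensed sketch of the fallback described above. It assumes an fsspec-style filesystem exposing an ``asynchronous`` flag and a ``get`` coroutine passed in by the caller; it simplifies the actual ``FsspecStore._get_many`` further down in this diff and is not a drop-in implementation.

import asyncio


async def get_many_with_fallback(fs, get, requests):
    if getattr(fs, "asynchronous", True):
        # Concurrency-safe filesystem: start every request at once and yield
        # results as they complete, in completion order.
        async def one(key, prototype, byte_range):
            return key, await get(key, prototype, byte_range)

        for done in asyncio.as_completed([one(*req) for req in requests]):
            yield await done
    else:
        # Synchronous filesystem: issue one request at a time to avoid the
        # deadlocks seen when such filesystems are hit concurrently.
        for key, prototype, byte_range in requests:
            yield key, await get(key, prototype, byte_range)
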
15 changes: 12 additions & 3 deletions src/zarr/abc/store.py
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from asyncio import gather
from asyncio import as_completed, gather
from dataclasses import dataclass
from itertools import starmap
from typing import TYPE_CHECKING, Protocol, runtime_checkable
@@ -414,8 +414,17 @@ async def _get_many(
that objects will be retrieved in the order in which they were requested, so this method
yields tuple[str, Buffer | None] instead of just Buffer | None
"""
for req in requests:
yield (req[0], await self.get(*req))

async def _get_with_name(
key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
) -> tuple[str, Buffer | None]:
value = await self.get(key, prototype, byte_range)
return key, value

tasks = [_get_with_name(*req) for req in requests]
for completed in as_completed(tasks):
task = await completed
yield task
Comment on lines +418 to +427

Contributor:

what is the advantage of this new implementation? the previous implementation was extremely simple, which I think is good for an abc.

Author (@dgegen), Jul 17, 2025:

The claim in the docstring is incorrect given the previous implementation.

This loop is sequential: it awaits each self.get(*req) and yields it before moving on to the next. Each request is handled one at a time, in the exact order provided. Therefore, results are always yielded in the same order as the input requests.

It is thus not fully concurrent, which would be desirable in an I/O-limited system, and, at least as I understand it, kind of defeats the purpose of having an asynchronous _get_many method yielding results in the first place. If we stick to the order, we might as well await all results and simply replace the implementation of _get_many with that of _get_many_ordered, making it faster and arguably easier to use in the asynchronous case. If we want the extra flexibility of not awaiting everything at once, but still issuing all requests at the same time, the new implementation would be the right one.
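
(Editorial aside: a self-contained toy illustrating the difference discussed here; the keys and delays are made up. Sequential awaiting preserves request order, while ``asyncio.as_completed`` starts everything at once and yields in completion order.)

import asyncio


async def fetch(key: str, delay: float) -> tuple[str, str]:
    await asyncio.sleep(delay)  # stand-in for an I/O-bound get()
    return key, f"{key}_data"


async def main() -> None:
    # Sequential: ~0.04 s total, results strictly in request order ("a", then "b").
    for coro in [fetch("a", 0.03), fetch("b", 0.01)]:
        print("sequential:", await coro)

    # Concurrent: ~0.03 s total, results in completion order ("b" before "a").
    for done in asyncio.as_completed([fetch("a", 0.03), fetch("b", 0.01)]):
        print("concurrent:", await done)


asyncio.run(main())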

Contributor:

the point of the default implementation is to be the simplest possible implementation of _get_many that any child class can safely support, given an implementation of get. But child classes should also be able to override this with more efficient methods where applicable, and in these cases the order of results is not guaranteed. hence the type annotation in the original method.

Author (@dgegen), Jul 17, 2025:

I find this somewhat confusing, as I would have expected the standard implementation to be fully asynchronous. However, if the goal is to maximize simplicity, then having an asynchronous implementation that runs synchronously might be the way to go.

That being said, if we revert this to the original, we would only have to also remove FsspecStore._get_many from my current solution. Unless, that is, you think we should not have a _get_many_ordered method at all and should instead use _get_many and always sort the values locally, since they could arrive in a different order in other implementations.
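
(For reference, ``_get_many_ordered`` is only discussed in this thread and is not part of this PR; a hypothetical sketch, reusing the existing ``Store.get`` signature, might look like this:)

from asyncio import gather


async def _get_many_ordered(self, requests):
    # Hypothetical: issue all requests concurrently, then return the results
    # keyed by the requested path and in request order.
    requests = list(requests)  # allow two passes over an iterable
    buffers = await gather(*(self.get(*req) for req in requests))
    return {req[0]: buf for req, buf in zip(requests, buffers, strict=True)}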


async def getsize(self, key: str) -> int:
"""
20 changes: 11 additions & 9 deletions src/zarr/core/array.py
@@ -211,22 +211,25 @@ async def get_array_metadata(
store_path: StorePath, zarr_format: ZarrFormat | None = 3
) -> dict[str, JSON]:
if zarr_format == 2:
zarray_bytes, zattrs_bytes = await gather(
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
)
requests = [(key, default_buffer_prototype(), None) for key in [ZARRAY_JSON, ZATTRS_JSON]]
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
zarray_bytes, zattrs_bytes = tuple(retrieved_buffers.get(req[0]) for req in requests)

if zarray_bytes is None:
raise FileNotFoundError(store_path)
elif zarr_format == 3:
zarr_json_bytes = await (store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype)
if zarr_json_bytes is None:
raise FileNotFoundError(store_path)
elif zarr_format is None:
zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
(store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
requests = [
(key, default_buffer_prototype(), None) for key in [ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON]
]
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
zarr_json_bytes, zarray_bytes, zattrs_bytes = tuple(
retrieved_buffers.get(req[0]) for req in requests
)

if zarr_json_bytes is not None and zarray_bytes is not None:
# warn and favor v3
msg = f"Both zarr.json (Zarr format 3) and .zarray (Zarr format 2) metadata objects exist at {store_path}. Zarr v3 will be used."
@@ -1445,7 +1448,6 @@ async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = F
).items()
]
)

await gather(*awaitables)

async def _set_selection(
41 changes: 25 additions & 16 deletions src/zarr/core/group.py
@@ -513,19 +513,23 @@ async def open(
consolidated_key = use_consolidated

if zarr_format == 2:
paths = [store_path / ZGROUP_JSON, store_path / ZATTRS_JSON]
requests = [
(key, default_buffer_prototype(), None) for key in [ZGROUP_JSON, ZATTRS_JSON]
]
if use_consolidated or use_consolidated is None:
paths.append(store_path / consolidated_key)
requests.append((consolidated_key, default_buffer_prototype(), None))

zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather(
*[path.get() for path in paths]
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
zgroup_bytes, zattrs_bytes = (
retrieved_buffers[ZGROUP_JSON],
retrieved_buffers[ZATTRS_JSON],
)

if zgroup_bytes is None:
raise FileNotFoundError(store_path)

if use_consolidated or use_consolidated is None:
maybe_consolidated_metadata_bytes = rest[0]

maybe_consolidated_metadata_bytes = retrieved_buffers[consolidated_key]
else:
maybe_consolidated_metadata_bytes = None

@@ -534,17 +538,18 @@
if zarr_json_bytes is None:
raise FileNotFoundError(store_path)
elif zarr_format is None:
requests = [
(key, default_buffer_prototype(), None)
for key in [ZARR_JSON, ZGROUP_JSON, ZATTRS_JSON, consolidated_key]
]
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
(
zarr_json_bytes,
zgroup_bytes,
zattrs_bytes,
maybe_consolidated_metadata_bytes,
) = await asyncio.gather(
(store_path / ZARR_JSON).get(),
(store_path / ZGROUP_JSON).get(),
(store_path / ZATTRS_JSON).get(),
(store_path / str(consolidated_key)).get(),
)
) = tuple(retrieved_buffers.get(req[0]) for req in requests)

if zarr_json_bytes is not None and zgroup_bytes is not None:
# warn and favor v3
msg = f"Both zarr.json (Zarr format 3) and .zgroup (Zarr format 2) metadata objects exist at {store_path}. Zarr format 3 will be used."
@@ -3476,10 +3481,14 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
"""
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't
# find an array
zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
requests = [
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
Comment on lines +3485 to +3487

Contributor:

let's bind _join_paths([path, X]) to a variable so that we don't call the _join_paths function so many times. For example:

zarray_path = _join_paths([path, ZARRAY_JSON])
...

]
retrieved_buffers = {key: value async for key, value in store._get_many(requests)}
zarray_bytes, zgroup_bytes, zattrs_bytes = tuple(
retrieved_buffers.get(req[0]) for req in requests
)
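
(A sketch of the reviewer's suggestion above applied to this hunk, binding each path once and reusing it for both the request list and the lookups, as it would appear inside ``_read_metadata_v2``; this is not part of the diff:)

zarray_path = _join_paths([path, ZARRAY_JSON])
zgroup_path = _join_paths([path, ZGROUP_JSON])
zattrs_path = _join_paths([path, ZATTRS_JSON])
requests = [
    (zarray_path, default_buffer_prototype(), None),
    (zgroup_path, default_buffer_prototype(), None),
    (zattrs_path, default_buffer_prototype(), None),
]
retrieved_buffers = {key: value async for key, value in store._get_many(requests)}
zarray_bytes = retrieved_buffers.get(zarray_path)
zgroup_bytes = retrieved_buffers.get(zgroup_path)
zattrs_bytes = retrieved_buffers.get(zattrs_path)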

if zattrs_bytes is None:
22 changes: 22 additions & 0 deletions src/zarr/storage/_common.py
@@ -2,6 +2,7 @@

import importlib.util
import json
from asyncio import gather
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias

@@ -27,6 +28,8 @@
FSMap = None

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable

from zarr.core.buffer import BufferPrototype


@@ -163,6 +166,25 @@ async def get(
prototype = default_buffer_prototype()
return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)

async def get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
"""
Read multiple Buffers from the store, one for each request. Results are yielded as (path_component, Buffer | None) pairs and are not guaranteed to follow the request order.

Parameters
----------
requests : Iterable[tuple[str, BufferPrototype, ByteRequest | None]]

Yields
-------
tuple[str, Buffer | None]
"""
path_component_dict = {(self / req[0]).path: req[0] for req in requests}
complete_requests = [((self / req[0]).path, *req[1:]) for req in requests]
async for result in self.store._get_many(complete_requests):
yield (path_component_dict[result[0]], *result[1:])

async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
"""
Write bytes to the store.
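
(A usage sketch for ``StorePath.get_many``, mirroring how ``get_array_metadata`` consumes it elsewhere in this diff; keys are resolved relative to the ``StorePath``, and this would run inside an async function:)

requests = [(key, default_buffer_prototype(), None) for key in [ZARR_JSON, ZATTRS_JSON]]
retrieved = {key: value async for key, value in store_path.get_many(requests)}
zarr_json_bytes = retrieved.get(ZARR_JSON)  # Buffer | None
zattrs_bytes = retrieved.get(ZATTRS_JSON)  # Buffer | None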
14 changes: 13 additions & 1 deletion src/zarr/storage/_fsspec.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import json
import warnings
from contextlib import suppress
@@ -18,7 +19,7 @@
from zarr.storage._common import _dereference_path

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncGenerator, AsyncIterator, Iterable

from fsspec import AbstractFileSystem
from fsspec.asyn import AsyncFileSystem
@@ -326,6 +327,17 @@ async def get(
else:
return value

async def _get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
if getattr(self.fs, "asynchronous", True):
async for result in super()._get_many(requests):
yield result
else:
for key, prototype, byte_range in requests:
value = await self.get(key, prototype, byte_range)
yield (key, value)

async def set(
self,
key: str,
104 changes: 104 additions & 0 deletions tests/test_store/test_fsspec.py
@@ -1,8 +1,11 @@
from __future__ import annotations

import asyncio
import json
import os
import re
import warnings
from itertools import cycle
from typing import TYPE_CHECKING, Any

import numpy as np
@@ -42,6 +45,7 @@
]

fsspec = pytest.importorskip("fsspec")
AsyncFileSystem = pytest.importorskip("fsspec.asyn").AsyncFileSystem
s3fs = pytest.importorskip("s3fs")
requests = pytest.importorskip("requests")
moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server")
Expand Down Expand Up @@ -440,3 +444,103 @@ async def test_with_read_only_auto_mkdir(tmp_path: Path) -> None:

store_w = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False})
_ = store_w.with_read_only()


class LockableFileSystem(AsyncFileSystem):
"""
A mock file system that simulates asynchronous and synchronous behaviors with artificial delays.
"""

def __init__(
self,
asynchronous: bool,
lock: bool | None = None,
delays: tuple[float, ...] | None = None,
) -> None:
if delays is None:
delays = (
0.03,
0.01,
)
lock = lock if lock is not None else not asynchronous

# self.asynchronous = asynchronous
self.lock = asyncio.Lock() if lock else None
self.delays = cycle(delays)
self.async_impl = True

super().__init__(asynchronous=asynchronous)

async def _check_active(self) -> None:
if self.lock and self.lock.locked():
raise RuntimeError("Concurrent requests!")

async def _cat_file(self, path, start=None, end=None) -> bytes:
await self._simulate_io_operation(path)
return self.get_data(path)

async def _await_io(self) -> None:
await asyncio.sleep(next(self.delays))

async def _simulate_io_operation(self, path) -> None:
if self.lock:
await self._check_active()
async with self.lock:
await self._await_io()
else:
await self._await_io()

def get_store(self, path: str) -> FsspecStore:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
return FsspecStore(fs=self, path=path)

@staticmethod
def get_data(key: str) -> bytes:
return f"{key}_data".encode()


@pytest.mark.asyncio
class TestLockableFSSPECFileSystem:
@pytest.fixture(autouse=True)
async def setup(self):
self.path = "root"
self.store_async = LockableFileSystem(asynchronous=True).get_store(path=self.path)
self.store_sync = LockableFileSystem(asynchronous=False).get_store(path=self.path)

def get_requests_and_true_results(self, path_components=("a", "b")):
true_results = [
(component, LockableFileSystem.get_data(f"{self.path}/{component}"))
for component in path_components
]
requests = [(component, default_buffer_prototype(), None) for component in path_components]
return requests, true_results

async def test_get_many_asynchronous_fs(self):
requests, true_results = self.get_requests_and_true_results(("a", "b", "c"))

results = []
async for k, v in self.store_async._get_many(requests):
results.append((k, v.to_bytes() if v else None))

results_ordered = sorted(results, key=lambda x: x[0])
assert results_ordered == true_results

async def test_get_many_synchronous_fs(self):
requests, true_results = self.get_requests_and_true_results()

results = []
async for k, v in self.store_sync._get_many(requests):
results.append((k, v.to_bytes() if v else None))
# In the synchronous case, results should be in the same order as requests

assert results == true_results


async def test_asynchronous_locked_fs_raises(self):
store = LockableFileSystem(asynchronous=True, lock=True).get_store(path="root")
requests, _ = self.get_requests_and_true_results()

with pytest.raises(RuntimeError, match="Concurrent requests!"):
async for _, _ in store._get_many(requests):
pass