Skip to content

COH-32325 - Fix issue with cache destroy when the stream handler has been closed #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/coherence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,6 @@ def on_destroyed(name: str) -> None:
if name == cache_name and not this.destroyed:
this._events_manager._close()
this._destroyed = True
this._released = True
emitter.emit(MapLifecycleEvent.DESTROYED.value, name)

# noinspection PyProtectedMember
Expand Down Expand Up @@ -1289,10 +1288,13 @@ async def clear(self) -> None:
await self._near_cache.clear()

async def destroy(self) -> None:
self._internal_emitter.once(MapLifecycleEvent.DESTROYED.value)
self._internal_emitter.emit(MapLifecycleEvent.DESTROYED.value, self.name)
dispatcher: Dispatcher = self._request_factory.destroy_request()
await dispatcher.dispatch(self._stream_handler)
if not self._stream_handler._closed:
dispatcher: Dispatcher = self._request_factory.destroy_request()
await dispatcher.dispatch(self._stream_handler)
# Now do everything that is done for release
self._internal_emitter.once(MapLifecycleEvent.RELEASED.value)
self._internal_emitter.emit(MapLifecycleEvent.RELEASED.value, self.name)
await self._stream_handler.close()

async def release(self) -> None:
if self.active:
Expand Down
42 changes: 41 additions & 1 deletion tests/e2e/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,46 @@ def callback(n: str) -> None:
await session.close()


# noinspection PyShadowingNames,DuplicatedCode
@pytest.mark.asyncio(loop_scope="function")
async def test_cache_destroy_event() -> None:
session: Session = await tests.get_session()
cache: NamedCache[str, str] = await session.get_cache("test-" + str(int(time() * 1000)))
name: str = "UNSET"
destroy_event: Event = Event()
release_event: Event = Event()

def destroy_callback(n: str) -> None:
nonlocal name
name = n
destroy_event.set()

cache.on(MapLifecycleEvent.DESTROYED, destroy_callback)

def release_callback(n: str) -> None:
nonlocal name
name = n
release_event.set()

cache.on(MapLifecycleEvent.RELEASED, release_callback)

try:
await cache.put("A", "B")
await cache.put("C", "D")
assert await cache.size() == 2

await cache.destroy()
await tests.wait_for(destroy_event, EVENT_TIMEOUT)
await tests.wait_for(release_event, EVENT_TIMEOUT)

assert name == cache.name
assert cache.destroyed
assert cache.released
assert not cache.active
finally:
await session.close()


# noinspection PyShadowingNames,DuplicatedCode,PyUnresolvedReferences
@pytest.mark.asyncio
async def test_add_remove_index(person_cache: NamedCache[str, Person]) -> None:
Expand Down Expand Up @@ -603,7 +643,7 @@ async def test_ttl_configuration(test_session: Session) -> None:

@pytest.mark.asyncio
async def test_unary_error(test_session: Session) -> None:
cache: NamedCache[str, str] = await test_session.get_cache("unary_error")
cache: NamedCache[str, dict] = await test_session.get_cache("unary_error")

d = dict()
d["@class"] = "com.foo.Bar"
Expand Down