Fix async client safety #3512

Open
abrookins wants to merge 8 commits into master
Conversation

@abrookins (Contributor) commented Feb 12, 2025

Fix async safety when Redis client is used as an async context manager.

When the async Redis client is used as an async context manager from
multiple coroutines, one coroutine can exit the context, shutting down
the client's connection pool while another coroutine is still using a
connection. This results in a connection error such as:

redis.exceptions.ConnectionError: Connection closed by server.

Additional locking in ConnectionPool resolves the problem but introduces
extreme latency. Instead, this PR implements a shielded counter that
increments as callers enter the async context manager and decrements when
they exit. The client then closes its connection pool only after the last
active context exits.

Performance is on par with using the client without a context manager.
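
For illustration, here is a minimal, self-contained sketch of the approach,
reconstructed from the diff excerpts quoted in the review comments below.
The names _usage_counter, _usage_lock, and _decrement_usage appear in the
patch; the surrounding class is a stand-in for the real client, so details
may differ from the actual change.

import asyncio


class PooledClient:
    """Stand-in for the async Redis client, showing the shielded usage counter."""

    def __init__(self):
        self._usage_counter = 0
        self._usage_lock = asyncio.Lock()

    async def initialize(self):
        # The real client sets up its connection pool here.
        return self

    async def aclose(self):
        # The real client disconnects its connection pool here.
        print("closing connection pool")

    async def _decrement_usage(self) -> int:
        # A coroutine rather than inline code, so it can be wrapped in
        # asyncio.shield() and run to completion even if the caller is cancelled.
        async with self._usage_lock:
            self._usage_counter -= 1
            return self._usage_counter

    async def __aenter__(self):
        async with self._usage_lock:
            self._usage_counter += 1
        try:
            return await self.initialize()
        except Exception:
            # If initialization fails, decrement the counter to keep it in sync.
            await asyncio.shield(self._decrement_usage())
            raise

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Close the pool only when the last active context exits.
        if await asyncio.shield(self._decrement_usage()) == 0:
            await self.aclose()

With many tasks sharing one client, only the last task to leave the context
triggers aclose(), so no task loses its connection mid-command.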

Reproducing the issue

You should be able to reproduce the connection error with the following
script, using the "manager" argument. Running the script with the
"no-manager" argument should not produce any errors.

With this patch applied, the error no longer occurs.

from redis.asyncio import Redis
import asyncio
import sys


async def async_ctx_manager(): 
    print("creating r client")
    r = Redis(host="localhost", port=6379, db=0)
    
    # This works fine:
    # async with r:
    #     async def task():
    #         print("setting foo")
    #         await r.set("foo", "bar")
    #         print(await r.get("foo"))
    #         print("leaving context manager")

    #     tasks = [task() for _ in range(1000)]
    #     await asyncio.gather(*tasks)

    # This does not work (fails with a connection error)
    # Example one:
    #   File "/Users/andrew.brookins/src/redis-py/redis/_parsers/hiredis.py", line 191, in read_from_socket
    #     raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
    # redis.exceptions.ConnectionError: Connection closed by server.
    #
    # Example two:
    #   File "/Users/andrew.brookins/src/redis-py/redis/asyncio/connection.py", line 511, in send_packed_command
    #       raise ConnectionError(
    #  redis.exceptions.ConnectionError: Error UNKNOWN while writing to socket. Connection lost.
    async def task():
        print("entering context manager")
        async with r:
            print("setting foo")
            await r.set("foo", "bar")
            print(await r.get("foo"))
            print("leaving context manager")
            
    tasks = [task() for _ in range(1000)]
    await asyncio.gather(*tasks)

    print("deleting foo")
    await r.delete("foo")
    
    print("connections in use and connected? ", [c.is_connected for c in r.connection_pool._in_use_connections])
    print("connections available and connected? ", [c.is_connected for c in r.connection_pool._available_connections])


async def without_ctx_manager():
    r = Redis(host="localhost", port=6379, db=0)
    
    # This works fine:
    async def task():
        await r.set("foo", "bar")
        print(await r.get("foo"))
        
    tasks = [task() for _ in range(1000)]
    await asyncio.gather(*tasks)


    # If we didn't close, we'd have many open connections
    print("connections in use and connected? ", [c.is_connected for c in r.connection_pool._in_use_connections])
    print("connections available and connected? ", [c.is_connected for c in r.connection_pool._available_connections])

    await r.aclose()
    
    print("After closing connection pool")
    # Now we've closed all open connections in the pool
    print("connections in use and connected? ", [c.is_connected for c in r.connection_pool._in_use_connections])
    print("connections available and connected? ", [c.is_connected for c in r.connection_pool._available_connections])

    print("leaving function")


if __name__ == "__main__":
    if len(sys.argv) < 2:  # argv[0] is the script name, so require a mode argument
        print("Usage: python test.py [manager|no-manager]")
        sys.exit(1)
    cmd = sys.argv[1] 
    if cmd == "manager":
        asyncio.run(async_ctx_manager())
    elif cmd == "no-manager":
        asyncio.run(without_ctx_manager())
    else:
        print("Usage: python test.py [manager|no-manager]")
        sys.exit(1)

Pull Request check-list

Please make sure to review and check all of these items:

  • Do tests and lints pass with this change?
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the GitHub Actions build to finish)?
  • Is the new or changed code fully tested?
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
  • Is there an example added to the examples folder (if applicable)?
  • Was the change added to CHANGES file?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

@abrookins (Contributor, Author) commented on the diff:

            self._usage_counter -= 1
            raise

    async def _decrement_usage(self) -> int:

A helper method is required so we can use it in the shield().
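
In other words, asyncio.shield() protects a single awaitable from
cancellation, so the lock-protected decrement has to live in its own
coroutine; a bare async with block cannot be shielded. A runnable toy
demonstration (the names here are illustrative, not from the patch):

import asyncio


async def main():
    lock = asyncio.Lock()
    state = {"usage": 1}

    async def decrement_usage() -> int:
        # Must be a coroutine: asyncio.shield() wraps an awaitable,
        # not a bare "async with" statement block.
        async with lock:
            await asyncio.sleep(0.01)  # simulate work done under the lock
            state["usage"] -= 1
            return state["usage"]

    async def exit_context():
        # Even if this task is cancelled, the shielded decrement keeps
        # running, so the counter cannot be left stale.
        await asyncio.shield(decrement_usage())

    task = asyncio.create_task(exit_context())
    await asyncio.sleep(0)  # let the shielded decrement get scheduled
    task.cancel()           # cancels the waiter, not the decrement
    await asyncio.sleep(0.05)
    print(state["usage"])   # -> 0: the decrement completed anyway


asyncio.run(main())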

@petyaslavova (Collaborator) commented

@abrookins Appreciate your contribution! We’ll review your change soon.

@ishaan-jaff commented

Is there an ETA on when this will land in a new release? This is a critical blocker for LiteLLM users @petyaslavova

Is there anything I can do to help accelerate this?

A collaborator commented on the diff:

            return await self.initialize()
        except Exception:
            # If initialization fails, decrement the counter to keep it in sync
            async with self._usage_lock:

You can directly use the new function _decrement_usage here. The same applies to the cluster.py changes as well.
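
Applied to the excerpt above, the suggestion amounts to replacing the inlined
lock-and-decrement with a call to the helper (a sketch of the reviewer's
proposal, not committed code):

        try:
            return await self.initialize()
        except Exception:
            # If initialization fails, decrement the counter to keep it in sync
            await self._decrement_usage()
            raise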

A collaborator commented on the diff:

        connection pool is only closed (via aclose()) when no context is using
        the client.
        """
        async with self._usage_lock:

I would suggest adding another function _increment_usage for this operation. It will be easier to read and follow the code if here you call _increment_usage and below in the except clause you call _decrement_usage. The same applies to the cluster.py changes as well.
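
A sketch of what the proposed pairing could look like; _increment_usage is
the reviewer's suggested name, mirroring the _decrement_usage helper that
the patch already adds:

    async def _increment_usage(self) -> int:
        # Mirror image of _decrement_usage: count one more active
        # context under the usage lock.
        async with self._usage_lock:
            self._usage_counter += 1
            return self._usage_counter

    async def __aenter__(self):
        await self._increment_usage()
        try:
            return await self.initialize()
        except Exception:
            await self._decrement_usage()
            raise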
