Skip to content

Commit f4d275a

Browse files
author
zhousheng06
committed
Merge remote-tracking branch 'origin/feature/support_rediscluster_transport' into feature/support_rediscluster_transport
2 parents 1a5eb30 + 9983557 commit f4d275a

7 files changed

Lines changed: 143 additions & 5 deletions

File tree

kombu/connection.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,26 @@ def is_evented(self):
10351035
BrokerConnection = Connection
10361036

10371037

1038+
class PooledConnection(Connection):
1039+
"""Wraps :class:`kombu.Connection`.
1040+
1041+
This wrapper modifies :meth:`kombu.Connection.__exit__` to close the connection
1042+
in case any exception occurred while the context was active.
1043+
"""
1044+
1045+
def __init__(self, pool, **kwargs):
1046+
self._pool = pool
1047+
super().__init__(**kwargs)
1048+
1049+
def __enter__(self):
1050+
return self
1051+
1052+
def __exit__(self, exc_type, exc_val, exc_tb):
1053+
if exc_type is not None and self._pool.limit:
1054+
self._pool.replace(self)
1055+
return super().__exit__(exc_type, exc_val, exc_tb)
1056+
1057+
10381058
class ConnectionPool(Resource):
10391059
"""Pool of connections."""
10401060

@@ -1046,7 +1066,7 @@ def __init__(self, connection, limit=None, **kwargs):
10461066
super().__init__(limit=limit)
10471067

10481068
def new(self):
1049-
return self.connection.clone()
1069+
return PooledConnection(self, **dict(self.connection._info(resolve=False)))
10501070

10511071
def release_resource(self, resource):
10521072
try:

kombu/messaging.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from .common import maybe_declare
99
from .compression import compress
10-
from .connection import is_connection, maybe_channel
10+
from .connection import PooledConnection, is_connection, maybe_channel
1111
from .entity import Exchange, Queue, maybe_delivery_mode
1212
from .exceptions import ContentDisallowed
1313
from .serialization import dumps, prepare_accept_content
@@ -257,6 +257,12 @@ def __exit__(
257257
exc_val: BaseException | None,
258258
exc_tb: TracebackType | None
259259
) -> None:
260+
# In case the connection is part of a pool it needs to be
261+
# replaced in case of an exception
262+
if self.__connection__ is not None and exc_type is not None:
263+
if isinstance(self.__connection__, PooledConnection):
264+
self.__connection__._pool.replace(self.__connection__)
265+
260266
self.release()
261267

262268
def release(self):

requirements/extras/redis.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
redis>=4.5.2,!=4.5.5,!=5.0.2,<=5.2.0
1+
redis>=4.5.2,!=4.5.5,!=5.0.2,<=5.2.1

requirements/test.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
hypothesis<7
22
Pyro4==4.82
3-
pytest-freezer==0.4.8
3+
pytest-freezer==0.4.9
44
pytest-sugar==1.0.0
55
pytest==8.3.4
66
pre-commit>=3.5.0,<3.8.0; python_version < '3.9'

t/integration/test_py_amqp.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from __future__ import annotations
22

33
import os
4+
import uuid
45

56
import pytest
7+
from amqp.exceptions import NotFound
68

79
import kombu
10+
from kombu.connection import ConnectionPool
811

912
from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
1013
BasePriority, BaseTimeToLive, BasicFunctionality)
@@ -20,6 +23,12 @@ def get_failover_connection(hostname, port, vhost):
2023
)
2124

2225

26+
def get_confirm_connection(hostname, port):
27+
return kombu.Connection(
28+
f"pyamqp://{hostname}:{port}", transport_options={"confirm_publish": True}
29+
)
30+
31+
2332
@pytest.fixture()
2433
def invalid_connection():
2534
return kombu.Connection('pyamqp://localhost:12345')
@@ -47,6 +56,14 @@ def failover_connection(request):
4756
)
4857

4958

59+
@pytest.fixture()
60+
def confirm_publish_connection():
61+
return get_confirm_connection(
62+
hostname=os.environ.get("RABBITMQ_HOST", "localhost"),
63+
port=os.environ.get("RABBITMQ_5672_TCP", "5672"),
64+
)
65+
66+
5067
@pytest.mark.env('py-amqp')
5168
@pytest.mark.flaky(reruns=5, reruns_delay=2)
5269
class test_PyAMQPBasicFunctionality(BasicFunctionality):
@@ -81,3 +98,49 @@ class test_PyAMQPFailover(BaseFailover):
8198
@pytest.mark.flaky(reruns=5, reruns_delay=2)
8299
class test_PyAMQPMessage(BaseMessage):
83100
pass
101+
102+
103+
@pytest.mark.env("py-amqp")
104+
@pytest.mark.flaky(reruns=5, reruns_delay=2)
105+
class test_PyAMQPConnectionPool:
106+
def test_publish_confirm_does_not_block(self, confirm_publish_connection):
107+
"""Tests that the connection pool closes connections in case of an exception.
108+
109+
In case an exception occurs while the connection is in use, the pool should
110+
close the exception. In case the connection is not closed before releasing it
111+
back to the pool, the connection would remain in an unusable state, causing
112+
causing the next publish call to time out or block forever in case no
113+
timeout is specified.
114+
"""
115+
pool = ConnectionPool(connection=confirm_publish_connection, limit=1)
116+
117+
try:
118+
with pool.acquire(block=True) as connection:
119+
producer = kombu.Producer(connection)
120+
queue = kombu.Queue(
121+
f"test-queue-{uuid.uuid4()}", channel=connection
122+
)
123+
queue.declare()
124+
producer.publish(
125+
{"foo": "bar"}, routing_key=str(uuid.uuid4()), retry=False
126+
)
127+
assert connection.connected
128+
queue.delete()
129+
try:
130+
queue.get()
131+
except NotFound:
132+
raise
133+
except NotFound:
134+
pass
135+
136+
with pool.acquire(block=True) as connection:
137+
assert not connection.connected
138+
producer = kombu.Producer(connection)
139+
queue = kombu.Queue(
140+
f"test-queue-{uuid.uuid4()}", channel=connection
141+
)
142+
queue.declare()
143+
# In case the connection is broken, we should get a Timeout here
144+
producer.publish(
145+
{"foo": "bar"}, routing_key=str(uuid.uuid4()), retry=False, timeout=3
146+
)

t/unit/test_connection.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,43 @@ def test_acquire_channel(self):
10451045
with P.acquire_channel() as (conn, channel):
10461046
assert channel is conn.default_channel
10471047

1048+
def test_exception_during_connection_use(self):
1049+
"""Tests that connections retrieved from a pool are replaced.
1050+
1051+
In case of an exception during usage of an exception, it is required that the
1052+
connection is 'replaced' (effectively closing the connection) before releasing
1053+
it back into the pool. This ensures that reconnecting to the broker is required
1054+
before the next usage.
1055+
"""
1056+
P = self.create_resource(1)
1057+
1058+
# Raising an exception during a network call should cause the cause the
1059+
# connection to be replaced.
1060+
with pytest.raises(IOError):
1061+
with P.acquire() as connection:
1062+
connection.connect()
1063+
connection.heartbeat_check = Mock()
1064+
connection.heartbeat_check.side_effect = IOError()
1065+
_ = connection.heartbeat_check()
1066+
1067+
# Acquiring the same connection from the pool yields a disconnected Connection
1068+
# object.
1069+
with P.acquire() as connection:
1070+
assert not connection.connected
1071+
1072+
# acquire_channel automatically reconnects
1073+
with pytest.raises(IOError):
1074+
with P.acquire_channel() as (connection, _):
1075+
# The Connection object should still be connected
1076+
assert connection.connected
1077+
connection.heartbeat_check = Mock()
1078+
connection.heartbeat_check.side_effect = IOError()
1079+
_ = connection.heartbeat_check()
1080+
1081+
with P.acquire() as connection:
1082+
# The connection should be closed
1083+
assert not connection.connected
1084+
10481085

10491086
class test_ChannelPool(ResourceCase):
10501087

t/unit/test_pools.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pytest
66

77
from kombu import Connection, Producer, pools
8-
from kombu.connection import ConnectionPool
8+
from kombu.connection import ConnectionPool, PooledConnection
99
from kombu.utils.collections import eqhash
1010

1111

@@ -37,6 +37,18 @@ def test_releases_connection_when_Producer_raises(self):
3737
self.pool.create_producer()
3838
conn.release.assert_called_with()
3939

40+
def test_exception_during_connection_use(self):
41+
"""Tests that the connection is closed in case of an exception."""
42+
with pytest.raises(IOError):
43+
with self.pool.acquire() as producer:
44+
producer.__connection__ = Mock(spec=PooledConnection)
45+
producer.__connection__._pool = self.connections
46+
producer.publish = Mock()
47+
producer.publish.side_effect = IOError()
48+
producer.publish("test data")
49+
50+
self.connections.replace.assert_called_once()
51+
4052
def test_prepare_release_connection_on_error(self):
4153
pp = Mock()
4254
p = pp.return_value = Mock()

0 commit comments

Comments
 (0)