Description
Hello,
This issue is a mix of both channels_rabbitmq and carehare, but ultimately the related code is in carehare so I created the issue here. Let me know if this is incorrect.
I got blocked today using channels_rabbitmq and carehare, more specifically when initialising a new AsyncWebsocketConsumer, with these versions:
channels=4.0.0
channels-rabbitmq=4.0.1
carehare=1.0.2
I am using channels_rabbitmq.core.RabbitmqChannelLayer as the backend for the layer, and the connect() method in my async websocket consumer is as follows:
    async def connect(self):
        await self.channel_layer.group_add("my_group", self.channel_name)

This method, called when a websocket connects to my AsyncWebsocketConsumer, never returns from the await. Further down the call chain, channels_rabbitmq sets up a connection using a carehare object:
    async def _setup_connection(
        *,
        connection: carehare.Connection,
        queue_name: str,
        remote_capacity: Optional[int],
        expiry: Optional[int],
        groups_exchange: str,
    ) -> None:
        # Declare "groups" exchange. It may persist; spurious declarations
        # (such as on reconnect) are harmless.
        await connection.exchange_declare(groups_exchange, exchange_type="direct")

The created connection ends up creating a transport and a protocol object, and the protocol object creates a ChannelIdStore at line 321:
    @_accept_handshake_frame.register  # type: ignore [no-redef]
    def _(self, frame: pamqp.commands.Connection.Tune) -> None:
        assert pamqp.commands.Connection.Start in self._lifecycle
        assert pamqp.commands.Connection.Tune not in self._lifecycle
        self._lifecycle[pamqp.commands.Connection.Tune] = frame
        self._frame_writer.send_frames(
            [
                pamqp.commands.Connection.TuneOk(
                    channel_max=frame.channel_max,
                    frame_max=frame.frame_max,
                    heartbeat=frame.heartbeat,
                ),
                pamqp.commands.Connection.Open(virtual_host=self._virtual_host),
            ]
        )
        self._heartbeat_sender = _heartbeat.HeartbeatSender(
            self._transport, frame.heartbeat
        )
        self._channel_id_store: ChannelIdStore = ChannelIdStore(frame.channel_max)

The __init__ method of the ChannelIdStore class is as follows:
    def __init__(self, channel_max: int):
        self._new: Iterator[int] = iter(range(1, channel_max))
        self._recycled: List[int] = []

However, when channel_max = 0 (which is the default value I get when using channels_rabbitmq), range(1, 0) is empty, so the iterator is exhausted from the start and we can never acquire a channel id below:
    def acquire(self) -> int:
        if self._recycled:
            return self._recycled.pop()
        else:
            return next(self._new)

and the whole connection chain gets blocked.
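To illustrate the failure in isolation, here is a minimal sketch that copies the two methods quoted above into a standalone class. It does not import carehare, so it only mirrors the snippets in this issue and not necessarily the full library class; the `try_acquire` helper is hypothetical and exists only for the demonstration.

```python
from typing import Iterator, List


class ChannelIdStore:
    """Standalone copy of the two methods quoted above, for illustration only."""

    def __init__(self, channel_max: int):
        self._new: Iterator[int] = iter(range(1, channel_max))
        self._recycled: List[int] = []

    def acquire(self) -> int:
        if self._recycled:
            return self._recycled.pop()
        else:
            return next(self._new)


def try_acquire(channel_max: int) -> str:
    """Hypothetical helper: return the acquired id as text, or the exception name."""
    store = ChannelIdStore(channel_max)
    try:
        return str(store.acquire())
    except StopIteration as exc:
        return type(exc).__name__


print(try_acquire(2047))  # a non-zero limit hands out channel id 1
print(try_acquire(0))     # range(1, 0) is empty, so next() raises StopIteration
```

In this isolated form the problem shows up as a StopIteration on the very first acquire(). How that exception turns into a connection that never completes inside the real asyncio code path is not shown here.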
I am not sure if this is intended behavior (if yes, where can I override the channel_max value? It seems to come directly from my frame), but if not, I would propose the following change for the ChannelIdStore:
    def __init__(self, channel_max: int):
        if not channel_max or channel_max == 0:
            channel_max = 65535
        self._new: Iterator[int] = iter(range(1, channel_max))
        self._recycled: List[int] = []

I am willing to provide more information about my setup if required; I tried to be as relevant as possible.
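For reference, a self-contained sketch of the proposed change, runnable without carehare. As far as I understand AMQP 0-9-1, a channel-max of zero in Connection.Tune means "no specified limit", which is why falling back to the 16-bit maximum seems reasonable. The class name `PatchedChannelIdStore` and the constant `FALLBACK_CHANNEL_MAX` are hypothetical names for this example, not carehare identifiers.

```python
from typing import Iterator, List

# Assumption: channel numbers are 16-bit in AMQP 0-9-1, so 65535 is used
# as the fallback when the server reports 0 ("no specified limit").
FALLBACK_CHANNEL_MAX = 65535


class PatchedChannelIdStore:
    """Sketch of the proposed fix; mirrors only the methods quoted in this issue."""

    def __init__(self, channel_max: int):
        if not channel_max:  # covers both 0 and None
            channel_max = FALLBACK_CHANNEL_MAX
        # Same range expression as the quoted code, so the upper bound
        # itself is still excluded.
        self._new: Iterator[int] = iter(range(1, channel_max))
        self._recycled: List[int] = []

    def acquire(self) -> int:
        if self._recycled:
            return self._recycled.pop()
        else:
            return next(self._new)


store = PatchedChannelIdStore(0)
first = store.acquire()
second = store.acquire()
print(first, second)  # 1 2
```

With this change a channel_max of 0 no longer produces an exhausted iterator. Whether the upper bound should be inclusive is a separate question that this sketch leaves untouched.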