[Bug]: Pool is created but not stored, preventing reconnection on connection drop #44

@levish0

What happened?

AmqpBackend creates a deadpool_lapin::Pool during initialization but drops it as soon as the constructor returns, storing only a single Channel. This defeats the purpose of using a connection pool and makes recovery from connection drops impossible.

When a connection drops:

  1. The stored Channel becomes invalid
  2. heartbeat() detects disconnection and returns an error
  3. No recovery is possible because the Pool was discarded
  4. Worker stops permanently

AmqpBackend struct (src/lib.rs:61-68)

pub struct AmqpBackend<M, Codec> {
    channel: Channel,       // Only stores a single channel
    queue: lapin::Queue,
    message_type: PhantomData<M>,
    config: Config,
    sink: sink::AmqpSink<M, Codec>,
    // Pool is NOT stored!
}

new_from_addr_with_config (src/lib.rs:258-287)

pub async fn new_from_addr_with_config<S: AsRef<str>>(
    addr: S,
    config: Config,
) -> Result<AmqpBackend<M, JsonCodec<Vec<u8>>>, lapin::Error> {
    let manager = Manager::new(addr.as_ref(), ConnectionProperties::default());
    let pool: Pool = deadpool::managed::Pool::builder(manager)
        .max_size(10)
        .build()
        .map_err(|error| { /* ... */ })?;

    let amqp_conn = pool.get().await.map_err(|error| { /* ... */ })?;
    let channel = amqp_conn.create_channel().await?;
    let queue = channel.queue_declare(/* ... */).await?;

    Ok(Self::new_with_config(channel, queue, config))
    // ^^^ Pool is dropped here. Only channel is passed.
}
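
A minimal sketch of one possible fix, assuming the backend simply gains a field that owns the pool. MockPool, MockConnection, MockChannel, PooledBackend and open_another_channel are hypothetical stand-ins invented for illustration (not deadpool_lapin or lapin types, and not the crate's actual API) so that the example compiles with the standard library alone:

```rust
use std::cell::Cell;

// Hypothetical stand-in for deadpool_lapin::Pool.
#[derive(Default)]
struct MockPool {
    // Counts how many connections have been handed out so far.
    handed_out: Cell<u32>,
}

// Hypothetical stand-in for a pooled connection.
struct MockConnection {
    id: u32,
}

// Hypothetical stand-in for lapin::Channel.
struct MockChannel {
    connection_id: u32,
}

impl MockPool {
    fn get(&self) -> Result<MockConnection, String> {
        let id = self.handed_out.get() + 1;
        self.handed_out.set(id);
        Ok(MockConnection { id })
    }
}

impl MockConnection {
    fn create_channel(&self) -> Result<MockChannel, String> {
        Ok(MockChannel { connection_id: self.id })
    }
}

// Backend that keeps the pool next to the channel instead of dropping it.
struct PooledBackend {
    pool: MockPool,
    channel: MockChannel,
}

impl PooledBackend {
    fn new(pool: MockPool) -> Result<Self, String> {
        let channel = pool.get()?.create_channel()?;
        // The pool is moved into the struct, so it outlives this constructor.
        Ok(Self { pool, channel })
    }

    // Only possible because the backend still owns the pool.
    fn open_another_channel(&self) -> Result<MockChannel, String> {
        self.pool.get()?.create_channel()
    }
}
```

With the pool stored, the backend can ask for another connection at any later point, which is the precondition for any reconnection logic.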

heartbeat (src/lib.rs:94-118)

fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
    // ...
    let is_connected = channel.status().connected();
    if !is_connected {
        // Only returns error, no reconnection attempt
        Some((Err(ErrorKind::IOError(/* ... */)), /* ... */))
    }
    // ...
}

Impact

  • Any network hiccup kills the worker permanently
  • deadpool_lapin provides connection pooling and recycling, but none of it is used
  • Long-running workers will eventually fail and never recover

Expected behavior

When a connection drops:

  1. Backend should detect the disconnection
  2. Get a new connection from the pool
  3. Create a fresh channel
  4. Continue processing
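
The four steps above could look roughly like the following sketch. It uses hypothetical mock types (MockPool, MockChannel, Backend) and a hypothetical ensure_channel method rather than the real deadpool_lapin and lapin APIs, and it is synchronous for brevity, so treat it as an illustration of the control flow only:

```rust
use std::cell::Cell;

// Hypothetical stand-in for a connection pool.
#[derive(Default)]
struct MockPool {
    // Number of channels created through this pool.
    created: Cell<u32>,
}

// Hypothetical stand-in for lapin::Channel. `alive` can be flipped to
// simulate the broker dropping the connection.
struct MockChannel {
    id: u32,
    alive: Cell<bool>,
}

impl MockPool {
    // Steps 2 and 3: take a connection from the pool and open a fresh channel.
    fn fresh_channel(&self) -> Result<MockChannel, String> {
        let id = self.created.get() + 1;
        self.created.set(id);
        Ok(MockChannel { id, alive: Cell::new(true) })
    }
}

impl MockChannel {
    fn connected(&self) -> bool {
        self.alive.get()
    }
}

struct Backend {
    pool: MockPool,
    channel: MockChannel,
}

impl Backend {
    fn new(pool: MockPool) -> Result<Self, String> {
        let channel = pool.fresh_channel()?;
        Ok(Self { pool, channel })
    }

    // Step 1: detect the disconnection. Returns Ok(false) if the channel was
    // healthy and Ok(true) if it had to be replaced. Step 4 (continue
    // processing) is left to the caller, which can keep using `self.channel`.
    fn ensure_channel(&mut self) -> Result<bool, String> {
        if self.channel.connected() {
            return Ok(false);
        }
        self.channel = self.pool.fresh_channel()?;
        Ok(true)
    }
}
```

In a real backend a check like this might run from heartbeat() or before each publish or consume call. Anything derived from the old channel (consumers, the declared queue, the sink) would presumably need to be re-created as well, which this sketch does not cover.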

Steps to reproduce

  1. Start a worker using AmqpBackend::new_from_addr()
  2. Restart RabbitMQ (or cause any other network interruption)
  3. Wait until the broker is reachable again

Expected: Worker recovers and continues processing
Actual: Worker stays dead, never reconnects

Version

0.7.x

Environment

  • OS: Windows 11
  • Rust version: 1.92.0
  • Cargo version: 1.92.0

Labels: bug