Dead letter publisher patterns #1161
When handling an event with the RabbitMQ broker through the FastAPI plugin, I want to be able to send it to a dead letter exchange when an error occurs. While it's pretty clear from the docs how to declare a new exchange and parameterise it accordingly, I have some doubts about the right pattern to use with publishers. Borrowing from the FastStream docs page, if I understand correctly, I could do something like:
from fastapi import FastAPI
from faststream.exceptions import NackMessage
from faststream.rabbit import RabbitExchange, RabbitQueue
from faststream.rabbit.fastapi import (  # type: ignore
    Logger,
    RabbitMessage,
    RabbitRouter,
)
from internal.logic import compute_output
from internal.models import Input, Output
...
# Setup StreamRouter and instantiate RabbitMQ broker
router = RabbitRouter(
    settings.rabbitmq_url,
    schema_url="/asyncapi",
    include_in_schema=True,
)
broker = router.broker
# Config
exchange = RabbitExchange("my-exchange", durable=True, routing_key="") ### exchange
dlx = RabbitExchange("dead-letter-exchange", durable=True, routing_key="") ### dead letter exchange
queue_1 = RabbitQueue("queue-1", durable=True, routing_key="") ### the queue I'm listening to
queue_2 = RabbitQueue("queue-2", durable=True, routing_key="queue-2") ### the queue I'm publishing output into
# Instantiate publisher object
publisher = broker.publisher(queue=queue_2, exchange=exchange, persist=True)
# Instantiate dlx publisher object
dlx_publisher = broker.publisher(exchange=dlx, persist=True)
# Declare broker exchanges and queues on application startup
@router.after_startup
async def declare(app: FastAPI):
    await broker.declare_exchange(exchange)
    await broker.declare_exchange(dlx)
    await broker.declare_queue(queue_1)
    await broker.declare_queue(queue_2)
### Here I decorate my handler function with the publisher object that sends messages to my output queue
### BUT I also use dlx_publisher object declared above to push messages to dead letter exchange
@publisher
@router.subscriber(queue_1, exchange, retry=3)
async def handle_event(
    input: Input, msg: RabbitMessage, logger: Logger
) -> Output:
    try:
        output = compute_output(input)
        return output
    except Exception as err:
        logger.exception(err)
        # publish() is a coroutine, so it has to be awaited
        await dlx_publisher.publish(input)
        raise NackMessage()
# Create app
app = FastAPI(lifespan=router.lifespan_context)
# Include a StreamRouter object that uses RabbitMQ broker
app.include_router(router)
Do the (a) pattern and (b) config make sense? On (a): it's not really clear to me what happens behind the scenes in FastStream event handling.
Thanks in advance!
Replies: 1 comment 5 replies
@darioprencipe your code works, but it doesn't use the native RMQ dead-letter features.
FastStream lets you ack/nack a message manually even without the 'no_ack' argument (in that case it just does nothing if the message was already acked inside your handler), but the preferred way is to use an RMQ dead-letter exchange and attach it to the queue through the queue's optional arguments:
https://www.rabbitmq.com/dlx.html#using-optional-queue-arguments
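To make the native approach a bit more concrete, here is a minimal sketch of the optional queue arguments described on that RabbitMQ page. The helper name `dead_letter_arguments` is hypothetical, and passing the resulting dict to `RabbitQueue` via an `arguments` parameter is an assumption to check against the FastStream version in use; only the `x-dead-letter-*` keys come from RabbitMQ itself.

```python
from typing import Any, Dict, Optional


def dead_letter_arguments(
    dlx_name: str, routing_key: Optional[str] = None
) -> Dict[str, Any]:
    """Build the optional queue arguments that enable RabbitMQ dead-lettering."""
    args: Dict[str, Any] = {"x-dead-letter-exchange": dlx_name}
    if routing_key is not None:
        # Without this key RabbitMQ reuses the message's original routing key.
        args["x-dead-letter-routing-key"] = routing_key
    return args


# Arguments for the input queue from the question.
queue_1_arguments = dead_letter_arguments("dead-letter-exchange")

# Assumption: RabbitQueue forwards an `arguments` dict to the queue
# declaration, so the queue could be declared roughly like this:
#   queue_1 = RabbitQueue("queue-1", durable=True, arguments=queue_1_arguments)
```

With arguments like these on the queue, the broker itself routes a message to the dead letter exchange when it is rejected or nacked without requeue, so the handler would only need to raise and would not need a separate `dlx_publisher`. Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so a queue that already exists would have to be deleted and recreated first.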