0.5.0

@davorrunje davorrunje released this 15 Apr 09:12
7d757dc

What's Changed

This is the biggest change since the creation of FastStream. We have refactored the entire package, changing the object registration mechanism, the message processing pipeline, and the application lifecycle. You should hardly notice it, though: all public APIs are preserved without breaking changes. The only feature that is incompatible with previous code is the new middleware.

New features:

  1. The await FastStream.stop() method and the StopApplication exception are added to stop a FastStream worker.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

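from typing import Any

subscriber = broker.subscriber("test")

# Handles only messages whose content type is JSON.
@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handle_json(msg: dict[str, Any]):
    ...

# Handles the messages that the filtered handler above did not accept.
@subscriber()
async def handle_default(msg: dict[str, Any]):
    ...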

This is now the preferred syntax for filtering (the old one will be removed in 0.6.0).
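
To illustrate how filter-based dispatch of this kind can behave, here is a minimal, self-contained sketch in plain Python. It does not import FastStream and is not FastStream's implementation: `ToyMessage`, `ToySubscriber`, and `consume` are hypothetical names, and the "first matching handler wins, an unfiltered handler accepts everything" rule is an assumption of this sketch.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[["ToyMessage"], Awaitable[Any]]
Filter = Callable[["ToyMessage"], bool]


@dataclass
class ToyMessage:
    """Hypothetical message type with only the fields this sketch needs."""
    body: Any
    content_type: str


class ToySubscriber:
    """Hypothetical stand-in for a subscriber object reused as a decorator."""

    def __init__(self, queue: str) -> None:
        self.queue = queue
        self._calls: list[tuple[Optional[Filter], Handler]] = []

    def __call__(self, filter: Optional[Filter] = None) -> Callable[[Handler], Handler]:
        # Each call registers one more handler, optionally guarded by a filter.
        def decorator(func: Handler) -> Handler:
            self._calls.append((filter, func))
            return func
        return decorator

    async def consume(self, msg: ToyMessage) -> Any:
        # Assumption: handlers are tried in registration order, and a
        # handler without a filter accepts any message.
        for flt, func in self._calls:
            if flt is None or flt(msg):
                return await func(msg)
        raise LookupError(f"no handler accepted the message on {self.queue!r}")


subscriber = ToySubscriber("test")


@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handle_json(msg: ToyMessage) -> tuple:
    return ("json", msg.body)


@subscriber()
async def handle_default(msg: ToyMessage) -> tuple:
    return ("default", msg.body)
```

Calling `asyncio.run(subscriber.consume(ToyMessage("hi", "text/plain")))` routes the message to `handle_default`, because the JSON filter rejects it. Registering the filtered handler first matters in this sketch, since an unfiltered handler placed first would accept every message.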

  3. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).

publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Before 0.5.0, this worked only with broker.publisher.)

  4. A list of middlewares can be passed to a broker.publisher as well:

broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...

  5. Broker-level middlewares now apply to every way of publishing a message, so you can encode your application's outgoing messages there.

  6. ⚠️ BREAKING CHANGE ⚠️: both subscriber and publisher middlewares should now be of the async context manager type:

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

These changes enable two previously unavailable features:

  • suppress any exceptions and pass fall-back message body to publishers, and
  • patch any outgoing message headers and other parameters.

Without these features we could not implement Observability Middleware or any similar tool, so this work simply had to be done.
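
The two features can be sketched with middlewares that follow the `(call_next, msg)` and `(call_next, msg, **kwargs)` signatures shown in the example. This is a self-contained illustration in plain Python, not FastStream's internals: `apply_middlewares`, `suppress_errors`, `add_trace_header`, `fake_publish`, and the `headers` keyword are hypothetical, and the wrapping order is an assumption.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable, Sequence

Call = Callable[..., Awaitable[Any]]


def apply_middlewares(call: Call, middlewares: Sequence[Call]) -> Call:
    """Wrap `call` so the first middleware in the sequence runs outermost (assumed order)."""
    for middleware in reversed(middlewares):
        call = partial(middleware, call)
    return call


async def suppress_errors(call_next: Call, msg: Any) -> Any:
    """Subscriber-side: swallow a handler error and return a fallback body."""
    try:
        return await call_next(msg)
    except Exception:
        return {"status": "failed", "original": msg}


async def add_trace_header(call_next: Call, msg: Any, **kwargs: Any) -> Any:
    """Publisher-side: patch the outgoing headers before the message is sent."""
    headers = {**(kwargs.pop("headers", None) or {}), "x-trace-id": "demo-trace"}
    return await call_next(msg, headers=headers, **kwargs)


async def failing_handler(msg: Any) -> Any:
    """Hypothetical handler that always fails, to exercise the fallback path."""
    raise ValueError("cannot process message")


async def fake_publish(msg: Any, **kwargs: Any) -> dict:
    """Stand-in for a real publish call; returns what would have been sent."""
    return {"body": msg, **kwargs}


async def demo() -> dict:
    consume = apply_middlewares(failing_handler, (suppress_errors,))
    publish = apply_middlewares(fake_publish, (add_trace_header,))
    result = await consume("ping")  # the fallback body replaces the exception
    return await publish(result, headers={"content-type": "application/json"})
```

Running `asyncio.run(demo())` returns the fallback body together with the merged headers. Because each middleware receives `call_next`, it can decide whether to call it, what to pass on, and what to return, which is what makes both exception suppression and header patching possible.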

  7. Better FastAPI compatibility: fastapi.BackgroundTasks and the response_class subscriber option are supported.

  8. All .pyi files are removed, and explicit docstrings and method options are added.

  9. New subscribers can be registered at runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()

  10. The faststream[docs] distribution is removed.

New Contributors

Full Changelog: 0.4.7...0.5.0