FastStream example

This example shows how to use Dependency Injector with FastStream.

The source code is available on GitHub.

Although FastStream uses the FastDepends library for dependency injection, integrating Dependency Injector with FastStream differs slightly from the existing FastDepends example.

Because FastStream also inspects function signatures to determine input data types, you have to use the Depends() function with the cast=False argument so that FastStream ignores the injected dependency argument in the function signature.
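To see why the extra hint is needed, it helps to look at what a handler's signature exposes to any framework that inspects it. The sketch below uses only the standard library and is not FastStream's implementation; handler, describe_parameters, and the "injected" metadata string are illustrative names, and Message and Counter are empty placeholders.

```python
from typing import Annotated, get_args, get_origin, get_type_hints


class Message:
    """Placeholder for a message schema."""


class Counter:
    """Placeholder for an injected dependency."""


async def handler(
    message: Message,
    counter: Annotated[Counter, "injected"],  # metadata is illustrative only
) -> None:
    ...


def describe_parameters(func):
    """Map each parameter name to (annotated type, Annotated metadata)."""
    hints = get_type_hints(func, include_extras=True)
    hints.pop("return", None)  # only parameters are of interest here
    described = {}
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            base, *metadata = get_args(hint)
            described[name] = (base, tuple(metadata))
        else:
            described[name] = (hint, ())
    return described


parameters = describe_parameters(handler)
```

Both message and counter show up with a type, so signature inspection alone cannot tell the message schema from the dependency. The only thing that sets counter apart is the metadata attached through Annotated.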

The consumer.py listing below shows how to inject the Counter class into a FastStream Redis handler so that the handler distinguishes between the message schema (Message) and the injected dependency (Counter) and uses both correctly.

Listing of consumer.py:

import asyncio
from typing import Annotated

from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from faststream import Depends, FastStream
from faststream.redis import RedisBroker, RedisRouter
from pydantic import BaseModel


class Counter:
    def __init__(self):
        self.count = 0

    def next(self) -> int:
        self.count += 1
        return self.count


class Container(containers.DeclarativeContainer):
    counter = providers.Singleton(Counter)

    config = providers.Configuration()

    broker = providers.Singleton(RedisBroker, config.redis_url, logger=None)
    app = providers.Factory(FastStream, broker, logger=None)


class Message(BaseModel):
    user: str
    text: str


router = RedisRouter()


@router.subscriber("messages")
@inject
async def handle_user_message(
    message: Message,
    counter: Annotated[
        Counter,
        Depends(
            Provide[Container.counter],
            cast=False,  # <-- this is the key part
        ),
    ],
) -> None:
    count = counter.next()
    print(f"Message #{count} from {message.user}: '{message.text}'")


async def main() -> None:
    container = Container()
    container.wire(modules=[__name__])

    container.config.redis_url.from_env("REDIS_URL")

    broker = container.broker()
    broker.include_router(router)

    app = container.app()
    await app.run()


if __name__ == "__main__":
    asyncio.run(main())

Listing of producer.py:

import json
import time

from dependency_injector import containers, providers
from redis import Redis


class Container(containers.DeclarativeContainer):
    config = providers.Configuration()

    redis = providers.Singleton(Redis, config.redis_host, config.redis_port.as_int())


def main():
    container = Container()
    container.wire(modules=[__name__])

    container.config.redis_host.from_env("REDIS_HOST")
    container.config.redis_port.from_env("REDIS_PORT")

    redis = container.redis()

    for text in (
        "As you can see",
        "messages are counted correctly",
        "by the counter that is injected",
        "into faststream handler",
        "via awesome dependency_injector library.",
    ):
        time.sleep(2)

        message = {"user": "John", "text": text}
        redis.publish("messages", json.dumps(message))


if __name__ == "__main__":
    main()
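
The producer publishes plain JSON, so its keys have to match the fields of the Message model in consumer.py. The following is a quick offline check of that contract, assuming pydantic is installed. Decoding by hand is only a stand-in here and does not reproduce how FastStream parses an incoming message.

```python
import json

from pydantic import BaseModel


class Message(BaseModel):
    user: str
    text: str


# Same shape as the dictionary built in producer.py.
payload = json.dumps({"user": "John", "text": "As you can see"})

# Decode and validate by hand to confirm the payload fits the schema.
message = Message(**json.loads(payload))
```

If a key is missing or misspelled in the producer, constructing Message raises a pydantic validation error instead of returning a model.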

Sources

Explore the sources on GitHub.
