Resource provider

Resource provider provides a component with initialization and shutdown.

import sys
import logging
from concurrent.futures import ThreadPoolExecutor

from dependency_injector import containers, providers


def init_thread_pool(max_workers: int):
    thread_pool = ThreadPoolExecutor(max_workers=max_workers)
    yield thread_pool
    thread_pool.shutdown(wait=True)


class Container(containers.DeclarativeContainer):

    config = providers.Configuration()

    thread_pool = providers.Resource(
        init_thread_pool,
        max_workers=config.max_workers,
    )

    logging = providers.Resource(
        logging.basicConfig,
        level=logging.INFO,
        stream=sys.stdout,
    )


if __name__ == "__main__":
    container = Container(config={"max_workers": 4})

    container.init_resources()

    logging.info("Resources are initialized")
    thread_pool = container.thread_pool()
    thread_pool.map(print, range(10))

    container.shutdown_resources()

Resource providers help to initialize and configure logging, event loop, thread or process pool, etc.

Resource provider is similar to Singleton. Resource initialization happens only once. You can make injections and use provided instance the same way like you do with any other provider.

class Container(containers.DeclarativeContainer):

    config = providers.Configuration()

    thread_pool = providers.Resource(
        init_thread_pool,
        max_workers=config.max_workers,
    )

    dispatcher = providers.Factory(
        TaskDispatcher,
        executor=thread_pool,
    )

Container has an interface to initialize and shutdown all resources at once:

container = Container()
container.init_resources()
container.shutdown_resources()

You can also initialize and shutdown resources one-by-one using init() and shutdown() methods of the provider:

container = Container()
container.thread_pool.init()
container.thread_pool.shutdown()

When you call .shutdown() method on a resource provider, it will remove the reference to the initialized resource, if any, and switch to uninitialized state. Some of resource initializer types support specifying custom resource shutdown.

Resource provider supports 3 types of initializers:

  • Function

  • Generator

  • Subclass of resources.Resource

Function initializer

Function is the most common way to specify resource initialization:

def init_resource(argument1=..., argument2=...):
    return SomeResource()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )

Function initializer may not return a value. This often happens when you configure global resource:

import logging.config


class Container(containers.DeclarativeContainer):

    configure_logging = providers.Resource(
        logging.config.fileConfig,
        fname="logging.ini",
    )

Function initializer does not provide a way to specify custom resource shutdown.

Generator initializer

Resource provider can use 2-step generators:

  • First step of generator is an initialization phase

  • The second is step is a shutdown phase

def init_resource(argument1=..., argument2=...):
    resource = SomeResource()  # initialization

    yield resource

    # shutdown
    ...


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )

Generator initialization phase ends on the first yield statement. You can return a resource object using yield resource like in the example above. Returning of the object is not mandatory. You can leave yield statement empty:

def init_resource(argument1=..., argument2=...):
    # initialization
    ...

    yield

    # shutdown
    ...


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )

Subclass initializer

You can create resource initializer by implementing a subclass of the resources.Resource:

from dependency_injector import resources


class MyResource(resources.Resource):

    def init(self, argument1=..., argument2=...) -> SomeResource:
        return SomeResource()

    def shutdown(self, resource: SomeResource) -> None:
        # shutdown
        ...


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        MyResource,
        argument1=...,
        argument2=...,
    )

Subclass must implement two methods: init() and shutdown().

Method init() receives arguments specified in resource provider. It performs initialization and returns resource object. Returning of the object is not mandatory.

Method shutdown() receives resource object returned from init(). If init() didn’t return an object shutdown() method will be called anyway with None as a first argument.

from dependency_injector import resources


class MyResource(resources.Resource):

    def init(self, argument1=..., argument2=...) -> None:
        # initialization
        ...

    def shutdown(self, _: None) -> None:
        # shutdown
        ...

Resources, wiring, and per-function execution scope

You can compound Resource provider with Wiring to implement per-function execution scope. For doing this you need to use additional Closing marker from wiring module.

from dependency_injector import containers, providers
from dependency_injector.wiring import Closing, Provide, inject
from flask import Flask, current_app


class Service:
    ...


def init_service() -> Service:
    print("Init service")
    yield Service()
    print("Shutdown service")


class Container(containers.DeclarativeContainer):

    service = providers.Resource(init_service)


@inject
def index_view(service: Service = Closing[Provide[Container.service]]):
    assert service is current_app.container.service()
    return "Hello  World!"


container = Container()
container.wire(modules=[__name__])

app = Flask(__name__)
app.container = container
app.add_url_rule("/", "index", view_func=index_view)


if __name__ == "__main__":
    app.run()

Framework initializes and injects the resource into the function. With the Closing marker framework calls resource shutdown() method when function execution is over.

The example above produces next output:

Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:40] "GET / HTTP/1.1" 200 -
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:41] "GET / HTTP/1.1" 200 -
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:41] "GET / HTTP/1.1" 200 -

Asynchronous initializers

When you write an asynchronous application, you might need to initialize resources asynchronously. Resource provider supports asynchronous initialization and shutdown.

Asynchronous function initializer:

async def init_async_resource(argument1=..., argument2=...):
    return await connect()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )

Asynchronous generator initializer:

async def init_async_resource(argument1=..., argument2=...):
    connection = await connect()
    yield connection
    await connection.close()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_async_resource,
        argument1=...,
        argument2=...,
    )

Asynchronous subclass initializer:

from dependency_injector import resources


class AsyncConnection(resources.AsyncResource):

    async def init(self, argument1=..., argument2=...):
        yield await connect()

    async def shutdown(self, connection):
        await connection.close()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        AsyncConnection,
        argument1=...,
        argument2=...,
    )

When you use resource provider with asynchronous initializer you need to call its __call__(), init(), and shutdown() methods asynchronously:

import asyncio


class Container(containers.DeclarativeContainer):

    connection = providers.Resource(init_async_connection)


async def main():
    container = Container()
    connection = await container.connection()
    connection = await container.connection.init()
    connection = await container.connection.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Container init_resources() and shutdown_resources() methods should be used asynchronously if there is at least one asynchronous resource provider:

import asyncio


class Container(containers.DeclarativeContainer):

    connection1 = providers.Resource(init_async_connection)

    connection2 = providers.Resource(init_sync_connection)


async def main():
    container = Container()
    await container.init_resources()
    await container.shutdown_resources()


if __name__ == "__main__":
    asyncio.run(main())

See also: