Resource provider¶
Resource
provider provides a component with initialization and shutdown.
import sys
import logging
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from dependency_injector import containers, providers
@contextmanager
def init_thread_pool(max_workers: int):
thread_pool = ThreadPoolExecutor(max_workers=max_workers)
yield thread_pool
thread_pool.shutdown(wait=True)
class Container(containers.DeclarativeContainer):
config = providers.Configuration()
thread_pool = providers.Resource(
init_thread_pool,
max_workers=config.max_workers,
)
logging = providers.Resource(
logging.basicConfig,
level=logging.INFO,
stream=sys.stdout,
)
if __name__ == "__main__":
container = Container(config={"max_workers": 4})
container.init_resources()
logging.info("Resources are initialized")
thread_pool = container.thread_pool()
thread_pool.map(print, range(10))
container.shutdown_resources()
Resource providers help to initialize and configure logging, event loop, thread or process pool, etc.
Resource provider is similar to Singleton
. Resource initialization happens only once.
You can make injections and use provided instance the same way like you do with any other provider.
class Container(containers.DeclarativeContainer):
config = providers.Configuration()
thread_pool = providers.Resource(
init_thread_pool,
max_workers=config.max_workers,
)
dispatcher = providers.Factory(
TaskDispatcher,
executor=thread_pool,
)
Container has an interface to initialize and shutdown all resources at once:
container = Container()
container.init_resources()
container.shutdown_resources()
You can also initialize and shutdown resources one-by-one using init()
and
shutdown()
methods of the provider:
container = Container()
container.thread_pool.init()
container.thread_pool.shutdown()
When you call .shutdown()
method on a resource provider, it will remove the reference to the initialized resource,
if any, and switch to uninitialized state. Some of resource initializer types support specifying custom
resource shutdown.
Resource provider supports 4 types of initializers:
Function
Context Manager
Generator (legacy)
Subclass of
resources.Resource
(legacy)
Function initializer¶
Function is the most common way to specify resource initialization:
def init_resource(argument1=..., argument2=...):
return SomeResource()
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
init_resource,
argument1=...,
argument2=...,
)
Function initializer may not return a value. This often happens when you configure global resource:
import logging.config
class Container(containers.DeclarativeContainer):
configure_logging = providers.Resource(
logging.config.fileConfig,
fname="logging.ini",
)
Function initializer does not provide a way to specify custom resource shutdown.
Context Manager initializer¶
This is an extension to the Function initializer. Resource provider automatically detects if the initializer returns a context manager and uses it to manage the resource lifecycle.
from dependency_injector import containers, providers
class DatabaseConnection:
def __init__(self, host, port, user, password):
self.host = host
self.port = port
self.user = user
self.password = password
def __enter__(self):
print(f"Connecting to {self.host}:{self.port} as {self.user}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
class Container(containers.DeclarativeContainer):
config = providers.Configuration()
db = providers.Resource(
DatabaseConnection,
host=config.db.host,
port=config.db.port,
user=config.db.user,
password=config.db.password,
)
Generator initializer (legacy)¶
Resource provider can use 2-step generators:
First step of generator is an initialization phase
The second is step is a shutdown phase
def init_resource(argument1=..., argument2=...):
resource = SomeResource() # initialization
yield resource
# shutdown
...
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
init_resource,
argument1=...,
argument2=...,
)
Generator initialization phase ends on the first yield
statement. You can return a
resource object using yield resource
like in the example above. Returning of the
object is not mandatory. You can leave yield
statement empty:
def init_resource(argument1=..., argument2=...):
# initialization
...
yield
# shutdown
...
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
init_resource,
argument1=...,
argument2=...,
)
Note
Generator initializers are automatically wrapped with contextmanager
or asynccontextmanager
decorator when
provided to a Resource
provider.
Subclass initializer (legacy)¶
You can create resource initializer by implementing a subclass of the resources.Resource
:
from dependency_injector import resources
class MyResource(resources.Resource):
def init(self, argument1=..., argument2=...) -> SomeResource:
return SomeResource()
def shutdown(self, resource: SomeResource) -> None:
# shutdown
...
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
MyResource,
argument1=...,
argument2=...,
)
Subclass must implement two methods: init()
and shutdown()
.
Method init()
receives arguments specified in resource provider.
It performs initialization and returns resource object. Returning of the object
is not mandatory.
Method shutdown()
receives resource object returned from init()
. If init()
didn’t return an object shutdown()
method will be called anyway with None
as a
first argument.
from dependency_injector import resources
class MyResource(resources.Resource):
def init(self, argument1=..., argument2=...) -> None:
# initialization
...
def shutdown(self, _: None) -> None:
# shutdown
...
Scoping Resources using specialized subclasses¶
You can use specialized subclasses of Resource
provider to initialize and shutdown resources by type.
Allowing for example to only initialize a subgroup of resources.
class ScopedResource(resources.Resource):
pass
def init_service(name) -> Service:
print(f"Init {name}")
yield Service()
print(f"Shutdown {name}")
class Container(containers.DeclarativeContainer):
scoped = ScopedResource(
init_service,
"scoped",
)
generic = providers.Resource(
init_service,
"generic",
)
To initialize resources by type you can use init_resources(resource_type)
and shutdown_resources(resource_type)
methods adding the resource type as an argument:
def main():
container = Container()
container.init_resources(ScopedResource)
# Generates:
# >>> Init scoped
container.shutdown_resources(ScopedResource)
# Generates:
# >>> Shutdown scoped
And to initialize all resources you can use init_resources()
and shutdown_resources()
without arguments:
def main():
container = Container()
container.init_resources()
# Generates:
# >>> Init scoped
# >>> Init generic
container.shutdown_resources()
# Generates:
# >>> Shutdown scoped
# >>> Shutdown generic
It works using the traverse()
method to find all resources of the specified type, selecting all resources
which are instances of the specified type.
Resources, wiring, and per-function execution scope¶
You can compound Resource
provider with Wiring to implement per-function
execution scope. For doing this you need to use additional Closing
marker from
wiring
module.
from dependency_injector import containers, providers
from dependency_injector.wiring import Closing, Provide, inject
from flask import Flask, current_app
class Service:
...
def init_service() -> Service:
print("Init service")
yield Service()
print("Shutdown service")
class Container(containers.DeclarativeContainer):
service = providers.Resource(init_service)
@inject
def index_view(service: Service = Closing[Provide[Container.service]]):
assert service is current_app.container.service()
return "Hello World!"
container = Container()
container.wire(modules=[__name__])
app = Flask(__name__)
app.container = container
app.add_url_rule("/", "index", view_func=index_view)
if __name__ == "__main__":
app.run()
Framework initializes and injects the resource into the function. With the Closing
marker
framework calls resource shutdown()
method when function execution is over.
The example above produces next output:
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:40] "GET / HTTP/1.1" 200 -
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:41] "GET / HTTP/1.1" 200 -
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:41] "GET / HTTP/1.1" 200 -
Asynchronous initializers¶
When you write an asynchronous application, you might need to initialize resources asynchronously. Resource provider supports asynchronous initialization and shutdown.
Asynchronous function initializer:
async def init_async_resource(argument1=..., argument2=...):
return await connect()
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
init_resource,
argument1=...,
argument2=...,
)
Asynchronous Context Manager initializer:
@asynccontextmanager
async def init_async_resource(argument1=..., argument2=...):
connection = await connect()
yield connection
await connection.close()
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
init_async_resource,
argument1=...,
argument2=...,
)
Asynchronous subclass initializer:
from dependency_injector import resources
class AsyncConnection(resources.AsyncResource):
async def init(self, argument1=..., argument2=...):
yield await connect()
async def shutdown(self, connection):
await connection.close()
class Container(containers.DeclarativeContainer):
resource = providers.Resource(
AsyncConnection,
argument1=...,
argument2=...,
)
When you use resource provider with asynchronous initializer you need to call its __call__()
,
init()
, and shutdown()
methods asynchronously:
import asyncio
class Container(containers.DeclarativeContainer):
connection = providers.Resource(init_async_connection)
async def main():
container = Container()
connection = await container.connection()
connection = await container.connection.init()
connection = await container.connection.shutdown()
if __name__ == "__main__":
asyncio.run(main())
Container init_resources()
and shutdown_resources()
methods should be used asynchronously if there is
at least one asynchronous resource provider:
import asyncio
class Container(containers.DeclarativeContainer):
connection1 = providers.Resource(init_async_connection)
connection2 = providers.Resource(init_sync_connection)
async def main():
container = Container()
await container.init_resources()
await container.shutdown_resources()
if __name__ == "__main__":
asyncio.run(main())
See also:
Provider Asynchronous injections
Wiring Asynchronous injections
ASGI Lifespan Protocol Support¶
The dependency_injector.ext.starlette
module provides a Lifespan
class that integrates resource providers with ASGI applications using the Lifespan Protocol. This allows resources to
be automatically initialized at application startup and properly shut down when the application stops.
from contextlib import asynccontextmanager
from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from dependency_injector.ext.starlette import Lifespan
from fastapi import FastAPI, Request, Depends, APIRouter
class Connection: ...
@asynccontextmanager
async def init_database():
print("opening database connection")
yield Connection()
print("closing database connection")
router = APIRouter()
@router.get("/")
@inject
async def index(request: Request, db: Connection = Depends(Provide["db"])):
# use the database connection here
return "OK!"
class Container(containers.DeclarativeContainer):
__self__ = providers.Self()
db = providers.Resource(init_database)
lifespan = providers.Singleton(Lifespan, __self__)
app = providers.Singleton(FastAPI, lifespan=lifespan)
_include_router = providers.Resource(
app.provided.include_router.call(),
router,
)
if __name__ == "__main__":
import uvicorn
container = Container()
app = container.app()
uvicorn.run(app, host="localhost", port=8000)