Resources provide a way to manage dependencies on external APIs. Together with modes, they can be used to represent multiple different execution environments for a pipeline.
Name | Description |
---|---|
@resource | The decorator used to define resources. The decorated function is called a resource_fn. The decorator returns a ResourceDefinition. |
ResourceDefinition | Class for resource definitions. You almost never want to initialize this class directly. Instead, use the @resource decorator, which returns a ResourceDefinition. |
ModeDefinition | Class used to define a pipeline mode. |
InitResourceContext | The context object provided to a resource during initialization. This object contains required resources, config, and other run information. |
build_init_resource_context | Function for building an InitResourceContext outside of execution, intended to be used when testing a resource. |
Resources give solids access to features of the execution environment during pipeline execution. Modes bind a set of resources (and other environment information) to a pipeline so that those resources are available to the solids within it. You can provide multiple modes to a pipeline, each with different resources, to represent the execution environments in which the pipeline will run.
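Representing external dependencies as resources, in conjunction with modes, has convenient properties: solids refer to a resource only by its key, so each mode can bind a different implementation to that key, and a resource can be initialized and tested outside of a pipeline run.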
To define a resource, use the @resource decorator. Wrap a function that takes an init_context as the first parameter, which is an instance of InitResourceContext. From this function, return or yield the object that you would like to be available as a resource.
from dagster import resource

class ExternalCerealFetcher:
    def fetch_new_cereals(self, start_ts, end_ts):
        pass

@resource
def cereal_fetcher(init_context):
    return ExternalCerealFetcher()
Solids use resource keys to access resources, like so:
from dagster import solid

CREATE_TABLE_1_QUERY = "create table table_1 as select * from table_0"

@solid(required_resource_keys={"database"})
def solid_requires_resources(context):
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)
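The key listed in required_resource_keys is the attribute name the solid reads from context.resources. The following is a minimal plain-Python sketch of that lookup, with no Dagster imports. RecordingDatabase, QUERY, and the SimpleNamespace context are hypothetical stand-ins rather than Dagster classes, and solid_body repeats the shape of a solid's body without the @solid decorator.

```python
from types import SimpleNamespace

QUERY = "select 1"  # placeholder query, for illustration only


class RecordingDatabase:
    """Hypothetical stand-in for a database resource: records queries instead of running them."""

    def __init__(self):
        self.queries = []

    def execute_query(self, query):
        self.queries.append(query)


def solid_body(context):
    # Mirrors the body of a solid that declares required_resource_keys={"database"}.
    context.resources.database.execute_query(QUERY)


# Stand-in context: the "database" key becomes the attribute context.resources.database.
fake_database = RecordingDatabase()
fake_context = SimpleNamespace(resources=SimpleNamespace(database=fake_database))
solid_body(fake_context)
print(fake_database.queries)
```

Because the solid body only touches the key, any object exposing execute_query could be bound to "database" without changing the solid.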
You can test the initialization of a resource by invoking the resource definition. This will run the underlying decorated function.
from dagster import resource

@resource
def my_resource(_):
    return "foo"

def test_my_resource():
    assert my_resource(None) == "foo"
If your resource requires other resources or config, then you can provide an InitResourceContext object by using the build_init_resource_context function.
from dagster import build_init_resource_context, resource

@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
    return init_context.resources.foo, init_context.resource_config["bar"]

def test_my_resource_with_context():
    init_context = build_init_resource_context(
        resources={"foo": "foo_str"}, config={"bar": "bar_str"}
    )
    assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")
If your resource is a context manager, then you can open it as one using Python's with syntax.
from contextlib import contextmanager

from dagster import resource

@resource
@contextmanager
def my_cm_resource(_):
    yield "foo"

def test_cm_resource():
    with my_cm_resource(None) as initialized_resource:
        assert initialized_resource == "foo"
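Yielding instead of returning leaves room for cleanup code after the yield. As a rough illustration using only contextlib, the sketch below shows the order in which setup, use, and teardown run for a context-manager function. The function connection_resource_fn and the events list are hypothetical, and the function is shown without the @resource decorator so that the snippet needs no Dagster import.

```python
from contextlib import contextmanager

events = []  # records the order in which each phase runs


@contextmanager
def connection_resource_fn(_init_context):
    # Hypothetical resource function body, shown without the @resource decorator.
    events.append("open")  # setup runs before the resource is handed out
    try:
        yield "connection"
    finally:
        events.append("close")  # teardown runs when the with block exits


with connection_resource_fn(None) as connection:
    events.append(f"use {connection}")

print(events)
```

The try/finally around the yield means the teardown step also runs if the code inside the with block raises.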
To define a mode, construct a ModeDefinition. Each resource definition provided to the mode should be mapped to a unique key.
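from dagster import ModeDefinition

# resource_a and resource_b are resource definitions created elsewhere with @resource.
mode_def_ab = ModeDefinition(
    "ab_mode",
    resource_defs={
        "a": resource_a,
        "b": resource_b,
    },
)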
Modes can be provided to a pipeline via the mode_defs argument on the @pipeline decorator.
from dagster import pipeline

@pipeline(mode_defs=[mode_def_ab, mode_def_c])
def pipeline_with_mode():
    basic_solid()
When executing a pipeline using execute_pipeline, you can toggle between modes by providing the mode name to the mode parameter.
execute_pipeline(pipeline_with_mode, mode="ab_mode")
When launching the pipeline via the Dagit Playground, you can select a mode from the mode selector dropdown.
When launching a pipeline via the CLI, you can use the -d option to specify the mode.
$ dagster pipeline execute -d prod_mode my_pipeline
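Whichever way the mode is selected, the effect is that the same resource key resolves to a different resource. The following plain-Python analogy sketches that idea only; it is not how Dagster implements modes, and every name in it (MODES, build_resources, InMemoryDatabase, WarehouseDatabase, local_mode) is hypothetical.

```python
from types import SimpleNamespace


class InMemoryDatabase:
    """Hypothetical resource for local runs."""

    name = "in_memory"


class WarehouseDatabase:
    """Hypothetical resource for production runs."""

    name = "warehouse"


# Each mode maps the same resource key to a different factory.
MODES = {
    "local_mode": {"database": InMemoryDatabase},
    "prod_mode": {"database": WarehouseDatabase},
}


def build_resources(mode_name):
    """Instantiate every resource bound in the chosen mode."""
    factories = MODES[mode_name]
    return SimpleNamespace(**{key: factory() for key, factory in factories.items()})


def describe_database(resources):
    # Consuming code only knows the key, not which implementation sits behind it.
    return resources.database.name


print(describe_database(build_resources("local_mode")))
print(describe_database(build_resources("prod_mode")))
```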
ResourceDefinitions can have a config schema, which allows you to customize behavior at runtime through pipeline configuration. For example, let's say we wanted to pass a connection string to our DatabaseConnection resource.
from dagster import resource

class DatabaseConnection:
    def __init__(self, connection: str):
        self.connection = connection

@resource(config_schema={"connection": str})
def db_resource(init_context):
    connection = init_context.resource_config["connection"]
    return DatabaseConnection(connection)
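The value read from init_context.resource_config is supplied in the resources section of the pipeline's run config, nested under the key that the resource is bound to in the active mode. Here is a sketch of that shape as a Python dict, assuming db_resource is bound to a hypothetical key "database" and using a made-up connection string:

```python
# Assumption: the active mode maps the key "database" to db_resource.
run_config = {
    "resources": {
        "database": {
            "config": {
                # Made-up value, for illustration only.
                "connection": "postgresql://user@localhost:5432/example",
            }
        }
    }
}

# The entry under "config" is what the resource function would see as resource_config.
resource_config = run_config["resources"]["database"]["config"]
print(resource_config["connection"])
```

A dict like this would typically be passed as the run_config argument of execute_pipeline, alongside the mode name.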
Resources can depend upon other resources. Use the required_resource_keys parameter of the @resource decorator to specify which resources to depend upon. Access the required resources through the context object provided to the wrapped function.
from dagster import resource

@resource
def foo_resource(_):
    return "foo"

@resource(required_resource_keys={"foo"})
def emit_foo(init_context):
    return init_context.resources.foo
Note that the required resources must be provided to the same mode as the resource that requires them, and that dependencies between resources cannot be cyclic.
ModeDefinition(resource_defs={"foo": foo_resource, "emit": emit_foo})
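The no-cycles rule follows from initialization order: a resource can only be built once the resources it requires exist. As an illustration (not Dagster's implementation), the sketch below uses the standard library's graphlib, available in Python 3.9 and later, to order resource keys and reject a cycle. The helper initialization_order and its input format are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def initialization_order(required_keys_by_resource):
    """Return resource keys ordered so that each key comes after the keys it requires.

    Raises ValueError if the requirements form a cycle.
    """
    try:
        return list(TopologicalSorter(required_keys_by_resource).static_order())
    except CycleError as err:
        # The second element of args holds the nodes that form the detected cycle.
        raise ValueError(f"cyclic resource dependencies: {err.args[1]}") from err


# Same shape as the mode above: "emit" requires "foo", and "foo" requires nothing.
order = initialization_order({"foo": set(), "emit": {"foo"}})
print(order)

# Two resources that require each other cannot be ordered.
try:
    initialization_order({"a": {"b"}, "b": {"a"}})
except ValueError as err:
    print(err)
```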