Modes and Resources#

Resources provide a way to manage dependencies on external APIs. Together with modes, they can be used to represent multiple execution environments for a pipeline.

Relevant APIs#

Name | Description
@resource | The decorator used to define resources. The decorated function is called a resource_fn. The decorator returns a ResourceDefinition.
ResourceDefinition | Class for resource definitions. You almost never want to initialize this class directly. Instead, use the @resource decorator, which returns a ResourceDefinition.
ModeDefinition | Class used to define a pipeline mode.
InitResourceContext | The context object provided to a resource during initialization. This object contains required resources, config, and other run information.
build_init_resource_context | Function for building an InitResourceContext outside of execution, intended to be used when testing a resource.

Overview#

You can use resources to make features of the execution environment available to solids during pipeline execution. You can use modes to bind a set of resources (and other environment information) to a pipeline so that those resources are available to the solids within that pipeline. You can provide multiple modes to a pipeline, each with different resources, to represent the execution environments in which your pipeline will run.

Why Use Resources and Modes#

Representing external dependencies as resources, in conjunction with modes, has several convenient properties:

  • Pluggable: You can map a resource to a key in one mode, and then map a different resource to that same key in a different mode. This is useful if there is a heavy external dependency that you want to use in production but avoid in testing. You can provide a different mode for each execution case: one for production with the heavy dependency (e.g., AWS) as a resource, and one for testing with something lighter (e.g., an in-memory store) mapped to the same key. For more information about this capability, check out Separating Business Logic from Environments.
  • Pipeline Scoped: Since resources are pipeline scoped, if you provide a resource to a mode, then it becomes available for use with every solid in that pipeline.
  • Configurable: Resources can be configured, using a strongly typed configuration system.
  • Dependencies: Resources can depend on other resources. This makes it possible to cleanly represent external environment objects that rely on other external environment information for initialization.
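
The pluggable property can be modeled in a few lines of plain Python. The sketch below does not use Dagster and is not how Dagster is implemented; InMemoryStore, FileStore, MODES, save_report, and run are hypothetical names chosen for illustration. Each "mode" maps the same key, store, to a different implementation, and the business logic only ever refers to the key.

```python
import json
import os
import tempfile


class InMemoryStore:
    """Lightweight stand-in suitable for tests."""

    def __init__(self):
        self.data = {}

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data[key]


class FileStore:
    """Heavier stand-in for an external dependency: persists JSON to disk."""

    def __init__(self, directory):
        self.directory = directory

    def put(self, key, value):
        with open(os.path.join(self.directory, key), "w") as f:
            json.dump(value, f)

    def get(self, key):
        with open(os.path.join(self.directory, key)) as f:
            return json.load(f)


# Hypothetical modes: the same key ("store") maps to a different factory in each.
MODES = {
    "test": {"store": InMemoryStore},
    "prod": {"store": lambda: FileStore(tempfile.mkdtemp())},
}


def save_report(resources, rows):
    """Business logic: depends on the key "store", not on a concrete class."""
    resources["store"].put("report", {"row_count": len(rows)})
    return resources["store"].get("report")


def run(mode_name, rows):
    # Build the resources for the selected mode, then run the same logic.
    resources = {key: factory() for key, factory in MODES[mode_name].items()}
    return save_report(resources, rows)
```

Calling run("test", rows) and run("prod", rows) exercises identical logic against different backing stores, which is the behavior that modes provide for a pipeline.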

Defining a Resource#

To define a resource, use the @resource decorator. The decorated function takes an init_context, an instance of InitResourceContext, as its first parameter. From this function, return or yield the object that you want to make available as a resource.

from dagster import resource


class ExternalCerealFetcher:
    def fetch_new_cereals(self, start_ts, end_ts):
        pass


@resource
def cereal_fetcher(init_context):
    return ExternalCerealFetcher()
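
Yielding rather than returning is useful when the object needs cleanup once its consumer is done with it. The following is a plain-Python model of that setup/teardown shape, using contextlib and the standard-library sqlite3 module rather than Dagster itself; in a resource, the same try/finally structure would sit inside the function decorated with @resource. The names sqlite_connection and connection_is_closed, and the in-memory database, are assumptions made for this sketch.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def sqlite_connection(path=":memory:"):
    conn = sqlite3.connect(path)  # setup: runs before the consumer sees the object
    try:
        yield conn  # the object handed to the consumer
    finally:
        conn.close()  # teardown: runs even if the consumer raises


def connection_is_closed(conn):
    """Return True if the connection no longer accepts queries."""
    try:
        conn.execute("select 1")
    except sqlite3.ProgrammingError:
        return True
    return False


with sqlite_connection() as conn:
    conn.execute("create table cereals (name text)")
    conn.execute("insert into cereals values ('bran')")
    row_count = conn.execute("select count(*) from cereals").fetchone()[0]
    open_during_use = not connection_is_closed(conn)

# After the with block exits, the teardown code has closed the connection.
closed_after_use = connection_is_closed(conn)
```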

Accessing Resources in Solids#

Solids use resource keys to access resources, like so:

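from dagster import solid

CREATE_TABLE_1_QUERY = "create table table_1 as select * from table_0"


# The "database" key must be mapped to a resource in the mode the pipeline runs in.
@solid(required_resource_keys={"database"})
def solid_requires_resources(context):
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)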
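
Because the solid body only touches context.resources.database, its logic can be exercised against a stand-in object. The sketch below is plain Python with no Dagster import: RecordingDatabase and the SimpleNamespace-based context are hypothetical test doubles, and solid_body is an undecorated copy of the logic in the solid above.

```python
from types import SimpleNamespace

CREATE_TABLE_1_QUERY = "create table table_1 as select * from table_0"


class RecordingDatabase:
    """Hypothetical test double that records queries instead of running them."""

    def __init__(self):
        self.queries = []

    def execute_query(self, query):
        self.queries.append(query)


def solid_body(context):
    # Same logic as the solid: look the resource up by key and use it.
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)


# Build a minimal object exposing `.resources.database`, then run the logic.
fake_db = RecordingDatabase()
fake_context = SimpleNamespace(resources=SimpleNamespace(database=fake_db))
solid_body(fake_context)
```

The design point is that the solid never constructs its own database client, so swapping the real resource for a recording double requires no change to the solid's code.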

(Experimental) Testing Resource Initialization#

You can test the initialization of a resource by invoking the resource definition. This will run the underlying decorated function.

from dagster import resource


@resource
def my_resource(_):
    return "foo"


def test_my_resource():
    assert my_resource(None) == "foo"

If your resource requires other resources or config, you can provide an InitResourceContext object by using the build_init_resource_context function.

from dagster import build_init_resource_context, resource


@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
    return init_context.resources.foo, init_context.resource_config["bar"]


def test_my_resource_with_context():
    init_context = build_init_resource_context(
        resources={"foo": "foo_str"}, config={"bar": "bar_str"}
    )
    assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")

If your resource is a context manager, you can open it as one using Python's with syntax.

from contextlib import contextmanager

from dagster import resource


@resource
@contextmanager
def my_cm_resource(_):
    yield "foo"


def test_cm_resource():
    with my_cm_resource(None) as initialized_resource:
        assert initialized_resource == "foo"

Defining a Mode#

To define a mode, construct a ModeDefinition. Each resource definition provided to the mode should be mapped to a unique key.

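from dagster import ModeDefinition

# resource_a and resource_b are assumed to be resource definitions defined elsewhere.
mode_def_ab = ModeDefinition(
    "ab_mode",
    resource_defs={
        "a": resource_a,
        "b": resource_b,
    },
)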

Providing Modes to a Pipeline#

Modes can be provided to a pipeline via the mode_defs argument on the @pipeline decorator.

from dagster import pipeline

# mode_def_c and basic_solid are assumed to be defined elsewhere.
@pipeline(mode_defs=[mode_def_ab, mode_def_c])
def pipeline_with_mode():
    basic_solid()

Selecting a Mode during Execution#

Python API#

When executing a pipeline using execute_pipeline, you can toggle between modes by providing the mode name to the mode parameter.

execute_pipeline(pipeline_with_mode, mode="ab_mode")

In Dagit#

When launching the pipeline via the Dagit Playground, you can select a mode from the mode selector dropdown:

[Image: Modes in Dagit]

Dagster CLI#

When launching a pipeline via the CLI, you can use the -d option to specify the mode.

$ dagster pipeline execute -d prod_mode my_pipeline

Examples#

Resource Configuration#

ResourceDefinitions can have a config schema, which allows you to customize behavior at runtime through pipeline configuration.

For example, let's say we wanted to pass a connection string to our DatabaseConnection resource.

from dagster import resource


class DatabaseConnection:
    def __init__(self, connection: str):
        self.connection = connection


@resource(config_schema={"connection": str})
def db_resource(init_context):
    # resource_config holds the values supplied for this resource's config_schema.
    connection = init_context.resource_config["connection"]
    return DatabaseConnection(connection)
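
The value for connection is supplied in the run configuration, under the key that the resource is mapped to in the mode. The fragment below assumes db_resource is mapped to the hypothetical key database and uses a made-up connection string. It only builds the dictionary, which would then be passed as the run_config argument of execute_pipeline.

```python
# Run configuration for a resource mapped to the (hypothetical) key "database".
run_config = {
    "resources": {
        "database": {
            "config": {
                # Must match config_schema={"connection": str} on the resource.
                "connection": "postgresql://localhost:5432/cereals",
            }
        }
    }
}

# The portion the resource function would read from init_context.resource_config:
resource_config = run_config["resources"]["database"]["config"]
```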

Resource to Resource Dependencies#

Resources can depend upon other resources. Use the required_resource_keys parameter of the @resource decorator to specify which resources to depend upon. Access the required resources through the context object provided to the wrapped function.

from dagster import resource


@resource
def foo_resource(_):
    return "foo"


@resource(required_resource_keys={"foo"})
def emit_foo(init_context):
    return init_context.resources.foo

Note that every required resource key must be provided in the same mode as the resource that requires it, and that dependencies between resources cannot be cyclic.

ModeDefinition(resource_defs={"foo": foo_resource, "emit": emit_foo})
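
The two constraints above are what make a valid initialization order possible: each resource has to be initialized after the resources it requires. The sketch below models this with the standard library's graphlib module (Python 3.9+). It illustrates the constraint only and is not Dagster's internal algorithm; initialization_order is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def initialization_order(required_keys):
    """Order resource keys so that each comes after the keys it requires.

    `required_keys` maps a resource key to the set of keys it depends on.
    Raises ValueError if a required key is missing or the graph is cyclic.
    """
    for key, deps in required_keys.items():
        missing = deps - set(required_keys)
        if missing:
            raise ValueError(
                f"{key!r} requires keys not provided in this mode: {sorted(missing)}"
            )
    try:
        return list(TopologicalSorter(required_keys).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle.
        raise ValueError(f"cyclic resource dependency: {exc.args[1]}") from exc


# Mirrors the mode above: "emit" requires "foo", and "foo" requires nothing.
order = initialization_order({"foo": set(), "emit": {"foo"}})
```

With these inputs, foo is ordered before emit. A mapping such as {"a": {"b"}, "b": {"a"}} raises ValueError, as does one where a required key is absent.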