Run Configuration#

Pipeline run configuration allows providing parameters to pipelines at the time they're executed.

Relevant APIs#

Name          Description
ConfigSchema  See details with code examples in the API documentation.

Overview#

It's often useful to configure pipelines at run time. For example, you might want someone to manually operate a deployed pipeline and choose what dataset it operates on when they run it. In general, you should use Dagster's config system when you want the person or software that is executing a pipeline to be able to make choices about what the pipeline does, without needing to modify the pipeline definition.

The objects that compose a pipeline - solids and resources - are each individually configurable. When executing a pipeline, you can supply "run configuration" that specifies the configuration for each of the objects in the pipeline. When you execute a pipeline with the Python API, you supply run configuration as a Python dictionary. When you execute a pipeline from Dagit or the CLI, you can provide config in a YAML document.

A common use of configuration is for a schedule or sensor to provide configuration to the pipeline run it is launching. For example, a daily schedule might provide the day it's running on to one of the solids as a config value, and that solid might use that config value to decide what day's data to read.
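As a rough illustration of the daily-schedule case, the sketch below builds the run configuration for a given day using only the standard library. The solid name `read_daily_data` and the `date` config key are hypothetical and exist only for this example; in a real deployment, a schedule definition would return a dictionary shaped like this one.

```python
from datetime import date, timedelta


def run_config_for_date(day: date) -> dict:
    """Build run config that tells a solid which day's data to read.

    `read_daily_data` and the `date` key are hypothetical names.
    """
    return {
        "solids": {
            "read_daily_data": {"config": {"date": day.strftime("%Y-%m-%d")}}
        }
    }


# One config per day, as a daily schedule would produce on successive runs.
start = date(2021, 1, 1)
configs = [run_config_for_date(start + timedelta(days=i)) for i in range(3)]
for cfg in configs:
    print(cfg["solids"]["read_daily_data"]["config"]["date"])
```

The solid would then read the value from `context.solid_config["date"]` to decide which day's data to load.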

Dagster includes a system for gradually-typed configuration schemas. These make it easy to catch configuration errors before pipeline execution, as well as to learn what configuration is required to execute a pipeline.

Using Configuration Inside a Solid#

This example shows how to write a solid whose behavior is based on values that are passed in via configuration:

from dagster import pipeline, solid


@solid
def config_example_solid(context):
    for _ in range(context.solid_config["iterations"]):
        context.log.info("hello")


@pipeline
def config_example_pipeline():
    config_example_solid()

Providing Run Configuration#

How you specify config values depends on how you're running your pipeline:

Python API#

When executing a pipeline with execute_pipeline, you can specify the config values through the run_config argument:

from dagster import execute_pipeline

execute_pipeline(
    config_example_pipeline,
    run_config={"solids": {"config_example_solid": {"config": {"iterations": 1}}}},
)
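The run configuration dictionary always nests each solid's values under `solids`, then the solid name, then `config`. When several solids need config, a small helper can keep that nesting out of the way. The sketch below is one possible way to do it; `build_run_config` is a hypothetical helper for this example and is not part of Dagster.

```python
def build_run_config(solid_configs: dict) -> dict:
    """Nest each solid's config under solids -> <solid name> -> config."""
    return {
        "solids": {
            name: {"config": config} for name, config in solid_configs.items()
        }
    }


# Produces the same dictionary that is written out by hand above.
run_config = build_run_config({"config_example_solid": {"iterations": 1}})
print(run_config)
```

The resulting dictionary can be passed as the run_config argument of execute_pipeline.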

Dagster CLI#

When executing a pipeline from the command line, the easiest way to provide config is to put it into a YAML file, like:

solids:
  config_example_solid:
    config:
      iterations: 1

When you invoke dagster pipeline execute, you can point to that YAML file using the --config option:

dagster pipeline execute --config my_config.yaml

Dagit#

When executing a pipeline from Dagit's Playground, you can supply config as YAML using the config editor:

[Screenshot: Config in Dagit]

Config Schema#

Dagster includes a system for gradually-typed configuration schemas. For example, you can specify that a particular solid accepts configuration for a particular set of keys, and that values provided for a particular key must be integers. Before executing a pipeline, Dagster will compare the provided run configuration to the config schema for the objects in the pipeline and fail early if they don't match.

Configuration schema helps:

  • Catch configuration errors before pipeline execution.
  • Make deployed pipelines self documenting, so that it's easy to learn what configuration is required to launch them.
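To make the "fail early" idea concrete, here is a deliberately simplified, standard-library sketch of checking a config dictionary against a schema that maps keys to Python types. It is not Dagster's implementation, and `check_config` is a hypothetical name. Dagster's real config system supports far richer types, but the principle is the same: compare the supplied values against the declared schema before any work starts.

```python
def check_config(schema: dict, config: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    # Every key declared in the schema must be present with the right type.
    for key, expected_type in schema.items():
        if key not in config:
            errors.append(f"missing required key: {key!r}")
        elif not isinstance(config[key], expected_type):
            errors.append(
                f"key {key!r} expected {expected_type.__name__}, "
                f"got {type(config[key]).__name__}"
            )
    # Keys that the schema does not declare are rejected as well.
    for key in config:
        if key not in schema:
            errors.append(f"unexpected key: {key!r}")
    return errors


schema = {"iterations": int}

print(check_config(schema, {"iterations": 1}))
print(check_config(schema, {"iterations": "one"}))
print(check_config(schema, {"nonexistent_config_value": 1}))
```

Collecting every problem in a list, instead of stopping at the first one, lets the caller see all mismatches at once.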

The full range of config types and ways to specify config schema are documented in the API Reference with examples.

The most common objects to specify ConfigSchema for are SolidDefinition and ResourceDefinition (see example code in Configuring a Resource).

Here's an example of a solid that defines a config schema:

from dagster import pipeline, solid


@solid(config_schema={"iterations": int, "word": str})
def configurable_solid_with_schema(context):
    for _ in range(context.solid_config["iterations"]):
        context.log.info(context.solid_config["word"])


@pipeline
def configurable_pipeline_with_schema():
    configurable_solid_with_schema()

Dagster validates the run_config against the config_schema. If the values violate the schema, execution fails before any solid runs. For example, the following raises a DagsterInvalidConfigError:

from dagster import execute_pipeline

execute_pipeline(
    configurable_pipeline_with_schema,
    run_config={
        "solids": {
            "configurable_solid_with_schema": {"config": {"nonexistent_config_value": 1}}
        }
    },
)

The config editor in Dagit comes with typeahead, schema validation, and schema documentation. You can also click the "Scaffold Missing Config" button to generate dummy values based on the config schema.

Examples#

Configuring a Resource#

You can also configure a ResourceDefinition:

from dagster import resource


@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
    """Connect to S3"""

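You can then specify the configuration at run time via run config like the following, where key is the name under which the resource is registered in the pipeline's mode: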

resources:
  key:
    config:
      region: us-east-1
      use_unsigned_session: False

Passing Configuration to Multiple Solids in a Pipeline#

If you want multiple solids to share values, you can use make_values_resource to pass the values via a resource, and then reference that resource from any solid that needs them.

By default, the resource's config schema is Any, meaning Dagster will accept any config value provided for the resource:

from dagster import ModeDefinition, execute_pipeline, make_values_resource, pipeline, solid


@solid(required_resource_keys={"value"})
def solid1(context):
    context.log.info(f"value: {context.resources.value}")


@solid(required_resource_keys={"value"})
def solid2(context):
    context.log.info(f"value: {context.resources.value}")


@pipeline(mode_defs=[ModeDefinition(resource_defs={"value": make_values_resource()})])
def my_pipeline():
    solid1()
    solid2()


execute_pipeline(my_pipeline, run_config={"resources": {"value": {"config": "some_value"}}})

You can also specify a schema for the values:

from dagster import ModeDefinition, execute_pipeline, make_values_resource, pipeline, solid


@solid(required_resource_keys={"values"})
def solid1(context):
    context.log.info(f"my str: {context.resources.values['my_str']}")


@solid(required_resource_keys={"values"})
def solid2(context):
    context.log.info(f"my int: {context.resources.values['my_int']}")


@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"values": make_values_resource(my_str=str, my_int=int)})
    ]
)
def my_pipeline():
    solid1()
    solid2()


execute_pipeline(
    my_pipeline, run_config={"resources": {"values": {"config": {"my_str": "foo", "my_int": 1}}}}
)

The equivalent run config in YAML is:

resources:
  values:
    config:
      my_str: foo
      my_int: 1