
Using dbt with Dagster#

This guide explains how you can run a dbt project in your Dagster pipelines.

What is dbt?#

dbt (data build tool) helps engineers transform data in their warehouses by simply writing SELECT statements. dbt automatically builds a dependency graph for your transformations and turns these SELECT statements into tables and views in your data warehouse.

dbt not only runs your data transformations but can also run data quality tests and generate documentation for your data, right out of the box. To learn more about dbt, visit the official dbt documentation website.

How does dbt work with Dagster?#

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single pipeline. Dagster also provides built-in operational and data observability capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.

dagster-dbt is an integration library that provides pre-built solids and resources for using dbt together with Dagster. The solids in the library are designed to be configurable for any dbt project.

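The solids in dagster-dbt follow a naming convention based on how they invoke dbt: dagster_dbt.dbt_cli_* for solids that use the dbt CLI, and dagster_dbt.dbt_rpc_* for solids that use the dbt RPC server.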

Use the dbt CLI in a Dagster pipeline#

dagster-dbt provides solids for running commands through the dbt CLI. By convention, these solids are named dagster_dbt.dbt_cli_*.

To run the dbt CLI, your dbt project directory must be on your local filesystem and you must have a dbt profile already set up to connect to your data warehouse. Visit the official dbt CLI documentation for more details.

Here are some examples of how to invoke dbt run with the solid dagster_dbt.dbt_cli_run. Other dbt commands can be invoked via the CLI with their respective solid dagster_dbt.dbt_cli_*.

Example: The solid dbt_cli_run is configured to run your entire dbt project.

from dagster import pipeline
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project"}
run_all_models = dbt_cli_run.configured(config, name="run_dbt_project")

@pipeline
def my_dbt_pipeline():
    run_all_models()

Example: The solid dbt_cli_run is configured to run specific models in your dbt project. This is similar to invoking dbt run --models tag:staging.

from dagster import pipeline
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
run_staging_models = dbt_cli_run.configured(config, name="run_staging_models")

@pipeline
def my_dbt_pipeline():
    run_staging_models()

In the code snippet above, the config "models" takes a list of strings. The string "tag:staging" uses dbt's node selection syntax to filter models with the tag "staging". For more details, visit the official dbt documentation on the node selection syntax.
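To make the comparison with the command line concrete, the sketch below turns a config dictionary like the one above into the argument list of the roughly equivalent dbt command. The helper build_dbt_run_args is hypothetical and exists only for illustration; it is not part of dagster-dbt and does not reflect how the library assembles its command internally.

```python
from typing import Dict, List, Union

ConfigValue = Union[str, List[str]]


def build_dbt_run_args(config: Dict[str, ConfigValue]) -> List[str]:
    """Hypothetical helper: map a config dict to `dbt run` arguments."""
    args = ["dbt", "run"]
    for key, value in config.items():
        args.append(f"--{key}")
        # List values (such as "models") become separate selector arguments.
        if isinstance(value, list):
            args.extend(value)
        else:
            args.append(value)
    return args


config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
print(" ".join(build_dbt_run_args(config)))
```

Passing several selectors in the "models" list would simply append them one after another, which matches how multiple selectors are written after --models on the command line.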

Example: As in the previous example, the solid dbt_cli_run is configured to run specific models, but here it runs only after another solid has executed. In your pipeline definition, you can pass the output of an upstream solid to the start_after argument of your dbt solid, which models that dependency relationship for you. See the documentation for Order Based Dependencies for more info.

from dagster import pipeline, solid
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
run_staging_models = dbt_cli_run.configured(config, name="run_staging_models")

@solid
def do_something(context):
    # solid logic here
    context.log.info("Executing necessary logic before dbt run")

@pipeline
def my_dbt_pipeline():
    run_staging_models(start_after=do_something())

Example pipeline project with dbt CLI solids#

You can find more examples under the dbt_examples directory in the dagster repo on GitHub.

Use the dbt RPC Server in a Dagster pipeline#

dagster-dbt provides solids for running commands through the dbt RPC server. By convention, these solids are named dagster_dbt.dbt_rpc_*.

Your dbt RPC server can be running locally or remotely. To use the dbt RPC solids in your Dagster pipeline, you will need to create a resource for your dbt RPC server. To learn more about Dagster resources, visit the Resources Overview.

dagster_dbt.dbt_rpc_resource can be configured with your specific host and port.

from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

For convenience during local development, you may also use dagster_dbt.local_dbt_rpc_resource, which is preconfigured for a dbt RPC server that is running on http://localhost:8580.

Here are some examples of how to send dbt commands to a dbt RPC server with a solid.

Example: The solid dbt_rpc_run will send a request to run your entire dbt project when you don't use any solid configuration.

from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    dbt_rpc_run()

The code snippet above shows a Dagster pipeline with a single solid dbt_rpc_run. The solid dbt_rpc_run has a required resource key "dbt_rpc". So, any pipeline that uses dbt_rpc_run will need a ModeDefinition that defines a resource under the key "dbt_rpc".

Example: The solid dbt_rpc_run is configured to run specific models in a dbt project. This is similar to having "params": {"models": "tag:staging"} in your dbt RPC request body.

from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

run_staging_models = dbt_rpc_run.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models",
)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models()

Note that the solid above will NOT wait until the dbt RPC server has finished executing your request. Instead, it returns immediately with a request token from the dbt RPC server. If you want the solid to wait until execution is finished, use dagster_dbt.dbt_rpc_run_and_wait instead.
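For comparison, the request body that the "models" configuration above corresponds to can be sketched as follows. The envelope follows the JSON-RPC 2.0 format; the helper name build_run_request, the use of a random UUID as the request id, and the joining of selectors into one space-separated string are assumptions made for this illustration, not a description of dagster-dbt internals.

```python
import json
import uuid
from typing import Any, Dict, List


def build_run_request(models: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: build a JSON-RPC 2.0 body for a dbt `run` request."""
    return {
        "jsonrpc": "2.0",
        "method": "run",
        # Any unique string works as an id; a UUID is one convenient choice.
        "id": str(uuid.uuid4()),
        # Assumption: selectors are sent as one space-separated string.
        "params": {"models": " ".join(models)},
    }


body = build_run_request(["tag:staging"])
print(json.dumps(body, indent=2))
```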

Example: The solid dbt_rpc_run_and_wait will send a request to run specific models in a dbt project and then poll the dbt RPC server until it has finished executing your request.

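from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run_and_wait

# Select the models to run, as in the previous example.
run_staging_models_and_wait = dbt_rpc_run_and_wait.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models_and_wait",
)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models_and_wait()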
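The polling behaviour described above can be pictured with a small, library-independent sketch. Everything here is hypothetical: poll_until_finished, the fetch_state callable, and the state strings stand in for whatever the dbt RPC server actually reports, and the implementation in dagster-dbt may differ.

```python
import time
from typing import Callable


def poll_until_finished(
    fetch_state: Callable[[], str],
    interval: float = 1.0,
    max_polls: int = 100,
) -> str:
    """Call fetch_state repeatedly until it stops reporting "running"."""
    for _ in range(max_polls):
        state = fetch_state()
        if state != "running":
            return state
        # Wait before asking again so the server is not flooded with requests.
        time.sleep(interval)
    raise TimeoutError(f"still running after {max_polls} polls")


# Stand-in for a server that reports "running" twice and then "success".
fake_states = iter(["running", "running", "success"])
final_state = poll_until_finished(lambda: next(fake_states), interval=0.01)
print(final_state)
```

Bounding the number of polls is a design choice of this sketch: it keeps a stuck request from blocking the caller forever.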

Use dbt Cloud in a Dagster pipeline#

dagster_dbt currently does not provide solids for invoking dbt commands via dbt Cloud. However, this use case is possible by writing your own solid to create and start Jobs via the dbt Cloud API. For more details about each HTTP endpoint, visit the official documentation for the dbt Cloud API.
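As a starting point for such a solid, the sketch below prepares an HTTP request that asks dbt Cloud to start a job. It assumes the version 2 endpoint for triggering a job run and token-based authorization; confirm both against the official dbt Cloud API documentation before relying on them. The helper build_trigger_job_request, the IDs, and the token are hypothetical placeholders.

```python
import json
import urllib.request


def build_trigger_job_request(
    account_id: int, job_id: int, api_token: str, cause: str
) -> urllib.request.Request:
    """Hypothetical helper: prepare (but do not send) a job-trigger request."""
    # Assumed endpoint shape; check the dbt Cloud API docs for your account.
    url = (
        "https://cloud.getdbt.com/api/v2/"
        f"accounts/{account_id}/jobs/{job_id}/run/"
    )
    return urllib.request.Request(
        url,
        data=json.dumps({"cause": cause}).encode("utf-8"),
        headers={
            "Authorization": f"Token {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder IDs and token; nothing is sent over the network here.
request = build_trigger_job_request(1, 2, "my-api-token", "Triggered by Dagster")
print(request.get_method(), request.full_url)
```

Inside a custom solid, the prepared request could then be sent with urllib.request.urlopen(request) and the response inspected for the resulting run.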

Advanced Configuration#

For full documentation on all available config, visit the API docs for dagster-dbt.

dbt CLI: Set the dbt profile and target to load

config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")

dbt CLI: Set the path to the dbt executable

config = {"dbt_executable": "path/to/dbt/executable"}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")

dbt CLI: Select specific models to run

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Exclude specific models

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Set key-values for dbt vars

config = {"vars": {"key": "value"}}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")

For more details, visit the official documentation on using variables in dbt.
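As a point of reference, the dbt command line accepts variables through the --vars flag as a single YAML or JSON string. The sketch below shows how a dictionary like the one above could be serialized into that form; format_vars_flag is a hypothetical helper, not a dagster-dbt function.

```python
import json
import shlex
from typing import Any, Dict, List


def format_vars_flag(dbt_vars: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: serialize vars into `--vars` CLI arguments."""
    # JSON is valid YAML, so a JSON string is one safe way to pass vars.
    return ["--vars", json.dumps(dbt_vars)]


args = ["dbt", "run"] + format_vars_flag({"key": "value"})
# shlex.join quotes the JSON so it survives as one shell argument.
print(shlex.join(args))
```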

dbt CLI: Disable default asset materializations

config = {"yield_materializations": False}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")

dbt RPC: Configure a remote dbt RPC resource

from dagster_dbt import dbt_rpc_resource

custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})

dbt RPC: Select specific models to run

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")

For more details, visit the official documentation on dbt's node selection syntax.

dbt RPC: Exclude specific models

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")

For more details, visit the official documentation on dbt's node selection syntax.

dbt RPC: Configure polling interval when using a dbt_rpc_*_and_wait solid

config = {"interval": 3}  # Poll the dbt RPC server every 3 seconds.

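from dagster_dbt import dbt_rpc_run_and_wait

# The polling interval only applies to solids that wait for the request to finish.
custom_solid = dbt_rpc_run_and_wait.configured(config, name="custom_solid")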

dbt RPC: Disable default asset materializations

config = {"yield_materializations": False}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")

Conclusion#

If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions on using dbt with Dagster, we'd love to hear from you:

Join us on Slack.