This example describes how to use Dagster's versioning and memoization features. As of release 0.10.0, Dagster can tag a solid or resource with a version through the version argument of the decorator (i.e. @solid(version="a"), @resource(version="b")). This version argument is meant to represent the state of the code wrapped by the decorator.
Dagster can use versions to determine whether it is necessary to re-execute a particular step. Given the code version of each solid in a pipeline, the system tags every solid output with a version and uses that tag to infer whether an upcoming execution of a step would differ from previous executions. When it would not, the outputs of previous runs are re-used. We call this process memoized execution.
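The decision described above can be sketched in plain Python. This is not Dagster's actual algorithm: the function names output_version and needs_execution, and the choice of SHA-1 over a JSON payload, are assumptions made purely for illustration.

```python
import hashlib
import json


def output_version(code_version, config, input_versions):
    """Fold everything that can affect a step's output into a single hash.

    Hypothetical helper: input_versions maps input names to the versions
    of the upstream outputs that feed them.
    """
    payload = json.dumps(
        {"code": code_version, "config": config, "inputs": input_versions},
        sort_keys=True,
    )
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


def needs_execution(version, stored_versions):
    """A step can be skipped only if an output with this exact version is stored."""
    return version not in stored_versions


stored = set()

v1 = output_version("a", {"dog_breed": "poodle"}, {})
first_run = needs_execution(v1, stored)  # nothing has been stored yet
stored.add(v1)  # simulate a completed run

second_run = needs_execution(v1, stored)  # same code, config, and inputs

v2 = output_version("b", {"dog_breed": "poodle"}, {})
after_code_change = needs_execution(v2, stored)  # the code version changed
```

In this sketch a step is re-executed whenever its code version, its config, or the version of any input changes, which is the behavior the rest of this example relies on.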
The following example will walk through adding versions to a pipeline, and then performing memoized execution. Please note that versioning and memoization are experimental features, and are not yet intended for production use.
Consider adding version information to the following pipeline.
from dagster import ModeDefinition, pipeline, repository
from dagster.core.storage.memoizable_io_manager import versioned_filesystem_io_manager
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
from memoized_development.solids.dog import emit_dog
from memoized_development.solids.sentence import emit_sentence
from memoized_development.solids.tree import emit_tree


@pipeline(
    mode_defs=[
        # The io_manager must be able to look up outputs by version.
        ModeDefinition("memoized", resource_defs={"io_manager": versioned_filesystem_io_manager}),
    ],
    # Ask the system to compute versions and skip steps that are already memoized.
    tags={MEMOIZED_RUN_TAG: "true"},
)
def my_pipeline():
    return emit_sentence(emit_dog(), emit_tree())
To enable memoization for this pipeline, we have tagged the pipeline with MEMOIZED_RUN_TAG: "true", which tells the system to compute version information for outputs and to run only the steps that have not been memoized. We also use versioned_filesystem_io_manager as our io_manager, which is a MemoizableIOManager. A MemoizableIOManager is required as the io_manager so that versioning information can be retrieved from previous runs.
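To make the role of the io_manager concrete, here is a toy store that files each output under its version, so that a later run can ask whether a given version already exists. It is a hypothetical stand-in written for this explanation, not Dagster's MemoizableIOManager: the class name ToyVersionedStore, its method signatures, and the directory layout are all assumptions.

```python
import os
import pickle
import tempfile


class ToyVersionedStore:
    """Hypothetical stand-in for a memoizable IO manager; not Dagster's class."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path(self, step, output, version):
        # Assumed layout: <base_dir>/<step>/<output>/<version>
        return os.path.join(self.base_dir, step, output, version)

    def has_output(self, step, output, version):
        # Memoization hinges on this lookup: is this exact version stored?
        return os.path.exists(self._path(step, output, version))

    def handle_output(self, step, output, version, value):
        path = self._path(step, output, version)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(value, f)

    def load_input(self, step, output, version):
        with open(self._path(step, output, version), "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as base_dir:
    store = ToyVersionedStore(base_dir)
    before = store.has_output("emit_dog", "result", "v1")
    store.handle_output("emit_dog", "result", "v1", "poodle")
    after = store.has_output("emit_dog", "result", "v1")
    loaded = store.load_input("emit_dog", "result", "v1")
    other_version = store.has_output("emit_dog", "result", "v2")
```

An ordinary store that keys outputs only by step and output name could not answer the has_output question for a specific version, which is why a version-aware io_manager is needed.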
To enable memoization features on this pipeline, we also need to tag each solid (emit_sentence, emit_dog, and emit_tree) with a version.
In order to catch code changes individually from each solid, we will have each solid live in its own file, and have the version of each solid be a hash of the corresponding file.
Here is the code to hash a file in Python. Note that this code is not provided by Dagster, and is instead part of the example code:
import hashlib


def get_hash_for_file(path):
    # Hash the raw bytes of the file, so any edit to the file yields a new version.
    h = hashlib.new("md5")
    with open(path, "rb") as f:
        data = f.read()
    h.update(data)
    digest = h.hexdigest()
    return digest
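As a quick check of how this behaves, the sketch below hashes a temporary file before and after an edit. The file name and its contents are made up for the demonstration, and get_hash_for_file is repeated here only so that the snippet runs on its own.

```python
import hashlib
import os
import tempfile


def get_hash_for_file(path):
    # Same approach as the example helper: MD5 over the file's raw bytes.
    h = hashlib.new("md5")
    with open(path, "rb") as f:
        h.update(f.read())
    return h.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical solid file, created only for this demonstration.
    path = os.path.join(tmp, "dog.py")
    with open(path, "w") as f:
        f.write("def emit_dog():\n    return 'poodle'\n")

    original = get_hash_for_file(path)
    unchanged = get_hash_for_file(path)  # hashing again without editing

    with open(path, "a") as f:
        f.write("# a comment\n")
    edited = get_hash_for_file(path)  # hashing after an edit
```

Hashing the whole file is deliberately conservative: even a comment-only edit produces a new version and therefore a re-execution. Keeping each solid in its own file limits how far such a change spreads.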
And here is the definition of each solid, using file-hashing to compute the version:
# memoized_development/solids/dog.py
from dagster import solid
from memoized_development.solids.solid_utils import get_hash_for_file


@solid(version=get_hash_for_file(__file__), config_schema={"dog_breed": str})
def emit_dog(context):
    breed = context.solid_config["dog_breed"]
    return breed


# memoized_development/solids/tree.py
from dagster import solid
from memoized_development.solids.solid_utils import get_hash_for_file


@solid(version=get_hash_for_file(__file__), config_schema={"tree_species": str})
def emit_tree(context):
    species = context.solid_config["tree_species"]
    return species


# memoized_development/solids/sentence.py
from dagster import InputDefinition, solid
from memoized_development.solids.solid_utils import get_hash_for_file


@solid(
    version=get_hash_for_file(__file__),
    input_defs=[InputDefinition("dog_breed"), InputDefinition("tree_species")],
)
def emit_sentence(dog_breed, tree_species):
    return f"{dog_breed} is a breed of dog, while {tree_species} is a species of tree."
Notice that the emit_dog and emit_tree solids require config. In order to execute my_pipeline, we will provide the required config for these solids:
solids:
  emit_dog:
    config:
      dog_breed: poodle
  emit_tree:
    config:
      tree_species: douglas_fir
resources:
  io_manager:
    config:
      base_dir: "./results"
Before executing, we can use the list_versions CLI to check which outputs have been memoized in their current state, and which will need to be re-executed:
dagster pipeline list_versions \
-m memoized_development.repo \
-c ./config/california_config.yaml \
--mode memoized
This results in the following output:
| Step Output          | Version                                  | Status of Output   |
|----------------------|------------------------------------------|--------------------|
| emit_dog.result      | 93815493adaeae4d55fdfa820891e0d40fb95f46 | to-be-recomputed   |
| emit_tree.result     | 94d4620ffec745ef895719204424c16406646b99 | to-be-recomputed   |
| emit_sentence.result | d367447b011cc2dcee083d5d9509477da746958b | to-be-recomputed   |
This matches our expectation, since we have not yet run the pipeline. Use the execute CLI to execute the pipeline. Note that memoized execution is not yet supported from Dagit.
dagster pipeline execute \
-m memoized_development.repo \
-c ./config/california_config.yaml \
--mode memoized
Let's see what happens if we change the config which we use to execute:
solids:
  emit_dog:
    config:
      dog_breed: poodle
  emit_tree:
    config:
      tree_species: red_maple
resources:
  io_manager:
    config:
      base_dir: "./results"
Notice that only the config for the emit_tree solid has changed. The state of the output of emit_tree has therefore changed, and we would expect its version to change. Because the emit_sentence solid takes the result of emit_tree as input, the version of the output of emit_sentence should change as well. We can verify this by using the list_versions CLI again.
dagster pipeline list_versions \
-m memoized_development.repo \
-c ./config/maryland_config.yaml \
--mode memoized
This results in the following output:
| Step Output          | Version                                  | Status of Output   |
|----------------------|------------------------------------------|--------------------|
| emit_dog.result      | 93815493adaeae4d55fdfa820891e0d40fb95f46 | stored             |
| emit_tree.result     | 94d4620ffec745ef895719204424c16406646b99 | to-be-recomputed   |
| emit_sentence.result | d367447b011cc2dcee083d5d9509477da746958b | to-be-recomputed   |
As expected, the outputs of emit_tree and emit_sentence are marked to-be-recomputed, indicating that if we run the pipeline again with the given config, these two outputs will be recomputed. Note additionally that the result of the emit_dog solid is marked as stored, and its version has not changed, because neither its code nor its config has changed. If we execute the pipeline again with the new config, we expect that the output of emit_dog will be retrieved from the previous run, and that only the emit_tree and emit_sentence solids will run.
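The walkthrough above can be reproduced with a small simulation that propagates versions through the three steps. This is an illustrative sketch rather than Dagster's implementation: the code versions are placeholder strings instead of file hashes, the helpers compute_versions and plan are hypothetical, and the hashes it produces will not match the ones printed by the CLI.

```python
import hashlib
import json

# Placeholder code versions; in the example these are hashes of each solid's file.
CODE_VERSIONS = {
    "emit_dog": "dog-v1",
    "emit_tree": "tree-v1",
    "emit_sentence": "sentence-v1",
}
# Upstream steps for each step, listed so that dependencies come first.
DEPENDENCIES = {
    "emit_dog": [],
    "emit_tree": [],
    "emit_sentence": ["emit_dog", "emit_tree"],
}


def compute_versions(config):
    """Derive each output version from code, config, and upstream versions."""
    versions = {}
    for step, upstream in DEPENDENCIES.items():
        payload = json.dumps(
            {
                "code": CODE_VERSIONS[step],
                "config": config.get(step, {}),
                "inputs": [versions[dep] for dep in upstream],
            },
            sort_keys=True,
        )
        versions[step] = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return versions


def plan(config, stored):
    """Label each step the way the list_versions table does."""
    return {
        step: "stored" if version in stored else "to-be-recomputed"
        for step, version in compute_versions(config).items()
    }


california = {
    "emit_dog": {"dog_breed": "poodle"},
    "emit_tree": {"tree_species": "douglas_fir"},
}
maryland = {
    "emit_dog": {"dog_breed": "poodle"},
    "emit_tree": {"tree_species": "red_maple"},
}

stored = set()
first_plan = plan(california, stored)
stored.update(compute_versions(california).values())  # simulate the first run
second_plan = plan(maryland, stored)
```

Because the version of emit_sentence is derived from the versions of its inputs, a config change in emit_tree invalidates emit_sentence as well, while emit_dog keeps its version and is served from storage.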