Solid hooks let you define success and failure handling policies on solids.
Name | Description |
---|---|
@failure_hook | The decorator to define a callback on solid failure. |
@success_hook | The decorator to define a callback on solid success. |
HookContext | The context object available to a hook function. |
build_hook_context | A function for building a HookContext outside of execution, intended to be used when testing a hook. |
A @success_hook
or @failure_hook
decorated function is called a solid hook. Solid hooks are designed for generic purposes — it can be anything you would like to do at a per solid level.
from dagster import HookContext, failure_hook, success_hook
@success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: HookContext):
message = f"Solid {context.solid.name} finished successfully"
context.resources.slack.chat.post_message(channel="#foo", text=message)
@failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: HookContext):
message = f"Solid {context.solid.name} failed"
context.resources.slack.chat.post_message(channel="#foo", text=message)
As you may have noticed, the hook function takes one argument, which is an instance of HookContext
. The available properties on this context are:
context.log
: loggerscontext.solid
: the solid associated with the hook.context.solid_config
: The config specific to the associated solid.context.step
: the execution step that triggers the hook.context.resources
: the resources the hook can use.Dagster provides different ways to trigger solid hooks.
For example, you want to send a slack message to a channel when any solid fails in a pipeline. In this case, we will be applying a hook on a pipeline, which will apply the hook on every solid instance within in that pipeline.
You can simply use the hook created "slack_message_on_failure" above to decorate the pipeline "notif_all". Then, slack messages will be sent when any solid in the pipeline fails.
@slack_message_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
# the hook "slack_message_on_failure" is applied on every solid instance within this pipeline
a()
b()
However, sometimes a pipeline is a shared responsibility or you only want to be alerted on high-priority solid executions. So we also provide a way to set up hooks on solid instances which enables you to apply policies on a per-solid basis.
@pipeline(mode_defs=mode_defs)
def selective_notif():
# only solid "a" triggers hooks: a slack message will be sent when it fails or succeeds
a.with_hooks({slack_message_on_failure, slack_message_on_success})()
# solid "b" won't trigger any hooks
b()
In this case, solid "b" won't trigger any hooks, while when solid "a" fails or succeeds it will send a slack message.
You can test the functionality of a hook by invoking the hook definition. This will run the underlying decorated function. You can construct a context to provide to the invocation using the build_hook_context
function.
from dagster import build_hook_context
@success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
context.resources.my_conn.send("foo")
def test_my_success_hook():
my_conn = mock.MagicMock()
# construct HookContext with mocked ``my_conn`` resource.
context = build_hook_context(resources={"my_conn": my_conn})
my_success_hook(context)
assert my_conn.send.call_count == 1
In many cases, you might want to know details about a solid failure. You can get the exception object thrown in the failed solid via the solid_exception
property on HookContext
:
from dagster import HookContext, failure_hook
import traceback
@failure_hook
def my_failure_hook(context: HookContext):
solid_exception: BaseException = context.solid_exception
# print stack trace of exception
traceback.print_tb(solid_exception.__traceback__)
Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys
, the body of the hook can access the corresponding resource via the resources
attribute of its context object.
It also enables you to switch resource values in different modes so that, for example, you can send slack messages only when you are in "prod" mode and mock the slack resource when you are in "dev" mode.
In this case, we can mock the slack_resource
in the "dev" mode using a helper function ResourceDefinition.hardcoded_resource()
, so it won't send slack messages when you are developing the pipeline.
mode_defs = [
ModeDefinition(
"dev",
resource_defs={
"slack": ResourceDefinition.hardcoded_resource(
slack_resource_mock, "do not send messages in dev"
)
},
),
ModeDefinition("prod", resource_defs={"slack": slack_resource}),
]
When we switch to "prod" mode, we can provide the real slack token in the run_config
and therefore enable sending messages to a certain slack channel when a hook is triggered.
resources:
slack:
config:
token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
Then, we can execute a pipeline with the config through Python API, CLI, or the Dagit UI. Here's an example of using the Python API.
if __name__ == "__main__":
with open(
file_relative_path(__file__, "prod.yaml"),
"r",
) as fd:
run_config = yaml.safe_load(fd.read())
result = execute_pipeline(notif_all, run_config=run_config, mode="prod")
Dagster currently does not have pipeline-scoped hooks. When you add a hook decorator to a pipeline, the hook is attached to every solid in the pipeline individually. The hook does not track pipeline-scoped events and only tracks solid-level success or failure events.
You may find the need for something like pipeline-level hooks. For example, you may want to run some code for every pipeline success or failure.
Even though Dagster does not have first-class pipeline-level hooks, you can use @sensor
to achieve the same functionality. The sensor can monitor pipeline events at the instance level and perform an arbitrary action (such as send an alert or even instigate a new pipeline run).
You can find an example of the sensor at Pipeline failure sensor on the Sensors page.