Run Attribution (Experimental)

You can find the code for this example on GitHub.

Run Coordinator

The Run Coordinator is used to control the policy that Dagster uses to manage the set of active runs on your deployment.
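
To make "policy" concrete, here is a toy model in plain Python of one such policy: submitted runs wait in a queue, and a separate dequeue step (standing in for the Dagster Daemon) launches them only while fewer than a fixed number of runs are active. This is an illustration under that assumption, not Dagster's implementation; `ToyQueuedCoordinator`, `dequeue`, and `finish_run` are hypothetical names.

```python
from collections import deque
from typing import Deque, List, Set


class ToyQueuedCoordinator:
    """Hypothetical stand-in for a queue-based run coordinator."""

    def __init__(self, max_concurrent_runs: int) -> None:
        self.max_concurrent_runs = max_concurrent_runs
        self.queue: Deque[str] = deque()
        self.active: Set[str] = set()

    def submit_run(self, run_id: str) -> None:
        # Submission only enqueues the run; nothing is launched yet.
        self.queue.append(run_id)

    def dequeue(self) -> List[str]:
        # Plays the daemon's role: launch queued runs in FIFO order while under the limit.
        launched = []
        while self.queue and len(self.active) < self.max_concurrent_runs:
            run_id = self.queue.popleft()
            self.active.add(run_id)
            launched.append(run_id)
        return launched

    def finish_run(self, run_id: str) -> None:
        # A finished run frees a slot for the next dequeue pass.
        self.active.discard(run_id)


coordinator = ToyQueuedCoordinator(max_concurrent_runs=2)
for run_id in ["a", "b", "c"]:
    coordinator.submit_run(run_id)
coordinator.dequeue()  # ['a', 'b']; "c" waits for a free slot
coordinator.finish_run("a")
coordinator.dequeue()  # ['c']
```

Separating submission from launching is what lets a coordinator both enforce a policy like this and run extra logic at submission time, which is the hook this example relies on.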

The Run Coordinator is invoked when runs are submitted to the instance (e.g. via the GraphQL API), so it can also be used to dynamically attach tags to submitted runs.

In this example, we'll perform run attribution, which means that we'll attach a user's email as a tag to submitted runs.

To accomplish this, we'll use a custom Run Coordinator that reads the HTTP headers of the Flask request handled by Dagster's GraphQL server, parses them to extract the user's email, and attaches that email to the run as a tag.

Custom Run Coordinator

In this use case, we'd like to add a hook that customizes submitted runs while still placing them on a queue for the Dagster Daemon to launch. To accomplish this, we can subclass the QueuedRunCoordinator as follows:

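from dagster.core.host_representation import ExternalPipeline
from dagster.core.run_coordinator import QueuedRunCoordinator
from dagster.core.storage.pipeline_run import PipelineRun


class CustomRunCoordinator(QueuedRunCoordinator):
    def submit_run(
        self, pipeline_run: PipelineRun, external_pipeline: ExternalPipeline
    ) -> PipelineRun:
        # Custom logic goes here; then delegate to QueuedRunCoordinator so the
        # run is still placed on the queue for the Dagster Daemon.
        return super().submit_run(pipeline_run, external_pipeline)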

Overriding submit_run lets us insert custom hooks that execute whenever a run is submitted.

First, we can read an HTTP header from the current request with the following snippet, where CUSTOM_HEADER_NAME stands for whichever header you need:

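from flask import has_request_context, request

CUSTOM_HEADER_NAME = "X-Custom-Header"  # placeholder: use the header your proxy sets

# Outside a Flask request there are no headers to read, so fall back to None.
desired_header = request.headers.get(CUSTOM_HEADER_NAME) if has_request_context() else None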

Then we can parse the header value (here, jwt_claims_header) with any custom hook. In the following example, we decode a JWT whose payload contains the user's email.

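from base64 import urlsafe_b64decode
from json import loads
from typing import Optional


def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]:
    if not jwt_claims_header:
        return None

    # A JWT has the form <header>.<payload>.<signature>; the claims are in the payload.
    # Note: this only decodes the payload and does not verify the signature.
    split_header_tokens = jwt_claims_header.split(".")
    if len(split_header_tokens) < 2:
        return None

    payload = split_header_tokens[1]
    try:
        # JWT segments are base64url-encoded with "=" padding stripped, so restore it.
        decoded_claims_json = urlsafe_b64decode(payload + "=" * (-len(payload) % 4))
        claims_json = loads(decoded_claims_json)
    except ValueError:  # binascii.Error, UnicodeDecodeError and JSONDecodeError all subclass it
        return None
    return claims_json.get("email") if isinstance(claims_json, dict) else None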

The above is just an example; you can write whatever hook is useful for your deployment.
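
To exercise a decoder like get_email without a real identity provider, you can build an unsigned token by hand. The sketch below assumes the standard JWT layout and uses hypothetical helper names (`b64url`, `make_unsigned_jwt`, `extract_email`); `extract_email` is a free-function variant of a get_email-style decoder. It also shows why the payload is decoded as base64url with its "=" padding restored: JWT segments are encoded with the padding stripped, so a plain b64decode can reject them with an "Incorrect padding" error. The dummy signature segment is never checked.

```python
import json
from base64 import urlsafe_b64decode, urlsafe_b64encode
from typing import Optional


def b64url(data: bytes) -> str:
    # base64url-encode and strip the "=" padding, as JWT segments do.
    return urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_unsigned_jwt(claims: dict) -> str:
    # Hypothetical test helper: header.payload.signature with a dummy signature.
    header = b64url(json.dumps({"alg": "none", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    return f"{header}.{payload}.unsigned"


def extract_email(token: Optional[str]) -> Optional[str]:
    # Decodes the payload only; the signature is NOT verified.
    if not token:
        return None
    parts = token.split(".")
    if len(parts) < 2:
        return None
    payload = parts[1]
    try:
        claims = json.loads(urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    except ValueError:  # bad base64, bad UTF-8, or bad JSON
        return None
    return claims.get("email") if isinstance(claims, dict) else None


token = make_unsigned_jwt({"email": "ada@example.com", "sub": "123"})
print(extract_email(token))  # ada@example.com
```

A token built this way is only suitable for local testing. In production, whether to trust the header depends on your proxy, since decoding alone proves nothing about who issued the token.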

Putting this all together, we can use these hooks to dynamically attach tags to submitted pipeline runs. In the following example, we read the user's email from the X-Amzn-Oidc-Data header (which an AWS Application Load Balancer sets after authenticating a user via OIDC) using the get_email hook defined above, and then attach the email to the pipeline run as a "user" tag.

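import warnings

from flask import has_request_context, request


def submit_run(
    self, pipeline_run: PipelineRun, external_pipeline: ExternalPipeline
) -> PipelineRun:
    # There are no headers unless the run was submitted during a Flask request.
    jwt_claims_header = (
        request.headers.get("X-Amzn-Oidc-Data", None) if has_request_context() else None
    )
    email = self.get_email(jwt_claims_header)
    if email:
        # Attach the submitting user's email as a "user" tag on the run.
        self._instance.add_run_tags(pipeline_run.run_id, {"user": email})
    else:
        warnings.warn(f"Couldn't decode JWT header {jwt_claims_header}")
    # Hand the run to QueuedRunCoordinator so it is queued as usual.
    return super().submit_run(pipeline_run, external_pipeline)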
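
Because submit_run depends on Flask and a live Dagster instance, one way to check the tagging logic in isolation is to factor it into a plain function and drive it with fakes. In this sketch, `attribute_run`, `FakeInstance`, and the `decode` callback are hypothetical stand-ins rather than Dagster APIs; the fake's `add_run_tags` only mimics the call made above, and passing `headers=None` plays the role of "no Flask request context".

```python
import warnings
from typing import Callable, Dict, Mapping, Optional


class FakeInstance:
    """Hypothetical stand-in for a Dagster instance that just records tags."""

    def __init__(self) -> None:
        self.tags: Dict[str, Dict[str, str]] = {}

    def add_run_tags(self, run_id: str, new_tags: Dict[str, str]) -> None:
        self.tags.setdefault(run_id, {}).update(new_tags)


def attribute_run(
    instance: FakeInstance,
    run_id: str,
    headers: Optional[Mapping[str, str]],
    decode: Callable[[Optional[str]], Optional[str]],
) -> Optional[str]:
    # headers=None means the run was not submitted during an HTTP request.
    # Note: unlike Flask's request.headers, a plain dict lookup is case-sensitive.
    jwt_claims_header = headers.get("X-Amzn-Oidc-Data") if headers is not None else None
    email = decode(jwt_claims_header)
    if email:
        instance.add_run_tags(run_id, {"user": email})
    else:
        warnings.warn(f"Couldn't decode JWT header {jwt_claims_header}")
    # The real submit_run would now hand the run to super().submit_run(...).
    return email


instance = FakeInstance()
fake_decode = lambda header: "ada@example.com" if header else None
attribute_run(instance, "run-1", {"X-Amzn-Oidc-Data": "token"}, fake_decode)
print(instance.tags)  # {'run-1': {'user': 'ada@example.com'}}
```

Keeping the decoding step as a parameter makes it easy to swap in a real decoder such as get_email, or a stub, without touching the tagging logic.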

Deploying

dagster.yaml

To specify the custom Run Coordinator to be used on the instance, add the following snippet to an instance's dagster.yaml:

run_coordinator:
  module: run_attribution_example
  class: CustomRunCoordinator

Helm

If you deploy with Helm instead, you can specify the custom Run Coordinator in the Helm chart's values.yaml:

queuedRunCoordinator:
  enabled: true
  module: run_attribution_example
  class: CustomRunCoordinator
  config: {}

Because both the module and the class are configurable, any custom Run Coordinator can be used, as long as its module is installed in the image that the Dagster instance runs on.
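
Conceptually, a module and class pair like the ones above is resolved by importing the module and looking the class up by name, which is why the module has to be importable in the image. The helper below is a hypothetical illustration using only the standard library (`importlib.import_module` and `getattr`), not Dagster's actual loading code; a standard-library class stands in for run_attribution_example.CustomRunCoordinator.

```python
import importlib
from typing import Any


def load_class(module_name: str, class_name: str) -> Any:
    # Raises ModuleNotFoundError (a subclass of ImportError) if the module
    # isn't installed in the current environment.
    module = importlib.import_module(module_name)
    # Raises AttributeError if the module has no attribute with that name.
    return getattr(module, class_name)


cls = load_class("collections", "OrderedDict")
print(cls.__name__)  # OrderedDict
```

If the module is missing from the image, resolution fails at import time, before any run is submitted.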