GraphQL Python Client#

Dagster provides a GraphQL Python Client to interface with Dagster's GraphQL API from Python.

Relevant APIs#

Name | Description
DagsterGraphQLClient (Experimental) | The client class to interact with Dagster's GraphQL API from Python.
DagsterGraphQLClientError | The exception that the client raises upon a response error.

Overview#

The Dagster Python Client provides bindings in Python to programmatically interact with Dagster's GraphQL API.

When is this useful? Dagster exposes a powerful GraphQL API, but this level of flexibility is not always necessary. For example, when submitting a new pipeline run, you may only want to specify the pipeline name and configuration, without having to maintain a long GraphQL query.

DagsterGraphQLClient addresses this by offering a simple Python interface to the GraphQL API.

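Note that not all GraphQL methods on the API are available in Python yet. DagsterGraphQLClient currently provides only a subset, including the methods shown in the examples on this page: get_run_status, reload_repository_location, submit_pipeline_execution, and shutdown_repository_location.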

Using the GraphQL Client#

The snippet below shows how to instantiate the client, given a hostname and port number:

from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("localhost", port_number=3000)
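In a deployment you may not want to hard-code the host and port. The sketch below reads them from environment variables before constructing the client. The variable names DAGIT_HOST and DAGIT_PORT and both helper functions are hypothetical, chosen for illustration only; they are not part of Dagster.

```python
import os


def client_settings_from_env(env=None):
    """Return (host, port) read from the environment.

    DAGIT_HOST and DAGIT_PORT are hypothetical variable names; the
    defaults mirror the localhost/3000 values used in the snippet above.
    """
    env = os.environ if env is None else env
    host = env.get("DAGIT_HOST", "localhost")
    port = int(env.get("DAGIT_PORT", "3000"))
    return host, port


def make_client(env=None):
    """Build a DagsterGraphQLClient from the settings above."""
    # Imported lazily so the settings helper works even where
    # dagster_graphql is not installed (for example, in unit tests).
    from dagster_graphql import DagsterGraphQLClient

    host, port = client_settings_from_env(env)
    return DagsterGraphQLClient(host, port_number=port)
```

Keeping the settings lookup separate from client construction makes the lookup easy to test without a running server.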

Examples#

Getting a Pipeline Run's Status#

You can use the client to get the status of a pipeline run as follows:

from dagster_graphql import DagsterGraphQLClientError
from dagster import PipelineRunStatus

try:
    status: PipelineRunStatus = client.get_run_status(RUN_ID)
    if status == PipelineRunStatus.SUCCESS:
        do_something_on_success()
    else:
        do_something_else()
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
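A common follow-up is to wait until a run finishes. The sketch below polls get_run_status until the run reaches a terminal status or a timeout expires. The wait_for_run helper and its parameters are hypothetical, and the set of terminal statuses is an assumption about PipelineRunStatus that you should check against your Dagster version.

```python
import time

# Assumption: a run in one of these statuses will not change again.
TERMINAL_STATUSES = {"SUCCESS", "FAILURE", "CANCELED"}


def wait_for_run(client, run_id, poll_interval=5.0, timeout=600.0, sleep=time.sleep):
    """Poll client.get_run_status(run_id) until a terminal status or timeout.

    Returns the final status object. Raises TimeoutError if the run is
    still in progress once `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = client.get_run_status(run_id)
        # Compare on the enum's string value so this sketch does not
        # need to import dagster; plain strings pass through unchanged.
        value = getattr(status, "value", status)
        if value in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run {run_id} still {value} after {timeout} seconds")
        sleep(poll_interval)
```

Any DagsterGraphQLClientError raised by get_run_status propagates to the caller, so this helper can be wrapped in the same try/except pattern used above.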

Reloading all Repositories in a Repository Location#

You can also reload a repository location in a Dagster deployment.

This reloads all repositories in that repository location. This is useful in a variety of contexts, including refreshing Dagit without restarting the server. Example usage is as follows:

from dagster_graphql import (
    ReloadRepositoryLocationInfo,
    ReloadRepositoryLocationStatus,
)

# The argument is the name of the repository location to reload.
reload_info: ReloadRepositoryLocationInfo = client.reload_repository_location(
    REPO_LOCATION_NAME
)
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    do_something_on_success()
else:
    raise Exception(
        "Repository location reload failed because of a "
        f"{reload_info.failure_type} error: {reload_info.message}"
    )
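If a deployment has several repository locations, the same call can be repeated for each one. The sketch below reloads a list of locations and collects the failures instead of raising on the first. The reload_locations helper is hypothetical, and it assumes the returned status is an enum whose successful member is named SUCCESS, as in the example above.

```python
def reload_locations(client, location_names):
    """Reload each repository location and report the ones that failed.

    Returns a dict mapping location name to a failure description.
    An empty dict means every reload succeeded.
    """
    failures = {}
    for name in location_names:
        info = client.reload_repository_location(name)
        # Compare by enum member name so this sketch does not need to
        # import ReloadRepositoryLocationStatus.
        if info.status.name != "SUCCESS":
            failures[name] = f"{info.failure_type} error: {info.message}"
    return failures
```

Collecting failures lets the caller decide whether a single bad location should abort the whole refresh.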

Submitting a Pipeline Run#

Using a Mode definition and a Run Configuration#

You can use the client to submit a pipeline run as follows:

from dagster_graphql import DagsterGraphQLClientError

try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        repository_location_name=REPO_LOCATION_NAME,
        repository_name=REPO_NAME,
        run_config={},
        mode="default",
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc

Using a Preset#

You can also submit a pipeline run from a preset with the client:

from dagster_graphql import DagsterGraphQLClientError

try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        repository_location_name=REPO_LOCATION_NAME,
        repository_name=REPO_NAME,
        preset=PRESET_NAME,
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc

Shutting down a Repository Location Server#

If you're running your own gRPC server, we generally recommend updating your repository code by building a new Docker image with a new tag and redeploying your server using that new image. Sometimes, however, you may want to restart your server without changing the image. For example, your pipeline definitions may be generated programmatically from a database, and you may want to restart the server and regenerate your repositories even though the underlying Python code hasn't changed. In these situations, reload_repository_location is insufficient: it refreshes Dagit's information about the repositories, but it doesn't actually restart the server or reload the repository definition.

One way to restart your server and reload your repositories is to run the server in an environment that automatically restarts services when they fail, such as Kubernetes or docker-compose with restart: always set on the service. You can then use the shutdown_repository_location function on the GraphQL client to shut down the server. Your environment restarts the server, and Dagit detects the restart automatically.

Example usage:

from dagster_graphql import (
    ShutdownRepositoryLocationInfo,
    ShutdownRepositoryLocationStatus,
)

# The argument is the name of the repository location whose server to shut down.
shutdown_info: ShutdownRepositoryLocationInfo = client.shutdown_repository_location(
    REPO_LOCATION_NAME
)
if shutdown_info.status == ShutdownRepositoryLocationStatus.SUCCESS:
    do_something_on_success()
else:
    raise Exception(f"Repository location shutdown failed: {shutdown_info.message}")

Repository Location and Repository Inference#

Note that specifying the repository location name and repository name is not always necessary. If the pipeline name is unique, the GraphQL client infers both:

from dagster_graphql import DagsterGraphQLClientError

try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        run_config={},
        mode="default",
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
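To support both styles from one call site, a thin wrapper can pass the repository arguments only when the caller supplies them, leaving inference to the client otherwise. This is a minimal sketch; the submit_run helper is hypothetical and uses only the keyword arguments shown in the examples above.

```python
def submit_run(
    client,
    pipeline_name,
    run_config=None,
    mode="default",
    repository_location_name=None,
    repository_name=None,
):
    """Submit a pipeline run, naming the repository only when given.

    When both repository arguments are omitted, the client is left to
    infer them from the pipeline name. Returns the new run ID.
    """
    kwargs = {"run_config": run_config or {}, "mode": mode}
    if repository_location_name is not None:
        kwargs["repository_location_name"] = repository_location_name
    if repository_name is not None:
        kwargs["repository_name"] = repository_name
    return client.submit_pipeline_execution(pipeline_name, **kwargs)
```

If the pipeline name later stops being unique, callers can start passing the repository names without changing the wrapper.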