DagsterDocs

Schedules#

Dagster includes a scheduler that can be used to launch runs on a fixed interval.

Relevant APIs#

NameDescription
@daily_scheduleDecorator that defines a schedule that executes every day at a fixed time
@hourly_scheduleDecorator that defines a schedule that executes every hour at a fixed time
@weekly_scheduleDecorator that defines a schedule that executes every week on a fixed day and time
@monthly_scheduleDecorator that defines a schedule that executes every month on a fixed day and time
@scheduleDecorator that defines a schedule that executes according to a given cron schedule. It's important to note that schedules created using this schedule are non-partition based schedules
ScheduleExecutionContextThe context passed to the schedule definition execution function
build_schedule_contextA function that constructs a ScheduleExecutionContext
ScheduleDefinitionClass for schedules. You almost never want to use initialize this class directly. Instead, you should use one of the many decorators above

Overview#

A schedule is a definition in Dagster that is used to execute a pipeline at a fixed interval. Each time at which a schedule is evaluated is called a tick. The schedule definition is responsible for generating run configuration for the pipeline on each tick.

Each schedule:

  • Targets a single pipeline
  • Defines a function that returns run configuration for the targeted pipeline.
  • Optionally defines tags, a mode, and a solid selection for the targeted pipeline.
  • Optionally defines a should_execute function, which can be used to skip some ticks.

Dagster includes a scheduler, which runs as part of the dagster-daemon process. Once you have defined a schedule, see the dagster-daemon page for instructions on how to run the daemon in order to execute your schedules.


Defining a schedule#

There are two types of schedules in Dagster:

  • Partition-based
  • Non-partition-based

Partition-based schedules are best when your pipeline operates on data that is partitioned by time (for example, a pipeline that fills in a daily partition of a table). Non-partition-based schedules are best when you just want to run an operation on a fixed interval (for example, a pipeline that rebuilds the same table on each execution)

Partition-based schedules#

Partition-based schedules generate an underlying PartitionSetDefinition to execute against. The scheduler will make sure that a run is executed for every partition in the partition set, starting from the time that the schedule is turned on.

For example, if we wanted to create a schedule that runs at 11:00 AM every day, we could use @daily_schedule, a decorator that produces a partition-based daily schedule.

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2021, 1, 1),
    execution_time=datetime.time(11, 0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

The decorated schedule function should accept a datetime and return the run config for the pipeline run associated with that partition. The scheduler will ensure that the schedule function is evaluated exactly once for every partition to generate run config, which is then used to create and launch a pipeline run.

Partition-based schedules require a start_date that indicates when the set of partitions begins. The scheduler will only kick off runs for times after both the start_date and the time when the schedule was turned on. You can kick off runs for times between the start_date and when the schedule is turned on by launching a backfill.

By default, the partition that is used for the run will be one partition earlier than the partition that includes the current time, to capture a common ETL use case - for example, a daily schedule will fill in the previous day's partition, and a monthly schedule will fill in last month's partition. You can customize this behavior by changing the partition_days_offset parameter for a daily schedule. The default value of this parameter is 1, which means that the scheduler goes back one day to determine the partition. Setting the value to 0 will cause the schedule to fill in the partition that corresponds to the current day, and increasing it above 1 will cause it to fill in an even earlier partition. A similarly-named parameter also exists for the other execution intervals.

Non-partition-based schedules#

When you want to run a schedule on a fixed interval and don't need partitions, you can use the @schedule decorator to define your schedule.

For example, this schedule runs at 1:00 AM in US/Central time every day:

@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_schedule(_context):
    return {"solids": {"process_data": {"config": {"dataset_name": "my_dataset"}}}}

You can also access the execution time fron the passed-in context:

@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_execution_time_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {
        "solids": {
            "process_data": {"config": {"dataset_name": "my_dataset", "execution_date": date}}
        }
    }

Timezones#

You can customize the timezone in which your schedule executes by setting the execution_timezone parameter on your schedule to any tz timezone. Schedules with no timezone set run in UTC.

For example, the following schedule executes daily at 9AM in US/Pacific time:

@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(9, 0),
    execution_timezone="US/Pacific",
)
def my_timezone_schedule(date):
    return {
        "solids": {
            "process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}}
        }
    }

Daylight Savings Time#

Because of Daylight Savings Time transitions, it's possible to specify an execution time that does not exist for every scheduled interval. For example, say you have a daily schedule with an execution time of 2:30 AM in the US/Eastern timezone. On 2019/03/10, the time jumps from 2:00 AM to 3:00 AM when Daylight Savings Time begins. Therefore, the time of 2:30 AM did not exist for the day.

If you specify such an execution time, Dagster runs your schedule at the next time that exists. In the previous example, the schedule would run at 3:00 AM.

It's also possible to specify an execution time that exists twice on one day every year. For example, on 2019/11/03 in US/Eastern time, the hour from 1:00 AM to 2:00 AM repeats, so a daily schedule running at 1:30 AM has two possible times in which it could execute. In this case, Dagster will execute your schedule at the latter of the two possible times.

Hourly schedules will be unaffected by daylight savings time transitions - the schedule will continue to run exactly once every hour, even as the timezone changes. In the example above where the hour from 1:00 AM to 2:00 AM repeats, an hourly schedule running at 30 minutes past the hour would run at 12:30 AM, both instances of 1:30 AM, and then proceed normally from 2:30 AM on.

Running the scheduler#

In order for your schedule to run, it must be started. The easiest way to start and stop schedules is in Dagit from the Schedules page. You can also start and stop a schedule with the dagster schedule start and dagster schedule stop commands.

Once your schedule is started, if you're running the dagster-daemon process as part of your deployment, the schedule will begin executing immediately. See the Troubleshooting section below if your schedule has been started but isn't submitting runs.

Testing Schedules#

Testing Partition Schedules#

In order to unit test a partition schedule, you can invoke it directly with a provided datetime. This will return the run config generated by the decorated function for the given date. This config can then be validated against a pipeline using the validate_run_config function.

@hourly_schedule(
    pipeline_name="test_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
)
def hourly_schedule_to_test(date):
    return {
        "solids": {
            "process_data_for_date": {
                "config": {
                    "date": date.strftime("%Y-%m-%d %H"),
                }
            }
        }
    }


from dagster import validate_run_config


def test_hourly_schedule():
    run_config = hourly_schedule_to_test(datetime.datetime(2020, 1, 1))
    assert validate_run_config(pipeline_for_test, run_config)

Testing Cron Schedules#

Cron schedules can also be tested by directly invoking them. The invocation of this schedule will return run config, which can then be validated using the validate_run_config function.

@schedule(cron_schedule="* * * * *", pipeline_name="my_pipeline_on_cron")
def my_cron_schedule(_context):
    return {}


from dagster import validate_run_config


def test_my_cron_schedule():
    run_config = my_cron_schedule(None)
    assert validate_run_config(my_pipeline_on_cron, run_config)

If using the context parameter in your decorated function, then build_schedule_context can be used to construct a ScheduleExecutionContext for passing through.

@schedule(cron_schedule="0 1 * * *", pipeline_name="pipeline_for_test")
def my_schedule_uses_context(context):
    date_str = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {
        "solids": {
            "process_data_for_date": {
                "config": {
                    "date": date_str,
                }
            }
        }
    }


from dagster import build_schedule_context, validate_run_config


def test_my_cron_schedule_with_context():
    context = build_schedule_context(scheduled_execution_time=datetime.datetime(2020, 1, 1))
    run_config = my_schedule_uses_context(context)
    assert validate_run_config(pipeline_for_test, run_config)

Examples#

Hourly partition-based schedule#

@hourly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=0, minute=25),
    execution_timezone="US/Central",
)
def my_hourly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H")}}}}

Daily partition-based schedule#

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=9, minute=0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

Weekly partition-based schedule#

@weekly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_day_of_week=1,  # Monday
    execution_timezone="US/Central",
)
def my_weekly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

Monthly partition-based schedule#

@monthly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_timezone="US/Central",
    execution_day_of_month=15,
    execution_time=datetime.time(hour=9, minute=0),
)
def my_monthly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m")}}}}

Patterns#

Using a preset in a schedule definition#

If you already have a preset defined for a pipeline you want to schedule, you can extract the necessary attributes from the preset and pass them to the schedule decorator.

In this example, we directly use the solid_selection, mode, tags, and run_config from the preset:

@daily_schedule(
    start_date=datetime.datetime(2020, 1, 1),
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_preset_schedule(_date):
    return preset.run_config

You might need to modify the preset's run config to include information about the date partition:

import copy


@daily_schedule(
    start_date=datetime.datetime(2020, 1, 1),
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_modified_preset_schedule(date):
    modified_run_config = copy.deepcopy(preset.run_config)
    modified_run_config["solids"]["process_data_for_date"]["config"]["date"] = date.strftime(
        "%Y-%m-%d"
    )
    return modified_run_config

If you find yourself using presets to generate schedule definitions frequently, you can use a helper function similar to this one to take a preset and return a schedule.

def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date):
    preset = pipeline.get_preset(preset_name)
    if not preset:
        raise Exception(
            "Preset {preset_name} was not found "
            "on pipeline {pipeline_name}".format(
                preset_name=preset_name, pipeline_name=pipeline.name
            )
        )

    @daily_schedule(
        start_date=start_date,
        pipeline_name=pipeline.name,
        solid_selection=preset.solid_selection,
        mode=preset.mode,
        tags_fn_for_date=lambda _: preset.tags,
    )
    def my_schedule(_date):
        return preset.run_config

    return my_schedule

Troubleshooting#

Try these steps if you're trying to run a schedule and are running into problems.

Step 1: Is your schedule included in your repository and turned on?#

The left-hand navigation bar in Dagit shows all of the schedules for the currently-selected repository, with a green dot next to each schedule that is running. Make sure that your schedule appears in this list with a green dot. To ensure that Dagit has loaded the latest version of your schedule code, you can press the reload button next to your repository name to reload all the code in your repository.

  • If Dagit is unable to load the repository containing your schedule (for example, due to a syntax error or a problem with one of your definitions), there should be an error message in the left nav with a link that will give you more information about the error.
  • If the repository is loading, but the schedule doesn't appear in the list of schedules, make sure that your schedule function is included in the list of schedules returned by your repository function.
  • If the schedule appears but doesn't have a green dot next to it, click on the name of the schedule, then toggle the switch at the top of the screen to mark it as running.

Step 2: Is your schedule interval set up correctly?#

When you click on your schedule name in the left-hand nav in Dagit, you'll be take to a page where you can view more information about the schedule. If the schedule is running, there should be a "Next tick" row near the top of the page that tells you when the schedule is expected to run next. Make sure that time is what you expect (including the timezone).

Step 3: Is the schedule interval configured correctly, but it still isn't creating any runs?#

It's possible that the dagster-daemon process that submits runs for your schedule is not working correctly. If you haven't set up dagster-daemon yet, check the Deploying Dagster section to find the steps to do so.

First, check that the daemon is running. Click on "Status" in the left nav in Dagit, and examine the "Scheduler" row under the "Daemon statuses" section. The daemon process periodically sends out a heartbeat from the scheduler, so if the scheduler daemon is listed as "Not running", that indicates that there's a problem with your daemon deployment. If the daemon ran into an error that caused it to throw an exception, that error will often appear in this UI as well.

If there isn't a clear error on this page, or if the daemon should be sending heartbeats but isn't, you may need to check the logs from the daemon process. The steps to do this will depend on your deployment - for example, if you're using Kubernetes, you'll need to get the logs from the pod that's running the daemon. You should be able to search those logs for the name of your schedule (or SchedulerDaemon to see all logs associated with the scheduler) to gain an understanding of what's going wrong.

Finally, it's possible that the daemon is running correctly, but there's a problem with your schedule code. Check the "Latest tick" row on the page for your schedule. If there was an error while trying to submit runs for your schedule, there should be a red "Failure" box next to the time. Clicking on the box should display an error with a stack trace showing you why the schedule couldn't execute. If the schedule is working as expected, it should display a blue box instead with information about any runs that were created by that schedule tick.

Still stuck?#

If these steps didn't help and your schedule still isn't running, reach out in Slack or file an issue and we'll be happy to help investigate.