DagsterDocs

Source code for dagster.core.storage.schedules.base

import abc
from typing import Iterable, Type

from dagster.core.definitions.run_request import JobType
from dagster.core.errors import DagsterScheduleWipeRequired
from dagster.core.instance import MayHaveInstanceWeakref
from dagster.core.scheduler.job import JobState, JobStatus, JobTick, JobTickData, JobTickStatus


[docs]class ScheduleStorage(abc.ABC, MayHaveInstanceWeakref): """Abstract class for managing persistance of scheduler artifacts""" @abc.abstractmethod def wipe(self): """Delete all schedules from storage""" @abc.abstractmethod def all_stored_job_state( self, repository_origin_id: str = None, job_type: JobType = None ) -> Iterable[JobState]: """Return all JobStates present in storage Args: repository_origin_id (Optional[str]): The ExternalRepository target id to scope results to job_type (Optional[JobType]): The JobType to scope results to """ @abc.abstractmethod def get_job_state(self, job_origin_id: str) -> JobState: """Return the unique job with the given id Args: job_origin_id (str): The unique job identifier """ @abc.abstractmethod def add_job_state(self, job: JobState): """Add a job to storage. Args: job (JobState): The job to add """ @abc.abstractmethod def update_job_state(self, job: JobState): """Update a job in storage. Args: job (JobState): The job to update """ @abc.abstractmethod def delete_job_state(self, job_origin_id: str): """Delete a job in storage. Args: job_origin_id (str): The id of the ExternalJob target to delete """ @abc.abstractmethod def get_job_ticks( self, job_origin_id: str, before: float = None, after: float = None, limit: int = None ) -> Iterable[JobTick]: """Get the ticks for a given job. Args: job_origin_id (str): The id of the ExternalJob target """ @abc.abstractmethod def get_latest_job_tick(self, job_origin_id: str) -> JobTick: """Get the most recent tick for a given job. Args: job_origin_id (str): The id of the ExternalJob target """ @abc.abstractmethod def create_job_tick(self, job_tick_data: JobTickData): """Add a job tick to storage. Args: job_tick_data (JobTickData): The job tick to add """ @abc.abstractmethod def update_job_tick(self, tick: JobTick): """Update a job tick already in storage. Args: tick (JobTick): The job tick to update """ @abc.abstractmethod def purge_job_ticks(self, job_origin_id: str, tick_status: JobTickStatus, before: float): """Wipe ticks for a job for a certain status and timestamp. Args: job_origin_id (str): The id of the ExternalJob target to delete tick_status (JobTickStatus): The tick status to wipe before (datetime): All ticks before this datetime will get purged """ @abc.abstractmethod def get_job_tick_stats(self, job_origin_id: str): """Get tick stats for a given job. Args: job_origin_id (str): The id of the ExternalJob target """ @abc.abstractmethod def upgrade(self): """Perform any needed migrations""" def optimize_for_dagit(self, statement_timeout: int): """Allows for optimizing database connection / use in the context of a long lived dagit process""" def validate_stored_schedules(self, scheduler_class: Type): # Check for any running job states that reference a different scheduler, # prompt the user to wipe if they don't match stored_schedules = self.all_stored_job_state(job_type=JobType.SCHEDULE) for schedule in stored_schedules: if schedule.status != JobStatus.RUNNING: continue stored_scheduler_class = schedule.job_specific_data.scheduler if stored_scheduler_class and stored_scheduler_class != scheduler_class: instance_scheduler_class = scheduler_class if scheduler_class else "None" raise DagsterScheduleWipeRequired( f"Found a running schedule using a scheduler ({stored_scheduler_class}) " + f"that differs from the scheduler on the instance ({instance_scheduler_class}). " + "The most likely reason for this error is that you changed the scheduler on " + "your instance while it was still running schedules. " + "To fix this, change the scheduler on your instance back to the previous " + "scheduler configuration and run the command 'dagster schedule wipe'. It " + f"will then be safe to change back to {instance_scheduler_class}." )