Source code for dagster.core.storage.schedules.base
import abc
from typing import Iterable, Optional, Type

from dagster.core.definitions.run_request import JobType
from dagster.core.errors import DagsterScheduleWipeRequired
from dagster.core.instance import MayHaveInstanceWeakref
from dagster.core.scheduler.job import JobState, JobStatus, JobTick, JobTickData, JobTickStatus
class ScheduleStorage(abc.ABC, MayHaveInstanceWeakref):
    """Abstract class for managing persistence of scheduler artifacts.

    Subclasses store ``JobState`` records (looked up by job origin id) and the ``JobTick``
    records created for each job. Every method is abstract except ``optimize_for_dagit``
    (a no-op by default) and ``validate_stored_schedules``.
    """

    @abc.abstractmethod
    def wipe(self):
        """Delete all schedules from storage."""

    @abc.abstractmethod
    def all_stored_job_state(
        self, repository_origin_id: Optional[str] = None, job_type: Optional[JobType] = None
    ) -> Iterable[JobState]:
        """Return all JobStates present in storage.

        Args:
            repository_origin_id (Optional[str]): The ExternalRepository target id to scope
                results to.
            job_type (Optional[JobType]): The JobType to scope results to.
        """

    @abc.abstractmethod
    def get_job_state(self, job_origin_id: str) -> JobState:
        """Return the unique job with the given id.

        Args:
            job_origin_id (str): The unique job identifier.
        """

    @abc.abstractmethod
    def add_job_state(self, job: JobState):
        """Add a job to storage.

        Args:
            job (JobState): The job to add.
        """

    @abc.abstractmethod
    def update_job_state(self, job: JobState):
        """Update a job in storage.

        Args:
            job (JobState): The job to update.
        """

    @abc.abstractmethod
    def delete_job_state(self, job_origin_id: str):
        """Delete a job in storage.

        Args:
            job_origin_id (str): The id of the ExternalJob target to delete.
        """

    @abc.abstractmethod
    def get_job_ticks(
        self,
        job_origin_id: str,
        before: Optional[float] = None,
        after: Optional[float] = None,
        limit: Optional[int] = None,
    ) -> Iterable[JobTick]:
        """Get the ticks for a given job.

        Args:
            job_origin_id (str): The id of the ExternalJob target.
            before (Optional[float]): Optional timestamp upper bound on the returned ticks
                (inclusivity is left to implementations -- confirm before relying on it).
            after (Optional[float]): Optional timestamp lower bound on the returned ticks
                (inclusivity is left to implementations -- confirm before relying on it).
            limit (Optional[int]): Optional cap on the number of ticks returned.
        """

    @abc.abstractmethod
    def get_latest_job_tick(self, job_origin_id: str) -> JobTick:
        """Get the most recent tick for a given job.

        Args:
            job_origin_id (str): The id of the ExternalJob target.
        """

    @abc.abstractmethod
    def create_job_tick(self, job_tick_data: JobTickData):
        """Add a job tick to storage.

        Args:
            job_tick_data (JobTickData): The job tick to add.
        """

    @abc.abstractmethod
    def update_job_tick(self, tick: JobTick):
        """Update a job tick already in storage.

        Args:
            tick (JobTick): The job tick to update.
        """

    @abc.abstractmethod
    def purge_job_ticks(self, job_origin_id: str, tick_status: JobTickStatus, before: float):
        """Wipe ticks for a job for a certain status and timestamp.

        Args:
            job_origin_id (str): The id of the ExternalJob target whose ticks are purged.
            tick_status (JobTickStatus): The tick status to wipe.
            before (float): Timestamp; all ticks with ``tick_status`` before this timestamp
                will get purged.
        """

    @abc.abstractmethod
    def get_job_tick_stats(self, job_origin_id: str):
        """Get tick stats for a given job.

        Args:
            job_origin_id (str): The id of the ExternalJob target.
        """

    @abc.abstractmethod
    def upgrade(self):
        """Perform any needed migrations."""

    def optimize_for_dagit(self, statement_timeout: int):
        """Allows for optimizing database connection / use in the context of a long lived
        dagit process.

        The base implementation does nothing; subclasses may override it.

        Args:
            statement_timeout (int): Statement timeout for the overriding implementation to
                apply; its unit is defined by that implementation.
        """

    def validate_stored_schedules(self, scheduler_class: Optional[Type]):
        """Ensure no running schedule was recorded with a different scheduler than the instance's.

        Args:
            scheduler_class (Optional[Type]): The scheduler on the instance. May be falsy/None,
                in which case it is reported as "None" in the error message.

        Raises:
            DagsterScheduleWipeRequired: If a schedule whose status is ``JobStatus.RUNNING``
                has a stored scheduler that is set and differs from ``scheduler_class``.
        """
        # NOTE(review): this value is compared with ``!=`` against the ``scheduler`` stored in
        # each schedule's ``job_specific_data`` and interpolated into the message below;
        # confirm whether callers pass a class or a class-name string and tighten the type.
        instance_scheduler_class = scheduler_class or "None"

        # Check for any running job states that reference a different scheduler and prompt the
        # user to wipe if they don't match.
        for schedule in self.all_stored_job_state(job_type=JobType.SCHEDULE):
            if schedule.status != JobStatus.RUNNING:
                continue

            stored_scheduler_class = schedule.job_specific_data.scheduler

            # Schedules with no recorded scheduler are not treated as a mismatch.
            if stored_scheduler_class and stored_scheduler_class != scheduler_class:
                raise DagsterScheduleWipeRequired(
                    f"Found a running schedule using a scheduler ({stored_scheduler_class}) "
                    f"that differs from the scheduler on the instance ({instance_scheduler_class}). "
                    "The most likely reason for this error is that you changed the scheduler on "
                    "your instance while it was still running schedules. "
                    "To fix this, change the scheduler on your instance back to the previous "
                    "scheduler configuration and run the command 'dagster schedule wipe'. It "
                    f"will then be safe to change back to {instance_scheduler_class}."
                )