DagsterDocs

Source code for dagster.core.definitions.mode

from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, cast

from dagster import check
from dagster.core.definitions.executor import ExecutorDefinition, default_executors
from dagster.loggers import default_loggers
from dagster.utils.backcompat import experimental_arg_warning
from dagster.utils.merger import merge_dicts

from .config import ConfigMapping
from .logger import LoggerDefinition
from .resource import ResourceDefinition
from .utils import check_valid_name

DEFAULT_MODE_NAME = "default"

if TYPE_CHECKING:
    from .intermediate_storage import IntermediateStorageDefinition


[docs]class ModeDefinition( NamedTuple( "_ModeDefinition", [ ("name", str), ("resource_defs", Dict[str, ResourceDefinition]), ("loggers", Dict[str, LoggerDefinition]), ("executor_defs", List[ExecutorDefinition]), ("description", Optional[str]), ("intermediate_storage_defs", List["IntermediateStorageDefinition"]), ("config_mapping", Optional[ConfigMapping]), ("partitions", Optional[Callable[[], List[Any]]]), ], ) ): """Define a mode in which a pipeline can operate. A mode provides pipelines with a set of resource implementations, loggers, system storages, and executors. Args: name (Optional[str]): The name of the mode. Must be unique within the :py:class:`PipelineDefinition` to which the mode is attached. (default: "default"). resource_defs (Optional[Dict[str, ResourceDefinition]]): A dictionary of string resource keys to their implementations. Individual solids may require resources to be present by these keys. logger_defs (Optional[Dict[str, LoggerDefinition]]): A dictionary of string logger identifiers to their implementations. executor_defs (Optional[List[ExecutorDefinition]]): The set of executors available when executing in this mode. By default, this will be the 'in_process' and 'multiprocess' executors (:py:data:`~dagster.default_executors`). description (Optional[str]): A human-readable description of the mode. intermediate_storage_defs (Optional[List[IntermediateStorageDefinition]]): The set of intermediate storage options available when executing in this mode. By default, this will be the 'in_memory' and 'filesystem' system storages. _config_mapping (Optional[ConfigMapping]): Experimental _partitions (Optional[Callable[[], List[Any]]]): Experimental """ def __new__( cls, name: Optional[str] = None, resource_defs: Optional[Dict[str, ResourceDefinition]] = None, logger_defs: Optional[Dict[str, LoggerDefinition]] = None, executor_defs: Optional[List[ExecutorDefinition]] = None, description: Optional[str] = None, intermediate_storage_defs: Optional[List["IntermediateStorageDefinition"]] = None, _config_mapping: Optional[ConfigMapping] = None, _partitions: Optional[Callable[[], List[Any]]] = None, ): from dagster.core.storage.system_storage import default_intermediate_storage_defs from .intermediate_storage import IntermediateStorageDefinition check.opt_dict_param( resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition ) if resource_defs and "io_manager" in resource_defs: resource_defs_with_defaults = resource_defs else: from dagster.core.storage.mem_io_manager import mem_io_manager resource_defs_with_defaults = merge_dicts( {"io_manager": mem_io_manager}, resource_defs or {} ) if _config_mapping: experimental_arg_warning("_config_mapping", "ModeDefinition.__new__") if _partitions: experimental_arg_warning("_partitions", "ModeDefinition.__new__") return super(ModeDefinition, cls).__new__( cls, name=check_valid_name(name) if name else DEFAULT_MODE_NAME, resource_defs=resource_defs_with_defaults, loggers=( check.opt_dict_param( logger_defs, "logger_defs", key_type=str, value_type=LoggerDefinition ) or default_loggers() ), intermediate_storage_defs=check.list_param( intermediate_storage_defs if intermediate_storage_defs else default_intermediate_storage_defs, "intermediate_storage_defs", of_type=IntermediateStorageDefinition, ), executor_defs=check.list_param( executor_defs if executor_defs else default_executors, "executor_defs", of_type=ExecutorDefinition, ), description=check.opt_str_param(description, "description"), config_mapping=check.opt_inst_param(_config_mapping, "_config_mapping", ConfigMapping), partitions=check.opt_callable_param(_partitions, "_partitions"), ) @property def resource_key_set(self): return frozenset(self.resource_defs.keys()) def get_intermediate_storage_def(self, name): check.str_param(name, "name") for intermediate_storage_def in self.intermediate_storage_defs: if intermediate_storage_def.name == name: return intermediate_storage_def check.failed("{} storage definition not found".format(name)) def get_partition_set_def(self, pipeline_name: str): from dagster.core.definitions.partition import PartitionSetDefinition, Partition if not self.partitions: return None def partition_fn() -> List[Partition]: return [ Partition(partition, str(partition)) for partition in cast(Callable, self.partitions)() ] def run_config_fn_for_partition(partition: Partition): return partition.value return PartitionSetDefinition( pipeline_name=pipeline_name, name=pipeline_name + "_" + self.name + "_partition_set", partition_fn=partition_fn, run_config_fn_for_partition=run_config_fn_for_partition, mode=self.name, ) @staticmethod def from_resources(resources, name=None): check.dict_param(resources, "resources", key_type=str) return ModeDefinition( name=name, resource_defs={ resource_name: ResourceDefinition.hardcoded_resource(resource) for resource_name, resource in resources.items() }, )