DagsterDocs

Source code for dagster.core.execution.context.init

from typing import Any, Dict, Optional, Union

from dagster import check
from dagster.core.definitions.pipeline import PipelineDefinition
from dagster.core.definitions.resource import IContainsGenerator, ResourceDefinition, Resources
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.instance import DagsterInstance
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.pipeline_run import PipelineRun


[docs]class InitResourceContext: """Resource-specific initialization context. Attributes: resource_config (Any): The configuration data provided by the run config. The schema for this data is defined by the ``config_field`` argument to :py:class:`ResourceDefinition`. resource_def (ResourceDefinition): The definition of the resource currently being constructed. log_manager (DagsterLogManager): The log manager for this run of the pipeline resources (ScopedResources): The resources that are available to the resource that we are initalizing. pipeline_run (Optional[PipelineRun]): The pipeline run to use. When initializing resources outside of execution context, this will be None. run_id (Optional[str]): The id for this run of the pipeline. When initializing resources outside of execution context, this will be None. """ def __init__( self, resource_config: Any, resources: Resources, resource_def: Optional[ResourceDefinition] = None, instance: Optional[DagsterInstance] = None, pipeline_run: Optional[PipelineRun] = None, log_manager: Optional[DagsterLogManager] = None, pipeline_def_for_backwards_compat: Optional[PipelineDefinition] = None, ): self._resource_config = resource_config self._resource_def = resource_def self._log_manager = log_manager self._instance = instance self._resources = resources self._pipeline_def_for_backwards_compat = pipeline_def_for_backwards_compat self._pipeline_run = pipeline_run @property def resource_config(self) -> Any: return self._resource_config @property def resource_def(self) -> Optional[ResourceDefinition]: return self._resource_def @property def resources(self) -> Resources: return self._resources @property def instance(self) -> Optional[DagsterInstance]: return self._instance @property def pipeline_def_for_backwards_compat(self) -> Optional[PipelineDefinition]: return self._pipeline_def_for_backwards_compat @property def pipeline_run(self) -> Optional[PipelineRun]: return self._pipeline_run @property def log(self) -> Optional[DagsterLogManager]: return self._log_manager # backcompat: keep around this property from when InitResourceContext used to be a NamedTuple @property def log_manager(self) -> Optional[DagsterLogManager]: return self._log_manager @property def run_id(self) -> Optional[str]: return self.pipeline_run.run_id if self.pipeline_run else None def replace_config(self, config: Any) -> "InitResourceContext": return InitResourceContext( resource_config=config, resources=self.resources, instance=self.instance, resource_def=self.resource_def, pipeline_run=self.pipeline_run, log_manager=self.log, )
class UnboundInitResourceContext(InitResourceContext): """Resource initialization context outputted by ``build_init_resource_context``. Represents a context whose config has not yet been validated against a resource definition, hence the inability to access the `resource_def` attribute. When an instance of ``UnboundInitResourceContext`` is passed to a resource invocation, config is validated, and it is subsumed into an `InitResourceContext`, which contains the resource_def validated against. """ def __init__( self, resource_config: Any, resources: Optional[Union[Resources, Dict[str, Any]]], instance: Optional[DagsterInstance], ): from dagster.core.execution.build_resources import build_resources from dagster.core.execution.api import ephemeral_instance_if_missing from dagster.core.execution.context_creation_pipeline import initialize_console_manager self._instance_provided = ( check.opt_inst_param(instance, "instance", DagsterInstance) is not None ) # Construct ephemeral instance if missing self._instance_cm = ephemeral_instance_if_missing(instance) # Pylint can't infer that the ephemeral_instance context manager has an __enter__ method, # so ignore lint error instance = self._instance_cm.__enter__() # pylint: disable=no-member # If we are provided with a Resources instance, then we do not need to initialize if isinstance(resources, Resources): self._resources_cm = None else: self._resources_cm = build_resources( check.opt_dict_param(resources, "resources", key_type=str), instance=instance ) resources = self._resources_cm.__enter__() # pylint: disable=no-member self._resources_contain_cm = isinstance(resources, IContainsGenerator) self._cm_scope_entered = False super(UnboundInitResourceContext, self).__init__( resource_config=resource_config, resources=resources, resource_def=None, instance=instance, pipeline_run=None, log_manager=initialize_console_manager(None), pipeline_def_for_backwards_compat=None, ) def __enter__(self): self._cm_scope_entered = True return self def __exit__(self, *exc): if self._resources_cm: self._resources_cm.__exit__(*exc) # pylint: disable=no-member if self._instance_provided: self._instance_cm.__exit__(*exc) # pylint: disable=no-member def __del__(self): if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered: self._resources_cm.__exit__(None, None, None) # pylint: disable=no-member if self._instance_provided and not self._cm_scope_entered: self._instance_cm.__exit__(None, None, None) # pylint: disable=no-member @property def resource_config(self) -> Any: return self._resource_config @property def resource_def(self) -> Optional[ResourceDefinition]: raise DagsterInvariantViolationError( "UnboundInitLoggerContext has not been validated against a logger definition." ) @property def resources(self) -> Resources: if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered: raise DagsterInvariantViolationError( "At least one provided resource is a generator, but attempting to access " "resources outside of context manager scope. You can use the following syntax to " "open a context manager: `with build_init_resource_context(...) as context:`" ) return self._resources @property def instance(self) -> Optional[DagsterInstance]: return self._instance @property def pipeline_def_for_backwards_compat(self) -> Optional[PipelineDefinition]: return None @property def pipeline_run(self) -> Optional[PipelineRun]: return None @property def log(self) -> Optional[DagsterLogManager]: return self._log_manager # backcompat: keep around this property from when InitResourceContext used to be a NamedTuple @property def log_manager(self) -> Optional[DagsterLogManager]: return self._log_manager @property def run_id(self) -> Optional[str]: return None
[docs]def build_init_resource_context( config: Optional[Dict[str, Any]] = None, resources: Optional[Dict[str, Any]] = None, instance: Optional[DagsterInstance] = None, ) -> InitResourceContext: """Builds resource initialization context from provided parameters. ``build_init_resource_context`` can be used as either a function or context manager. If there is a provided resource to ``build_init_resource_context`` that is a context manager, then it must be used as a context manager. This function can be used to provide the context argument to the invocation of a resource. Args: resources (Optional[Dict[str, Any]]): The resources to provide to the context. These can be either values or resource definitions. config (Optional[Any]): The resource config to provide to the context. instance (Optional[DagsterInstance]): The dagster instance configured for the context. Defaults to DagsterInstance.ephemeral(). Examples: .. code-block:: python context = build_init_resource_context() resource_to_init(context) with build_init_resource_context( resources={"foo": context_manager_resource} ) as context: resource_to_init(context) """ return UnboundInitResourceContext( resource_config=check.opt_dict_param(config, "config", key_type=str), instance=check.opt_inst_param(instance, "instance", DagsterInstance), resources=check.opt_dict_param(resources, "resources", key_type=str), )