# DagsterDocs
#
# Source code for dagster.core.definitions.solid

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    FrozenSet,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
)

from dagster import check
from dagster.core.definitions.dependency import SolidHandle
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster.core.types.dagster_type import DagsterType
from dagster.utils.backcompat import experimental_arg_warning

from .config import ConfigMapping
from .definition_config_schema import (
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from .dependency import IDependencyDefinition, SolidHandle, SolidInvocation
from .graph import GraphDefinition
from .i_solid_definition import NodeDefinition
from .input import InputDefinition, InputMapping
from .output import OutputDefinition, OutputMapping
from .solid_invocation import solid_invocation_result

if TYPE_CHECKING:
    from .decorators.solid import DecoratedSolidFunction


class SolidDefinition(NodeDefinition):
    """
    The definition of a Solid that performs a user-defined computation.

    For more details on what a solid is, refer to the
    `Solid Overview <../../overview/solids-pipelines/solids>`_ .

    End users should prefer the :func:`@solid <solid>` and :func:`@lambda_solid <lambda_solid>`
    decorators. SolidDefinition is generally intended to be used by framework authors.

    Args:
        name (str): Name of the solid. Must be unique within any :py:class:`PipelineDefinition`
            using the solid.
        input_defs (List[InputDefinition]): Inputs of the solid.
        compute_fn (Callable): The core of the solid, the function that does the actual
            computation. The signature of this function is determined by ``input_defs``, and
            optionally, an injected first argument, ``context``, a collection of information
            provided by the system.

            This function will be coerced into a generator or an async generator, which must
            yield one :py:class:`Output` for each of the solid's ``output_defs``, and additionally
            may yield other types of Dagster events, including :py:class:`Materialization` and
            :py:class:`ExpectationResult`.
        output_defs (List[OutputDefinition]): Outputs of the solid.
        config_schema (Optional[ConfigSchema]): The schema for the config. If set, Dagster will
            check that config provided for the solid matches this schema and fail if it does not.
            If not set, Dagster will accept any config provided for the solid.
        description (Optional[str]): Human-readable description of the solid.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for the solid. Frameworks may
            expect and require certain metadata to be attached to a solid. Users should generally
            not set metadata directly. Values that are not strings will be json encoded and must
            meet the criteria that `json.loads(json.dumps(value)) == value`.
        required_resource_keys (Optional[Set[str]]): Set of resource handles required by this
            solid.
        version (Optional[str]): (Experimental) The version of the solid's compute_fn. Two solids
            should have the same version if and only if they deterministically produce the same
            outputs when provided the same inputs.
        retry_policy (Optional[RetryPolicy]): The retry policy for this solid.

    Examples:
        .. code-block:: python

            def _add_one(_context, inputs):
                yield Output(inputs["num"] + 1)

            SolidDefinition(
                name="add_one",
                input_defs=[InputDefinition("num", Int)],
                output_defs=[OutputDefinition(Int)], # default name ("result")
                compute_fn=_add_one,
            )
    """

    def __init__(
        self,
        name: str,
        input_defs: Sequence[InputDefinition],
        compute_fn: Union[Callable[..., Any], "DecoratedSolidFunction"],
        output_defs: Sequence[OutputDefinition],
        config_schema: Optional[Union[Dict[str, Any], IDefinitionConfigSchema]] = None,
        description: Optional[str] = None,
        # Only keys are type-checked at runtime (``key_type=str`` below); values may be any
        # JSON-serializable object, as documented above.
        tags: Optional[Dict[str, Any]] = None,
        required_resource_keys: Optional[Union[Set[str], FrozenSet[str]]] = None,
        version: Optional[str] = None,
        retry_policy: Optional[RetryPolicy] = None,
    ):
        # Imported locally; the module-level import of this name exists only under TYPE_CHECKING.
        from .decorators.solid import DecoratedSolidFunction

        if isinstance(compute_fn, DecoratedSolidFunction):
            self._compute_fn: Union[Callable[..., Any], DecoratedSolidFunction] = compute_fn
        else:
            compute_fn = cast(Callable[..., Any], compute_fn)
            self._compute_fn = check.callable_param(compute_fn, "compute_fn")
        self._config_schema = convert_user_facing_definition_config_schema(config_schema)
        self._required_resource_keys = frozenset(
            check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
        )
        self._version = check.opt_str_param(version, "version")
        if version:
            experimental_arg_warning("version", "SolidDefinition.__init__")
        self._retry_policy = check.opt_inst_param(retry_policy, "retry_policy", RetryPolicy)

        # Only decorated compute functions can report their positional input order.
        positional_inputs = (
            self._compute_fn.positional_inputs()
            if isinstance(self._compute_fn, DecoratedSolidFunction)
            else None
        )

        super(SolidDefinition, self).__init__(
            name=name,
            input_defs=check.list_param(input_defs, "input_defs", InputDefinition),
            output_defs=check.list_param(output_defs, "output_defs", OutputDefinition),
            description=description,
            tags=check.opt_dict_param(tags, "tags", key_type=str),
            positional_inputs=positional_inputs,
        )

    def __call__(self, *args, **kwargs) -> Any:
        """Invoke this solid.

        Inside a composition function (``is_in_composition()`` is true), this delegates to the
        parent class's ``__call__``. Otherwise, the solid is executed directly through
        ``solid_invocation_result``. Direct execution is only allowed when the compute function is
        a ``DecoratedSolidFunction``, meaning it was built with ``@solid``.

        If the compute function declares a context argument, the first positional argument is
        used as the context. It must be ``None`` or an ``UnboundSolidExecutionContext``. If the
        compute function has no context argument, passing an ``UnboundSolidExecutionContext`` as
        the first positional argument is rejected.

        Raises:
            DagsterInvalidInvocationError: If the solid was not built with ``@solid``, or if the
                context argument is missing, of the wrong type, or unexpected.
        """
        from .composition import is_in_composition
        from .decorators.solid import DecoratedSolidFunction
        from ..execution.context.invocation import UnboundSolidExecutionContext

        if is_in_composition():
            return super(SolidDefinition, self).__call__(*args, **kwargs)
        else:
            if not isinstance(self.compute_fn, DecoratedSolidFunction):
                raise DagsterInvalidInvocationError(
                    "Attemped to invoke solid that was not constructed using the `@solid` "
                    "decorator. Only solids constructed using the `@solid` decorator can be "
                    "directly invoked."
                )
            if self.compute_fn.has_context_arg():
                # NOTE(review): the context is read only from positional args, so a context
                # passed by keyword is not recognized here.
                if len(args) == 0:
                    raise DagsterInvalidInvocationError(
                        f"Compute function of solid '{self.name}' has context argument, but no context "
                        "was provided when invoking."
                    )
                elif args[0] is not None and not isinstance(args[0], UnboundSolidExecutionContext):
                    # Same message as above even though an argument was supplied: a non-context
                    # first argument is treated as "no context provided".
                    raise DagsterInvalidInvocationError(
                        f"Compute function of solid '{self.name}' has context argument, but no context "
                        "was provided when invoking."
                    )
                context = args[0]
                return solid_invocation_result(self, context, *args[1:], **kwargs)
            else:
                if len(args) > 0 and isinstance(args[0], UnboundSolidExecutionContext):
                    raise DagsterInvalidInvocationError(
                        f"Compute function of solid '{self.name}' has no context argument, but "
                        "context was provided when invoking."
                    )
                return solid_invocation_result(self, None, *args, **kwargs)

    @property
    def compute_fn(self) -> Union[Callable[..., Any], "DecoratedSolidFunction"]:
        """The compute function passed at construction time."""
        return self._compute_fn

    @property
    def config_schema(self) -> IDefinitionConfigSchema:
        """The config schema produced by ``convert_user_facing_definition_config_schema``."""
        return self._config_schema

    @property
    def required_resource_keys(self) -> FrozenSet[str]:
        """A frozenset copy of the resource keys; empty, never None, when none were given."""
        return frozenset(self._required_resource_keys)

    @property
    def version(self) -> Optional[str]:
        """The (experimental) version string, or None if not set."""
        return self._version

    def all_dagster_types(self) -> Iterator[DagsterType]:
        """Yield every type reported by ``all_input_output_types()``."""
        yield from self.all_input_output_types()

    def iterate_node_defs(self) -> Iterator[NodeDefinition]:
        """Yield only this definition."""
        yield self

    def iterate_solid_defs(self) -> Iterator["SolidDefinition"]:
        """Yield only this definition."""
        yield self

    def resolve_output_to_origin(
        self, output_name: str, handle: SolidHandle
    ) -> Tuple[OutputDefinition, SolidHandle]:
        """Return the output definition named ``output_name`` paired with the unchanged ``handle``."""
        return self.output_def_named(output_name), handle

    def input_has_default(self, input_name: str) -> bool:
        """Return whether the input named ``input_name`` has a default value."""
        return self.input_def_named(input_name).has_default_value

    def default_value_for_input(self, input_name: str) -> Any:
        """Return the default value of the input named ``input_name``."""
        return self.input_def_named(input_name).default_value

    def input_supports_dynamic_output_dep(self, input_name: str) -> bool:
        """Always True for a SolidDefinition, regardless of ``input_name``."""
        return True

    def copy_for_configured(
        self,
        name: str,
        description: Optional[str],
        config_schema: IDefinitionConfigSchema,
        config_or_config_fn: Any,
    ) -> "SolidDefinition":
        """Return a copy of this solid with a new name and config schema.

        All other attributes are carried over unchanged. The description is carried over only
        when ``description`` is falsy. ``config_or_config_fn`` is not used here.
        """
        return SolidDefinition(
            name=name,
            input_defs=self.input_defs,
            compute_fn=self.compute_fn,
            output_defs=self.output_defs,
            config_schema=config_schema,
            description=description or self.description,
            tags=self.tags,
            required_resource_keys=self.required_resource_keys,
            version=self.version,
            retry_policy=self.retry_policy,
        )

    @property
    def retry_policy(self) -> Optional[RetryPolicy]:
        """The retry policy for this solid, or None."""
        return self._retry_policy
class CompositeSolidDefinition(GraphDefinition):
    """The core unit of composition and abstraction, composite solids allow you to
    define a solid from a graph of solids.

    In the same way you would refactor a block of code into a function to deduplicate, organize,
    or manage complexity - you can refactor solids in a pipeline into a composite solid.

    Args:
        name (str): The name of this composite solid. Must be unique within any
            :py:class:`PipelineDefinition` using the solid.
        solid_defs (List[Union[SolidDefinition, CompositeSolidDefinition]]): The set of solid
            definitions used in this composite solid. Composites may be arbitrarily nested.
        input_mappings (Optional[List[InputMapping]]): Define the inputs to the composite solid,
            and how they map to the inputs of its constituent solids.
        output_mappings (Optional[List[OutputMapping]]): Define the outputs of the composite
            solid, and how they map from the outputs of its constituent solids.
        config_mapping (Optional[ConfigMapping]): By specifying a config mapping, you can
            override the configuration for the child solids contained within this composite
            solid. Config mappings require both a configuration field to be specified, which is
            exposed as the configuration for the composite solid, and a configuration mapping
            function, which is called to map the configuration of the composite solid into the
            configuration that is applied to any child solids.
        dependencies (Optional[Dict[Union[str, SolidInvocation], Dict[str, DependencyDefinition]]]):
            A structure that declares where each solid gets its inputs. The keys at the top
            level dict are either string names of solids or SolidInvocations. The values
            are dicts that map input names to DependencyDefinitions.
        description (Optional[str]): Human readable description of this composite solid.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for the solid. Frameworks may
            expect and require certain metadata to be attached to a solid. Users should generally
            not set metadata directly. Values that are not strings will be json encoded and must
            meet the criteria that `json.loads(json.dumps(value)) == value`.
        positional_inputs (Optional[List[str]]): The positional order of the inputs if it
            differs from the order of the input mappings.

    Raises:
        DagsterInvalidDefinitionError: Raised by ``_check_io_managers_on_composite_solid`` if an
            input mapping's definition sets ``root_manager_key``, or if an output mapping's
            definition sets an ``io_manager_key`` other than ``"io_manager"``.

    Examples:

        .. code-block:: python

            @lambda_solid
            def add_one(num: int) -> int:
                return num + 1

            add_two = CompositeSolidDefinition(
                'add_two',
                solid_defs=[add_one],
                dependencies={
                    SolidInvocation('add_one', 'adder_1'): {},
                    SolidInvocation('add_one', 'adder_2'): {'num': DependencyDefinition('adder_1')},
                },
                input_mappings=[InputDefinition('num', Int).mapping_to('adder_1', 'num')],
                output_mappings=[OutputDefinition(Int).mapping_from('adder_2')],
            )
    """

    def __init__(
        self,
        name: str,
        solid_defs: List[NodeDefinition],
        input_mappings: Optional[List[InputMapping]] = None,
        output_mappings: Optional[List[OutputMapping]] = None,
        config_mapping: Optional[ConfigMapping] = None,
        dependencies: Optional[
            Dict[Union[str, SolidInvocation], Dict[str, IDependencyDefinition]]
        ] = None,
        description: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        positional_inputs: Optional[List[str]] = None,
    ):
        # Validate the mapped IO definitions before the graph is built.
        _check_io_managers_on_composite_solid(name, input_mappings, output_mappings)

        super(CompositeSolidDefinition, self).__init__(
            name=name,
            description=description,
            node_defs=solid_defs,
            dependencies=dependencies,
            tags=tags,
            positional_inputs=positional_inputs,
            input_mappings=input_mappings,
            output_mappings=output_mappings,
            config_mapping=config_mapping,
        )

    def all_dagster_types(self) -> Iterator[DagsterType]:
        """Yield this composite's own input/output types, then those of every child node
        definition, recursively via each child's ``all_dagster_types``."""
        yield from self.all_input_output_types()

        for node_def in self._node_defs:
            yield from node_def.all_dagster_types()

    def copy_for_configured(
        self,
        name: str,
        description: Optional[str],
        config_schema: IDefinitionConfigSchema,
        config_or_config_fn: Any,
    ) -> "CompositeSolidDefinition":
        """Return a copy of this composite whose config mapping uses ``config_schema``.

        The existing mapping's ``config_fn`` is reused, and all other attributes are carried
        over. The description is kept when ``description`` is falsy. ``config_or_config_fn`` is
        not used here.

        Raises:
            DagsterInvalidDefinitionError: If this composite has no config mapping.
        """
        if not self.has_config_mapping:
            raise DagsterInvalidDefinitionError(
                "Only composite solids utilizing config mapping can be pre-configured. The solid "
                '"{graph_name}" does not have a config mapping, and thus has nothing to be '
                "configured.".format(graph_name=self.name)
            )

        return CompositeSolidDefinition(
            name=name,
            solid_defs=self._node_defs,
            input_mappings=self.input_mappings,
            output_mappings=self.output_mappings,
            config_mapping=ConfigMapping(
                self._config_mapping.config_fn,
                config_schema=config_schema,
            ),
            dependencies=self.dependencies,
            description=description or self.description,
            tags=self.tags,
            positional_inputs=self.positional_inputs,
        )
def _check_io_managers_on_composite_solid(
    name: str,
    input_mappings: Optional[List[InputMapping]],
    output_mappings: Optional[List[OutputMapping]],
):
    """Reject IO-manager settings on the mapped inputs/outputs of a composite solid.

    Args:
        name: Name of the composite solid; used only in error messages.
        input_mappings: The composite's input mappings, or None.
        output_mappings: The composite's output mappings, or None.

    Raises:
        DagsterInvalidDefinitionError: If a mapped InputDefinition sets ``root_manager_key``,
            or if a mapped OutputDefinition has an ``io_manager_key`` other than ``"io_manager"``.
    """
    # Ban root_manager_key on composite solids
    for input_mapping in input_mappings or ():
        input_def = input_mapping.definition
        if input_def.root_manager_key:
            raise DagsterInvalidDefinitionError(
                "Root input manager cannot be set on a composite solid: "
                f'root_manager_key "{input_def.root_manager_key}" '
                f'is set on InputDefinition "{input_def.name}" of composite solid "{name}". '
            )

    # Ban io_manager_key on composite solids. "io_manager" is the only key accepted here
    # (presumably the OutputDefinition default — confirm against OutputDefinition).
    for output_mapping in output_mappings or ():
        output_def = output_mapping.definition
        if output_def.io_manager_key != "io_manager":
            raise DagsterInvalidDefinitionError(
                "IO manager cannot be set on a composite solid: "
                f'io_manager_key "{output_def.io_manager_key}" '
                f'is set on OutputDefinition "{output_def.name}" of composite solid "{name}". '
            )