DagsterDocs

Source code for dagster.core.definitions.logger

import logging
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from dagster import check
from dagster.core.definitions.config import is_callable_valid_config_arg
from dagster.core.definitions.configurable import AnonymousConfigurableDefinition
from dagster.core.errors import DagsterInvalidInvocationError

from ..decorator_utils import get_function_params
from .definition_config_schema import convert_user_facing_definition_config_schema

if TYPE_CHECKING:
    from dagster.core.execution.context.logger import InitLoggerContext, UnboundInitLoggerContext
    from dagster.core.definitions import PipelineDefinition

    InitLoggerFunction = Callable[[InitLoggerContext], logging.Logger]


[docs]class LoggerDefinition(AnonymousConfigurableDefinition): """Core class for defining loggers. Loggers are pipeline-scoped logging handlers, which will be automatically invoked whenever solids in a pipeline log messages. Args: logger_fn (Callable[[InitLoggerContext], logging.Logger]): User-provided function to instantiate the logger. This logger will be automatically invoked whenever the methods on ``context.log`` are called from within solid compute logic. config_schema (Optional[ConfigSchema]): The schema for the config. Configuration data available in `init_context.logger_config`. If not set, Dagster will accept any config provided. description (Optional[str]): A human-readable description of this logger. """ def __init__( self, logger_fn: "InitLoggerFunction", config_schema: Any = None, description: Optional[str] = None, ): self._logger_fn = check.callable_param(logger_fn, "logger_fn") self._config_schema = convert_user_facing_definition_config_schema(config_schema) self._description = check.opt_str_param(description, "description") def __call__(self, *args, **kwargs): from dagster.core.execution.context.logger import UnboundInitLoggerContext from .logger_invocation import logger_invocation_result if len(args) == 0 and len(kwargs) == 0: raise DagsterInvalidInvocationError( "Logger initialization function has context argument, but no context argument was " "provided when invoking." ) if len(args) + len(kwargs) > 1: raise DagsterInvalidInvocationError( "Initialization of logger received multiple arguments. Only a first " "positional context parameter should be provided when invoking." ) context_param_name = get_function_params(self.logger_fn)[0].name if args: context = check.opt_inst_param( args[0], context_param_name, UnboundInitLoggerContext, default=UnboundInitLoggerContext(logger_config=None, pipeline_def=None), ) return logger_invocation_result(self, context) else: if context_param_name not in kwargs: raise DagsterInvalidInvocationError( f"Logger initialization expected argument '{context_param_name}'." ) context = check.opt_inst_param( kwargs[context_param_name], context_param_name, UnboundInitLoggerContext, default=UnboundInitLoggerContext(logger_config=None, pipeline_def=None), ) return logger_invocation_result(self, context) @property def logger_fn(self) -> "InitLoggerFunction": return self._logger_fn @property def config_schema(self) -> Any: return self._config_schema @property def description(self) -> Optional[str]: return self._description def copy_for_configured( self, description: Optional[str], config_schema: Any, _ ) -> "LoggerDefinition": return LoggerDefinition( config_schema=config_schema, description=description or self.description, logger_fn=self.logger_fn, )
[docs]def logger( config_schema: Any = None, description: Optional[str] = None ) -> Union["LoggerDefinition", Callable[["InitLoggerFunction"], "LoggerDefinition"]]: """Define a logger. The decorated function should accept an :py:class:`InitLoggerContext` and return an instance of :py:class:`python:logging.Logger`. This function will become the ``logger_fn`` of an underlying :py:class:`LoggerDefinition`. Args: config_schema (Optional[ConfigSchema]): The schema for the config. Configuration data available in `init_context.logger_config`. If not set, Dagster will accept any config provided. description (Optional[str]): A human-readable description of the logger. """ # This case is for when decorator is used bare, without arguments. # E.g. @logger versus @logger() if callable(config_schema) and not is_callable_valid_config_arg(config_schema): return LoggerDefinition(logger_fn=config_schema) def _wrap(logger_fn: "InitLoggerFunction") -> "LoggerDefinition": return LoggerDefinition( logger_fn=logger_fn, config_schema=config_schema, description=description, ) return _wrap
[docs]def build_init_logger_context( logger_config: Any = None, pipeline_def: Optional["PipelineDefinition"] = None, ) -> "UnboundInitLoggerContext": """Builds logger initialization context from provided parameters. This function can be used to provide the context argument to the invocation of a logger definition. Args: logger_config (Any): The config to provide during initialization of logger. pipeline_def (Optional[PipelineDefinition]): The pipeline definition that the logger will be used with. Examples: .. code-block:: python context = build_init_logger_context() logger_to_init(context) """ from dagster.core.execution.context.logger import UnboundInitLoggerContext from dagster.core.definitions import PipelineDefinition check.opt_inst_param(pipeline_def, "pipeline_def", PipelineDefinition) return UnboundInitLoggerContext(logger_config=logger_config, pipeline_def=pipeline_def)