Skip to content

LoggerCreator

kedro_dagster.dagster.LoggerCreator

Create Dagster logger definitions from Kedro-Dagster configuration.

Parameters

Name Type Description Default
dagster_config KedroDagsterConfig

Parsed Kedro-Dagster config containing logger entries.

required

See Also

kedro_dagster.dagster.ExecutorCreator : Creates executor definitions from configuration. kedro_dagster.dagster.ScheduleCreator : Creates schedule definitions from configuration. kedro_dagster.config.logging.LoggerOptions : Logger option model.

Source Code

Show/Hide source
class LoggerCreator:
    """Create Dagster logger definitions from Kedro-Dagster configuration.

    Parameters
    ----------
    dagster_config : KedroDagsterConfig
        Parsed Kedro-Dagster config containing logger entries.

    See Also
    --------
    `kedro_dagster.dagster.ExecutorCreator` :
        Creates executor definitions from configuration.
    `kedro_dagster.dagster.ScheduleCreator` :
        Creates schedule definitions from configuration.
    `kedro_dagster.config.logging.LoggerOptions` :
        Logger option model.
    """

    def __init__(self, dagster_config: "KedroDagsterConfig"):
        self._dagster_config = dagster_config

    def _get_logger_definition(self, logger_name: str) -> dg.LoggerDefinition:
        """Create a Dagster logger definition from the configuration.

        Parameters
        ----------
        logger_name : str
            Name of the logger.

        Returns
        -------
        LoggerDefinition
            Dagster logger definition.

        See Also
        --------
        `kedro_dagster.dagster.LoggerCreator.create_loggers` :
            Entry point that calls this method for each configured logger.
        """

        def _resolve_reference(ref: Any) -> Any:
            """Resolve a string with "module.ClassName", "module:function_name", or "ext://module.attr"."""
            if isinstance(ref, str):
                # Handle ext:// protocol used in logging configurations
                if ref.startswith("ext://"):
                    ref = ref[6:]  # Remove "ext://" prefix

                module_path, _, attr = ref.rpartition(".")
                if module_path:
                    module = importlib.import_module(module_path)
                    return getattr(module, attr)

            raise TypeError(f"Unable to resolve reference {ref!r}")

        def dagster_logger(context: dg.InitLoggerContext) -> logging.Logger:
            """Build a stdlib logger from the Dagster logger config."""
            # Use the provided config directly instead of dynamic schema
            config_data = dict(context.logger_config)
            level = config_data.get("log_level", "INFO").upper()

            klass = logging.getLoggerClass()
            logger_ = klass(logger_name, level=level)

            # Optionally clear existing handlers to prevent duplicates
            for h in list(logger_.handlers):
                logger_.removeHandler(h)

            # Build formatter registry
            default_formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            formatter_registry: dict[str, logging.Formatter] = {}
            if config_data.get("formatters"):
                for fname, fcfg in config_data["formatters"].items():
                    if "()" in fcfg:
                        # Callable given to create formatter
                        formatter_callable = _resolve_reference(fcfg["()"])
                        # remove the special key for constructor params
                        init_kwargs = {k: v for k, v in fcfg.items() if k != "()"}
                        fmt_inst = formatter_callable(**init_kwargs)
                    elif "class" in fcfg:
                        # Assume class path given in "class" key
                        cls_path = fcfg.get("class")
                        formatter_cls = _resolve_reference(cls_path)
                        init_kwargs = {k: v for k, v in fcfg.items() if k != "class"}
                        fmt_inst = formatter_cls(**init_kwargs)
                    else:
                        # Use standard logging.Formatter
                        fmt_str = fcfg.get("format", None)
                        datefmt = fcfg.get("datefmt", None)
                        style = fcfg.get("style", "%")
                        fmt_inst = logging.Formatter(fmt_str, datefmt=datefmt, style=style)
                    formatter_registry[fname] = fmt_inst

            # Build filter registry
            filter_registry: dict[str, logging.Filter] = {}
            if config_data.get("filters"):
                for fname, fcfg in config_data["filters"].items():
                    if "()" in fcfg:
                        filter_callable = _resolve_reference(fcfg["()"])
                        init_kwargs = {k: v for (k, v) in fcfg.items() if k != "()"}
                        filter_inst = filter_callable(**init_kwargs)
                    else:
                        # assume class path in "class" key
                        cls_path = fcfg.get("class")
                        filter_cls = _resolve_reference(cls_path)
                        init_kwargs = {k: v for (k, v) in fcfg.items() if k != "class"}
                        filter_inst = filter_cls(**init_kwargs)
                    filter_registry[fname] = filter_inst

            # Build handlers
            if config_data.get("handlers"):
                for hcfg in config_data["handlers"]:
                    fmt_ref = hcfg.get("formatter", None)
                    filter_refs = hcfg.get("filters", [])
                    h_level = hcfg.get("level", level).upper()
                    # Resolve handler class
                    if "()" in hcfg:
                        handler_callable = _resolve_reference(hcfg["()"])
                        init_kwargs = {k: v for (k, v) in hcfg.items() if k != "()"}
                        # Resolve stream references like "ext://sys.stdout"
                        if "stream" in init_kwargs and isinstance(init_kwargs["stream"], str):
                            init_kwargs["stream"] = _resolve_reference(init_kwargs["stream"])
                        handler_inst = handler_callable(**init_kwargs)
                    else:
                        cls_path = hcfg.get("class", "logging.StreamHandler")
                        handler_cls = _resolve_reference(cls_path)
                        init_kwargs = {
                            k: v for (k, v) in hcfg.items() if k not in ("class", "level", "formatter", "filters")
                        }
                        # Resolve stream references like "ext://sys.stdout"
                        if "stream" in init_kwargs and isinstance(init_kwargs["stream"], str):
                            init_kwargs["stream"] = _resolve_reference(init_kwargs["stream"])
                        handler_inst = handler_cls(**init_kwargs)

                    # Set handler level
                    handler_inst.setLevel(h_level)

                    # Attach formatter
                    if fmt_ref is not None and formatter_registry.get(fmt_ref):
                        handler_inst.setFormatter(formatter_registry[fmt_ref])
                    elif fmt_ref is None:
                        # No formatter specified for this handler, attach default
                        handler_inst.setFormatter(default_formatter)

                    # Attach filters
                    for fref in filter_refs:
                        if fref in filter_registry:
                            handler_inst.addFilter(filter_registry[fref])

                    logger_.addHandler(handler_inst)
            else:
                # No handlers specified, default to StreamHandler
                sh = logging.StreamHandler(stream=sys.stdout)
                sh.setLevel(level)
                sh.setFormatter(default_formatter)
                logger_.addHandler(sh)

            return logger_

        config_schema = {
            "log_level": dg.Field(str, default_value="INFO"),
            "handlers": dg.Field(list, is_required=False),
            "formatters": dg.Field(dict, is_required=False),
            "filters": dg.Field(dict, is_required=False),
        }

        return dg.LoggerDefinition(
            dagster_logger,
            description=f"Logger definition `{logger_name}`.",
            config_schema=config_schema,
        )

    def create_loggers(self) -> dict[str, dg.LoggerDefinition]:
        """Create logger definitions from the configuration.

        Returns
        -------
        dict[str, LoggerDefinition]
            Mapping of fully-qualified logger name to definition.

        See Also
        --------
        `kedro_dagster.dagster.LoggerCreator._get_logger_definition` :
            Creates a single logger definition.
        `kedro_dagster.config.logging.LoggerOptions` :
            Logger option model used as input.
        """
        LOGGER.info("Creating Dagster loggers...")
        named_loggers = {}
        if self._dagster_config.loggers:
            for logger_name in self._dagster_config.loggers:
                LOGGER.debug(f"Creating logger '{logger_name}'...")
                logger = self._get_logger_definition(logger_name)
                named_loggers[logger_name] = logger

        # Iterate over jobs to handle job-specific logger configurations
        if hasattr(self._dagster_config, "jobs") and self._dagster_config.jobs:
            available_logger_names = list(named_loggers.keys())
            for job_name, job_config in self._dagster_config.jobs.items():
                if hasattr(job_config, "loggers") and job_config.loggers:
                    LOGGER.debug(f"Processing {len(job_config.loggers)} loggers for job '{job_name}'...")
                    for idx, logger_config in enumerate(job_config.loggers):
                        if isinstance(logger_config, str):
                            # If logger_config is a string, check it exists in available_logger_names
                            if logger_config not in available_logger_names:
                                raise ValueError(
                                    f"Logger '{logger_config}' referenced in job '{job_name}' "
                                    f"not found in available loggers: {available_logger_names}"
                                )
                        else:
                            # If logger_config is not a string, create logger definition named after the job
                            job_logger_name = f"{job_name}__logger_{idx}"
                            logger = self._get_logger_definition(job_logger_name)
                            named_loggers[job_logger_name] = logger

        LOGGER.debug(f"Created {len(named_loggers)} logger(s)")
        return named_loggers

Methods

create_loggers()

Create logger definitions from the configuration.

Returns
Type Description
dict[str, LoggerDefinition]

Mapping of fully-qualified logger name to definition.

See Also

kedro_dagster.dagster.LoggerCreator._get_logger_definition : Creates a single logger definition. kedro_dagster.config.logging.LoggerOptions : Logger option model used as input.

Source Code
Show/Hide source
def create_loggers(self) -> dict[str, dg.LoggerDefinition]:
    """Create logger definitions from the configuration.

    Returns
    -------
    dict[str, LoggerDefinition]
        Mapping of fully-qualified logger name to definition.

    See Also
    --------
    `kedro_dagster.dagster.LoggerCreator._get_logger_definition` :
        Creates a single logger definition.
    `kedro_dagster.config.logging.LoggerOptions` :
        Logger option model used as input.
    """
    LOGGER.info("Creating Dagster loggers...")
    named_loggers = {}
    if self._dagster_config.loggers:
        for logger_name in self._dagster_config.loggers:
            LOGGER.debug(f"Creating logger '{logger_name}'...")
            logger = self._get_logger_definition(logger_name)
            named_loggers[logger_name] = logger

    # Iterate over jobs to handle job-specific logger configurations
    if hasattr(self._dagster_config, "jobs") and self._dagster_config.jobs:
        available_logger_names = list(named_loggers.keys())
        for job_name, job_config in self._dagster_config.jobs.items():
            if hasattr(job_config, "loggers") and job_config.loggers:
                LOGGER.debug(f"Processing {len(job_config.loggers)} loggers for job '{job_name}'...")
                for idx, logger_config in enumerate(job_config.loggers):
                    if isinstance(logger_config, str):
                        # If logger_config is a string, check it exists in available_logger_names
                        if logger_config not in available_logger_names:
                            raise ValueError(
                                f"Logger '{logger_config}' referenced in job '{job_name}' "
                                f"not found in available loggers: {available_logger_names}"
                            )
                    else:
                        # If logger_config is not a string, create logger definition named after the job
                        job_logger_name = f"{job_name}__logger_{idx}"
                        logger = self._get_logger_definition(job_logger_name)
                        named_loggers[job_logger_name] = logger

    LOGGER.debug(f"Created {len(named_loggers)} logger(s)")
    return named_loggers