class LoggerCreator:
    """Create Dagster logger definitions from Kedro-Dagster configuration.

    Each configured logger becomes a ``dg.LoggerDefinition`` whose init function
    builds a stdlib ``logging.Logger`` from a config that loosely follows the
    ``logging.config.dictConfig`` schema (``formatters`` and ``filters`` are
    name-keyed mappings, ``handlers`` is a list of handler configs).

    Parameters
    ----------
    dagster_config : KedroDagsterConfig
        Parsed Kedro-Dagster config containing logger entries.

    See Also
    --------
    `kedro_dagster.dagster.ExecutorCreator` :
        Creates executor definitions from configuration.
    `kedro_dagster.dagster.ScheduleCreator` :
        Creates schedule definitions from configuration.
    `kedro_dagster.config.logging.LoggerOptions` :
        Logger option model.
    """

    # Handler-config keys interpreted here rather than forwarded to the handler
    # constructor (the same keys ``logging.config.dictConfig`` strips).
    _HANDLER_RESERVED_KEYS = ("()", "class", "level", "formatter", "filters")

    # Format used for handlers that do not name a formatter, and for the
    # fallback stdout handler.
    _DEFAULT_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    def __init__(self, dagster_config: "KedroDagsterConfig"):
        self._dagster_config = dagster_config

    @staticmethod
    def _normalize_level(level: Any) -> Any:
        """Upper-case string level names; pass numeric levels through unchanged."""
        return level.upper() if isinstance(level, str) else level

    @staticmethod
    def _resolve_reference(ref: Any) -> Any:
        """Resolve an importable reference to the object it names.

        Accepted string forms are ``"module.attr"``, ``"module:attr"`` (where
        ``attr`` may itself be dotted, e.g. ``"module:Class.method"``) and
        ``"ext://module.attr"``, the prefix used by logging configurations.
        Objects that are already callable are returned unchanged.

        Raises
        ------
        TypeError
            If ``ref`` is neither callable nor a resolvable string.
        """
        if callable(ref):
            return ref
        if isinstance(ref, str):
            path = ref[len("ext://"):] if ref.startswith("ext://") else ref
            if ":" in path:
                module_path, _, attr_path = path.partition(":")
            else:
                module_path, _, attr_path = path.rpartition(".")
            if module_path and attr_path:
                obj = importlib.import_module(module_path)
                for attr in attr_path.split("."):
                    obj = getattr(obj, attr)
                return obj
        raise TypeError(f"Unable to resolve reference {ref!r}")

    @classmethod
    def _build_formatter(cls, fcfg: Any) -> logging.Formatter:
        """Instantiate one formatter from its ``formatters`` entry.

        A ``"()"`` factory receives every other key as a keyword argument, a
        ``"class"`` path receives the remaining keys (with ``format`` renamed to
        ``fmt``), and otherwise a ``logging.Formatter`` is built from the
        ``format``/``datefmt``/``style`` keys.
        """
        fcfg = fcfg or {}
        if "()" in fcfg:
            factory = cls._resolve_reference(fcfg["()"])
            return factory(**{k: v for k, v in fcfg.items() if k != "()"})
        if "class" in fcfg:
            formatter_cls = cls._resolve_reference(fcfg["class"])
            kwargs = {k: v for k, v in fcfg.items() if k != "class"}
            # Configs spell the format string "format", but logging.Formatter
            # (and subclasses) call the parameter "fmt"; passing "format"
            # would raise TypeError.
            if "format" in kwargs:
                kwargs.setdefault("fmt", kwargs.pop("format"))
            return formatter_cls(**kwargs)
        return logging.Formatter(
            fcfg.get("format", None),
            datefmt=fcfg.get("datefmt", None),
            style=fcfg.get("style", "%"),
        )

    @classmethod
    def _build_filter(cls, fcfg: Any) -> logging.Filter:
        """Instantiate one filter from its ``filters`` entry.

        Uses the ``"()"`` factory if present, else the ``"class"`` path, else
        ``logging.Filter``; the remaining keys become keyword arguments.
        """
        fcfg = fcfg or {}
        factory_key = "()" if "()" in fcfg else "class"
        # Without an explicit factory, build a plain logging.Filter (e.g. from
        # {"name": "..."}) instead of failing on a missing class path.
        factory = cls._resolve_reference(fcfg.get(factory_key, "logging.Filter"))
        return factory(**{k: v for k, v in fcfg.items() if k != factory_key})

    @classmethod
    def _build_handler(
        cls,
        hcfg: dict[str, Any],
        default_level: Any,
        formatters: dict[str, logging.Formatter],
        filters: dict[str, logging.Filter],
        default_formatter: logging.Formatter,
    ) -> logging.Handler:
        """Instantiate and configure one handler from a ``handlers`` entry.

        Parameters
        ----------
        hcfg : dict
            Handler config; ``"()"`` or ``"class"`` (default
            ``logging.StreamHandler``) selects the factory, non-reserved keys
            are passed to it, and a string ``stream`` is resolved as a reference.
        default_level : str or int
            Level used when the handler config has no ``level``.
        formatters, filters : dict
            Registries of named formatters/filters the handler may reference.
        default_formatter : logging.Formatter
            Formatter attached when the handler names none.

        Returns
        -------
        logging.Handler
            The configured handler.
        """
        if "()" in hcfg:
            factory_ref = hcfg["()"]
        else:
            factory_ref = hcfg.get("class", "logging.StreamHandler")
        factory = cls._resolve_reference(factory_ref)

        # Reserved keys are consumed below and must not reach the constructor.
        kwargs = {k: v for k, v in hcfg.items() if k not in cls._HANDLER_RESERVED_KEYS}
        if isinstance(kwargs.get("stream"), str):
            # Resolve stream references like "ext://sys.stdout".
            kwargs["stream"] = cls._resolve_reference(kwargs["stream"])
        handler = factory(**kwargs)

        handler.setLevel(cls._normalize_level(hcfg.get("level", default_level)))

        fmt_ref = hcfg.get("formatter", None)
        if fmt_ref is None:
            handler.setFormatter(default_formatter)
        elif fmt_ref in formatters:
            handler.setFormatter(formatters[fmt_ref])
        else:
            LOGGER.warning("Handler references unknown formatter '%s'; it is ignored.", fmt_ref)

        for filter_ref in hcfg.get("filters") or []:
            if filter_ref in filters:
                handler.addFilter(filters[filter_ref])
            else:
                LOGGER.warning("Handler references unknown filter '%s'; it is ignored.", filter_ref)
        return handler

    def _get_logger_definition(self, logger_name: str) -> dg.LoggerDefinition:
        """Create a Dagster logger definition from the configuration.

        Parameters
        ----------
        logger_name : str
            Name of the logger.

        Returns
        -------
        LoggerDefinition
            Dagster logger definition whose config schema accepts
            ``log_level`` (default ``"INFO"``) and optional ``handlers``,
            ``formatters`` and ``filters``.

        See Also
        --------
        `kedro_dagster.dagster.LoggerCreator.create_loggers` :
            Entry point that calls this method for each configured logger.
        """
        # Bind the class (not the instance) so the closure does not keep the
        # whole Dagster config alive.
        creator_cls = type(self)

        def dagster_logger(context: dg.InitLoggerContext) -> logging.Logger:
            """Build a stdlib logger from the Dagster logger config."""
            config_data = dict(context.logger_config)
            level = creator_cls._normalize_level(config_data.get("log_level", "INFO"))

            logger_ = logging.getLoggerClass()(logger_name, level=level)
            # Clear any handlers the logger class may attach on construction,
            # so only the configured handlers remain.
            for existing in list(logger_.handlers):
                logger_.removeHandler(existing)

            default_formatter = logging.Formatter(creator_cls._DEFAULT_FORMAT)
            formatters = {
                name: creator_cls._build_formatter(fcfg)
                for name, fcfg in (config_data.get("formatters") or {}).items()
            }
            filters = {
                name: creator_cls._build_filter(fcfg)
                for name, fcfg in (config_data.get("filters") or {}).items()
            }

            handler_configs = config_data.get("handlers")
            if handler_configs:
                for hcfg in handler_configs:
                    logger_.addHandler(
                        creator_cls._build_handler(hcfg, level, formatters, filters, default_formatter)
                    )
            else:
                # No handlers specified: log to stdout with the default format.
                stdout_handler = logging.StreamHandler(stream=sys.stdout)
                stdout_handler.setLevel(level)
                stdout_handler.setFormatter(default_formatter)
                logger_.addHandler(stdout_handler)
            return logger_

        config_schema = {
            "log_level": dg.Field(str, default_value="INFO"),
            "handlers": dg.Field(list, is_required=False),
            "formatters": dg.Field(dict, is_required=False),
            "filters": dg.Field(dict, is_required=False),
        }
        return dg.LoggerDefinition(
            dagster_logger,
            description=f"Logger definition `{logger_name}`.",
            config_schema=config_schema,
        )

    def create_loggers(self) -> dict[str, dg.LoggerDefinition]:
        """Create logger definitions from the configuration.

        Globally configured loggers are created first. Then, for each job, string
        entries in its ``loggers`` list must name one of those global loggers,
        and every non-string entry gets its own definition named
        ``"{job_name}__logger_{idx}"`` (``idx`` is the entry's position in the
        job's list).

        Returns
        -------
        dict[str, LoggerDefinition]
            Mapping of fully-qualified logger name to definition.

        Raises
        ------
        ValueError
            If a job references a logger name that is not globally configured.

        See Also
        --------
        `kedro_dagster.dagster.LoggerCreator._get_logger_definition` :
            Creates a single logger definition.
        `kedro_dagster.config.logging.LoggerOptions` :
            Logger option model used as input.
        """
        LOGGER.info("Creating Dagster loggers...")
        named_loggers: dict[str, dg.LoggerDefinition] = {}

        for logger_name in self._dagster_config.loggers or ():
            LOGGER.debug("Creating logger '%s'...", logger_name)
            named_loggers[logger_name] = self._get_logger_definition(logger_name)

        jobs = getattr(self._dagster_config, "jobs", None)
        if jobs:
            # Snapshot taken before job-specific loggers are added: string
            # references may only point at globally configured loggers.
            available_logger_names = list(named_loggers)
            for job_name, job_config in jobs.items():
                job_loggers = getattr(job_config, "loggers", None)
                if not job_loggers:
                    continue
                LOGGER.debug("Processing %d loggers for job '%s'...", len(job_loggers), job_name)
                for idx, logger_config in enumerate(job_loggers):
                    if isinstance(logger_config, str):
                        if logger_config not in available_logger_names:
                            raise ValueError(
                                f"Logger '{logger_config}' referenced in job '{job_name}' "
                                f"not found in available loggers: {available_logger_names}"
                            )
                        continue
                    # Inline logger config: create a definition named after the job.
                    job_logger_name = f"{job_name}__logger_{idx}"
                    named_loggers[job_logger_name] = self._get_logger_definition(job_logger_name)

        LOGGER.debug("Created %d logger(s)", len(named_loggers))
        return named_loggers