Skip to content

KedroDagsterConfig

kedro_dagster.config.models.KedroDagsterConfig

Bases: BaseModel

Main configuration class representing the dagster.yml structure.

Parameters

Name Type Description Default
loggers dict[str, LoggerOptions] or None

Mapping of logger names to logger options.

None
executors dict[str, ExecutorOptions] or None

Mapping of executor names to executor options.

None
schedules dict[str, ScheduleOptions] or None

Mapping of schedule names to schedule options.

None
jobs dict[str, JobOptions] or None

Mapping of job names to job options.

None

Examples

# conf/base/dagster.yml
executors:
  sequential:
    in_process: {}

schedules:
  daily:
    cron_schedule: "0 6 * * *"
    execution_timezone: "UTC"

jobs:
  __default__:
    pipeline:
      pipeline_name: __default__
    executor: sequential
    schedule: daily

See Also

kedro_dagster.config.models.get_dagster_config : Loads and returns an instance of this class.

Source Code

Show/Hide source
class KedroDagsterConfig(BaseModel):
    """Main configuration class representing the ``dagster.yml`` structure.

    Unknown top-level keys are rejected (``extra="forbid"``) and field
    assignments are re-validated (``validate_assignment=True``).

    Parameters
    ----------
    loggers : dict[str, LoggerOptions] or None
        Mapping of logger names to logger options. Defaults to ``None``.
    executors : dict[str, ExecutorOptions] or None
        Mapping of executor names to executor options. Defaults to ``None``;
        dict input is normalized by `validate_executors` before field
        validation.
    schedules : dict[str, ScheduleOptions] or None
        Mapping of schedule names to schedule options. Defaults to ``None``.
    jobs : dict[str, JobOptions] or None
        Mapping of job names to job options. Defaults to ``None``.

    Examples
    --------
    ```yaml
    # conf/base/dagster.yml
    executors:
      sequential:
        in_process: {}

    schedules:
      daily:
        cron_schedule: "0 6 * * *"
        execution_timezone: "UTC"

    jobs:
      __default__:
        pipeline:
          pipeline_name: __default__
        executor: sequential
        schedule: daily
    ```

    See Also
    --------
    `kedro_dagster.config.models.get_dagster_config` :
        Loads and returns an instance of this class.
    """

    model_config = ConfigDict(validate_assignment=True, extra="forbid")

    loggers: dict[str, LoggerOptions] | None = None
    executors: dict[str, ExecutorOptions] | None = None
    schedules: dict[str, ScheduleOptions] | None = None
    jobs: dict[str, JobOptions] | None = None

    @model_validator(mode="before")
    @classmethod
    def validate_executors(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Parse raw executor config dicts into typed executor option models.

        Each executor entry is expected to be a dict whose key names the
        executor type (``in_process``, ``multiprocess``, ``k8s_job_executor``
        or ``docker_executor``, checked in that order) and whose value holds
        the keyword arguments for the matching class in ``EXECUTOR_MAP``.
        An empty or ``None`` value means "no arguments".

        Parameters
        ----------
        values : dict[str, Any]
            Raw model data before validation. The input is not mutated.

        Returns
        -------
        dict[str, Any]
            A copy of the model data with executor configs replaced by option
            instances. A missing or ``None`` ``executors`` entry becomes an
            empty dict. Input that is not a dict, or whose ``executors``
            entry is not a dict, is returned unchanged so that regular field
            validation reports the problem.

        Raises
        ------
        ValueError
            If an executor config dict names no recognized executor type.
        """
        # A "before" validator receives the raw input, which is not guaranteed
        # to be a dict; leave anything else to pydantic's own validation.
        if not isinstance(values, dict):
            return values

        executors = values.get("executors") or {}
        if not isinstance(executors, dict):
            return values

        # Order matters: the first type key found in a config wins.
        known_executor_types = (
            "in_process",
            "multiprocess",
            "k8s_job_executor",
            "docker_executor",
        )

        parsed_executors: dict[str, Any] = {}
        for name, executor_config in executors.items():
            # Entries that are not raw config dicts (e.g. ``None`` or an
            # already-built options instance) are passed through untouched
            # and judged by field validation instead of failing here with an
            # unrelated TypeError.
            if not isinstance(executor_config, dict):
                parsed_executors[name] = executor_config
                continue

            executor_name = next(
                (key for key in known_executor_types if key in executor_config),
                None,
            )
            if executor_name is None:
                msg = f"Unknown executor type in {name}"
                LOGGER.error(msg)
                raise ValueError(msg)

            executor_options_class = EXECUTOR_MAP[executor_name]
            # ``in_process: {}`` and a bare ``in_process:`` both mean defaults.
            executor_options_params = executor_config[executor_name] or {}
            parsed_executors[name] = executor_options_class(**executor_options_params)

        # Return a shallow copy so the caller's mapping is left as it was.
        return {**values, "executors": parsed_executors}

Methods

validate_executors(values) classmethod

Parse raw executor config dicts into typed executor option models.

Parameters
Name Type Description Default
values dict[str, Any]

Raw model data before validation.

required
Returns
Type Description
dict[str, Any]

Model data with executor configs replaced by option instances.

Raises
Type Description
ValueError

If an executor type is not recognized.

Source Code
Show/Hide source
@model_validator(mode="before")
@classmethod
def validate_executors(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Parse raw executor config dicts into typed executor option models.

    Each executor entry is expected to be a dict whose key names the
    executor type (``in_process``, ``multiprocess``, ``k8s_job_executor``
    or ``docker_executor``, checked in that order) and whose value holds
    the keyword arguments for the matching class in ``EXECUTOR_MAP``.
    An empty or ``None`` value means "no arguments".

    Parameters
    ----------
    values : dict[str, Any]
        Raw model data before validation. The input is not mutated.

    Returns
    -------
    dict[str, Any]
        A copy of the model data with executor configs replaced by option
        instances. A missing or ``None`` ``executors`` entry becomes an
        empty dict. Input that is not a dict, or whose ``executors`` entry
        is not a dict, is returned unchanged so that regular field
        validation reports the problem.

    Raises
    ------
    ValueError
        If an executor config dict names no recognized executor type.
    """
    # A "before" validator receives the raw input, which is not guaranteed
    # to be a dict; leave anything else to pydantic's own validation.
    if not isinstance(values, dict):
        return values

    executors = values.get("executors") or {}
    if not isinstance(executors, dict):
        return values

    # Order matters: the first type key found in a config wins.
    known_executor_types = (
        "in_process",
        "multiprocess",
        "k8s_job_executor",
        "docker_executor",
    )

    parsed_executors: dict[str, Any] = {}
    for name, executor_config in executors.items():
        # Entries that are not raw config dicts (e.g. ``None`` or an
        # already-built options instance) are passed through untouched and
        # judged by field validation instead of failing here with an
        # unrelated TypeError.
        if not isinstance(executor_config, dict):
            parsed_executors[name] = executor_config
            continue

        executor_name = next(
            (key for key in known_executor_types if key in executor_config),
            None,
        )
        if executor_name is None:
            msg = f"Unknown executor type in {name}"
            LOGGER.error(msg)
            raise ValueError(msg)

        executor_options_class = EXECUTOR_MAP[executor_name]
        # ``in_process: {}`` and a bare ``in_process:`` both mean defaults.
        executor_options_params = executor_config[executor_name] or {}
        parsed_executors[name] = executor_options_class(**executor_options_params)

    # Return a shallow copy so the caller's mapping is left as it was.
    return {**values, "executors": parsed_executors}