Skip to content

ExecutorCreator

kedro_dagster.dagster.ExecutorCreator

Create Dagster executor definitions from Kedro-Dagster configuration.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dagster_config | KedroDagsterConfig | Parsed Kedro-Dagster config containing executor entries. | required |

See Also

- kedro_dagster.dagster.ScheduleCreator : Creates schedule definitions from configuration.
- kedro_dagster.dagster.LoggerCreator : Creates logger definitions from configuration.
- kedro_dagster.config.execution.ExecutorOptions : Union of supported executor option models.

Source Code

Show/Hide source
class ExecutorCreator:
    """Create Dagster executor definitions from Kedro-Dagster configuration.

    Executors are resolved through ``_OPTION_EXECUTOR_MAP``, which maps an
    options model type to the Dagster executor it configures. The mapping is a
    class attribute that ``register_executor`` mutates in place, so a
    registration made through one instance is visible to every instance that
    shares that attribute.

    Parameters
    ----------
    dagster_config : KedroDagsterConfig
        Parsed Kedro-Dagster config containing executor entries.

    See Also
    --------
    `kedro_dagster.dagster.ScheduleCreator` :
        Creates schedule definitions from configuration.
    `kedro_dagster.dagster.LoggerCreator` :
        Creates logger definitions from configuration.
    `kedro_dagster.config.execution.ExecutorOptions` :
        Union of supported executor option models.
    """

    # Executors referenced directly from the ``dg`` namespace; no extra import needed.
    _OPTION_EXECUTOR_MAP = {
        InProcessExecutorOptions: dg.in_process_executor,
        MultiprocessExecutorOptions: dg.multiprocess_executor,
    }

    # (options model, module name, attribute name) triples for executors that
    # live in separate packages and are imported lazily by ``create_executors``.
    _EXECUTOR_CONFIGS: list[tuple[type[ExecutorOptions], str, str]] = [
        (CeleryExecutorOptions, "dagster_celery", "celery_executor"),
        (CeleryDockerExecutorOptions, "dagster_celery_docker", "celery_docker_executor"),
        (CeleryK8sJobExecutorOptions, "dagster_celery_k8s", "celery_k8s_job_executor"),
        (DaskExecutorOptions, "dagster_dask", "dask_executor"),
        (DockerExecutorOptions, "dagster_docker", "docker_executor"),
        (K8sJobExecutorOptions, "dagster_k8s", "k8s_job_executor"),
    ]

    def __init__(self, dagster_config: "KedroDagsterConfig"):
        self._dagster_config = dagster_config

    def register_executor(self, executor_option: type[ExecutorOptions], executor: dg.ExecutorDefinition) -> None:
        """Register a mapping between an options model and a Dagster executor factory.

        An existing entry for the same ``executor_option`` is replaced.

        Parameters
        ----------
        executor_option : type[ExecutorOptions]
            Pydantic model type acting as the key.
        executor : ExecutorDefinition
            Dagster executor factory to use for that key.

        See Also
        --------
        `kedro_dagster.dagster.ExecutorCreator.create_executors` :
            Consumes registered executor mappings.
        """
        self._OPTION_EXECUTOR_MAP[executor_option] = executor

    def _register_optional_executors(self) -> None:
        """Register every executor in ``_EXECUTOR_CONFIGS`` whose module can be imported."""
        for option_type, module_name, attr_name in self._EXECUTOR_CONFIGS:
            try:
                module = __import__(module_name, fromlist=[attr_name])
                factory = getattr(module, attr_name)
            except ImportError as err:
                # Best-effort: a missing optional package only means that executor
                # type is unavailable. Leave a trace so a later "not supported"
                # error can be linked to the failed import.
                LOGGER.debug(
                    f"Optional executor module '{module_name}' could not be imported; "
                    f"skipping '{attr_name}' ({err})."
                )
                continue
            self.register_executor(option_type, factory)

    def _resolve_executor(self, options: ExecutorOptions, subject: str, kind_label: str) -> dg.ExecutorDefinition:
        """Return the executor registered for the exact type of ``options``.

        Parameters
        ----------
        options : ExecutorOptions
            Options instance whose exact type is used as the lookup key.
        subject : str
            Leading part of the error message describing what was looked up.
        kind_label : str
            Plural noun used when listing the supported keys in the error message.

        Raises
        ------
        ValueError
            If no executor is registered for ``type(options)``.
        """
        factory = self._OPTION_EXECUTOR_MAP.get(type(options))
        if factory is None:
            supported = ", ".join(str(option_type) for option_type in self._OPTION_EXECUTOR_MAP)
            msg = f"{subject} not supported. Please use one of the following {kind_label}: {supported}"
            LOGGER.error(msg)
            raise ValueError(msg)
        return factory

    def create_executors(self) -> dict[str, dg.ExecutorDefinition]:
        """Instantiate executor definitions declared in the configuration.

        Executors from the top-level ``executors`` section are created under
        their configured names. Jobs that reference an executor by name are only
        validated against those names; jobs that carry an inline executor
        configuration get an additional entry named ``"<job_name>__executor"``.

        Returns
        -------
        dict[str, ExecutorDefinition]
            Mapping of executor name to configured executor.

        Raises
        ------
        ValueError
            If an executor options type has no registered executor, or a job
            references an executor name that is not defined in ``executors``.

        See Also
        --------
        `kedro_dagster.dagster.ExecutorCreator.register_executor` :
            Registers new executor types before creation.
        """
        LOGGER.info("Creating Dagster executors...")
        self._register_optional_executors()

        named_executors: dict[str, dg.ExecutorDefinition] = {}

        # First, create executors from the global executors configuration
        if self._dagster_config.executors is not None:
            for executor_name, executor_config in self._dagster_config.executors.items():
                LOGGER.debug(f"Creating executor '{executor_name}'...")
                factory = self._resolve_executor(executor_config, f"Executor '{executor_name}'", "executors")
                named_executors[executor_name] = factory.configured(executor_config.model_dump())

        # Next, iterate over jobs to handle inline executor configurations
        if self._dagster_config.jobs is not None:
            # Snapshot taken before the loop: string references may only point at
            # globally declared executors, not at other jobs' inline ones.
            available_executor_names = set(named_executors)

            for job_name, job_config in self._dagster_config.jobs.items():
                job_executor = job_config.executor
                if job_executor is None:
                    continue

                LOGGER.debug(f"Processing executor configuration for job '{job_name}'...")
                if isinstance(job_executor, str):
                    # String reference - validate it exists in available executors
                    if job_executor not in available_executor_names:
                        msg = (
                            f"Executor named '{job_executor}' for job '{job_name}' not found in available executors. "
                            f"Available executors: {sorted(available_executor_names)}"
                        )
                        LOGGER.error(msg)
                        raise ValueError(msg)
                    continue

                # Inline executor configuration - create executor definition
                factory = self._resolve_executor(
                    job_executor,
                    f"Executor type `{type(job_executor)}` for job '{job_name}'",
                    "executor types",
                )

                # Create the executor with job-specific naming
                inline_name = f"{job_name}__executor"
                if inline_name in named_executors:
                    # The entry is still replaced below; the warning only makes
                    # the name clash visible.
                    LOGGER.warning(
                        f"Inline executor for job '{job_name}' replaces the already defined executor '{inline_name}'."
                    )
                named_executors[inline_name] = factory.configured(job_executor.model_dump())

        LOGGER.debug(f"Created {len(named_executors)} executor(s)")
        return named_executors

Methods

register_executor(executor_option, executor)

Register a mapping between an options model and a Dagster executor factory.

Parameters
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| executor_option | type[ExecutorOptions] | Pydantic model type acting as the key. | required |
| executor | ExecutorDefinition | Dagster executor factory to use for that key. | required |
See Also

kedro_dagster.dagster.ExecutorCreator.create_executors : Consumes registered executor mappings.

Source Code
Show/Hide source
def register_executor(self, executor_option: type[ExecutorOptions], executor: dg.ExecutorDefinition) -> None:
    """Associate an executor options model with the Dagster executor that handles it.

    The entry is written into ``self._OPTION_EXECUTOR_MAP`` in place; an
    existing entry for the same ``executor_option`` is replaced.

    Parameters
    ----------
    executor_option : type[ExecutorOptions]
        Pydantic model type used as the lookup key.
    executor : ExecutorDefinition
        Dagster executor factory stored under that key.

    See Also
    --------
    `kedro_dagster.dagster.ExecutorCreator.create_executors` :
        Consumes registered executor mappings.
    """
    registry = self._OPTION_EXECUTOR_MAP
    registry[executor_option] = executor

create_executors()

Instantiate executor definitions declared in the configuration.

Returns
| Type | Description |
| --- | --- |
| dict[str, ExecutorDefinition] | Mapping of executor name to configured executor. |

See Also

kedro_dagster.dagster.ExecutorCreator.register_executor : Registers new executor types before creation.

Source Code
Show/Hide source
def create_executors(self) -> dict[str, dg.ExecutorDefinition]:
    """Instantiate executor definitions declared in the configuration.

    Executors from the top-level ``executors`` section are created under their
    configured names. Jobs that reference an executor by name are only
    validated against those names; jobs that carry an inline executor
    configuration get an additional entry named ``"<job_name>__executor"``.

    Returns
    -------
    dict[str, ExecutorDefinition]
        Mapping of executor name to configured executor.

    Raises
    ------
    ValueError
        If an executor options type has no registered executor, or a job
        references an executor name that is not defined in ``executors``.

    See Also
    --------
    `kedro_dagster.dagster.ExecutorCreator.register_executor` :
        Registers new executor types before creation.
    """

    def _resolve_executor(options, subject, kind_label):
        """Return the executor registered for ``type(options)`` or raise ``ValueError``."""
        factory = self._OPTION_EXECUTOR_MAP.get(type(options))
        if factory is None:
            supported = ", ".join(str(option_type) for option_type in self._OPTION_EXECUTOR_MAP)
            msg = f"{subject} not supported. Please use one of the following {kind_label}: {supported}"
            LOGGER.error(msg)
            raise ValueError(msg)
        return factory

    LOGGER.info("Creating Dagster executors...")
    # Register all available executors dynamically
    for option_type, module_name, attr_name in self._EXECUTOR_CONFIGS:
        try:
            module = __import__(module_name, fromlist=[attr_name])
            factory = getattr(module, attr_name)
        except ImportError as err:
            # Best-effort: a missing optional package only means that executor
            # type is unavailable. Leave a trace so a later "not supported"
            # error can be linked to the failed import.
            LOGGER.debug(
                f"Optional executor module '{module_name}' could not be imported; "
                f"skipping '{attr_name}' ({err})."
            )
            continue
        self.register_executor(option_type, factory)

    named_executors: dict[str, dg.ExecutorDefinition] = {}

    # First, create executors from the global executors configuration
    if self._dagster_config.executors is not None:
        for executor_name, executor_config in self._dagster_config.executors.items():
            LOGGER.debug(f"Creating executor '{executor_name}'...")
            factory = _resolve_executor(executor_config, f"Executor '{executor_name}'", "executors")
            named_executors[executor_name] = factory.configured(executor_config.model_dump())

    # Next, iterate over jobs to handle inline executor configurations
    if self._dagster_config.jobs is not None:
        # Snapshot taken before the loop: string references may only point at
        # globally declared executors, not at other jobs' inline ones.
        available_executor_names = set(named_executors)

        for job_name, job_config in self._dagster_config.jobs.items():
            job_executor = job_config.executor
            if job_executor is None:
                continue

            LOGGER.debug(f"Processing executor configuration for job '{job_name}'...")
            if isinstance(job_executor, str):
                # String reference - validate it exists in available executors
                if job_executor not in available_executor_names:
                    msg = (
                        f"Executor named '{job_executor}' for job '{job_name}' not found in available executors. "
                        f"Available executors: {sorted(available_executor_names)}"
                    )
                    LOGGER.error(msg)
                    raise ValueError(msg)
                continue

            # Inline executor configuration - create executor definition
            factory = _resolve_executor(
                job_executor,
                f"Executor type `{type(job_executor)}` for job '{job_name}'",
                "executor types",
            )

            # Create the executor with job-specific naming
            inline_name = f"{job_name}__executor"
            if inline_name in named_executors:
                # The entry is still replaced below; the warning only makes the
                # name clash visible.
                LOGGER.warning(
                    f"Inline executor for job '{job_name}' replaces the already defined executor '{inline_name}'."
                )
            named_executors[inline_name] = factory.configured(job_executor.model_dump())

    LOGGER.debug(f"Created {len(named_executors)} executor(s)")
    return named_executors