Skip to content

KedroProjectTranslator

kedro_dagster.translator.KedroProjectTranslator

Translate a Kedro project into a Dagster code location.

Parameters

Name Type Description Default
env str

Kedro environment to use.

required
project_path Path or None

Path to the Kedro project. If omitted, auto-discovered.

None
conf_source str or None

Optional path to the Kedro configuration source directory.

None

See Also

kedro_dagster.translator.DagsterCodeLocation : Container returned by to_dagster.

Source Code

Show/Hide source
class KedroProjectTranslator:
    """Translate a Kedro project into a Dagster code location.

    Parameters
    ----------
    env : str
        Kedro environment to use.
    project_path : Path or None, optional
        Path to the Kedro project. If omitted, auto-discovered.
    conf_source : str or None, optional
        Optional path to the Kedro configuration source directory.

    See Also
    --------
    `kedro_dagster.translator.DagsterCodeLocation` :
        Container returned by `to_dagster`.
    """

    def __init__(
        self,
        env: str,
        project_path: Path | None = None,
        conf_source: str | None = None,
    ) -> None:
        self._project_path: Path
        if project_path is None:
            self._project_path = find_kedro_project(Path.cwd()) or Path.cwd()
        else:
            self._project_path = project_path

        self._env: str = env

        self.initialize_kedro(conf_source=conf_source)

    def initialize_kedro(self, conf_source: str | None = None) -> None:
        """Initialize Kedro context and pipelines for translation.

        This bootstraps the Kedro project, starts a session, loads the context,
        and discovers pipelines to prepare for translation.

        Parameters
        ----------
        conf_source : str or None, optional
            Optional configuration source directory.

        See Also
        --------
        `kedro_dagster.translator.KedroProjectTranslator.to_dagster` :
            Orchestrates translation after initialization.
        `kedro_dagster.translator.KedroProjectTranslator.get_defined_pipelines` :
            Uses pipelines discovered during initialization.
        """
        # Lazy import to avoid circular dependency
        from kedro.framework.project import find_pipelines
        from kedro.framework.session import KedroSession
        from kedro.framework.startup import bootstrap_project

        LOGGER.info("Initializing Kedro project...")

        LOGGER.debug(f"Bootstrapping Kedro project at path: {self._project_path}")
        self._project_metadata = bootstrap_project(self._project_path)
        LOGGER.debug(f"Project name: {self._project_metadata.project_name}")

        LOGGER.debug(
            f"Creating Kedro session with project path: {self._project_path}, environment: {self._env}, conf_source: {conf_source}"
        )
        self._session = KedroSession.create(
            project_path=self._project_path,
            env=self._env,
            conf_source=conf_source,
        )

        self._session_id = self._session.session_id
        LOGGER.debug(f"Session created with ID: {self._session_id}")

        LOGGER.info("Loading Kedro context...")
        self._context = self._session.load_context()

        LOGGER.info("Getting Kedro catalog...")
        self._catalog = self._context.catalog

        self._pipelines = find_pipelines()

        LOGGER.info("Kedro initialization complete.")

    def get_defined_pipelines(self, dagster_config: "BaseModel", translate_all: bool) -> list["Pipeline"]:
        """Resolve the list of pipelines to translate.

        Parameters
        ----------
        dagster_config : BaseModel
            Validated configuration for Kedro-Dagster.
        translate_all : bool
            If ``True``, translate all discovered pipelines; otherwise, only
            those referenced by jobs in the configuration.

        Returns
        -------
        list[Pipeline]
            Kedro pipelines to translate.

        See Also
        --------
        `kedro_dagster.utils.get_filter_params_dict` :
            Extracts filter parameters from pipeline config.
        `kedro_dagster.translator.KedroProjectTranslator.to_dagster` :
            Calls this to determine which pipelines to translate.
        """
        # Lazy import to avoid circular dependency
        from kedro.framework.project import find_pipelines, pipelines

        if translate_all:
            return list(find_pipelines().values())

        defined_pipelines = []
        for job_config in dagster_config.jobs.values():
            pipeline_config = job_config.pipeline.model_dump()

            pipeline_name = pipeline_config.get("pipeline_name", "__default__")
            filter_params = get_filter_params_dict(pipeline_config)
            pipeline = pipelines.get(pipeline_name).filter(**filter_params)
            defined_pipelines.append(pipeline)

        return defined_pipelines

    def to_dagster(self, translate_all: bool = False) -> DagsterCodeLocation:
        """Translate the Kedro project into a Dagster code location.

        Parameters
        ----------
        translate_all : bool, optional
            Whether to translate all pipelines or only those referenced
            in the ``dagster.yml`` configuration.

        Returns
        -------
        DagsterCodeLocation
            Translated Dagster code location composed of assets, jobs,
            resources, and more.

        See Also
        --------
        `kedro_dagster.translator.DagsterCodeLocation` :
            Container returned by this method.
        """
        LOGGER.info("Translating Kedro project into Dagster...")

        dagster_config = get_dagster_config(self._context)

        kedro_run_translator = KedroRunTranslator(
            context=self._context,
            catalog=self._catalog,
            project_path=str(self._project_path),
            env=self._env,
            run_id=self._session_id,
        )
        kedro_run_resource = kedro_run_translator.to_dagster(
            pipeline_name="__default__",
            filter_params={},
        )
        named_resources: dict[str, dg.ResourceDefinition] = {"kedro_run": kedro_run_resource}

        mlflow_config = None
        if is_mlflow_enabled():
            # Add MLflow resource only if MLflow is installed and configured on the context
            mlflow_config = getattr(self._context, "mlflow", None)
            if mlflow_config is not None:
                named_resources["mlflow"] = get_mlflow_resource_from_config(mlflow_config)
            else:
                LOGGER.info("MLflow is installed but not configured on the Kedro context; skipping MLflow resource.")

        self.logger_creator = LoggerCreator(dagster_config=dagster_config)
        named_loggers = self.logger_creator.create_loggers()

        defined_pipelines = self.get_defined_pipelines(dagster_config=dagster_config, translate_all=translate_all)
        self.catalog_translator = CatalogTranslator(
            catalog=self._catalog,
            pipelines=defined_pipelines,
            hook_manager=self._context._hook_manager,
            env=self._env,
        )
        named_io_managers, asset_partitions = self.catalog_translator.to_dagster()
        named_resources |= named_io_managers

        self.node_translator = NodeTranslator(
            pipelines=defined_pipelines,
            catalog=self._catalog,
            hook_manager=self._context._hook_manager,
            run_id=self._session_id,
            asset_partitions=asset_partitions,
            named_resources=named_resources,
            env=self._env,
            mlflow_config=mlflow_config,
        )
        named_op_factories, named_assets = self.node_translator.to_dagster()

        self.executor_creator = ExecutorCreator(dagster_config=dagster_config)
        named_executors = self.executor_creator.create_executors()

        self.pipeline_translator = PipelineTranslator(
            dagster_config=dagster_config,
            context=self._context,
            catalog=self._catalog,
            project_path=str(self._project_path),
            env=self._env,
            run_id=self._session_id,
            named_assets=named_assets,
            asset_partitions=asset_partitions,
            named_op_factories=named_op_factories,
            named_resources=named_resources,
            named_executors=named_executors,
            named_loggers=named_loggers,
            enable_mlflow=is_mlflow_enabled() and hasattr(self._context, "mlflow"),
        )
        named_jobs = self.pipeline_translator.to_dagster()

        self.schedule_creator = ScheduleCreator(dagster_config=dagster_config, named_jobs=named_jobs)
        named_schedules = self.schedule_creator.create_schedules()

        named_sensors = kedro_run_translator._translate_on_pipeline_error_hook(named_jobs=named_jobs)

        LOGGER.info("Kedro project successfully translated into Dagster.")

        return DagsterCodeLocation(
            named_resources=named_resources,
            named_assets=named_assets,
            named_jobs=named_jobs,
            named_executors=named_executors,
            named_schedules=named_schedules,
            named_sensors=named_sensors,
            named_loggers=named_loggers,
        )

Methods

initialize_kedro(conf_source=None)

Initialize Kedro context and pipelines for translation.

This bootstraps the Kedro project, starts a session, loads the context, and discovers pipelines to prepare for translation.

Parameters
Name Type Description Default
conf_source str or None

Optional configuration source directory.

None
See Also

kedro_dagster.translator.KedroProjectTranslator.to_dagster : Orchestrates translation after initialization. kedro_dagster.translator.KedroProjectTranslator.get_defined_pipelines : Uses pipelines discovered during initialization.

Source Code
Show/Hide source
def initialize_kedro(self, conf_source: str | None = None) -> None:
    """Initialize Kedro context and pipelines for translation.

    This bootstraps the Kedro project, starts a session, loads the context,
    and discovers pipelines to prepare for translation.

    Parameters
    ----------
    conf_source : str or None, optional
        Optional configuration source directory.

    See Also
    --------
    `kedro_dagster.translator.KedroProjectTranslator.to_dagster` :
        Orchestrates translation after initialization.
    `kedro_dagster.translator.KedroProjectTranslator.get_defined_pipelines` :
        Uses pipelines discovered during initialization.
    """
    # Lazy import to avoid circular dependency
    from kedro.framework.project import find_pipelines
    from kedro.framework.session import KedroSession
    from kedro.framework.startup import bootstrap_project

    LOGGER.info("Initializing Kedro project...")

    LOGGER.debug(f"Bootstrapping Kedro project at path: {self._project_path}")
    self._project_metadata = bootstrap_project(self._project_path)
    LOGGER.debug(f"Project name: {self._project_metadata.project_name}")

    LOGGER.debug(
        f"Creating Kedro session with project path: {self._project_path}, environment: {self._env}, conf_source: {conf_source}"
    )
    self._session = KedroSession.create(
        project_path=self._project_path,
        env=self._env,
        conf_source=conf_source,
    )

    self._session_id = self._session.session_id
    LOGGER.debug(f"Session created with ID: {self._session_id}")

    LOGGER.info("Loading Kedro context...")
    self._context = self._session.load_context()

    LOGGER.info("Getting Kedro catalog...")
    self._catalog = self._context.catalog

    self._pipelines = find_pipelines()

    LOGGER.info("Kedro initialization complete.")

get_defined_pipelines(dagster_config, translate_all)

Resolve the list of pipelines to translate.

Parameters
Name Type Description Default
dagster_config BaseModel

Validated configuration for Kedro-Dagster.

required
translate_all bool

If True, translate all discovered pipelines; otherwise, only those referenced by jobs in the configuration.

required
Returns
Type Description
list[Pipeline]

Kedro pipelines to translate.

See Also

kedro_dagster.utils.get_filter_params_dict : Extracts filter parameters from pipeline config. kedro_dagster.translator.KedroProjectTranslator.to_dagster : Calls this to determine which pipelines to translate.

Source Code
Show/Hide source
def get_defined_pipelines(self, dagster_config: "BaseModel", translate_all: bool) -> list["Pipeline"]:
    """Resolve the list of pipelines to translate.

    Parameters
    ----------
    dagster_config : BaseModel
        Validated configuration for Kedro-Dagster.
    translate_all : bool
        If ``True``, translate all discovered pipelines; otherwise, only
        those referenced by jobs in the configuration.

    Returns
    -------
    list[Pipeline]
        Kedro pipelines to translate.

    See Also
    --------
    `kedro_dagster.utils.get_filter_params_dict` :
        Extracts filter parameters from pipeline config.
    `kedro_dagster.translator.KedroProjectTranslator.to_dagster` :
        Calls this to determine which pipelines to translate.
    """
    # Lazy import to avoid circular dependency
    from kedro.framework.project import find_pipelines, pipelines

    if translate_all:
        return list(find_pipelines().values())

    defined_pipelines = []
    for job_config in dagster_config.jobs.values():
        pipeline_config = job_config.pipeline.model_dump()

        pipeline_name = pipeline_config.get("pipeline_name", "__default__")
        filter_params = get_filter_params_dict(pipeline_config)
        pipeline = pipelines.get(pipeline_name).filter(**filter_params)
        defined_pipelines.append(pipeline)

    return defined_pipelines

to_dagster(translate_all=False)

Translate the Kedro project into a Dagster code location.

Parameters
Name Type Description Default
translate_all bool

Whether to translate all pipelines or only those referenced in the dagster.yml configuration.

False
Returns
Type Description
DagsterCodeLocation

Translated Dagster code location composed of assets, jobs, resources, and more.

See Also

kedro_dagster.translator.DagsterCodeLocation : Container returned by this method.

Source Code
Show/Hide source
def to_dagster(self, translate_all: bool = False) -> DagsterCodeLocation:
    """Translate the Kedro project into a Dagster code location.

    Parameters
    ----------
    translate_all : bool, optional
        Whether to translate all pipelines or only those referenced
        in the ``dagster.yml`` configuration.

    Returns
    -------
    DagsterCodeLocation
        Translated Dagster code location composed of assets, jobs,
        resources, and more.

    See Also
    --------
    `kedro_dagster.translator.DagsterCodeLocation` :
        Container returned by this method.
    """
    LOGGER.info("Translating Kedro project into Dagster...")

    dagster_config = get_dagster_config(self._context)

    kedro_run_translator = KedroRunTranslator(
        context=self._context,
        catalog=self._catalog,
        project_path=str(self._project_path),
        env=self._env,
        run_id=self._session_id,
    )
    kedro_run_resource = kedro_run_translator.to_dagster(
        pipeline_name="__default__",
        filter_params={},
    )
    named_resources: dict[str, dg.ResourceDefinition] = {"kedro_run": kedro_run_resource}

    mlflow_config = None
    if is_mlflow_enabled():
        # Add MLflow resource only if MLflow is installed and configured on the context
        mlflow_config = getattr(self._context, "mlflow", None)
        if mlflow_config is not None:
            named_resources["mlflow"] = get_mlflow_resource_from_config(mlflow_config)
        else:
            LOGGER.info("MLflow is installed but not configured on the Kedro context; skipping MLflow resource.")

    self.logger_creator = LoggerCreator(dagster_config=dagster_config)
    named_loggers = self.logger_creator.create_loggers()

    defined_pipelines = self.get_defined_pipelines(dagster_config=dagster_config, translate_all=translate_all)
    self.catalog_translator = CatalogTranslator(
        catalog=self._catalog,
        pipelines=defined_pipelines,
        hook_manager=self._context._hook_manager,
        env=self._env,
    )
    named_io_managers, asset_partitions = self.catalog_translator.to_dagster()
    named_resources |= named_io_managers

    self.node_translator = NodeTranslator(
        pipelines=defined_pipelines,
        catalog=self._catalog,
        hook_manager=self._context._hook_manager,
        run_id=self._session_id,
        asset_partitions=asset_partitions,
        named_resources=named_resources,
        env=self._env,
        mlflow_config=mlflow_config,
    )
    named_op_factories, named_assets = self.node_translator.to_dagster()

    self.executor_creator = ExecutorCreator(dagster_config=dagster_config)
    named_executors = self.executor_creator.create_executors()

    self.pipeline_translator = PipelineTranslator(
        dagster_config=dagster_config,
        context=self._context,
        catalog=self._catalog,
        project_path=str(self._project_path),
        env=self._env,
        run_id=self._session_id,
        named_assets=named_assets,
        asset_partitions=asset_partitions,
        named_op_factories=named_op_factories,
        named_resources=named_resources,
        named_executors=named_executors,
        named_loggers=named_loggers,
        enable_mlflow=is_mlflow_enabled() and hasattr(self._context, "mlflow"),
    )
    named_jobs = self.pipeline_translator.to_dagster()

    self.schedule_creator = ScheduleCreator(dagster_config=dagster_config, named_jobs=named_jobs)
    named_schedules = self.schedule_creator.create_schedules()

    named_sensors = kedro_run_translator._translate_on_pipeline_error_hook(named_jobs=named_jobs)

    LOGGER.info("Kedro project successfully translated into Dagster.")

    return DagsterCodeLocation(
        named_resources=named_resources,
        named_assets=named_assets,
        named_jobs=named_jobs,
        named_executors=named_executors,
        named_schedules=named_schedules,
        named_sensors=named_sensors,
        named_loggers=named_loggers,
    )