Skip to content

CatalogTranslator

kedro_dagster.catalog.CatalogTranslator

Translate Kedro datasets into Dagster IO managers.

Parameters

Name Type Description Default
catalog CatalogProtocol

Kedro catalog.

required
pipelines list[Pipeline]

Kedro pipelines to consider when building IO managers.

required
hook_manager PluginManager

Kedro hook manager used to invoke dataset hooks.

required
env str

Kedro environment (used to namespace resource keys).

required

See Also

kedro_dagster.nodes.NodeTranslator : Translates Kedro nodes into Dagster ops and assets.
kedro_dagster.translator.KedroProjectTranslator : Orchestrates the full Kedro-to-Dagster translation.

Source Code

Show/Hide source
class CatalogTranslator:
    """Translate Kedro datasets into Dagster IO managers.

    Parameters
    ----------
    catalog : CatalogProtocol
        Kedro catalog.
    pipelines : list[Pipeline]
        Kedro pipelines to consider when building IO managers.
    hook_manager : PluginManager
        Kedro hook manager used to invoke dataset hooks.
    env : str
        Kedro environment (used to namespace resource keys).

    See Also
    --------
    `kedro_dagster.nodes.NodeTranslator` :
        Translates Kedro nodes into Dagster ops and assets.
    `kedro_dagster.translator.KedroProjectTranslator` :
        Orchestrates the full Kedro-to-Dagster translation.
    """

    def __init__(
        self,
        catalog: "CatalogProtocol",
        pipelines: list["Pipeline"],
        hook_manager: "PluginManager",
        env: str,
    ):
        self._catalog = catalog
        self._pipelines = pipelines
        self._hook_manager = hook_manager
        self._env = env

    @staticmethod
    def _resolve_partition_key(context: Any, asset_name: str, tag_name: str) -> Any:
        """Work out which partition an IO manager call refers to.

        The lookup order is:

        1. The op tag ``tag_name``, whose value has the form
           ``"<asset name>|<partition key>"``. When the tag is present it is
           authoritative: if its asset part does not match ``asset_name`` no
           partition applies and the context is *not* consulted.
        2. ``context.asset_partition_key`` when ``context.has_asset_partitions``
           is truthy.
        3. ``context.partition_key`` when ``context.has_partition_key`` is
           truthy.

        Parameters
        ----------
        context : dg.InputContext or dg.OutputContext
            Dagster IO manager context of the current call.
        asset_name : str
            Formatted asset name of the dataset handled by the IO manager.
        tag_name : str
            Op tag to inspect (``"upstream_partition_key"`` for loads,
            ``"downstream_partition_key"`` for saves).

        Returns
        -------
        str or None
            The partition key, or ``None`` when no partition applies.

        Raises
        ------
        IndexError
            If the tag names this asset but contains no ``|`` separator.
        """
        op_tags = context.op_def.tags
        if tag_name in op_tags:
            # Split only at the first separator so that a partition key which
            # itself contains "|" is returned whole instead of being truncated.
            tag_parts = op_tags[tag_name].split("|", 1)
            if tag_parts[0] == asset_name:
                return tag_parts[1]
            return None

        # No fan-out tag: fall back to the partition carried by the context,
        # preferring the asset partition over the plain run partition key.
        if getattr(context, "has_asset_partitions", False):
            return context.asset_partition_key
        if getattr(context, "has_partition_key", False):
            return context.partition_key
        return None

    def _create_dataset_config(self, dataset: "AbstractDataset") -> Any:
        """Create a Pydantic model class capturing dataset configuration.

        The returned class extends Dagster's ``Config`` and contains fields for
        the dataset class short name and fields from ``dataset._describe()``
        (excluding ``version``), converting path-like values to strings
        for serialization.

        Parameters
        ----------
        dataset : AbstractDataset
            Kedro dataset to introspect.

        Returns
        -------
        type[Config]
            A Pydantic model class with dataset configuration fields.

        See Also
        --------
        `kedro_dagster.catalog.CatalogTranslator._translate_dataset` :
            Uses this config to build a configurable IO manager.
        """
        params: dict[str, Any] = {"dataset": dataset.__class__.__name__}
        for param, value in dataset._describe().items():
            if param == "version":
                continue
            # Convert any path-like values to strings (preserve original separators).
            params[param] = str(value) if isinstance(value, PurePosixPath | PathLike) else value

        return _create_pydantic_model_from_dict(
            name="DatasetConfig",
            params=params,
            __base__=dg.Config,
            __config__=ConfigDict(arbitrary_types_allowed=True),
        )

    def _translate_dataset(
        self, dataset: "AbstractDataset", dataset_name: str
    ) -> tuple[dg.IOManagerDefinition, Any, Any]:
        """Create a configurable IO manager class for a single Kedro dataset.

        The IO manager delegates ``handle_output``/``load_input`` to
        ``dataset.save``/``dataset.load`` and fires the matching Kedro dataset
        hooks whenever the calling op maps to a node of ``self._pipelines``.

        Parameters
        ----------
        dataset : AbstractDataset
            Kedro dataset to wrap into an IO manager.
        dataset_name : str
            Name of the dataset in the Kedro catalog.

        Returns
        -------
        tuple[IOManagerDefinition, Any, Any]
            3-tuple of (IO manager instance, partitions_def,
            partition_mappings). The last two are ``None`` unless ``dataset``
            is a ``DagsterPartitionedDataset``.

        See Also
        --------
        `kedro_dagster.catalog.CatalogTranslator.to_dagster` :
            Iterates over all datasets calling this method.
        `kedro_dagster.datasets.DagsterPartitionedDataset` :
            Partition-aware dataset wrapper.
        """

        asset_name = format_dataset_name(dataset_name)

        partitions_def, partition_mappings = None, None
        if isinstance(dataset, DagsterPartitionedDataset):
            partitions_def = dataset._get_partitions_definition()
            partition_mappings = dataset._get_partition_mappings()

        DatasetConfig = self._create_dataset_config(dataset)

        # Everything below is captured by the IO manager methods as closure
        # state, so the generated class needs no extra (Pydantic) fields.
        hook_manager = self._hook_manager
        resolve_partition_key = self._resolve_partition_key
        named_nodes = {format_node_name(node.name): node for node in sum(self._pipelines, start=Pipeline([])).nodes}

        class ConfigurableDatasetIOManager(DatasetConfig, dg.ConfigurableIOManager):  # type: ignore[valid-type]
            def handle_output(self, context: dg.OutputContext, obj) -> None:  # type: ignore[no-untyped-def]
                """Persist an output through the underlying Kedro dataset.

                Invokes ``before_dataset_saved`` and ``after_dataset_saved``
                Kedro hooks when the calling op corresponds to a known Kedro
                node.

                Raises
                ------
                ValueError
                    If a partition applies and ``obj`` is a dict whose keys
                    are not exactly that partition.
                """
                # ``None`` means the op is not a translated Kedro node, in
                # which case no Kedro hooks are fired.
                node = named_nodes.get(context.op_def.name)

                if node is not None:
                    context.log.info("Executing `before_dataset_saved` Kedro hook.")
                    hook_manager.hook.before_dataset_saved(
                        dataset_name=dataset_name,
                        data=obj,
                        node=node,
                    )

                partition = resolve_partition_key(context, asset_name, "downstream_partition_key")

                if partition is not None and isinstance(obj, dict) and set(obj.keys()) != {partition}:
                    raise ValueError(
                        f"Expected data for downstream partition to be a dict with key '{partition}' but got: {obj}"
                    )

                dataset.save(obj)

                if node is not None:
                    context.log.info("Executing `after_dataset_saved` Kedro hook.")
                    hook_manager.hook.after_dataset_saved(
                        dataset_name=dataset_name,
                        data=obj,
                        node=node,
                    )

            def load_input(self, context: dg.InputContext) -> Any:
                """Load an input through the underlying Kedro dataset.

                Invokes ``before_dataset_loaded`` and ``after_dataset_loaded``
                Kedro hooks when the calling op corresponds to a known Kedro
                node. When a partition applies and the loaded data is a dict,
                only that partition's entry is returned (as a one-item dict).
                """
                # ``None`` means the op is not a translated Kedro node, in
                # which case no Kedro hooks are fired.
                node = named_nodes.get(context.op_def.name)

                if node is not None:
                    context.log.info("Executing `before_dataset_loaded` Kedro hook.")
                    hook_manager.hook.before_dataset_loaded(
                        dataset_name=dataset_name,
                        node=node,
                    )

                data = dataset.load()

                partition = resolve_partition_key(context, asset_name, "upstream_partition_key")

                if partition is not None and isinstance(data, dict):
                    # A missing partition yields ``{partition: None}``.
                    data = {partition: data.get(partition)}

                if node is not None:
                    context.log.info("Executing `after_dataset_loaded` Kedro hook.")
                    hook_manager.hook.after_dataset_loaded(
                        dataset_name=dataset_name,
                        data=data,
                        node=node,
                    )

                return data

        # Build a named IO manager class for this particular dataset type
        dataset_type_short = dataset.__class__.__name__
        ConfigurableDatasetIOManagerClass = create_model(dataset_type_short, __base__=ConfigurableDatasetIOManager)
        ConfigurableDatasetIOManagerClass.__doc__ = f"IO Manager for Kedro dataset `{dataset_name}`."

        # Instantiate without args; defaults are embedded in the DatasetConfig
        io_manager_instance = ConfigurableDatasetIOManagerClass()

        return io_manager_instance, partitions_def, partition_mappings

    def to_dagster(self) -> tuple[dict[str, dg.IOManagerDefinition], dict[str, dict[str, Any]]]:
        """Generate IO managers and partitions for all Kedro datasets referenced by pipelines.

        Parameters, "nothing" assets, datasets missing from the catalog and
        ``MemoryDataset`` instances are skipped and get no dedicated IO
        manager.

        Returns
        -------
        tuple[dict[str, IOManagerDefinition], dict[str, dict[str, Any]]]
            2-tuple of (named IO managers, asset partition definitions).
            IO managers are keyed ``"<env>__<asset name>_io_manager"``;
            partition entries are keyed by asset name and only exist for
            datasets that define partitions.

        See Also
        --------
        `kedro_dagster.catalog.CatalogTranslator._translate_dataset` :
            Creates a single IO manager per dataset.
        """
        LOGGER.info("Translating Kedro catalog to Dagster IO managers...")
        named_io_managers: dict[str, dg.IOManagerDefinition] = {}
        asset_partitions: dict[str, dict[str, Any]] = {}

        for dataset_name in sum(self._pipelines, start=Pipeline([])).datasets():
            if _is_param_name(dataset_name) or is_nothing_asset_name(self._catalog, dataset_name):
                continue

            LOGGER.debug(f"Translating dataset '{dataset_name}'...")
            asset_name = format_dataset_name(dataset_name)
            dataset = get_dataset_from_catalog(self._catalog, dataset_name)
            if dataset is None:
                LOGGER.debug(
                    f"Dataset `{dataset_name}` not in catalog. It will be handled by default IO manager `io_manager`."
                )
                continue

            # In-memory datasets are skipped and get no dedicated IO manager.
            if isinstance(dataset, MemoryDataset):
                continue

            io_manager, partitions_def, partition_mappings = self._translate_dataset(dataset, dataset_name)
            named_io_managers[f"{self._env}__{asset_name}_io_manager"] = io_manager

            if partitions_def is not None:
                asset_partitions[asset_name] = {
                    "partitions_def": partitions_def,
                    "partition_mappings": partition_mappings,
                }

        LOGGER.debug(
            f"Translated {len(named_io_managers)} IO manager(s) with {len(asset_partitions)} partition definition(s)"
        )
        return named_io_managers, asset_partitions

Methods

to_dagster()

Generate IO managers and partitions for all Kedro datasets referenced by pipelines.

Returns
Type Description
tuple[dict[str, IOManagerDefinition], dict[str, dict[str, Any]]]

2-tuple of (named IO managers, asset partition definitions).

See Also

kedro_dagster.catalog.CatalogTranslator._translate_dataset : Creates a single IO manager per dataset.

Source Code
Show/Hide source
def to_dagster(self) -> tuple[dict[str, dg.IOManagerDefinition], dict[str, dict[str, Any]]]:
    """Build one Dagster IO manager per catalog-backed dataset used by the pipelines.

    Parameters, "nothing" assets, datasets missing from the catalog and
    ``MemoryDataset`` instances are skipped.

    Returns
    -------
    tuple[dict[str, IOManagerDefinition], dict[str, dict[str, Any]]]
        2-tuple of (named IO managers, asset partition definitions).

    See Also
    --------
    `kedro_dagster.catalog.CatalogTranslator._translate_dataset` :
        Creates a single IO manager per dataset.
    """
    LOGGER.info("Translating Kedro catalog to Dagster IO managers...")
    named_io_managers: dict[str, dg.IOManagerDefinition] = {}
    asset_partitions: dict[str, dict[str, Any]] = {}

    # Merge every pipeline into one so each dataset is visited once.
    merged_pipeline = sum(self._pipelines, start=Pipeline([]))

    for dataset_name in merged_pipeline.datasets():
        # Parameters and "nothing" assets never get an IO manager.
        if _is_param_name(dataset_name):
            continue
        if is_nothing_asset_name(self._catalog, dataset_name):
            continue

        LOGGER.debug(f"Translating dataset '{dataset_name}'...")
        asset_name = format_dataset_name(dataset_name)
        dataset = get_dataset_from_catalog(self._catalog, dataset_name)

        if dataset is None:
            LOGGER.debug(
                f"Dataset `{dataset_name}` not in catalog. It will be handled by default IO manager `io_manager`."
            )
        elif not isinstance(dataset, MemoryDataset):
            translated = self._translate_dataset(dataset, dataset_name)
            io_manager, partitions_def, partition_mappings = translated
            named_io_managers[f"{self._env}__{asset_name}_io_manager"] = io_manager

            # Only partitioned datasets contribute a partition entry.
            if partitions_def is not None:
                asset_partitions[asset_name] = dict(
                    partitions_def=partitions_def,
                    partition_mappings=partition_mappings,
                )

    LOGGER.debug(
        f"Translated {len(named_io_managers)} IO manager(s) with {len(asset_partitions)} partition definition(s)"
    )
    return named_io_managers, asset_partitions