class CatalogTranslator:
"""Translate Kedro datasets into Dagster IO managers.
Parameters
----------
catalog : CatalogProtocol
Kedro catalog.
pipelines : list[Pipeline]
Kedro pipelines to consider when building IO managers.
hook_manager : PluginManager
Kedro hook manager used to invoke dataset hooks.
env : str
Kedro environment (used to namespace resource keys).
See Also
--------
`kedro_dagster.nodes.NodeTranslator` :
Translates Kedro nodes into Dagster ops and assets.
`kedro_dagster.translator.KedroProjectTranslator` :
Orchestrates the full Kedro-to-Dagster translation.
"""
def __init__(
self,
catalog: "CatalogProtocol",
pipelines: list["Pipeline"],
hook_manager: "PluginManager",
env: str,
):
self._catalog = catalog
self._pipelines = pipelines
self._hook_manager = hook_manager
self._env = env
def _create_dataset_config(self, dataset: "AbstractDataset") -> Any:
"""Create a Pydantic model class capturing dataset configuration.
The returned class extends Dagster's ``Config`` and contains fields for
the dataset class short name and fields from ``dataset._describe()``
(excluding ``version``), converting ``PurePosixPath`` values to strings
for serialization.
Parameters
----------
dataset : AbstractDataset
Kedro dataset to introspect.
Returns
-------
type[Config]
A Pydantic model class with dataset configuration fields.
See Also
--------
`kedro_dagster.catalog.CatalogTranslator._translate_dataset` :
Uses this config to build a configurable IO manager.
"""
params: dict[str, Any] = {"dataset": dataset.__class__.__name__}
for param, value in dataset._describe().items():
if param == "version":
continue
# Convert any path-like values to strings (preserve original separators).
if isinstance(value, PurePosixPath | PathLike):
params[param] = str(value)
else:
params[param] = value
DatasetConfig = _create_pydantic_model_from_dict(
name="DatasetConfig",
params=params,
__base__=dg.Config,
__config__=ConfigDict(arbitrary_types_allowed=True),
)
return DatasetConfig
def _translate_dataset(
self, dataset: "AbstractDataset", dataset_name: str
) -> tuple[dg.IOManagerDefinition, Any, Any]:
"""Create a configurable IO manager class for a single Kedro dataset.
Parameters
----------
dataset : AbstractDataset
Kedro dataset to wrap into an IO manager.
dataset_name : str
Name of the dataset in the Kedro catalog.
Returns
-------
tuple[IOManagerDefinition, Any, Any]
3-tuple of (IO manager instance, partitions_def,
partition_mappings).
See Also
--------
`kedro_dagster.catalog.CatalogTranslator.to_dagster` :
Iterates over all datasets calling this method.
`kedro_dagster.datasets.DagsterPartitionedDataset` :
Partition-aware dataset wrapper.
"""
asset_name = format_dataset_name(dataset_name)
partitions_def, partition_mappings = None, None
if isinstance(dataset, DagsterPartitionedDataset):
partitions_def = dataset._get_partitions_definition()
partition_mappings = dataset._get_partition_mappings()
DatasetConfig = self._create_dataset_config(dataset)
hook_manager = self._hook_manager
named_nodes = {format_node_name(node.name): node for node in sum(self._pipelines, start=Pipeline([])).nodes}
class ConfigurableDatasetIOManager(DatasetConfig, dg.ConfigurableIOManager): # type: ignore[valid-type]
def handle_output(self, context: dg.OutputContext, obj) -> None: # type: ignore[no-untyped-def]
"""Persist an output through the underlying Kedro dataset.
Invokes ``before_dataset_saved`` and ``after_dataset_saved``
Kedro hooks when the calling op corresponds to a known Kedro
node.
"""
node_name = context.op_def.name
is_node_op = node_name in named_nodes
if is_node_op:
context.log.info("Executing `before_dataset_saved` Kedro hook.")
node = named_nodes[node_name]
hook_manager.hook.before_dataset_saved(
dataset_name=dataset_name,
data=obj,
node=node,
)
partition = None
# Partition key passed via op tags for fanned-out ops
if "downstream_partition_key" in context.op_def.tags:
downstream_partition_key = context.op_def.tags["downstream_partition_key"]
if asset_name == downstream_partition_key.split("|")[0]:
partition = downstream_partition_key.split("|")[1]
# Partition key passed via context for asset jobs
# Prefer Dagster's asset partition when available, otherwise fall back to plain partition_key
elif getattr(context, "has_asset_partitions", False):
partition = context.asset_partition_key
elif getattr(context, "has_partition_key", False):
partition = context.partition_key
if partition is not None and isinstance(obj, dict) and set(obj.keys()) != {partition}:
raise ValueError(
f"Expected data for downstream partition to be a dict with key '{partition}' but got: {obj}"
)
dataset.save(obj)
if is_node_op:
context.log.info("Executing `after_dataset_saved` Kedro hook.")
hook_manager.hook.after_dataset_saved(
dataset_name=dataset_name,
data=obj,
node=node,
)
def load_input(self, context: dg.InputContext) -> Any:
"""Load an input through the underlying Kedro dataset.
Invokes ``before_dataset_loaded`` and ``after_dataset_loaded``
Kedro hooks when the calling op corresponds to a known Kedro
node.
"""
node_name = context.op_def.name
is_node_op = node_name in named_nodes
if is_node_op:
context.log.info("Executing `before_dataset_loaded` Kedro hook.")
node = named_nodes[node_name]
hook_manager.hook.before_dataset_loaded(
dataset_name=dataset_name,
node=node,
)
data = dataset.load()
partition = None
# Partition key passed via op tags for fanned-out ops
if "upstream_partition_key" in context.op_def.tags:
upstream_partition_key = context.op_def.tags["upstream_partition_key"]
if asset_name == upstream_partition_key.split("|")[0]:
partition = upstream_partition_key.split("|")[1]
# Partition key passed via context for asset jobs
# Prefer Dagster's asset partition when available, otherwise fall back to plain partition_key
elif getattr(context, "has_asset_partitions", False):
partition = context.asset_partition_key
elif getattr(context, "has_partition_key", False):
partition = context.partition_key
if partition is not None and isinstance(data, dict):
data = {partition: data.get(partition)}
if is_node_op:
context.log.info("Executing `after_dataset_loaded` Kedro hook.")
node = named_nodes[node_name]
hook_manager.hook.after_dataset_loaded(
dataset_name=dataset_name,
data=data,
node=node,
)
return data
# Build a named IO manager class for this particular dataset type
dataset_type_short = dataset.__class__.__name__
ConfigurableDatasetIOManagerClass = create_model(dataset_type_short, __base__=ConfigurableDatasetIOManager)
ConfigurableDatasetIOManagerClass.__doc__ = f"IO Manager for Kedro dataset `{dataset_name}`."
# Instantiate without args; defaults are embedded in the DatasetConfig
io_manager_instance = ConfigurableDatasetIOManagerClass()
return io_manager_instance, partitions_def, partition_mappings
def to_dagster(self) -> tuple[dict[str, dg.IOManagerDefinition], dict[str, dict[str, Any]]]:
"""Generate IO managers and partitions for all Kedro datasets referenced by pipelines.
Returns
-------
tuple[dict[str, IOManagerDefinition], dict[str, dict[str, Any]]]
2-tuple of (named IO managers, asset partition definitions).
See Also
--------
`kedro_dagster.catalog.CatalogTranslator._translate_dataset` :
Creates a single IO manager per dataset.
"""
LOGGER.info("Translating Kedro catalog to Dagster IO managers...")
named_io_managers: dict[str, dg.IOManagerDefinition] = {}
asset_partitions: dict[str, dict[str, Any]] = {}
for dataset_name in sum(self._pipelines, start=Pipeline([])).datasets():
if _is_param_name(dataset_name) or is_nothing_asset_name(self._catalog, dataset_name):
continue
LOGGER.debug(f"Translating dataset '{dataset_name}'...")
asset_name = format_dataset_name(dataset_name)
dataset = get_dataset_from_catalog(self._catalog, dataset_name)
if dataset is None:
LOGGER.debug(
f"Dataset `{dataset_name}` not in catalog. It will be handled by default IO manager `io_manager`."
)
continue
if isinstance(dataset, MemoryDataset):
continue
io_manager, partitions_def, partition_mappings = self._translate_dataset(dataset, dataset_name)
named_io_managers[f"{self._env}__{asset_name}_io_manager"] = io_manager
if partitions_def is not None:
asset_partitions[asset_name] = {
"partitions_def": partitions_def,
"partition_mappings": partition_mappings,
}
LOGGER.debug(
f"Translated {len(named_io_managers)} IO manager(s) with {len(asset_partitions)} partition definition(s)"
)
return named_io_managers, asset_partitions