Skip to content

get_partition_mapping

kedro_dagster.utils.get_partition_mapping(partition_mappings, upstream_asset_name, downstream_dataset_names, config_resolver)

Get the appropriate partition mapping for an asset based on its downstream datasets.

Parameters

Name Type Description Default
partition_mappings dict[str, PartitionMapping]

Dictionary of partition mappings.

required
upstream_asset_name str

Name of the upstream asset.

required
downstream_dataset_names list[str]

List of downstream dataset names.

required
config_resolver CatalogConfigResolver

Catalog config resolver to match patterns.

required

Returns

Type Description
PartitionMapping or None

Partition mapping or None if not found.

See Also

kedro_dagster.utils.get_match_pattern_from_catalog_resolver : Resolves dataset patterns for mapping lookups.
kedro_dagster.datasets.partitioned_dataset.DagsterPartitionedDataset : Defines partition mappings consumed here.

Source Code

Show/Hide source
def get_partition_mapping(
    partition_mappings: dict[str, dg.PartitionMapping],
    upstream_asset_name: str,
    downstream_dataset_names: list[str],
    config_resolver: "CatalogConfigResolver",
) -> dg.PartitionMapping | None:
    """Get the appropriate partition mapping for an asset based on its downstream datasets.

    Parameters
    ----------
    partition_mappings : dict[str, PartitionMapping]
        Dictionary of partition mappings.
    upstream_asset_name : str
        Name of the upstream asset. Only used in warning messages.
    downstream_dataset_names : list[str]
        List of downstream dataset names. They are examined in order.
    config_resolver : CatalogConfigResolver
        Catalog config resolver to match patterns.

    Returns
    -------
    PartitionMapping or None
        Partition mapping or ``None`` if not found. ``None`` is also returned,
        without any warning, when ``downstream_dataset_names`` is empty.

    Notes
    -----
    The search stops at the first downstream dataset that either

    1. has its formatted asset name (as produced by ``format_dataset_name``)
       among the keys of ``partition_mappings``, or
    2. is matched to a pattern by ``get_match_pattern_from_catalog_resolver``.

    In case 1 the mapping is looked up under the raw dataset name when that is
    a key, and under the formatted asset name otherwise, so that a successful
    membership test always leads to a successful lookup. In case 2 the pattern
    is used as the key; if the pattern is not a key of ``partition_mappings``
    a warning is logged and ``None`` is returned.

    See Also
    --------
    `kedro_dagster.utils.get_match_pattern_from_catalog_resolver` :
        Resolves dataset patterns for mapping lookups.
    `kedro_dagster.datasets.partitioned_dataset.DagsterPartitionedDataset` :
        Defines partition mappings consumed here.
    """
    # Nothing downstream: no mapping to resolve and nothing worth warning about.
    if not downstream_dataset_names:
        return None

    mapping_key = None
    for downstream_dataset_name in downstream_dataset_names:
        downstream_asset_name = format_dataset_name(downstream_dataset_name)
        if downstream_asset_name in partition_mappings:
            # The membership test uses the formatted name, so the lookup key must
            # be one that is actually present. Prefer the raw dataset name when
            # it is a key too; otherwise fall back to the formatted name that
            # was just found.
            if downstream_dataset_name in partition_mappings:
                mapping_key = downstream_dataset_name
            else:
                mapping_key = downstream_asset_name
            break

        match_pattern = get_match_pattern_from_catalog_resolver(config_resolver, downstream_dataset_name)
        if match_pattern is not None:
            # The first dataset that resolves to a pattern ends the search, even
            # if that pattern turns out not to be a key of the mappings.
            mapping_key = match_pattern
            break

    if mapping_key is None:
        LOGGER.warning(
            f"None of the downstream datasets `{downstream_dataset_names}` of `{upstream_asset_name}` "
            "is found in the partition mappings. "
            "The default partition mapping (i.e., `AllPartitionMapping`) will be used."
        )
        return None

    if mapping_key in partition_mappings:
        return partition_mappings[mapping_key]

    LOGGER.warning(
        f"Downstream dataset `{mapping_key}` of `{upstream_asset_name}` "
        "is not found in the partition mappings. "
        "The default partition mapping (i.e., `AllPartitionMapping`) will be used."
    )
    return None