Skip to content

Datasets

Custom dataset implementations for Kedro-Dagster integration.

DagsterNothingDataset

kedro_dagster.datasets.DagsterNothingDataset

Bases: AbstractDataset

A Kedro dataset used to represent placeholder outputs.

This dataset can be used in Kedro pipelines to signify that a node has completed its task without actually producing any tangible output. It is particularly useful for enforcing execution order in pipelines where certain nodes need to run after others without passing data.

Parameters:

Name Type Description Default
metadata dict[str, Any] or None

Optional metadata stored alongside the dataset.

None

Examples:

>>> from kedro_dagster.datasets.nothing_dataset import DagsterNothingDataset
>>> nothing_ds = DagsterNothingDataset(metadata={"info": "no output"})
>>> data = nothing_ds.load()
>>> print(data)
__nothing__
>>> nothing_ds.save(data)
See Also

kedro_dagster.utils.is_nothing_asset_name : Predicate that checks whether a catalog entry is this type.

Source code in src/kedro_dagster/datasets/nothing_dataset.py
class DagsterNothingDataset(AbstractDataset):
    """A Kedro dataset used to represent placeholder outputs.

    This dataset can be used in Kedro pipelines to signify that a node has
    completed its task without actually producing any tangible output. It is
    particularly useful for enforcing execution order in pipelines where
    certain nodes need to run after others without passing data.

    Parameters
    ----------
    metadata : dict[str, Any] or None, optional
        Optional metadata stored alongside the dataset.

    Examples
    --------
    >>> from kedro_dagster.datasets.nothing_dataset import DagsterNothingDataset
    >>> nothing_ds = DagsterNothingDataset(metadata={"info": "no output"})
    >>> data = nothing_ds.load()
    >>> print(data)
    __nothing__
    >>> nothing_ds.save(data)

    See Also
    --------
    `kedro_dagster.utils.is_nothing_asset_name` :
        Predicate that checks whether a catalog entry is this type.
    """

    def __init__(
        self,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        super().__init__()

        self._metadata = metadata or {}

    def load(self) -> str:
        """Return the nothing sentinel for every load.

        Returns
        -------
        str
            Always ``"__nothing__"``.
        """
        return NOTHING_OUTPUT

    def save(self, data: Any) -> None:
        """Do nothing when saving data.

        Parameters
        ----------
        data : Any
            Ignored.
        """
        # Intentionally do not persist anything
        pass

    def _exists(self) -> bool:
        """Always report that the dataset exists.

        Returns
        -------
        bool
            Always ``True``.
        """
        return True

    def _describe(self) -> dict[str, Any]:
        """Return a JSON-serializable description of the dataset.

        Returns
        -------
        dict[str, Any]
            Basic dataset type info.
        """
        return {"type": self.__class__.__name__}

load()

Return the nothing sentinel for every load.

Returns:

Type Description
str

Always "__nothing__".

Source code in src/kedro_dagster/datasets/nothing_dataset.py
def load(self) -> str:
    """Return the nothing sentinel for every load.

    Returns
    -------
    str
        Always ``"__nothing__"``.
    """
    return NOTHING_OUTPUT

save(data)

Do nothing when saving data.

Parameters:

Name Type Description Default
data Any

Ignored.

required
Source code in src/kedro_dagster/datasets/nothing_dataset.py
def save(self, data: Any) -> None:
    """Do nothing when saving data.

    Parameters
    ----------
    data : Any
        Ignored.
    """
    # Intentionally do not persist anything
    pass

DagsterPartitionedDataset

kedro_dagster.datasets.DagsterPartitionedDataset

Bases: PartitionedDataset

Kedro dataset that enables Dagster partitioning.

This dataset wraps Kedro's PartitionedDataset to expose Dagster partition definitions and mappings. It validates partition types at instantiation and only supports a limited subset of Dagster's partition features.

Supported Partition Types: - StaticPartitionsDefinition: Fixed list of partition keys

Supported Partition Mappings: - StaticPartitionMapping: Explicit upstream→downstream key mapping - IdentityPartitionMapping: 1:1 mapping with matching keys

Parameters:

Name Type Description Default
path str

Base path for partitions.

required
dataset dict[str, Any]

Underlying dataset config.

required
partition dict[str, Any]

Partition definition config or type.

required
partition_mapping dict[str, Any] or None

Optional downstream partition mappings.

None
filepath_arg str

Arg name in underlying dataset to pass the resolved path.

'filepath'
filename_suffix str

Optional suffix appended to partitioned filename.

''
credentials dict[str, Any] or None

Credentials for the underlying dataset.

None
load_args dict[str, Any] or None

Load args for the underlying dataset.

None
fs_args dict[str, Any] or None

Filesystem args for the underlying dataset.

None
overwrite bool

Whether to overwrite on save.

False
save_lazily bool

Whether to save lazily.

True
metadata dict[str, Any] or None

Arbitrary metadata.

None

Examples:

Basic usage with StaticPartitionsDefinition:

my_partitioned_dataset:
  type: kedro_dagster.DagsterPartitionedDataset
  path: data/partitions/
  dataset:
    type: pandas.CSVDataset
  partition:
    type: dagster.StaticPartitionsDefinition
    partition_keys: ["A", "B", "C"]

With StaticPartitionMapping:

upstream:
  type: kedro_dagster.DagsterPartitionedDataset
  path: data/01_raw/upstream/
  dataset:
    type: pandas.CSVDataset
  partition:
    type: dagster.StaticPartitionsDefinition
    partition_keys: ["1.csv", "2.csv"]
  partition_mappings:
    downstream:
      type: dagster.StaticPartitionMapping
      downstream_partition_keys_by_upstream_partition_key:
        1.csv: 10.csv
        2.csv: 20.csv

With IdentityPartitionMapping:

upstream:
  type: kedro_dagster.DagsterPartitionedDataset
  path: data/02_raw/upstream/
  dataset:
    type: pickle.PickleDataset
  partition:
    type: dagster.StaticPartitionsDefinition
    partition_keys: ["2024-01.pkl", "2024-02.pkl"]
  partition_mappings:
    downstream:
      type: dagster.IdentityPartitionMapping
See Also

kedro_dagster.datasets.nothing_dataset.DagsterNothingDataset : Companion sentinel dataset for Nothing-typed assets. kedro_dagster.catalog.CatalogTranslator : Translates these datasets into Dagster IO managers.

Source code in src/kedro_dagster/datasets/partitioned_dataset.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
class DagsterPartitionedDataset(PartitionedDataset):
    """Kedro dataset that enables Dagster partitioning.

    This dataset wraps Kedro's `PartitionedDataset` to expose Dagster partition
    definitions and mappings. It validates partition types at instantiation and
    only supports a limited subset of Dagster's partition features.

    **Supported Partition Types**:
        - `StaticPartitionsDefinition`: Fixed list of partition keys

    **Supported Partition Mappings**:
        - `StaticPartitionMapping`: Explicit upstream→downstream key mapping
        - `IdentityPartitionMapping`: 1:1 mapping with matching keys

    Parameters
    ----------
    path : str
        Base path for partitions.
    dataset : dict[str, Any]
        Underlying dataset config.
    partition : dict[str, Any]
        Partition definition config or type.
    partition_mapping : dict[str, Any] or None
        Optional downstream partition mappings.
    filepath_arg : str
        Arg name in underlying dataset to pass the resolved path.
    filename_suffix : str
        Optional suffix appended to partitioned filename.
    credentials : dict[str, Any] or None
        Credentials for the underlying dataset.
    load_args : dict[str, Any] or None
        Load args for the underlying dataset.
    fs_args : dict[str, Any] or None
        Filesystem args for the underlying dataset.
    overwrite : bool
        Whether to overwrite on save.
    save_lazily : bool
        Whether to save lazily.
    metadata : dict[str, Any] or None
        Arbitrary metadata.

    Examples
    --------
    Basic usage with ``StaticPartitionsDefinition``:

    ```yaml
    my_partitioned_dataset:
      type: kedro_dagster.DagsterPartitionedDataset
      path: data/partitions/
      dataset:
        type: pandas.CSVDataset
      partition:
        type: dagster.StaticPartitionsDefinition
        partition_keys: ["A", "B", "C"]
    ```

    With ``StaticPartitionMapping``:

    ```yaml
    upstream:
      type: kedro_dagster.DagsterPartitionedDataset
      path: data/01_raw/upstream/
      dataset:
        type: pandas.CSVDataset
      partition:
        type: dagster.StaticPartitionsDefinition
        partition_keys: ["1.csv", "2.csv"]
      partition_mappings:
        downstream:
          type: dagster.StaticPartitionMapping
          downstream_partition_keys_by_upstream_partition_key:
            1.csv: 10.csv
            2.csv: 20.csv
    ```

    With ``IdentityPartitionMapping``:

    ```yaml
    upstream:
      type: kedro_dagster.DagsterPartitionedDataset
      path: data/02_raw/upstream/
      dataset:
        type: pickle.PickleDataset
      partition:
        type: dagster.StaticPartitionsDefinition
        partition_keys: ["2024-01.pkl", "2024-02.pkl"]
      partition_mappings:
        downstream:
          type: dagster.IdentityPartitionMapping
    ```

    See Also
    --------
    `kedro_dagster.datasets.nothing_dataset.DagsterNothingDataset` :
        Companion sentinel dataset for Nothing-typed assets.
    `kedro_dagster.catalog.CatalogTranslator` :
        Translates these datasets into Dagster IO managers.
    """

    def __init__(
        self,
        *,
        path: str,
        dataset: dict[str, Any],
        partition: dict[str, Any],
        partition_mapping: dict[str, Any] | None = None,
        filepath_arg: str = "filepath",
        filename_suffix: str = "",
        credentials: dict[str, Any] | None = None,
        load_args: dict[str, Any] | None = None,
        fs_args: dict[str, Any] | None = None,
        overwrite: bool = False,
        save_lazily: bool = True,
        metadata: dict[str, Any] | None = None,
    ):
        # Must be set before super().__init__() because the parent constructor
        # calls self._invalidate_caches(), which clears this dict.
        self._partition_cache: dict[Any, Any] = {}

        super().__init__(
            path=path,
            dataset=dataset,
            filepath_arg=filepath_arg,
            filename_suffix=filename_suffix,
            credentials=credentials,
            load_args=load_args,
            fs_args=fs_args,
            overwrite=overwrite,
            save_lazily=save_lazily,
            metadata=metadata,
        )

        partition = partition if isinstance(partition, dict) else {"type": partition}
        self._validate_partitions_definition(partition)
        self._partition_type, self._partition_config = parse_dagster_definition(partition)
        self._validate_partition_definition_type()

        self._partition_mapping: dict[str, Any] | None = None
        if partition_mapping is not None:
            self._partition_mapping = {}
            for downstream_dataset_name, downstream_partition_mapping in partition_mapping.items():
                downstream_mapping_cfg = (
                    downstream_partition_mapping
                    if isinstance(downstream_partition_mapping, dict)
                    else {"type": downstream_partition_mapping}
                )
                downstream_partition_mapping_type, downstream_partition_mapping_config = parse_dagster_definition(
                    downstream_mapping_cfg
                )
                self._validate_partition_mapping_type(downstream_partition_mapping_type, downstream_dataset_name)
                self._partition_mapping[downstream_dataset_name] = {
                    "type": downstream_partition_mapping_type,
                    "config": downstream_partition_mapping_config,
                }

    def _validate_partitions_definition(self, partition: dict[str, Any]) -> None:
        """Validate minimal structure for partition definition.

        Parameters
        ----------
        partition : dict[str, Any]
            Partition definition config.
        """
        if "type" not in partition:
            raise ValueError("Partition definition must contain the 'type' key.")

    def _validate_partition_definition_type(self) -> None:
        """Validate that only StaticPartitionsDefinition is used.

        Raises
        ------
        NotImplementedError
            If partition definition type is not supported.
        """
        # Check if it's StaticPartitionsDefinition or a subclass
        is_static = self._partition_type is dg.StaticPartitionsDefinition or (
            isinstance(self._partition_type, type) and issubclass(self._partition_type, dg.StaticPartitionsDefinition)
        )

        if not is_static:
            msg = (
                f"Partition definition type '{self._partition_type.__name__}' is not supported. "
                "Kedro-Dagster currently only supports `StaticPartitionsDefinition`."
            )
            raise NotImplementedError(msg)

    def _validate_partition_mapping_type(self, mapping_type: type, downstream_dataset_name: str) -> None:
        """Validate that only supported partition mappings are used.

        Parameters
        ----------
        mapping_type : type
            The partition mapping class.
        downstream_dataset_name : str
            Name of the downstream dataset.

        Raises
        ------
        NotImplementedError
            If partition mapping type is not supported.
        """
        # Allowed mapping types (StaticPartitionMapping and IdentityPartitionMapping)
        allowed_types = (dg.StaticPartitionMapping, dg.IdentityPartitionMapping)

        # Check if mapping_type is one of the allowed types or a subclass
        is_allowed = mapping_type in allowed_types or (
            isinstance(mapping_type, type) and any(issubclass(mapping_type, t) for t in allowed_types)
        )

        if not is_allowed:
            msg = (
                f"Partition mapping type '{mapping_type.__name__}' for downstream dataset "
                f"'{downstream_dataset_name}' is not supported. "
                "Kedro-Dagster currently only supports `StaticPartitionMapping` and `IdentityPartitionMapping`."
            )
            raise NotImplementedError(msg)

    def _invalidate_caches(self) -> None:
        """Invalidate both the parent partition cache and the Dagster partition cache."""
        super()._invalidate_caches()
        self._partition_cache.clear()

    def _get_partitions_definition(self) -> dg.PartitionsDefinition:
        """Instantiate and return the Dagster partitions definition.

        Returns
        -------
        PartitionsDefinition
            Instantiated partitions definition.
        """
        try:
            partition_def = self._partition_type(**self._partition_config)
        except Exception as exc:
            raise ValueError(
                f"Failed to instantiate partitions definition "
                f"'{self._partition_type.__name__}' with config: {self._partition_config}"
            ) from exc

        return partition_def

    def _get_mapped_downstream_dataset_names(self) -> list[str]:
        """Return downstream dataset names that have a partition mapping.

        Returns
        -------
        list[str]
            Mapped downstream dataset names.
        """
        if self._partition_mapping is None:
            return []

        return list(self._partition_mapping.keys())

    def _get_partition_mappings(self) -> dict[str, dg.PartitionMapping] | None:
        """Instantiate and return configured partition mappings, if any.

        Returns
        -------
        dict[str, PartitionMapping] or None
            Mapping per downstream dataset or ``None``.
        """
        if self._partition_mapping is None:
            return None

        partition_mappings: dict[str, dg.PartitionMapping] = {}
        for (
            downstream_dataset_name,
            downstream_partition_mapping_info,
        ) in self._partition_mapping.items():
            # downstream_partition_mapping_info["type"] is a class object; cast it for typing
            partition_mapping_type = cast(
                type[dg.PartitionMapping],
                downstream_partition_mapping_info["type"],
            )
            partition_mapping_config: dict[str, Any] = downstream_partition_mapping_info["config"]

            try:
                partition_mapping = partition_mapping_type(**partition_mapping_config)
            except Exception as exc:
                raise ValueError(
                    f"Failed to instantiate partition mapping "
                    f"'{downstream_partition_mapping_info['type'].__name__}' "
                    f"with config: {downstream_partition_mapping_info['config']}"
                ) from exc

            partition_mappings[downstream_dataset_name] = partition_mapping

        return partition_mappings

    # TODO: Cache?
    def _list_available_partition_keys(self) -> list[str]:
        """List available partition keys present on the filesystem.

        Returns
        -------
        list[str]
            Available partition keys.
        """
        available_partitions: list[str] = self._list_partitions()

        partition_keys: list[str] = []
        base_path = Path(self._normalized_path)
        for partition in available_partitions:
            try:
                key = os.path.relpath(str(partition), start=str(base_path))
            except Exception:
                key = Path(partition).name

            if self._filename_suffix and key.endswith(self._filename_suffix):
                key = key[: -len(self._filename_suffix)]
            partition_keys.append(key)

        return partition_keys

    @cachedmethod(cache=operator.attrgetter("_partition_cache"))
    def _list_partitions(self) -> list[str]:
        """List partition paths according to the partitions definition.

        Returns
        -------
        list[str]
            Full paths of discovered partitions.
        """
        partitions_def = self._get_partitions_definition()
        partition_keys = partitions_def.get_partition_keys()

        base_path = Path(self._normalized_path)
        partitions: list[str] = []

        for key in partition_keys:
            # Check candidate paths: prefer key + filename_suffix if suffix is provided
            candidates: list[Path] = []
            if self._filename_suffix:
                candidates.append(base_path / f"{key}{self._filename_suffix}")
            candidates.append(base_path / key)

            for candidate in candidates:
                try:
                    candidate_str = str(candidate)
                    if self._filesystem.exists(candidate_str):
                        partitions.append(candidate_str)
                        break
                except Exception:
                    # Ignore errors for individual candidates and continue checking others
                    continue

        return partitions

    def _get_filepath(self, partition: str) -> str:
        """Compute the full filepath for a given partition key.

        Parameters
        ----------
        partition : str
            Partition key.

        Returns
        -------
        str
            Full path to the partition.
        """
        partition_def = self._get_partitions_definition()
        if partition_def is None:
            raise ValueError("Partition definition could not be instantiated.")

        if partition not in partition_def.get_partition_keys():
            raise ValueError(f"Partition '{partition}' not found in partition definition.")

        return str(Path(self._normalized_path) / partition)

    def load(self) -> dict[str, Callable[[], Any]]:
        """Load partitioned data lazily as callables per partition key.

        Returns
        -------
        dict[str, Callable[[], Any]]
            Map of partition key to loader callable.
        """
        loaded_data = cast(dict[str, Callable[[], Any]], super().load())

        # Normalize keys to logical partition keys (e.g., "p1") instead of full paths.
        base = Path(self._normalized_path)
        normalized: dict[str, Callable[[], Any]] = {}
        for raw_key, value in loaded_data.items():
            key_str = str(raw_key)
            # Only attempt to relativize if key looks like a path pointing under base
            if os.path.isabs(key_str) or key_str.startswith(str(base)):
                try:
                    key_rel = os.path.relpath(key_str, start=str(base))
                except Exception:
                    key_rel = Path(key_str).name
            else:
                key_rel = key_str

            if self._filename_suffix and key_rel.endswith(self._filename_suffix):
                key_rel = key_rel[: -len(self._filename_suffix)]

            normalized[key_rel] = value

        return normalized

    def save(self, data: dict[str, Any]) -> None:
        """Save partitioned data.

        Parameters
        ----------
        data : dict[str, Any]
            Map of partition key to data.
        """
        partitions_def = self._get_partitions_definition()
        partition_keys = partitions_def.get_partition_keys()
        if not isinstance(data, dict):
            raise TypeError(f"Expected data to be a dict mapping partition keys to data, but got: {type(data)}")
        elif all(key not in partition_keys for key in data):
            raise ValueError(
                "No matching partitions found to save the provided data. Partition keys: "
                f"{list(data.keys())}. Expected keys: {partition_keys}"
            )

        super().save(data)

    def _describe(self) -> dict[str, Any]:
        """Return a JSON-serializable description of the dataset configuration.

        Returns
        -------
        dict[str, Any]
            Description including dataset, partition, and mapping metadata.
        """
        partitioned_dataset_description = super()._describe()

        clean_partition_config = (
            dict(self._partition_config.items()) if isinstance(self._partition_config, dict) else self._partition_config
        )

        partitioned_dataset_description = partitioned_dataset_description | {
            "partition_type": self._partition_type.__name__,
            "partition_config": clean_partition_config,
        }

        if self._partition_mapping is not None:
            for downstream_dataset_name, downstream_partition_mapping_info in self._partition_mapping.items():
                downstream_partition_config: dict[str, Any] = downstream_partition_mapping_info["config"]
                clean_downstream_partition_mapping_config = (
                    dict(downstream_partition_config.items())
                    if isinstance(downstream_partition_config, dict)
                    else downstream_partition_config
                )

                partitioned_dataset_description = partitioned_dataset_description | {
                    downstream_dataset_name: {
                        "partition_mapping": downstream_partition_mapping_info["type"].__name__,
                        "partition_mapping_config": clean_downstream_partition_mapping_config,
                    }
                }
        return partitioned_dataset_description  # type: ignore[no-any-return]

    def __repr__(self) -> str:
        """Return a human-friendly representation of the dataset."""
        object_description = self._describe()

        # Dummy object to call _pretty_repr
        # Only clean_dataset_config parameters are exposed
        kwargs = deepcopy(self._dataset_config)
        kwargs[self._filepath_arg] = ""
        dataset = self._dataset_type(**kwargs)

        object_description_repr = {
            "filepath": object_description["path"],
            "dataset": dataset._pretty_repr(object_description["dataset_config"]),
            "partition": dataset._pretty_repr(object_description["partition_config"]),
        }

        # Render partition mappings (if any) from the description
        if self._partition_mapping:
            mapping_repr: dict[str, Any] = {}
            for downstream_dataset_name in self._partition_mapping:
                downstream_desc = object_description.get(downstream_dataset_name)
                if isinstance(downstream_desc, dict) and "partition_mapping_config" in downstream_desc:
                    mapping_repr[downstream_dataset_name] = dataset._pretty_repr(
                        downstream_desc["partition_mapping_config"]
                    )
            if mapping_repr:
                object_description_repr["partition_mapping"] = mapping_repr

        return self._pretty_repr(object_description_repr)  # type: ignore[no-any-return]

    def _exists(self) -> bool:
        """Return True when at least one partition exists."""
        return bool(self._list_partitions())

load()

Load partitioned data lazily as callables per partition key.

Returns:

Type Description
dict[str, Callable[[], Any]]

Map of partition key to loader callable.

Source code in src/kedro_dagster/datasets/partitioned_dataset.py
def load(self) -> dict[str, Callable[[], Any]]:
    """Load partitioned data lazily as callables per partition key.

    Returns
    -------
    dict[str, Callable[[], Any]]
        Map of partition key to loader callable.
    """
    loaded_data = cast(dict[str, Callable[[], Any]], super().load())

    # Normalize keys to logical partition keys (e.g., "p1") instead of full paths.
    base = Path(self._normalized_path)
    normalized: dict[str, Callable[[], Any]] = {}
    for raw_key, value in loaded_data.items():
        key_str = str(raw_key)
        # Only attempt to relativize if key looks like a path pointing under base
        if os.path.isabs(key_str) or key_str.startswith(str(base)):
            try:
                key_rel = os.path.relpath(key_str, start=str(base))
            except Exception:
                key_rel = Path(key_str).name
        else:
            key_rel = key_str

        if self._filename_suffix and key_rel.endswith(self._filename_suffix):
            key_rel = key_rel[: -len(self._filename_suffix)]

        normalized[key_rel] = value

    return normalized

save(data)

Save partitioned data.

Parameters:

Name Type Description Default
data dict[str, Any]

Map of partition key to data.

required
Source code in src/kedro_dagster/datasets/partitioned_dataset.py
def save(self, data: dict[str, Any]) -> None:
    """Save partitioned data.

    Parameters
    ----------
    data : dict[str, Any]
        Map of partition key to data.
    """
    partitions_def = self._get_partitions_definition()
    partition_keys = partitions_def.get_partition_keys()
    if not isinstance(data, dict):
        raise TypeError(f"Expected data to be a dict mapping partition keys to data, but got: {type(data)}")
    elif all(key not in partition_keys for key in data):
        raise ValueError(
            "No matching partitions found to save the provided data. Partition keys: "
            f"{list(data.keys())}. Expected keys: {partition_keys}"
        )

    super().save(data)

__repr__()

Return a human-friendly representation of the dataset.

Source code in src/kedro_dagster/datasets/partitioned_dataset.py
def __repr__(self) -> str:
    """Return a human-friendly representation of the dataset."""
    object_description = self._describe()

    # Dummy object to call _pretty_repr
    # Only clean_dataset_config parameters are exposed
    kwargs = deepcopy(self._dataset_config)
    kwargs[self._filepath_arg] = ""
    dataset = self._dataset_type(**kwargs)

    object_description_repr = {
        "filepath": object_description["path"],
        "dataset": dataset._pretty_repr(object_description["dataset_config"]),
        "partition": dataset._pretty_repr(object_description["partition_config"]),
    }

    # Render partition mappings (if any) from the description
    if self._partition_mapping:
        mapping_repr: dict[str, Any] = {}
        for downstream_dataset_name in self._partition_mapping:
            downstream_desc = object_description.get(downstream_dataset_name)
            if isinstance(downstream_desc, dict) and "partition_mapping_config" in downstream_desc:
                mapping_repr[downstream_dataset_name] = dataset._pretty_repr(
                    downstream_desc["partition_mapping_config"]
                )
        if mapping_repr:
            object_description_repr["partition_mapping"] = mapping_repr

    return self._pretty_repr(object_description_repr)  # type: ignore[no-any-return]