
PipelineTranslator

kedro_dagster.pipelines.PipelineTranslator

Translator for Kedro pipelines to Dagster jobs.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dagster_config | KedroDagsterConfig | Parsed configuration of the Dagster repository. | required |
| context | KedroContext | Active Kedro context (provides catalog and hooks). | required |
| catalog | CatalogProtocol | Kedro data catalog. | required |
| project_path | str | Path to the Kedro project. | required |
| env | str | Kedro environment, also used to namespace job and resource names. | required |
| run_id | str | Kedro run ID. In Kedro < 1.0, this is called session_id. | required |
| named_assets | dict[str, AssetsDefinition] | Mapping of asset name to asset definition. | required |
| asset_partitions | dict[str, Any] | Mapping of asset name to its partition definition (partitions_def) and partition mappings (partition_mappings). | required |
| named_op_factories | dict[str, OpDefinition] | Mapping of graph-op name to the factory that builds the op. | required |
| named_resources | dict[str, ResourceDefinition] | Mapping of resource name to resource definition. | required |
| named_executors | dict[str, ExecutorDefinition] | Mapping of executor name to executor definition. | required |
| named_loggers | dict[str, LoggerDefinition] | Mapping of logger name to logger definition. | required |
| enable_mlflow | bool | Whether MLflow integration is enabled. | required |
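
The env-based namespacing can be sketched in plain Python. Reading the source below, the job built for a configured job is named `{env}__{job_name}`, and each dataset's IO manager is looked up under the resource key `{env}__{asset_name}_io_manager`; datasets without a matching entry in named_resources are simply skipped. The helper names (`job_name_for`, `io_manager_key_for`, `select_io_managers`) are hypothetical, asset names are assumed to be already formatted, and plain strings stand in for real resource definitions.

```python
def job_name_for(env: str, job_name: str) -> str:
    """Name given to the Dagster job (and its graph) for a configured job."""
    return f"{env}__{job_name}"


def io_manager_key_for(env: str, asset_name: str) -> str:
    """Resource key of the IO manager bound to an (already formatted) asset name."""
    return f"{env}__{asset_name}_io_manager"


def select_io_managers(
    env: str, asset_names: list[str], named_resources: dict[str, object]
) -> dict[str, object]:
    """Keep only the IO managers registered for `env` and the given assets.

    Assets without a matching resource are skipped rather than treated as errors.
    """
    selected: dict[str, object] = {}
    for asset_name in asset_names:
        key = io_manager_key_for(env, asset_name)
        if key in named_resources:
            selected[key] = named_resources[key]
    return selected


# Hypothetical resources registered for two environments
resources = {
    "local__companies_io_manager": "companies-local",
    "prod__companies_io_manager": "companies-prod",
}
print(job_name_for("local", "daily"))  # local__daily
print(select_io_managers("local", ["companies", "shuttles"], resources))
# {'local__companies_io_manager': 'companies-local'}
```

Because the environment is part of every key, the same project can register resources for several environments side by side without name clashes.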

See Also

kedro_dagster.translator.KedroProjectTranslator : Orchestrates the full project translation.
kedro_dagster.nodes.NodeTranslator : Translates individual Kedro nodes.

Source Code

class PipelineTranslator:
    """Translator for Kedro pipelines to Dagster jobs.

    Parameters
    ----------
    dagster_config : KedroDagsterConfig
        Parsed configuration of the Dagster repository.
    context : KedroContext
        Active Kedro context (provides catalog and hooks).
    catalog : CatalogProtocol
        Kedro data catalog.
    project_path : str
        Path to the Kedro project.
    env : str
        Kedro environment used for namespacing.
    run_id : str
        Kedro run ID. In Kedro < 1.0, this is called ``session_id``.
    named_assets : dict[str, AssetsDefinition]
        Mapping of asset name to asset.
    asset_partitions : dict[str, Any]
        Mapping of asset name to partition definitions/mappings.
    named_op_factories : dict[str, OpDefinition]
        Mapping of graph-op name to op factory.
    named_resources : dict[str, ResourceDefinition]
        Mapping of resource name to resource definition.
    named_executors : dict[str, ExecutorDefinition]
        Mapping of executor name to executor definition.
    named_loggers : dict[str, LoggerDefinition]
        Mapping of logger name to logger definition.
    enable_mlflow : bool
        Whether MLflow integration is enabled.

    See Also
    --------
    `kedro_dagster.translator.KedroProjectTranslator` :
        Orchestrates the full project translation.
    `kedro_dagster.nodes.NodeTranslator` :
        Translates individual Kedro nodes.
    """

    def __init__(
        self,
        dagster_config: "KedroDagsterConfig",
        context: "KedroContext",
        catalog: "CatalogProtocol",
        project_path: str,
        env: str,
        run_id: str,
        named_assets: dict[str, dg.AssetsDefinition],
        asset_partitions: dict[str, Any],
        named_op_factories: dict[str, dg.OpDefinition],
        named_resources: dict[str, dg.ResourceDefinition],
        named_executors: dict[str, dg.ExecutorDefinition],
        named_loggers: dict[str, dg.LoggerDefinition],
        enable_mlflow: bool,
    ):
        self._dagster_config = dagster_config
        self._context = context
        self._catalog = catalog
        self._project_path = project_path
        self._env = env
        self._run_id = run_id
        self._hook_manager = context._hook_manager
        self._named_assets = named_assets
        self._asset_partitions = asset_partitions
        self._named_op_factories = named_op_factories
        self._named_resources = named_resources
        self._named_executors = named_executors
        self._named_loggers = named_loggers
        self._enable_mlflow = enable_mlflow

    def _enumerate_partition_keys(self, partitions_def: dg.PartitionsDefinition | None) -> list[str]:
        """Enumerate partition keys for an asset.

        This method assumes the partition definition has already been validated
        by ``DagsterPartitionedDataset``. Only ``StaticPartitionsDefinition``
        should reach here.

        Parameters
        ----------
        partitions_def : PartitionsDefinition or None
            Partitions definition of the asset to enumerate.

        Returns
        -------
        list[str]
            Partition keys.

        See Also
        --------
        `kedro_dagster.pipelines.PipelineTranslator._get_node_partition_keys` :
            Calls this to enumerate keys for partition mapping.
        `kedro_dagster.datasets.partitioned_dataset.DagsterPartitionedDataset` :
            Provides the partition definitions enumerated here.
        """
        if not partitions_def:
            return []

        return list(partitions_def.get_partition_keys())

    def _get_node_partition_keys(self, node: "Node") -> dict[str, str]:
        """Compute downstream partition key per upstream partition for a node.

        Returns a mapping of ``"upstream_asset|partition_key"`` to
        ``"downstream_asset|partition_key"``. If the node is unpartitioned,
        returns an empty mapping.

        Parameters
        ----------
        node : Node
            Kedro node to analyze.

        Returns
        -------
        dict[str, str]
            Mapping from upstream to downstream partition keys.

        See Also
        --------
        `kedro_dagster.pipelines.PipelineTranslator._enumerate_partition_keys` :
            Enumerates partition keys for each asset.
        `kedro_dagster.utils.get_partition_mapping` :
            Resolves the partition mapping between upstream and downstream.
        """
        # Check partitioning consistency among output datasets
        # Output datasets can be either all partitioned or all non-partitioned (excluding nothing datasets)
        out_asset_names = [format_dataset_name(dataset_name) for dataset_name in node.outputs]
        partitioned_out_asset_names = [
            asset_name for asset_name in out_asset_names if asset_name in self._asset_partitions
        ]
        non_partitioned_out_asset_names = [
            asset_name
            for asset_name in out_asset_names
            if asset_name not in self._asset_partitions
            and not is_nothing_asset_name(self._catalog, unformat_asset_name(asset_name))
        ]

        if partitioned_out_asset_names and non_partitioned_out_asset_names:
            partitioned_out_dataset_names = [
                unformat_asset_name(asset_name) for asset_name in partitioned_out_asset_names
            ]
            non_partitioned_out_dataset_names = [
                unformat_asset_name(asset_name) for asset_name in non_partitioned_out_asset_names
            ]
            raise ValueError(
                f"Node '{node.name}' has mixed partitioned and non-partitioned non-nothing outputs: "
                f"partitioned={partitioned_out_dataset_names}, non-partitioned, non-nothing={non_partitioned_out_dataset_names}. "
                "All outputs must be either partitioned or nothing datasets if any output is partitioned."
            )

        downstream_per_upstream_partition_key: dict[str, str] = {}
        for out_dataset_name in node.outputs:
            out_asset_name = format_dataset_name(out_dataset_name)
            if out_asset_name in self._asset_partitions:
                out_partitions_def = self._asset_partitions[out_asset_name].get("partitions_def")

            for in_dataset_name in node.inputs:
                in_asset_name = format_dataset_name(in_dataset_name)
                if in_asset_name in self._asset_partitions and out_asset_name in self._asset_partitions:
                    in_partitions_def = self._asset_partitions[in_asset_name].get("partitions_def")
                    partition_mappings = self._asset_partitions[in_asset_name].get("partition_mappings", None)

                    in_partition_keys = self._enumerate_partition_keys(in_partitions_def)
                    partition_mapping = get_partition_mapping(
                        partition_mappings=partition_mappings,
                        upstream_asset_name=in_dataset_name,
                        downstream_dataset_names=[out_dataset_name],
                        config_resolver=self._catalog._config_resolver,
                    )

                    if partition_mapping is None:
                        # Identity mapping: same key propagated downstream
                        partition_mapping = dg.IdentityPartitionMapping()

                    for in_partition_key in in_partition_keys:
                        mapped_downstream_key = partition_mapping.get_downstream_partitions_for_partitions(
                            upstream_partitions_subset=in_partitions_def.empty_subset().with_partition_keys([
                                in_partition_key
                            ]),
                            upstream_partitions_def=in_partitions_def,
                            downstream_partitions_def=out_partitions_def,
                        )[0]
                        mapped_downstream_key = list(mapped_downstream_key)[0]
                        downstream_per_upstream_partition_key[f"{in_asset_name}|{in_partition_key}"] = (
                            f"{out_asset_name}|{mapped_downstream_key}"
                        )

        return downstream_per_upstream_partition_key

    def _create_before_pipeline_run_hook(self, job_name: str, pipeline: Pipeline) -> dg.OpDefinition:
        """Create the pipeline hook op executed before the pipeline run.

        Parameters
        ----------
        job_name : str
            Job name.
        pipeline : Pipeline
            Kedro pipeline.

        Returns
        -------
        OpDefinition
            Op that triggers before-pipeline-run hooks.
        """
        required_resource_keys = {"kedro_run"}
        # Only require mlflow if provided in named resources
        if "mlflow" in self._named_resources:
            required_resource_keys.add("mlflow")

        @dg.op(
            name=f"before_pipeline_run_hook_{job_name}",
            description=f"Hook to be executed before the `{job_name}` pipeline run.",
            out={"before_pipeline_run_hook_output": dg.Out(dagster_type=dg.Nothing)},
            required_resource_keys=required_resource_keys,
        )
        def before_pipeline_run_hook_op(context: dg.OpExecutionContext) -> dg.Nothing:
            """Op that fires Kedro before-pipeline-run hooks."""
            kedro_run_resource = context.resources.kedro_run
            kedro_run_resource.after_context_created_hook()
            kedro_run_resource.after_catalog_created_hook()

            self._hook_manager.hook.before_pipeline_run(
                run_params=kedro_run_resource.run_params,
                pipeline=pipeline,
                catalog=self._catalog,
            )

        return before_pipeline_run_hook_op

    def _create_after_pipeline_run_hook_op(
        self,
        job_name: str,
        pipeline: Pipeline,
        after_pipeline_run_asset_names: list[str],
    ) -> dg.OpDefinition:
        """Create the pipeline hook op executed after the pipeline run.

        Parameters
        ----------
        job_name : str
            Job name.
        pipeline : Pipeline
            Kedro pipeline.
        after_pipeline_run_asset_names : list[str]
            Names of the Nothing-typed inputs to fan in.

        Returns
        -------
        OpDefinition
            Op that triggers after-pipeline-run hooks.
        """
        after_pipeline_run_hook_ins: dict[str, dg.In] = {}
        for asset_name in after_pipeline_run_asset_names:
            after_pipeline_run_hook_ins[asset_name] = dg.In(dagster_type=dg.Nothing)

        required_resource_keys = {"kedro_run"}
        # Only require mlflow if provided in named resources
        if "mlflow" in self._named_resources:
            required_resource_keys.add("mlflow")

        @dg.op(
            name=f"after_pipeline_run_hook_{job_name}",
            description=f"Hook to be executed after the `{job_name}` pipeline run.",
            ins=after_pipeline_run_hook_ins,
            required_resource_keys=required_resource_keys,
        )
        def after_pipeline_run_hook_op(context: dg.OpExecutionContext) -> dg.Nothing:
            """Op that fires Kedro after-pipeline-run hooks."""
            kedro_run_resource = context.resources.kedro_run
            run_params = kedro_run_resource.run_params

            # NOTE: We pass run_result=None because, in the Dagster context, we do not have access
            # to the Kedro run results dictionary (mapping dataset names to DatasetSaveError objects).
            # Hooks relying on run_result for error reporting or post-processing therefore receive
            # None instead. This is a known limitation of the Dagster integration.
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    # The message argument is a regular expression, so the parentheses are escaped
                    r"Argument\(s\) 'run_result' which are declared in the hookspec cannot be found in this hook call",
                    UserWarning,
                    # NOTE: This filter depends on pluggy internals ('pluggy._hooks').
                    # If pluggy changes its internal structure, this filter may need to be updated.
                    "pluggy._hooks",
                )
                self._hook_manager.hook.after_pipeline_run(
                    run_result=None,
                    run_params=run_params,
                    pipeline=pipeline,
                    catalog=self._catalog,
                )

        return after_pipeline_run_hook_op

    def translate_pipeline(
        self,
        pipeline: Pipeline,
        pipeline_name: str,
        filter_params: dict[str, Any],
        job_name: str,
        executor_def: dg.ExecutorDefinition | None = None,
        logger_defs: dict[str, dg.LoggerDefinition] | None = None,
        loggers_config: dict[str, Any] | None = None,
    ) -> dg.JobDefinition:
        """Translate a Kedro pipeline into a Dagster job with partition support.

        This method implements static fan-out for partitioned datasets:

        - Nodes with partitioned outputs/inputs are cloned for each partition
          key.
        - Partition mappings define relationships between upstream and
          downstream partitions.
        - Identity mapping is used by default (same partition key across
          assets).

        Parameters
        ----------
        pipeline : Pipeline
            Kedro pipeline.
        pipeline_name : str
            Name of the Kedro pipeline.
        filter_params : dict[str, Any]
            Filter parameters for the pipeline.
        job_name : str
            Name of the job.
        executor_def : ExecutorDefinition or None, optional
            Executor definition.
        logger_defs : dict[str, LoggerDefinition] or None, optional
            Logger definitions.
        loggers_config : dict[str, Any] or None, optional
            Logger configurations.

        Returns
        -------
        JobDefinition
            Dagster job definition with partition-aware ops.

        See Also
        --------
        `kedro_dagster.pipelines.PipelineTranslator.to_dagster` :
            Iterates over configured jobs and calls this method.
        """
        before_pipeline_run_hook_op = self._create_before_pipeline_run_hook(job_name, pipeline)

        @dg.graph(
            name=f"{self._env}__{job_name}",
            description=f"Job derived from pipeline associated with `{job_name}` in env `{self._env}`.",
            out=None,
        )
        def pipeline_graph() -> None:
            """Compose the Dagster graph for a single Kedro pipeline."""
            before_pipeline_run_hook_output = before_pipeline_run_hook_op()

            # Collect input assets
            materialized_in_assets: dict[str, Any] = {}
            for dataset_name in pipeline.inputs():
                asset_name = format_dataset_name(dataset_name)
                if not _is_param_name(dataset_name):
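                    # Prefer an existing asset definition; otherwise fall back to an
                    # AssetSpec bound to the dataset's environment-specific IO manager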
                    if asset_name in self._named_assets:
                        materialized_in_assets[asset_name] = self._named_assets[asset_name]
                    else:
                        asset_key = get_asset_key_from_dataset_name(dataset_name, self._env)
                        materialized_in_assets[asset_name] = dg.AssetSpec(key=asset_key).with_io_manager_key(
                            f"{self._env}__{asset_name}_io_manager"
                        )

            partitioned_out_assets: dict[str, dict[str, Any]] = {}
            after_pipeline_run_hook_inputs: dict[str, Any] = {}

            n_layers = len(pipeline.grouped_nodes)
            # Iterate in topological order over layers of the pipeline
            for i_layer, layer in enumerate(pipeline.grouped_nodes):
                is_node_in_first_layer = i_layer == 0
                is_node_in_last_layer = i_layer == n_layers - 1

                for node in layer:
                    downstream_per_upstream_partition_key = self._get_node_partition_keys(node)

                    op_name = format_node_name(node.name) + "_graph"

                    inputs_kwargs: dict[str, Any] = {}
                    for in_dataset_name in node.inputs:
                        in_asset_name = format_dataset_name(in_dataset_name)
                        if _is_param_name(in_dataset_name):
                            continue

                        if in_asset_name in materialized_in_assets:
                            inputs_kwargs[in_asset_name] = materialized_in_assets[in_asset_name]

                    if is_node_in_first_layer:
                        inputs_kwargs["before_pipeline_run_hook_output"] = before_pipeline_run_hook_output

                    if not downstream_per_upstream_partition_key:
                        # Unpartitioned single invocation
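                        # Build a provisional op only to inspect its declared inputs (`ins`)
                        base_op = self._named_op_factories[op_name](
                            is_in_first_layer=is_node_in_first_layer,
                            is_in_last_layer=is_node_in_last_layer,
                        )

                        # Fan in the outputs of upstream partitioned nodes that are Nothing datasets:
                        # this op receives one input per upstream partition key
                        partition_keys_per_in_asset_names = {}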
                        for asset_name, asset_partitions_val in partitioned_out_assets.items():
                            dataset_name = unformat_asset_name(asset_name)
                            if asset_name in base_op.ins and is_nothing_asset_name(self._catalog, dataset_name):
                                partition_keys_per_in_asset_names[asset_name] = [
                                    format_partition_key(partition_key) for partition_key in asset_partitions_val
                                ]
                                for partition_key, asset_partition_val in asset_partitions_val.items():
                                    formatted_partition_key = format_partition_key(partition_key)
                                    inputs_kwargs[asset_name + f"__{formatted_partition_key}"] = asset_partition_val

                        op = self._named_op_factories[op_name](
                            is_in_first_layer=is_node_in_first_layer,
                            is_in_last_layer=is_node_in_last_layer,
                            partition_keys_per_in_asset_names=partition_keys_per_in_asset_names,
                        )
                        res = op(**inputs_kwargs)

                        # Capture outputs
                        if hasattr(res, "output_name"):
                            # Single output
                            materialized_out_assets_op = {res.output_name: res}
                        else:
                            materialized_out_assets_op = {out_handle.output_name: out_handle for out_handle in res}

                        for out_dataset in node.outputs:
                            out_asset_name = format_dataset_name(out_dataset)
                            out_asset = materialized_out_assets_op.get(out_asset_name)

                            if out_asset is not None:
                                materialized_in_assets[out_asset_name] = out_asset

                        for out_asset_name, out_asset in materialized_out_assets_op.items():
                            if out_asset_name.endswith("_after_pipeline_run_hook_input"):
                                after_pipeline_run_hook_inputs[out_asset_name] = out_asset
                        continue

                    # Partitioned: fan out node execution per partition key
                    for (
                        in_asset_partition_key,
                        out_asset_partition_key,
                    ) in downstream_per_upstream_partition_key.items():
                        op_partition_keys = {
                            "upstream_partition_key": in_asset_partition_key,
                            "downstream_partition_key": out_asset_partition_key,
                        }
                        op = self._named_op_factories[op_name](
                            is_in_first_layer=is_node_in_first_layer,
                            is_in_last_layer=is_node_in_last_layer,
                            partition_keys=op_partition_keys,
                        )

                        res = op(**inputs_kwargs)

                        # Capture outputs
                        if hasattr(res, "output_name"):
                            # Single output
                            materialized_out_assets_op = {res.output_name: res}
                        else:
                            materialized_out_assets_op = {out_handle.output_name: out_handle for out_handle in res}

                        for out_asset_name, out_asset in materialized_out_assets_op.items():
                            out_partition_key = out_asset_partition_key.split("|")[1]
                            formatted_out_partition_key = format_partition_key(out_partition_key)

                            if not out_asset_name.endswith("_after_pipeline_run_hook_input"):
                                partitioned_out_assets.setdefault(out_asset_name, {})[out_partition_key] = out_asset
                            elif is_node_in_last_layer:
                                after_pipeline_run_hook_in_name = (
                                    out_asset_name.split("_after_pipeline_run_hook_input")[0]
                                    + f"__{formatted_out_partition_key}"
                                    + "_after_pipeline_run_hook_input"
                                )
                                after_pipeline_run_hook_inputs[after_pipeline_run_hook_in_name] = out_asset

            after_pipeline_run_hook_op = self._create_after_pipeline_run_hook_op(
                job_name, pipeline, list(after_pipeline_run_hook_inputs.keys())
            )
            after_pipeline_run_hook_op(**after_pipeline_run_hook_inputs)

        kedro_run_translator = KedroRunTranslator(
            context=self._context,
            catalog=self._catalog,
            project_path=self._project_path,
            env=self._env,
            run_id=self._run_id,
        )

        kedro_run_resource = kedro_run_translator.to_dagster(
            pipeline_name=pipeline_name,
            filter_params=filter_params,
        )
        resource_defs = {"kedro_run": kedro_run_resource}

        for dataset_name in pipeline.all_inputs() | pipeline.all_outputs():
            asset_name = format_dataset_name(dataset_name)
            if f"{self._env}__{asset_name}_io_manager" in self._named_resources:
                resource_defs[f"{self._env}__{asset_name}_io_manager"] = self._named_resources[
                    f"{self._env}__{asset_name}_io_manager"
                ]

        # Expose mlflow resource only if provided
        if "mlflow" in self._named_resources:
            resource_defs |= {"mlflow": self._named_resources["mlflow"]}

        job = pipeline_graph.to_job(
            name=f"{self._env}__{job_name}",
            resource_defs=resource_defs,
            executor_def=executor_def,
            logger_defs=logger_defs,
            config=loggers_config,
        )

        return job

    def to_dagster(self) -> dict[str, dg.JobDefinition]:
        """Translate the Kedro pipelines into Dagster jobs.

        Returns
        -------
        dict[str, JobDefinition]
            Translated Dagster jobs keyed by job name.

        See Also
        --------
        `kedro_dagster.pipelines.PipelineTranslator.translate_pipeline` :
            Translates a single Kedro pipeline.
        """
        LOGGER.info("Translating Kedro pipelines to Dagster jobs...")
        # Lazy import to avoid circular dependency
        from kedro.framework.project import pipelines

        named_jobs: dict[str, dg.JobDefinition] = {}
        if self._dagster_config.jobs is None:
            LOGGER.debug("No jobs defined in configuration")
            return named_jobs

        LOGGER.debug(f"Processing {len(self._dagster_config.jobs)} job(s)")
        for job_name, job_config in self._dagster_config.jobs.items():
            LOGGER.debug(f"Translating job '{job_name}'...")
            pipeline_config = job_config.pipeline

            pipeline_name = pipeline_config.pipeline_name
            filter_params = get_filter_params_dict(pipeline_config.model_dump())
            pipeline = pipelines.get(pipeline_name).filter(**filter_params)

            # Handle executor configuration (string reference or inline config)
            executor_def = None
            executor_config = job_config.executor
            if executor_config is not None:
                if isinstance(executor_config, str):
                    # String reference to named executor
                    if executor_config in self._named_executors:
                        executor_def = self._named_executors[executor_config]
                    else:
                        msg = f"Executor '{executor_config}' not found. Available executors: {list(self._named_executors.keys())}"
                        LOGGER.error(msg)
                        raise ValueError(msg)
                else:
                    # Inline executor configuration - look for job-specific executor
                    job_executor_name = f"{job_name}__executor"
                    if job_executor_name in self._named_executors:
                        executor_def = self._named_executors[job_executor_name]
                    else:
                        msg = f"Job-specific executor '{job_executor_name}' not found. Available executors: {list(self._named_executors.keys())}"
                        LOGGER.error(msg)
                        raise ValueError(msg)

            # Handle logger configurations (string references and/or inline configs)
            logger_defs, logger_configs = {}, {}
            if job_config.loggers:
                LOGGER.debug(f"Processing {len(job_config.loggers)} loggers for job '{job_name}'...")
                for idx, logger_config in enumerate(job_config.loggers):
                    if isinstance(logger_config, str):
                        # String reference to named logger
                        if logger_config in self._named_loggers:
                            logger_defs[logger_config] = self._named_loggers[logger_config]
                        else:
                            msg = f"Logger '{logger_config}' not found. Available loggers: {list(self._named_loggers.keys())}"
                            LOGGER.error(msg)
                            raise ValueError(msg)

                        # If logger_config exists in _named_loggers, then loggers must exist - we assert for mypy
                        assert self._dagster_config.loggers is not None
                        logger_configs[logger_config] = self._dagster_config.loggers[logger_config]

                    else:
                        # Inline logger configuration - look for job-specific logger
                        job_logger_name = f"{job_name}__logger_{idx}"
                        if job_logger_name in self._named_loggers:
                            logger_defs[job_logger_name] = self._named_loggers[job_logger_name]
                        else:
                            msg = f"Job-specific logger '{job_logger_name}' for inline logger configuration not found."
                            LOGGER.error(msg)
                            raise ValueError(msg)

                        logger_configs[job_logger_name] = logger_config

            loggers_config = {}
            if logger_configs:
                loggers_config = {
                    "loggers": {
                        name: {"config": logger_config.model_dump()} for name, logger_config in logger_configs.items()
                    }
                }

            job = self.translate_pipeline(
                pipeline=pipeline,
                pipeline_name=pipeline_name,
                filter_params=filter_params,
                job_name=job_name,
                executor_def=executor_def,
                logger_defs=logger_defs,
                loggers_config=loggers_config,
            )

            named_jobs[job_name] = job
            LOGGER.debug(f"Successfully translated job '{job_name}'")

        LOGGER.debug(f"Translated {len(named_jobs)} Dagster job(s)")
        return named_jobs

Methods

translate_pipeline(pipeline, pipeline_name, filter_params, job_name, executor_def=None, logger_defs=None, loggers_config=None)

Translate a Kedro pipeline into a Dagster job with partition support.

This method implements static fan-out for partitioned datasets:

  • Nodes with partitioned outputs/inputs are cloned for each partition key.
  • Partition mappings define relationships between upstream and downstream partitions.
  • Identity mapping is used by default (same partition key across assets).
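
The fan-out can be illustrated without Dagster. This sketch assumes that partition keys are encoded as "<asset>|<key>", which the split("|")[1] call in the source suggests. The names identity_mapping, fan_out, group_outputs_by_partition and OpInvocation are hypothetical stand-ins for the op factories and op invocations used by translate_pipeline.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OpInvocation:
    """Hypothetical stand-in for one cloned op call in the static fan-out."""

    op_name: str
    upstream_partition_key: str
    downstream_partition_key: str


def identity_mapping(in_asset: str, out_asset: str, keys: list[str]) -> dict[str, str]:
    """Map each upstream partition to the downstream partition with the same key."""
    return {f"{in_asset}|{key}": f"{out_asset}|{key}" for key in keys}


def fan_out(op_name: str, mapping: dict[str, str]) -> list[OpInvocation]:
    """Create one op invocation per (upstream, downstream) partition pair."""
    return [
        OpInvocation(op_name, upstream_key, downstream_key)
        for upstream_key, downstream_key in mapping.items()
    ]


def group_outputs_by_partition(invocations: list[OpInvocation]) -> dict[str, OpInvocation]:
    """Index invocations by the bare partition key (the part after '|')."""
    return {inv.downstream_partition_key.split("|")[1]: inv for inv in invocations}


mapping = identity_mapping("raw_sales", "clean_sales", ["2024-01", "2024-02"])
invocations = fan_out("clean_sales_node_graph", mapping)
print(len(invocations))  # 2
print(sorted(group_outputs_by_partition(invocations)))  # ['2024-01', '2024-02']
```

Each partition key yields its own op invocation, so the resulting job graph is fixed when the job is built rather than computed at run time.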

Parameters
Name             Type                                 Description                                                    Default
pipeline         Pipeline                             Kedro pipeline to translate.                                   required
pipeline_name    str                                  Name of the Kedro pipeline, passed to the kedro_run resource.  required
filter_params    dict[str, Any]                       Pipeline filter parameters, passed to the kedro_run resource.  required
job_name         str                                  Name of the job; the Dagster job is named <env>__<job_name>.   required
executor_def     ExecutorDefinition or None           Executor for the job; None uses Dagster's default executor.    None
logger_defs      dict[str, LoggerDefinition] or None  Logger definitions keyed by logger name.                       None
loggers_config   dict[str, Any] or None               Logger run config: {"loggers": {name: {"config": ...}}}.       None

Returns
Type           Description
JobDefinition  Dagster job named <env>__<job_name>, with partition-aware ops.

See Also

kedro_dagster.pipelines.PipelineTranslator.to_dagster : Iterates over configured jobs and calls this method.
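
In the source, translate_pipeline walks pipeline.grouped_nodes, Kedro's topologically ordered groups of nodes. When it builds each op, it flags whether the node sits in the first or last group (is_in_first_layer, is_in_last_layer), and first-layer ops also receive the before-pipeline-run hook's output. The sketch below reproduces that layering on a toy dependency graph using Kahn-style levels. It illustrates the idea under that assumption and is not Kedro's own implementation; group_into_layers is a hypothetical helper.

```python
def group_into_layers(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes so every node appears in a later layer than the nodes it depends on.

    ``deps`` maps each node name to the names of the nodes it consumes from.
    """
    remaining = {node: set(parents) for node, parents in deps.items()}
    layers: list[list[str]] = []
    while remaining:
        # Nodes with no unresolved parents form the next layer.
        ready = sorted(node for node, parents in remaining.items() if not parents)
        if not ready:
            raise ValueError("Dependency cycle detected")
        layers.append(ready)
        for node in ready:
            del remaining[node]
        for parents in remaining.values():
            parents.difference_update(ready)
    return layers


deps = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
}
layers = group_into_layers(deps)
print(layers)  # [['preprocess'], ['train'], ['evaluate']]

# Flag the first and last layers, as translate_pipeline does for each node.
n_layers = len(layers)
flags = {
    node: (i_layer == 0, i_layer == n_layers - 1)
    for i_layer, layer in enumerate(layers)
    for node in layer
}
print(flags["preprocess"], flags["evaluate"])  # (True, False) (False, True)
```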

Source Code
def translate_pipeline(
    self,
    pipeline: Pipeline,
    pipeline_name: str,
    filter_params: dict[str, Any],
    job_name: str,
    executor_def: dg.ExecutorDefinition | None = None,
    logger_defs: dict[str, dg.LoggerDefinition] | None = None,
    loggers_config: dict[str, Any] | None = None,
) -> dg.JobDefinition:
    """Translate a Kedro pipeline into a Dagster job with partition support.

    This method implements static fan-out for partitioned datasets:

    - Nodes with partitioned outputs/inputs are cloned for each partition
      key.
    - Partition mappings define relationships between upstream and
      downstream partitions.
    - Identity mapping is used by default (same partition key across
      assets).

    Parameters
    ----------
    pipeline : Pipeline
        Kedro pipeline.
    pipeline_name : str
        Name of the Kedro pipeline.
    filter_params : dict[str, Any]
        Filter parameters for the pipeline.
    job_name : str
        Name of the job.
    executor_def : ExecutorDefinition or None, optional
        Executor definition.
    logger_defs : dict[str, LoggerDefinition] or None, optional
        Logger definitions.
    loggers_config : dict[str, Any] or None, optional
        Logger configurations.

    Returns
    -------
    JobDefinition
        Dagster job definition with partition-aware ops.

    See Also
    --------
    `kedro_dagster.pipelines.PipelineTranslator.to_dagster` :
        Iterates over configured jobs and calls this method.
    """
    before_pipeline_run_hook_op = self._create_before_pipeline_run_hook(job_name, pipeline)

    @dg.graph(
        name=f"{self._env}__{job_name}",
        description=f"Job derived from pipeline associated to `{job_name}` in env `{self._env}`.",
        out=None,
    )
    def pipeline_graph() -> None:
        """Compose the Dagster graph for a single Kedro pipeline."""
        before_pipeline_run_hook_output = before_pipeline_run_hook_op()

        # Collect input assets
        materialized_in_assets: dict[str, Any] = {}
        for dataset_name in pipeline.inputs():
            asset_name = format_dataset_name(dataset_name)
            if not _is_param_name(dataset_name):
                # External assets first
                if asset_name in self._named_assets:
                    materialized_in_assets[asset_name] = self._named_assets[asset_name]
                else:
                    asset_key = get_asset_key_from_dataset_name(dataset_name, self._env)
                    materialized_in_assets[asset_name] = dg.AssetSpec(key=asset_key).with_io_manager_key(
                        f"{self._env}__{asset_name}_io_manager"
                    )

        partitioned_out_assets: dict[str, dict[str, Any]] = {}
        after_pipeline_run_hook_inputs: dict[str, Any] = {}

        n_layers = len(pipeline.grouped_nodes)
        # Iterate in topological order over layers of the pipeline
        for i_layer, layer in enumerate(pipeline.grouped_nodes):
            is_node_in_first_layer = i_layer == 0
            is_node_in_last_layer = i_layer == n_layers - 1

            for node in layer:
                downstream_per_upstream_partition_key = self._get_node_partition_keys(node)

                op_name = format_node_name(node.name) + "_graph"

                inputs_kwargs: dict[str, Any] = {}
                for in_dataset_name in node.inputs:
                    in_asset_name = format_dataset_name(in_dataset_name)
                    if _is_param_name(in_dataset_name):
                        continue

                    if in_asset_name in materialized_in_assets:
                        inputs_kwargs[in_asset_name] = materialized_in_assets[in_asset_name]

                if is_node_in_first_layer:
                    inputs_kwargs["before_pipeline_run_hook_output"] = before_pipeline_run_hook_output

                if not downstream_per_upstream_partition_key:
                    # Unpartitioned single invocation
                    base_op = self._named_op_factories[op_name](
                        is_in_first_layer=is_node_in_first_layer,
                        is_in_last_layer=is_node_in_last_layer,
                    )

                    partition_keys_per_in_asset_names = {}
                    for asset_name, asset_partitions_val in partitioned_out_assets.items():
                        dataset_name = unformat_asset_name(asset_name)
                        if asset_name in base_op.ins and is_nothing_asset_name(self._catalog, dataset_name):
                            partition_keys_per_in_asset_names[asset_name] = [
                                format_partition_key(partition_key) for partition_key in asset_partitions_val
                            ]
                            for partition_key, asset_partition_val in asset_partitions_val.items():
                                formatted_partition_key = format_partition_key(partition_key)
                                inputs_kwargs[asset_name + f"__{formatted_partition_key}"] = asset_partition_val

                    op = self._named_op_factories[op_name](
                        is_in_first_layer=is_node_in_first_layer,
                        is_in_last_layer=is_node_in_last_layer,
                        partition_keys_per_in_asset_names=partition_keys_per_in_asset_names,
                    )
                    res = op(**inputs_kwargs)

                    # Capture outputs
                    if hasattr(res, "output_name"):
                        # Single output
                        materialized_out_assets_op = {res.output_name: res}
                    else:
                        materialized_out_assets_op = {out_handle.output_name: out_handle for out_handle in res}

                    for out_dataset in node.outputs:
                        out_asset_name = format_dataset_name(out_dataset)
                        out_asset = materialized_out_assets_op.get(out_asset_name)

                        if out_asset is not None:
                            materialized_in_assets[out_asset_name] = out_asset

                    for out_asset_name, out_asset in materialized_out_assets_op.items():
                        if out_asset_name.endswith("_after_pipeline_run_hook_input"):
                            after_pipeline_run_hook_inputs[out_asset_name] = out_asset
                    continue

                # Partitioned: fan out node execution per partition key
                for (
                    in_asset_partition_key,
                    out_asset_partition_key,
                ) in downstream_per_upstream_partition_key.items():
                    op_partition_keys = {
                        "upstream_partition_key": in_asset_partition_key,
                        "downstream_partition_key": out_asset_partition_key,
                    }
                    op = self._named_op_factories[op_name](
                        is_in_first_layer=is_node_in_first_layer,
                        is_in_last_layer=is_node_in_last_layer,
                        partition_keys=op_partition_keys,
                    )

                    res = op(**inputs_kwargs)

                    # Capture outputs
                    if hasattr(res, "output_name"):
                        # Single output
                        materialized_out_assets_op = {res.output_name: res}
                    else:
                        materialized_out_assets_op = {out_handle.output_name: out_handle for out_handle in res}

                    for out_asset_name, out_asset in materialized_out_assets_op.items():
                        out_partition_key = out_asset_partition_key.split("|")[1]
                        formatted_out_partition_key = format_partition_key(out_partition_key)

                        if not out_asset_name.endswith("_after_pipeline_run_hook_input"):
                            partitioned_out_assets.setdefault(out_asset_name, {})[out_partition_key] = out_asset
                        elif is_node_in_last_layer:
                            after_pipeline_run_hook_in_name = (
                                out_asset_name.split("_after_pipeline_run_hook_input")[0]
                                + f"__{formatted_out_partition_key}"
                                + "_after_pipeline_run_hook_input"
                            )
                            after_pipeline_run_hook_inputs[after_pipeline_run_hook_in_name] = out_asset

        after_pipeline_run_hook_op = self._create_after_pipeline_run_hook_op(
            job_name, pipeline, list(after_pipeline_run_hook_inputs.keys())
        )
        after_pipeline_run_hook_op(**after_pipeline_run_hook_inputs)

    kedro_run_translator = KedroRunTranslator(
        context=self._context,
        catalog=self._catalog,
        project_path=self._project_path,
        env=self._env,
        run_id=self._run_id,
    )

    kedro_run_resource = kedro_run_translator.to_dagster(
        pipeline_name=pipeline_name,
        filter_params=filter_params,
    )
    resource_defs = {"kedro_run": kedro_run_resource}

    for dataset_name in pipeline.all_inputs() | pipeline.all_outputs():
        asset_name = format_dataset_name(dataset_name)
        if f"{self._env}__{asset_name}_io_manager" in self._named_resources:
            resource_defs[f"{self._env}__{asset_name}_io_manager"] = self._named_resources[
                f"{self._env}__{asset_name}_io_manager"
            ]

    # Expose mlflow resource only if provided
    if "mlflow" in self._named_resources:
        resource_defs |= {"mlflow": self._named_resources["mlflow"]}

    job = pipeline_graph.to_job(
        name=f"{self._env}__{job_name}",
        resource_defs=resource_defs,
        executor_def=executor_def,
        logger_defs=logger_defs,
        config=loggers_config,
    )

    return job
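
The resource selection at the end of translate_pipeline follows a simple naming rule. Every job gets a kedro_run resource, plus any "<env>__<asset_name>_io_manager" resources registered for the pipeline's datasets, plus mlflow when that resource is defined. The plain-Python sketch below uses strings in place of resource definitions. The helper select_resource_defs is hypothetical and takes already-formatted asset names, so format_dataset_name is left out.

```python
def select_resource_defs(
    env: str,
    asset_names: set[str],
    named_resources: dict[str, object],
    kedro_run_resource: object,
) -> dict[str, object]:
    """Pick the resources a single job needs, following the naming rule above."""
    resource_defs: dict[str, object] = {"kedro_run": kedro_run_resource}
    for asset_name in sorted(asset_names):
        io_manager_key = f"{env}__{asset_name}_io_manager"
        # Datasets without a registered IO manager resource are skipped.
        if io_manager_key in named_resources:
            resource_defs[io_manager_key] = named_resources[io_manager_key]
    # mlflow is only exposed when it was registered as a resource.
    if "mlflow" in named_resources:
        resource_defs["mlflow"] = named_resources["mlflow"]
    return resource_defs


named_resources = {
    "local__companies_io_manager": "companies-io",
    "local__shuttles_io_manager": "shuttles-io",
    "mlflow": "mlflow-resource",
}
resources = select_resource_defs("local", {"companies", "reviews"}, named_resources, "kedro-run")
print(sorted(resources))  # ['kedro_run', 'local__companies_io_manager', 'mlflow']
```

Keying IO managers by environment and asset name lets one job pick up only the resources its own datasets need.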

to_dagster()

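Translate each job defined in the Dagster configuration into a Dagster job, built from the job's filtered Kedro pipeline together with its resolved executor and loggers. A ValueError is raised when a job references an executor or logger that is not defined.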

Returns
Type                      Description
dict[str, JobDefinition]  Translated Dagster jobs keyed by job name; empty if no jobs are configured.

See Also

kedro_dagster.pipelines.PipelineTranslator.translate_pipeline : Translates a single Kedro pipeline.
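
For each configured job, to_dagster resolves the executor and loggers in one of two ways. A string is looked up by name. An inline configuration is expected to have been registered beforehand under a job-specific name ("<job_name>__executor" or "<job_name>__logger_<idx>"). The plain-Python sketch below shows that lookup rule. The helper names are hypothetical, and strings stand in for Dagster executor and logger definitions.

```python
from typing import Any


def resolve_executor(job_name: str, executor_config: Any, named_executors: dict[str, Any]) -> Any:
    """Return the executor for a job, or None when the job does not set one."""
    if executor_config is None:
        return None
    # A string references a named executor; anything else is an inline config.
    key = executor_config if isinstance(executor_config, str) else f"{job_name}__executor"
    if key not in named_executors:
        raise ValueError(f"Executor '{key}' not found. Available executors: {list(named_executors)}")
    return named_executors[key]


def resolve_logger_names(job_name: str, logger_configs: list[Any], named_loggers: dict[str, Any]) -> list[str]:
    """Return the logger names a job uses, in configuration order."""
    names = []
    for idx, logger_config in enumerate(logger_configs):
        name = logger_config if isinstance(logger_config, str) else f"{job_name}__logger_{idx}"
        if name not in named_loggers:
            raise ValueError(f"Logger '{name}' not found.")
        names.append(name)
    return names


named_executors = {"multiprocess": "mp-executor", "nightly__executor": "inline-executor"}
named_loggers = {"console": "console-logger", "nightly__logger_1": "inline-logger"}
print(resolve_executor("nightly", {"in_process": {}}, named_executors))  # inline-executor
print(resolve_logger_names("nightly", ["console", {"level": "DEBUG"}], named_loggers))
# ['console', 'nightly__logger_1']
```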

Source Code
def to_dagster(self) -> dict[str, dg.JobDefinition]:
    """Translate the Kedro pipelines into Dagster jobs.

    Returns
    -------
    dict[str, JobDefinition]
        Translated Dagster jobs keyed by job name.

    See Also
    --------
    `kedro_dagster.pipelines.PipelineTranslator.translate_pipeline` :
        Translates a single Kedro pipeline.
    """
    LOGGER.info("Translating Kedro pipelines to Dagster jobs...")
    # Lazy import to avoid circular dependency
    from kedro.framework.project import pipelines

    named_jobs: dict[str, dg.JobDefinition] = {}
    if self._dagster_config.jobs is None:
        LOGGER.debug("No jobs defined in configuration")
        return named_jobs

    LOGGER.debug(f"Processing {len(self._dagster_config.jobs)} job(s)")
    for job_name, job_config in self._dagster_config.jobs.items():
        LOGGER.debug(f"Translating job '{job_name}'...")
        pipeline_config = job_config.pipeline

        pipeline_name = pipeline_config.pipeline_name
        filter_params = get_filter_params_dict(pipeline_config.model_dump())
        pipeline = pipelines.get(pipeline_name).filter(**filter_params)

        # Handle executor configuration (string reference or inline config)
        executor_def = None
        executor_config = job_config.executor
        if executor_config is not None:
            if isinstance(executor_config, str):
                # String reference to named executor
                if executor_config in self._named_executors:
                    executor_def = self._named_executors[executor_config]
                else:
                    msg = f"Executor '{executor_config}' not found. Available executors: {list(self._named_executors.keys())}"
                    LOGGER.error(msg)
                    raise ValueError(msg)
            else:
                # Inline executor configuration - look for job-specific executor
                job_executor_name = f"{job_name}__executor"
                if job_executor_name in self._named_executors:
                    executor_def = self._named_executors[job_executor_name]
                else:
                    msg = f"Job-specific executor '{job_executor_name}' not found. Available executors: {list(self._named_executors.keys())}"
                    LOGGER.error(msg)
                    raise ValueError(msg)

        # Handle logger configurations (string references and/or inline configs)
        logger_defs, logger_configs = {}, {}
        if job_config.loggers:
            LOGGER.debug(f"Processing {len(job_config.loggers)} loggers for job '{job_name}'...")
            for idx, logger_config in enumerate(job_config.loggers):
                if isinstance(logger_config, str):
                    # String reference to named logger
                    if logger_config in self._named_loggers:
                        logger_defs[logger_config] = self._named_loggers[logger_config]
                    else:
                        msg = f"Logger '{logger_config}' not found. Available loggers: {list(self._named_loggers.keys())}"
                        LOGGER.error(msg)
                        raise ValueError(msg)

                    # If logger_config exists in _named_loggers, then loggers must exist - we assert for mypy
                    assert self._dagster_config.loggers is not None
                    logger_configs[logger_config] = self._dagster_config.loggers[logger_config]

                else:
                    # Inline logger configuration - look for job-specific logger
                    job_logger_name = f"{job_name}__logger_{idx}"
                    if job_logger_name in self._named_loggers:
                        logger_defs[job_logger_name] = self._named_loggers[job_logger_name]
                    else:
                        msg = f"Job-specific logger '{job_logger_name}' for inline logger configuration not found."
                        LOGGER.error(msg)
                        raise ValueError(msg)

                    logger_configs[job_logger_name] = logger_config

        loggers_config = {}
        if logger_configs:
            loggers_config = {
                "loggers": {
                    name: {"config": logger_config.model_dump()} for name, logger_config in logger_configs.items()
                }
            }

        job = self.translate_pipeline(
            pipeline=pipeline,
            pipeline_name=pipeline_name,
            filter_params=filter_params,
            job_name=job_name,
            executor_def=executor_def,
            logger_defs=logger_defs,
            loggers_config=loggers_config,
        )

        named_jobs[job_name] = job
        LOGGER.debug(f"Successfully translated job '{job_name}'")

    LOGGER.debug(f"Translated {len(named_jobs)} Dagster job(s)")
    return named_jobs
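
The logger configurations collected above end up in the job's run config under a "loggers" key, with each logger's options nested under "config". This is the layout Dagster uses for configurable loggers. The sketch below shows that transformation. It uses a dataclass and dataclasses.asdict as stand-ins for the pydantic models and model_dump() used in the source, and LoggerOptions and its fields are hypothetical.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class LoggerOptions:
    """Hypothetical stand-in for a pydantic logger configuration model."""

    log_level: str = "INFO"
    formatter: str = "simple"


def build_loggers_run_config(logger_configs: dict[str, LoggerOptions]) -> dict[str, Any]:
    """Nest each logger's options under {"loggers": {name: {"config": ...}}}."""
    if not logger_configs:
        # No loggers configured: return an empty run config, as to_dagster does.
        return {}
    return {
        "loggers": {name: {"config": asdict(options)} for name, options in logger_configs.items()}
    }


run_config = build_loggers_run_config({"console": LoggerOptions(log_level="DEBUG")})
print(run_config)
# {'loggers': {'console': {'config': {'log_level': 'DEBUG', 'formatter': 'simple'}}}}
```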