Skip to content

ScheduleCreator

kedro_dagster.dagster.ScheduleCreator

Create Dagster schedule definitions from Kedro configuration.

Parameters

Name Type Description Default
dagster_config KedroDagsterConfig

Parsed Kedro-Dagster config containing schedule entries.

required
named_jobs dict[str, JobDefinition]

Mapping of job names to Dagster job definitions.

required

See Also

- kedro_dagster.dagster.ExecutorCreator : Creates executor definitions from configuration.
- kedro_dagster.dagster.LoggerCreator : Creates logger definitions from configuration.
- kedro_dagster.config.automation.ScheduleOptions : Schedule option model.

Source Code

Show/Hide source
class ScheduleCreator:
    """Build Dagster schedule definitions for jobs declared in the Kedro-Dagster config.

    A job may either reference a schedule by name (resolved against the
    config's top-level ``schedules`` entries) or carry inline schedule options.

    Parameters
    ----------
    dagster_config : KedroDagsterConfig
        Parsed Kedro-Dagster config; its ``schedules`` and ``jobs`` entries are read.
    named_jobs : dict[str, JobDefinition]
        Dagster job definitions indexed by job name.

    See Also
    --------
    `kedro_dagster.dagster.ExecutorCreator` :
        Creates executor definitions from configuration.
    `kedro_dagster.dagster.LoggerCreator` :
        Creates logger definitions from configuration.
    `kedro_dagster.config.automation.ScheduleOptions` :
        Schedule option model.
    """

    def __init__(self, dagster_config: "KedroDagsterConfig", named_jobs: dict[str, dg.JobDefinition]):
        self._dagster_config = dagster_config
        self._named_jobs = named_jobs

    def create_schedules(self) -> dict[str, dg.ScheduleDefinition]:
        """Build one schedule definition per job that declares a schedule.

        Jobs whose ``schedule`` is ``None`` are skipped. A string ``schedule``
        is resolved against the shared ``schedules`` section and produces a
        schedule named ``<job>_<schedule>_schedule``; any other value is
        treated as inline options and produces ``<job>__schedule``.

        Returns
        -------
        dict[str, ScheduleDefinition]
            Schedule definitions indexed by the name of the job they trigger.

        Raises
        ------
        ValueError
            If a job references a schedule name that is absent from the
            shared ``schedules`` section.

        See Also
        --------
        `kedro_dagster.config.automation.ScheduleOptions` :
            Schedule option model used as input.
        """
        LOGGER.info("Creating Dagster schedules...")

        # Dump every shared schedule up front so jobs can reference them by name.
        declared_schedules = self._dagster_config.schedules
        shared_options = {}
        for shared_name, shared_model in ({} if declared_schedules is None else declared_schedules).items():
            LOGGER.debug(f"Registering schedule '{shared_name}'...")
            shared_options[shared_name] = shared_model.model_dump()

        declared_jobs = self._dagster_config.jobs
        schedules_by_job = {}
        for job_name, job_config in ({} if declared_jobs is None else declared_jobs).items():
            spec = job_config.schedule
            if spec is None:
                continue

            LOGGER.debug(f"Creating schedule for job '{job_name}'...")
            if isinstance(spec, str):
                # Named reference: the name must exist among the shared schedules.
                schedule_name = spec
                if schedule_name not in shared_options:
                    msg = (
                        f"Schedule named '{schedule_name}' for job '{job_name}' not found. "
                        f"Available schedules: {sorted(shared_options)}"
                    )
                    LOGGER.error(msg)
                    raise ValueError(msg)
                definition_name = f"{job_name}_{schedule_name}_schedule"
                target_job = self._named_jobs[job_name]
                options = shared_options[schedule_name]
            else:
                # Inline options: the job carries its own schedule model.
                definition_name = f"{job_name}__schedule"
                target_job = self._named_jobs[job_name]
                options = spec.model_dump()

            schedules_by_job[job_name] = dg.ScheduleDefinition(
                name=definition_name,
                job=target_job,
                **options,
            )

        LOGGER.debug(f"Created {len(schedules_by_job)} schedule(s)")
        return schedules_by_job

Methods

create_schedules()

Create schedule definitions from the configuration.

Returns
Type Description
dict[str, ScheduleDefinition]

Dict of schedule definitions keyed by job name.

See Also

kedro_dagster.config.automation.ScheduleOptions : Schedule option model used as input.

Source Code
Show/Hide source
def create_schedules(self) -> dict[str, dg.ScheduleDefinition]:
    """Build one schedule definition per job that declares a schedule.

    Jobs whose ``schedule`` is ``None`` are skipped. A string ``schedule`` is
    looked up in the shared ``schedules`` section and yields a schedule named
    ``<job>_<schedule>_schedule``; any other value is treated as inline
    options and yields ``<job>__schedule``.

    Returns
    -------
    dict[str, ScheduleDefinition]
        Schedule definitions indexed by the name of the job they trigger.

    Raises
    ------
    ValueError
        If a job references a schedule name that is absent from the shared
        ``schedules`` section.

    See Also
    --------
    `kedro_dagster.config.automation.ScheduleOptions` :
        Schedule option model used as input.
    """
    LOGGER.info("Creating Dagster schedules...")

    # Shared schedules are dumped once and reused by every job that names them.
    options_by_schedule = {}
    configured_schedules = self._dagster_config.schedules
    if configured_schedules is not None:
        for name, model in configured_schedules.items():
            LOGGER.debug(f"Registering schedule '{name}'...")
            options_by_schedule[name] = model.model_dump()

    result = {}
    configured_jobs = self._dagster_config.jobs
    if configured_jobs is None:
        configured_jobs = {}

    for job_name, job_config in configured_jobs.items():
        requested = job_config.schedule
        if requested is None:
            continue

        LOGGER.debug(f"Creating schedule for job '{job_name}'...")
        if not isinstance(requested, str):
            # Inline options attached directly to the job.
            result[job_name] = dg.ScheduleDefinition(
                name=f"{job_name}__schedule",
                job=self._named_jobs[job_name],
                **requested.model_dump(),
            )
            continue

        # Named reference: fail loudly when the name is unknown.
        schedule_name = requested
        if schedule_name not in options_by_schedule:
            msg = (
                f"Schedule named '{schedule_name}' for job '{job_name}' not found. "
                f"Available schedules: {sorted(options_by_schedule)}"
            )
            LOGGER.error(msg)
            raise ValueError(msg)

        result[job_name] = dg.ScheduleDefinition(
            name=f"{job_name}_{schedule_name}_schedule",
            job=self._named_jobs[job_name],
            **options_by_schedule[schedule_name],
        )

    LOGGER.debug(f"Created {len(result)} schedule(s)")
    return result