Create Dagster schedule definitions from Kedro configuration.
Parameters
| Name | Type | Description | Default |
|------|------|-------------|---------|
| dagster_config | KedroDagsterConfig | Parsed Kedro-Dagster config containing schedule entries. | required |
| named_jobs | dict[str, JobDefinition] | Mapping of job names to Dagster job definitions. | required |
See Also
kedro_dagster.dagster.ExecutorCreator :
Creates executor definitions from configuration.
kedro_dagster.dagster.LoggerCreator :
Creates logger definitions from configuration.
kedro_dagster.config.automation.ScheduleOptions :
Schedule option model.
Source Code
View on GitHub
Show/Hide source
class ScheduleCreator:
    """Build Dagster schedules for the jobs declared in a Kedro-Dagster config.

    Parameters
    ----------
    dagster_config : KedroDagsterConfig
        Parsed Kedro-Dagster config; its ``schedules`` and ``jobs`` entries are read.
    named_jobs : dict[str, JobDefinition]
        Dagster job definitions indexed by job name.

    See Also
    --------
    `kedro_dagster.dagster.ExecutorCreator` :
        Creates executor definitions from configuration.
    `kedro_dagster.dagster.LoggerCreator` :
        Creates logger definitions from configuration.
    `kedro_dagster.config.automation.ScheduleOptions` :
        Schedule option model.
    """

    def __init__(self, dagster_config: "KedroDagsterConfig", named_jobs: dict[str, dg.JobDefinition]):
        self._named_jobs = named_jobs
        self._dagster_config = dagster_config

    def create_schedules(self) -> dict[str, dg.ScheduleDefinition]:
        """Build one schedule definition per job that declares a schedule.

        A job's ``schedule`` entry is either the name of a schedule declared
        under the config's ``schedules`` section or an inline schedule model.
        Jobs whose ``schedule`` is ``None`` are skipped.

        Returns
        -------
        dict[str, ScheduleDefinition]
            Schedule definitions keyed by the name of the job they trigger.

        Raises
        ------
        ValueError
            If a job refers to a schedule name that is not declared under
            ``schedules``.

        See Also
        --------
        `kedro_dagster.config.automation.ScheduleOptions` :
            Schedule option model used as input.
        """
        LOGGER.info("Creating Dagster schedules...")

        # Dump every named schedule once so jobs can reference it by name.
        named_schedule_config = {}
        declared_schedules = self._dagster_config.schedules
        if declared_schedules is not None:
            for declared_name, declared_options in declared_schedules.items():
                LOGGER.debug(f"Registering schedule '{declared_name}'...")
                named_schedule_config[declared_name] = declared_options.model_dump()

        named_schedules = {}
        declared_jobs = self._dagster_config.jobs
        if declared_jobs is None:
            LOGGER.debug(f"Created {len(named_schedules)} schedule(s)")
            return named_schedules

        for job_name, job_config in declared_jobs.items():
            schedule_spec = job_config.schedule
            if schedule_spec is None:
                continue

            LOGGER.debug(f"Creating schedule for job '{job_name}'...")

            if not isinstance(schedule_spec, str):
                # Inline schedule options attached directly to the job.
                named_schedules[job_name] = dg.ScheduleDefinition(
                    name=f"{job_name}__schedule",
                    job=self._named_jobs[job_name],
                    **schedule_spec.model_dump(),
                )
                continue

            # A string refers to a schedule declared under ``schedules``;
            # validate the reference before touching the job mapping.
            if schedule_spec not in named_schedule_config:
                msg = (
                    f"Schedule named '{schedule_spec}' for job '{job_name}' not found. "
                    f"Available schedules: {sorted(named_schedule_config)}"
                )
                LOGGER.error(msg)
                raise ValueError(msg)

            named_schedules[job_name] = dg.ScheduleDefinition(
                name=f"{job_name}_{schedule_spec}_schedule",
                job=self._named_jobs[job_name],
                **named_schedule_config[schedule_spec],
            )

        LOGGER.debug(f"Created {len(named_schedules)} schedule(s)")
        return named_schedules
|
Methods
create_schedules()
Create schedule definitions from the configuration.
Returns
| Type | Description |
|------|-------------|
| dict[str, ScheduleDefinition] | Dict of schedule definitions keyed by job name. |
See Also
kedro_dagster.config.automation.ScheduleOptions :
Schedule option model used as input.
Source Code
View on GitHub
Show/Hide source
def create_schedules(self) -> dict[str, dg.ScheduleDefinition]:
    """Build one schedule definition per job that declares a schedule.

    A job's ``schedule`` entry is either the name of a schedule declared
    under the config's ``schedules`` section or an inline schedule model.
    Jobs whose ``schedule`` is ``None`` are skipped.

    Returns
    -------
    dict[str, ScheduleDefinition]
        Schedule definitions keyed by the name of the job they trigger.

    Raises
    ------
    ValueError
        If a job refers to a schedule name that is not declared under
        ``schedules``.

    See Also
    --------
    `kedro_dagster.config.automation.ScheduleOptions` :
        Schedule option model used as input.
    """
    LOGGER.info("Creating Dagster schedules...")

    # Dump every named schedule once so jobs can reference it by name.
    named_schedule_config = {}
    declared_schedules = self._dagster_config.schedules
    if declared_schedules is not None:
        for declared_name, declared_options in declared_schedules.items():
            LOGGER.debug(f"Registering schedule '{declared_name}'...")
            named_schedule_config[declared_name] = declared_options.model_dump()

    named_schedules = {}
    declared_jobs = self._dagster_config.jobs
    if declared_jobs is None:
        LOGGER.debug(f"Created {len(named_schedules)} schedule(s)")
        return named_schedules

    for job_name, job_config in declared_jobs.items():
        schedule_spec = job_config.schedule
        if schedule_spec is None:
            continue

        LOGGER.debug(f"Creating schedule for job '{job_name}'...")

        if not isinstance(schedule_spec, str):
            # Inline schedule options attached directly to the job.
            named_schedules[job_name] = dg.ScheduleDefinition(
                name=f"{job_name}__schedule",
                job=self._named_jobs[job_name],
                **schedule_spec.model_dump(),
            )
            continue

        # A string refers to a schedule declared under ``schedules``;
        # validate the reference before touching the job mapping.
        if schedule_spec not in named_schedule_config:
            msg = (
                f"Schedule named '{schedule_spec}' for job '{job_name}' not found. "
                f"Available schedules: {sorted(named_schedule_config)}"
            )
            LOGGER.error(msg)
            raise ValueError(msg)

        named_schedules[job_name] = dg.ScheduleDefinition(
            name=f"{job_name}_{schedule_spec}_schedule",
            job=self._named_jobs[job_name],
            **named_schedule_config[schedule_spec],
        )

    LOGGER.debug(f"Created {len(named_schedules)} schedule(s)")
    return named_schedules
|