K8sJobExecutorOptions

kedro_dagster.config.models.K8sJobExecutorOptions

Bases: MultiprocessExecutorOptions

Options for the Kubernetes-based executor.

Parameters

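| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `retries` | `RetriesEnableOptions` or `RetriesDisableOptions` | Retry configuration for the executor. | inherited from `MultiprocessExecutorOptions` |
| `max_concurrent` | `int` or `None` | Maximum number of concurrent processes. | inherited from `MultiprocessExecutorOptions` |
| `job_namespace` | `str` | Kubernetes namespace for jobs. | `"dagster"` |
| `load_incluster_config` | `bool` | Whether the executor is already running inside the target Kubernetes cluster. If `True`, configuration is loaded with `kubernetes.config.load_incluster_config`; otherwise `kubeconfig_file` or the default kubeconfig is used. | `True` |
| `kubeconfig_file` | `str` or `None` | Path to a kubeconfig file to use, if not using the default kubeconfig. | `None` |
| `step_k8s_config` | `K8sJobConfig` | Raw Kubernetes configuration for each step launched by the executor. | `K8sJobConfig()` |
| `per_step_k8s_config` | `dict[str, K8sJobConfig]` | Per-op Kubernetes configuration overrides. | `{}` |
| `image_pull_policy` | `str` or `None` | Image pull policy to set on launched Pods. | `None` |
| `image_pull_secrets` | `list[dict[str, str]]` or `None` | Secrets from which Kubernetes should get the credentials for pulling images. | `None` |
| `service_account_name` | `str` or `None` | Name of the Kubernetes service account under which to run. | `None` |
| `env_config_maps` | `list[str]` or `None` | `ConfigMapEnvSource` names from which to draw environment variables (using `envFrom`). | `None` |
| `env_secrets` | `list[str]` or `None` | Secret names from which to draw environment variables (using `envFrom`). | `None` |
| `env_vars` | `list[str]` or `None` | Environment variables to inject into the Job, each given as `KEY=VALUE` or just `KEY` (value taken from the current process). | `None` |
| `volume_mounts` | `list[dict[str, str]]` | Volume mounts to include in the job's container. | `[]` |
| `volumes` | `list[dict[str, str]]` | Volumes to include in the Job's Pod. | `[]` |
| `labels` | `dict[str, str]` | Labels to apply to all created Pods. | `{}` |
| `resources` | `dict[str, dict[str, str]]` or `None` | Compute resource requirements for the container. | `None` |
| `scheduler_name` | `str` or `None` | Custom Kubernetes scheduler for launched Pods. | `None` |
| `security_context` | `dict[str, str]` | Security settings for the container. | `{}` |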
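
The interplay between `load_incluster_config` and `kubeconfig_file` can be sketched as a small selection function. This is only an illustration of the behavior described in the field documentation, not the executor's actual code: the function name `load_k8s_client_config` and its loader arguments are hypothetical, introduced so the logic can run without the `kubernetes` package installed.

```python
from typing import Callable, Optional


def load_k8s_client_config(
    load_incluster_config: bool,
    kubeconfig_file: Optional[str],
    incluster_loader: Callable[[], None],
    kubeconfig_loader: Callable[..., None],
) -> str:
    """Pick a Kubernetes client configuration source (hypothetical helper).

    In real use the two loaders would typically be
    ``kubernetes.config.load_incluster_config`` and
    ``kubernetes.config.load_kube_config``.
    """
    if load_incluster_config:
        # Running inside the target cluster: use the Pod's service account.
        incluster_loader()
        return "in-cluster"
    # Outside the cluster: use the given file, or the default kubeconfig
    # when ``kubeconfig_file`` is None.
    kubeconfig_loader(config_file=kubeconfig_file)
    return "kubeconfig"
```

With the defaults listed above (`load_incluster_config=True`, `kubeconfig_file=None`), the sketch takes the in-cluster branch, so a `kubeconfig_file` value only matters once `load_incluster_config` is set to `False`.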

Examples

executors:
    k8s_exec:
        k8s_job_executor:
            job_namespace: "dagster"
            max_concurrent: 2
            image_pull_policy: IfNotPresent
            resources:
                limits:
                    cpu: "1"
                    memory: "1Gi"
                requests:
                    cpu: "500m"
                    memory: "512Mi"
            labels:
                team: platform
jobs:
    k8s_job:
        pipeline:
            pipeline_name: k8s_pipeline
        executor: k8s_exec
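
Kubernetes rejects a container whose resource request exceeds its limit, so it can be useful to sanity-check a `resources` mapping before deploying. The following is a minimal sketch of such a check, not part of kedro-dagster: `parse_quantity` and `requests_within_limits` are hypothetical helpers, and only a subset of Kubernetes quantity suffixes is handled.

```python
from decimal import Decimal

# Subset of Kubernetes quantity suffixes (assumption: P/E variants omitted).
_SUFFIXES = {
    "m": Decimal("0.001"),
    "k": Decimal(10**3),
    "M": Decimal(10**6),
    "G": Decimal(10**9),
    "T": Decimal(10**12),
    "Ki": Decimal(2**10),
    "Mi": Decimal(2**20),
    "Gi": Decimal(2**30),
    "Ti": Decimal(2**40),
}


def parse_quantity(quantity: str) -> Decimal:
    """Convert a quantity string such as "500m" or "1Gi" to a number."""
    # Try two-letter suffixes first so "Mi" is not mistaken for "M".
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return Decimal(quantity[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(quantity)


def requests_within_limits(resources: dict) -> bool:
    """Return True if every request is at most the matching limit."""
    requests = resources.get("requests", {})
    limits = resources.get("limits", {})
    return all(
        parse_quantity(requests[name]) <= parse_quantity(limits[name])
        for name in requests
        if name in limits
    )


# Same values as the `resources` block in the YAML example above.
resources = {
    "limits": {"cpu": "1", "memory": "1Gi"},
    "requests": {"cpu": "500m", "memory": "512Mi"},
}
print(requests_within_limits(resources))
```

Quantities are kept as strings in the configuration (for example `"1"` rather than `1`) because the `resources` field is typed as `dict[str, dict[str, str]]`.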

See Also

kedro_dagster.config.models.K8sJobConfig : Kubernetes job configuration consumed by this executor.

kedro_dagster.config.models.CeleryK8sJobExecutorOptions : Combines Celery with Kubernetes job settings.

kedro_dagster.dagster.ExecutorCreator : Builds Dagster executor definitions from these options.

Source Code

class K8sJobExecutorOptions(MultiprocessExecutorOptions):
    """Options for the Kubernetes-based executor.

    Parameters
    ----------
    retries : RetriesEnableOptions or RetriesDisableOptions
        Retry configuration for the executor.
    max_concurrent : int or None
        Maximum number of concurrent processes.
    job_namespace : str
        Kubernetes namespace for jobs.
    load_incluster_config : bool
        Whether the executor is running within a k8s cluster.
    kubeconfig_file : str or None
        Path to a kubeconfig file to use.
    step_k8s_config : K8sJobConfig
        Raw Kubernetes configuration for each step.
    per_step_k8s_config : dict[str, K8sJobConfig]
        Per op k8s configuration overrides.
    image_pull_policy : str or None
        Image pull policy for Pods.
    image_pull_secrets : list[dict[str, str]] or None
        Credentials for pulling images.
    service_account_name : str or None
        Kubernetes service account name.
    env_config_maps : list[str] or None
        ``ConfigMapEnvSource`` names for environment variables.
    env_secrets : list[str] or None
        Secret names for environment variables.
    env_vars : list[str] or None
        Environment variables for the job.
    volume_mounts : list[dict[str, str]]
        Volume mounts for the container.
    volumes : list[dict[str, str]]
        Volumes for the Pod.
    labels : dict[str, str]
        Labels for created pods.
    resources : dict[str, dict[str, str]] or None
        Compute resource requirements.
    scheduler_name : str or None
        Custom Kubernetes scheduler for Pods.
    security_context : dict[str, str]
        Security settings for the container.

    Examples
    --------
    ```yaml
    executors:
        k8s_exec:
            k8s_job_executor:
                job_namespace: "dagster"
                max_concurrent: 2
                image_pull_policy: IfNotPresent
                resources:
                    limits:
                        cpu: "1"
                        memory: "1Gi"
                    requests:
                        cpu: "500m"
                        memory: "512Mi"
                labels:
                    team: platform
    jobs:
        k8s_job:
            pipeline:
                pipeline_name: k8s_pipeline
            executor: k8s_exec
    ```

    See Also
    --------
    `kedro_dagster.config.models.K8sJobConfig` :
        Kubernetes job configuration consumed by this executor.
    `kedro_dagster.config.models.CeleryK8sJobExecutorOptions` :
        Combines Celery with Kubernetes job settings.
    `kedro_dagster.dagster.ExecutorCreator` :
        Builds Dagster executor definitions from these options.
    """

    job_namespace: str = Field(default="dagster")
    load_incluster_config: bool = Field(
        default=True,
        description="""Whether or not the executor is running within a k8s cluster already. If
        the job is using the `K8sRunLauncher`, the default value of this parameter will be
        the same as the corresponding value on the run launcher.
        If ``True``, we assume the executor is running within the target cluster and load config
        using ``kubernetes.config.load_incluster_config``. Otherwise, we will use the k8s config
        specified in ``kubeconfig_file`` (using ``kubernetes.config.load_kube_config``) or fall
        back to the default kubeconfig.""",
    )
    kubeconfig_file: str | None = Field(
        default=None,
        description="""Path to a kubeconfig file to use, if not using default kubeconfig. If
        the job is using the `K8sRunLauncher`, the default value of this parameter will be
        the same as the corresponding value on the run launcher.""",
    )
    step_k8s_config: K8sJobConfig = Field(
        default=K8sJobConfig(),
        description="Raw Kubernetes configuration for each step launched by the executor.",
    )
    per_step_k8s_config: dict[str, K8sJobConfig] = Field(
        default={},
        description="Per op k8s configuration overrides.",
    )
    image_pull_policy: str | None = Field(
        default=None,
        description="Image pull policy to set on launched Pods.",
    )
    image_pull_secrets: list[dict[str, str]] | None = Field(
        default=None,
        description="Specifies that Kubernetes should get the credentials from the Secrets named in this list.",
    )
    service_account_name: str | None = Field(
        default=None,
        description="The name of the Kubernetes service account under which to run.",
    )
    env_config_maps: list[str] | None = Field(
        default=None,
        description="A list of custom ConfigMapEnvSource names from which to draw environment variables (using ``envFrom``) for the Job. Default: ``[]``.",
    )
    env_secrets: list[str] | None = Field(
        default=None,
        description="A list of custom Secret names from which to draw environment variables (using ``envFrom``) for the Job. Default: ``[]``.",
    )
    env_vars: list[str] | None = Field(
        default=None,
        description="A list of environment variables to inject into the Job. Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled from the current process). Default: ``[]``.",
    )
    volume_mounts: list[dict[str, str]] = Field(
        default=[],
        description="A list of volume mounts to include in the job's container. Default: ``[]``.",
    )
    volumes: list[dict[str, str]] = Field(
        default=[],
        description="A list of volumes to include in the Job's Pod. Default: ``[]``.",
    )
    labels: dict[str, str] = Field(
        default={},
        description="Labels to apply to all created pods.",
    )
    resources: dict[str, dict[str, str]] | None = Field(
        default=None,
        description="Compute resource requirements for the container.",
    )
    scheduler_name: str | None = Field(
        default=None,
        description="Use a custom Kubernetes scheduler for launched Pods.",
    )
    security_context: dict[str, str] = Field(
        default={},
        description="Security settings for the container.",
    )
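
The `env_vars` description above allows entries of the form `KEY=VALUE` or just `KEY`, with the latter taken from the current process. A minimal sketch of that resolution follows. The helper `resolve_env_vars` is hypothetical and not part of kedro-dagster or Dagster, and skipping a bare `KEY` that is not set in the environment is an assumption made for this illustration.

```python
import os
from typing import Iterable, Mapping, Optional


def resolve_env_vars(
    env_vars: Optional[Iterable[str]],
    environ: Optional[Mapping[str, str]] = None,
) -> dict:
    """Expand ``KEY=VALUE`` / ``KEY`` entries into a name-to-value dict."""
    environ = os.environ if environ is None else environ
    resolved = {}
    for entry in env_vars or []:
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = entry.partition("=")
        if sep:
            resolved[key] = value
        elif key in environ:
            # Bare KEY: take the value from the current process environment.
            resolved[key] = environ[key]
        # Assumption: a bare KEY missing from the environment is skipped.
    return resolved
```

For instance, `resolve_env_vars(["LOG_LEVEL=debug", "HOME"])` would combine the literal `LOG_LEVEL` value with whatever `HOME` is set to in the calling process.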