Custom Job

Run KFP components as Vertex AI Custom Training Jobs with customized worker and cloud configurations.

Components:

CustomTrainingJobOp(display_name, gcp_resources)

Launch a Vertex AI custom training job using the CustomJob API.

Functions:

create_custom_training_job_from_component(...)

Convert a KFP component into a Vertex AI custom training job using the CustomJob API.

v1.custom_job.CustomTrainingJobOp(display_name: str, gcp_resources: dsl.OutputPath(str), location: str = 'us-central1', worker_pool_specs: list[dict[str, str]] = [], timeout: str = '604800s', restart_job_on_worker_restart: bool = False, service_account: str = '', tensorboard: str = '', enable_web_access: bool = False, network: str = '', reserved_ip_ranges: list[str] = [], base_output_directory: str = '', labels: dict[str, str] = {}, encryption_spec_key_name: str = '', project: str = '{{$.pipeline_google_cloud_project_id}}')

Launch a Vertex AI custom training job using the CustomJob API. See Create custom training jobs for more information.

Parameters:
location: str = 'us-central1'

Location for creating the custom training job. If not set, defaults to us-central1.

display_name: str

The name of the CustomJob.

worker_pool_specs: list[dict[str, str]] = []

Serialized JSON spec of the worker pools, including machine type and Docker image. All worker pools except the first one are optional and can be skipped by providing an empty value. See more information.
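As an illustration of the shape worker_pool_specs takes, the sketch below builds a single worker pool as plain Python dicts. It assumes the snake_case field names of the Vertex AI WorkerPoolSpec message (machine_spec, replica_count, container_spec); the image URI, command, and display name are hypothetical placeholders. The pipeline wrapper assumes kfp and google_cloud_pipeline_components are installed, and imports them lazily so the spec itself can be inspected without them.

```python
import json

# One worker pool: a single n1-standard-4 replica running a training image.
# The image URI and command below are hypothetical placeholders.
worker_pool_specs = [
    {
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "container_spec": {
            "image_uri": "us-docker.pkg.dev/my-project/my-repo/trainer:latest",
            "command": ["python", "-m", "trainer.task"],
        },
    }
]


def build_pipeline():
    """Return a pipeline that launches the job (needs kfp and GCPC installed)."""
    from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
    from kfp import dsl

    @dsl.pipeline(name="custom-job-sketch")
    def pipeline():
        CustomTrainingJobOp(
            display_name="custom-job-sketch",
            worker_pool_specs=worker_pool_specs,
        )

    return pipeline


# The spec must survive JSON serialization, since it is passed as serialized JSON.
print(json.dumps(worker_pool_specs, indent=2))
```

Keeping the spec as a module-level value makes it easy to validate or log before the pipeline is compiled.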

timeout: str = '604800s'

The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by ‘s’, for example: “3.5s”.
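The duration format described for timeout can be produced from ordinary Python time values. The helper below is a minimal sketch; to_duration is a hypothetical name, not part of the library.

```python
from datetime import timedelta


def to_duration(seconds: float) -> str:
    """Format seconds as a duration string such as '3.5s'.

    Keeps at most nine fractional digits and drops trailing zeros.
    """
    if seconds < 0:
        raise ValueError("duration must be non-negative")
    text = f"{seconds:.9f}".rstrip("0").rstrip(".")
    return f"{text}s"


# The documented default of 7 days, and a shorter two-hour limit.
default_timeout = to_duration(timedelta(days=7).total_seconds())
two_hours = to_duration(timedelta(hours=2).total_seconds())
print(default_timeout, two_hours)
```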

restart_job_on_worker_restart: bool = False

Restarts the entire CustomJob if a worker gets restarted. This feature can be used by distributed training jobs that are not resilient to workers leaving and joining a job.

service_account: str = ''

Sets the default service account for the workload run-as account. The service account running the pipeline submitting jobs must have act-as permission on this run-as account. If unspecified, the Vertex AI Custom Code Service Agent for the CustomJob’s project is used.

tensorboard: str = ''

The name of a Vertex AI TensorBoard resource to which this CustomJob will upload TensorBoard logs.

enable_web_access: bool = False

Whether you want Vertex AI to enable interactive shell access to training containers. If True, you can access interactive shells at the URIs given by CustomJob.web_access_uris.

network: str = ''

The full name of the Compute Engine network to which the job should be peered, for example projects/12345/global/networks/myVPC. The format is projects/{project}/global/networks/{network}, where {project} is a project number, as in 12345, and {network} is a network name. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network.
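The network name format can be assembled and checked before a job is submitted. The following is a minimal sketch: network_name and is_valid_network are hypothetical helpers, and the pattern only checks the overall shape described above (numeric project, non-empty network segment), not Compute Engine's full naming rules.

```python
import re

# Shape check only: a numeric project number and a single network segment.
_NETWORK_RE = re.compile(r"projects/(\d+)/global/networks/([^/]+)")


def network_name(project_number: int, network: str) -> str:
    """Build the full network resource name from its two parts."""
    return f"projects/{project_number}/global/networks/{network}"


def is_valid_network(name: str) -> bool:
    """Return True if name has the projects/{number}/global/networks/{name} shape."""
    return _NETWORK_RE.fullmatch(name) is not None


full_name = network_name(12345, "myVPC")
print(full_name, is_valid_network(full_name))
```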

reserved_ip_ranges: list[str] = []

A list of names for the reserved IP ranges under the VPC network that can be used for this job. If set, we will deploy the job within the provided IP ranges. Otherwise, the job will be deployed to any IP ranges under the provided VPC network.

base_output_directory: str = ''

The Cloud Storage location to store the output of this CustomJob or HyperparameterTuningJob. See more information.

labels: dict[str, str] = {}

The labels with user-defined metadata to organize the CustomJob. See more information.

encryption_spec_key_name: str = ''

Customer-managed encryption key options for the CustomJob. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key.

project: str = '{{$.pipeline_google_cloud_project_id}}'

Project to create the custom training job in. Defaults to the project in which the PipelineJob is run.

Returns:

gcp_resources: dsl.OutputPath(str)

Serialized JSON of gcp_resources [proto](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud/google_cloud_pipeline_components/proto) which tracks the CustomJob.
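A downstream step can read the gcp_resources output to find the job that was launched. The sketch below assumes the serialized JSON carries a resources list whose entries have a resource type and a resource URI, and it accepts both camelCase and snake_case keys because the exact key spelling is an assumption here. The sample payload and the custom_job_uris helper are hypothetical.

```python
import json


def custom_job_uris(gcp_resources: str) -> list[str]:
    """Return the URIs of CustomJob entries in a serialized gcp_resources payload."""
    payload = json.loads(gcp_resources)
    uris = []
    for resource in payload.get("resources", []):
        # Key spelling is assumed; tolerate both JSON and proto field styles.
        rtype = resource.get("resourceType", resource.get("resource_type"))
        uri = resource.get("resourceUri", resource.get("resource_uri"))
        if rtype == "CustomJob" and uri:
            uris.append(uri)
    return uris


# Hypothetical payload for illustration only.
sample = json.dumps(
    {
        "resources": [
            {
                "resourceType": "CustomJob",
                "resourceUri": (
                    "https://us-central1-aiplatform.googleapis.com/v1/"
                    "projects/12345/locations/us-central1/customJobs/678"
                ),
            }
        ]
    }
)
print(custom_job_uris(sample))
```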

v1.custom_job.create_custom_training_job_from_component(component_spec: Callable, display_name: str = '', replica_count: int = 1, machine_type: str = 'n1-standard-4', accelerator_type: str = '', accelerator_count: int = 1, boot_disk_type: str = 'pd-ssd', boot_disk_size_gb: int = 100, timeout: str = '604800s', restart_job_on_worker_restart: bool = False, service_account: str = '', network: str = '', encryption_spec_key_name: str = '', tensorboard: str = '', enable_web_access: bool = False, reserved_ip_ranges: list[str] | None = None, nfs_mounts: list[dict[str, str]] | None = None, base_output_directory: str = '', labels: dict[str, str] | None = None, env: list[dict[str, str]] | None = None) -> Callable

Convert a KFP component into a Vertex AI custom training job using the CustomJob API.

This utility converts the KFP component provided to component_spec into a CustomTrainingJobOp component. Your component’s inputs, outputs, and logic are carried over, with additional CustomJob parameters exposed. Note that this utility constructs a ClusterSpec in which the master and all the workers use the same spec, so all disk- and machine-spec parameters apply to every replica. This is suitable for use cases such as executing a training component over multiple replicas with MultiWorkerMirroredStrategy or MirroredStrategy. See Create custom training jobs for more information.

Parameters:
component_spec: Callable

A KFP component.

display_name: str = ''

The name of the CustomJob. If not provided, the component’s name is used instead.

replica_count: int = 1

The number of instances in the cluster. One replica always counts as the master in worker_pool_specs[0], and the remaining replicas are allocated in worker_pool_specs[1]. See more information.
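The allocation rule for replica_count can be sketched as a small function. This only illustrates the rule stated above and is not the library's implementation; split_replicas is a hypothetical name.

```python
def split_replicas(replica_count: int) -> list[int]:
    """Return the replica count of each worker pool for a given total.

    The first pool always holds exactly one replica (the master); any
    remaining replicas go to a second pool.
    """
    if replica_count < 1:
        raise ValueError("replica_count must be at least 1")
    pools = [1]
    if replica_count > 1:
        pools.append(replica_count - 1)
    return pools


print(split_replicas(1))
print(split_replicas(4))
```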

machine_type: str = 'n1-standard-4'

The type of the machine to run the CustomJob. The default value is “n1-standard-4”. See more information.

accelerator_type: str = ''

The type of accelerator(s) that may be attached to the machine per accelerator_count. See more information.

accelerator_count: int = 1

The number of accelerators to attach to the machine. Defaults to 1 if accelerator_type is set.

boot_disk_type: str = 'pd-ssd'

Type of the boot disk (default is “pd-ssd”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive). boot_disk_type is set as a static value and cannot be changed as a pipeline parameter.

boot_disk_size_gb: int = 100

Size in GB of the boot disk (default is 100GB). boot_disk_size_gb is set as a static value and cannot be changed as a pipeline parameter.

timeout: str = '604800s'

The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by ‘s’, for example: “3.5s”.

restart_job_on_worker_restart: bool = False

Restarts the entire CustomJob if a worker gets restarted. This feature can be used by distributed training jobs that are not resilient to workers leaving and joining a job.

service_account: str = ''

Sets the default service account for the workload run-as account. The service account running the pipeline submitting jobs must have act-as permission on this run-as account. If unspecified, the Vertex AI Custom Code Service Agent for the CustomJob’s project is used.

network: str = ''

The full name of the Compute Engine network to which the job should be peered, for example projects/12345/global/networks/myVPC. The format is projects/{project}/global/networks/{network}, where {project} is a project number, as in 12345, and {network} is a network name. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network.

encryption_spec_key_name: str = ''

Customer-managed encryption key options for the CustomJob. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key.

tensorboard: str = ''

The name of a Vertex AI TensorBoard resource to which this CustomJob will upload TensorBoard logs.

enable_web_access: bool = False

Whether you want Vertex AI to enable interactive shell access to training containers. If True, you can access interactive shells at the URIs given by CustomJob.web_access_uris.

reserved_ip_ranges: list[str] | None = None

A list of names for the reserved IP ranges under the VPC network that can be used for this job. If set, we will deploy the job within the provided IP ranges. Otherwise, the job will be deployed to any IP ranges under the provided VPC network.

nfs_mounts: list[dict[str, str]] | None = None

A list of NfsMount resource specs in JSON dict format. For more details about mounting NFS for CustomJob, see Mount an NFS share for custom training.

base_output_directory: str = ''

The Cloud Storage location to store the output of this CustomJob or HyperparameterTuningJob. See more information.

labels: dict[str, str] | None = None

The labels with user-defined metadata to organize the CustomJob. See more information.

env: list[dict[str, str]] | None = None

Environment variables to be passed to the container. Takes the form [{'name': '...', 'value': '...'}]. Maximum limit is 100.
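Environment variables are often kept in an ordinary mapping, which can be converted to the list form env expects. The helper below is a minimal sketch; to_env_list and MAX_ENV_VARS are hypothetical names, and the limit check mirrors the documented maximum of 100.

```python
MAX_ENV_VARS = 100  # documented upper bound on the number of entries


def to_env_list(variables: dict[str, str]) -> list[dict[str, str]]:
    """Convert {'NAME': 'value'} into [{'name': 'NAME', 'value': 'value'}]."""
    if len(variables) > MAX_ENV_VARS:
        raise ValueError(f"at most {MAX_ENV_VARS} environment variables are allowed")
    return [{"name": name, "value": str(value)} for name, value in variables.items()]


env = to_env_list({"LOG_LEVEL": "INFO", "NUM_WORKERS": "4"})
print(env)
```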

Returns: A KFP component with CustomJob specification applied.
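A typical use of this utility might look like the sketch below. The train component, its epochs input, and the keyword values are hypothetical. The kfp and google_cloud_pipeline_components imports are done lazily inside the builder, on the assumption that both packages are installed wherever the pipeline is compiled.

```python
# Job settings mapped onto the documented keyword parameters.
# All values here are illustrative placeholders.
custom_job_kwargs = {
    "display_name": "train-on-vertex",
    "replica_count": 2,
    "machine_type": "n1-standard-4",
    "timeout": "7200s",
    "env": [{"name": "LOG_LEVEL", "value": "INFO"}],
}


def build_custom_training_op():
    """Wrap a small KFP component as a custom training job component."""
    from google_cloud_pipeline_components.v1.custom_job import (
        create_custom_training_job_from_component,
    )
    from kfp import dsl

    @dsl.component
    def train(epochs: int) -> str:
        # Hypothetical training logic; a real component would fit a model here.
        return f"trained for {epochs} epochs"

    return create_custom_training_job_from_component(train, **custom_job_kwargs)
```

The returned component is then called inside a pipeline in the same way as the original component, with the component's own inputs passed as keyword arguments.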