Dataflow

Create Google Cloud Dataflow jobs from within Vertex AI Pipelines.

Components:

DataflowFlexTemplateJobOp(...[, location, ...])

Launch a job with a Dataflow Flex Template.

DataflowPythonJobOp(python_module_path, ...)

Launch a self-executing Beam Python file on Google Cloud using the Dataflow Runner.
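Both components are provided by the google-cloud-pipeline-components package and, assuming the package layout documented here, can be imported from its v1.dataflow module. A minimal import sketch:

    from google_cloud_pipeline_components.v1.dataflow import (
        DataflowFlexTemplateJobOp,
        DataflowPythonJobOp,
    )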

v1.dataflow.DataflowFlexTemplateJobOp(container_spec_gcs_path: str, gcp_resources: dsl.OutputPath(str), location: str = 'us-central1', job_name: str = '', parameters: dict[str, str] = {}, launch_options: dict[str, str] = {}, num_workers: int = 0, max_workers: int = 0, service_account_email: str = '', temp_location: str = '', machine_type: str = '', additional_experiments: list[str] = [], network: str = '', subnetwork: str = '', additional_user_labels: dict[str, str] = {}, kms_key_name: str = '', ip_configuration: str = '', worker_region: str = '', worker_zone: str = '', enable_streaming_engine: bool = False, flexrs_goal: str = '', staging_location: str = '', sdk_container_image: str = '', disk_size_gb: int = 0, autoscaling_algorithm: str = '', dump_heap_on_oom: bool = False, save_heap_dumps_to_gcs_path: str = '', launcher_machine_type: str = '', enable_launcher_vm_serial_port_logging: bool = False, update: bool = False, transform_name_mappings: dict[str, str] = {}, validate_only: bool = False, project: str = '{{$.pipeline_google_cloud_project_id}}')

Launch a job with a Dataflow Flex Template.

Parameters:
location: str = 'us-central1'

The regional endpoint to which to direct the request. E.g., us-central1, us-west1. Defaults to us-central1 if not set.

job_name: str = ''

The job name to use for the created job. For update job requests, the job name should be the same as the existing running job. If none is specified, a default name will be generated by the component.

container_spec_gcs_path: str

Cloud Storage path to a file containing a JSON-serialized ContainerSpec.

parameters: dict[str, str] = {}

The parameters for the Flex Template. For example, {"my_template_param": "5"}.

launch_options: dict[str, str] = {}

Launch options for this flex template job. This is a common set of options across languages and templates. This should not be used to pass job parameters.

num_workers: int = 0

The initial number of Google Compute Engine instances for the job. If empty or unspecified, the Dataflow service determines an appropriate number of workers.

max_workers: int = 0

The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000. If empty or unspecified, the Dataflow service determines a default maximum number of instances. For more details, see https://cloud.google.com/dataflow/docs/horizontal-autoscaling.

service_account_email: str = ''

The email address of the service account to run the job as. If unspecified, the Dataflow service uses the project’s Compute Engine default service account.

temp_location: str = ''

The Cloud Storage path to use for temporary files. Must be a valid Cloud Storage URL, beginning with gs://. For more details, see https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#setting_required_options.

machine_type: str = ''

The machine type to use for the Dataflow job. Defaults to the value from the template if not specified.

additional_experiments: list[str] = []

Additional experiment flags for the job.

network: str = ''

Network to which VMs will be assigned. If empty or unspecified, the service will use the network "default".

subnetwork: str = ''

Subnetwork to which VMs will be assigned, if desired. You can specify a subnetwork using either a complete URL or an abbreviated path. Expected to be of the form "https://www.googleapis.com/compute/v1/projects/HOST_PROJECT_ID/regions/REGION/subnetworks/SUBNETWORK" or "regions/REGION/subnetworks/SUBNETWORK". If the subnetwork is located in a Shared VPC network, you must use the complete URL.

additional_user_labels: dict[str, str] = {}

Additional user labels to be specified for the job. Keys and values must follow the restrictions specified in the labeling restrictions page (https://cloud.google.com/compute/docs/labeling-resources#restrictions). An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1kg", "count": "3" }.

kms_key_name: str = ''

Name for the Cloud KMS key for the job. Key format is "projects/HOST_PROJECT_ID/locations/LOCATION/keyRings/KEYRING_ID/cryptoKeys/CRYPTO_KEY_ID".

ip_configuration: str = ''

Configuration for worker VM IP addresses, i.e. whether workers use public or private IPs.

worker_region: str = ''

The Compute Engine region (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker processing should occur, e.g. "us-west1". Mutually exclusive with worker_zone. If neither worker_region nor worker_zone is specified, defaults to the control plane's region.

worker_zone: str = ''

The Compute Engine zone (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker processing should occur, e.g. "us-west1-a". Mutually exclusive with worker_region. If neither worker_region nor worker_zone is specified, a zone in the control plane's region is chosen based on available capacity.

enable_streaming_engine: bool = False

Whether to enable Streaming Engine for the job.

flexrs_goal: str = ''

Set FlexRS goal for the job. For more details, see https://cloud.google.com/dataflow/docs/guides/flexrs.

staging_location: str = ''

The Cloud Storage path for staging local files. Must be a valid Cloud Storage URL, beginning with gs://. For more details, see https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#setting_required_options.

sdk_container_image: str = ''

Docker registry location (e.g. Artifact Registry) of the container image to use for the worker harness. If unspecified, the default container image for the SDK version in use is used. Note that this field is only valid for portable Dataflow pipeline jobs.

disk_size_gb: int = 0

Worker disk size, in gigabytes. If empty or unspecified, the Dataflow service determines an appropriate disk size.

autoscaling_algorithm: str = ''

The algorithm to use for autoscaling. If empty or unspecified, the Dataflow service sets a default value. For more details, see https://cloud.google.com/dataflow/docs/reference/pipeline-options#resource_utilization.

dump_heap_on_oom: bool = False

If true, when processing time is spent almost entirely on garbage collection (GC), saves a heap dump before ending the thread or process. If false, ends the thread or process without saving a heap dump. Does not save a heap dump when the Java Virtual Machine (JVM) has an out of memory error during processing. The location of the heap file is either echoed back to the user, or the user is given the opportunity to download the heap file.

save_heap_dumps_to_gcs_path: str = ''

Cloud Storage bucket (directory) to upload heap dumps to. Enabling this field implies that dump_heap_on_oom is set to true.

launcher_machine_type: str = ''

The machine type to use for launching the Dataflow job. The default is n1-standard-1.

enable_launcher_vm_serial_port_logging: bool = False

If true, serial port logging is enabled for the launcher VM.

update: bool = False

Set this to true if you are sending a request to update a running streaming job. When set, the job name should be the same as the running job.

transform_name_mappings: dict[str, str] = {}

Use this to pass transformNameMappings for streaming update jobs. Example: {"oldTransformName":"newTransformName",...}. For more details, see https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline#Mapping

validate_only: bool = False

If true, the request is validated but not actually executed. Defaults to false.

project: str = '{{$.pipeline_google_cloud_project_id}}'

The ID of the Cloud Platform project that the job belongs to. Defaults to the project in which the PipelineJob is run.

Returns:

gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
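A minimal sketch of wiring DataflowFlexTemplateJobOp into a KFP pipeline. The bucket, template path, and template parameter names below are placeholders for illustration, not values defined by this component:

    from kfp import dsl
    from google_cloud_pipeline_components.v1.dataflow import DataflowFlexTemplateJobOp


    @dsl.pipeline(name="dataflow-flex-template-example")
    def flex_template_pipeline():
        # container_spec_gcs_path points at the JSON-serialized ContainerSpec
        # produced when the Flex Template was built.
        DataflowFlexTemplateJobOp(
            location="us-central1",
            container_spec_gcs_path="gs://my-bucket/templates/my_template.json",  # placeholder
            job_name="my-flex-template-job",
            parameters={
                "input": "gs://my-bucket/input/*.csv",  # template-specific parameters (placeholders)
                "output": "gs://my-bucket/output/",
            },
            temp_location="gs://my-bucket/temp",
            max_workers=10,
        )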

v1.dataflow.DataflowPythonJobOp(python_module_path: str, temp_location: str, gcp_resources: dsl.OutputPath(str), location: str = 'us-central1', requirements_file_path: str = '', args: list[str] = [], project: str = '{{$.pipeline_google_cloud_project_id}}')

Launch a self-executing Beam Python file on Google Cloud using the Dataflow Runner.

Parameters:
location: str = 'us-central1'

Location of the Dataflow job. If not set, defaults to 'us-central1'.

python_module_path: str

The GCS path to the Python file to run.

temp_location: str

A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.

requirements_file_path: str = ''

The GCS path to the pip requirements file.

args: list[str] = []

The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.

project: str = '{{$.pipeline_google_cloud_project_id}}'

The project in which to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

Returns:

gcp_resources: dsl.OutputPath(str)

Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
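A minimal usage sketch for DataflowPythonJobOp. The GCS paths and args are placeholders, and the WaitGcpResourcesOp step (imported here from google_cloud_pipeline_components.v1.wait_gcp_resources) is shown on the assumption that downstream steps should block until the submitted Dataflow job completes:

    from kfp import dsl
    from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
    from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp


    @dsl.pipeline(name="dataflow-python-job-example")
    def python_job_pipeline():
        dataflow_task = DataflowPythonJobOp(
            location="us-central1",
            python_module_path="gs://my-bucket/src/beam_job.py",           # placeholder
            temp_location="gs://my-bucket/temp",
            requirements_file_path="gs://my-bucket/src/requirements.txt",  # placeholder
            args=["--output", "gs://my-bucket/output/"],                   # args consumed by beam_job.py (placeholder)
        )

        # The component returns once the job is submitted; poll the serialized
        # gcp_resources output so later steps start only after the Dataflow job finishes.
        WaitGcpResourcesOp(gcp_resources=dataflow_task.outputs["gcp_resources"])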