kfp.dsl package

kfp.dsl.RUN_ID_PLACEHOLDER
kfp.dsl.EXECUTION_ID_PLACEHOLDER
class kfp.dsl.Artifact(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: object

Generic Artifact class.

This class is meant to represent the metadata around an input or output machine-learning Artifact. Artifacts have URIs, which can either be a location on disk (or Cloud storage) or some other resource identifier such as an API resource name.

Artifacts carry a metadata field, which is a dictionary for storing metadata related to this artifact.

TYPE_NAME = 'system.Artifact'
VERSION = '0.0.1'
property path
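
Example (a hedged sketch of a component that produces a generic Artifact; the component name and payload are illustrative):

from kfp import dsl
from kfp.dsl import Artifact, Output

@dsl.component
def produce_artifact(artifact: Output[Artifact]):
    # artifact.path is the local file location backing artifact.uri;
    # files written here are uploaded when the component finishes.
    with open(artifact.path, 'w') as f:
        f.write('some payload')
    artifact.metadata['source'] = 'produce_artifact'  # free-form metadata
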
class kfp.dsl.ClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

An Artifact class for storing classification metrics.

TYPE_NAME = 'system.ClassificationMetrics'
VERSION = '0.0.1'
log_confusion_matrix(categories: List[str], matrix: List[List[int]])[source]

Logs a confusion matrix.

Parameters
  • categories – List of the category names.

  • matrix – Complete confusion matrix.

Raises

ValueError – If the length of categories does not match the number of rows or columns of matrix.
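
A hedged usage sketch (the component, categories, and counts are illustrative):

from kfp import dsl
from kfp.dsl import ClassificationMetrics, Output

@dsl.component
def eval_model(metrics: Output[ClassificationMetrics]):
    # Bulk-log a complete 2x2 confusion matrix in one call.
    metrics.log_confusion_matrix(
        categories=['cat', 'dog'],
        matrix=[[95, 5], [12, 88]],
    )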

log_confusion_matrix_cell(row_category: str, col_category: str, value: int)[source]

Logs a cell in the confusion matrix.

Parameters
  • row_category – String representing the name of the row category.

  • col_category – String representing the name of the column category.

  • value – Int value of the cell.

Raises

ValueError – If row_category or col_category is not in the list of categories set in set_confusion_matrix_categories.

log_confusion_matrix_row(row_category: str, row: List[float])[source]

Logs a confusion matrix row.

Parameters
  • row_category – Category to which the row belongs.

  • row – List of values for the row.

Raises

ValueError – If row_category is not in the list of categories set in set_confusion_matrix_categories.

log_roc_curve(fpr: List[float], tpr: List[float], threshold: List[float])[source]

Logs an ROC curve.

The lists fpr, tpr, and threshold must all have the same length.

Parameters
  • fpr – List of false positive rate values.

  • tpr – List of true positive rate values.

  • threshold – List of threshold values.
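
A hedged usage sketch (the component and values are illustrative):

from kfp import dsl
from kfp.dsl import ClassificationMetrics, Output

@dsl.component
def eval_roc(metrics: Output[ClassificationMetrics]):
    # fpr, tpr, and threshold must all have the same length.
    metrics.log_roc_curve(
        fpr=[0.0, 0.2, 1.0],
        tpr=[0.0, 0.8, 1.0],
        threshold=[1.0, 0.5, 0.0],
    )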

log_roc_data_point(fpr: float, tpr: float, threshold: float)[source]

Logs a single data point in the ROC Curve.

Parameters
  • fpr – False positive rate value of the data point.

  • tpr – True positive rate value of the data point.

  • threshold – Threshold value for the data point.

property path
set_confusion_matrix_categories(categories: List[str])[source]

Stores confusion matrix categories.

Parameters

categories – List of strings specifying the categories.
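
A hedged sketch of incremental logging (categories and values are illustrative): set the categories first, then fill in rows or individual cells.

from kfp.dsl import ClassificationMetrics

def log_incrementally(metrics: ClassificationMetrics):
    metrics.set_confusion_matrix_categories(['cat', 'dog'])
    metrics.log_confusion_matrix_row('cat', [95, 5])
    metrics.log_confusion_matrix_cell('dog', 'dog', 88)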

class kfp.dsl.Condition(condition: kfp.components.pipeline_channel.ConditionOperator, name: Optional[str] = None)[source]

Bases: kfp.components.tasks_group.TasksGroup

Represents a condition group guarded by a condition expression.

Example

with Condition(param1=='pizza', '[param1 is pizza]'):
    task1 = MyComponent1(...)
    task2 = MyComponent2(...)
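
In practice, param1 is a pipeline parameter or a task output (a PipelineChannel), and comparing it with a constant produces the condition expression. A hedged sketch (MyComponent1 stands in for a real component, as above):

from kfp import dsl

@dsl.pipeline(name='conditional-pipeline')
def my_pipeline(param1: str):
    with dsl.Condition(param1 == 'pizza', name='param1-is-pizza'):
        task1 = MyComponent1(...)
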
condition

The condition expression.

remove_task_recursive(task: kfp.components.pipeline_task.PipelineTask)

Removes a task from the group recursively.

class kfp.dsl.Dataset(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

An artifact representing an ML Dataset.

TYPE_NAME = 'system.Dataset'
VERSION = '0.0.1'
property path
class kfp.dsl.ExitHandler(exit_task: kfp.components.pipeline_task.PipelineTask, name: Optional[str] = None)[source]

Bases: kfp.components.tasks_group.TasksGroup

Represents an exit handler that is invoked upon exiting a group of tasks.

Example

exit_task = ExitComponent(...)
with ExitHandler(exit_task):
    task1 = MyComponent1(...)
    task2 = MyComponent2(...)
exit_task

The exit handler task.

remove_task_recursive(task: kfp.components.pipeline_task.PipelineTask)

Removes a task from the group recursively.

class kfp.dsl.HTML(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

An artifact representing an HTML file.

TYPE_NAME = 'system.HTML'
VERSION = '0.0.1'
property path
class kfp.dsl.InputPath(type=None)[source]

Bases: object

Annotation for indicating a variable is a path to an input.

class kfp.dsl.Markdown(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

An artifact representing a Markdown file.

TYPE_NAME = 'system.Markdown'
VERSION = '0.0.1'
property path
class kfp.dsl.Metrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

A simple base Artifact type for storing key-value scalar metrics.

TYPE_NAME = 'system.Metrics'
VERSION = '0.0.1'
log_metric(metric: str, value: float)[source]

Sets a custom scalar metric.

Parameters
  • metric – The metric key.

  • value – The value of the metric.
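
A hedged usage sketch (metric keys and values are illustrative):

from kfp import dsl
from kfp.dsl import Metrics, Output

@dsl.component
def eval_op(metrics: Output[Metrics]):
    # Each call stores one scalar value under the given key.
    metrics.log_metric('accuracy', 0.92)
    metrics.log_metric('auc', 0.97)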

property path
class kfp.dsl.Model(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

An artifact representing an ML Model.

TYPE_NAME = 'system.Model'
VERSION = '0.0.1'
property framework: str
property path
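
A hedged sketch of a training component that produces a Model (the payload is illustrative; storing the framework under the 'framework' metadata key is an assumption about how the framework property is backed):

from kfp import dsl
from kfp.dsl import Model, Output

@dsl.component
def train_op(model: Output[Model]):
    # Files written to model.path are uploaded to model.uri
    # when the component finishes.
    with open(model.path, 'w') as f:
        f.write('serialized-model-bytes')
    model.metadata['framework'] = 'tensorflow'  # assumed metadata key
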
class kfp.dsl.OutputPath(type=None)[source]

Bases: object

Annotation for indicating a variable is a path to an output.
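
A hedged sketch using both annotations (the component and type names are illustrative); at runtime the annotated parameters resolve to local file paths:

from kfp import dsl
from kfp.dsl import InputPath, OutputPath

@dsl.component
def copy_file(
    input_path: InputPath('Dataset'),
    output_path: OutputPath('Dataset'),
):
    # Both parameters are plain strings pointing at local files.
    with open(input_path) as src, open(output_path, 'w') as dst:
        dst.write(src.read())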

class kfp.dsl.ParallelFor(items: Union[List[Union[int, float, str, Dict[str, Any]]], kfp.components.pipeline_channel.PipelineChannel], name: Optional[str] = None)[source]

Bases: kfp.components.tasks_group.TasksGroup

Represents a parallel for loop over a static set of items.

Example

with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
    task1 = MyComponent(..., item.a)
    task2 = MyComponent(..., item.b)

In this case, task1 and task2 are each executed twice: once with the item {'a': 1, 'b': 10} and once with the item {'a': 2, 'b': 20}.

loop_argument

The argument for each loop iteration.

items_is_pipeline_channel

Whether the loop items are a PipelineChannel rather than raw items.

remove_task_recursive(task: kfp.components.pipeline_task.PipelineTask)

Removes a task from the group recursively.

class kfp.dsl.PipelineArtifactChannel(name: str, channel_type: Union[str, Dict], task_name: str)[source]

Bases: kfp.components.pipeline_channel.PipelineChannel

Represents a pipeline artifact channel.

name

The name of the pipeline channel.

channel_type

The type of the pipeline channel.

task_name

The name of the task that produces the pipeline channel. A pipeline artifact channel is always produced by some task.

pattern

The serialized string regex pattern this pipeline channel was created from.

property full_name: str

Unique name for the PipelineChannel.

property pattern: str

Unique pattern for the PipelineChannel.

class kfp.dsl.PipelineChannel(name: str, channel_type: Union[str, Dict], task_name: Optional[str] = None)[source]

Bases: abc.ABC

Represents a future value that is passed between pipeline components.

A PipelineChannel object can be used as a pipeline function argument so that it will be a pipeline artifact or parameter that shows up in the ML Pipelines system UI. It can also represent an intermediate value passed between components.

name

The name of the pipeline channel.

channel_type

The type of the pipeline channel.

task_name

The name of the task that produces the pipeline channel. None means it is not produced by any task; in that case, either the user constructed it directly (to provide an immediate value), or it is a pipeline function argument.

pattern

The serialized string regex pattern this pipeline channel was created from.

property full_name: str

Unique name for the PipelineChannel.

property pattern: str

Unique pattern for the PipelineChannel.

class kfp.dsl.PipelineParameterChannel(name: str, channel_type: Union[str, Dict], task_name: Optional[str] = None, value: Optional[Union[str, int, float, bool, dict, list]] = None)[source]

Bases: kfp.components.pipeline_channel.PipelineChannel

Represents a pipeline parameter channel.

name

The name of the pipeline channel.

channel_type

The type of the pipeline channel.

task_name

The name of the task that produces the pipeline channel. None means it is not produced by any task; in that case, either the user constructed it directly (to provide an immediate value), or it is a pipeline function argument.

pattern

The serialized string regex pattern this pipeline channel was created from.

value

The actual value of the pipeline channel. If provided, the pipeline channel is “resolved” immediately.

property full_name: str

Unique name for the PipelineChannel.

property pattern: str

Unique pattern for the PipelineChannel.

class kfp.dsl.PipelineTask(component_spec: kfp.components.structures.ComponentSpec, args: Mapping[str, Any])[source]

Bases: object

Represents a pipeline task – an instantiated component.

Replaces ContainerOp. Holds operations available on a task object, such as .after(), .set_memory_limit(), and .set_caching_options().

name

The name of the task. Unique within its parent group.

outputs
task_spec

The task spec of the task.

component_spec

The component spec of the task.

container_spec

The resolved container spec of the task. Only one of container_spec and importer_spec should be filled.

importer_spec

The resolved importer spec of the task. Only one of container_spec and importer_spec should be filled.

add_node_selector_constraint(accelerator: str) kfp.components.pipeline_task.PipelineTask[source]

Sets accelerator type requirement for this task.

Parameters

accelerator (str) – The name of the accelerator. Available values include 'NVIDIA_TESLA_K80' and 'TPU_V3'.

Returns

Self return to allow chained setting calls.

after(*tasks) kfp.components.pipeline_task.PipelineTask[source]

Specify explicit dependency on other tasks.

Parameters

*tasks – Tasks that this task depends on.

Returns

Self return to allow chained setting calls.

property channel_inputs: List[kfp.components.pipeline_channel.PipelineChannel]

Returns the list of all PipelineChannels passed to the task.

property dependent_tasks: List[str]

Returns the list of dependent task names.

property inputs: List[Union[str, int, float, bool, dict, list, kfp.components.pipeline_channel.PipelineChannel]]

Returns the list of actual inputs passed to the task.

property name: str

Returns the name of the task.

property output: kfp.components.pipeline_channel.PipelineChannel

Returns the single output object (a PipelineChannel) of the task.

property outputs: Mapping[str, kfp.components.pipeline_channel.PipelineChannel]

Returns the dictionary of outputs (PipelineChannels) of the task.

register_task_handler()
set_caching_options(enable_caching: bool) kfp.components.pipeline_task.PipelineTask[source]

Sets caching options for the Pipeline task.

Parameters

enable_caching – Whether or not to enable caching for this task.

Returns

Self return to allow chained setting calls.

set_cpu_limit(cpu: str) kfp.components.pipeline_task.PipelineTask[source]

Set cpu limit (maximum) for this operator.

Parameters

cpu (str) – A string which can be a number or a number followed by "m", which means 1/1000 of a CPU core.

Returns

Self return to allow chained setting calls.

set_display_name(name: str) kfp.components.pipeline_task.PipelineTask[source]

Set display name for the PipelineTask.

Parameters

name (str) – Display name for the task.

Returns

Self return to allow chained setting calls.

set_env_variable(name: str, value: str) kfp.components.pipeline_task.PipelineTask[source]

Set environment variable for the PipelineTask.

Parameters
  • name – The name of the environment variable.

  • value – The value of the environment variable.

Returns

Self return to allow chained setting calls.

set_gpu_limit(gpu: str) kfp.components.pipeline_task.PipelineTask[source]

Set gpu limit (maximum) for this operator.

Parameters

gpu (str) – A string of a positive integer specifying the number of GPUs.

Returns

Self return to allow chained setting calls.

set_memory_limit(memory: str) kfp.components.pipeline_task.PipelineTask[source]

Set memory limit (maximum) for this operator.

Parameters

memory (str) – a string which can be a number or a number followed by one of “E”, “Ei”, “P”, “Pi”, “T”, “Ti”, “G”, “Gi”, “M”, “Mi”, “K”, “Ki”.

Returns

Self return to allow chained setting calls.
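
Because each setter returns self, task configuration can be chained; a hedged sketch (preprocess_op and train_op are hypothetical components):

from kfp import dsl

@dsl.pipeline(name='resource-demo')
def my_pipeline():
    preprocess_task = preprocess_op()
    train_task = (
        train_op()
        .after(preprocess_task)
        .set_cpu_limit('4')
        .set_memory_limit('32G')
        .set_gpu_limit('1')
        .add_node_selector_constraint('NVIDIA_TESLA_K80')
    )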

class kfp.dsl.PipelineTaskFinalStatus(state: str, pipeline_job_resource_name: str, pipeline_task_name: str, error_code: Optional[int], error_message: Optional[str])[source]

Bases: object

The final status of a pipeline task.

This is the Python representation of the proto: PipelineTaskFinalStatus https://github.com/kubeflow/pipelines/blob/1c3e2768e6177d5d6e3f4b8eff8fafb9a3b76c1f/api/v2alpha1/pipeline_spec.proto#L886

state

The final state of the task. The value could be one of ‘SUCCEEDED’, ‘FAILED’ or ‘CANCELLED’.

Type

str

pipeline_job_resource_name

The pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.

Type

str

pipeline_task_name

The pipeline task that produces this status.

Type

str

error_code

In case of error, the google.rpc.Code (https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto). If state is 'SUCCEEDED', this is None.

Type

Optional[int]

error_message

In case of error, the detailed error message. If state is ‘SUCCEEDED’, this is None.

Type

Optional[str]

error_code: Optional[int]
error_message: Optional[str]
pipeline_job_resource_name: str
pipeline_task_name: str
state: str
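
A hedged sketch of consuming this status in an exit task (component names are illustrative; the backend is assumed to inject the status into a parameter annotated with this type):

from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus

@dsl.component
def exit_op(status: PipelineTaskFinalStatus):
    # No argument is passed at authoring time; the value is
    # populated by the backend when the ExitHandler group finishes.
    print(status.state)
    print(status.pipeline_job_resource_name)
    print(status.error_code, status.error_message)
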
class kfp.dsl.SlicedClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]

Bases: kfp.components.types.artifact_types.Artifact

Metrics class representing Sliced Classification Metrics.

Similar to ClassificationMetrics, clients using this class are expected to use its log methods to log metrics; the difference is that each log method takes a slice with which to associate the metrics.

TYPE_NAME = 'system.SlicedClassificationMetrics'
VERSION = '0.0.1'
load_confusion_matrix(slice: str, categories: List[str], matrix: List[List[int]])[source]

Supports bulk loading the whole confusion matrix for a slice.

Parameters
  • slice – String representing slice label.

  • categories – List of the category names.

  • matrix – Complete confusion matrix.

load_roc_readings(slice: str, readings: List[List[float]])[source]

Supports bulk loading ROC Curve readings for a slice.

Parameters
  • slice – String representing slice label.

  • readings – A 2-D list providing ROC curve data points. The expected order within each data point is: threshold, true_positive_rate, false_positive_rate.

log_confusion_matrix_cell(slice: str, row_category: str, col_category: str, value: int)[source]

Logs a confusion matrix cell for a slice.

Cell is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
  • slice – String representing slice label.

  • row_category – String representing the name of the row category.

  • col_category – String representing the name of the column category.

  • value – Int value of the cell.

log_confusion_matrix_row(slice: str, row_category: str, row: List[int])[source]

Logs a confusion matrix row for a slice.

Row is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
  • slice – String representing slice label.

  • row_category – Category to which the row belongs.

  • row – List of integers specifying the values for the row.

log_roc_reading(slice: str, threshold: float, tpr: float, fpr: float)[source]

Logs a single data point in the ROC Curve of a slice.

Parameters
  • slice – String representing slice label.

  • threshold – Threshold value for the data point.

  • tpr – True positive rate value of the data point.

  • fpr – False positive rate value of the data point.

property path
set_confusion_matrix_categories(slice: str, categories: List[str])[source]

Stores confusion matrix categories for a slice.

Categories are stored in the internal metrics_utils.ConfusionMatrix instance of the slice.

Parameters
  • slice – String representing slice label.

  • categories – List of strings specifying the categories.
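
A hedged usage sketch (slice labels and values are illustrative):

from kfp import dsl
from kfp.dsl import Output, SlicedClassificationMetrics

@dsl.component
def eval_by_slice(metrics: Output[SlicedClassificationMetrics]):
    # Same logging surface as ClassificationMetrics, plus a slice label.
    metrics.set_confusion_matrix_categories('slice_a', ['cat', 'dog'])
    metrics.log_confusion_matrix_row('slice_a', 'cat', [95, 5])
    metrics.log_roc_reading('slice_a', threshold=0.5, tpr=0.8, fpr=0.2)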

kfp.dsl.component(func: Optional[Callable] = None, *, base_image: Optional[str] = None, target_image: Optional[str] = None, packages_to_install: Optional[List[str]] = None, pip_index_urls: Optional[List[str]] = None, output_component_file: Optional[str] = None, install_kfp_package: bool = True, kfp_package_path: Optional[str] = None)[source]

Decorator for Python-function based components in KFP v2.

A KFP v2 component can either be a lightweight component, or a containerized one.

If target_image is not specified, this function creates a lightweight component. A lightweight component is a self-contained Python function that includes all necessary imports and dependencies. In lightweight components, packages_to_install will be used to install dependencies at runtime. The parameters install_kfp_package and kfp_package_path can be used to control how KFP should be installed when the lightweight component is executed.

If target_image is specified, this function creates a component definition based around the target_image. The assumption is that the function in func will be packaged by KFP into this target_image. Use the KFP CLI’s build command to package func into target_image.

Example usage:

from kfp import dsl

@dsl.component
def my_function_one(input: str, output: Output[Model]):
    ...

@dsl.component(
    base_image='python:3.9',
    output_component_file='my_function.yaml'
)
def my_function_two(input: Input[Model]):
    ...

@dsl.pipeline(
    pipeline_root='...',
    name='my-pipeline'
)
def pipeline():
    my_function_one_task = my_function_one(input=...)
    my_function_two_task = my_function_two(
        input=my_function_one_task.outputs['output'])

Parameters
  • func – The python function to create a component from. The function should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file).

  • base_image – The image to use when executing func. It should contain a default Python interpreter that is compatible with KFP.

  • packages_to_install – An optional list of packages to install before executing func. These will always be installed at component runtime.

  • pip_index_urls – Python Package Index base URLs from which to install packages_to_install. Defaults to installing only from "https://pypi.org/simple". For more information, see: https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0.

  • output_component_file – If specified, this function will write a shareable/loadable version of the component spec into this file.

  • install_kfp_package – Specifies if we should add a KFP Python package to packages_to_install. Lightweight Python functions always require an installation of KFP in base_image to work. If you specify a base_image that already contains KFP, you can set this to False. This flag is ignored when target_image is specified, which implies we’re building a containerized component. Containerized components will always install KFP as part of the build process.

  • kfp_package_path – Specifies the location from which to install KFP. By default, this will try to install from PyPI using the same version as that used when this component was created. KFP developers can choose to override this to point to a GitHub pull request or other pip-compatible location when testing changes to lightweight Python functions.

Returns

A component task factory that can be used in pipeline definitions.

kfp.dsl.importer(artifact_uri: Union[kfp.components.pipeline_channel.PipelineParameterChannel, str], artifact_class: Type[kfp.components.types.artifact_types.Artifact], reimport: bool = False, metadata: Optional[Mapping[str, Any]] = None) kfp.components.pipeline_task.PipelineTask[source]

Imports an existing artifact into the pipeline. Only for v2 pipelines.

Parameters
  • artifact_uri – The artifact uri to import from.

  • artifact_class – The artifact class of the artifact to import.

  • reimport – Whether to reimport the artifact. Defaults to False.

  • metadata – Properties of the artifact.

Returns

A PipelineTask instance.

Raises

ValueError – If the artifact_uri passed in is neither a PipelineParam nor a constant string value.
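
A hedged usage sketch (the URI and metadata are illustrative):

from kfp import dsl
from kfp.dsl import Dataset

@dsl.pipeline(name='importer-demo')
def my_pipeline():
    importer_task = dsl.importer(
        artifact_uri='gs://my-bucket/path/to/data',
        artifact_class=Dataset,
        reimport=False,
        metadata={'source': 'external'},
    )
    # importer_task.output is the imported Dataset artifact.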

kfp.dsl.pipeline(name: Optional[str] = None, description: Optional[str] = None, pipeline_root: Optional[str] = None)[source]

Decorator of pipeline functions.

Example
@pipeline(
  name='my-pipeline',
  description='My ML Pipeline.',
  pipeline_root='gs://my-bucket/my-output-path'
)
def my_pipeline(a: str, b: int):
  ...
Parameters
  • name – The pipeline name. Defaults to a sanitized version of the function name.

  • description – Optionally, a human-readable description of the pipeline.

  • pipeline_root – The root directory under which to generate input/output URIs for this pipeline. This is required if an input/output URI placeholder is used in this pipeline.