kfp.dsl package
- kfp.dsl.RUN_ID_PLACEHOLDER
- kfp.dsl.EXECUTION_ID_PLACEHOLDER
- class kfp.dsl.Artifact(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
object
Generic Artifact class.
This class is meant to represent the metadata around an input or output machine-learning Artifact. Artifacts have URIs, which can either be a location on disk (or Cloud storage) or some other resource identifier such as an API resource name.
Artifacts carry a metadata field, which is a dictionary for storing metadata related to this artifact.
- TYPE_NAME = 'system.Artifact'
- VERSION = '0.0.1'
- property path
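Example (a minimal sketch of producing an Artifact from a lightweight component; it assumes Output is importable from kfp.dsl as in the dsl.component example below, and that artifact.path is the local path backing artifact.uri):

    from kfp.dsl import component, Artifact, Output

    @component
    def make_artifact(artifact: Output[Artifact]):
        # Write the payload to the local file backing the artifact's URI.
        with open(artifact.path, 'w') as f:
            f.write('hello')
        # Attach free-form metadata to the artifact.
        artifact.metadata['source'] = 'make_artifact'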
- class kfp.dsl.ClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
An Artifact class for storing classification metrics.
- TYPE_NAME = 'system.ClassificationMetrics'
- VERSION = '0.0.1'
- log_confusion_matrix(categories: List[str], matrix: List[List[int]])[source]
Logs a confusion matrix.
- Parameters
categories – List of the category names.
matrix – Complete confusion matrix.
- Raises
ValueError – Length of categories does not match number of rows or columns.
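Example (a minimal sketch, assuming a component with an Output[ClassificationMetrics] parameter named metrics; rows and columns both follow the order of categories):

    metrics.log_confusion_matrix(
        categories=['cat', 'dog'],
        matrix=[[10, 2],
                [3, 15]])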
- log_confusion_matrix_cell(row_category: str, col_category: str, value: int)[source]
Logs a cell in the confusion matrix.
- Parameters
row_category – String representing the name of the row category.
col_category – String representing the name of the column category.
value – Int value of the cell.
- Raises
ValueError – If row_category or col_category is not in the list of categories set in set_categories.
- log_confusion_matrix_row(row_category: str, row: List[float])[source]
Logs a confusion matrix row.
- Parameters
row_category – Category to which the row belongs.
row – List of floats specifying the values for the row.
- Raises
ValueError – If row_category is not in the list of categories set in the set_categories call.
- log_roc_curve(fpr: List[float], tpr: List[float], threshold: List[float])[source]
Logs an ROC curve.
The lists fpr, tpr, and threshold must all have the same length.
- Parameters
fpr – List of false positive rate values.
tpr – List of true positive rate values.
threshold – List of threshold values.
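Example (again assuming an Output[ClassificationMetrics] parameter named metrics; all three lists have the same length):

    metrics.log_roc_curve(
        fpr=[0.0, 0.1, 1.0],
        tpr=[0.0, 0.7, 1.0],
        threshold=[1.0, 0.5, 0.0])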
- log_roc_data_point(fpr: float, tpr: float, threshold: float)[source]
Logs a single data point in the ROC Curve.
- Parameters
fpr – False positive rate value of the data point.
tpr – True positive rate value of the data point.
threshold – Threshold value for the data point.
- property path
- class kfp.dsl.Condition(condition: kfp.components.pipeline_channel.ConditionOperator, name: Optional[str] = None)[source]
Bases:
kfp.components.tasks_group.TasksGroup
Represents a condition group with a condition expression.
Example
    with Condition(param1 == 'pizza', '[param1 is pizza]'):
        task1 = MyComponent1(...)
        task2 = MyComponent2(...)
- condition
The condition expression.
- remove_task_recursive(task: kfp.components.pipeline_task.PipelineTask)
Removes a task from the group recursively.
- class kfp.dsl.Dataset(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
An artifact representing an ML Dataset.
- TYPE_NAME = 'system.Dataset'
- VERSION = '0.0.1'
- property path
- class kfp.dsl.ExitHandler(exit_task: kfp.components.pipeline_task.PipelineTask, name: Optional[str] = None)[source]
Bases:
kfp.components.tasks_group.TasksGroup
Represents an exit handler that is invoked upon exiting a group of tasks.
Example
    exit_task = ExitComponent(...)
    with ExitHandler(exit_task):
        task1 = MyComponent1(...)
        task2 = MyComponent2(...)
- exit_task
The exit handler task.
- remove_task_recursive(task: kfp.components.pipeline_task.PipelineTask)
Removes a task from the group recursively.
- class kfp.dsl.HTML(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
An artifact representing an HTML file.
- TYPE_NAME = 'system.HTML'
- VERSION = '0.0.1'
- property path
- class kfp.dsl.InputPath(type=None)[source]
Bases:
object
Annotation for indicating a variable is a path to an input.
- class kfp.dsl.Markdown(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
An artifact representing a Markdown file.
- TYPE_NAME = 'system.Markdown'
- VERSION = '0.0.1'
- property path
- class kfp.dsl.Metrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
Represents a simple base Artifact type for storing key-value scalar metrics.
- TYPE_NAME = 'system.Metrics'
- VERSION = '0.0.1'
- log_metric(metric: str, value: float)[source]
Sets a custom scalar metric.
- Parameters
metric – The metric key.
value – Value of the metric.
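Example (a minimal sketch, assuming a component with an Output[Metrics] parameter named metrics; the keys are illustrative):

    metrics.log_metric('accuracy', 0.92)
    metrics.log_metric('auc', 0.88)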
- property path
- class kfp.dsl.Model(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
An artifact representing an ML Model.
- TYPE_NAME = 'system.Model'
- VERSION = '0.0.1'
- property framework: str
- property path
- class kfp.dsl.OutputPath(type=None)[source]
Bases:
object
Annotation for indicating a variable is a path to an output.
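Example (a minimal sketch of both annotations in a lightweight component; it assumes KFP resolves each annotated parameter to a local file path at runtime, and the component name is hypothetical):

    from kfp.dsl import component, InputPath, OutputPath

    @component
    def uppercase(text_path: InputPath(str), result_path: OutputPath(str)):
        # Both parameters arrive as plain local file paths inside the container.
        with open(text_path) as f:
            content = f.read()
        with open(result_path, 'w') as f:
            f.write(content.upper())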
- class kfp.dsl.ParallelFor(items: Union[List[Union[int, float, str, Dict[str, Any]]], kfp.components.pipeline_channel.PipelineChannel], name: Optional[str] = None)[source]
Bases:
kfp.components.tasks_group.TasksGroup
Represents a parallel for loop over a static set of items.
Example
    with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
        task1 = MyComponent(..., item.a)
        task2 = MyComponent(..., item.b)
In this case, task1 and task2 would each be executed twice: once with the values from the first dictionary (item.a == 1, item.b == 10) and once with the values from the second (item.a == 2, item.b == 20).
- loop_argument
The argument for each loop iteration.
- items_is_pipeline_channel
Whether the loop items are a PipelineChannel rather than raw items.
- remove_task_recursive(task: kfp.components.pipeline_task.PipelineTask)
Removes a task from the group recursively.
- class kfp.dsl.PipelineArtifactChannel(name: str, channel_type: Union[str, Dict], task_name: str)[source]
Bases:
kfp.components.pipeline_channel.PipelineChannel
Represents a pipeline artifact channel.
- name
The name of the pipeline channel.
- channel_type
The type of the pipeline channel.
- task_name
The name of the task that produces the pipeline channel. A pipeline artifact channel is always produced by some task.
- pattern
The serialized string regex pattern from which this pipeline channel was created.
- property full_name: str
Unique name for the PipelineChannel.
- property pattern: str
Unique pattern for the PipelineChannel.
- class kfp.dsl.PipelineChannel(name: str, channel_type: Union[str, Dict], task_name: Optional[str] = None)[source]
Bases:
abc.ABC
Represents a future value that is passed between pipeline components.
A PipelineChannel object can be used as a pipeline function argument so that it will be a pipeline artifact or parameter that shows up in ML Pipelines system UI. It can also represent an intermediate value passed between components.
- name
The name of the pipeline channel.
- channel_type
The type of the pipeline channel.
- task_name
The name of the task that produces the pipeline channel. None means it is not produced by any task; in that case, either the user constructed it directly (to provide an immediate value), or it is a pipeline function argument.
- pattern
The serialized string regex pattern from which this pipeline channel was created.
- property full_name: str
Unique name for the PipelineChannel.
- property pattern: str
Unique pattern for the PipelineChannel.
- class kfp.dsl.PipelineParameterChannel(name: str, channel_type: Union[str, Dict], task_name: Optional[str] = None, value: Optional[Union[str, int, float, bool, dict, list]] = None)[source]
Bases:
kfp.components.pipeline_channel.PipelineChannel
Represents a pipeline parameter channel.
- name
The name of the pipeline channel.
- channel_type
The type of the pipeline channel.
- task_name
The name of the task that produces the pipeline channel. None means it is not produced by any task; in that case, either the user constructed it directly (to provide an immediate value), or it is a pipeline function argument.
- pattern
The serialized string regex pattern from which this pipeline channel was created.
- value
The actual value of the pipeline channel. If provided, the pipeline channel is “resolved” immediately.
- property full_name: str
Unique name for the PipelineChannel.
- property pattern: str
Unique pattern for the PipelineChannel.
- class kfp.dsl.PipelineTask(component_spec: kfp.components.structures.ComponentSpec, args: Mapping[str, Any])[source]
Bases:
object
Represents a pipeline task – an instantiated component.
Replaces ContainerOp from KFP v1. Holds the operations available on a task object, such as .after(), .set_memory_limit(), and .set_caching_options().
- name
The name of the task. Unique within its parent group.
- outputs
- task_spec
The task spec of the task.
- component_spec
The component spec of the task.
- container_spec
The resolved container spec of the task. Only one of container_spec and importer_spec should be filled.
- importer_spec
The resolved importer spec of the task. Only one of container_spec and importer_spec should be filled.
- add_node_selector_constraint(accelerator: str) kfp.components.pipeline_task.PipelineTask [source]
Sets accelerator type requirement for this task.
- Parameters
accelerator (str) – The name of the accelerator. Available values include ‘NVIDIA_TESLA_K80’ and ‘TPU_V3’.
- Returns
Self return to allow chained setting calls.
- after(*tasks) kfp.components.pipeline_task.PipelineTask [source]
Specifies an explicit dependency on other tasks.
- Parameters
*tasks – Tasks that this task depends on.
- Returns
Self return to allow chained setting calls.
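Example (a minimal sketch; my_component is hypothetical):

    task1 = my_component(text='first')
    task2 = my_component(text='second').after(task1)  # task2 waits for task1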
- property channel_inputs: List[kfp.components.pipeline_channel.PipelineChannel]
Returns the list of all PipelineChannels passed to the task.
- property dependent_tasks: List[str]
Returns the list of dependent task names.
- property inputs: List[Union[str, int, float, bool, dict, list, kfp.components.pipeline_channel.PipelineChannel]]
Returns the list of actual inputs passed to the task.
- property name: str
Returns the name of the task.
- property output: kfp.components.pipeline_channel.PipelineChannel
Returns the single output object (a PipelineChannel) of the task.
- property outputs: Mapping[str, kfp.components.pipeline_channel.PipelineChannel]
Returns the dictionary of outputs (PipelineChannels) of the task.
- register_task_handler()
- set_caching_options(enable_caching: bool) kfp.components.pipeline_task.PipelineTask [source]
Sets caching options for the Pipeline task.
- Parameters
enable_caching – Whether or not to enable caching for this task.
- Returns
Self return to allow chained setting calls.
- set_cpu_limit(cpu: str) kfp.components.pipeline_task.PipelineTask [source]
Sets the CPU limit (maximum) for this task.
- Parameters
cpu (str) – A string which can be a number or a number followed by “m”, which means 1/1000 of a CPU core.
- Returns
Self return to allow chained setting calls.
- set_display_name(name: str) kfp.components.pipeline_task.PipelineTask [source]
Sets the display name for the PipelineTask.
- Parameters
name (str) – display name for the task.
- Returns
Self return to allow chained setting calls.
- set_env_variable(name: str, value: str) kfp.components.pipeline_task.PipelineTask [source]
Sets an environment variable for the PipelineTask.
- Parameters
name – The name of the environment variable.
value – The value of the environment variable.
- Returns
Self return to allow chained setting calls.
- set_gpu_limit(gpu: str) kfp.components.pipeline_task.PipelineTask [source]
Sets the GPU limit (maximum) for this task.
- Parameters
gpu (str) – The number of GPUs to request, expressed as a string containing a positive number.
- Returns
Self return to allow chained setting calls.
- set_memory_limit(memory: str) kfp.components.pipeline_task.PipelineTask [source]
Sets the memory limit (maximum) for this task.
- Parameters
memory (str) – A string which can be a number or a number followed by one of “E”, “Ei”, “P”, “Pi”, “T”, “Ti”, “G”, “Gi”, “M”, “Mi”, “K”, “Ki”.
- Returns
Self return to allow chained setting calls.
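Example (a minimal sketch; train_op is a hypothetical component. Because each setter returns the task itself, the calls can be chained):

    task = train_op(epochs=10)
    task.set_cpu_limit('2') \
        .set_memory_limit('4G') \
        .set_caching_options(enable_caching=True) \
        .set_display_name('train-model')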
- class kfp.dsl.PipelineTaskFinalStatus(state: str, pipeline_job_resource_name: str, pipeline_task_name: str, error_code: Optional[int], error_message: Optional[str])[source]
Bases:
object
The final status of a pipeline task.
This is the Python representation of the proto: PipelineTaskFinalStatus https://github.com/kubeflow/pipelines/blob/1c3e2768e6177d5d6e3f4b8eff8fafb9a3b76c1f/api/v2alpha1/pipeline_spec.proto#L886
- state
The final state of the task. The value could be one of ‘SUCCEEDED’, ‘FAILED’ or ‘CANCELLED’.
- Type
str
- pipeline_job_resource_name
The pipeline job resource name, in the format of projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}.
- Type
str
- pipeline_task_name
The pipeline task that produces this status.
- Type
str
- error_code
In case of error, the google.rpc.Code (see https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto). If state is ‘SUCCEEDED’, this is None.
- Type
Optional[int]
- error_message
In case of error, the detailed error message. If state is ‘SUCCEEDED’, this is None.
- Type
Optional[str]
- error_code: Optional[int]
- error_message: Optional[str]
- pipeline_job_resource_name: str
- pipeline_task_name: str
- state: str
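Example (a sketch of the exit-handler pattern: an exit task declares a PipelineTaskFinalStatus parameter, which the system populates at runtime; the component body is illustrative):

    from kfp.dsl import component, PipelineTaskFinalStatus

    @component
    def exit_op(status: PipelineTaskFinalStatus):
        # The final status of the pipeline run is injected at runtime.
        print(status.state)          # 'SUCCEEDED', 'FAILED' or 'CANCELLED'
        print(status.error_message)  # None when the run succeeded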
- class kfp.dsl.SlicedClassificationMetrics(name: Optional[str] = None, uri: Optional[str] = None, metadata: Optional[Dict] = None)[source]
Bases:
kfp.components.types.artifact_types.Artifact
Metrics class representing sliced classification metrics.
Similar to ClassificationMetrics, clients using this class are expected to use its log methods to log metrics, with the difference that each log method takes a slice with which to associate the metrics.
- TYPE_NAME = 'system.SlicedClassificationMetrics'
- VERSION = '0.0.1'
- load_confusion_matrix(slice: str, categories: List[str], matrix: List[List[int]])[source]
Supports bulk loading the whole confusion matrix for a slice.
- Parameters
slice – String representing slice label.
categories – List of the category names.
matrix – Complete confusion matrix.
- load_roc_readings(slice: str, readings: List[List[float]])[source]
Supports bulk loading ROC Curve readings for a slice.
- Parameters
slice – String representing slice label.
readings – A 2-D list providing ROC curve data points. The expected order of values within each data point is: threshold, true_positive_rate, false_positive_rate.
- log_confusion_matrix_cell(slice: str, row_category: str, col_category: str, value: int)[source]
Logs a confusion matrix cell for a slice.
Cell is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.
- Parameters
slice – String representing slice label.
row_category – String representing the name of the row category.
col_category – String representing the name of the column category.
value – Int value of the cell.
- log_confusion_matrix_row(slice: str, row_category: str, row: List[int])[source]
Logs a confusion matrix row for a slice.
Row is updated on the internal metrics_utils.ConfusionMatrix instance of the slice.
- Parameters
slice – String representing slice label.
row_category – Category to which the row belongs.
row – List of integers specifying the values for the row.
- log_roc_reading(slice: str, threshold: float, tpr: float, fpr: float)[source]
Logs a single data point in the ROC Curve of a slice.
- Parameters
slice – String representing slice label.
threshold – Threshold value for the data point.
tpr – True positive rate value of the data point.
fpr – False positive rate value of the data point.
- property path
- set_confusion_matrix_categories(slice: str, categories: List[str])[source]
Stores confusion matrix categories for a slice.
Categories are stored in the internal metrics_utils.ConfusionMatrix instance of the slice.
- Parameters
slice – String representing slice label.
categories – List of strings specifying the categories.
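Example (a minimal sketch, assuming a component with an Output[SlicedClassificationMetrics] parameter named metrics; the slice label is illustrative):

    metrics.set_confusion_matrix_categories('subset_a', ['cat', 'dog'])
    metrics.log_confusion_matrix_row('subset_a', 'cat', [10, 2])
    metrics.log_roc_reading('subset_a', threshold=0.5, tpr=0.85, fpr=0.10)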
- kfp.dsl.component(func: Optional[Callable] = None, *, base_image: Optional[str] = None, target_image: Optional[str] = None, packages_to_install: Optional[List[str]] = None, pip_index_urls: Optional[List[str]] = None, output_component_file: Optional[str] = None, install_kfp_package: bool = True, kfp_package_path: Optional[str] = None)[source]
Decorator for Python-function based components in KFP v2.
A KFP v2 component can either be a lightweight component, or a containerized one.
If target_image is not specified, this function creates a lightweight component. A lightweight component is a self-contained Python function that includes all necessary imports and dependencies. In lightweight components, packages_to_install will be used to install dependencies at runtime. The parameters install_kfp_package and kfp_package_path can be used to control how KFP should be installed when the lightweight component is executed.
If target_image is specified, this function creates a component definition based around the target_image. The assumption is that the function in func will be packaged by KFP into this target_image. Use the KFP CLI’s build command to package func into target_image.
Example usage:

    from kfp import dsl

    @dsl.component
    def my_function_one(input: str, output: Output[Model]):
        ...

    @dsl.component(
        base_image='python:3.9',
        output_component_file='my_function.yaml')
    def my_function_two(input: Input[Model]):
        ...

    @dsl.pipeline(pipeline_root='...',
                  name='my-pipeline')
    def pipeline():
        my_function_one_task = my_function_one(input=...)
        my_function_two_task = my_function_two(input=my_function_one_task.outputs['output'])
- Parameters
func – The python function to create a component from. The function should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file).
base_image – The image to use when executing func. It should contain a default Python interpreter that is compatible with KFP.
packages_to_install – A list of optional packages to install before executing func. These will always be installed at component runtime.
pip_index_urls – Python Package Index base URLs from which to install packages_to_install. Defaults to installing only from “https://pypi.org/simple”. For more information, see: https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0.
output_component_file – If specified, this function will write a shareable/loadable version of the component spec into this file.
install_kfp_package – Specifies if we should add a KFP Python package to packages_to_install. Lightweight Python functions always require an installation of KFP in base_image to work. If you specify a base_image that already contains KFP, you can set this to False. This flag is ignored when target_image is specified, which implies we’re building a containerized component. Containerized components will always install KFP as part of the build process.
kfp_package_path – Specifies the location from which to install KFP. By default, this will try to install from PyPI using the same version as that used when this component was created. KFP developers can choose to override this to point to a GitHub pull request or other pip-compatible location when testing changes to lightweight Python functions.
- Returns
A component task factory that can be used in pipeline definitions.
- kfp.dsl.importer(artifact_uri: Union[kfp.components.pipeline_channel.PipelineParameterChannel, str], artifact_class: Type[kfp.components.types.artifact_types.Artifact], reimport: bool = False, metadata: Optional[Mapping[str, Any]] = None) kfp.components.pipeline_task.PipelineTask [source]
dsl.importer for importing an existing artifact. Only for use in v2 pipelines.
- Parameters
artifact_uri – The artifact uri to import from.
artifact_class – The user-specified artifact class (type schema) of the artifact to be imported.
reimport – Whether to reimport the artifact. Defaults to False.
metadata – Properties of the artifact.
- Returns
A PipelineTask instance.
- Raises
ValueError – If the passed-in artifact_uri is neither a PipelineParam nor a constant string value.
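Example (a sketch of calling dsl.importer inside a pipeline definition; the bucket URI is hypothetical):

    from kfp import dsl
    from kfp.dsl import Model

    @dsl.pipeline(name='import-example')
    def my_pipeline():
        importer_task = dsl.importer(
            artifact_uri='gs://my-bucket/models/model',
            artifact_class=Model,
            reimport=False)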
- kfp.dsl.pipeline(name: Optional[str] = None, description: Optional[str] = None, pipeline_root: Optional[str] = None)[source]
Decorator of pipeline functions.
- Example
    @pipeline(
        name='my-pipeline',
        description='My ML Pipeline.',
        pipeline_root='gs://my-bucket/my-output-path')
    def my_pipeline(a: str, b: int):
        ...
- Parameters
name – The pipeline name. Defaults to a sanitized version of the function name.
description – Optionally, a human-readable description of the pipeline.
pipeline_root – The root directory under which this pipeline’s input/output URIs are generated. Required if input/output URI placeholders are used in this pipeline.