# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline task class and operations."""
import re
import copy
from typing import Any, List, Mapping, Optional, Union
from kfp.components import constants
from kfp.components import placeholders
from kfp.components import pipeline_channel
from kfp.components import structures
from kfp.components.types import type_utils
def create_pipeline_task(
component_spec: structures.ComponentSpec,
args: Mapping[str, Any],
) -> 'PipelineTask': # pytype: disable=name-error
return PipelineTask(component_spec=component_spec, args=args)
[docs]class PipelineTask:
"""Represents a pipeline task -- an instantiated component.
Replaces `ContainerOp`. Holds operations available on a task object, such as
`.after()`, `.set_memory_limit()`, `enable_caching()`, etc.
Attributes:
name: The name of the task. Unique within its parent group.
outputs:
task_spec: The task spec of the task.
component_spec: The component spec of the task.
container_spec: The resolved container spec of the task. Only one of
container_spec and importer_spec should be filled.
importer_spec: The resolved importer spec of the task. Only one of
container_spec and importer_spec should be filled.
"""
# To be override by pipeline `register_task_and_generate_id`
register_task_handler = lambda task: task.component_spec.name
def __init__(
self,
component_spec: structures.ComponentSpec,
args: Mapping[str, Any],
):
"""Initilizes a PipelineTask instance.
Args:
component_spec: The component definition.
args: The dictionary of component arguments.
"""
args = args or {}
for input_name, argument_value in args.items():
if input_name not in component_spec.inputs:
raise ValueError(
f'Component "{component_spec.name}" got an unexpected input:'
f' {input_name}.')
input_type = component_spec.inputs[input_name].type
argument_type = None
if isinstance(argument_value, pipeline_channel.PipelineChannel):
argument_type = argument_value.channel_type
elif isinstance(argument_value, str):
argument_type = 'String'
elif isinstance(argument_value, bool):
argument_type = 'Boolean'
elif isinstance(argument_value, int):
argument_type = 'Integer'
elif isinstance(argument_value, float):
argument_type = 'Float'
elif isinstance(argument_value, dict):
argument_type = 'Dict'
elif isinstance(argument_value, list):
argument_type = 'List'
else:
raise ValueError(
'Input argument supports only the following types: '
'str, int, float, bool, dict, and list. Got: '
f'"{argument_value}" of type "{type(argument_value)}".')
type_utils.verify_type_compatibility(
given_type=argument_type,
expected_type=input_type,
error_message_prefix=(
'Incompatible argument passed to the input '
f'"{input_name}" of component "{component_spec.name}": '),
)
self.component_spec = component_spec
self.task_spec = structures.TaskSpec(
name=self.register_task_handler(),
inputs={input_name: value for input_name, value in args.items()},
dependent_tasks=[],
component_ref=component_spec.name,
enable_caching=True,
)
self.importer_spec = None
self.container_spec = None
if component_spec.implementation.container is not None:
self.container_spec = self._resolve_command_line_and_arguments(
component_spec=component_spec,
args=args,
)
elif component_spec.implementation.importer is not None:
self.importer_spec = component_spec.implementation.importer
self.importer_spec.artifact_uri = args['uri']
self._outputs = {
output_name: pipeline_channel.create_pipeline_channel(
name=output_name,
channel_type=output_spec.type,
task_name=self.task_spec.name,
) for output_name, output_spec in (
component_spec.outputs or {}).items()
}
self._inputs = args
self._channel_inputs = [
value for _, value in args.items()
if isinstance(value, pipeline_channel.PipelineChannel)
] + pipeline_channel.extract_pipeline_channels_from_any([
value for _, value in args.items()
if not isinstance(value, pipeline_channel.PipelineChannel)
])
@property
def name(self) -> str:
"""Returns the name of the task."""
return self.task_spec.name
@property
def inputs(
self
) -> List[Union[type_utils.PARAMETER_TYPES,
pipeline_channel.PipelineChannel]]:
"""Returns the list of actual inputs passed to the task."""
return self._inputs
@property
def channel_inputs(self) -> List[pipeline_channel.PipelineChannel]:
"""Returns the list of all PipelineChannels passed to the task."""
return self._channel_inputs
@property
def output(self) -> pipeline_channel.PipelineChannel:
"""Returns the single output object (a PipelineChannel) of the task."""
if len(self._outputs) != 1:
raise AttributeError
return list(self._outputs.values())[0]
@property
def outputs(self) -> Mapping[str, pipeline_channel.PipelineChannel]:
"""Returns the dictionary of outputs (PipelineChannels) of the task."""
return self._outputs
@property
def dependent_tasks(self) -> List[str]:
"""Returns the list of dependent task names."""
return self.task_spec.dependent_tasks
def _resolve_command_line_and_arguments(
self,
component_spec: structures.ComponentSpec,
args: Mapping[str, str],
) -> structures.ContainerSpec:
"""Resolves the command line argument placeholders in a container spec.
Args:
component_spec: The component definition.
args: The dictionary of component arguments.
"""
argument_values = args
component_inputs = component_spec.inputs or {}
inputs_dict = {
input_name: input_spec
for input_name, input_spec in component_inputs.items()
}
component_outputs = component_spec.outputs or {}
outputs_dict = {
output_name: output_spec
for output_name, output_spec in component_outputs.items()
}
def expand_command_part(arg) -> Union[str, List[str], None]:
if arg is None:
return None
if isinstance(arg, (str, int, float, bool)):
return str(arg)
elif isinstance(arg, (dict, list)):
return json.dumps(arg)
elif isinstance(arg, structures.InputValuePlaceholder):
input_name = arg.input_name
if not type_utils.is_parameter_type(
inputs_dict[input_name].type):
raise TypeError(
f'Input "{input_name}" with type '
f'"{inputs_dict[input_name].type}" cannot be paired with '
'InputValuePlaceholder.')
if input_name in args or type_utils.is_task_final_status_type(
inputs_dict[input_name].type):
return placeholders.input_parameter_placeholder(input_name)
else:
input_spec = inputs_dict[input_name]
if input_spec.default is not None:
return None
else:
raise ValueError(
f'No value provided for input: {input_name}.')
elif isinstance(arg, structures.InputUriPlaceholder):
input_name = arg.input_name
if type_utils.is_parameter_type(inputs_dict[input_name].type):
raise TypeError(
f'Input "{input_name}" with type '
f'"{inputs_dict[input_name].type}" cannot be paired with '
'InputUriPlaceholder.')
if input_name in args:
input_uri = placeholders.input_artifact_uri_placeholder(
input_name)
return input_uri
else:
input_spec = inputs_dict[input_name]
if input_spec.default is not None:
return None
else:
raise ValueError(
f'No value provided for input: {input_name}.')
elif isinstance(arg, structures.InputPathPlaceholder):
input_name = arg.input_name
if type_utils.is_parameter_type(inputs_dict[input_name].type):
raise TypeError(
f'Input "{input_name}" with type '
f'"{inputs_dict[input_name].type}" cannot be paired with '
'InputPathPlaceholder.')
if input_name in args:
input_path = placeholders.input_artifact_path_placeholder(
input_name)
return input_path
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError(
f'No value provided for input: {input_name}.')
elif isinstance(arg, structures.OutputUriPlaceholder):
output_name = arg.output_name
if type_utils.is_parameter_type(outputs_dict[output_name].type):
raise TypeError(
f'Onput "{output_name}" with type '
f'"{outputs_dict[output_name].type}" cannot be paired with '
'OutputUriPlaceholder.')
output_uri = placeholders.output_artifact_uri_placeholder(
output_name)
return output_uri
elif isinstance(arg, structures.OutputPathPlaceholder):
output_name = arg.output_name
if type_utils.is_parameter_type(outputs_dict[output_name].type):
output_path = placeholders.output_parameter_path_placeholder(
output_name)
else:
output_path = placeholders.output_artifact_path_placeholder(
output_name)
return output_path
elif isinstance(arg, structures.ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)
elif isinstance(arg, structures.IfPresentPlaceholder):
if arg.if_structure.input_name in argument_values:
result_node = arg.if_structure.then
else:
result_node = arg.if_structure.otherwise
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result
else:
raise TypeError('Unrecognized argument type: {}'.format(arg))
def expand_argument_list(argument_list) -> Optional[List[str]]:
if argument_list is None:
return None
expanded_list = []
for part in argument_list:
expanded_part = expand_command_part(part)
if expanded_part is not None:
if isinstance(expanded_part, list):
expanded_list.extend(expanded_part)
else:
expanded_list.append(str(expanded_part))
return expanded_list
container_spec = component_spec.implementation.container
resolved_container_spec = copy.deepcopy(container_spec)
resolved_container_spec.command = expand_argument_list(
container_spec.command)
resolved_container_spec.args = expand_argument_list(container_spec.args)
return resolved_container_spec
[docs] def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
"""Sets caching options for the Pipeline task.
Args:
enable_caching: Whether or not to enable caching for this task.
Returns:
Self return to allow chained setting calls.
"""
self.task_spec.enable_caching = enable_caching
return self
[docs] def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
"""Set cpu limit (maximum) for this operator.
Args:
cpu(str): A string which can be a
number or a number followed by "m", whichmeans 1/1000.
Returns:
Self return to allow chained setting calls.
"""
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')
if cpu.endswith('m'):
cpu = float(cpu[:-1]) / 1000
else:
cpu = float(cpu)
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
if self.container_spec.resources is not None:
self.container_spec.resources.cpu_limit = cpu
else:
self.container_spec.resources = structures.ResourceSpec(
cpu_limit=cpu)
return self
[docs] def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
"""Set gpu limit (maximum) for this operator.
Args:
gpu(str): Positive number required for number of GPUs.
Returns:
Self return to allow chained setting calls.
"""
if re.match(r'[1-9]\d*$', gpu) is None:
raise ValueError('GPU must be positive integer.')
gpu = int(gpu)
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_count = gpu
else:
self.container_spec.resources = structures.ResourceSpec(
accelerator_count=gpu)
return self
[docs] def set_memory_limit(self, memory: str) -> 'PipelineTask':
"""Set memory limit (maximum) for this operator.
Args:
memory(str): a string which can be a number or a number followed by
one of "E", "Ei", "P", "Pi", "T", "Ti", "G", "Gi", "M", "Mi",
"K", "Ki".
Returns:
Self return to allow chained setting calls.
"""
if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
raise ValueError(
'Invalid memory string. Should be a number or a number '
'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
'"Gi", "M", "Mi", "K", "Ki".')
if memory.endswith('E'):
memory = float(memory[:-1]) * constants._E / constants._G
elif memory.endswith('Ei'):
memory = float(memory[:-2]) * constants._EI / constants._G
elif memory.endswith('P'):
memory = float(memory[:-1]) * constants._P / constants._G
elif memory.endswith('Pi'):
memory = float(memory[:-2]) * constants._PI / constants._G
elif memory.endswith('T'):
memory = float(memory[:-1]) * constants._T / constants._G
elif memory.endswith('Ti'):
memory = float(memory[:-2]) * constants._TI / constants._G
elif memory.endswith('G'):
memory = float(memory[:-1])
elif memory.endswith('Gi'):
memory = float(memory[:-2]) * constants._GI / constants._G
elif memory.endswith('M'):
memory = float(memory[:-1]) * constants._M / constants._G
elif memory.endswith('Mi'):
memory = float(memory[:-2]) * constants._MI / constants._G
elif memory.endswith('K'):
memory = float(memory[:-1]) * constants._K / constants._G
elif memory.endswith('Ki'):
memory = float(memory[:-2]) * constants._KI / constants._G
else:
# By default interpret as a plain integer, in the unit of Bytes.
memory = float(memory) / constants._G
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
if self.container_spec.resources is not None:
self.container_spec.resources.memory_limit = memory
else:
self.container_spec.resources = structures.ResourceSpec(
memory_limit=memory)
return self
[docs] def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
"""Sets accelerator type requirement for this task.
Args:
value(str): The name of the accelerator. Available values include
'NVIDIA_TESLA_K80', 'TPU_V3'.
Returns:
Self return to allow chained setting calls.
"""
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_type = accelerator
if self.container_spec.resources.accelerator_count is None:
self.container_spec.resources.accelerator_count = 1
else:
self.container_spec.resources = structures.ResourceSpec(
accelerator_count=1, accelerator_type=accelerator)
return self
[docs] def set_display_name(self, name: str) -> 'PipelineTask':
"""Set display name for the pipelineTask.
Args:
name(str): display name for the task.
Returns:
Self return to allow chained setting calls.
"""
self.task_spec.display_name = name
return self
[docs] def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
"""Set environment variable for the pipelineTask.
Args:
name: The name of the environment variable.
value: The value of the environment variable.
Returns:
Self return to allow chained setting calls.
"""
if self.container_spec.env is not None:
self.container_spec.env[name] = value
else:
self.container_spec.env = {name: value}
return self
[docs] def after(self, *tasks) -> 'PipelineTask':
"""Specify explicit dependency on other tasks.
Args:
name(tasks): dependent tasks.
Returns:
Self return to allow chained setting calls.
"""
for task in tasks:
self.task_spec.dependent_tasks.append(task.name)
return self