Source code for google_cloud_pipeline_components.preview.model_evaluation.model_based_llm_evaluation.autosxs.autosxs_pipeline

# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Optimization AI Inference and AutoSxS pipeline function."""

from typing import Any, Dict, List

from google_cloud_pipeline_components import _placeholders
from google_cloud_pipeline_components._implementation.llm import arbiter_preprocess
from google_cloud_pipeline_components._implementation.llm import autosxs_arbiter
from google_cloud_pipeline_components._implementation.llm import autosxs_metrics_computer
from google_cloud_pipeline_components._implementation.llm import function_based
from google_cloud_pipeline_components._implementation.llm import task_preprocess
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1 import batch_predict_job
from kfp import dsl


# pylint: disable=no-value-for-parameter
@dsl.pipeline(
    name='predictions-pipeline',
    description='Runs the prediction pipeline for one of the two SxS models.',
)
def _get_predictions(
    name: str,
    project: str,
    location: str,
    model: str,
    model_parameters: Dict[str, str],
    prediction_inputs: List[str],
    is_model_inference: bool,
) -> str:
  """Makes predictions for a given model."""
  with dsl.If(is_model_inference == True, name='Inference Required'):  # pylint: disable=singleton-comparison
    get_vertex_model_task = dsl.importer(
        artifact_uri=(
            f'https://{location}-aiplatform.googleapis.com/v1/{model}'
        ),
        artifact_class=artifact_types.VertexModel,
        metadata={'resourceName': model},
    ).set_display_name('Import Vertex Model Artifact')

    batch_predict_task = batch_predict_job.ModelBatchPredictOp(
        project=project,
        location=location,
        model=get_vertex_model_task.outputs['artifact'],
        job_display_name=(
            f'autosxs-{name}-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}'
        ),
        gcs_source_uris=prediction_inputs,
        instances_format='jsonl',
        predictions_format='jsonl',
        gcs_destination_output_uri_prefix=(
            f'{dsl.PIPELINE_ROOT_PLACEHOLDER}/{dsl.PIPELINE_TASK_ID_PLACEHOLDER}'
            f'/{name}_predictions'
        ),
        model_parameters=model_parameters,
    )
    prediction_uris_from_inference = function_based.get_uri(
        artifact=batch_predict_task.outputs['gcs_output_directory'],
        is_dir=True,
    )

  with dsl.Else(name='Responses Provided'):  # pylint: disable=singleton-comparison
    prediction_uris_inference_provided = function_based.get_empty_string()

  prediction_uris = dsl.OneOf(
      prediction_uris_from_inference.output,
      prediction_uris_inference_provided.output,
  )

  # We can't directly output dsl.OneOf, so we need to use identity.
  return function_based.identity(x=prediction_uris).output


# pylint: disable=dangerous-default-value,g-bare-generic
@dsl.pipeline(
    name='autosxs-template',
    description='Determines the SxS winrate between two models.',
)
def autosxs_pipeline(
    evaluation_dataset: str,
    task: str,
    id_columns: List[str],
    model_a: str = '',
    model_b: str = '',
    autorater_prompt_parameters: Dict[str, Dict[str, str]] = {},  # pylint: disable=unused-argument
    model_a_prompt_parameters: Dict[str, Dict[str, str]] = {},
    model_b_prompt_parameters: Dict[str, Dict[str, str]] = {},
    response_column_a: str = '',
    response_column_b: str = '',
    model_a_parameters: Dict[str, str] = {},
    model_b_parameters: Dict[str, str] = {},
    human_preference_column: str = '',
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
    location: str = _placeholders.LOCATION_PLACEHOLDER,
    judgments_format: str = 'jsonl',
    bigquery_destination_prefix: str = '',
    experimental_args: Dict[str, Any] = {},
):
  """Evaluates two models side-by-side using an arbiter model.

  Args:
    evaluation_dataset: A list of GCS paths to a JSONL dataset containing
      evaluation examples.
    task: Evaluation task in the form {task}@{version}. task can be one of
      "summarization", "question_answer". Version is an integer with 3 digits
      or "latest". Ex: summarization@001 or question_answer@latest.
    id_columns: The columns which distinguish unique evaluation examples.
    model_a: A fully-qualified model resource name. This parameter is optional
      if Model A responses are specified.
    model_b: A fully-qualified model resource name. This parameter is optional
      if Model B responses are specified.
    autorater_prompt_parameters: Map of autorater prompt parameters to columns
      or templates. The expected parameters are: inference_instruction -
      Details on how to perform a task. inference_context - Content to
      reference to perform the task.
    model_a_prompt_parameters: Map of Model A prompt template parameters to
      columns or templates.
    model_b_prompt_parameters: Map of Model B prompt template parameters to
      columns or templates.
    response_column_a: The column containing responses for model A. Required
      if any response tables are provided for model A.
    response_column_b: The column containing responses for model B. Required
      if any response tables are provided for model B.
    model_a_parameters: The parameters that govern the predictions from
      model A.
    model_b_parameters: The parameters that govern the predictions from
      model B.
    human_preference_column: The column containing ground truths. Only
      required when users want to check the autorater alignment against human
      preference.
    project: Project used to run custom jobs. Default is the same project used
      to run the pipeline.
    location: Location used to run custom jobs. Default is the same location
      used to run the pipeline.
    judgments_format: The format to write judgments to. Can be either 'json'
      or 'bigquery'.
    bigquery_destination_prefix: BigQuery table to write judgments to if the
      specified format is 'bigquery'.
    experimental_args: Experimentally released arguments. Subject to change.
  """
  prediction_inputs_a = task_preprocess.task_preprocess(
      evaluation_dataset=evaluation_dataset,
      task=task,
      model_prompt_parameters=model_a_prompt_parameters,
      response_column=response_column_a,
      human_preference_column=human_preference_column,
      id_columns=id_columns,
  ).set_display_name('Preprocess Model A Inputs')

  prediction_inputs_b = task_preprocess.task_preprocess(
      evaluation_dataset=evaluation_dataset,
      task=task,
      model_prompt_parameters=model_b_prompt_parameters,
      response_column=response_column_b,
      human_preference_column=human_preference_column,
      id_columns=id_columns,
  ).set_display_name('Preprocess Model B Inputs')

  is_model_a_inference = function_based.get_usage_metric(
      metadata=prediction_inputs_a.outputs['metadata'],
      key='is_model_inference',
  ).set_display_name('Read is_model_a_inference')

  is_model_b_inference = function_based.get_usage_metric(
      metadata=prediction_inputs_b.outputs['metadata'],
      key='is_model_inference',
  ).set_display_name('Read is_model_b_inference')

  inferrer_a = _get_predictions(
      name='A',
      project=project,
      location=location,
      model=model_a,
      model_parameters=model_a_parameters,
      prediction_inputs=prediction_inputs_a.outputs['prediction_inputs'],
      is_model_inference=is_model_a_inference.output,
  ).set_display_name('Model A Responses')

  inferrer_b = _get_predictions(
      name='B',
      project=project,
      location=location,
      model=model_b,
      model_parameters=model_b_parameters,
      prediction_inputs=prediction_inputs_b.outputs['prediction_inputs'],
      is_model_inference=is_model_b_inference.output,
  ).set_display_name('Model B Responses')

  arbiter_input_preprocess = arbiter_preprocess.arbiter_preprocess(
      autorater_prompt_parameters=autorater_prompt_parameters,
      evaluation_dataset=evaluation_dataset,
      id_columns=id_columns,
      prediction_uris_b=inferrer_b.output,
      prediction_uris_a=inferrer_a.output,
      model_a_prompt_parameters=model_a_prompt_parameters,
      model_b_prompt_parameters=model_b_prompt_parameters,
      task=task,
      response_column_a=response_column_a,
      response_column_b=response_column_b,
      human_preference_column=human_preference_column,
      is_bp_output_a=is_model_a_inference.output,
      is_bp_output_b=is_model_b_inference.output,
  ).set_display_name('Preprocess Predictions')

  autosxs_arbiter_task = autosxs_arbiter.autosxs_arbiter(
      inference_output_uri=arbiter_input_preprocess.outputs[
          'preprocessed_evaluation_dataset_uri'
      ],
      id_columns=id_columns,
      human_preference_column=human_preference_column,
      task=task,
      judgments_format=judgments_format,
      bigquery_destination_prefix=bigquery_destination_prefix,
      experimental_args=experimental_args,
  ).set_display_name('AutoSxS Arbiter')

  has_human_preference = function_based.get_usage_metric(
      metadata=prediction_inputs_a.outputs['metadata'],
      key='has_human_preference_column',
  ).set_display_name('Read has_human_preference_column')

  autosxs_metrics_computer.autosxs_metrics_computer(
      judgments_dir=autosxs_arbiter_task.outputs['judgments_uri'],
      has_human_preference=has_human_preference.output,
  ).set_display_name('AutoSxS Metrics')