# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for parsing input for the Cloud Build v2 API."""

from apitools.base.py import encoding
from googlecloudsdk.api_lib.cloudbuild import cloudbuild_exceptions
from googlecloudsdk.api_lib.cloudbuild.v2 import client_util
from googlecloudsdk.api_lib.cloudbuild.v2 import input_util
from googlecloudsdk.core import log

# Keys looked up in metadata.annotations of the input YAML.
_WORKER_POOL_ANNOTATION = "cloudbuild.googleapis.com/worker-pool"
_MANAGED_SIDECARS_ANNOTATION = "cloudbuild.googleapis.com/managed-sidecars"
_MACHINE_TYPE = "cloudbuild.googleapis.com/worker/machine-type"
_PROVENANCE_ENABLED = "cloudbuild.googleapis.com/provenance/enabled"
_PROVENANCE_STORAGE = "cloudbuild.googleapis.com/provenance/storage"
_PROVENANCE_REGION = "cloudbuild.googleapis.com/provenance/region"


def TektonYamlDataToPipelineRun(data):
  """Convert Tekton yaml file into PipelineRun message.

  The input dict is modified in place: "apiVersion", "metadata", "spec" and
  "kind" are removed and the transformed spec entries are hoisted to the top
  level before the dict is converted.

  Args:
    data: dict, the parsed content of a Tekton PipelineRun yaml file.

  Returns:
    The PipelineRun message produced by encoding.DictToMessage.

  Raises:
    cloudbuild_exceptions.TektonVersionError: if apiVersion is missing or is
      neither tekton.dev/v1 nor tekton.dev/v1beta1.
    cloudbuild_exceptions.InvalidYamlError: if spec or metadata is missing or
      empty, if neither pipelineSpec nor pipelineRef is given, if
      spec.taskRunTemplate.serviceAccountName is missing, or if a spec key is
      also defined at the top level.
  """
  _VersionCheck(data)
  _MetadataTransform(data)
  spec = data["spec"]
  if "pipelineSpec" in spec:
    _PipelineSpecTransform(spec["pipelineSpec"])
  elif "pipelineRef" in spec:
    input_util.RefTransform(spec["pipelineRef"])
  else:
    raise cloudbuild_exceptions.InvalidYamlError(
        "PipelineSpec or PipelineRef is required.")

  if "resources" in spec:
    spec.pop("resources")
    log.warning(
        "PipelineResources are dropped because they are deprecated: "
        "https://github.com/tektoncd/pipeline/blob/main/docs/resources.md")

  _ServiceAccountTransformPipelineSpec(spec)
  input_util.ParamDictTransform(spec.get("params", []))
  messages = client_util.GetMessagesModule()
  _FlattenSpec(data, spec)
  schema_message = encoding.DictToMessage(data, messages.PipelineRun)
  input_util.UnrecognizedFields(schema_message)
  return schema_message


def _CheckSpecKeys(data, spec):
  """Reject spec keys that are also defined at the top level of data.

  Args:
    data: dict, the top-level yaml data.
    spec: dict, the spec section about to be merged into data.

  Raises:
    cloudbuild_exceptions.InvalidYamlError: if a key of spec is also a key of
      data, since merging would silently overwrite it.
  """
  for key in spec:
    if key in data:
      raise cloudbuild_exceptions.InvalidYamlError(
          "{0} only needs to be defined in spec".format(key)
      )


def _FlattenSpec(data, spec):
  """Hoist the entries of spec to the top level of data, in place.

  Args:
    data: dict, the top-level yaml data; "spec" and "kind" are removed.
    spec: dict, the already transformed spec section of data.

  Raises:
    cloudbuild_exceptions.InvalidYamlError: if a key of spec is also a key of
      data.
  """
  _CheckSpecKeys(data, spec)
  data.update(spec)
  data.pop("spec")
  # Tolerate a missing "kind" instead of failing with a bare KeyError.
  data.pop("kind", None)


def TektonYamlDataToTaskRun(data):
  """Convert Tekton yaml file into TaskRun message.

  The input dict is modified in place: "apiVersion", "metadata", "spec" and
  "kind" are removed and the transformed spec entries are hoisted to the top
  level before the dict is converted.

  Args:
    data: dict, the parsed content of a Tekton TaskRun yaml file.

  Returns:
    The TaskRun message produced by encoding.DictToMessage.

  Raises:
    cloudbuild_exceptions.TektonVersionError: if apiVersion is missing or is
      neither tekton.dev/v1 nor tekton.dev/v1beta1.
    cloudbuild_exceptions.InvalidYamlError: if spec or metadata is missing or
      empty, if neither taskSpec nor taskRef is given, or if a spec key is
      also defined at the top level.
  """
  _VersionCheck(data)
  metadata = _MetadataTransform(data)
  spec = data["spec"]
  if "taskSpec" in spec:
    _TaskSpecTransform(spec["taskSpec"])
    managed_sidecars = _MetadataToSidecar(metadata)
    if managed_sidecars:
      spec["taskSpec"]["managedSidecars"] = managed_sidecars
  elif "taskRef" in spec:
    input_util.RefTransform(spec["taskRef"])
  else:
    raise cloudbuild_exceptions.InvalidYamlError(
        "TaskSpec or TaskRef is required.")

  _ServiceAccountTransformTaskSpec(spec)
  input_util.ParamDictTransform(spec.get("params", []))
  messages = client_util.GetMessagesModule()
  _FlattenSpec(data, spec)
  schema_message = encoding.DictToMessage(data, messages.TaskRun)
  input_util.UnrecognizedFields(schema_message)
  return schema_message


def _VersionCheck(data):
  """Remove apiVersion from data and verify that it is supported.

  Args:
    data: dict, the top-level yaml data; "apiVersion" is popped from it.

  Raises:
    cloudbuild_exceptions.TektonVersionError: if apiVersion is missing or is
      neither tekton.dev/v1 nor tekton.dev/v1beta1.
  """
  # A missing apiVersion yields None, which is reported as a version error
  # rather than escaping as a KeyError.
  api_version = data.pop("apiVersion", None)
  if api_version not in ("tekton.dev/v1", "tekton.dev/v1beta1"):
    raise cloudbuild_exceptions.TektonVersionError()


def _MetadataTransform(data):
  """Helper function to transform the metadata.

  Pops "metadata" from data and copies the annotation-driven settings
  (annotations, workerPool, worker.machineType, provenance) into data["spec"].

  Args:
    data: dict, the top-level yaml data; modified in place.

  Returns:
    The metadata dict that was removed from data.

  Raises:
    cloudbuild_exceptions.InvalidYamlError: if spec or metadata is missing or
      empty.
  """
  # .get/.pop with a default so that an absent key reaches the intended
  # InvalidYamlError below instead of raising KeyError.
  spec = data.get("spec")
  if not spec:
    raise cloudbuild_exceptions.InvalidYamlError("spec is empty.")

  metadata = data.pop("metadata", None)
  if not metadata:
    raise cloudbuild_exceptions.InvalidYamlError("Metadata is missing in yaml.")

  # An "annotations:" key with no value parses to None; treat it as empty.
  annotations = metadata.get("annotations") or {}
  if _WORKER_POOL_ANNOTATION in annotations:
    spec["workerPool"] = annotations[_WORKER_POOL_ANNOTATION]
  spec["annotations"] = annotations
  if _MACHINE_TYPE in annotations:
    spec["worker"] = {"machineType": annotations[_MACHINE_TYPE]}

  provenance_keys = (
      ("enabled", _PROVENANCE_ENABLED),
      ("storage", _PROVENANCE_STORAGE),
      ("region", _PROVENANCE_REGION),
  )
  provenance = {
      field: annotations[annotation].upper()
      for field, annotation in provenance_keys
      if annotation in annotations
  }
  if provenance:
    spec["provenance"] = provenance

  return metadata


def _MetadataToSidecar(metadata):
  """Return the managed-sidecars annotation value of metadata, if any.

  Args:
    metadata: dict or None, a metadata section that may hold "annotations".

  Returns:
    The value of the managed-sidecars annotation, or None when metadata or
    its annotations are missing or do not contain that annotation.
  """
  if not metadata:
    return None
  annotations = metadata.get("annotations") or {}
  return annotations.get(_MANAGED_SIDECARS_ANNOTATION)


def _PipelineSpecTransform(spec):
  """Transform a pipelineSpec dict in place.

  Transforms params, tasks and results, and renames "finally" to
  "finallyTasks" after transforming its tasks.

  Args:
    spec: dict, the pipelineSpec section.
  """
  for param_spec in spec.get("params", []):
    input_util.ParamSpecTransform(param_spec)

  # "tasks" is read leniently, like the other optional lists here.
  for task in spec.get("tasks", []):
    _TaskTransform(task)

  if "finally" in spec:
    finally_tasks = spec.pop("finally")
    for task in finally_tasks:
      _TaskTransform(task)
    spec["finallyTasks"] = finally_tasks

  for pipeline_result in spec.get("results", []):
    input_util.PipelineResultTransform(pipeline_result)


def _TaskSpecTransform(spec):
  """Transform the params, results and steps of a taskSpec dict in place.

  Args:
    spec: dict, the taskSpec section.
  """
  for param_spec in spec.get("params", []):
    input_util.ParamSpecTransform(param_spec)

  for task_result in spec.get("results", []):
    input_util.TaskResultTransform(task_result)

  for task_step in spec.get("steps", []):
    input_util.TaskStepTransform(task_step)


def _TaskTransform(task):
  """Transform task message.

  Wraps an embedded taskSpec as {"taskSpec": ...}, moves its managed-sidecars
  annotation to "managedSidecars", transforms taskRef and params, and renames
  "when" to "whenExpressions" with each "operator" converted to an upper-case
  "expressionOperator".

  Args:
    task: dict, one entry of a pipeline's tasks or finally list; modified in
      place.
  """
  if "taskSpec" in task:
    task_spec = task.pop("taskSpec")
    _TaskSpecTransform(task_spec)
    # The embedded metadata is dropped; only its sidecar annotation is kept.
    managed_sidecars = _MetadataToSidecar(task_spec.pop("metadata", None))
    if managed_sidecars:
      task_spec["managedSidecars"] = managed_sidecars
    task["taskSpec"] = {"taskSpec": task_spec}

  if "taskRef" in task:
    input_util.RefTransform(task["taskRef"])

  whens = task.pop("when", [])
  for when in whens:
    if "operator" in when:
      when["expressionOperator"] = input_util.CamelToSnake(
          when.pop("operator")).upper()
  task["whenExpressions"] = whens

  input_util.ParamDictTransform(task.get("params", []))


def _ServiceAccountTransformPipelineSpec(spec):
  """Move taskRunTemplate.serviceAccountName into security.serviceAccount.

  On success the whole "taskRunTemplate" entry is removed from spec.

  Args:
    spec: dict, the PipelineRun spec; modified in place.

  Raises:
    cloudbuild_exceptions.InvalidYamlError: if
      spec.taskRunTemplate.serviceAccountName is not set.
  """
  # An empty "taskRunTemplate:" parses to None; treat it like a missing one.
  task_run_template = spec.get("taskRunTemplate") or {}
  if "serviceAccountName" not in task_run_template:
    raise cloudbuild_exceptions.InvalidYamlError(
        "spec.taskRunTemplate.serviceAccountName is required."
    )

  spec.pop("taskRunTemplate")
  security = spec.setdefault("security", {})
  security["serviceAccount"] = task_run_template.pop("serviceAccountName")


def _ServiceAccountTransformTaskSpec(spec):
  """Rename serviceAccountName and mirror it into security.serviceAccount.

  Does nothing when spec has no "serviceAccountName".

  Args:
    spec: dict, the TaskRun spec; modified in place.
  """
  if "serviceAccountName" in spec:
    sa = spec.pop("serviceAccountName")
    spec["serviceAccount"] = sa
    security = spec.setdefault("security", {})
    security["serviceAccount"] = sa