# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import json
import pickle
import logging as std_logging
import warnings
from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union

from google.api_core import gapic_v1
from google.api_core import grpc_helpers_async
from google.api_core import exceptions as core_exceptions
from google.api_core import retry_async as retries
from google.auth import credentials as ga_credentials   # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore
from cloudsdk.google.protobuf.json_format import MessageToJson
import cloudsdk.google.protobuf.message

import grpc                        # type: ignore
import proto                       # type: ignore
from grpc.experimental import aio  # type: ignore

from google.api import httpbody_pb2  # type: ignore
from google.longrunning import operations_pb2 # type: ignore
from googlecloudsdk.generated_clients.gapic_clients.aiplatform_v1beta1.types import prediction_service
from .base import PredictionServiceTransport, DEFAULT_CLIENT_INFO
from .grpc import PredictionServiceGrpcTransport

try:
    from google.api_core import client_logging  # type: ignore
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    async def intercept_unary_unary(self, continuation, client_call_details, request):
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
        if logging_enabled:  # pragma: NO COVER
            request_metadata = client_call_details.metadata
            if isinstance(request, proto.Message):
                request_payload = type(request).to_json(request)
            elif isinstance(request, cloudsdk.google.protobuf.message.Message):
                request_payload = MessageToJson(request)
            else:
                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in request_metadata
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra = {
                    "serviceName": "google.cloud.aiplatform.v1beta1.PredictionService",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
            metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None
            result = await response
            if isinstance(result, proto.Message):
                response_payload = type(result).to_json(result)
            elif isinstance(result, cloudsdk.google.protobuf.message.Message):
                response_payload = MessageToJson(result)
            else:
                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra = {
                    "serviceName": "google.cloud.aiplatform.v1beta1.PredictionService",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response


class PredictionServiceGrpcAsyncIOTransport(PredictionServiceTransport):
    """gRPC AsyncIO backend transport for PredictionService.

    A service for online predictions and explanations.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _grpc_channel: aio.Channel
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(cls,
                       host: str = 'aiplatform.googleapis.com',
                       credentials: Optional[ga_credentials.Credentials] = None,
                       credentials_file: Optional[str] = None,
                       scopes: Optional[Sequence[str]] = None,
                       quota_project_id: Optional[str] = None,
                       **kwargs) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`. This argument will be
                removed in the next major version of this library.
            scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs
        )

    def __init__(self, *,
            host: str = 'aiplatform.googleapis.com',
            credentials: Optional[ga_credentials.Credentials] = None,
            credentials_file: Optional[str] = None,
            scopes: Optional[Sequence[str]] = None,
            channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
            api_mtls_endpoint: Optional[str] = None,
            client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
            ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
            client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
            quota_project_id: Optional[str] = None,
            client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
            always_use_jwt_access: Optional[bool] = False,
            api_audience: Optional[str] = None,
            ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'aiplatform.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
                This argument will be removed in the next major version of this library.
            scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
              creation failed for any reason.
          google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, aio.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # initialize with the provided callable or the default channel
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        self._interceptor = _LoggingClientAIOInterceptor()
        self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
        self._logged_channel = self._grpc_channel
        self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
        # Wrap messages. This must be done after self._logged_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def predict(self) -> Callable[
            [prediction_service.PredictRequest],
            Awaitable[prediction_service.PredictResponse]]:
        r"""Return a callable for the predict method over gRPC.

        Perform an online prediction.

        Returns:
            Callable[[~.PredictRequest],
                    Awaitable[~.PredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'predict' not in self._stubs:
            self._stubs['predict'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/Predict',
                request_serializer=prediction_service.PredictRequest.serialize,
                response_deserializer=prediction_service.PredictResponse.deserialize,
            )
        return self._stubs['predict']

    @property
    def raw_predict(self) -> Callable[
            [prediction_service.RawPredictRequest],
            Awaitable[httpbody_pb2.HttpBody]]:
        r"""Return a callable for the raw predict method over gRPC.

        Perform an online prediction with an arbitrary HTTP payload.

        The response includes the following HTTP headers:

        - ``X-Vertex-AI-Endpoint-Id``: ID of the
          [Endpoint][google.cloud.aiplatform.v1beta1.Endpoint] that
          served this prediction.

        - ``X-Vertex-AI-Deployed-Model-Id``: ID of the Endpoint's
          [DeployedModel][google.cloud.aiplatform.v1beta1.DeployedModel]
          that served this prediction.

        Returns:
            Callable[[~.RawPredictRequest],
                    Awaitable[~.HttpBody]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'raw_predict' not in self._stubs:
            self._stubs['raw_predict'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/RawPredict',
                request_serializer=prediction_service.RawPredictRequest.serialize,
                response_deserializer=httpbody_pb2.HttpBody.FromString,
            )
        return self._stubs['raw_predict']

    @property
    def stream_raw_predict(self) -> Callable[
            [prediction_service.StreamRawPredictRequest],
            Awaitable[httpbody_pb2.HttpBody]]:
        r"""Return a callable for the stream raw predict method over gRPC.

        Perform a streaming online prediction with an
        arbitrary HTTP payload.

        Returns:
            Callable[[~.StreamRawPredictRequest],
                    Awaitable[~.HttpBody]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'stream_raw_predict' not in self._stubs:
            self._stubs['stream_raw_predict'] = self._logged_channel.unary_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/StreamRawPredict',
                request_serializer=prediction_service.StreamRawPredictRequest.serialize,
                response_deserializer=httpbody_pb2.HttpBody.FromString,
            )
        return self._stubs['stream_raw_predict']

    @property
    def direct_predict(self) -> Callable[
            [prediction_service.DirectPredictRequest],
            Awaitable[prediction_service.DirectPredictResponse]]:
        r"""Return a callable for the direct predict method over gRPC.

        Perform an unary online prediction request to a gRPC
        model server for Vertex first-party products and
        frameworks.

        Returns:
            Callable[[~.DirectPredictRequest],
                    Awaitable[~.DirectPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'direct_predict' not in self._stubs:
            self._stubs['direct_predict'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/DirectPredict',
                request_serializer=prediction_service.DirectPredictRequest.serialize,
                response_deserializer=prediction_service.DirectPredictResponse.deserialize,
            )
        return self._stubs['direct_predict']

    @property
    def direct_raw_predict(self) -> Callable[
            [prediction_service.DirectRawPredictRequest],
            Awaitable[prediction_service.DirectRawPredictResponse]]:
        r"""Return a callable for the direct raw predict method over gRPC.

        Perform an unary online prediction request to a gRPC
        model server for custom containers.

        Returns:
            Callable[[~.DirectRawPredictRequest],
                    Awaitable[~.DirectRawPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'direct_raw_predict' not in self._stubs:
            self._stubs['direct_raw_predict'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/DirectRawPredict',
                request_serializer=prediction_service.DirectRawPredictRequest.serialize,
                response_deserializer=prediction_service.DirectRawPredictResponse.deserialize,
            )
        return self._stubs['direct_raw_predict']

    @property
    def stream_direct_predict(self) -> Callable[
            [prediction_service.StreamDirectPredictRequest],
            Awaitable[prediction_service.StreamDirectPredictResponse]]:
        r"""Return a callable for the stream direct predict method over gRPC.

        Perform a streaming online prediction request to a
        gRPC model server for Vertex first-party products and
        frameworks.

        Returns:
            Callable[[~.StreamDirectPredictRequest],
                    Awaitable[~.StreamDirectPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'stream_direct_predict' not in self._stubs:
            self._stubs['stream_direct_predict'] = self._logged_channel.stream_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/StreamDirectPredict',
                request_serializer=prediction_service.StreamDirectPredictRequest.serialize,
                response_deserializer=prediction_service.StreamDirectPredictResponse.deserialize,
            )
        return self._stubs['stream_direct_predict']

    @property
    def stream_direct_raw_predict(self) -> Callable[
            [prediction_service.StreamDirectRawPredictRequest],
            Awaitable[prediction_service.StreamDirectRawPredictResponse]]:
        r"""Return a callable for the stream direct raw predict method over gRPC.

        Perform a streaming online prediction request to a
        gRPC model server for custom containers.

        Returns:
            Callable[[~.StreamDirectRawPredictRequest],
                    Awaitable[~.StreamDirectRawPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'stream_direct_raw_predict' not in self._stubs:
            self._stubs['stream_direct_raw_predict'] = self._logged_channel.stream_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/StreamDirectRawPredict',
                request_serializer=prediction_service.StreamDirectRawPredictRequest.serialize,
                response_deserializer=prediction_service.StreamDirectRawPredictResponse.deserialize,
            )
        return self._stubs['stream_direct_raw_predict']

    @property
    def streaming_predict(self) -> Callable[
            [prediction_service.StreamingPredictRequest],
            Awaitable[prediction_service.StreamingPredictResponse]]:
        r"""Return a callable for the streaming predict method over gRPC.

        Perform a streaming online prediction request for
        Vertex first-party products and frameworks.

        Returns:
            Callable[[~.StreamingPredictRequest],
                    Awaitable[~.StreamingPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'streaming_predict' not in self._stubs:
            self._stubs['streaming_predict'] = self._logged_channel.stream_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/StreamingPredict',
                request_serializer=prediction_service.StreamingPredictRequest.serialize,
                response_deserializer=prediction_service.StreamingPredictResponse.deserialize,
            )
        return self._stubs['streaming_predict']

    @property
    def server_streaming_predict(self) -> Callable[
            [prediction_service.StreamingPredictRequest],
            Awaitable[prediction_service.StreamingPredictResponse]]:
        r"""Return a callable for the server streaming predict method over gRPC.

        Perform a server-side streaming online prediction
        request for Vertex LLM streaming.

        Returns:
            Callable[[~.StreamingPredictRequest],
                    Awaitable[~.StreamingPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'server_streaming_predict' not in self._stubs:
            self._stubs['server_streaming_predict'] = self._logged_channel.unary_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/ServerStreamingPredict',
                request_serializer=prediction_service.StreamingPredictRequest.serialize,
                response_deserializer=prediction_service.StreamingPredictResponse.deserialize,
            )
        return self._stubs['server_streaming_predict']

    @property
    def streaming_raw_predict(self) -> Callable[
            [prediction_service.StreamingRawPredictRequest],
            Awaitable[prediction_service.StreamingRawPredictResponse]]:
        r"""Return a callable for the streaming raw predict method over gRPC.

        Perform a streaming online prediction request through
        gRPC.

        Returns:
            Callable[[~.StreamingRawPredictRequest],
                    Awaitable[~.StreamingRawPredictResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'streaming_raw_predict' not in self._stubs:
            self._stubs['streaming_raw_predict'] = self._logged_channel.stream_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/StreamingRawPredict',
                request_serializer=prediction_service.StreamingRawPredictRequest.serialize,
                response_deserializer=prediction_service.StreamingRawPredictResponse.deserialize,
            )
        return self._stubs['streaming_raw_predict']

    @property
    def predict_long_running(self) -> Callable[
            [prediction_service.PredictLongRunningRequest],
            Awaitable[operations_pb2.Operation]]:
        r"""Return a callable for the predict long running method over gRPC.

        Returns:
            Callable[[~.PredictLongRunningRequest],
                    Awaitable[~.Operation]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'predict_long_running' not in self._stubs:
            self._stubs['predict_long_running'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/PredictLongRunning',
                request_serializer=prediction_service.PredictLongRunningRequest.serialize,
                response_deserializer=operations_pb2.Operation.FromString,
            )
        return self._stubs['predict_long_running']

    @property
    def fetch_predict_operation(self) -> Callable[
            [prediction_service.FetchPredictOperationRequest],
            Awaitable[operations_pb2.Operation]]:
        r"""Return a callable for the fetch predict operation method over gRPC.

        Fetch an asynchronous online prediction operation.

        Returns:
            Callable[[~.FetchPredictOperationRequest],
                    Awaitable[~.Operation]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'fetch_predict_operation' not in self._stubs:
            self._stubs['fetch_predict_operation'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/FetchPredictOperation',
                request_serializer=prediction_service.FetchPredictOperationRequest.serialize,
                response_deserializer=operations_pb2.Operation.FromString,
            )
        return self._stubs['fetch_predict_operation']

    @property
    def invoke(self) -> Callable[
            [prediction_service.InvokeRequest],
            Awaitable[httpbody_pb2.HttpBody]]:
        r"""Return a callable for the invoke method over gRPC.

        Forwards arbitrary HTTP requests for both streaming and
        non-streaming cases. To use this method,
        [invoke_route_prefix][Model.container_spec.invoke_route_prefix]
        must be set to allow the paths that will be specified in the
        request.

        Returns:
            Callable[[~.InvokeRequest],
                    Awaitable[~.HttpBody]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'invoke' not in self._stubs:
            self._stubs['invoke'] = self._logged_channel.unary_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/Invoke',
                request_serializer=prediction_service.InvokeRequest.serialize,
                response_deserializer=httpbody_pb2.HttpBody.FromString,
            )
        return self._stubs['invoke']

    @property
    def explain(self) -> Callable[
            [prediction_service.ExplainRequest],
            Awaitable[prediction_service.ExplainResponse]]:
        r"""Return a callable for the explain method over gRPC.

        Perform an online explanation.

        If
        [deployed_model_id][google.cloud.aiplatform.v1beta1.ExplainRequest.deployed_model_id]
        is specified, the corresponding DeployModel must have
        [explanation_spec][google.cloud.aiplatform.v1beta1.DeployedModel.explanation_spec]
        populated. If
        [deployed_model_id][google.cloud.aiplatform.v1beta1.ExplainRequest.deployed_model_id]
        is not specified, all DeployedModels must have
        [explanation_spec][google.cloud.aiplatform.v1beta1.DeployedModel.explanation_spec]
        populated.

        Returns:
            Callable[[~.ExplainRequest],
                    Awaitable[~.ExplainResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'explain' not in self._stubs:
            self._stubs['explain'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/Explain',
                request_serializer=prediction_service.ExplainRequest.serialize,
                response_deserializer=prediction_service.ExplainResponse.deserialize,
            )
        return self._stubs['explain']

    @property
    def count_tokens(self) -> Callable[
            [prediction_service.CountTokensRequest],
            Awaitable[prediction_service.CountTokensResponse]]:
        r"""Return a callable for the count tokens method over gRPC.

        Perform a token counting.

        Returns:
            Callable[[~.CountTokensRequest],
                    Awaitable[~.CountTokensResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'count_tokens' not in self._stubs:
            self._stubs['count_tokens'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/CountTokens',
                request_serializer=prediction_service.CountTokensRequest.serialize,
                response_deserializer=prediction_service.CountTokensResponse.deserialize,
            )
        return self._stubs['count_tokens']

    @property
    def generate_content(self) -> Callable[
            [prediction_service.GenerateContentRequest],
            Awaitable[prediction_service.GenerateContentResponse]]:
        r"""Return a callable for the generate content method over gRPC.

        Generate content with multimodal inputs.

        Returns:
            Callable[[~.GenerateContentRequest],
                    Awaitable[~.GenerateContentResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'generate_content' not in self._stubs:
            self._stubs['generate_content'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/GenerateContent',
                request_serializer=prediction_service.GenerateContentRequest.serialize,
                response_deserializer=prediction_service.GenerateContentResponse.deserialize,
            )
        return self._stubs['generate_content']

    @property
    def stream_generate_content(self) -> Callable[
            [prediction_service.GenerateContentRequest],
            Awaitable[prediction_service.GenerateContentResponse]]:
        r"""Return a callable for the stream generate content method over gRPC.

        Generate content with multimodal inputs with
        streaming support.

        Returns:
            Callable[[~.GenerateContentRequest],
                    Awaitable[~.GenerateContentResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'stream_generate_content' not in self._stubs:
            self._stubs['stream_generate_content'] = self._logged_channel.unary_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/StreamGenerateContent',
                request_serializer=prediction_service.GenerateContentRequest.serialize,
                response_deserializer=prediction_service.GenerateContentResponse.deserialize,
            )
        return self._stubs['stream_generate_content']

    @property
    def chat_completions(self) -> Callable[
            [prediction_service.ChatCompletionsRequest],
            Awaitable[httpbody_pb2.HttpBody]]:
        r"""Return a callable for the chat completions method over gRPC.

        Exposes an OpenAI-compatible endpoint for chat
        completions.

        Returns:
            Callable[[~.ChatCompletionsRequest],
                    Awaitable[~.HttpBody]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'chat_completions' not in self._stubs:
            self._stubs['chat_completions'] = self._logged_channel.unary_stream(
                '/google.cloud.aiplatform.v1beta1.PredictionService/ChatCompletions',
                request_serializer=prediction_service.ChatCompletionsRequest.serialize,
                response_deserializer=httpbody_pb2.HttpBody.FromString,
            )
        return self._stubs['chat_completions']

    @property
    def embed_content(self) -> Callable[
            [prediction_service.EmbedContentRequest],
            Awaitable[prediction_service.EmbedContentResponse]]:
        r"""Return a callable for the embed content method over gRPC.

        Embed content with multimodal inputs.

        Returns:
            Callable[[~.EmbedContentRequest],
                    Awaitable[~.EmbedContentResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if 'embed_content' not in self._stubs:
            self._stubs['embed_content'] = self._logged_channel.unary_unary(
                '/google.cloud.aiplatform.v1beta1.PredictionService/EmbedContent',
                request_serializer=prediction_service.EmbedContentRequest.serialize,
                response_deserializer=prediction_service.EmbedContentResponse.deserialize,
            )
        return self._stubs['embed_content']

    def _prep_wrapped_messages(self, client_info):
        """ Precompute the wrapped methods, overriding the base class method to use async wrappers."""
        self._wrapped_methods = {
            self.predict: self._wrap_method(
                self.predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.raw_predict: self._wrap_method(
                self.raw_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.stream_raw_predict: self._wrap_method(
                self.stream_raw_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.direct_predict: self._wrap_method(
                self.direct_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.direct_raw_predict: self._wrap_method(
                self.direct_raw_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.stream_direct_predict: self._wrap_method(
                self.stream_direct_predict,
                default_timeout=5.0,
                client_info=client_info,
            ),
            self.stream_direct_raw_predict: self._wrap_method(
                self.stream_direct_raw_predict,
                default_timeout=5.0,
                client_info=client_info,
            ),
            self.streaming_predict: self._wrap_method(
                self.streaming_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.server_streaming_predict: self._wrap_method(
                self.server_streaming_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.streaming_raw_predict: self._wrap_method(
                self.streaming_raw_predict,
                default_timeout=None,
                client_info=client_info,
            ),
            self.predict_long_running: self._wrap_method(
                self.predict_long_running,
                default_timeout=None,
                client_info=client_info,
            ),
            self.fetch_predict_operation: self._wrap_method(
                self.fetch_predict_operation,
                default_timeout=None,
                client_info=client_info,
            ),
            self.invoke: self._wrap_method(
                self.invoke,
                default_timeout=None,
                client_info=client_info,
            ),
            self.explain: self._wrap_method(
                self.explain,
                default_timeout=None,
                client_info=client_info,
            ),
            self.count_tokens: self._wrap_method(
                self.count_tokens,
                default_timeout=None,
                client_info=client_info,
            ),
            self.generate_content: self._wrap_method(
                self.generate_content,
                default_timeout=None,
                client_info=client_info,
            ),
            self.stream_generate_content: self._wrap_method(
                self.stream_generate_content,
                default_timeout=None,
                client_info=client_info,
            ),
            self.chat_completions: self._wrap_method(
                self.chat_completions,
                default_timeout=None,
                client_info=client_info,
            ),
            self.embed_content: self._wrap_method(
                self.embed_content,
                default_timeout=None,
                client_info=client_info,
            ),
        }

    def _wrap_method(self, func, *args, **kwargs):
        if self._wrap_with_kind:  # pragma: NO COVER
            kwargs["kind"] = self.kind
        return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

    def close(self):
        return self._logged_channel.close()

    @property
    def kind(self) -> str:
        return "grpc_asyncio"


__all__ = (
    'PredictionServiceGrpcAsyncIOTransport',
)
