package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.PushConfig;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/**
 * Static helpers shared by the Pub/Sub subscriber sources: building the Spark
 * {@link JavaDStream} that feeds the pipeline, creating subscriptions with
 * retry/backoff, and assembling retry settings and credentials.
 *
 * <p>This class is not instantiable.
 */
public final class PubSubSubscriberUtil {
    protected static final Logger LOG = LoggerFactory.getLogger(PubSubSubscriberUtil.class);

    private static final int RESOURCE_EXHAUSTED = StatusCode.Code.RESOURCE_EXHAUSTED.getHttpStatusCode();
    private static final int CANCELLED = StatusCode.Code.CANCELLED.getHttpStatusCode();
    private static final int INTERNAL = StatusCode.Code.INTERNAL.getHttpStatusCode();
    private static final int UNAVAILABLE = StatusCode.Code.UNAVAILABLE.getHttpStatusCode();
    private static final int DEADLINE_EXCEEDED = StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode();

    /**
     * HTTP status codes that are treated as retryable in addition to whatever
     * {@link ApiException#isRetryable()} reports.
     *
     * <p>NOTE(review): matching is done on the HTTP status, not on the status code enum itself.
     * Several distinct codes may map to the same HTTP status (e.g. 500), which would make them
     * retryable as well — confirm that this breadth is intended.
     */
    private static final Set<Integer> RETRYABLE_STATUS_CODES =
        Stream.of(RESOURCE_EXHAUSTED, CANCELLED, INTERNAL, UNAVAILABLE, DEADLINE_EXCEEDED)
            .collect(Collectors.toSet());

    /** Maximum number of attempts used by {@link #createSubscription} and {@link #getRetrySettings()}. */
    private static final int MAX_ATTEMPTS = 5;

    private PubSubSubscriberUtil() {
    }

    /**
     * Builds the stream of mapped Pub/Sub messages for the given streaming context.
     *
     * <p>The flag handed to the underlying streams is {@code false} when preview is enabled and
     * {@code true} otherwise.
     * NOTE(review): the flag presumably controls automatic acknowledgement of messages — confirm
     * against the DStream implementations.
     *
     * @param streamingContext the CDAP streaming context
     * @param pubSubSubscriberConfig the subscriber configuration
     * @param serializableFunction maps each {@link PubSubMessage} to the output type
     * @param <T> the element type of the returned stream
     * @return the stream of mapped messages
     * @throws Exception declared for callers; nothing in this method throws a checked exception itself
     */
    public static <T> JavaDStream<T> getStream(StreamingContext streamingContext, PubSubSubscriberConfig pubSubSubscriberConfig, SerializableFunction<PubSubMessage, T> serializableFunction) throws Exception {
        boolean autoAcknowledge = !streamingContext.isPreviewEnabled();
        return getInputDStream(streamingContext, pubSubSubscriberConfig, autoAcknowledge, serializableFunction);
    }

    /**
     * Creates the input stream backing {@link #getStream}.
     *
     * <p>When the state store is enabled a single {@code PubSubDirectDStream} is used and the
     * mapping function is handed to it directly. Otherwise one {@code PubSubInputDStream} is
     * created per configured reader, the streams are unioned, and the mapping function is
     * applied to the union.
     *
     * @param streamingContext the CDAP streaming context
     * @param pubSubSubscriberConfig the subscriber configuration (supplies the number of readers)
     * @param autoAcknowledge flag forwarded unchanged to the DStream constructors
     * @param serializableFunction maps each {@link PubSubMessage} to the output type
     * @param <T> the element type of the returned stream
     * @return the stream of mapped messages
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // Scala collections and DStreams are bridged through raw types.
    protected static <T> JavaDStream<T> getInputDStream(StreamingContext streamingContext, PubSubSubscriberConfig pubSubSubscriberConfig, boolean autoAcknowledge, SerializableFunction<PubSubMessage, T> serializableFunction) {
        if (streamingContext.isStateStoreEnabled()) {
            // NOTE(review): the class tag is built from PubSubMessage although the stream carries T — confirm.
            return new JavaDStream<>(new PubSubDirectDStream(streamingContext, pubSubSubscriberConfig, streamingContext.getBatchInterval(), autoAcknowledge, serializableFunction), ClassTag$.MODULE$.apply(PubSubMessage.class));
        }

        int readerCount = pubSubSubscriberConfig.getNumberOfReaders().intValue();
        ClassTag<PubSubMessage> messageTag = ClassTag$.MODULE$.apply(PubSubMessage.class);
        ArrayList<PubSubInputDStream> receivers = new ArrayList<>(readerCount);
        for (int reader = 0; reader < readerCount; reader++) {
            receivers.add(new PubSubInputDStream(streamingContext.getSparkStreamingContext().ssc(), pubSubSubscriberConfig, StorageLevel.MEMORY_ONLY(), autoAcknowledge));
        }

        // The Scala API wants a Seq of DStreams; go through the raw Scala Iterable to get there.
        Iterable scalaReceivers = (Iterable) JavaConverters.collectionAsScalaIterableConverter(receivers).asScala();
        // Typing the intermediate stream as PubSubMessage lets the mapping lambda type-check.
        JavaDStream<PubSubMessage> messages = new JavaDStream<PubSubMessage>(
            streamingContext.getSparkStreamingContext().ssc().union(scalaReceivers.toSeq(), messageTag), messageTag);
        return messages.map(message -> serializableFunction.apply(message));
    }

    /**
     * Creates a pull subscription, retrying with backoff on retryable failures.
     *
     * <p>At most {@code MAX_ATTEMPTS} attempts are made, and only while {@code preCheck} keeps
     * returning {@code true}. An {@code ALREADY_EXISTS} response is treated as success. The
     * subscription is created with the default push config and an ack deadline of 60 seconds.
     *
     * <p>NOTE(review): the client obtained from the supplier is not closed here — confirm that the
     * supplier owns the client's lifecycle.
     *
     * @param preCheck evaluated before every attempt and before every backoff sleep
     * @param backoffConfig initial/maximum backoff and growth factor
     * @param subscription name of the subscription to create
     * @param topic name of the topic the subscription is attached to
     * @param subscriptionAdminClientSupplier supplies the client used for each attempt
     * @param isRetryableException decides whether a failed attempt may be retried
     * @throws ApiException if an attempt fails with a non-retryable error
     * @throws RuntimeException if the subscription could not be created; the cause is the last
     *     {@link ApiException}, or absent when the pre-check failed before the first attempt
     * @throws InterruptedException if interrupted while backing off
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public static void createSubscription(BooleanSupplier preCheck, BackoffConfig backoffConfig, String subscription, String topic, Supplier<SubscriptionAdminClient> subscriptionAdminClientSupplier, Predicate<ApiException> isRetryableException) throws InterruptedException, IOException {
        int backoffMs = backoffConfig.getInitialBackoffMs();
        int attemptsLeft = MAX_ATTEMPTS;
        ApiException lastFailure = null;

        while (preCheck.getAsBoolean() && attemptsLeft-- > 0) {
            try {
                subscriptionAdminClientSupplier.get().createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 60);
                return;
            } catch (ApiException e) {
                lastFailure = e;
                if (e.getStatusCode().getCode() == StatusCode.Code.ALREADY_EXISTS) {
                    // Someone else created it first; nothing left to do.
                    return;
                }
                if (!isRetryableException.test(e)) {
                    throw e;
                }
                backoffMs = sleepAndIncreaseBackoff(preCheck, backoffMs, backoffConfig);
            }
        }

        if (lastFailure == null) {
            throw new RuntimeException("Subscription '" + subscription + "' for topic '" + topic
                + "' was not created: the pre-check failed before any attempt was made.");
        }
        throw new RuntimeException("Unable to create subscription '" + subscription + "' for topic '" + topic + "'.", lastFailure);
    }

    /**
     * Tells whether a failed API call is worth retrying.
     *
     * @param apiException the failure to inspect
     * @return {@code true} if the exception reports itself as retryable or its HTTP status is one
     *     of the statuses this class treats as retryable
     */
    public static boolean isApiExceptionRetryable(ApiException apiException) {
        if (apiException.isRetryable()) {
            return true;
        }
        int httpStatus = apiException.getStatusCode().getCode().getHttpStatusCode();
        return RETRYABLE_STATUS_CODES.contains(httpStatus);
    }

    /**
     * Returns the function that converts Pub/Sub messages into structured records.
     *
     * @param googleSubscriberConfig configuration handed to the converter
     * @return a new {@code PubSubStructuredRecordConverter}
     */
    public static SerializableFunction<PubSubMessage, StructuredRecord> getMappingFunction(GoogleSubscriberConfig googleSubscriberConfig) {
        return new PubSubStructuredRecordConverter(googleSubscriberConfig);
    }

    /**
     * Builds retry settings from the default {@link BackoffConfig}.
     *
     * @return settings using the default initial delay, maximum delay and multiplier, capped at
     *     {@code MAX_ATTEMPTS} attempts
     */
    public static RetrySettings getRetrySettings() {
        BackoffConfig defaults = BackoffConfig.defaultInstance();
        return RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(defaults.getInitialBackoffMs()))
            .setMaxRetryDelay(Duration.ofMillis(defaults.getMaximumBackoffMs()))
            .setRetryDelayMultiplier(defaults.getBackoffFactor())
            .setMaxAttempts(MAX_ATTEMPTS)
            .build();
    }

    /**
     * Loads service account credentials, if a service account was supplied.
     *
     * @param serviceAccount the service account value, or {@code null} for none
     * @param isServiceAccountFilePath forwarded unchanged to
     *     {@code GCPUtils.loadServiceAccountCredentials}; NOTE(review): presumably tells whether
     *     {@code serviceAccount} is a file path rather than inline content — confirm against GCPUtils
     * @return the loaded credentials, or {@code null} when {@code serviceAccount} is {@code null}
     * @throws RuntimeException if loading the credentials fails with an {@link IOException}
     */
    @Nullable
    public static Credentials createCredentials(String serviceAccount, boolean isServiceAccountFilePath) {
        if (serviceAccount == null) {
            return null;
        }
        try {
            return GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);
        } catch (IOException e) {
            throw new RuntimeException("Error creating credentials from service account.", e);
        }
    }

    /**
     * Invokes {@code supplier}, retrying with backoff while it fails with a retryable
     * {@link ApiException}.
     *
     * @param supplier the call to make
     * @param backoffConfig initial/maximum backoff and growth factor
     * @param attempts maximum number of attempts; must be positive
     * @param <T> the result type
     * @return the first successful result
     * @throws IllegalArgumentException if {@code attempts} is not positive
     * @throws ApiException if an attempt fails with a non-retryable error
     * @throws RuntimeException if every attempt failed; the cause is the last {@link ApiException}
     * @throws Exception declared for callers (covers interruption while backing off)
     */
    public static <T> T callWithRetry(Supplier<T> supplier, BackoffConfig backoffConfig, int attempts) throws Exception {
        if (attempts <= 0) {
            throw new IllegalArgumentException("Number of attempts must be positive, got " + attempts + ".");
        }

        int backoffMs = backoffConfig.getInitialBackoffMs();
        ApiException lastFailure = null;
        for (int attempt = 0; attempt < attempts; attempt++) {
            try {
                return supplier.get();
            } catch (ApiException e) {
                lastFailure = e;
                if (!isApiExceptionRetryable(e)) {
                    throw e;
                }
                // No external stop condition here, so the sleep is unconditional.
                backoffMs = sleepAndIncreaseBackoff(() -> true, backoffMs, backoffConfig);
            }
        }
        throw new RuntimeException("Call failed after " + attempts + " attempt(s).", lastFailure);
    }

    /**
     * Creates a subscription admin client, optionally bound to fixed credentials.
     *
     * @param credentials credentials to use, or {@code null} to keep the builder's defaults
     * @return a new client; the caller is responsible for closing it
     * @throws IOException if the settings or the client cannot be created
     */
    private static SubscriptionAdminClient buildSubscriptionAdminClient(Credentials credentials) throws IOException {
        SubscriptionAdminSettings.Builder settings = SubscriptionAdminSettings.newBuilder();
        if (credentials != null) {
            settings.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
        }
        return SubscriptionAdminClient.create(settings.build());
    }

    /**
     * Sleeps for the current backoff (only if {@code preCheck} still holds) and returns the next one.
     *
     * @param preCheck when it returns {@code false} the sleep is skipped
     * @param backoffMs the current backoff in milliseconds
     * @param backoffConfig supplies the growth factor and the maximum backoff
     * @return the backoff to use for the next retry
     * @throws InterruptedException if interrupted while sleeping
     */
    private static int sleepAndIncreaseBackoff(BooleanSupplier preCheck, int backoffMs, BackoffConfig backoffConfig) throws InterruptedException {
        if (preCheck.getAsBoolean()) {
            LOG.trace("Backoff - Sleeping for {} ms.", Integer.valueOf(backoffMs));
            Thread.sleep(backoffMs);
        }
        return calculateUpdatedBackoff(backoffMs, backoffConfig);
    }

    /**
     * Multiplies the backoff by the configured factor, capped at the configured maximum.
     *
     * @param backoffMs the current backoff in milliseconds
     * @param backoffConfig supplies the growth factor and the maximum backoff
     * @return the next backoff in milliseconds
     */
    private static int calculateUpdatedBackoff(int backoffMs, BackoffConfig backoffConfig) {
        int grown = (int) (backoffMs * backoffConfig.getBackoffFactor());
        return Math.min(grown, backoffConfig.getMaximumBackoffMs());
    }

    // The decompiled synthetic $deserializeLambda$ method was dropped on purpose: the compiler
    // generates it for the serializable lambda in getInputDStream.
}
