package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.TopicName;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/plugin/gcp/publisher/source/PubSubReceiver.class */
public class PubSubReceiver extends Receiver<PubSubMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubReceiver.class);
    private static final String CREATE_SUBSCRIPTION_ERROR_MSG = "Failed to create subscription '%s'.";
    private static final String CREATE_SUBSCRIPTION_ADMIN_CLIENT_ERROR_MSG = "Failed to create subscription client to manage subscription '%s'.";
    private static final String CREATE_SUBSCRIPTION_RETRY_ERROR_MSG = "Failed to create subscription '%s' after 5 attempts";
    private static final String MISSING_TOPIC_ERROR_MSG = "Failed to create subscription. Topic '%s' was not found in project '%s'.";
    private static final String SUBSCRIBER_ERROR_MSG = "Failed to create subscriber using subscription '%s' for project '%s'.";
    private static final String FETCH_ERROR_MSG = "Failed to fetch new messages using subscription '%s' for project '%s'.";
    private static final String INTERRUPTED_EXCEPTION_MSG = "Interrupted Exception when sleeping during backoff.";
    private final PubSubSubscriberConfig config;
    private final boolean autoAcknowledge;
    private final BackoffConfig backoffConfig;
    private int previousFetchRate;
    private transient String project;
    private transient String topic;
    private transient String subscription;
    private transient Credentials credentials;
    private transient ScheduledThreadPoolExecutor executor;
    private transient SubscriberStub subscriber;
    private transient AtomicInteger bucket;

    /* loaded from: input_file:io/cdap/plugin/gcp/publisher/source/PubSubReceiver$BackoffConfigBuilder.class */
    public static class BackoffConfigBuilder implements Serializable {
        public int initialBackoffMs = 100;
        public int maximumBackoffMs = 10000;
        public double backoffFactor = 2.0d;

        protected BackoffConfigBuilder() {
        }

        public static BackoffConfigBuilder getInstance() {
            return new BackoffConfigBuilder();
        }

        public BackoffConfig build() {
            if (this.initialBackoffMs > this.maximumBackoffMs) {
                throw new IllegalArgumentException("Maximum backoff cannot be smaller than Initial backoff");
            }
            return new BackoffConfig(this.initialBackoffMs, this.maximumBackoffMs, this.backoffFactor);
        }

        public int getInitialBackoffMs() {
            return this.initialBackoffMs;
        }

        public BackoffConfigBuilder setInitialBackoffMs(int i) {
            this.initialBackoffMs = i;
            return this;
        }

        public int getMaximumBackoffMs() {
            return this.maximumBackoffMs;
        }

        public BackoffConfigBuilder setMaximumBackoffMs(int i) {
            this.maximumBackoffMs = i;
            return this;
        }

        public double getBackoffFactor() {
            return this.backoffFactor;
        }

        public BackoffConfigBuilder setBackoffFactor(int i) {
            this.backoffFactor = i;
            return this;
        }
    }

    /* loaded from: input_file:io/cdap/plugin/gcp/publisher/source/PubSubReceiver$LoggingRejectedExecutionHandler.class */
    protected static class LoggingRejectedExecutionHandler implements RejectedExecutionHandler {
        protected LoggingRejectedExecutionHandler() {
        }

        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            PubSubReceiver.LOG.error("Thread Pool rejected execution of a task.");
        }
    }

    public PubSubReceiver(PubSubSubscriberConfig pubSubSubscriberConfig, boolean z, StorageLevel storageLevel) {
        this(pubSubSubscriberConfig, z, storageLevel, BackoffConfig.defaultInstance());
    }

    public PubSubReceiver(PubSubSubscriberConfig pubSubSubscriberConfig, boolean z, StorageLevel storageLevel, BackoffConfig backoffConfig) {
        super(storageLevel);
        this.previousFetchRate = -1;
        this.config = pubSubSubscriberConfig;
        this.autoAcknowledge = z;
        this.backoffConfig = backoffConfig;
    }

    @VisibleForTesting
    public PubSubReceiver(String str, String str2, String str3, Credentials credentials, boolean z, StorageLevel storageLevel, BackoffConfig backoffConfig, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, SubscriberStub subscriberStub, AtomicInteger atomicInteger) {
        super(storageLevel);
        this.previousFetchRate = -1;
        this.backoffConfig = backoffConfig;
        this.project = str;
        this.topic = str2;
        this.subscription = str3;
        this.autoAcknowledge = z;
        this.credentials = credentials;
        this.executor = scheduledThreadPoolExecutor;
        this.subscriber = subscriberStub;
        this.bucket = atomicInteger;
        this.config = null;
    }

    public void onStart() {
        this.executor = new ScheduledThreadPoolExecutor(3, new LoggingRejectedExecutionHandler());
        this.executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.executor.setRemoveOnCancelPolicy(true);
        this.bucket = new AtomicInteger();
        this.project = this.config.getProject();
        this.subscription = ProjectSubscriptionName.format(this.config.getProject(), this.config.getSubscription());
        this.credentials = createCredentials();
        if (this.config.getTopic() != null) {
            this.topic = TopicName.format(this.config.getProject(), this.config.getTopic());
            createSubscription();
        }
        this.subscriber = createSubscriberClient();
        scheduleTasks();
        LOG.info("Receiver started execution");
    }

    public void onStop() {
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.error("InterruptedException while waiting for executor to shutdown.");
            }
        }
        if (this.subscriber != null && !this.subscriber.isShutdown()) {
            this.subscriber.shutdown();
            try {
                this.subscriber.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e2) {
                LOG.error("InterruptedException while waiting for subscriber to shutdown.");
            }
        }
        LOG.info("Receiver completed execution");
    }

    @Nullable
    protected Credentials createCredentials() {
        if (isStopped()) {
            return null;
        }
        try {
            if (this.config.getServiceAccount() == null) {
                return null;
            }
            return GCPUtils.loadServiceAccountCredentials(this.config.getServiceAccount(), this.config.isServiceAccountFilePath().booleanValue());
        } catch (IOException e) {
            stop("Unable to get credentials for receiver.", e);
            return null;
        }
    }

    protected void createSubscription() {
        if (isStopped()) {
            return;
        }
        try {
            SubscriptionAdminClient buildSubscriptionAdminClient = buildSubscriptionAdminClient();
            Throwable th = null;
            try {
                PubSubSubscriberUtil.createSubscription(() -> {
                    return !isStopped();
                }, this.backoffConfig, this.subscription, this.topic, () -> {
                    return buildSubscriptionAdminClient;
                }, this::isApiExceptionRetryable);
                if (buildSubscriptionAdminClient != null) {
                    if (0 != 0) {
                        try {
                            buildSubscriptionAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        buildSubscriptionAdminClient.close();
                    }
                }
            } catch (Throwable th3) {
                if (buildSubscriptionAdminClient != null) {
                    if (0 != 0) {
                        try {
                            buildSubscriptionAdminClient.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        buildSubscriptionAdminClient.close();
                    }
                }
                throw th3;
            }
        } catch (ApiException e) {
            if (e.getStatusCode().getCode().equals(StatusCode.Code.NOT_FOUND)) {
                stop(String.format(MISSING_TOPIC_ERROR_MSG, this.topic, this.project), e);
            } else {
                stop(String.format(CREATE_SUBSCRIPTION_ERROR_MSG, this.subscription), e);
            }
        } catch (IOException e2) {
            stop(String.format(CREATE_SUBSCRIPTION_ADMIN_CLIENT_ERROR_MSG, this.subscription), e2);
        } catch (InterruptedException e3) {
            stop(INTERRUPTED_EXCEPTION_MSG, e3);
        } catch (RuntimeException e4) {
            stop(String.format(CREATE_SUBSCRIPTION_RETRY_ERROR_MSG, this.subscription), e4);
        }
    }

    @Nullable
    public SubscriberStub createSubscriberClient() {
        if (isStopped()) {
            return null;
        }
        try {
            return buildSubscriberClient();
        } catch (IOException e) {
            stop(String.format(SUBSCRIBER_ERROR_MSG, this.subscription, this.project), e);
            return null;
        }
    }

    public void scheduleTasks() {
        if (isStopped()) {
            return;
        }
        this.executor.scheduleAtFixedRate(this::updateMessageRateAndFillBucket, 0L, 1L, TimeUnit.SECONDS);
        this.executor.scheduleWithFixedDelay(this::receiveMessages, 100L, 100L, TimeUnit.MILLISECONDS);
    }

    protected void receiveMessages() {
        int initialBackoffMs = this.backoffConfig.getInitialBackoffMs();
        while (true) {
            int i = initialBackoffMs;
            if (isStopped()) {
                return;
            }
            try {
                fetchAndAck();
                return;
            } catch (ApiException e) {
                if (!isApiExceptionRetryable(e)) {
                    restart(String.format(FETCH_ERROR_MSG, this.subscription, this.project), e);
                    return;
                }
                initialBackoffMs = sleepAndIncreaseBackoff(i);
            }
        }
    }

    protected void fetchAndAck() {
        int i = this.bucket.get();
        if (i <= 0) {
            return;
        }
        List<ReceivedMessage> receivedMessagesList = this.subscriber.pullCallable().call(PullRequest.newBuilder().setMaxMessages(i).setSubscription(this.subscription).build()).getReceivedMessagesList();
        if (receivedMessagesList.isEmpty()) {
            return;
        }
        this.bucket.updateAndGet(i2 -> {
            return i2 - receivedMessagesList.size();
        });
        if (isStopped()) {
            LOG.trace("Receiver stopped before store and ack.");
            return;
        }
        List list = (List) receivedMessagesList.stream().map(PubSubMessage::new).collect(Collectors.toList());
        store(list.iterator());
        if (this.autoAcknowledge) {
            this.subscriber.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().setSubscription(this.subscription).addAllAckIds((List) list.stream().map((v0) -> {
                return v0.getAckId();
            }).collect(Collectors.toList())).build());
        }
    }

    protected SubscriptionAdminSettings buildSubscriptionAdminSettings() throws IOException {
        SubscriptionAdminSettings.Builder newBuilder = SubscriptionAdminSettings.newBuilder();
        if (this.credentials != null) {
            newBuilder.setCredentialsProvider(FixedCredentialsProvider.create(this.credentials));
        }
        return newBuilder.build();
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [com.google.cloud.pubsub.v1.stub.SubscriberStubSettings] */
    protected SubscriberStubSettings buildSubscriberSettings() throws IOException {
        SubscriberStubSettings.Builder newBuilder = SubscriberStubSettings.newBuilder();
        if (this.credentials != null) {
            newBuilder.setCredentialsProvider(FixedCredentialsProvider.create(this.credentials));
        }
        return newBuilder.build2();
    }

    protected SubscriptionAdminClient buildSubscriptionAdminClient() {
        try {
            return SubscriptionAdminClient.create(buildSubscriptionAdminSettings());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected SubscriberStub buildSubscriberClient() throws IOException {
        return GrpcSubscriberStub.create(buildSubscriberSettings());
    }

    protected int sleepAndIncreaseBackoff(int i) {
        try {
            if (!isStopped()) {
                LOG.trace("Backoff - Sleeping for {} ms.", Integer.valueOf(i));
                Thread.sleep(i);
            }
        } catch (InterruptedException e) {
            stop(INTERRUPTED_EXCEPTION_MSG, e);
        }
        return calculateUpdatedBackoff(i);
    }

    protected int calculateUpdatedBackoff(int i) {
        return Math.min((int) (i * this.backoffConfig.getBackoffFactor()), this.backoffConfig.getMaximumBackoffMs());
    }

    protected void updateMessageRateAndFillBucket() {
        int min = (int) Math.min(supervisor().getCurrentRateLimit(), 2147483647L);
        if (min != this.previousFetchRate) {
            this.previousFetchRate = min;
            LOG.trace("Receiver fetch rate is set to: {}", Integer.valueOf(min));
        }
        this.bucket.set(min);
    }

    protected boolean isApiExceptionRetryable(ApiException apiException) {
        return PubSubSubscriberUtil.isApiExceptionRetryable(apiException);
    }
}
