package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

/* loaded from: input_file:io/cdap/plugin/gcp/publisher/source/PubSubRDDIterator.class */
public class PubSubRDDIterator implements Iterator<PubSubMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubRDDIterator.class);
    private static final int MAX_MESSAGES = 1000;
    private static final int MAX_MESSAGE_SIZE = 20971520;
    private static final int RETRY_DELAY = 100;
    private final long startTime;
    private final PubSubSubscriberConfig config;
    private final TaskContext context;
    private final long batchDuration;
    private final String subscriptionFormatted;
    private final boolean autoAcknowledge;
    private final Queue<ReceivedMessage> receivedMessages = new ConcurrentLinkedDeque();
    private PullRequest pullRequest;
    private SubscriberStub subscriber;
    private long messageCount;

    public PubSubRDDIterator(PubSubSubscriberConfig pubSubSubscriberConfig, TaskContext taskContext, Time time, long j, boolean z) {
        this.config = pubSubSubscriberConfig;
        this.context = taskContext;
        this.batchDuration = j;
        this.startTime = time.milliseconds();
        this.autoAcknowledge = z;
        this.subscriptionFormatted = ProjectSubscriptionName.format(this.config.getProject(), this.config.getSubscription());
    }

    public boolean hasNext() {
        if (!this.receivedMessages.isEmpty()) {
            return true;
        }
        long currentTimeMillis = System.currentTimeMillis();
        long j = this.startTime + this.batchDuration;
        if (currentTimeMillis >= j) {
            LOG.debug("Time exceeded for batch. Total time is {} millis. Total messages returned is {} .", Long.valueOf(currentTimeMillis - this.startTime), Long.valueOf(this.messageCount));
            return false;
        }
        try {
            List<ReceivedMessage> fetchAndAck = fetchAndAck(j);
            if (fetchAndAck.isEmpty()) {
                LOG.debug("No more messages. Total messages returned is {} .", Long.valueOf(this.messageCount));
                return false;
            }
            this.receivedMessages.addAll(fetchAndAck);
            return true;
        } catch (IOException e) {
            throw new RuntimeException("Error reading messages from Pub/Sub. ", e);
        }
    }

    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public PubSubMessage m8647next() {
        if (this.receivedMessages.isEmpty()) {
            throw new IllegalStateException("Unexpected state. No messages available.");
        }
        ReceivedMessage poll = this.receivedMessages.poll();
        this.messageCount++;
        return new PubSubMessage(poll);
    }

    /* JADX WARN: Type inference failed for: r0v12, types: [com.google.cloud.pubsub.v1.stub.SubscriberStubSettings] */
    private SubscriberStub buildSubscriberClient() throws IOException {
        SubscriberStubSettings.Builder newBuilder = SubscriberStubSettings.newBuilder();
        Credentials createCredentials = PubSubSubscriberUtil.createCredentials(this.config.getServiceAccount(), this.config.isServiceAccountFilePath().booleanValue());
        if (createCredentials != null) {
            newBuilder.setCredentialsProvider(FixedCredentialsProvider.create(createCredentials));
        }
        newBuilder.setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(Integer.valueOf(MAX_MESSAGE_SIZE)).build());
        newBuilder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings());
        return GrpcSubscriberStub.create((SubscriberStubSettings) newBuilder.build2());
    }

    private List<ReceivedMessage> fetchAndAck(long j) throws IOException {
        if (this.subscriber == null) {
            this.subscriber = buildSubscriberClient();
            this.context.addTaskCompletionListener(taskContext -> {
                if (this.subscriber == null || this.subscriber.isShutdown()) {
                    return;
                }
                this.subscriber.shutdown();
                try {
                    this.subscriber.awaitTermination(30L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    LOG.warn("Exception in shutting down subscriber. ", e);
                }
            });
            this.pullRequest = PullRequest.newBuilder().setMaxMessages(1000).setSubscription(this.subscriptionFormatted).build();
        }
        while (System.currentTimeMillis() < j) {
            List<ReceivedMessage> receivedMessagesList = this.subscriber.pullCallable().call(this.pullRequest).getReceivedMessagesList();
            if (!receivedMessagesList.isEmpty()) {
                ackMessages((List) receivedMessagesList.stream().map((v0) -> {
                    return v0.getAckId();
                }).collect(Collectors.toList()), this.autoAcknowledge, this.subscriptionFormatted);
                return receivedMessagesList;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) {
                LOG.debug("Interrupted while waiting for retry. ", e);
                return Collections.EMPTY_LIST;
            }
        }
        return Collections.EMPTY_LIST;
    }

    private void ackMessages(List<String> list, boolean z, String str) {
        if (z) {
            this.subscriber.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().setSubscription(str).addAllAckIds(list).build());
        }
    }
}
