package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.NotFoundException;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.CreateSnapshotRequest;
import com.google.pubsub.v1.ProjectSnapshotName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.SeekRequest;
import com.google.pubsub.v1.Snapshot;
import com.google.pubsub.v1.TopicName;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingEventHandler;
import java.io.IOException;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.reflect.ClassTag$;

/**
 * Input DStream whose batches are {@link PubSubRDD}s mapped through a caller-supplied function.
 *
 * <p>Through the {@link StreamingEventHandler} callbacks the stream keeps one Pub/Sub snapshot per
 * batch: a snapshot is created and persisted as pipeline state when a batch starts, the
 * subscription is sought back to it when the batch is retried, and both the state and the snapshot
 * are deleted once the batch completes.
 *
 * @param <T> element type produced by the mapping function
 */
public class PubSubDirectDStream<T> extends InputDStream<T> implements StreamingEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubDirectDStream.class);
    /** Label key attached to every snapshot created by this stream. */
    private static final String CDAP_PIPELINE = "cdap_pipeline";
    /** Number of attempts handed to the retry helper for snapshot create/delete calls. */
    private static final int MAX_SNAPSHOT_ATTEMPTS = 3;
    /** Upper bound for a generated snapshot id (presumably the Pub/Sub resource id limit). */
    private static final int MAX_SNAPSHOT_NAME_LENGTH = 255;
    /** Upper bound for the pipeline-name label value attached to a snapshot. */
    private static final int MAX_LABEL_VALUE_LENGTH = 63;

    private final Credentials credentials;
    private final PubSubSubscriberConfig config;
    private final long readDuration;
    private final StreamingContext context;
    private final boolean autoAcknowledge;
    private final SerializableFunction<PubSubMessage, T> mappingFn;
    private final org.apache.spark.streaming.StreamingContext streamingContext;
    private final String pipeline;
    private final BackoffConfig backoffConfig;
    private SubscriptionAdminClient subscriptionAdminClient;
    private ProjectSnapshotName currentSnapshotName;
    private boolean takeSnapshot;

    /**
     * Creates the stream. No Pub/Sub call is made here apart from resolving credentials; the admin
     * client is built in {@link #start()}.
     *
     * @param context CDAP streaming context; also the source of the Spark streaming context and
     *     the pipeline name
     * @param config subscriber configuration (project, subscription, optional topic, service account)
     * @param readDuration read duration forwarded unchanged to every {@link PubSubRDD}
     * @param autoAcknowledge auto-acknowledge flag forwarded unchanged to every {@link PubSubRDD}
     * @param mappingFn function applied to each {@link PubSubMessage} of a batch
     */
    public PubSubDirectDStream(StreamingContext context, PubSubSubscriberConfig config, long readDuration, boolean autoAcknowledge, SerializableFunction<PubSubMessage, T> mappingFn) {
        super(context.getSparkStreamingContext().ssc(), ClassTag$.MODULE$.apply(PubSubMessage.class));
        this.streamingContext = context.getSparkStreamingContext().ssc();
        this.config = config;
        this.readDuration = readDuration;
        this.context = context;
        this.autoAcknowledge = autoAcknowledge;
        this.mappingFn = mappingFn;
        this.pipeline = context.getPipelineName();
        this.credentials = PubSubSubscriberUtil.createCredentials(config.getServiceAccount(), config.isServiceAccountFilePath().booleanValue());
        this.backoffConfig = BackoffConfig.defaultInstance();
    }

    /**
     * Builds the RDD for one batch.
     *
     * @param time batch time handed over by Spark
     * @return a non-empty option holding a {@link PubSubRDD} mapped through the mapping function
     */
    public Option<RDD<T>> compute(Time time) {
        LOG.debug("Computing RDD for time {}.", time);
        PubSubRDD pubSubRdd = new PubSubRDD(this.streamingContext.sparkContext(), time, this.readDuration, this.config, this.autoAcknowledge);
        return Option.apply(pubSubRdd.map(this.mappingFn, ClassTag$.MODULE$.apply(PubSubMessage.class)));
    }

    /**
     * Builds the admin client, creates the subscription when a topic is configured, and restores
     * the snapshot saved in state, if any. When a saved snapshot exists the subscription is sought
     * back to it and no new snapshot is taken for the first batch.
     *
     * @throws RuntimeException if the client cannot be built, the subscription cannot be created,
     *     or the saved state cannot be read or sought to
     */
    public void start() {
        try {
            this.subscriptionAdminClient = buildSubscriptionAdminClient(this.credentials);
        } catch (IOException e) {
            throw new RuntimeException("SubscriptionAdminClient creation failed.", e);
        }
        try {
            if (this.config.getTopic() != null) {
                try {
                    createSubscriptionIfNotPresent();
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Subscription creation failed.", e);
                } catch (IOException e) {
                    throw new RuntimeException("Subscription creation failed.", e);
                }
            }
            this.currentSnapshotName = fetchSnapShot(this.config.getSubscription(), this.context);
            if (this.currentSnapshotName == null) {
                this.takeSnapshot = true;
            } else {
                seekSnapshot(this.currentSnapshotName, ProjectSubscriptionName.of(this.config.getProject(), this.config.getSubscription()));
                this.takeSnapshot = false;
            }
        } catch (RuntimeException e) {
            // The client was already built; release it so a failed start does not leak it.
            try {
                this.subscriptionAdminClient.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            // Cleared so that a later stop() does not close the client a second time.
            this.subscriptionAdminClient = null;
            throw e;
        }
    }

    /** Closes the admin client if one was created; otherwise does nothing. */
    public void stop() {
        if (this.subscriptionAdminClient != null) {
            this.subscriptionAdminClient.close();
        }
    }

    /**
     * Creates a snapshot of the subscription and persists it as state, unless a snapshot restored
     * in {@link #start()} is still pending for this batch.
     *
     * @param streamingContext context passed by the framework (the context given at construction
     *     time is the one used for saving state)
     * @throws RuntimeException if the snapshot cannot be created or the state cannot be saved
     */
    public void onBatchStarted(StreamingContext streamingContext) {
        LOG.debug("Starting a batch.");
        if (!this.takeSnapshot) {
            return;
        }
        ProjectSnapshotName snapshotName = ProjectSnapshotName.of(this.config.getProject(), generateName(this.config.getSubscription()));
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(this.config.getProject(), this.config.getSubscription());
        Snapshot snapshot = createSnapshot(snapshotName, subscriptionName);
        try {
            saveSnapshotAsState(snapshot, this.config.getSubscription(), this.context);
        } catch (IOException e) {
            RuntimeException failure = new RuntimeException("Error while saving state.", e);
            // The snapshot is useless without its state entry. Remove it, but never let a cleanup
            // failure hide the reason the batch start failed.
            try {
                deleteSnapshot(snapshotName);
            } catch (RuntimeException cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
        this.currentSnapshotName = snapshotName;
    }

    /**
     * Seeks the subscription back to the snapshot of the current batch before it is retried.
     *
     * @param streamingContext context passed by the framework (unused)
     */
    public void onBatchRetry(StreamingContext streamingContext) {
        LOG.debug("Batch is about to be retried. Seeking to snapshot {} for the current batch.", this.currentSnapshotName);
        seekSnapshot(this.currentSnapshotName, ProjectSubscriptionName.of(this.config.getProject(), this.config.getSubscription()));
    }

    /**
     * Deletes the saved state and then the snapshot of the finished batch, and arms snapshot
     * creation for the next batch.
     *
     * @param streamingContext context used to delete the saved state
     * @throws RuntimeException if deleting the state or the snapshot fails
     */
    public void onBatchCompleted(StreamingContext streamingContext) {
        LOG.debug("Batch completed called.");
        try {
            streamingContext.deleteState(this.config.getSubscription());
        } catch (IOException e) {
            throw new RuntimeException("Deleting state failed. ", e);
        }
        deleteSnapshot(this.currentSnapshotName);
        this.takeSnapshot = true;
        LOG.debug("Batch completed successfully. Deleted snapshot {} for the current batch.", this.currentSnapshotName);
    }

    /**
     * Builds a unique snapshot id of the form {@code <prefix>-<uuid>}, truncating the prefix so the
     * result never exceeds {@link #MAX_SNAPSHOT_NAME_LENGTH} characters.
     *
     * @param prefix text to prepend (the subscription id)
     * @return the generated id
     */
    private String generateName(String prefix) {
        String uuid = UUID.randomUUID().toString();
        // Leave room for the UUID and the separating dash.
        int maxPrefixLength = MAX_SNAPSHOT_NAME_LENGTH - uuid.length() - 1;
        String boundedPrefix = prefix.length() > maxPrefixLength ? prefix.substring(0, maxPrefixLength) : prefix;
        return String.format("%s-%s", boundedPrefix, uuid);
    }

    /**
     * Delegates subscription creation for the configured topic to
     * {@code PubSubSubscriberUtil.createSubscription}, handing it this stream's admin client and
     * backoff configuration.
     */
    private void createSubscriptionIfNotPresent() throws IOException, InterruptedException {
        String subscriptionPath = ProjectSubscriptionName.format(this.config.getProject(), this.config.getSubscription());
        String topicPath = TopicName.format(this.config.getProject(), this.config.getTopic());
        PubSubSubscriberUtil.createSubscription(() -> true, this.backoffConfig, subscriptionPath, topicPath,
            () -> this.subscriptionAdminClient, PubSubSubscriberUtil::isApiExceptionRetryable);
    }

    /**
     * Seeks the subscription to the given snapshot.
     *
     * @throws RuntimeException if Pub/Sub reports the snapshot (or subscription) as not found
     */
    private void seekSnapshot(ProjectSnapshotName projectSnapshotName, ProjectSubscriptionName projectSubscriptionName) {
        SeekRequest request = SeekRequest.newBuilder()
            .setSnapshot(projectSnapshotName.toString())
            .setSubscription(projectSubscriptionName.toString())
            .build();
        try {
            this.subscriptionAdminClient.seek(request);
        } catch (NotFoundException e) {
            throw new RuntimeException(String.format("Saved snapshot %s not found. Please clear the application state to proceed. REST api for state deletion is namespaces/{namespace-id}/apps/{app-id}/state .", projectSnapshotName.toString()), e);
        }
    }

    /**
     * Looks up the snapshot persisted in state under the given key.
     *
     * @param subscription state key (the subscription id)
     * @param streamingContext context used to read the state
     * @return the saved snapshot name, or {@code null} when no state is stored
     * @throws RuntimeException if the state cannot be read or parsed
     */
    @Nullable
    private ProjectSnapshotName fetchSnapShot(String subscription, StreamingContext streamingContext) {
        Optional<byte[]> state;
        try {
            state = streamingContext.getState(subscription);
        } catch (IOException e) {
            throw new RuntimeException(String.format("Error fetching saved state for subscription %s.", subscription), e);
        }
        if (!state.isPresent()) {
            LOG.debug("No saved state for {}.", subscription);
            return null;
        }
        try {
            Snapshot snapshot = Snapshot.parseFrom(state.get());
            LOG.debug("Found existing snapshot {} .", snapshot.getName());
            return ProjectSnapshotName.parse(snapshot.getName());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(String.format("Error parsing saved state for subscription %s.", subscription), e);
        }
    }

    /**
     * Creates a snapshot of the subscription, labelled with the pipeline name, retrying through
     * {@code PubSubSubscriberUtil.callWithRetry}.
     *
     * @return the created snapshot
     * @throws RuntimeException if the call still fails after the retries
     */
    private Snapshot createSnapshot(ProjectSnapshotName projectSnapshotName, ProjectSubscriptionName projectSubscriptionName) {
        LOG.debug("Creating snapshot {} for subscription {} in Pub/Sub .", projectSnapshotName.toString(), projectSubscriptionName.toString());
        try {
            // The request does not change between attempts, so build it once.
            CreateSnapshotRequest request = CreateSnapshotRequest.newBuilder()
                .setName(projectSnapshotName.toString())
                .setSubscription(projectSubscriptionName.toString())
                .putAllLabels(Collections.singletonMap(CDAP_PIPELINE, getLabelValue(this.pipeline)))
                .build();
            return (Snapshot) PubSubSubscriberUtil.callWithRetry(
                () -> this.subscriptionAdminClient.createSnapshot(request), this.backoffConfig, MAX_SNAPSHOT_ATTEMPTS);
        } catch (Exception e) {
            throw wrapFailure(String.format("Failed to create snapshot %s for subscription %s.", projectSnapshotName, projectSubscriptionName), e);
        }
    }

    /**
     * Converts a pipeline name into a snapshot label value: lower-cased independently of the JVM
     * default locale and cut to at most {@link #MAX_LABEL_VALUE_LENGTH} characters.
     *
     * @param str pipeline name; must not be {@code null}
     * @return the label value
     */
    @VisibleForTesting
    static String getLabelValue(String str) {
        // Locale.ROOT keeps the result stable; the default-locale variant maps 'I' to a dotless
        // 'ı' under e.g. a Turkish locale, producing a non-ASCII label.
        String label = str.toLowerCase(Locale.ROOT);
        if (label.length() > MAX_LABEL_VALUE_LENGTH) {
            label = label.substring(0, MAX_LABEL_VALUE_LENGTH);
            LOG.debug("Trimming pipeline name to 63 chars to add as label for snapshot.");
        }
        return label;
    }

    /**
     * Deletes the given snapshot, retrying through {@code PubSubSubscriberUtil.callWithRetry}.
     *
     * @throws RuntimeException if the call still fails after the retries
     */
    private void deleteSnapshot(ProjectSnapshotName projectSnapshotName) {
        try {
            PubSubSubscriberUtil.callWithRetry(() -> {
                this.subscriptionAdminClient.deleteSnapshot(projectSnapshotName);
                return null;
            }, this.backoffConfig, MAX_SNAPSHOT_ATTEMPTS);
        } catch (Exception e) {
            throw wrapFailure(String.format("Failed to delete snapshot %s.", projectSnapshotName), e);
        }
    }

    /**
     * Wraps a failure from the retry helper in a {@link RuntimeException} carrying the given
     * message, restoring the interrupt flag first when the cause is an interruption.
     */
    private static RuntimeException wrapFailure(String message, Exception cause) {
        // NOTE(review): whether callWithRetry can surface InterruptedException is not visible here;
        // the check is harmless if it cannot.
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(message, cause);
    }

    /** Persists the serialized snapshot under the given state key. */
    private void saveSnapshotAsState(Snapshot snapshot, String stateKey, StreamingContext streamingContext) throws IOException {
        LOG.debug("Saving snapshot {} in state .", snapshot.getName());
        streamingContext.saveState(stateKey, snapshot.toByteArray());
    }

    /**
     * Builds an admin client whose seek, create-snapshot and delete-snapshot calls use the retry
     * settings supplied by {@code PubSubSubscriberUtil}.
     *
     * @param credentials credentials to use; when {@code null} no credentials provider is set on
     *     the settings builder
     * @throws IOException if the settings or the client cannot be created
     */
    private SubscriptionAdminClient buildSubscriptionAdminClient(Credentials credentials) throws IOException {
        SubscriptionAdminSettings.Builder settings = SubscriptionAdminSettings.newBuilder();
        if (credentials != null) {
            settings.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
        }
        RetrySettings retrySettings = PubSubSubscriberUtil.getRetrySettings();
        settings.seekSettings().setRetrySettings(retrySettings);
        settings.createSnapshotSettings().setRetrySettings(retrySettings);
        settings.deleteSnapshotSettings().setRetrySettings(retrySettings);
        return SubscriptionAdminClient.create(settings.build());
    }
}
