package io.cdap.plugin.gcp.datastore.sink;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.common.collect.MoreCollectors;
import com.google.datastore.v1.AllocateIdsRequest;
import com.google.datastore.v1.BeginTransactionRequest;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.rpc.Code;
import io.cdap.plugin.gcp.datastore.sink.util.DatastoreSinkConstants;
import io.cdap.plugin.gcp.datastore.util.DatastoreUtil;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/plugin/gcp/datastore/sink/DatastoreRecordWriter.class */
public class DatastoreRecordWriter extends RecordWriter<NullWritable, Entity> {
    private static final Logger LOG = LoggerFactory.getLogger(DatastoreRecordWriter.class);
    private final Datastore datastore;
    private final int batchSize;
    private final boolean useAutogeneratedKey;
    private final boolean useTransactions;
    private CommitRequest.Builder builder;
    private int totalCount;
    private int numberOfRecordsInBatch;
    private String projectId;
    private Counter counter;
    private Sleeper sleeper;
    private BackOff flushBackoff;

    public DatastoreRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        this.projectId = configuration.get(DatastoreSinkConstants.CONFIG_PROJECT);
        String str = configuration.get(DatastoreSinkConstants.CONFIG_SERVICE_ACCOUNT);
        Boolean valueOf = Boolean.valueOf(configuration.getBoolean(DatastoreSinkConstants.CONFIG_SERVICE_ACCOUNT_IS_FILE_PATH, true));
        this.batchSize = configuration.getInt(DatastoreSinkConstants.CONFIG_BATCH_SIZE, 25);
        this.useAutogeneratedKey = configuration.getBoolean(DatastoreSinkConstants.CONFIG_USE_AUTOGENERATED_KEY, false);
        this.useTransactions = configuration.getBoolean(DatastoreSinkConstants.CONFIG_USE_TRANSACTIONS, true);
        LOG.debug("Initialize RecordWriter(projectId={}, batchSize={}, useAutogeneratedKey={}, serviceAccount={})", new Object[]{this.projectId, Integer.valueOf(this.batchSize), Boolean.valueOf(this.useAutogeneratedKey), str});
        this.datastore = DatastoreUtil.getDatastoreV1(str, valueOf, this.projectId);
        this.totalCount = 0;
        this.numberOfRecordsInBatch = 0;
        this.builder = newCommitRequest();
        this.counter = taskAttemptContext.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
        this.sleeper = Sleeper.DEFAULT;
        this.flushBackoff = new ExponentialBackOff.Builder().setMaxIntervalMillis(DatastoreSinkConstants.FLUSH_MAX_BACKOFF_MILLIS).setInitialIntervalMillis(1000).setMaxElapsedTimeMillis(60000).setRandomizationFactor(0.25d).build();
    }

    private CommitRequest.Builder newCommitRequest() throws IOException {
        CommitRequest.Builder newBuilder = CommitRequest.newBuilder();
        newBuilder.setProjectId(this.projectId);
        if (this.useTransactions) {
            newBuilder.setMode(CommitRequest.Mode.TRANSACTIONAL);
            try {
                newBuilder.setTransaction(this.datastore.beginTransaction(BeginTransactionRequest.newBuilder().build()).getTransaction());
            } catch (DatastoreException e) {
                throw new IOException("Failed to begin datastore transaction", e);
            }
        } else {
            newBuilder.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
        }
        return newBuilder;
    }

    @Override // org.apache.hadoop.mapreduce.RecordWriter
    public void write(NullWritable nullWritable, Entity entity) throws IOException, InterruptedException {
        LOG.trace("RecordWriter write({})", entity);
        if (this.useAutogeneratedKey) {
            try {
                this.builder.addMutations(DatastoreHelper.makeInsert(Entity.newBuilder().setKey((Key) this.datastore.allocateIds(AllocateIdsRequest.newBuilder().setProjectId(this.projectId).addKeys(entity.getKey()).build()).getKeysList().stream().collect(MoreCollectors.onlyElement())).putAllProperties(entity.getPropertiesMap()).build()).build());
            } catch (DatastoreException e) {
                throw new IOException("Failed to allocate id", e);
            }
        } else {
            this.builder.addMutations(DatastoreHelper.makeUpsert(entity).build());
        }
        this.totalCount++;
        this.numberOfRecordsInBatch++;
        if (this.totalCount % this.batchSize == 0) {
            flush();
        }
    }

    @Override // org.apache.hadoop.mapreduce.RecordWriter
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        flush();
        LOG.debug("Total number of values written to Cloud Datastore: {}", Integer.valueOf(this.totalCount));
    }

    private void flush() throws IOException, InterruptedException {
        if (this.numberOfRecordsInBatch > 0) {
            LOG.debug("Writing a batch of {} values to Cloud Datastore.", Integer.valueOf(this.numberOfRecordsInBatch));
            while (true) {
                try {
                    flushInternal();
                    this.builder = newCommitRequest();
                    this.numberOfRecordsInBatch = 0;
                    this.flushBackoff.reset();
                    return;
                } catch (DatastoreException e) {
                    long nextBackOffMillis = this.flushBackoff.nextBackOffMillis();
                    if (nextBackOffMillis == -1 || !isRetryable(e)) {
                        LOG.error("Datastore commit failed with code {}: {}", e.getCode(), e.toString());
                        throw new IOException("Datastore commit failed", e);
                    }
                    LOG.warn("Retrying flush after {} ms", Long.valueOf(nextBackOffMillis));
                    this.sleeper.sleep(nextBackOffMillis);
                }
            }
            LOG.error("Datastore commit failed with code {}: {}", e.getCode(), e.toString());
            throw new IOException("Datastore commit failed", e);
        }
    }

    private void flushInternal() throws DatastoreException {
        this.datastore.commit(this.builder.build());
        this.counter.increment(r0.getSerializedSize());
    }

    private boolean isRetryable(DatastoreException datastoreException) {
        return datastoreException.getCode() == Code.ABORTED || datastoreException.getCode() == Code.DEADLINE_EXCEEDED || datastoreException.getCode() == Code.RESOURCE_EXHAUSTED || datastoreException.getCode() == Code.UNAVAILABLE;
    }
}
