/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoException;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClient;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClients;
import org.apache.flink.mongodb.shaded.com.mongodb.client.model.WriteModel;
import org.apache.flink.mongodb.shaded.org.bson.BsonDocument;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SinkWriter} that turns incoming records into MongoDB {@link WriteModel}s, buffers them
 * in memory and sends them to a single collection with {@code bulkWrite}.
 *
 * <p>The buffer is written out when either the configured batch size or the configured batch
 * interval is reached during {@link #write}, and when {@link #flush} is called with
 * {@code flushOnCheckpoint} enabled or at end of input. A failed bulk write is retried up to
 * {@code writeOptions.getMaxRetries()} times with a linearly growing pause between attempts.
 *
 * @param <IN> type of the records handed to {@link #write}
 */
@Internal
public class MongoWriter<IN>
implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoWriteOptions writeOptions;
    private final MongoSerializationSchema<IN> serializationSchema;
    private final MongoSinkContext sinkContext;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    /** Pending write models; filled through {@link #collector}, drained by {@link #doBulkWrite()}. */
    private final List<WriteModel<BsonDocument>> bulkRequests = new ArrayList<>();
    /** Collector view over {@link #bulkRequests}. */
    private final Collector<WriteModel<BsonDocument>> collector;
    private final Counter numRecordsOut;
    private final MongoClient mongoClient;
    /** True while {@link #flush} is running; {@link #write} yields to the mailbox while it is set. */
    private boolean checkpointInProgress = false;
    /** Wall-clock time of the most recent bulk write attempt; read by the send-time gauge. */
    private volatile long lastSendTime = 0L;
    /** Wall-clock time of the most recent successful bulk write; read by the send-time gauge. */
    private volatile long ackTime = Long.MAX_VALUE;

    /**
     * Creates the writer, registers its metrics, opens the serialization schema and connects the
     * MongoDB client.
     *
     * @param connectionOptions URI, database and collection to write to; must not be null
     * @param writeOptions batching and retry settings; must not be null
     * @param flushOnCheckpoint whether {@link #flush} writes out the buffer even when the input
     *     has not ended
     * @param initContext sink initialization context supplying the mailbox executor and the
     *     metric group; must not be null
     * @param serializationSchema converts records into write models; must not be null
     * @throws NullPointerException if a required argument (or the mailbox executor / metric group
     *     obtained from {@code initContext}) is null
     * @throws FlinkRuntimeException if opening the serialization schema fails
     */
    public MongoWriter(MongoConnectionOptions connectionOptions, MongoWriteOptions writeOptions, boolean flushOnCheckpoint, Sink.InitContext initContext, MongoSerializationSchema<IN> serializationSchema) {
        this.connectionOptions = Preconditions.checkNotNull(connectionOptions);
        this.writeOptions = Preconditions.checkNotNull(writeOptions);
        this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
        this.flushOnCheckpoint = flushOnCheckpoint;
        Preconditions.checkNotNull(initContext);
        this.mailboxExecutor = Preconditions.checkNotNull(initContext.getMailboxExecutor());

        SinkWriterMetricGroup metricGroup = Preconditions.checkNotNull(initContext.metricGroup());
        // Reports the difference between the last acknowledgement and the last send attempt.
        metricGroup.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTime);
        this.numRecordsOut = metricGroup.getNumRecordsSendCounter();
        this.collector = new ListCollector<>(this.bulkRequests);

        // Open the serialization schema before any record can be serialized.
        this.sinkContext = new DefaultMongoSinkContext(initContext, writeOptions);
        try {
            SerializationSchema.InitializationContext initializationContext = initContext.asSerializationSchemaInitializationContext();
            serializationSchema.open(initializationContext, this.sinkContext, writeOptions);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the MongoEmitter", e);
        }

        // Create the client last so that nothing needs closing if an earlier step throws.
        this.mongoClient = MongoClients.create(connectionOptions.getUri());
    }

    /**
     * Serializes {@code element}, adds the resulting write model to the buffer and triggers a
     * bulk write if the batch size or batch interval limit has been reached.
     *
     * @param element record to write
     * @param context writer context (not used by this implementation)
     * @throws IOException if a triggered bulk write ultimately fails
     * @throws InterruptedException if interrupted while yielding to the mailbox executor
     */
    @Override
    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        // Do not accept new records while a flush is in progress.
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        WriteModel<BsonDocument> writeModel = this.serializationSchema.serialize(element, this.sinkContext);
        this.numRecordsOut.inc();
        this.collector.collect(writeModel);
        if (this.isOverMaxBatchSizeLimit() || this.isOverMaxBatchIntervalLimit()) {
            this.doBulkWrite();
        }
    }

    /**
     * Writes out the buffered requests when {@code flushOnCheckpoint} is enabled or the input has
     * ended; otherwise leaves the buffer untouched.
     *
     * @param endOfInput whether no more records will arrive
     * @throws IOException if the bulk write ultimately fails
     */
    @Override
    public void flush(boolean endOfInput) throws IOException {
        this.checkpointInProgress = true;
        try {
            while (!this.bulkRequests.isEmpty() && (this.flushOnCheckpoint || endOfInput)) {
                this.doBulkWrite();
            }
        } finally {
            // Always clear the flag: if it stayed set after a failed bulk write, every later
            // write() call would loop on mailboxExecutor.yield() without ever making progress.
            this.checkpointInProgress = false;
        }
    }

    /** Closes the MongoDB client. Buffered requests are not written by this method. */
    @Override
    public void close() {
        this.mongoClient.close();
    }

    /**
     * Sends all buffered requests in one {@code bulkWrite} call and clears the buffer on success.
     * Returns immediately when the buffer is empty.
     *
     * <p>On a {@link MongoException} the call is retried until {@code maxRetries} retries have
     * been used, sleeping {@code retryIntervalMs * attemptNumber} between attempts.
     *
     * @throws IOException if the last attempt fails, or if the thread is interrupted while
     *     waiting for the next attempt (the interrupt flag is restored in that case)
     */
    @VisibleForTesting
    void doBulkWrite() throws IOException {
        if (this.bulkRequests.isEmpty()) {
            // Nothing to write.
            return;
        }
        int maxRetries = this.writeOptions.getMaxRetries();
        long retryIntervalMs = this.writeOptions.getRetryIntervalMs();
        for (int i = 0; i <= maxRetries; ++i) {
            try {
                this.lastSendTime = System.currentTimeMillis();
                this.mongoClient
                        .getDatabase(this.connectionOptions.getDatabase())
                        .getCollection(this.connectionOptions.getCollection(), BsonDocument.class)
                        .bulkWrite(this.bulkRequests);
                this.ackTime = System.currentTimeMillis();
                this.bulkRequests.clear();
                break;
            } catch (MongoException e) {
                LOG.debug("Bulk Write to MongoDB failed, retry times = {}", i, e);
                if (i >= maxRetries) {
                    LOG.error("Bulk Write to MongoDB failed", e);
                    throw new IOException(e);
                }
                try {
                    // Linear back-off: the pause grows with every failed attempt.
                    Thread.sleep(retryIntervalMs * (long) (i + 1));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    // The Mongo failure stays the cause; the interrupt is kept as suppressed
                    // so that neither exception is lost.
                    IOException interrupted = new IOException("Unable to flush; interrupted while doing another attempt", e);
                    interrupted.addSuppressed(ex);
                    throw interrupted;
                }
            }
        }
    }

    /**
     * @return true if a batch size is configured (anything but -1) and the buffer has reached it
     */
    private boolean isOverMaxBatchSizeLimit() {
        int bulkActions = this.writeOptions.getBatchSize();
        return bulkActions != -1 && this.bulkRequests.size() >= bulkActions;
    }

    /**
     * @return true if a batch interval is configured (anything but -1) and at least that many
     *     milliseconds have passed since the last send attempt; {@code lastSendTime} starts at 0,
     *     so before the first bulk write the elapsed time is measured from the epoch
     */
    private boolean isOverMaxBatchIntervalLimit() {
        long bulkFlushInterval = this.writeOptions.getBatchIntervalMs();
        long lastSentInterval = System.currentTimeMillis() - this.lastSendTime;
        return bulkFlushInterval != -1L && lastSentInterval >= bulkFlushInterval;
    }
}

