/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink.internal;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
@Internal
public class ProducerPoolImpl
implements ProducerPool {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class);
    private final Properties kafkaProducerConfig;
    private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit;
    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = new ArrayDeque<FlinkKafkaInternalProducer<byte[], byte[]>>();
    private final Map<String, ProducerEntry> producerByTransactionalId = new TreeMap<String, ProducerEntry>();
    private final NavigableMap<CheckpointTransaction, String> transactionalIdsByCheckpoint = new TreeMap<CheckpointTransaction, String>(Comparator.comparing(CheckpointTransaction::getCheckpointId).thenComparing(CheckpointTransaction::getTransactionalId));

    public ProducerPoolImpl(Properties kafkaProducerConfig, Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit, Collection<CheckpointTransaction> precommittedTransactions) {
        this.kafkaProducerConfig = (Properties)Preconditions.checkNotNull((Object)kafkaProducerConfig, (String)"kafkaProducerConfig must not be null");
        this.producerInit = (Consumer)Preconditions.checkNotNull(producerInit, (String)"producerInit must not be null");
        this.initPrecommittedTransactions(precommittedTransactions);
    }

    @Override
    public void recycleByTransactionId(String transactionalId, boolean success) {
        ProducerEntry producerEntry = this.producerByTransactionalId.remove(transactionalId);
        LOG.debug("Transaction {} finished, producer {}", (Object)transactionalId, (Object)producerEntry);
        if (producerEntry == null) {
            LOG.info("Received unmatched producer for transaction {}. This is expected during rescale.", (Object)transactionalId);
            return;
        }
        long finishedChkId = producerEntry.getCheckpointedTransaction().getCheckpointId();
        boolean hasTransactionsFromPreviousCheckpoint = ((CheckpointTransaction)this.transactionalIdsByCheckpoint.firstKey()).getCheckpointId() != finishedChkId;
        this.transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
        if (success) {
            this.recycleProducer(producerEntry.getProducer());
        } else {
            this.closeProducer(producerEntry.getProducer());
        }
        if (hasTransactionsFromPreviousCheckpoint) {
            Iterator iterator = this.transactionalIdsByCheckpoint.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry entry = iterator.next();
                if (((CheckpointTransaction)entry.getKey()).getCheckpointId() >= finishedChkId) continue;
                iterator.remove();
                this.closeProducer(this.producerByTransactionalId.remove(entry.getValue()).getProducer());
            }
        }
    }

    private void closeProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer) {
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
        this.recycleProducer(producer);
        ProducerEntry producerEntry = this.producerByTransactionalId.remove(producer.getTransactionalId());
        this.transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
    }

    private void recycleProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer) {
        if (producer == null) {
            return;
        }
        try {
            if (producer.isInTransaction()) {
                producer.commitTransaction();
            }
            this.producerPool.add(producer);
            LOG.debug("Recycling {}, new pool size {}", producer, (Object)this.producerPool.size());
        }
        catch (KafkaException e) {
            this.closeProducer(producer);
            LOG.debug("Encountered exception while double-committing, discarding producer {}: {}", producer, (Object)e);
        }
    }

    private void initPrecommittedTransactions(Collection<CheckpointTransaction> precommittedTransactions) {
        for (CheckpointTransaction transaction : precommittedTransactions) {
            this.transactionalIdsByCheckpoint.put(transaction, transaction.getTransactionalId());
            this.producerByTransactionalId.put(transaction.getTransactionalId(), new ProducerEntry(null, transaction));
        }
        LOG.debug("Initialized ongoing transactions from state {}", precommittedTransactions);
    }

    @Override
    public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(String transactionalId, long checkpointId) {
        FlinkKafkaInternalProducer<Object, Object> producer = this.producerPool.poll();
        if (producer == null) {
            producer = new FlinkKafkaInternalProducer(this.kafkaProducerConfig, transactionalId);
            this.producerInit.accept(producer);
        } else if (transactionalId != null) {
            producer.setTransactionId(transactionalId);
        }
        if (transactionalId != null) {
            CheckpointTransaction checkpointedTransaction = new CheckpointTransaction(transactionalId, checkpointId);
            ProducerEntry existing = this.producerByTransactionalId.put(transactionalId, new ProducerEntry(producer, checkpointedTransaction));
            this.transactionalIdsByCheckpoint.put(checkpointedTransaction, transactionalId);
            Preconditions.checkState((existing == null ? 1 : 0) != 0, (String)"Transaction %s already ongoing existing producer %s; new producer %s", (Object[])new Object[]{transactionalId, existing, producer});
            producer.initTransactions();
        }
        LOG.debug("getProducer {}, new pool size {}", producer, (Object)this.producerPool.size());
        return producer;
    }

    @Override
    public Collection<CheckpointTransaction> getOngoingTransactions() {
        return new ArrayList<CheckpointTransaction>(this.transactionalIdsByCheckpoint.keySet());
    }

    @VisibleForTesting
    public Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers() {
        return this.producerPool;
    }

    @Override
    public void close() throws Exception {
        LOG.debug("Closing used producers {} and free producers {}", this.producerByTransactionalId, this.producerPool);
        AutoCloseable[] autoCloseableArray = new AutoCloseable[4];
        autoCloseableArray[0] = () -> IOUtils.closeAll(this.producerPool);
        autoCloseableArray[1] = () -> IOUtils.closeAll((Iterable)this.producerByTransactionalId.values().stream().map(ProducerEntry::getProducer).filter(Objects::nonNull).collect(Collectors.toList()));
        autoCloseableArray[2] = this.producerPool::clear;
        autoCloseableArray[3] = this.producerByTransactionalId::clear;
        IOUtils.closeAll((AutoCloseable[])autoCloseableArray);
    }

    private static class ProducerEntry {
        @Nullable
        private final FlinkKafkaInternalProducer<byte[], byte[]> producer;
        private final CheckpointTransaction checkpointedTransaction;

        private ProducerEntry(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer, CheckpointTransaction checkpointedTransaction) {
            this.producer = producer;
            this.checkpointedTransaction = (CheckpointTransaction)Preconditions.checkNotNull((Object)checkpointedTransaction, (String)"checkpointedTransaction must not be null");
        }

        public CheckpointTransaction getCheckpointedTransaction() {
            return this.checkpointedTransaction;
        }

        @Nullable
        public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
            return this.producer;
        }

        public String toString() {
            if (this.producer != null) {
                return this.producer.toString();
            }
            return this.checkpointedTransaction.toString();
        }
    }
}

