/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table.fullcache.inputformat;

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;

public class FullCacheTestInputFormat
extends RichInputFormat<RowData, QueueInputSplit> {
    public static final AtomicInteger OPEN_CLOSED_COUNTER = new AtomicInteger(0);
    private static final int DEFAULT_NUM_SPLITS = 2;
    private static final int DEFAULT_DELTA_NUM_SPLITS = 0;
    private final Collection<Row> dataRows;
    private final DataFormatConverters.RowConverter rowConverter;
    private final GeneratedProjection generatedProjection;
    private final boolean projectable;
    private final int numSplits;
    private final int deltaNumSplits;
    private transient ConcurrentLinkedQueue<RowData> queue;
    private transient Projection<RowData, GenericRowData> projection;
    private int loadCounter;
    private int maxReadRecords;
    private int readRecords;
    private int numOpens;

    public FullCacheTestInputFormat(Collection<Row> dataRows, Optional<GeneratedProjection> generatedProjection, DataFormatConverters.RowConverter rowConverter, int numSplits, int deltaNumSplits) {
        this.dataRows = dataRows;
        this.projectable = generatedProjection.isPresent();
        this.generatedProjection = generatedProjection.orElse(null);
        this.rowConverter = rowConverter;
        this.numSplits = numSplits;
        this.deltaNumSplits = deltaNumSplits;
    }

    public FullCacheTestInputFormat(Collection<Row> dataRows, Optional<GeneratedProjection> generatedProjection, DataFormatConverters.RowConverter rowConverter) {
        this(dataRows, generatedProjection, rowConverter, 2, 0);
    }

    public QueueInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        int delta = this.loadCounter > 0 ? this.deltaNumSplits : 0;
        int currentSplits = this.numSplits + delta;
        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        QueueInputSplit[] splits = new QueueInputSplit[currentSplits];
        IntStream.range(0, currentSplits).forEach(i -> {
            splits[i] = new QueueInputSplit(queue, i);
        });
        this.dataRows.forEach(row -> queue.add(this.rowConverter.toInternal(row)));
        ++this.loadCounter;
        this.maxReadRecords = (int)Math.ceil((double)queue.size() / (double)currentSplits);
        return splits;
    }

    public void openInputFormat() {
        ++this.numOpens;
        OPEN_CLOSED_COUNTER.incrementAndGet();
    }

    public void open(QueueInputSplit split) throws IOException {
        this.queue = split.getQueue();
        if (this.projectable) {
            this.projection = (Projection)this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        }
        this.readRecords = 0;
        ++this.numOpens;
        OPEN_CLOSED_COUNTER.incrementAndGet();
    }

    public boolean reachedEnd() throws IOException {
        return this.queue.isEmpty();
    }

    public RowData nextRecord(RowData reuse) throws IOException {
        Assertions.assertThat((int)this.numOpens).isEqualTo(2);
        if (this.readRecords == this.maxReadRecords) {
            return null;
        }
        ++this.readRecords;
        RowData rowData = this.queue.poll();
        if (rowData != null && this.projectable) {
            return this.projection.apply(rowData);
        }
        return rowData;
    }

    public InputSplitAssigner getInputSplitAssigner(QueueInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])inputSplits);
    }

    public void configure(Configuration parameters) {
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return null;
    }

    public void close() throws IOException {
        OPEN_CLOSED_COUNTER.decrementAndGet();
    }

    public void closeInputFormat() {
        OPEN_CLOSED_COUNTER.decrementAndGet();
    }

    public static class QueueInputSplit
    implements InputSplit {
        private final transient ConcurrentLinkedQueue<RowData> queue;
        private final int splitNumber;

        public QueueInputSplit(ConcurrentLinkedQueue<RowData> queue, int splitNumber) {
            this.queue = queue;
            this.splitNumber = splitNumber;
        }

        public int getSplitNumber() {
            return this.splitNumber;
        }

        public ConcurrentLinkedQueue<RowData> getQueue() {
            return this.queue;
        }
    }
}

