/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.mocks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.InstantiationUtil;

public class MockBaseSource
implements Source<Integer, MockSourceSplit, List<MockSourceSplit>> {
    private static final long serialVersionUID = 4445067705639284175L;
    private final int numSplits;
    private final int numRecordsPerSplit;
    private final int startingValue;
    private final Boundedness boundedness;

    public MockBaseSource(int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
        this(numSplits, numRecordsPerSplit, 0, boundedness);
    }

    public MockBaseSource(int numSplits, int numRecordsPerSplit, int startingValue, Boundedness boundedness) {
        this.numSplits = numSplits;
        this.numRecordsPerSplit = numRecordsPerSplit;
        this.startingValue = startingValue;
        this.boundedness = boundedness;
    }

    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        Configuration config = new Configuration();
        config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
        config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
        MockSplitReader.Builder builder = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setBlockingFetch(true);
        return new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)elementsQueue, builder::build, config, readerContext);
    }

    public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext) {
        ArrayList<MockSourceSplit> splits = new ArrayList<MockSourceSplit>();
        for (int i = 0; i < this.numSplits; ++i) {
            int endIndex = this.boundedness == Boundedness.BOUNDED ? this.numRecordsPerSplit : Integer.MAX_VALUE;
            MockSourceSplit split = new MockSourceSplit(i, 0, endIndex);
            for (int j = 0; j < this.numRecordsPerSplit; ++j) {
                split.addRecord(this.startingValue + i * this.numRecordsPerSplit + j);
            }
            splits.add(split);
        }
        return new MockSplitEnumerator(splits, enumContext);
    }

    public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext, List<MockSourceSplit> checkpoint) throws IOException {
        return new MockSplitEnumerator(checkpoint, enumContext);
    }

    public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
        return new MockSourceSplitSerializer();
    }

    public SimpleVersionedSerializer<List<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
        return new SimpleVersionedSerializer<List<MockSourceSplit>>(){

            public int getVersion() {
                return 0;
            }

            public byte[] serialize(List<MockSourceSplit> obj) throws IOException {
                return InstantiationUtil.serializeObject((Object)obj.toArray(new MockSourceSplit[obj.size()]));
            }

            public List<MockSourceSplit> deserialize(int version, byte[] serialized) throws IOException {
                MockSourceSplit[] splitArray;
                try {
                    splitArray = (MockSourceSplit[])InstantiationUtil.deserializeObject((byte[])serialized, (ClassLoader)this.getClass().getClassLoader());
                }
                catch (ClassNotFoundException e) {
                    throw new IOException("Failed to deserialize the source split.");
                }
                return new ArrayList<MockSourceSplit>(Arrays.asList(splitArray));
            }
        };
    }
}

