/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.mocks;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.assertj.core.api.Assertions;

/**
 * A mock {@link SplitReader} that reads {@code int[]} records from {@link MockSourceSplit}s.
 *
 * <p>Each call to {@link #fetch()} walks the assigned splits in assignment order, skips paused
 * splits, and pulls at most {@code numRecordsPerSplitPerFetch} records from every remaining
 * split. A fetch can be cut short with {@link #wakeUp()}.
 *
 * <p>Thread-safety: {@code wokenUp} and {@code threadInBlocking} are only modified while holding
 * {@code wakeupLock}. The split map and the paused-split set are not synchronized.
 */
public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
    /** Assigned splits keyed by split id; the linked map keeps iteration in insertion order. */
    private final Map<String, MockSourceSplit> splits = new LinkedHashMap<>();

    /** Upper bound on the number of read attempts per split within one fetch. */
    private final int numRecordsPerSplitPerFetch;

    /**
     * When {@code true}, a finished split is only reported in a fetch that produced no records
     * for that split; when {@code false}, it is reported in the fetch that finds it finished.
     */
    private final boolean separatedFinishedRecord;

    /** Forwarded to {@link MockSourceSplit#getNext(boolean)} on every read. */
    private final boolean blockingFetch;

    /** Guards {@link #wokenUp} and the updates of {@link #threadInBlocking}. */
    private final Object wakeupLock = new Object();

    /** The thread currently inside the read loop of a fetch, or {@code null} if none. */
    private volatile Thread threadInBlocking;

    /** Set by {@link #wakeUp()}; consumed and reset by the fetch path. */
    private boolean wokenUp;

    /** Splits that are currently skipped by fetches. */
    private final Set<MockSourceSplit> pausedSplits = new HashSet<>();

    /**
     * Creates a reader; normally invoked through {@link Builder#build()}.
     *
     * @param numRecordsPerSplitPerFetch maximum number of read attempts per split and fetch
     * @param separatedFinishedRecord whether a finished split is reported in a fetch of its own
     * @param blockingFetch flag passed to {@link MockSourceSplit#getNext(boolean)}
     */
    protected MockSplitReader(
            int numRecordsPerSplitPerFetch,
            boolean separatedFinishedRecord,
            boolean blockingFetch) {
        this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
        this.separatedFinishedRecord = separatedFinishedRecord;
        this.blockingFetch = blockingFetch;
    }

    /**
     * Fetches the next batch of records from the assigned, non-paused splits.
     *
     * @return the records read in this fetch together with the ids of splits reported as finished
     */
    public RecordsWithSplitIds<int[]> fetch() {
        return getRecords();
    }

    /**
     * Registers the splits carried by the given change under their split ids.
     *
     * @param splitsChange the change to apply; must be a {@link SplitsAddition}
     * @throws IllegalArgumentException if the change is not a {@link SplitsAddition}
     */
    public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new IllegalArgumentException("Do not recognize split change: " + splitsChange);
        }
        splitsChange.splits().forEach(split -> splits.put(split.splitId(), split));
    }

    /**
     * Requests that the current or the next fetch returns early.
     *
     * <p>The wake-up flag is always set; if a thread is currently registered as being inside the
     * read loop, that thread is additionally interrupted.
     */
    public void wakeUp() {
        synchronized (wakeupLock) {
            wokenUp = true;
            if (threadInBlocking != null) {
                threadInBlocking.interrupt();
            }
        }
    }

    /** Does nothing; this mock holds no resources that need releasing. */
    public void close() throws Exception {
    }

    /**
     * Reads one batch of records.
     *
     * <p>If a wake-up is pending when the method is entered, the flag is consumed and an empty
     * batch is returned without touching any split.
     *
     * @return the batch that was assembled, possibly empty or partial
     * @throws RuntimeException if the read is interrupted while {@code blockingFetch} is
     *     {@code false}; the {@link InterruptedException} is attached as the cause
     */
    private RecordsBySplits<int[]> getRecords() {
        final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();

        // After this locked section the thread may be interrupted by wakeUp().
        synchronized (wakeupLock) {
            if (wokenUp) {
                wokenUp = false;
                return records.build();
            }
            threadInBlocking = Thread.currentThread();
        }

        try {
            // An explicit iterator is required because finished splits are removed while iterating.
            Iterator<Map.Entry<String, MockSourceSplit>> iterator = splits.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, MockSourceSplit> entry = iterator.next();
                String splitId = entry.getKey();
                MockSourceSplit split = entry.getValue();
                if (pausedSplits.contains(split)) {
                    continue;
                }

                boolean hasRecords = false;
                for (int i = 0; i < numRecordsPerSplitPerFetch && !split.isFinished(); i++) {
                    int[] record = split.getNext(blockingFetch);
                    // A null record is skipped but still counts as one read attempt.
                    if (record != null) {
                        records.add(splitId, record);
                        hasRecords = true;
                    }
                }

                if (split.isFinished()) {
                    if (!separatedFinishedRecord) {
                        records.addFinishedSplit(splitId);
                        iterator.remove();
                    } else if (!hasRecords) {
                        // Report the finished split on its own and end this fetch right away.
                        records.addFinishedSplit(splitId);
                        iterator.remove();
                        break;
                    }
                    // Otherwise the split stays registered and is reported by a later fetch
                    // that reads no records from it.
                }
            }
        } catch (InterruptedException ie) {
            // In blocking mode the interruption is tolerated and the records read so far are
            // returned; otherwise it is unexpected and surfaced with its cause preserved.
            if (!blockingFetch) {
                throw new RuntimeException("Caught unexpected interrupted exception.", ie);
            }
        } finally {
            // After this locked section wakeUp() no longer interrupts this thread.
            synchronized (wakeupLock) {
                wokenUp = false;
                // Clear a possibly pending interrupt flag; the returned value is not needed.
                Thread.interrupted();
                threadInBlocking = null;
            }
        }

        return records.build();
    }

    /**
     * Pauses and resumes splits, asserting that the request is consistent with the current state.
     *
     * <p>A non-empty {@code splitsToPause} must not contain an already paused split. After the
     * splits to pause have been added, every split in {@code splitsToResume} must be paused.
     * Violations fail the corresponding AssertJ assertion.
     *
     * @param splitsToPause splits that fetches should skip from now on
     * @param splitsToResume splits that fetches should read again
     */
    public void pauseOrResumeSplits(
            Collection<MockSourceSplit> splitsToPause,
            Collection<MockSourceSplit> splitsToResume) {
        if (!splitsToPause.isEmpty()) {
            Assertions.assertThat(pausedSplits).doesNotContainAnyElementsOf(splitsToPause);
        }
        pausedSplits.addAll(splitsToPause);
        Assertions.assertThat(pausedSplits).containsAll(splitsToResume);
        pausedSplits.removeAll(splitsToResume);
    }

    /**
     * Creates a builder with the default settings.
     *
     * @return a new {@link Builder}
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder for {@link MockSplitReader}.
     *
     * <p>Defaults: two records per split per fetch, finished splits not reported separately,
     * non-blocking fetch.
     */
    public static class Builder {
        protected int numRecordsPerSplitPerFetch = 2;
        protected boolean separatedFinishedRecord = false;
        protected boolean blockingFetch = false;

        /** Creates a builder holding the default settings. */
        protected Builder() {
        }

        /**
         * Creates a builder that copies all settings from another builder.
         *
         * @param other the builder to copy from
         */
        protected Builder(Builder other) {
            this.numRecordsPerSplitPerFetch = other.numRecordsPerSplitPerFetch;
            this.separatedFinishedRecord = other.separatedFinishedRecord;
            this.blockingFetch = other.blockingFetch;
        }

        /**
         * Sets the maximum number of read attempts per split within one fetch.
         *
         * @param numRecordsPerSplitPerFetch the per-split limit
         * @return this builder
         */
        public Builder setNumRecordsPerSplitPerFetch(int numRecordsPerSplitPerFetch) {
            this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
            return this;
        }

        /**
         * Sets whether a finished split is reported in a fetch that carries no records for it.
         *
         * @param separatedFinishedRecord {@code true} to report finished splits separately
         * @return this builder
         */
        public Builder setSeparatedFinishedRecord(boolean separatedFinishedRecord) {
            this.separatedFinishedRecord = separatedFinishedRecord;
            return this;
        }

        /**
         * Sets the flag that is passed to {@link MockSourceSplit#getNext(boolean)} on each read.
         *
         * @param blockingFetch the blocking flag
         * @return this builder
         */
        public Builder setBlockingFetch(boolean blockingFetch) {
            this.blockingFetch = blockingFetch;
            return this;
        }

        /**
         * Builds the reader from the current settings.
         *
         * @return a new {@link MockSplitReader}
         */
        public MockSplitReader build() {
            return new MockSplitReader(
                    numRecordsPerSplitPerFetch, separatedFinishedRecord, blockingFetch);
        }
    }
}

