/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.hybrid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState;
import org.apache.flink.connector.base.source.hybrid.HybridSourceSplit;
import org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator;
import org.apache.flink.connector.base.source.hybrid.SourceReaderFinishedEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchSourceEvent;
import org.apache.flink.connector.base.source.hybrid.SwitchedSources;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.mock.Whitebox;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class HybridSourceSplitEnumeratorTest {
    private static final int SUBTASK0 = 0;
    private static final int SUBTASK1 = 1;
    private static final MockBaseSource MOCK_SOURCE = new MockBaseSource(1, 1, Boundedness.BOUNDED);
    private HybridSource<Integer> source;
    private MockSplitEnumeratorContext<HybridSourceSplit> context;
    private HybridSourceSplitEnumerator enumerator;
    private HybridSourceSplit splitFromSource0;
    private HybridSourceSplit splitFromSource1;

    HybridSourceSplitEnumeratorTest() {
    }

    private void setupEnumeratorAndTriggerSourceSwitch() {
        this.context = new MockSplitEnumeratorContext(2);
        this.source = HybridSource.builder((Source)MOCK_SOURCE).addSource((Source)MOCK_SOURCE).build();
        this.enumerator = (HybridSourceSplitEnumerator)this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 0);
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 1);
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).hasSize(1);
        this.splitFromSource0 = (HybridSourceSplit)((List)((SplitsAssignment)this.context.getSplitsAssignmentSequence().get(0)).assignment().get(0)).get(0);
        Assertions.assertThat((int)this.splitFromSource0.sourceIndex()).isEqualTo(0);
        Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).isEqualTo(0);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(0));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("one reader finished", new Object[0])).isEqualTo(0);
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(0));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("both readers finished", new Object[0])).isEqualTo(1);
        ((ListAssert)Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).as("switch triggers split assignment", new Object[0])).hasSize(2);
        this.splitFromSource1 = (HybridSourceSplit)((List)((SplitsAssignment)this.context.getSplitsAssignmentSequence().get(1)).assignment().get(0)).get(0);
        Assertions.assertThat((int)this.splitFromSource1.sourceIndex()).isEqualTo(1);
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(1));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("reader without assignment", new Object[0])).isEqualTo(1);
    }

    @Test
    void testHighCardinalitySources() {
        int i;
        this.context = new MockSplitEnumeratorContext(2);
        HybridSource.HybridSourceBuilder hybridSourceBuilder = HybridSource.builder((Source)MOCK_SOURCE);
        int maxSources = 130;
        for (i = 1; i < 130; ++i) {
            hybridSourceBuilder = hybridSourceBuilder.addSource((Source)MOCK_SOURCE);
        }
        this.source = hybridSourceBuilder.build();
        this.enumerator = (HybridSourceSplitEnumerator)this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 0);
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 1);
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).hasSize(1);
        this.splitFromSource0 = (HybridSourceSplit)((List)((SplitsAssignment)this.context.getSplitsAssignmentSequence().get(0)).assignment().get(0)).get(0);
        Assertions.assertThat((int)this.splitFromSource0.sourceIndex()).isEqualTo(0);
        Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).isEqualTo(0);
        for (i = 0; i < 130; ++i) {
            this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(i));
            ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("one reader finished", new Object[0])).isEqualTo(i);
            this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(i));
            if (i < 129) {
                ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("both readers finished", new Object[0])).isEqualTo(i + 1);
                ((ListAssert)Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).as("switch triggers split assignment", new Object[0])).hasSize(i + 2);
                this.splitFromSource1 = (HybridSourceSplit)((List)((SplitsAssignment)this.context.getSplitsAssignmentSequence().get(i)).assignment().get(0)).get(0);
                Assertions.assertThat((int)this.splitFromSource1.sourceIndex()).isEqualTo(i);
                continue;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("both readers finished", new Object[0])).isEqualTo(129);
        }
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(1));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("reader without assignment", new Object[0])).isEqualTo(129);
    }

    @Test
    void testRegisterReaderAfterSwitchAndReaderReset() {
        this.setupEnumeratorAndTriggerSourceSwitch();
        this.context.getSplitsAssignmentSequence().clear();
        this.enumerator.addReader(0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), 0);
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        HybridSourceSplitEnumeratorTest.assertSplitAssignment("addSplitsBack triggers assignment when reader registered", this.context, 1, this.splitFromSource0, 0);
        this.context.getSplitsAssignmentSequence().clear();
        this.context.unregisterReader(0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), 0);
        ((ListAssert)Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).as("addSplitsBack doesn't trigger assignment when reader not registered", new Object[0])).isEmpty();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 0);
        Assertions.assertThat((List)this.context.getSplitsAssignmentSequence()).isEmpty();
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        HybridSourceSplitEnumeratorTest.assertSplitAssignment("registerReader triggers assignment", this.context, 1, this.splitFromSource0, 0);
    }

    @Test
    void testHandleSplitRequestAfterSwitchAndReaderReset() {
        this.setupEnumeratorAndTriggerSourceSwitch();
        UnderlyingEnumeratorWrapper underlyingEnumeratorWrapper = new UnderlyingEnumeratorWrapper(HybridSourceSplitEnumeratorTest.getCurrentEnumerator(this.enumerator));
        Whitebox.setInternalState((Object)this.enumerator, (String)"currentEnumerator", (Object)underlyingEnumeratorWrapper);
        List mockSourceSplits = (List)Whitebox.getInternalState((Object)underlyingEnumeratorWrapper.enumerator, (String)"splits");
        Assertions.assertThat((List)mockSourceSplits).isEmpty();
        this.context.getSplitsAssignmentSequence().clear();
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("current enumerator", new Object[0])).isEqualTo(1);
        Assertions.assertThat(underlyingEnumeratorWrapper.handleSplitRequests).isEmpty();
        this.enumerator.handleSplitRequest(0, "fakehostname");
        SwitchedSources switchedSources = new SwitchedSources();
        switchedSources.put(1, (Source)MOCK_SOURCE);
        HybridSourceSplitEnumeratorTest.assertSplitAssignment("handleSplitRequest triggers assignment of split by underlying enumerator", this.context, 1, HybridSourceSplit.wrapSplit((SourceSplit)UnderlyingEnumeratorWrapper.SPLIT_1, (int)1, (SwitchedSources)switchedSources), 0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), 0);
        Assertions.assertThatThrownBy(() -> this.enumerator.handleSplitRequest(0, "fakehostname")).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRestoreEnumerator() throws Exception {
        this.setupEnumeratorAndTriggerSourceSwitch();
        this.enumerator = (HybridSourceSplitEnumerator)this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceEnumeratorState enumeratorState = this.enumerator.snapshotState(0L);
        MockSplitEnumerator underlyingEnumerator = HybridSourceSplitEnumeratorTest.getCurrentEnumerator(this.enumerator);
        Assertions.assertThat((List)((List)Whitebox.getInternalState((Object)underlyingEnumerator, (String)"splits"))).hasSize(1);
        this.enumerator = (HybridSourceSplitEnumerator)this.source.restoreEnumerator(this.context, enumeratorState);
        this.enumerator.start();
        underlyingEnumerator = HybridSourceSplitEnumeratorTest.getCurrentEnumerator(this.enumerator);
        Assertions.assertThat((List)((List)Whitebox.getInternalState((Object)underlyingEnumerator, (String)"splits"))).hasSize(1);
    }

    @Test
    void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception {
        this.setupEnumeratorAndTriggerSourceSwitch();
        HybridSourceEnumeratorState enumeratorState = this.enumerator.snapshotState(0L);
        MockSplitEnumerator underlyingEnumerator = HybridSourceSplitEnumeratorTest.getCurrentEnumerator(this.enumerator);
        Assertions.assertThat((List)((List)Whitebox.getInternalState((Object)underlyingEnumerator, (String)"splits"))).hasSize(0);
        this.enumerator = (HybridSourceSplitEnumerator)this.source.restoreEnumerator(this.context, enumeratorState);
        this.enumerator.start();
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        underlyingEnumerator = HybridSourceSplitEnumeratorTest.getCurrentEnumerator(this.enumerator);
        Assertions.assertThat((List)((List)Whitebox.getInternalState((Object)underlyingEnumerator, (String)"splits"))).hasSize(0);
    }

    @Test
    void testDefaultMethodDelegation() throws Exception {
        this.setupEnumeratorAndTriggerSourceSwitch();
        SplitEnumerator underlyingEnumeratorSpy = (SplitEnumerator)Mockito.spy((Object)HybridSourceSplitEnumeratorTest.getCurrentEnumerator(this.enumerator));
        Whitebox.setInternalState((Object)this.enumerator, (String)"currentEnumerator", (Object)underlyingEnumeratorSpy);
        this.enumerator.notifyCheckpointComplete(1L);
        ((SplitEnumerator)Mockito.verify((Object)underlyingEnumeratorSpy)).notifyCheckpointComplete(1L);
        this.enumerator.notifyCheckpointAborted(2L);
        ((SplitEnumerator)Mockito.verify((Object)underlyingEnumeratorSpy)).notifyCheckpointAborted(2L);
        SwitchSourceEvent se = new SwitchSourceEvent(0, null, false);
        this.enumerator.handleSourceEvent(0, (SourceEvent)se);
        ((SplitEnumerator)Mockito.verify((Object)underlyingEnumeratorSpy)).handleSourceEvent(0, (SourceEvent)se);
    }

    @Test
    void testInterceptNoMoreSplitEvent() {
        this.context = new MockSplitEnumeratorContext(2);
        this.source = HybridSource.builder((Source)MOCK_SOURCE).addSource((Source)MOCK_SOURCE).build();
        this.enumerator = (HybridSourceSplitEnumerator)this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 0);
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 1);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(0)).isFalse();
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(1)).isFalse();
        this.splitFromSource0 = (HybridSourceSplit)((List)((SplitsAssignment)this.context.getSplitsAssignmentSequence().get(0)).assignment().get(0)).get(0);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(0));
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(0));
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(0)).isTrue();
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(1)).isTrue();
        this.context.getSplitsAssignmentSequence().clear();
        this.context.resetNoMoreSplits(0);
        this.enumerator.addReader(0);
        this.enumerator.addSplitsBack(Collections.singletonList(this.splitFromSource0), 0);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(0)).isFalse();
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(0));
        Assertions.assertThat((boolean)this.context.hasNoMoreSplits(0)).isTrue();
    }

    @Test
    void testMultiSubtaskSwitchEnumerator() {
        this.context = new MockSplitEnumeratorContext(2);
        this.source = HybridSource.builder((Source)MOCK_SOURCE).addSource((Source)MOCK_SOURCE).addSource((Source)MOCK_SOURCE).build();
        this.enumerator = (HybridSourceSplitEnumerator)this.source.createEnumerator(this.context);
        this.enumerator.start();
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 0);
        HybridSourceSplitEnumeratorTest.registerReader(this.context, this.enumerator, 1);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(-1));
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(-1));
        Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).isEqualTo(0);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(0));
        Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).isEqualTo(0);
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(0));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("all reader finished source-0", new Object[0])).isEqualTo(1);
        this.enumerator.handleSourceEvent(0, (SourceEvent)new SourceReaderFinishedEvent(1));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source", new Object[0])).isEqualTo(1);
        this.enumerator.handleSourceEvent(1, (SourceEvent)new SourceReaderFinishedEvent(1));
        ((AbstractIntegerAssert)Assertions.assertThat((int)HybridSourceSplitEnumeratorTest.getCurrentSourceIndex(this.enumerator)).as("all reader finished source-1", new Object[0])).isEqualTo(2);
    }

    private static void assertSplitAssignment(String reason, MockSplitEnumeratorContext<HybridSourceSplit> context, int size, HybridSourceSplit split, int subtask) {
        ((ListAssert)Assertions.assertThat((List)context.getSplitsAssignmentSequence()).as(reason, new Object[0])).hasSize(size);
        ((ObjectAssert)Assertions.assertThat((Object)((HybridSourceSplit)((List)((SplitsAssignment)context.getSplitsAssignmentSequence().get(size - 1)).assignment().get(subtask)).get(0))).as(reason, new Object[0])).isEqualTo((Object)split);
    }

    private static void registerReader(MockSplitEnumeratorContext<HybridSourceSplit> context, HybridSourceSplitEnumerator enumerator, int reader) {
        context.registerReader(new ReaderInfo(reader, "location 0"));
        enumerator.addReader(reader);
    }

    private static int getCurrentSourceIndex(HybridSourceSplitEnumerator enumerator) {
        return (Integer)Whitebox.getInternalState((Object)enumerator, (String)"currentSourceIndex");
    }

    private static MockSplitEnumerator getCurrentEnumerator(HybridSourceSplitEnumerator enumerator) {
        return (MockSplitEnumerator)Whitebox.getInternalState((Object)enumerator, (String)"currentEnumerator");
    }

    private static class UnderlyingEnumeratorWrapper
    implements SplitEnumerator<MockSourceSplit, Object> {
        private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);
        private final List<Integer> handleSplitRequests = new ArrayList<Integer>();
        private final MockSplitEnumerator enumerator;
        private final SplitEnumeratorContext context;

        private UnderlyingEnumeratorWrapper(MockSplitEnumerator enumerator) {
            this.enumerator = enumerator;
            this.context = (SplitEnumeratorContext)Whitebox.getInternalState((Object)enumerator, (String)"context");
        }

        public void handleSplitRequest(int subtaskId, String requesterHostname) {
            this.handleSplitRequests.add(subtaskId);
            this.context.assignSplits(new SplitsAssignment((SourceSplit)SPLIT_1, subtaskId));
        }

        public void start() {
            throw new UnsupportedOperationException();
        }

        public void addSplitsBack(List splits, int subtaskId) {
            this.enumerator.addSplitsBack(splits, subtaskId);
        }

        public void addReader(int subtaskId) {
            this.enumerator.addReader(subtaskId);
        }

        public Object snapshotState(long checkpointId) throws Exception {
            return this.enumerator.snapshotState(checkpointId);
        }

        public void close() throws IOException {
            this.enumerator.close();
        }
    }
}

