/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.hybrid;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;

public class HybridSourceITCase
extends TestLogger {
    private static final int PARALLELISM = 2;
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(2).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    private static final List<Integer> EXPECTED_RESULT = IntStream.rangeClosed(0, 39).boxed().collect(Collectors.toList());

    @Test
    public void testHybridSource() throws Exception {
        this.testHybridSource(FailoverType.NONE, this.sourceWithFixedSwitchPosition());
    }

    @Test
    public void testHybridSourceWithDynamicSwitchPosition() throws Exception {
        this.testHybridSource(FailoverType.NONE, this.sourceWithDynamicSwitchPosition());
    }

    @Test
    public void testHybridSourceWithTaskManagerFailover() throws Exception {
        this.testHybridSource(FailoverType.TM, this.sourceWithFixedSwitchPosition());
    }

    @Test
    public void testHybridSourceWithJobManagerFailover() throws Exception {
        this.testHybridSource(FailoverType.JM, this.sourceWithFixedSwitchPosition());
    }

    private Source sourceWithFixedSwitchPosition() {
        int numSplits = 2;
        int numRecordsPerSplit = EXPECTED_RESULT.size() / 4;
        return HybridSource.builder((Source)new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED)).addSource((Source)new MockBaseSource(numSplits, numRecordsPerSplit, 20, Boundedness.BOUNDED)).build();
    }

    private Source sourceWithDynamicSwitchPosition() {
        return HybridSource.builder((Source)new MockBaseSource(2, 10, Boundedness.BOUNDED)).addSource((HybridSource.SourceFactory & Serializable)enumerator -> new MockBaseSource(2, 10, 20, Boundedness.BOUNDED), Boundedness.BOUNDED).build();
    }

    private void testHybridSource(FailoverType failoverType, Source source) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setRestartStrategy(FailoverType.NONE == failoverType ? RestartStrategies.noRestart() : RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        SingleOutputStreamOperator stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source").returns(Integer.class);
        DataStream streamFailingInTheMiddleOfReading = RecordCounterToFail.wrapWithFailureAfter((DataStream)stream, HybridSourceITCase.EXPECTED_RESULT.size() / 2);
        ClientAndIterator client = DataStreamUtils.collectWithClient((DataStream)streamFailingInTheMiddleOfReading, (String)(HybridSourceITCase.class.getSimpleName() + '-' + failoverType.name()));
        JobID jobId = client.client.getJobID();
        RecordCounterToFail.waitToFail();
        HybridSourceITCase.triggerFailover(failoverType, jobId, () -> RecordCounterToFail.continueProcessing(), this.miniClusterResource.getMiniCluster());
        ArrayList<Integer> result = new ArrayList<Integer>();
        while (result.size() < EXPECTED_RESULT.size() && client.iterator.hasNext()) {
            result.add((Integer)client.iterator.next());
        }
        HybridSourceITCase.verifyResult(result);
    }

    private static void triggerFailover(FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        switch (type) {
            case NONE: {
                afterFailAction.run();
                break;
            }
            case TM: {
                HybridSourceITCase.restartTaskManager(afterFailAction, miniCluster);
                break;
            }
            case JM: {
                HybridSourceITCase.triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
            }
        }
    }

    private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        HaLeadershipControl haLeadershipControl = (HaLeadershipControl)miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        miniCluster.terminateTaskManager(0).get();
        afterFailAction.run();
        miniCluster.startTaskManager();
    }

    private static void verifyResult(List<Integer> result) {
        Collections.sort(result);
        Assertions.assertThat(result).isEqualTo(EXPECTED_RESULT);
    }

    private static class RecordCounterToFail {
        private static AtomicInteger records;
        private static CompletableFuture<Void> fail;
        private static CompletableFuture<Void> continueProcessing;

        private RecordCounterToFail() {
        }

        private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
            records = new AtomicInteger();
            fail = new CompletableFuture();
            continueProcessing = new CompletableFuture();
            return stream.map((MapFunction & Serializable)record -> {
                boolean notFailedYet;
                boolean halfOfInputIsRead = records.incrementAndGet() > failAfter;
                boolean bl = notFailedYet = !fail.isDone();
                if (notFailedYet && halfOfInputIsRead) {
                    fail.complete(null);
                    continueProcessing.get();
                }
                return record;
            });
        }

        private static void waitToFail() throws ExecutionException, InterruptedException {
            fail.get();
        }

        private static void continueProcessing() {
            continueProcessing.complete(null);
        }
    }

    private static enum FailoverType {
        NONE,
        TM,
        JM;

    }
}

