/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.ChangelogRecoveryITCaseBase;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public abstract class ChangelogRecoverySwitchEnvTestBase
extends ChangelogRecoveryITCaseBase {
    public ChangelogRecoverySwitchEnvTestBase(AbstractStateBackend delegatedStateBackend) {
        super(delegatedStateBackend);
    }

    protected void testSwitchEnv(StreamExecutionEnvironment firstEnv, StreamExecutionEnvironment secondEnv) throws Exception {
        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        SharedReference miniCluster = this.sharedObjects.add((Object)this.cluster.getMiniCluster());
        SharedReference currentMaterializationId = this.sharedObjects.add(ConcurrentHashMap.newKeySet());
        firstEnv.getCheckpointConfig().setCheckpointStorage(firstCheckpointFolder.toURI());
        JobGraph firstJobGraph = this.firstNormalJobGraph(firstEnv, (SharedReference<MiniCluster>)miniCluster, (SharedReference<Set<StateHandleID>>)currentMaterializationId);
        try {
            this.cluster.getMiniCluster().submitJob(firstJobGraph).get();
            this.cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get();
        }
        catch (Exception ex) {
            Preconditions.checkState((boolean)ExceptionUtils.findThrowable((Throwable)ex, ChangelogRecoveryITCaseBase.ArtificialFailure.class).isPresent());
        }
        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        secondEnv.getCheckpointConfig().setCheckpointStorage(secondCheckpointFolder.toURI());
        JobGraph jobGraph = this.nextNormalJobGraph(secondEnv, (SharedReference<MiniCluster>)miniCluster, (SharedReference<Set<StateHandleID>>)currentMaterializationId);
        File checkpointFile = TestUtils.getMostRecentCompletedCheckpoint((File)firstCheckpointFolder);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)checkpointFile.getPath()));
        this.waitAndAssert(jobGraph);
    }

    private JobGraph firstNormalJobGraph(StreamExecutionEnvironment env, final SharedReference<MiniCluster> miniCluster, final SharedReference<Set<StateHandleID>> currentMaterializationId) {
        final SharedReference jobID = this.sharedObjects.add((Object)this.generateJobID());
        return this.buildJobGraph(env, new ChangelogRecoveryITCaseBase.ControlledSource(){

            @Override
            protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                if (this.currentIndex == 2500) {
                    this.waitWhile(() -> {
                        if (this.completedCheckpointNum.get() <= 0) {
                            return true;
                        }
                        Set<StateHandleID> allMaterializationId = ChangelogRecoveryITCaseBase.getAllStateHandleId((JobID)jobID.get(), (MiniCluster)miniCluster.get());
                        if (!allMaterializationId.isEmpty()) {
                            ((Set)currentMaterializationId.get()).addAll(allMaterializationId);
                            return false;
                        }
                        return true;
                    });
                } else if (this.currentIndex > 3333) {
                    this.throwArtificialFailure();
                }
            }
        }, (JobID)jobID.get());
    }

    private JobGraph nextNormalJobGraph(StreamExecutionEnvironment env, final SharedReference<MiniCluster> miniCluster, final SharedReference<Set<StateHandleID>> currentMaterializationId) {
        final SharedReference jobID = this.sharedObjects.add((Object)this.generateJobID());
        return this.buildJobGraph(env, new ChangelogRecoveryITCaseBase.ControlledSource(){

            @Override
            protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                if (this.currentIndex == 5000) {
                    this.waitWhile(() -> {
                        Set<StateHandleID> allMaterializationId = ChangelogRecoveryITCaseBase.getAllStateHandleId((JobID)jobID.get(), (MiniCluster)miniCluster.get());
                        return allMaterializationId.isEmpty() || ((Set)currentMaterializationId.get()).equals(allMaterializationId);
                    });
                }
            }
        }, (JobID)jobID.get());
    }

    protected JobGraph buildJobGraph(StreamExecutionEnvironment env, final int waitingOnIndex, final int failIndex, final SharedReference<MiniCluster> miniCluster) {
        final SharedReference jobID = this.sharedObjects.add((Object)this.generateJobID());
        return this.buildJobGraph(env, new ChangelogRecoveryITCaseBase.ControlledSource(){

            @Override
            protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                if (this.currentIndex == waitingOnIndex) {
                    this.waitWhile(() -> ChangelogRecoveryITCaseBase.getAllStateHandleId((JobID)jobID.get(), (MiniCluster)miniCluster.get()).isEmpty());
                } else if (this.currentIndex > failIndex) {
                    this.throwArtificialFailure();
                }
            }
        }, (JobID)jobID.get());
    }
}

