/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TaskManagerProcessFailureBatchRecoveryITCase
extends AbstractTaskManagerProcessFailureRecoveryTest {
    private final ExecutionMode executionMode;

    public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> executionMode() {
        return Arrays.asList({ExecutionMode.PIPELINED}, {ExecutionMode.BATCH});
    }

    @Override
    public void testTaskManagerFailure(Configuration configuration, final File coordinateDir) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)1337, (Configuration)configuration, (String[])new String[0]);
        env.setParallelism(4);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)2, (long)0L));
        env.getConfig().setExecutionMode(this.executionMode);
        env.getConfig().disableSysoutLogging();
        long numElements = 100000L;
        ReduceOperator result = env.generateSequence(1L, 100000L).rebalance().map((MapFunction)new RichMapFunction<Long, Long>(){
            private final File proceedFile;
            private boolean markerCreated;
            private boolean checkForProceedFile;
            {
                this.proceedFile = new File(coordinateDir, "proceed");
                this.markerCreated = false;
                this.checkForProceedFile = true;
            }

            public Long map(Long value) throws Exception {
                if (!this.markerCreated) {
                    int taskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
                    AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateDir, "ready_" + taskIndex));
                    this.markerCreated = true;
                }
                if (this.checkForProceedFile) {
                    if (this.proceedFile.exists()) {
                        this.checkForProceedFile = false;
                    } else {
                        Thread.sleep(100L);
                    }
                }
                return value;
            }
        }).reduce((ReduceFunction)new ReduceFunction<Long>(){

            public Long reduce(Long value1, Long value2) {
                return value1 + value2;
            }
        });
        long sum = (Long)result.collect().get(0);
        Assert.assertEquals((long)5000050000L, (long)sum);
    }
}

