/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SerializableSupplier;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class CheckpointStoreITCase
extends TestLogger {
    private static final Configuration CONFIGURATION = new Configuration().set(HighAvailabilityOptions.HA_MODE, (Object)TestingHAFactory.class.getName());
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(CONFIGURATION).build());

    @Before
    public void init() {
        FailingStore.reset();
        FailingMapper.reset();
    }

    @Test
    public void testRestartOnRecoveryFailure() throws Exception {
        Assume.assumeFalse((String)"Adaptive scheduler doesn't retry after failures on recovery", (ClusterOptions.getSchedulerType((Configuration)CONFIGURATION) == JobManagerOptions.SchedulerType.Adaptive ? 1 : 0) != 0);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)2, (long)0L));
        env.addSource(this.emitUntil((SerializableSupplier<Boolean>)(SerializableSupplier & Serializable)() -> FailingStore.recovered && FailingMapper.failedAndProcessed)).map((MapFunction)new FailingMapper()).addSink((SinkFunction)new DiscardingSink());
        env.execute();
        Preconditions.checkState((FailingStore.recovered && FailingMapper.failedAndProcessed ? 1 : 0) != 0);
    }

    private SourceFunction<Integer> emitUntil(final SerializableSupplier<Boolean> until) {
        return new SourceFunction<Integer>(){
            private volatile boolean running = true;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run(SourceFunction.SourceContext<Integer> ctx) {
                while (this.running && !((Boolean)until.get()).booleanValue()) {
                    Object object = ctx.getCheckpointLock();
                    synchronized (object) {
                        ctx.collect((Object)0);
                        try {
                            Thread.sleep(100L);
                        }
                        catch (InterruptedException e) {
                            ExceptionUtils.rethrow((Throwable)e);
                        }
                    }
                }
            }

            public void cancel() {
                this.running = false;
            }
        };
    }

    private static class FailingStore
    implements CompletedCheckpointStore {
        private static volatile boolean started = false;
        private static volatile boolean failed = false;
        private static volatile boolean recovered = false;

        private FailingStore() {
        }

        public static void reset() {
            recovered = false;
            failed = false;
            started = false;
        }

        public void recover() throws Exception {
            if (!started) {
                started = true;
            } else {
                if (!failed) {
                    failed = true;
                    throw new RuntimeException();
                }
                if (!recovered) {
                    recovered = true;
                }
            }
        }

        public void addCheckpoint(CompletedCheckpoint checkpoint, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) {
        }

        public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {
        }

        public List<CompletedCheckpoint> getAllCheckpoints() {
            return Collections.emptyList();
        }

        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }

        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }

    public static class TestingHAFactory
    implements HighAvailabilityServicesFactory {
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
            return new EmbeddedHaServices(Executors.directExecutor()){

                public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                    return new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)new FailingStore(), (CheckpointIDCounter)new TestingCheckpointIDCounter(new CompletableFuture()));
                }
            };
        }
    }

    private static class FailingMapper
    implements MapFunction<Integer, Integer> {
        private static volatile boolean failed = false;
        private static volatile boolean failedAndProcessed = false;

        private FailingMapper() {
        }

        public static void reset() {
            failed = false;
            failedAndProcessed = false;
        }

        public Integer map(Integer element) throws Exception {
            if (!failed) {
                failed = true;
                throw new RuntimeException();
            }
            failedAndProcessed = true;
            return element;
        }
    }
}

