/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.accumulators;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccumulatorLiveITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);

    /** Name under which the mapper's counter is registered and later looked up. */
    private static final String ACCUMULATOR_NAME = "test";

    /** Heartbeat interval for the test cluster; same value that getConfiguration() sets. */
    private static final long HEARTBEAT_INTERVAL = 50L;

    /** Number of input records, and therefore the accumulator value the tests wait for. */
    private static final int NUM_ITERATIONS = 5;

    /** Input records shared by both tests; filled in by the static initializer. */
    private static final List<Integer> inputData = new ArrayList<>(NUM_ITERATIONS);

    /** Cluster shared by all tests in this class; assigned in the static initializer. */
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE;

    /**
     * Builds the configuration handed to the test mini cluster.
     *
     * @return a new configuration with the ask timeout taken from {@link TestingUtils} and the
     *     heartbeat interval set to {@code HEARTBEAT_INTERVAL}
     */
    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        // Use the class constant rather than repeating the literal, so the two cannot drift apart.
        config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
        return config;
    }

    /**
     * Re-arms the static latches of {@code NotifyingMapper} before each test method, so that a
     * latch triggered by an earlier test cannot let the next one proceed prematurely.
     *
     * @throws InterruptedException declared by the latch reset helper
     */
    @Before
    public void resetLatches() throws InterruptedException {
        NotifyingMapper.reset();
    }

    /**
     * Builds a batch job (source, notifying mapper, no-op output), translates it into a job
     * graph and checks the accumulator while the job is running.
     *
     * @throws Exception if building, submitting or verifying the job fails
     */
    @Test
    public void testBatch() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Keep the element type so the mapper and output format are checked by the compiler.
        DataSource<Integer> input = env.fromCollection(inputData);
        input.flatMap(new NotifyingMapper()).output(new DummyOutputFormat());

        JobGraph jobGraph = getJobGraph(env.createProgramPlan());
        submitJobAndVerifyResults(jobGraph);
    }

    /**
     * Builds a streaming job (source, notifying mapper, no-op output) and checks the accumulator
     * while the job is running.
     *
     * @throws Exception if building, submitting or verifying the job fails
     */
    @Test
    public void testStreaming() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Keep the element type so the mapper and output format are checked by the compiler.
        DataStreamSource<Integer> input = env.fromCollection(inputData);
        input.flatMap(new NotifyingMapper())
                .writeUsingOutputFormat(new DummyOutputFormat())
                .disableChaining();

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        submitJobAndVerifyResults(jobGraph);
    }

    /**
     * Submits the job from a background thread, waits until the mapper signals that it has seen
     * all records, and then polls the cluster until the accumulator reports the expected count.
     *
     * <p>The mapper blocks in {@code close()} until the shutdown latch is triggered, which only
     * happens in the {@code finally} block here, i.e. after the accumulator query has finished.
     *
     * @param jobGraph the job to submit
     * @throws Exception if submission fails, or if the mapper notification or the expected
     *     accumulator value is not observed before the 30 second deadline expires
     */
    private static void submitJobAndVerifyResults(final JobGraph jobGraph) throws Exception {
        final Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30L));
        final ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();

        final CheckedThread submissionThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
            }
        };
        submissionThread.start();

        try {
            // Bound the wait by the deadline: if submission fails the mapper never runs, and an
            // unbounded await() would hang the test instead of failing it.
            NotifyingMapper.notifyLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

            // Retry the query every 20 ms until the predicate accepts the result or the deadline
            // passes.
            FutureUtils.retrySuccessfulWithDelay(
                    () -> {
                        try {
                            return CompletableFuture.completedFuture(
                                    client.getAccumulators(jobGraph.getJobID()));
                        } catch (Exception e) {
                            return FutureUtils.completedExceptionally(e);
                        }
                    },
                    Time.milliseconds(20L),
                    deadline,
                    accumulators -> accumulators.size() == 1
                            && accumulators.containsKey(ACCUMULATOR_NAME)
                            && Integer.valueOf(NUM_ITERATIONS)
                                    .equals(accumulators.get(ACCUMULATOR_NAME).getUnchecked()),
                    TestingUtils.defaultScheduledExecutor())
                    .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            // Always release the mapper, even on failure, so the job can finish.
            NotifyingMapper.shutdownLatch.trigger();

            // Wait for the submission thread; this also rethrows any error it hit.
            submissionThread.sync();
        }
    }

    /**
     * Turns a batch program plan into a job graph by running it through the optimizer and then
     * the job graph generator.
     *
     * @param plan the program plan to translate
     * @return the job graph generated from the optimized plan
     */
    private static JobGraph getJobGraph(Plan plan) {
        final Optimizer optimizer = new Optimizer(new DataStatistics(), new Configuration());
        final JobGraphGenerator generator = new JobGraphGenerator();
        return generator.compileJobGraph(optimizer.compile(plan));
    }

    static {
        // Fill the shared input with the integers 0 .. NUM_ITERATIONS - 1.
        for (int i = 0; i < NUM_ITERATIONS; ++i) {
            inputData.add(i);
        }

        // One task manager with a single slot, using the configuration from getConfiguration().
        MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(getConfiguration())
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(1)
                        .build());
    }

    private static class DummyOutputFormat
    implements OutputFormat<Integer> {
        private static final long serialVersionUID = 1L;

        private DummyOutputFormat() {
        }

        public void configure(Configuration parameters) {
        }

        public void open(int taskNumber, int numTasks) throws IOException {
        }

        public void writeRecord(Integer record) throws IOException {
        }

        public void close() throws IOException {
        }
    }

    private static class NotifyingMapper
    extends RichFlatMapFunction<Integer, Integer> {
        private static final long serialVersionUID = 1L;
        private static final OneShotLatch notifyLatch = new OneShotLatch();
        private static final OneShotLatch shutdownLatch = new OneShotLatch();
        private final IntCounter counter = new IntCounter();

        private NotifyingMapper() {
        }

        public void open(Configuration parameters) throws Exception {
            this.getRuntimeContext().addAccumulator(AccumulatorLiveITCase.ACCUMULATOR_NAME, (Accumulator)this.counter);
        }

        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            this.counter.add(1);
            out.collect((Object)value);
            LOG.debug("Emitting value {}.", (Object)value);
            if (this.counter.getLocalValuePrimitive() == 5) {
                notifyLatch.trigger();
            }
        }

        public void close() throws Exception {
            shutdownLatch.await();
        }

        private static void reset() throws InterruptedException {
            notifyLatch.reset();
            shutdownLatch.reset();
        }
    }
}

