/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class CheckpointIDCounterTest
extends TestLogger {
    /**
     * Supplies the {@link CheckpointIDCounter} that the tests in this class exercise.
     *
     * <p>The tests call {@code start()} and {@code stop()} on the returned counter themselves.
     *
     * <p>NOTE(review): the method name mentions "completed checkpoints" although the return type
     * is a counter; presumably a leftover name. Renaming it would require updating every subclass.
     *
     * @return the counter under test
     * @throws Exception if the counter cannot be created
     */
    protected abstract CheckpointIDCounter createCompletedCheckpoints() throws Exception;

    /**
     * Checks that four consecutive single-threaded {@code getAndIncrement()} calls on a started
     * counter return 1, 2, 3 and 4, in that order. The counter is stopped even if an assertion
     * fails.
     */
    @Test
    public void testSerialIncrementAndGet() throws Exception {
        final CheckpointIDCounter idCounter = this.createCompletedCheckpoints();
        try {
            idCounter.start();
            for (long expectedId = 1L; expectedId <= 4L; ++expectedId) {
                final long actualId = idCounter.getAndIncrement();
                Assert.assertEquals(expectedId, actualId);
            }
        } finally {
            idCounter.stop();
        }
    }

    /**
     * Verifies that concurrent {@code getAndIncrement()} calls never hand out the same ID twice
     * and never skip one.
     *
     * <p>{@code numThreads} {@link Incrementer} tasks are released together through a shared
     * latch. Each task collects {@code Incrementer.NumIncrements} IDs. The union of all collected
     * IDs, once sorted, must be exactly {@code 1..expectedTotal}, and the next ID drawn afterwards
     * must be {@code expectedTotal + 1}.
     *
     * <p>NOTE(review): this source was recovered with a decompiler, which reported
     * "Removed try catching itself - possible behaviour change" for this method. Compare against
     * the upstream source if the cleanup behaviour looks off.
     *
     * @throws Exception if the counter fails or a worker task throws
     */
    @Test
    public void testConcurrentGetAndIncrement() throws Exception {
        final int numThreads = 8;
        // Derived rather than hard-coded so the expectations follow the worker configuration.
        final int expectedTotal = numThreads * Incrementer.NumIncrements;

        final CountDownLatch startLatch = new CountDownLatch(1);
        final CheckpointIDCounter counter = this.createCompletedCheckpoints();
        counter.start();
        ExecutorService executor = null;
        try {
            executor = Executors.newFixedThreadPool(numThreads);
            List<Future<List<Long>>> resultFutures = new ArrayList<Future<List<Long>>>(numThreads);
            for (int i = 0; i < numThreads; ++i) {
                resultFutures.add(executor.submit(new Incrementer(startLatch, counter)));
            }

            // All workers block on the latch; opening it starts them at (roughly) the same time.
            startLatch.countDown();

            List<Long> all = new ArrayList<Long>(expectedTotal);
            for (Future<List<Long>> resultFuture : resultFutures) {
                // get() blocks until the worker is done and rethrows its failure, if any.
                all.addAll(resultFuture.get());
            }

            Collections.sort(all);
            Assert.assertEquals((long) expectedTotal, (long) all.size());

            // After sorting, the IDs must form the gap-free, duplicate-free sequence 1, 2, 3, ...
            long expectedId = 0L;
            for (long actualId : all) {
                Assert.assertEquals(++expectedId, actualId);
            }

            // The counter must continue right after the last ID handed out to a worker.
            Assert.assertEquals((long) expectedTotal + 1L, (long) counter.getAndIncrement());
        } finally {
            if (executor != null) {
                // On the success path every task has already finished, so this only releases the
                // pool threads. On a failure path it interrupts workers that are still running so
                // they do not keep using the counter after it has been stopped below.
                executor.shutdownNow();
            }
            counter.stop();
        }
    }

    /**
     * Worker task for the concurrency test: waits for a shared start signal, then draws
     * {@link #NumIncrements} IDs from the counter and returns them in the order received.
     */
    private static class Incrementer
    implements Callable<List<Long>> {
        /**
         * Number of IDs each task draws from the counter. The name is kept as-is because code in
         * the enclosing class may refer to it.
         */
        private static final int NumIncrements = 128;
        /** Gate that holds the task back until the test releases all workers. */
        private final CountDownLatch startLatch;
        /** Counter shared by all workers. */
        private final CheckpointIDCounter counter;

        /**
         * @param startLatch latch the task awaits before drawing its first ID
         * @param counter counter to draw IDs from
         */
        public Incrementer(CountDownLatch startLatch, CheckpointIDCounter counter) {
            this.startLatch = startLatch;
            this.counter = counter;
        }

        /**
         * Blocks until the start latch opens, then calls {@code getAndIncrement()}
         * {@link #NumIncrements} times, sleeping a random 0-19 ms after each call.
         *
         * @return the IDs obtained, in the order they were drawn
         * @throws Exception if the counter fails or the thread is interrupted while waiting or
         *     sleeping
         */
        @Override
        public List<Long> call() throws Exception {
            Random rand = new Random();
            List<Long> counts = new ArrayList<Long>(NumIncrements);
            this.startLatch.await();
            // Bound by the constant (not a repeated literal) so the two cannot drift apart.
            for (int i = 0; i < NumIncrements; ++i) {
                counts.add(this.counter.getAndIncrement());
                // Random short pause; presumably there to vary how the workers interleave.
                Thread.sleep(rand.nextInt(20));
            }
            return counts;
        }
    }

    /**
     * Runs the shared counter tests against {@link ZooKeeperCheckpointIDCounter}, using one
     * {@link ZooKeeperTestEnvironment} that is created when this class is initialised and shut
     * down after the last test.
     */
    public static class ZooKeeperCheckpointIDCounterITCase
    extends CheckpointIDCounterTest {
        /** Test environment shared by all tests of this class; constructed with argument 1. */
        private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

        /**
         * Shuts the shared test environment down once all tests have run.
         *
         * @throws Exception if the shutdown fails
         */
        @AfterClass
        public static void tearDown() throws Exception {
            // No null guard: the field is final and assigned by its initialiser, so it cannot be
            // null once this class has been initialised.
            ZOOKEEPER.shutdown();
        }

        /**
         * Calls {@code deleteAll()} on the test environment before each test.
         *
         * @throws Exception if the clean-up fails
         */
        @Before
        public void cleanUp() throws Exception {
            ZOOKEEPER.deleteAll();
        }

        /**
         * @return a ZooKeeper-backed counter built from the environment's client and the path
         *     {@code /checkpoint-id-counter}
         */
        @Override
        protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
            return new ZooKeeperCheckpointIDCounter(ZOOKEEPER.getClient(), "/checkpoint-id-counter");
        }
    }

    /**
     * Runs the shared counter tests against {@link StandaloneCheckpointIDCounter}.
     */
    public static class StandaloneCheckpointIDCounterTest
    extends CheckpointIDCounterTest {
        /**
         * @return a freshly constructed {@link StandaloneCheckpointIDCounter}
         */
        @Override
        protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
            final CheckpointIDCounter standaloneCounter = new StandaloneCheckpointIDCounter();
            return standaloneCounter;
        }
    }
}

