/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.synchronization;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link FutureCompletingBlockingQueue}.
 *
 * <p>Tests that use helper threads record any failure of those threads in a shared {@link
 * CompletableFuture} and re-raise it on the test thread. An {@link AssertionError} thrown on a
 * helper thread only terminates that thread; it never reaches the test framework by itself.
 */
class FutureCompletingBlockingQueueTest {
    /** Capacity expected from the no-arg constructor and from the config option default. */
    private static final int DEFAULT_CAPACITY = 2;

    /** Covers the size / emptiness / capacity accounting around a single put, peek and poll. */
    @Test
    void testBasics() throws InterruptedException {
        final int capacity = 5;
        FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(capacity);
        // Obtained before the put; the put is expected to complete it.
        CompletableFuture<?> future = queue.getAvailabilityFuture();
        Assertions.assertThat(queue.isEmpty()).isTrue();
        Assertions.assertThat(queue.size()).isEqualTo(0);

        queue.put(0, 1234);

        Assertions.assertThat(future.isDone()).isTrue();
        Assertions.assertThat(queue.size()).isEqualTo(1);
        Assertions.assertThat(queue.isEmpty()).isFalse();
        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(capacity - 1);
        Assertions.assertThat(queue.peek()).isNotNull();
        Assertions.assertThat(queue.peek()).isEqualTo(1234);
        Assertions.assertThat(queue.poll()).isEqualTo(1234);

        Assertions.assertThat(queue.size()).isEqualTo(0);
        Assertions.assertThat(queue.isEmpty()).isTrue();
        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(capacity);
    }

    /** A value that was put can be polled back. */
    @Test
    void testPoll() throws InterruptedException {
        FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, 1234);
        Integer value = queue.poll();
        Assertions.assertThat(value).isNotNull();
        Assertions.assertThat(value).isEqualTo(1234);
    }

    /** Polling a drained queue returns {@code null}, repeatedly. */
    @Test
    void testPollEmptyQueue() throws InterruptedException {
        FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, 1234);
        Assertions.assertThat(queue.poll()).isNotNull();
        Assertions.assertThat(queue.poll()).isNull();
        Assertions.assertThat(queue.poll()).isNull();
    }

    /**
     * On a capacity-1 queue the first put must succeed and the second put must return {@code
     * false} once {@code wakeUpPuttingThread(0)} has been called.
     */
    @Test
    void testWakeUpPut() throws InterruptedException {
        FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
        CountDownLatch putterFinished = new CountDownLatch(1);
        CompletableFuture<Void> putterFailure = new CompletableFuture<>();

        new Thread(
                        () -> {
                            try {
                                Assertions.assertThat(queue.put(0, 1234)).isTrue();
                                Assertions.assertThat(queue.put(0, 1234)).isFalse();
                            } catch (InterruptedException e) {
                                putterFailure.completeExceptionally(
                                        new AssertionError("Interrupted unexpectedly.", e));
                            } catch (Throwable t) {
                                putterFailure.completeExceptionally(t);
                            } finally {
                                // Always release the test thread, even when an assertion failed,
                                // so a failure is reported instead of hanging in await().
                                putterFinished.countDown();
                            }
                        })
                .start();

        queue.wakeUpPuttingThread(0);
        putterFinished.await();
        assertNoWorkerFailure(putterFailure);
    }

    /**
     * Several producers put disjoint value ranges while several consumers poll concurrently; every
     * value must be consumed exactly once.
     */
    @Test
    void testConcurrency() throws InterruptedException {
        final int numValuesPerThread = 10000;
        final int numPuttingThreads = 5;
        final int numPollingThreads = 5;
        final int totalValues = numPuttingThreads * numValuesPerThread;

        FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
        // Completed exceptionally by the first worker thread that fails.
        CompletableFuture<Void> workerFailure = new CompletableFuture<>();

        ArrayList<Thread> producers = new ArrayList<>();
        for (int i = 0; i < numPuttingThreads; i++) {
            final int index = i;
            final int base = index * numValuesPerThread;
            Thread producer =
                    new Thread(
                            () -> {
                                try {
                                    for (int j = 0; j < numValuesPerThread; j++) {
                                        queue.put(index, base + j);
                                    }
                                } catch (InterruptedException e) {
                                    workerFailure.completeExceptionally(
                                            new AssertionError("putting thread interrupted.", e));
                                } catch (Throwable t) {
                                    workerFailure.completeExceptionally(t);
                                }
                            });
            producer.start();
            producers.add(producer);
        }

        BitSet seen = new BitSet(totalValues);
        AtomicInteger count = new AtomicInteger(0);
        ArrayList<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < numPollingThreads; i++) {
            Thread consumer =
                    new Thread(
                            () -> {
                                try {
                                    // Stop early once any worker failed; otherwise the expected
                                    // count might never be reached and the loop would spin forever.
                                    while (count.get() < totalValues && !workerFailure.isDone()) {
                                        Integer value = queue.poll();
                                        if (value == null) {
                                            continue;
                                        }
                                        count.incrementAndGet();
                                        boolean consumedBefore;
                                        // BitSet is not thread-safe: the check and the set must
                                        // happen atomically under the same lock.
                                        synchronized (seen) {
                                            consumedBefore = seen.get(value);
                                            seen.set(value);
                                        }
                                        if (consumedBefore) {
                                            Assertions.fail(
                                                    "Value " + value + " has been consumed before");
                                        }
                                    }
                                } catch (Throwable t) {
                                    workerFailure.completeExceptionally(t);
                                }
                            });
            consumer.start();
            consumers.add(consumer);
        }

        for (Thread consumer : consumers) {
            consumer.join();
        }
        if (workerFailure.isDone()) {
            // Consumers stopped early, so producers may still be waiting inside put(); interrupt
            // them so that the joins below cannot hang.
            for (Thread producer : producers) {
                producer.interrupt();
            }
        }
        for (Thread producer : producers) {
            producer.join();
        }

        assertNoWorkerFailure(workerFailure);
        Assertions.assertThat(seen.cardinality()).isEqualTo(totalValues);
    }

    /** The capacity passed to the constructor is reported as the remaining capacity. */
    @Test
    void testSpecifiedQueueCapacity() {
        final int capacity = 8000;
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(capacity);
        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(capacity);
    }

    /** The no-arg constructor and the config option default both yield the default capacity. */
    @Test
    void testQueueDefaultCapacity() {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        Assertions.assertThat(queue.remainingCapacity()).isEqualTo(DEFAULT_CAPACITY);
        Assertions.assertThat(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue())
                .isEqualTo(DEFAULT_CAPACITY);
    }

    /** A fresh, empty queue hands out an availability future that is not done. */
    @Test
    void testUnavailableWhenEmpty() {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isFalse();
    }

    /** A future obtained after a put is already done. */
    @Test
    void testImmediatelyAvailableAfterPut() throws InterruptedException {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, new Object());
        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isTrue();
    }

    /** A future obtained before a put is done after the put. */
    @Test
    void testFutureBecomesAvailableAfterPut() throws InterruptedException {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        CompletableFuture<?> future = queue.getAvailabilityFuture();
        queue.put(0, new Object());
        Assertions.assertThat(future.isDone()).isTrue();
    }

    /** After the only element has been polled, a newly obtained future is not done. */
    @Test
    void testUnavailableWhenBecomesEmpty() throws InterruptedException {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.put(0, new Object());
        queue.poll();
        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isFalse();
    }

    /** A future obtained after {@code notifyAvailable()} is already done. */
    @Test
    void testAvailableAfterNotifyAvailable() throws InterruptedException {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.notifyAvailable();
        Assertions.assertThat(queue.getAvailabilityFuture().isDone()).isTrue();
    }

    /** A future obtained before {@code notifyAvailable()} is done after the call. */
    @Test
    void testFutureBecomesAvailableAfterNotifyAvailable() throws InterruptedException {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        CompletableFuture<?> future = queue.getAvailabilityFuture();
        queue.notifyAvailable();
        Assertions.assertThat(future.isDone()).isTrue();
    }

    /**
     * Polling the empty queue after {@code notifyAvailable()} leaves the previously obtained future
     * done, while a future obtained after the poll is not done.
     */
    @Test
    void testPollResetsAvailability() throws InterruptedException {
        FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
        queue.notifyAvailable();

        CompletableFuture<?> beforePoll = queue.getAvailabilityFuture();
        queue.poll();
        CompletableFuture<?> afterPoll = queue.getAvailabilityFuture();

        Assertions.assertThat(beforePoll.isDone()).isTrue();
        Assertions.assertThat(afterPoll.isDone()).isFalse();
    }

    /** The queue's AVAILABLE constant is the very same instance as the runtime's constant. */
    @Test
    void testQueueUsesShortCircuitFuture() {
        Assertions.assertThat(FutureCompletingBlockingQueue.AVAILABLE)
                .isSameAs(AvailabilityProvider.AVAILABLE);
    }

    /**
     * Re-raises, on the calling (test) thread, a failure that a helper thread recorded.
     *
     * @param workerFailure future that helper threads complete exceptionally when they fail; it
     *     is left incomplete when no helper failed
     * @throws java.util.concurrent.CompletionException wrapping the recorded failure, if any
     */
    private static void assertNoWorkerFailure(CompletableFuture<Void> workerFailure) {
        // Guarded by the check: join() on a future that never completed would block forever.
        if (workerFailure.isCompletedExceptionally()) {
            workerFailure.join();
        }
    }
}

