/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.QueueUtil;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for the {@link StreamElementQueue} implementations.
 *
 * <p>Every test template is executed once per {@link AsyncDataStream.OutputMode} returned by
 * {@link #outputModes()}, i.e. against an {@link OrderedStreamElementQueue} and an
 * {@link UnorderedStreamElementQueue}.
 */
@ExtendWith(ParameterizedTestExtension.class)
class StreamElementQueueTest {

    /** Output mode selecting which queue implementation the current run exercises. */
    private final AsyncDataStream.OutputMode outputMode;

    /**
     * Supplies the output modes the test class is parameterized with.
     *
     * @return the {@code ORDERED} and {@code UNORDERED} output modes
     */
    @Parameters
    private static Collection<AsyncDataStream.OutputMode> outputModes() {
        return Arrays.asList(
                AsyncDataStream.OutputMode.ORDERED, AsyncDataStream.OutputMode.UNORDERED);
    }

    /**
     * Creates a test instance for the given output mode.
     *
     * @param outputMode the mode under test; must not be {@code null}
     */
    StreamElementQueueTest(AsyncDataStream.OutputMode outputMode) {
        this.outputMode = Preconditions.checkNotNull(outputMode);
    }

    /**
     * Instantiates the queue implementation matching {@link #outputMode}.
     *
     * @param capacity the capacity passed to the queue constructor
     * @return a new, empty queue of the implementation under test
     * @throws IllegalStateException if the output mode is neither {@code ORDERED} nor
     *     {@code UNORDERED}
     */
    private StreamElementQueue<Integer> createStreamElementQueue(int capacity) {
        switch (outputMode) {
            case ORDERED:
                return new OrderedStreamElementQueue<>(capacity);
            case UNORDERED:
                return new UnorderedStreamElementQueue<>(capacity);
            default:
                throw new IllegalStateException("Unknown output mode: " + outputMode);
        }
    }

    /**
     * Verifies that {@code tryPut} accepts elements until the capacity is reached, rejects the
     * next one, and that {@code values()} reports the accepted elements in insertion order.
     */
    @TestTemplate
    void testPut() {
        StreamElementQueue<Integer> queue = createStreamElementQueue(2);

        Watermark watermark = new Watermark(0L);
        StreamRecord<Integer> streamRecord = new StreamRecord<>(42, 1L);

        // Two elements fill the queue up to its capacity of 2.
        Assertions.assertThat(queue.tryPut(watermark)).isPresent();
        Assertions.assertThat(queue.tryPut(streamRecord)).isPresent();

        Assertions.assertThat(queue.size()).isEqualTo(2);

        // The queue is full now, so a third element must be rejected.
        Assertions.assertThat(queue.tryPut(new Watermark(2L))).isNotPresent();

        // The rejected element must not show up among the stored values.
        Assertions.assertThat(queue.values()).containsExactly(watermark, streamRecord);
    }

    /**
     * Verifies that the leading watermark can be popped right away, while the record behind it
     * is only emitted (carrying the completed value) after its result future was completed.
     */
    @TestTemplate
    void testPop() {
        StreamElementQueue<Integer> queue = createStreamElementQueue(2);

        // Fill the queue with a watermark followed by a record.
        QueueUtil.putSuccessfully(queue, new Watermark(0L));
        ResultFuture<Integer> recordResult =
                QueueUtil.putSuccessfully(queue, new StreamRecord<>(42, 1L));

        Assertions.assertThat(queue.size()).isEqualTo(2);

        // Only the watermark is returned here; the record's future is still pending.
        Assertions.assertThat(QueueUtil.popCompleted(queue)).containsExactly(new Watermark(0L));
        Assertions.assertThat(queue.size()).isOne();

        // Completing the future makes the record poppable with the completed value.
        recordResult.complete(Collections.singleton(43));

        Assertions.assertThat(QueueUtil.popCompleted(queue))
                .containsExactly(new StreamRecord<>(43, 1L));
        Assertions.assertThat(queue.size()).isZero();
        Assertions.assertThat(queue.isEmpty()).isTrue();
    }

    /**
     * Verifies that a put on a full queue fails and that the same element can be put once the
     * occupying element has been completed and popped.
     */
    @TestTemplate
    void testPutOnFull() {
        StreamElementQueue<Integer> queue = createStreamElementQueue(1);

        // A single record fills the queue (capacity 1).
        ResultFuture<Integer> resultFuture =
                QueueUtil.putSuccessfully(queue, new StreamRecord<>(42, 0L));
        Assertions.assertThat(queue.size()).isOne();

        // No room is left for a second record.
        QueueUtil.putUnsuccessfully(queue, new StreamRecord<>(43, 1L));

        // Complete and pop the first record to free the slot again.
        resultFuture.complete(Collections.singleton(42 * 42));
        Assertions.assertThat(QueueUtil.popCompleted(queue))
                .containsExactly(new StreamRecord<>(42 * 42, 0L));

        // The previously rejected record fits now.
        QueueUtil.putSuccessfully(queue, new StreamRecord<>(43, 1L));
    }

    /**
     * Verifies that a queue holding only watermarks hands both of them out in insertion order
     * with a single pop and is drained afterwards.
     */
    @TestTemplate
    void testWatermarkOnly() {
        StreamElementQueue<Integer> queue = createStreamElementQueue(2);

        QueueUtil.putSuccessfully(queue, new Watermark(2L));
        QueueUtil.putSuccessfully(queue, new Watermark(5L));

        Assertions.assertThat(queue.size()).isEqualTo(2);
        Assertions.assertThat(queue.isEmpty()).isFalse();

        Assertions.assertThat(QueueUtil.popCompleted(queue))
                .containsExactly(new Watermark(2L), new Watermark(5L));
        Assertions.assertThat(queue.size()).isZero();

        // Nothing is left to pop after the queue has been drained.
        Assertions.assertThat(QueueUtil.popCompleted(queue)).isEmpty();
    }
}

