/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.datastream;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class DataStreamCollectTestITCase
extends TestLogger {
    @Test
    public void testStreamingCollect() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource stream = env.fromData((Object[])new Integer[]{1, 2, 3});
        try (CloseableIterator iterator = stream.executeAndCollect();){
            List results = CollectionUtil.iteratorToList((Iterator)iterator);
            Assert.assertThat((String)"Failed to collect all data from the stream", (Object)results, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1, 2, 3}));
        }
    }

    @Test
    public void testStreamingCollectAndLimit() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource stream = env.fromData((Object[])new Integer[]{1, 2, 3, 4, 5});
        List results = stream.executeAndCollect(1);
        Assert.assertEquals((String)"Failed to collect the correct number of elements from the stream", (long)1L, (long)results.size());
    }

    @Test
    public void testBoundedCollect() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource stream = env.fromData((Object[])new Integer[]{1, 2, 3});
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        env.configure((ReadableConfig)config, DataStreamCollectTestITCase.class.getClassLoader());
        try (CloseableIterator iterator = stream.executeAndCollect();){
            List results = CollectionUtil.iteratorToList((Iterator)iterator);
            Assert.assertThat((String)"Failed to collect all data from the stream", (Object)results, (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1, 2, 3}));
        }
    }

    @Test
    public void testBoundedCollectAndLimit() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        env.configure((ReadableConfig)config, DataStreamCollectTestITCase.class.getClassLoader());
        DataStreamSource stream = env.fromData((Object[])new Integer[]{1, 2, 3, 4, 5});
        List results = stream.executeAndCollect(1);
        Assert.assertEquals((String)"Failed to collect the correct number of elements from the stream", (long)1L, (long)results.size());
    }

    @Test
    public void testAsyncCollectWithSinkConfigs() {
        Configuration configuration = new Configuration();
        configuration.set(CollectSinkOperatorFactory.SOCKET_TIMEOUT, (Object)Duration.ofMillis(2L));
        configuration.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, (Object)new MemorySize(3L));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        DataStreamSource stream = env.fromData((Object[])new Integer[]{1, 2, 3, 4, 5});
        stream.collectAsync();
        List transformations = env.getTransformations();
        Assert.assertEquals((long)1L, (long)transformations.size());
        LegacySinkTransformation transformation = (LegacySinkTransformation)transformations.get(transformations.size() - 1);
        CollectSinkOperatorFactory collectSinkOperatorFactory = (CollectSinkOperatorFactory)transformation.getOperatorFactory();
        CollectSinkFunction collectSinkFunction = (CollectSinkFunction)((CollectSinkOperator)collectSinkOperatorFactory.getOperator()).getUserFunction();
        Assert.assertEquals((long)2L, (long)collectSinkOperatorFactory.getSocketTimeoutMillis());
        Assert.assertEquals((long)3L, (long)collectSinkFunction.getMaxBytesPerBatch());
    }

    @Test
    public void testAsyncCollect() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource stream1 = env.fromData((Object[])new Integer[]{1, 2, 3, 4, 5});
        DataStreamSource stream2 = env.fromData((Object[])new Integer[]{6, 7, 8, 9, 10});
        try (CloseableIterator iterator1 = stream1.collectAsync();
             CloseableIterator iterator2 = stream2.collectAsync();){
            int x;
            env.executeAsync();
            for (x = 1; x < 6; ++x) {
                Assertions.assertThat((boolean)iterator1.hasNext()).isTrue();
                Assertions.assertThat((Integer)((Integer)iterator1.next())).isEqualTo(x);
            }
            for (x = 6; x < 11; ++x) {
                Assertions.assertThat((boolean)iterator2.hasNext()).isTrue();
                Assertions.assertThat((Integer)((Integer)iterator2.next())).isEqualTo(x);
            }
        }
    }

    @Test
    public void testAsyncCollectWithCollector() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream.Collector collector1 = new DataStream.Collector();
        DataStream.Collector collector2 = new DataStream.Collector();
        DataStreamCollectTestITCase.defineWorkflowAndApplySink(env, stream -> stream.collectAsync(collector1), stream -> stream.collectAsync(collector2));
        try (CloseableIterator iterator1 = collector1.getOutput();
             CloseableIterator iterator2 = collector2.getOutput();){
            int x;
            env.executeAsync();
            for (x = 1; x < 6; ++x) {
                Assertions.assertThat((boolean)iterator1.hasNext()).isTrue();
                Assertions.assertThat((Integer)((Integer)iterator1.next())).isEqualTo(x);
            }
            for (x = 6; x < 11; ++x) {
                Assertions.assertThat((boolean)iterator2.hasNext()).isTrue();
                Assertions.assertThat((Integer)((Integer)iterator2.next())).isEqualTo(x);
            }
        }
    }

    private static void defineWorkflowAndApplySink(StreamExecutionEnvironment env, Consumer<DataStream<Integer>> sink1Applier, Consumer<DataStream<Integer>> sink2Applier) {
        DataStreamSource stream1 = env.fromData((Object[])new Integer[]{1, 2, 3, 4, 5});
        DataStreamSource stream2 = env.fromData((Object[])new Integer[]{6, 7, 8, 9, 10});
        sink1Applier.accept((DataStream<Integer>)stream1);
        sink2Applier.accept((DataStream<Integer>)stream2);
    }
}

