/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

public class SinkITCase
extends AbstractTestBaseJUnit4 {
    static final List<Integer> SOURCE_DATA = Arrays.asList(895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970, 714, 795, 288, 422);
    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;
    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = SOURCE_DATA.stream().flatMap(x -> Collections.nCopies(2, Tuple3.of((Object)x, null, (Object)Long.MIN_VALUE).toString()).stream()).collect(Collectors.toList());
    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE = SOURCE_DATA.stream().map(x -> Tuple3.of((Object)x, null, (Object)Long.MIN_VALUE).toString()).collect(Collectors.toList());
    static final List<String> EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE = SOURCE_DATA.stream().flatMap(x -> Collections.nCopies(2, Tuple3.of((Object)x, null, (Object)Long.MIN_VALUE).toString()).stream()).collect(Collectors.toList());
    static final List<String> EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE = Collections.singletonList(SOURCE_DATA.stream().map(x -> Tuple3.of((Object)x, null, (Object)Long.MIN_VALUE).toString()).sorted().collect(Collectors.joining("+")));
    static final Queue<String> COMMIT_QUEUE = new ConcurrentLinkedQueue<String>();
    static final Queue<String> GLOBAL_COMMIT_QUEUE = new ConcurrentLinkedQueue<String>();
    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier & Serializable)() -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
    static final BooleanSupplier GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier & Serializable)() -> SinkITCase.getSplittedGlobalCommittedData().size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
    static final BooleanSupplier BOTH_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier & Serializable)() -> COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean() && GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();

    @Before
    public void init() {
        COMMIT_QUEUE.clear();
        GLOBAL_COMMIT_QUEUE.clear();
    }

    @Test
    public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment env = this.buildStreamEnv();
        DataStreamSource stream = env.fromSource((Source)TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, (TypeInformation)Types.INT, (BooleanSupplier)BOTH_QUEUE_RECEIVE_ALL_DATA), WatermarkStrategy.noWatermarks(), "Test Source");
        stream.sinkTo((Sink)TestSink.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).setGlobalCommitter((Supplier<Queue> & Serializable)() -> GLOBAL_COMMIT_QUEUE).build());
        env.execute();
        GLOBAL_COMMIT_QUEUE.remove("end of input");
        MatcherAssert.assertThat(COMMIT_QUEUE, (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
        MatcherAssert.assertThat(SinkITCase.getSplittedGlobalCommittedData(), (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
    }

    @Test
    public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment env = this.buildBatchEnv();
        env.fromData(SOURCE_DATA).sinkTo((Sink)TestSink.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).setGlobalCommitter((Supplier<Queue> & Serializable)() -> GLOBAL_COMMIT_QUEUE).build());
        env.execute();
        MatcherAssert.assertThat(COMMIT_QUEUE, (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
        MatcherAssert.assertThat(GLOBAL_COMMIT_QUEUE, (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
    }

    @Test
    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment env = this.buildStreamEnv();
        DataStreamSource stream = env.fromSource((Source)TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, (TypeInformation)Types.INT, (BooleanSupplier)COMMIT_QUEUE_RECEIVE_ALL_DATA), WatermarkStrategy.noWatermarks(), "Test Source");
        stream.sinkTo((Sink)TestSink.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).build());
        env.execute();
        MatcherAssert.assertThat(COMMIT_QUEUE, (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
    }

    @Test
    public void writerAndCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment env = this.buildBatchEnv();
        env.fromData(SOURCE_DATA).sinkTo((Sink)TestSink.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).build());
        env.execute();
        MatcherAssert.assertThat(COMMIT_QUEUE, (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
    }

    @Test
    public void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment env = this.buildStreamEnv();
        DataStreamSource stream = env.fromSource((Source)TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, (TypeInformation)Types.INT, (BooleanSupplier)GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA), WatermarkStrategy.noWatermarks(), "Test Source");
        stream.sinkTo((Sink)TestSink.newBuilder().setCommittableSerializer((SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE).setGlobalCommitter((Supplier<Queue> & Serializable)() -> GLOBAL_COMMIT_QUEUE).build());
        env.execute();
        GLOBAL_COMMIT_QUEUE.remove("end of input");
        MatcherAssert.assertThat(SinkITCase.getSplittedGlobalCommittedData(), (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
    }

    @Test
    public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment env = this.buildBatchEnv();
        env.fromData(SOURCE_DATA).sinkTo((Sink)TestSink.newBuilder().setCommittableSerializer((SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE).setGlobalCommitter((Supplier<Queue> & Serializable)() -> GLOBAL_COMMIT_QUEUE).build());
        env.execute();
        MatcherAssert.assertThat(GLOBAL_COMMIT_QUEUE, (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
    }

    private static List<String> getSplittedGlobalCommittedData() {
        return GLOBAL_COMMIT_QUEUE.stream().flatMap(x -> Arrays.stream(x.split("\\+"))).collect(Collectors.toList());
    }

    private StreamExecutionEnvironment buildStreamEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(100L);
        return env;
    }

    private StreamExecutionEnvironment buildBatchEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return env;
    }
}

