/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test that pipes {@link #SOURCE_DATA} into a {@link TestSinkV2} (the variant from
 * the {@code org.apache.flink.streaming.runtime.operators.sink.deprecated} package) and then
 * inspects the queue that was handed to the sink's default committer, once in streaming and once
 * in batch execution mode.
 *
 * <p>The queue is the static {@link #COMMIT_QUEUE}; it is cleared before every test method.
 */
@Deprecated
public class SinkV2DeprecatedITCase extends AbstractTestBase {

    /** Elements fed into the sink by both tests. */
    static final List<Integer> SOURCE_DATA =
            Arrays.asList(
                    895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
                    714, 795, 288, 422);

    /**
     * Commit-queue size that {@link #COMMIT_QUEUE_RECEIVE_ALL_DATA} checks for: twice the input
     * size. NOTE(review): presumably because {@code FiniteTestSource} emits its elements twice --
     * confirm against that class.
     */
    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;

    /** Expected committables in streaming mode: every rendered element appears two times. */
    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
            SOURCE_DATA.stream()
                    .flatMap(element -> Collections.nCopies(2, expectedCommittable(element)).stream())
                    .collect(Collectors.toList());

    /** Expected committables in batch mode: every rendered element appears exactly once. */
    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
            SOURCE_DATA.stream()
                    .map(SinkV2DeprecatedITCase::expectedCommittable)
                    .collect(Collectors.toList());

    /** Queue passed to the sink's default committer; shared by all tests, hence thread-safe. */
    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE =
            new ConcurrentLinkedQueue<>();

    /**
     * Serializable condition given to the streaming source: true once {@link #COMMIT_QUEUE} holds
     * exactly {@link #STREAMING_SOURCE_SEND_ELEMENTS_NUM} entries.
     */
    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA =
            (BooleanSupplier & Serializable)
                    () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;

    /** Empties the shared queue so results of a previous test cannot leak into the next one. */
    @Before
    public void init() {
        COMMIT_QUEUE.clear();
    }

    /**
     * Runs the pipeline in streaming mode with a {@link FiniteTestSource} and expects the queue to
     * contain {@link #EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE}, in any order.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
        final StreamExecutionEnvironment env = buildStreamEnv();
        final FiniteTestSource<Integer> source =
                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);

        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO).sinkTo(sinkCommittingToQueue());
        env.execute();

        MatcherAssert.assertThat(
                committedValues(),
                Matchers.containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
    }

    /**
     * Runs the pipeline in batch mode over {@link #SOURCE_DATA} and expects the queue to contain
     * {@link #EXPECTED_COMMITTED_DATA_IN_BATCH_MODE}, in any order.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void writerAndCommitterExecuteInBatchMode() throws Exception {
        final StreamExecutionEnvironment env = buildBatchEnv();

        env.fromData(SOURCE_DATA).sinkTo(sinkCommittingToQueue());
        env.execute();

        MatcherAssert.assertThat(
                committedValues(),
                Matchers.containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
    }

    /**
     * Renders one source element the way the expectations are spelled: the string form of a
     * {@link Tuple3} of the element, {@code null} and {@link Long#MIN_VALUE}.
     *
     * <p>NOTE(review): presumably this mirrors what the test sink's writer emits (element,
     * timestamp, watermark) -- confirm against {@code TestSinkV2}.
     */
    private static String expectedCommittable(Integer element) {
        return Tuple3.of(element, null, Long.MIN_VALUE).toString();
    }

    /**
     * Builds the sink under test with a default committer that is given {@link #COMMIT_QUEUE}.
     *
     * <p>The intersection cast makes the supplier lambda {@link Serializable}; presumably required
     * because the sink is serialized when the job is submitted -- confirm.
     */
    private static Sink<Integer> sinkCommittingToQueue() {
        return TestSinkV2.<Integer>newBuilder()
                .setDefaultCommitter(
                        (Supplier<Queue<Committer.CommitRequest<String>>> & Serializable)
                                () -> COMMIT_QUEUE)
                .build();
    }

    /** Snapshot of the committables currently wrapped by the requests in {@link #COMMIT_QUEUE}. */
    private static List<String> committedValues() {
        return COMMIT_QUEUE.stream()
                .map(Committer.CommitRequest::getCommittable)
                .collect(Collectors.toList());
    }

    /** Environment in STREAMING mode with checkpointing enabled at an interval of 100 (ms). */
    private StreamExecutionEnvironment buildStreamEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(100L);
        return env;
    }

    /** Environment in BATCH mode; checkpointing is not configured here. */
    private StreamExecutionEnvironment buildBatchEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return env;
    }
}

