/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.util.AbstractTestBase;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class SinkV2ITCase
extends AbstractTestBase {
    static final List<Integer> SOURCE_DATA = Arrays.asList(895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970, 714, 795, 288, 422);
    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;
    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = SOURCE_DATA.stream().flatMap(x -> Collections.nCopies(2, Tuple3.of((Object)x, null, (Object)Long.MIN_VALUE).toString()).stream()).collect(Collectors.toList());
    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE = SOURCE_DATA.stream().map(x -> Tuple3.of((Object)x, null, (Object)Long.MIN_VALUE).toString()).collect(Collectors.toList());
    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE = new ConcurrentLinkedQueue<Committer.CommitRequest<String>>();
    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier & Serializable)() -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;

    @BeforeEach
    public void init() {
        COMMIT_QUEUE.clear();
    }

    @Test
    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment env = this.buildStreamEnv();
        FiniteTestSource source = new FiniteTestSource(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
        env.addSource((SourceFunction)source, (TypeInformation)IntegerTypeInfo.INT_TYPE_INFO).keyBy((KeySelector & Serializable)value -> value).sinkTo((Sink)TestSinkV2.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).build());
        this.executeAndVerifyStreamGraph(env);
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList()), (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
    }

    @Test
    public void writerAndPrecommitToplogyAndCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment env = this.buildStreamEnv();
        FiniteTestSource source = new FiniteTestSource(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
        env.addSource((SourceFunction)source, (TypeInformation)IntegerTypeInfo.INT_TYPE_INFO).keyBy((KeySelector & Serializable)value -> value).sinkTo((Sink)TestSinkV2.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).setWithPreCommitTopology(true).build());
        this.executeAndVerifyStreamGraph(env);
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList()), (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.stream().map(s -> s + "Transformed").toArray()));
    }

    @Test
    public void writerAndCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment env = this.buildBatchEnv();
        env.fromData(SOURCE_DATA).rebalance().sinkTo((Sink)TestSinkV2.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).build());
        this.executeAndVerifyStreamGraph(env);
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList()), (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
    }

    @Test
    public void writerAndPrecommitToplogyAndCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment env = this.buildBatchEnv();
        env.fromData(SOURCE_DATA).rebalance().sinkTo((Sink)TestSinkV2.newBuilder().setDefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE).setWithPreCommitTopology(true).build());
        this.executeAndVerifyStreamGraph(env);
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList()), (Matcher)Matchers.containsInAnyOrder((Object[])EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.stream().map(s -> s + "Transformed").toArray()));
    }

    @ParameterizedTest
    @CsvSource(value={"1, 2", "2, 1", "1, 1"})
    public void writerAndCommitterExecuteInStreamingModeWithScaling(int initialParallelism, int scaledParallelism, @TempDir File checkpointDir, @InjectMiniCluster MiniCluster miniCluster, @InjectClusterClient ClusterClient<?> clusterClient) throws Exception {
        TestSinkV2.DefaultCommitter committer = new TestSinkV2.DefaultCommitter((Supplier<Queue> & Serializable)() -> COMMIT_QUEUE);
        Configuration config = this.createConfigForScalingTest(checkpointDir, initialParallelism);
        JobID jobID = this.runStreamingWithScalingTest(config, true, committer, clusterClient);
        config.set(StateRecoveryOptions.SAVEPOINT_PATH, (Object)this.getCheckpointPath(miniCluster, jobID));
        config.set(CoreOptions.DEFAULT_PARALLELISM, (Object)scaledParallelism);
        this.runStreamingWithScalingTest(config, false, committer, clusterClient);
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList()), (Matcher)Matchers.containsInAnyOrder((Object[])SinkV2ITCase.duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE).toArray()));
    }

    private static List<String> duplicate(List<String> values) {
        return IntStream.range(0, 2).boxed().flatMap(i -> values.stream()).collect(Collectors.toList());
    }

    private JobID runStreamingWithScalingTest(Configuration config, boolean shouldMapperFail, TestSinkV2.DefaultCommitter committer, ClusterClient<?> clusterClient) throws Exception {
        StreamExecutionEnvironment env = this.buildStreamEnvWithCheckpointDir(config);
        Source<Integer, ?, ?> source = this.createStreamingSourceForScalingTest();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source").rebalance().map((MapFunction)new FailingCheckpointMapper(!shouldMapperFail)).sinkTo((Sink)TestSinkV2.newBuilder().setCommitter(committer).setWithPostCommitTopology(true).build());
        JobID jobId = (JobID)clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
        clusterClient.requestJobResult(jobId).get();
        return jobId;
    }

    private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId) throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
        Optional completedCheckpoint = CommonTestUtils.getLatestCompletedCheckpointPath((JobID)secondJobId, (MiniCluster)miniCluster);
        MatcherAssert.assertThat((Object)completedCheckpoint.isPresent(), (Matcher)Matchers.is((Object)true));
        return (String)completedCheckpoint.get();
    }

    private StreamExecutionEnvironment buildStreamEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(100L);
        return env;
    }

    private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(Configuration config) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)config);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(100L);
        return env;
    }

    private StreamExecutionEnvironment buildBatchEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return env;
    }

    private Configuration createConfigForScalingTest(File checkpointDir, int parallelism) {
        Configuration config = new Configuration();
        config.set(CoreOptions.DEFAULT_PARALLELISM, (Object)parallelism);
        config.set(StateBackendOptions.STATE_BACKEND, (Object)"hashmap");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir.toURI().toString());
        config.set(CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, (Object)ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, (Object)2000);
        config.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"disable");
        return config;
    }

    private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception {
        StreamGraph streamGraph = env.getStreamGraph();
        this.assertNoUnalignedCheckpointInSink(streamGraph);
        this.assertUnalignedCheckpointInNonSink(streamGraph);
        env.execute(streamGraph);
    }

    private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
        ((AbstractListAssert)((AbstractCollectionAssert)Assertions.assertThat((Collection)streamGraph.getStreamNodes()).filteredOn(t -> t.getOperatorName().contains("Sink"))).flatMap(StreamNode::getOutEdges).allMatch(e -> !e.supportsUnalignedCheckpoints())).isNotEmpty();
    }

    private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) {
        ((AbstractListAssert)((AbstractCollectionAssert)Assertions.assertThat((Collection)streamGraph.getStreamNodes()).filteredOn(t -> !t.getOperatorName().contains("Sink"))).flatMap(StreamNode::getOutEdges).allMatch(StreamEdge::supportsUnalignedCheckpoints)).isNotEmpty();
    }

    private Source<Integer, ?, ?> createStreamingSourceForScalingTest() {
        RateLimiterStrategy & Serializable rateLimiterStrategy = (RateLimiterStrategy & Serializable)parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 2);
        return new DataGeneratorSource((GeneratorFunction & Serializable)l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()), (long)SOURCE_DATA.size() * 2L, (RateLimiterStrategy)rateLimiterStrategy, (TypeInformation)IntegerTypeInfo.INT_TYPE_INFO);
    }

    private static class FailingCheckpointMapper
    implements MapFunction<Integer, Integer>,
    CheckpointListener {
        private static final AtomicBoolean failed = new AtomicBoolean(false);
        private long lastCheckpointId = 0L;
        private int emittedBetweenCheckpoint = 0;

        FailingCheckpointMapper(boolean failed) {
            FailingCheckpointMapper.failed.set(failed);
        }

        public Integer map(Integer value) {
            if (this.lastCheckpointId >= 1L && this.emittedBetweenCheckpoint > 0 && !failed.get()) {
                failed.set(true);
                throw new RuntimeException("Planned exception.");
            }
            ++this.emittedBetweenCheckpoint;
            return value;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.lastCheckpointId = checkpointId;
            this.emittedBetweenCheckpoint = 0;
        }
    }

    private static class BurstingRateLimiter
    implements RateLimiter {
        private final RateLimiter rateLimiter;
        private final int numCheckpointCooldown;
        private int cooldown;

        public BurstingRateLimiter(int recordPerCycle, int numCheckpointCooldown) {
            this.rateLimiter = new GatedRateLimiter(recordPerCycle);
            this.numCheckpointCooldown = numCheckpointCooldown;
        }

        public CompletionStage<Void> acquire() {
            CompletionStage stage = this.rateLimiter.acquire();
            this.cooldown = this.numCheckpointCooldown;
            return stage;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            if (this.cooldown-- <= 0) {
                this.rateLimiter.notifyCheckpointComplete(checkpointId);
            }
        }
    }
}

