/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
abstract class SinkTransformationTranslatorITCaseBase<SinkT> {
    @Parameter
    protected RuntimeExecutionMode runtimeExecutionMode;
    static final String NAME = "FileSink";
    static final String SLOT_SHARE_GROUP = "FileGroup";
    static final String UID = "FileUid";
    static final int PARALLELISM = 2;

    SinkTransformationTranslatorITCaseBase() {
    }

    @Parameters(name="Execution Mode: {0}")
    private static Collection<Object> data() {
        return Arrays.asList(RuntimeExecutionMode.STREAMING, RuntimeExecutionMode.BATCH);
    }

    abstract SinkT simpleSink();

    abstract SinkT sinkWithCommitter();

    abstract DataStreamSink<Integer> sinkTo(DataStream<Integer> var1, SinkT var2);

    @TestTemplate
    void generateWriterTopology() {
        StreamGraph streamGraph = this.buildGraph(this.simpleSink(), this.runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeName(streamGraph, node -> node.contains("Source"));
        StreamNode writerNode = this.findWriter(streamGraph);
        Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(2);
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, SinkWriterOperatorFactory.class, 2, -1);
    }

    @TestTemplate
    void generateWriterCommitterTopology() {
        StreamGraph streamGraph = this.buildGraph(this.sinkWithCommitter(), this.runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeName(streamGraph, node -> node.contains("Source"));
        StreamNode writerNode = this.findWriter(streamGraph);
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, SinkWriterOperatorFactory.class, 2, -1);
        StreamNode committerNode = this.findNodeName(streamGraph, name -> name.contains("Committer"));
        Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(3);
        SinkTransformationTranslatorITCaseBase.assertNoUnalignedOutput(writerNode);
        this.validateTopology(writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, committerNode, CommitterOperatorFactory.class, 2, -1);
    }

    @TestTemplate
    void testParallelismConfigured() {
        this.testParallelismConfiguredInternal(true);
        this.testParallelismConfiguredInternal(false);
    }

    private void testParallelismConfiguredInternal(boolean setSinkParallelism) {
        StreamGraph streamGraph = this.buildGraph(this.sinkWithCommitter(), this.runtimeExecutionMode, setSinkParallelism);
        StreamNode writerNode = this.findWriter(streamGraph);
        StreamNode committerNode = this.findCommitter(streamGraph);
        Assertions.assertThat((boolean)writerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism);
        Assertions.assertThat((boolean)committerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism);
    }

    StreamNode findWriter(StreamGraph streamGraph) {
        return this.findNodeName(streamGraph, name -> name.contains("Writer") && !name.contains("Committer"));
    }

    StreamNode findCommitter(StreamGraph streamGraph) {
        return this.findNodeName(streamGraph, name -> name.contains("Committer") && !name.contains("Global Committer"));
    }

    StreamNode findGlobalCommitter(StreamGraph streamGraph) {
        return this.findNodeName(streamGraph, name -> name.contains("Global Committer"));
    }

    @TestTemplate
    void throwExceptionWithoutSettingUid() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)this.runtimeExecutionMode);
        env.configure((ReadableConfig)config, this.getClass().getClassLoader());
        env.getConfig().disableAutoGeneratedUIDs();
        this.sinkTo((DataStream<Integer>)env.fromElements((Object[])new Integer[]{1, 2}), this.simpleSink());
        Assertions.assertThatThrownBy(() -> ((StreamExecutionEnvironment)env).getStreamGraph()).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void disableOperatorChain() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSink dataStreamSink = this.sinkTo((DataStream<Integer>)src, this.sinkWithCommitter()).name(NAME);
        dataStreamSink.disableChaining();
        StreamGraph streamGraph = env.getStreamGraph();
        StreamNode writer = this.findWriter(streamGraph);
        StreamNode committer = this.findCommitter(streamGraph);
        Assertions.assertThat((Comparable)writer.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.NEVER);
        Assertions.assertThat((Comparable)committer.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.NEVER);
    }

    void validateTopology(StreamNode src, Class<?> srcOutTypeInfo, StreamNode dest, Class<? extends StreamOperatorFactory> operatorFactoryClass, int expectedParallelism, int expectedMaxParallelism) {
        StreamEdge srcOutEdge = (StreamEdge)src.getOutEdges().get(0);
        Assertions.assertThat((int)srcOutEdge.getTargetId()).isEqualTo(dest.getId());
        Assertions.assertThat((Object)src.getTypeSerializerOut()).isInstanceOf(srcOutTypeInfo);
        StreamEdge destInputEdge = (StreamEdge)dest.getInEdges().get(0);
        Assertions.assertThat((int)destInputEdge.getTargetId()).isEqualTo(dest.getId());
        Assertions.assertThat((Object)dest.getTypeSerializersIn()[0]).isInstanceOf(srcOutTypeInfo);
        Assertions.assertThat((String)dest.getOperatorName()).isNotEqualTo((Object)src.getOperatorName());
        Assertions.assertThat((String)dest.getTransformationUID()).isNotEqualTo((Object)src.getTransformationUID());
        Assertions.assertThat((Object)dest.getOperatorFactory()).isInstanceOf(operatorFactoryClass);
        Assertions.assertThat((int)dest.getParallelism()).isEqualTo(expectedParallelism);
        Assertions.assertThat((int)dest.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
        Assertions.assertThat((Comparable)dest.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.ALWAYS);
        Assertions.assertThat((String)dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP);
    }

    protected static void assertNoUnalignedOutput(StreamNode src) {
        Assertions.assertThat((List)src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints());
    }

    StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) {
        return this.buildGraph(sink, runtimeExecutionMode, true);
    }

    StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)runtimeExecutionMode);
        env.configure((ReadableConfig)config, this.getClass().getClassLoader());
        DataStreamSource src = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSink<Integer> dataStreamSink = this.sinkTo((DataStream<Integer>)src.rebalance(), sink);
        this.setSinkProperty(dataStreamSink, setSinkParallelism);
        env.getExecutionPlan();
        return env.getStreamGraph();
    }

    private void setSinkProperty(DataStreamSink<Integer> dataStreamSink, boolean setSinkParallelism) {
        dataStreamSink.name(NAME);
        dataStreamSink.uid(UID);
        if (setSinkParallelism) {
            dataStreamSink.setParallelism(2);
        }
        dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
    }

    StreamNode findNodeName(StreamGraph streamGraph, Predicate<String> predicate) {
        return streamGraph.getStreamNodes().stream().filter(node -> predicate.test(node.getOperatorName())).findFirst().orElseThrow(() -> new IllegalStateException("Can not find the node"));
    }
}

