/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.Collection;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.graph.SinkTransformationTranslatorITCaseBase;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class SinkV1TransformationTranslatorITCase
extends SinkTransformationTranslatorITCaseBase<Sink<Integer, ?, ?, ?>> {
    SinkV1TransformationTranslatorITCase() {
    }

    @Override
    Sink<Integer, ?, ?, ?> simpleSink() {
        return TestSink.newBuilder().build();
    }

    @Override
    Sink<Integer, ?, ?, ?> sinkWithCommitter() {
        return TestSink.newBuilder().setDefaultCommitter().build();
    }

    @Override
    Sink<Integer, ?, ?, ?> sinkWithGlobalCommitter() {
        return TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build();
    }

    @Override
    DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer, ?, ?, ?> sink) {
        return stream.sinkTo(sink);
    }

    @Override
    DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer, ?, ?, ?> sink, CustomSinkOperatorUidHashes hashes) {
        return stream.sinkTo(sink, hashes);
    }

    @TestTemplate
    void generateWriterCommitterGlobalCommitterTopology() {
        StreamGraph streamGraph = this.buildGraph(TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build(), this.runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeName(streamGraph, node -> node.contains("Source"));
        StreamNode writerNode = this.findWriter(streamGraph);
        StreamNode committerNode = this.findCommitter(streamGraph);
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, SinkWriterOperatorFactory.class, 2, -1);
        SinkV1TransformationTranslatorITCase.assertNoUnalignedOutput(writerNode);
        if (this.runtimeExecutionMode == RuntimeExecutionMode.STREAMING) {
            Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(4);
        } else {
            Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(4);
            this.validateTopology(writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, committerNode, CommitterOperatorFactory.class, 2, -1);
            SinkV1TransformationTranslatorITCase.assertNoUnalignedOutput(committerNode);
        }
        StreamNode globalCommitterNode = this.findGlobalCommitter(streamGraph);
        this.validateTopology(committerNode, SimpleVersionedSerializerTypeSerializerProxy.class, globalCommitterNode, SimpleOperatorFactory.class, 1, 1);
    }

    @TestTemplate
    void generateWriterGlobalCommitterTopology() {
        StreamGraph streamGraph = this.buildGraph(TestSink.newBuilder().setDefaultGlobalCommitter().build(), this.runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeName(streamGraph, node -> node.contains("Source"));
        StreamNode writerNode = this.findWriter(streamGraph);
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, SinkWriterOperatorFactory.class, 2, -1);
        StreamNode committerNode = this.findCommitter(streamGraph);
        StreamNode globalCommitterNode = this.findGlobalCommitter(streamGraph);
        SinkV1TransformationTranslatorITCase.assertNoUnalignedOutput(writerNode);
        this.validateTopology(writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, committerNode, CommitterOperatorFactory.class, 2, -1);
        SinkV1TransformationTranslatorITCase.assertNoUnalignedOutput(committerNode);
        this.validateTopology(committerNode, SimpleVersionedSerializerTypeSerializerProxy.class, globalCommitterNode, SimpleOperatorFactory.class, 1, 1);
    }
}

