/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.function.IntSupplier;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.SerializableSupplier;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactory;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

class SinkV2CommitterOperatorTest {
    /**
     * Serializable factory for the committable serializer handed to the test sinks: each call
     * wraps {@link StringSerializer#INSTANCE} in a new {@link SimpleVersionedSerializerAdapter}.
     */
    public static final SerializableSupplier<SimpleVersionedSerializer<String>> STRING_SERIALIZER =
            () -> new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);

    // Explicit package-private no-argument constructor; it performs no initialization.
    SinkV2CommitterOperatorTest() {
    }

    /**
     * Builds a test sink that uses a fresh {@link ForwardingCommitter} and has the post-commit
     * topology flag set to {@code true}.
     *
     * @return the sink together with a counter that reads the committer's
     *     {@code successfulCommits} field
     */
    SinkAndCounters sinkWithPostCommit() {
        ForwardingCommitter countingCommitter = new ForwardingCommitter();
        TestSinkV2<?> sink =
                TestSinkV2.newBuilder()
                        .setWriter(new TestSinkV2.ForwardCommittingSinkWriter())
                        .setCommitter(countingCommitter, STRING_SERIALIZER)
                        .setWithPostCommitTopology(true)
                        .build();
        IntSupplier commitCounter = () -> countingCommitter.successfulCommits;
        return new SinkAndCounters(sink, commitCounter);
    }

    /**
     * Builds a test sink that uses a {@code TestSinkV2.RetryOnceCommitter} and has the
     * post-commit topology flag set to {@code true}.
     *
     * @return the sink together with a counter that always reports {@code 0}, because commits
     *     of this committer are not tracked here
     */
    SinkAndCounters sinkWithPostCommitWithRetry() {
        TestSinkV2<?> sink =
                TestSinkV2.newBuilder()
                        .setWriter(new TestSinkV2.ForwardCommittingSinkWriter())
                        .setCommitter(new TestSinkV2.RetryOnceCommitter(), STRING_SERIALIZER)
                        .setWithPostCommitTopology(true)
                        .build();
        IntSupplier untrackedCommits = () -> 0;
        return new SinkAndCounters(sink, untrackedCommits);
    }

    /**
     * Builds a test sink that uses a fresh {@link ForwardingCommitter} and has the post-commit
     * topology flag set to {@code false}.
     *
     * @return the sink together with a counter that reads the committer's
     *     {@code successfulCommits} field
     */
    SinkAndCounters sinkWithoutPostCommit() {
        ForwardingCommitter countingCommitter = new ForwardingCommitter();
        TestSinkV2<?> sink =
                TestSinkV2.newBuilder()
                        .setWriter(new TestSinkV2.ForwardCommittingSinkWriter())
                        .setCommitter(countingCommitter, STRING_SERIALIZER)
                        .setWithPostCommitTopology(false)
                        .build();
        IntSupplier commitCounter = () -> countingCommitter.successfulCommits;
        return new SinkAndCounters(sink, commitCounter);
    }

    /**
     * Sends one summary and its single committable, completes checkpoint 1, and checks that
     * exactly one commit was counted. With a post-commit topology the summary and the committable
     * (with subtask id 0) must be emitted; without it the output must stay empty.
     *
     * @param withPostCommitTopology whether the sink under test is built with the post-commit
     *     topology flag enabled
     * @throws Exception if the test harness fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
        SinkAndCounters sinkAndCounters =
                withPostCommitTopology ? sinkWithPostCommit() : sinkWithoutPostCommit();

        // try-with-resources guarantees the harness is closed even when an assertion fails,
        // matching the other tests in this class.
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness =
                        new OneInputStreamOperatorTestHarness<>(
                                new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true))) {
            testHarness.open();

            CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableWithLineage<String> committableWithLineage =
                    new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(committableWithLineage));

            // Completing the checkpoint is what triggers the commit in this test.
            testHarness.notifyOfCompletedCheckpoint(1L);

            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
            if (withPostCommitTopology) {
                ListAssert<CommittableMessage<String>> records =
                        Assertions.assertThat(testHarness.extractOutputValues()).hasSize(2);
                records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                        .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                        .hasOverallCommittables(committableSummary.getNumberOfCommittables());
                records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                        .isEqualTo(committableWithLineage.withSubtaskId(0));
            } else {
                Assertions.assertThat(testHarness.getOutput()).isEmpty();
            }
        }
    }

    /**
     * Announces two committables in the summary but delivers only one before the first checkpoint
     * notification. That notification must fail with an "incomplete batch" message and nothing
     * may be committed or emitted. After the second committable arrives, the notification must
     * succeed, two commits must be counted, and the summary plus both committables are emitted.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
        SinkAndCounters sinkAndCounters = sinkWithPostCommit();

        // try-with-resources guarantees the harness is closed even when an assertion fails,
        // matching the other tests in this class.
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0)) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            // The summary announces two committables for checkpoint 1.
            CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 2, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));

            CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(first));

            // Only one of the two announced committables has arrived so far.
            Assertions.assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1L))
                    .hasMessageContaining("Trying to commit incomplete batch of committables");

            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();

            CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1);
            testHarness.processElement(new StreamRecord<>(second));

            Assertions.assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1L))
                    .doesNotThrowAnyException();

            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
            ListAssert<CommittableMessage<String>> records =
                    Assertions.assertThat(testHarness.extractOutputValues()).hasSize(3);
            records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                    .hasOverallCommittables(committableSummary.getNumberOfCommittables());
            records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .isEqualTo(first.withSubtaskId(0));
            records.element(2, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .isEqualTo(second.withSubtaskId(0));
        }
    }

    /**
     * Snapshots a committer operator holding two uncommitted committables (from subtasks 0 and 1)
     * and restores that state into a new operator with a different parallelism / subtask index.
     * After restore and open, both committables must have been committed and re-emitted under the
     * new subtask id, preceded by a single summary covering both.
     *
     * @param parallelismBeforeScaling parallelism (and max parallelism) of the operator that
     *     takes the snapshot
     * @param parallelismAfterScaling parallelism (and max parallelism) of the restored operator
     * @param subtaskIdAfterRecovery subtask index of the restored operator
     * @throws Exception if a test harness fails
     */
    @ParameterizedTest
    @CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
    void testStateRestoreWithScaling(
            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
            throws Exception {
        int originalSubtaskId = 0;
        long checkpointId = 0L;

        CommittableWithLineage<String> first =
                new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
        CommittableWithLineage<String> second =
                new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);

        OperatorSubtaskState snapshot;
        // try-with-resources guarantees the harness is closed even when an assertion fails.
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness =
                        createTestHarness(
                                sinkWithPostCommitWithRetry().sink,
                                false,
                                true,
                                parallelismBeforeScaling,
                                parallelismBeforeScaling,
                                originalSubtaskId)) {
            testHarness.open();

            testHarness.processElement(
                    new StreamRecord<>(
                            new CommittableSummary<String>(
                                    originalSubtaskId,
                                    parallelismBeforeScaling,
                                    checkpointId,
                                    1,
                                    0)));
            testHarness.processElement(new StreamRecord<>(first));

            testHarness.processElement(
                    new StreamRecord<>(
                            new CommittableSummary<String>(
                                    originalSubtaskId + 1,
                                    parallelismBeforeScaling,
                                    checkpointId,
                                    1,
                                    0)));
            testHarness.processElement(new StreamRecord<>(second));

            snapshot = testHarness.snapshot(checkpointId, 2L);

            // No checkpoint completion was signalled, so nothing may have been emitted yet.
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }

        SinkAndCounters restored = sinkWithPostCommit();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                restoredHarness =
                        createTestHarness(
                                restored.sink,
                                false,
                                true,
                                parallelismAfterScaling,
                                parallelismAfterScaling,
                                subtaskIdAfterRecovery)) {
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            // Both restored committables are expected to be committed once the operator is open.
            Assertions.assertThat(restored.commitCounter.getAsInt()).isEqualTo(2);
            ListAssert<CommittableMessage<String>> records =
                    Assertions.assertThat(restoredHarness.extractOutputValues()).hasSize(3);
            records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                    .hasCheckpointId(checkpointId)
                    .hasFailedCommittables(0)
                    .hasSubtaskId(subtaskIdAfterRecovery)
                    .hasNumberOfSubtasks(
                            Math.min(parallelismBeforeScaling, parallelismAfterScaling))
                    .hasOverallCommittables(2);
            records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .hasCheckpointId(checkpointId)
                    .hasSubtaskId(subtaskIdAfterRecovery)
                    .hasCommittable(first.getCommittable());
            records.element(2, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .hasCheckpointId(checkpointId)
                    .hasSubtaskId(subtaskIdAfterRecovery)
                    .hasCommittable(second.getCommittable());
        }
    }

    /**
     * Runs the retrying sink with {@code SinkOptions.COMMITTER_RETRIES} set to the given value.
     * With zero retries the checkpoint notification must fail with a "Failed to commit 1
     * committables" message; otherwise it must complete without throwing.
     *
     * @param numRetries value configured for {@code SinkOptions.COMMITTER_RETRIES}
     * @throws Exception if the test harness fails
     */
    @ParameterizedTest
    @ValueSource(ints = {0, 1})
    void testNumberOfRetries(int numRetries) throws Exception {
        final long checkpointId = 1L;
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                harness =
                        createTestHarness(
                                sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) {
            // The retry limit is written to the configuration before the operator is opened.
            harness.getStreamConfig()
                    .getConfiguration()
                    .set(SinkOptions.COMMITTER_RETRIES, numRetries);
            harness.open();

            harness.processElement(
                    new StreamRecord<>(new CommittableSummary<String>(0, 1, checkpointId, 1, 0)));
            harness.processElement(
                    new StreamRecord<>(new CommittableWithLineage<>("1", checkpointId, 0)));

            AbstractThrowableAssert<?, ? extends Throwable> outcome =
                    Assertions.assertThatCode(
                            () -> harness.notifyOfCompletedCheckpoint(checkpointId));
            if (numRetries != 0) {
                outcome.doesNotThrowAnyException();
            } else {
                outcome.hasMessageContaining("Failed to commit 1 committables");
            }
        }
    }

    /**
     * Feeds one summary and its committable, then signals end of input. When checkpointing is
     * enabled, checkpoint 1 is additionally completed. In both cases the summary and the
     * committable (with subtask id 0) must be emitted, and a later checkpoint notification
     * followed by another end-of-input must leave the output at two records.
     *
     * @param isCheckpointingEnabled value passed to the {@link CommitterOperatorFactory}
     * @throws Exception if the test harness fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
        CommittableSummary<String> summary = new CommittableSummary<>(1, 1, 1L, 1, 0);
        CommittableWithLineage<String> committable = new CommittableWithLineage<>("1", 1L, 1);

        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                harness =
                        new OneInputStreamOperatorTestHarness<>(
                                new CommitterOperatorFactory<>(
                                        sinkAndCounters.sink, false, isCheckpointingEnabled))) {
            harness.open();
            harness.processElement(new StreamRecord<>(summary));
            harness.processElement(new StreamRecord<>(committable));

            harness.endInput();
            if (isCheckpointingEnabled) {
                harness.notifyOfCompletedCheckpoint(1L);
            }

            ListAssert<CommittableMessage<String>> emitted =
                    Assertions.assertThat(harness.extractOutputValues()).hasSize(2);
            emitted.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                    .hasCheckpointId(1L)
                    .hasOverallCommittables(1);
            emitted.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .isEqualTo(committable.withSubtaskId(0));

            // Further notifications must not add to the two records already emitted.
            harness.notifyOfCompletedCheckpoint(2L);
            harness.endInput();
            Assertions.assertThat(harness.getOutput()).hasSize(2);
        }
    }

    /**
     * Creates a test harness around a new {@link CommitterOperatorFactory} for the given sink.
     *
     * @param sink sink handed to the committer operator factory
     * @param isBatchMode batch-mode flag passed to the factory
     * @param isCheckpointingEnabled checkpointing flag passed to the factory
     * @param maxParallelism max parallelism passed to the harness
     * @param parallelism parallelism passed to the harness
     * @param subtaskId subtask index passed to the harness
     * @return the harness; callers are responsible for opening and closing it
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
            createTestHarness(
                    SupportsCommitter<String> sink,
                    boolean isBatchMode,
                    boolean isCheckpointingEnabled,
                    int maxParallelism,
                    int parallelism,
                    int subtaskId)
                    throws Exception {
        CommitterOperatorFactory<String> operatorFactory =
                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled);
        return new OneInputStreamOperatorTestHarness<>(
                operatorFactory, maxParallelism, parallelism, subtaskId);
    }

    /** Pairs a sink under test with a supplier that reports how many commits were counted. */
    static class SinkAndCounters {
        /** The sink, viewed through its {@link SupportsCommitter} interface. */
        SupportsCommitter<String> sink;

        /** Supplier read by the tests to obtain the current commit count. */
        IntSupplier commitCounter;

        /**
         * @param sink test sink; it is cast to {@link SupportsCommitter}, so it must implement
         *     that interface
         * @param commitCounter supplier of the current commit count
         */
        @SuppressWarnings("unchecked") // the cast target's type argument cannot be checked at runtime
        public SinkAndCounters(TestSinkV2<?> sink, IntSupplier commitCounter) {
            this.commitCounter = commitCounter;
            this.sink = (SupportsCommitter<String>) sink;
        }
    }

    /**
     * Committer that only counts: every call to {@link #commit(Collection)} adds the number of
     * received commit requests to {@code successfulCommits} and does nothing else with them.
     *
     * @param <CommT> committable type
     */
    private static class ForwardingCommitter<CommT> extends TestSinkV2.DefaultCommitter<CommT> {
        /** Running total of commit requests seen so far; starts at zero. */
        private int successfulCommits;

        private ForwardingCommitter() {
        }

        /**
         * Records the size of the given batch in the running total.
         *
         * @param committables commit requests of the current batch
         */
        @Override
        public void commit(Collection<Committer.CommitRequest<CommT>> committables) {
            int batchSize = committables.size();
            successfulCommits = successfulCommits + batchSize;
        }

        /** Intentionally a no-op. */
        @Override
        public void close() throws Exception {
        }
    }
}

