/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterOperator;
import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.SerializableFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class GlobalCommitterOperatorTest {
    GlobalCommitterOperatorTest() {
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testWaitForCommittablesOfLatestCheckpointBeforeCommitting(boolean commitOnInput) throws Exception {
        MockCommitter committer = new MockCommitter();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness = this.createTestHarness(committer, commitOnInput);){
            testHarness.open();
            long cid = 1L;
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)new CommittableSummary(1, 1, cid, 2, 0)));
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)new CommittableWithLineage((Object)1, cid, 1)));
            testHarness.notifyOfCompletedCheckpoint(cid);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(committer.committed).isEmpty();
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)new CommittableWithLineage((Object)2, cid, 1)));
            if (commitOnInput) {
                Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1, 2});
            } else {
                Assertions.assertThat(committer.committed).isEmpty();
                testHarness.notifyOfCompletedCheckpoint(cid + 1L);
                Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1, 2});
            }
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testWaitForNotifyCheckpointCompleted(boolean commitOnInput) throws Exception {
        MockCommitter committer = new MockCommitter();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness = this.createTestHarness(committer, commitOnInput);){
            testHarness.open();
            long cid = 1L;
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)new CommittableSummary(1, 1, cid, 2, 0)));
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)new CommittableWithLineage((Object)1, cid, 1)));
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(committer.committed).isEmpty();
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)new CommittableWithLineage((Object)2, cid, 1)));
            if (commitOnInput) {
                Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1, 2});
            } else {
                Assertions.assertThat(committer.committed).isEmpty();
            }
            testHarness.notifyOfCompletedCheckpoint(cid);
            Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1, 2});
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }
    }

    @Test
    void testStateRestore() throws Exception {
        MockCommitter committer = new MockCommitter();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness = this.createTestHarness(committer, false);){
            testHarness.open();
            CommittableSummary committableSummary = new CommittableSummary(1, 1, 0L, 1, 1);
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)committableSummary));
            CommittableWithLineage first = new CommittableWithLineage((Object)1, 0L, 1);
            testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)first));
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 2L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            testHarness.close();
            Assertions.assertThat(committer.committed).isEmpty();
            try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> restored = this.createTestHarness(committer, true);){
                restored.initializeState(snapshot);
                restored.open();
                Assertions.assertThat(testHarness.getOutput()).isEmpty();
                Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1});
            }
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
        MockCommitter committer = new MockCommitter();
        OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness = this.createTestHarness(committer, commitOnInput);
        testHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 2, Long.MAX_VALUE, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)committableSummary));
        CommittableSummary committableSummary2 = new CommittableSummary(2, 2, Long.MAX_VALUE, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)committableSummary2));
        CommittableWithLineage first = new CommittableWithLineage((Object)1, Long.MAX_VALUE, 1);
        testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)first));
        CommittableWithLineage second = new CommittableWithLineage((Object)2, Long.MAX_VALUE, 2);
        testHarness.processElement((StreamRecord<CommittableMessage<Integer>>)new StreamRecord((Object)second));
        if (commitOnInput) {
            Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1, 2});
        } else {
            Assertions.assertThat(committer.committed).isEmpty();
            testHarness.notifyOfCompletedCheckpoint(Long.MAX_VALUE);
            Assertions.assertThat(committer.committed).containsExactly((Object[])new Integer[]{1, 2});
        }
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(Committer<Integer> committer, boolean commitOnInput) throws Exception {
        return new OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void>((OneInputStreamOperator<CommittableMessage<Integer>, Void>)new GlobalCommitterOperator((SerializableFunction & Serializable)ctx -> committer, IntegerSerializer::new, commitOnInput));
    }

    private static class MockCommitter
    implements Committer<Integer> {
        final List<Integer> committed = new ArrayList<Integer>();

        private MockCommitter() {
        }

        public void close() throws Exception {
        }

        public void commit(Collection<Committer.CommitRequest<Integer>> committables) throws IOException, InterruptedException {
            committables.forEach(c -> this.committed.add((Integer)c.getCommittable()));
        }
    }
}

