/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.MockChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.TestBarrierHandlerFactory;
import org.apache.flink.streaming.runtime.io.checkpointing.UpstreamRecoveryTracker;
import org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CheckpointedInputGateTest {
    private final HashMap<Integer, Integer> channelIndexToSequenceNumber = new HashMap();

    @Before
    public void setUp() {
        this.channelIndexToSequenceNumber.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUpstreamResumedUponEndOfRecovery() throws Exception {
        int numberOfChannels = 11;
        NetworkBufferPool bufferPool = new NetworkBufferPool(numberOfChannels * 3, 1024);
        try {
            ResumeCountingConnectionManager resumeCounter = new ResumeCountingConnectionManager();
            CheckpointedInputGate gate = this.setupInputGate(numberOfChannels, bufferPool, (ConnectionManager)resumeCounter);
            Assert.assertFalse((boolean)gate.pollNext().isPresent());
            for (int channelIndex = 0; channelIndex < numberOfChannels - 1; ++channelIndex) {
                this.enqueueEndOfState(gate, channelIndex);
                Optional bufferOrEvent = gate.pollNext();
                while (bufferOrEvent.isPresent() && ((BufferOrEvent)bufferOrEvent.get()).getEvent() instanceof EndOfChannelStateEvent && !gate.allChannelsRecovered()) {
                    bufferOrEvent = gate.pollNext();
                }
                Assert.assertFalse((String)"should align (block all channels)", (boolean)bufferOrEvent.isPresent());
            }
            this.enqueueEndOfState(gate, numberOfChannels - 1);
            Optional polled = gate.pollNext();
            Assert.assertTrue((boolean)polled.isPresent());
            Assert.assertTrue((boolean)((BufferOrEvent)polled.get()).isEvent());
            Assert.assertEquals((Object)EndOfChannelStateEvent.INSTANCE, (Object)((BufferOrEvent)polled.get()).getEvent());
            Assert.assertEquals((long)numberOfChannels, (long)resumeCounter.getNumResumed());
            Assert.assertFalse((String)"should only be a single event no matter of what is the number of channels", (boolean)gate.pollNext().isPresent());
        }
        finally {
            bufferPool.destroy();
        }
    }

    @Test
    public void testPersisting() throws Exception {
        this.testPersisting(false);
    }

    @Test
    public void testPersistingWithDrainingTheGate() throws Exception {
        this.testPersisting(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testPersisting(boolean drainGate) throws Exception {
        int numberOfChannels = 3;
        NetworkBufferPool bufferPool = new NetworkBufferPool(numberOfChannels * 3, 1024);
        try {
            long checkpointId = 2L;
            long obsoleteCheckpointId = 1L;
            ValidatingCheckpointHandler validatingHandler = new ValidatingCheckpointHandler(checkpointId);
            RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
            CheckpointedInputGate gate = this.setupInputGateWithAlternatingController(numberOfChannels, bufferPool, validatingHandler, stateWriter);
            this.enqueue(gate, 0, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 0, (AbstractEvent)CheckpointedInputGateTest.barrier(checkpointId));
            this.enqueue(gate, 0, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 1, (AbstractEvent)CheckpointedInputGateTest.barrier(obsoleteCheckpointId));
            this.enqueue(gate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            Assert.assertEquals((long)0L, (long)validatingHandler.getTriggeredCheckpointCounter());
            gate.pollNext();
            Assert.assertEquals((long)1L, (long)validatingHandler.getTriggeredCheckpointCounter());
            this.assertAddedInputSize(stateWriter, 0, 1);
            this.assertAddedInputSize(stateWriter, 1, 2);
            this.assertAddedInputSize(stateWriter, 2, 1);
            this.enqueue(gate, 0, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            while (drainGate && gate.pollNext().isPresent()) {
            }
            this.assertAddedInputSize(stateWriter, 0, 1);
            this.assertAddedInputSize(stateWriter, 1, 3);
            this.assertAddedInputSize(stateWriter, 2, 2);
            this.enqueue(gate, 1, (AbstractEvent)CheckpointedInputGateTest.barrier(checkpointId));
            this.enqueue(gate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            this.enqueue(gate, 2, (AbstractEvent)CheckpointedInputGateTest.barrier(obsoleteCheckpointId));
            this.enqueue(gate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            while (drainGate && gate.pollNext().isPresent()) {
            }
            this.assertAddedInputSize(stateWriter, 0, 1);
            this.assertAddedInputSize(stateWriter, 1, 3);
            this.assertAddedInputSize(stateWriter, 2, 3);
            this.enqueue(gate, 2, (AbstractEvent)CheckpointedInputGateTest.barrier(checkpointId));
            this.enqueue(gate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            while (drainGate && gate.pollNext().isPresent()) {
            }
            this.assertAddedInputSize(stateWriter, 0, 1);
            this.assertAddedInputSize(stateWriter, 1, 3);
            this.assertAddedInputSize(stateWriter, 2, 3);
        }
        finally {
            bufferPool.destroy();
        }
    }

    @Test
    public void testPriorityBeforeClose() throws IOException, InterruptedException {
        NetworkBufferPool bufferPool = new NetworkBufferPool(10, 1024);
        try (Closer closer = Closer.create();){
            closer.register(() -> ((NetworkBufferPool)bufferPool).destroy());
            for (int repeat = 0; repeat < 100; ++repeat) {
                this.setUp();
                final SingleInputGate singleInputGate = new SingleInputGateBuilder().setNumberOfChannels(2).setBufferPoolFactory(bufferPool.createBufferPool(2, Integer.MAX_VALUE)).setSegmentProvider((MemorySegmentProvider)bufferPool).setChannelFactory(InputChannelBuilder::buildRemoteChannel).build();
                singleInputGate.setup();
                ((RemoteInputChannel)singleInputGate.getChannel(0)).requestSubpartitions();
                TaskMailboxImpl mailbox = new TaskMailboxImpl();
                MailboxExecutorImpl mailboxExecutor = new MailboxExecutorImpl((TaskMailbox)mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
                ValidatingCheckpointHandler validatingHandler = new ValidatingCheckpointHandler(1L);
                SingleCheckpointBarrierHandler barrierHandler = TestBarrierHandlerFactory.forTarget(validatingHandler).create(singleInputGate, (ChannelStateWriter)new MockChannelStateWriter());
                CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate((InputGate)singleInputGate, (CheckpointBarrierHandler)barrierHandler, (MailboxExecutor)mailboxExecutor, UpstreamRecoveryTracker.forInputGate((InputGate)singleInputGate));
                int oldSize = mailbox.size();
                this.enqueue(checkpointedInputGate, 0, (AbstractEvent)CheckpointedInputGateTest.barrier(1L));
                Deadline deadline = Deadline.fromNow((Duration)Duration.ofMinutes(1L));
                while (deadline.hasTimeLeft() && oldSize >= mailbox.size()) {
                    Thread.sleep(1L);
                }
                final CountDownLatch beforeLatch = new CountDownLatch(2);
                CheckedThread canceler = new CheckedThread("Canceler"){

                    public void go() throws IOException {
                        beforeLatch.countDown();
                        singleInputGate.close();
                    }
                };
                canceler.start();
                beforeLatch.countDown();
                try {
                    while (mailboxExecutor.tryYield()) {
                    }
                    Assert.assertEquals((long)1L, (long)validatingHandler.triggeredCheckpointCounter);
                }
                catch (CancelTaskException cancelTaskException) {
                    // empty catch block
                }
                canceler.join();
            }
        }
    }

    private static CheckpointBarrier barrier(long barrierId) {
        return new CheckpointBarrier(barrierId, barrierId, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
    }

    private void assertAddedInputSize(RecordingChannelStateWriter stateWriter, int channelIndex, int size) {
        Assert.assertEquals((long)size, (long)stateWriter.getAddedInput().get((Object)new InputChannelInfo(0, channelIndex)).size());
    }

    private void enqueueEndOfState(CheckpointedInputGate checkpointedInputGate, int channelIndex) throws IOException {
        this.enqueue(checkpointedInputGate, channelIndex, (AbstractEvent)EndOfChannelStateEvent.INSTANCE);
    }

    private void enqueueEndOfPartition(CheckpointedInputGate checkpointedInputGate, int channelIndex) throws IOException {
        this.enqueue(checkpointedInputGate, channelIndex, (AbstractEvent)EndOfPartitionEvent.INSTANCE);
    }

    private void enqueue(CheckpointedInputGate checkpointedInputGate, int channelIndex, AbstractEvent event) throws IOException {
        boolean hasPriority = false;
        if (event instanceof CheckpointBarrier) {
            hasPriority = ((CheckpointBarrier)event).getCheckpointOptions().isUnalignedCheckpoint();
        }
        this.enqueue(checkpointedInputGate, channelIndex, EventSerializer.toBuffer((AbstractEvent)event, (boolean)hasPriority));
    }

    private void enqueue(CheckpointedInputGate checkpointedInputGate, int channelIndex, Buffer buffer) throws IOException {
        Integer sequenceNumber = this.channelIndexToSequenceNumber.compute(channelIndex, (key, oldSequence) -> oldSequence == null ? 0 : oldSequence + 1);
        ((RemoteInputChannel)checkpointedInputGate.getChannel(channelIndex)).onBuffer(buffer, sequenceNumber.intValue(), 0, 0);
    }

    private CheckpointedInputGate setupInputGate(int numberOfChannels, NetworkBufferPool networkBufferPool, ConnectionManager connectionManager) throws Exception {
        SingleInputGate singleInputGate = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(numberOfChannels, Integer.MAX_VALUE)).setSegmentProvider((MemorySegmentProvider)networkBufferPool).setChannelFactory((builder, gate) -> builder.setConnectionManager(connectionManager).buildRemoteChannel(gate)).setNumberOfChannels(numberOfChannels).build();
        singleInputGate.setup();
        MailboxExecutorImpl mailboxExecutor = new MailboxExecutorImpl((TaskMailbox)new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE);
        CheckpointBarrierTracker barrierHandler = new CheckpointBarrierTracker(numberOfChannels, (CheckpointableTask)new AbstractInvokable((Environment)new DummyEnvironment()){

            public void invoke() {
            }
        }, (Clock)SystemClock.getInstance(), true);
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate((InputGate)singleInputGate, (CheckpointBarrierHandler)barrierHandler, (MailboxExecutor)mailboxExecutor, UpstreamRecoveryTracker.forInputGate((InputGate)singleInputGate));
        for (int i = 0; i < numberOfChannels; ++i) {
            ((RemoteInputChannel)checkpointedInputGate.getChannel(i)).requestSubpartitions();
        }
        return checkpointedInputGate;
    }

    private CheckpointedInputGate setupInputGateWithAlternatingController(int numberOfChannels, NetworkBufferPool networkBufferPool, AbstractInvokable abstractInvokable, RecordingChannelStateWriter stateWriter) throws Exception {
        TestingConnectionManager connectionManager = new TestingConnectionManager();
        SingleInputGate singleInputGate = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(numberOfChannels, Integer.MAX_VALUE)).setSegmentProvider((MemorySegmentProvider)networkBufferPool).setChannelFactory((arg_0, arg_1) -> CheckpointedInputGateTest.lambda$setupInputGateWithAlternatingController$2((ConnectionManager)connectionManager, arg_0, arg_1)).setNumberOfChannels(numberOfChannels).setChannelStateWriter((ChannelStateWriter)stateWriter).build();
        singleInputGate.setup();
        MailboxExecutorImpl mailboxExecutor = new MailboxExecutorImpl((TaskMailbox)new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE);
        SingleCheckpointBarrierHandler barrierHandler = TestBarrierHandlerFactory.forTarget(abstractInvokable).create(singleInputGate, (ChannelStateWriter)stateWriter);
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate((InputGate)singleInputGate, (CheckpointBarrierHandler)barrierHandler, (MailboxExecutor)mailboxExecutor, UpstreamRecoveryTracker.forInputGate((InputGate)singleInputGate));
        for (int i = 0; i < numberOfChannels; ++i) {
            ((RemoteInputChannel)checkpointedInputGate.getChannel(i)).requestSubpartitions();
        }
        return checkpointedInputGate;
    }

    private static /* synthetic */ InputChannel lambda$setupInputGateWithAlternatingController$2(ConnectionManager connectionManager, InputChannelBuilder builder, SingleInputGate gate) {
        return builder.setConnectionManager(connectionManager).buildRemoteChannel(gate);
    }

    private static class ResumeCountingConnectionManager
    extends TestingConnectionManager {
        private int numResumed;

        private ResumeCountingConnectionManager() {
        }

        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
            return new TestingPartitionRequestClient(){

                public void resumeConsumption(RemoteInputChannel inputChannel) {
                    numResumed++;
                    super.resumeConsumption(inputChannel);
                }
            };
        }

        private int getNumResumed() {
            return this.numResumed;
        }
    }
}

