/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class CheckpointBarrierTracker
extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierTracker.class);
    private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
    private int numOpenChannels;
    private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
    private long latestPendingCheckpointID = -1L;

    public CheckpointBarrierTracker(int totalNumberOfInputChannels, CheckpointableTask toNotifyOnCheckpoint, Clock clock, boolean enableCheckpointAfterTasksFinished) {
        super(toNotifyOnCheckpoint, clock, enableCheckpointAfterTasksFinished);
        this.numOpenChannels = totalNumberOfInputChannels;
        this.pendingCheckpoints = new ArrayDeque();
    }

    @Override
    public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo, boolean isRpcTriggered) throws IOException {
        long barrierId = receivedBarrier.getId();
        if (barrierId > this.latestPendingCheckpointID && this.numOpenChannels == 1) {
            this.markAlignmentStartAndEnd(barrierId, receivedBarrier.getTimestamp());
            this.notifyCheckpoint(receivedBarrier);
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received barrier for checkpoint {} from channel {}", (Object)barrierId, (Object)channelInfo);
        }
        CheckpointBarrierCount barrierCount = null;
        int pos = 0;
        for (CheckpointBarrierCount next : this.pendingCheckpoints) {
            if (next.checkpointId() == barrierId) {
                barrierCount = next;
                break;
            }
            ++pos;
        }
        if (barrierCount != null) {
            int numChannelsNew = barrierCount.markChannelAligned(channelInfo);
            if (numChannelsNew == barrierCount.getTargetChannelCount()) {
                for (int i = 0; i <= pos; ++i) {
                    this.pendingCheckpoints.pollFirst();
                }
                if (!barrierCount.isAborted()) {
                    this.triggerCheckpointOnAligned(barrierCount);
                }
            }
        } else if (barrierId > this.latestPendingCheckpointID) {
            this.markAlignmentStart(barrierId, receivedBarrier.getTimestamp());
            this.latestPendingCheckpointID = barrierId;
            this.pendingCheckpoints.addLast(new CheckpointBarrierCount(receivedBarrier, channelInfo, this.numOpenChannels));
            if (this.pendingCheckpoints.size() > 50) {
                this.pendingCheckpoints.pollFirst();
            }
        }
    }

    @Override
    public void processBarrierAnnouncement(CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo) throws IOException {
    }

    @Override
    public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier, InputChannelInfo channelInfo) throws IOException {
        CheckpointBarrierCount cbc;
        long checkpointId = cancelBarrier.getCheckpointId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received cancellation barrier for checkpoint {}", (Object)checkpointId);
        }
        if (cancelBarrier.getCheckpointId() > this.latestPendingCheckpointID && this.numOpenChannels == 1) {
            this.resetAlignment();
            this.notifyAbortOnCancellationBarrier(checkpointId);
            return;
        }
        while ((cbc = this.pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
            this.pendingCheckpoints.removeFirst();
            if (!cbc.markAborted()) continue;
            this.notifyAbortOnCancellationBarrier(cbc.checkpointId());
        }
        if (cbc != null && cbc.checkpointId() == checkpointId) {
            if (cbc.markAborted()) {
                this.notifyAbortOnCancellationBarrier(checkpointId);
            }
            if (cbc.markChannelAligned(channelInfo) == cbc.getTargetChannelCount()) {
                this.pendingCheckpoints.removeFirst();
            }
        } else if (checkpointId > this.latestPendingCheckpointID) {
            this.notifyAbortOnCancellationBarrier(checkpointId);
            this.latestPendingCheckpointID = checkpointId;
            CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(cancelBarrier.getCheckpointId(), channelInfo, this.numOpenChannels);
            abortedMarker.markAborted();
            this.pendingCheckpoints.addFirst(abortedMarker);
        }
    }

    @Override
    public void processEndOfPartition(InputChannelInfo channelInfo) throws IOException {
        --this.numOpenChannels;
        if (!this.isCheckpointAfterTasksFinishedEnabled()) {
            while (!this.pendingCheckpoints.isEmpty()) {
                CheckpointBarrierCount barrierCount = this.pendingCheckpoints.removeFirst();
                if (!barrierCount.markAborted()) continue;
                this.notifyAbort(barrierCount.checkpointId(), new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
            }
            this.resetAlignment();
        } else {
            this.checkAlignmentOnEndOfPartitionIfEnabled(channelInfo);
        }
    }

    private void checkAlignmentOnEndOfPartitionIfEnabled(InputChannelInfo channelInfo) throws IOException {
        CheckpointBarrierCount barrierCount = null;
        int pos = this.pendingCheckpoints.size();
        Iterator<CheckpointBarrierCount> reverseCheckpointIterator = this.pendingCheckpoints.descendingIterator();
        while (reverseCheckpointIterator.hasNext()) {
            CheckpointBarrierCount next = reverseCheckpointIterator.next();
            --pos;
            if (next.markChannelAligned(channelInfo) != next.getTargetChannelCount()) continue;
            barrierCount = next;
            break;
        }
        if (barrierCount != null) {
            for (int i = 0; i <= pos; ++i) {
                this.pendingCheckpoints.pollFirst();
            }
            if (!barrierCount.isAborted()) {
                this.triggerCheckpointOnAligned(barrierCount);
            }
        }
    }

    private void triggerCheckpointOnAligned(CheckpointBarrierCount barrierCount) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("All the channels are aligned for checkpoint {}", (Object)barrierCount.checkpointId());
        }
        if (barrierCount.checkpointId == this.latestPendingCheckpointID) {
            this.markAlignmentEnd();
        }
        Preconditions.checkState((barrierCount.getPendingCheckpoint() != null ? 1 : 0) != 0, (Object)"Pending checkpoint barrier mustexists for non-aborted checkpoints.");
        this.notifyCheckpoint(barrierCount.getPendingCheckpoint());
    }

    @Override
    public long getLatestCheckpointId() {
        return this.pendingCheckpoints.isEmpty() ? -1L : this.pendingCheckpoints.peekLast().checkpointId();
    }

    @Override
    public boolean isCheckpointPending() {
        return !this.pendingCheckpoints.isEmpty();
    }

    @VisibleForTesting
    int getNumOpenChannels() {
        return this.numOpenChannels;
    }

    @VisibleForTesting
    List<Long> getPendingCheckpointIds() {
        return this.pendingCheckpoints.stream().map(CheckpointBarrierCount::checkpointId).collect(Collectors.toList());
    }

    private static final class CheckpointBarrierCount {
        private final long checkpointId;
        private final int targetChannelCount;
        private final Set<InputChannelInfo> alignedChannels = new HashSet<InputChannelInfo>();
        @Nullable
        private final CheckpointBarrier pendingCheckpoint;
        private boolean aborted;

        CheckpointBarrierCount(CheckpointBarrier pendingCheckpoint, InputChannelInfo channelInfo, int targetChannelCount) {
            Preconditions.checkNotNull((Object)pendingCheckpoint);
            this.checkpointId = pendingCheckpoint.getId();
            this.pendingCheckpoint = pendingCheckpoint;
            this.alignedChannels.add(channelInfo);
            this.targetChannelCount = targetChannelCount;
        }

        CheckpointBarrierCount(long checkpointId, InputChannelInfo channelInfo, int targetChannelCount) {
            this.checkpointId = checkpointId;
            this.pendingCheckpoint = null;
            this.alignedChannels.add(channelInfo);
            this.targetChannelCount = targetChannelCount;
        }

        public long checkpointId() {
            return this.checkpointId;
        }

        @Nullable
        public CheckpointBarrier getPendingCheckpoint() {
            return this.pendingCheckpoint;
        }

        public int getTargetChannelCount() {
            return this.targetChannelCount;
        }

        public int markChannelAligned(InputChannelInfo inputChannelInfo) {
            this.alignedChannels.add(inputChannelInfo);
            return this.alignedChannels.size();
        }

        public boolean isAborted() {
            return this.aborted;
        }

        public boolean markAborted() {
            boolean firstAbort = !this.aborted;
            this.aborted = true;
            return firstAbort;
        }

        public String toString() {
            return this.isAborted() ? String.format("checkpointID=%d - ABORTED", this.checkpointId) : String.format("checkpointID=%d, count=%d, targetChannelCount=%d", this.checkpointId, this.alignedChannels.size(), this.targetChannelCount);
        }
    }
}

