/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    private final Object lock = new Object();
    private final JobID job;
    private final ExecutionVertex[] tasksToTrigger;
    private final ExecutionVertex[] tasksToWaitFor;
    private final ExecutionVertex[] tasksToCommitTo;
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    private final CompletedCheckpointStore completedCheckpointStore;
    private final ArrayDeque<Long> recentPendingCheckpoints;
    private final CheckpointIDCounter checkpointIdCounter;
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
    private final Timer timer;
    private final long checkpointTimeout;
    private TimerTask periodicScheduler;
    private ActorGateway jobStatusListener;
    private ClassLoader userClassLoader;
    private volatile boolean shutdown;
    private final Thread shutdownHook;

    public CheckpointCoordinator(JobID job, long checkpointTimeout, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode) throws Exception {
        Preconditions.checkArgument(checkpointTimeout >= 1L, "Checkpoint timeout must be larger than zero");
        this.job = Preconditions.checkNotNull(job);
        this.checkpointTimeout = checkpointTimeout;
        this.tasksToTrigger = Preconditions.checkNotNull(tasksToTrigger);
        this.tasksToWaitFor = Preconditions.checkNotNull(tasksToWaitFor);
        this.tasksToCommitTo = Preconditions.checkNotNull(tasksToCommitTo);
        this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
        this.completedCheckpointStore = Preconditions.checkNotNull(completedCheckpointStore);
        this.recentPendingCheckpoints = new ArrayDeque(16);
        this.userClassLoader = userClassLoader;
        this.checkpointIdCounter = Preconditions.checkNotNull(checkpointIDCounter);
        checkpointIDCounter.start();
        this.timer = new Timer("Checkpoint Timer", true);
        if (recoveryMode == RecoveryMode.STANDALONE) {
            this.shutdownHook = new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        CheckpointCoordinator.this.shutdown();
                    }
                    catch (Throwable t) {
                        LOG.error("Error during shutdown of checkpoint coordniator via JVM shutdown hook: " + t.getMessage(), t);
                    }
                }
            });
            try {
                Runtime.getRuntime().addShutdownHook(this.shutdownHook);
            }
            catch (IllegalStateException illegalStateException) {
            }
            catch (Throwable t) {
                LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
            }
        } else {
            this.shutdownHook = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            try {
                if (!this.shutdown) {
                    this.shutdown = true;
                    LOG.info("Stopping checkpoint coordinator for job " + this.job);
                    this.timer.cancel();
                    if (this.jobStatusListener != null) {
                        this.jobStatusListener.tell(PoisonPill.getInstance());
                        this.jobStatusListener = null;
                    }
                    if (this.periodicScheduler != null) {
                        this.periodicScheduler.cancel();
                        this.periodicScheduler = null;
                    }
                    this.checkpointIdCounter.stop();
                    for (PendingCheckpoint pending : this.pendingCheckpoints.values()) {
                        pending.discard(this.userClassLoader, true);
                    }
                    this.pendingCheckpoints.clear();
                    this.completedCheckpointStore.discardAllCheckpoints();
                }
            }
            finally {
                if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                    }
                    catch (IllegalStateException i$) {
                    }
                    catch (Throwable t) {
                        LOG.warn("Error unregistering checkpoint cooordniator shutdown hook.", t);
                    }
                }
            }
        }
    }

    public boolean isShutdown() {
        return this.shutdown;
    }

    public void triggerCheckpoint() throws Exception {
        this.triggerCheckpoint(System.currentTimeMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean triggerCheckpoint(long timestamp) throws Exception {
        if (this.shutdown) {
            LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
            return false;
        }
        final long checkpointID = this.checkpointIdCounter.getAndIncrement();
        LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
        try {
            ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[this.tasksToTrigger.length];
            for (int i = 0; i < this.tasksToTrigger.length; ++i) {
                Execution ee = this.tasksToTrigger[i].getCurrentExecutionAttempt();
                if (ee == null || ee.getState() != ExecutionState.RUNNING) {
                    LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", (Object)this.tasksToTrigger[i].getSimpleName());
                    return false;
                }
                triggerIDs[i] = ee.getAttemptId();
            }
            HashMap<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<ExecutionAttemptID, ExecutionVertex>(this.tasksToWaitFor.length);
            for (ExecutionVertex ev : this.tasksToWaitFor) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee == null) {
                    LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", (Object)ev.getSimpleName());
                    return false;
                }
                ackTasks.put(ee.getAttemptId(), ev);
            }
            final PendingCheckpoint checkpoint = new PendingCheckpoint(this.job, checkpointID, timestamp, ackTasks);
            TimerTask canceller = new TimerTask(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        Object object = CheckpointCoordinator.this.lock;
                        synchronized (object) {
                            if (!checkpoint.isDiscarded()) {
                                LOG.info("Checkpoint " + checkpointID + " expired before completing.");
                                checkpoint.discard(CheckpointCoordinator.this.userClassLoader, true);
                                CheckpointCoordinator.this.pendingCheckpoints.remove(checkpointID);
                                CheckpointCoordinator.this.rememberRecentCheckpointId(checkpointID);
                            }
                        }
                    }
                    catch (Throwable t) {
                        LOG.error("Exception while handling checkpoint timeout", t);
                    }
                }
            };
            Object i$ = this.lock;
            synchronized (i$) {
                if (this.shutdown) {
                    throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
                }
                this.pendingCheckpoints.put(checkpointID, checkpoint);
                this.timer.schedule(canceller, this.checkpointTimeout);
            }
            for (int i = 0; i < this.tasksToTrigger.length; ++i) {
                ExecutionAttemptID id = triggerIDs[i];
                TriggerCheckpoint message = new TriggerCheckpoint(this.job, id, checkpointID, timestamp);
                this.tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
            }
            this.numUnsuccessfulCheckpointsTriggers.set(0);
            return true;
        }
        catch (Throwable t) {
            int numUnsuccessful = this.numUnsuccessfulCheckpointsTriggers.incrementAndGet();
            LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
            Object object = this.lock;
            synchronized (object) {
                PendingCheckpoint checkpoint = this.pendingCheckpoints.remove(checkpointID);
                if (checkpoint != null && !checkpoint.isDiscarded()) {
                    checkpoint.discard(this.userClassLoader, true);
                }
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
        if (this.shutdown || message == null) {
            return;
        }
        if (!this.job.equals((Object)message.getJob())) {
            LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", (Object)message);
            return;
        }
        long checkpointId = message.getCheckpointId();
        CompletedCheckpoint completed = null;
        Object object = this.lock;
        synchronized (object) {
            if (this.shutdown) {
                return;
            }
            PendingCheckpoint checkpoint = this.pendingCheckpoints.get(checkpointId);
            if (checkpoint != null && !checkpoint.isDiscarded()) {
                if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
                    if (checkpoint.isFullyAcknowledged()) {
                        completed = checkpoint.toCompletedCheckpoint();
                        this.completedCheckpointStore.addCheckpoint(completed);
                        LOG.info("Completed checkpoint " + checkpointId);
                        LOG.debug(completed.getStates().toString());
                        this.pendingCheckpoints.remove(checkpointId);
                        this.rememberRecentCheckpointId(checkpointId);
                        this.dropSubsumedCheckpoints(completed.getTimestamp());
                    }
                } else {
                    LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId + " , task " + (Object)((Object)message.getTaskExecutionId()));
                }
            } else {
                if (checkpoint != null) {
                    throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
                }
                if (this.recentPendingCheckpoints.contains(checkpointId)) {
                    LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
                } else {
                    LOG.info("Received message for non-existing checkpoint " + checkpointId);
                }
            }
        }
        if (completed != null) {
            long timestamp = completed.getTimestamp();
            for (ExecutionVertex ev : this.tasksToCommitTo) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee == null) continue;
                ExecutionAttemptID attemptId = ee.getAttemptId();
                NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(this.job, attemptId, checkpointId, timestamp);
                ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
            }
        }
    }

    private void rememberRecentCheckpointId(long id) {
        if (this.recentPendingCheckpoints.size() >= 16) {
            this.recentPendingCheckpoints.removeFirst();
        }
        this.recentPendingCheckpoints.addLast(id);
    }

    private void dropSubsumedCheckpoints(long timestamp) {
        Iterator<Map.Entry<Long, PendingCheckpoint>> entries = this.pendingCheckpoints.entrySet().iterator();
        while (entries.hasNext()) {
            PendingCheckpoint p = entries.next().getValue();
            if (p.getCheckpointTimestamp() >= timestamp) continue;
            this.rememberRecentCheckpointId(p.getCheckpointId());
            p.discard(this.userClassLoader, true);
            entries.remove();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks, boolean errorIfNoCheckpoint, boolean allOrNothingState) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            this.completedCheckpointStore.recover();
            CompletedCheckpoint latest = this.completedCheckpointStore.getLatestCheckpoint();
            if (latest == null) {
                if (errorIfNoCheckpoint) {
                    throw new IllegalStateException("No completed checkpoint available");
                }
                return;
            }
            if (allOrNothingState) {
                ExecutionJobVertex vertex;
                HashMap<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
                for (StateForTask stateForTask : latest.getStates()) {
                    vertex = tasks.get((Object)stateForTask.getOperatorId());
                    Execution exec = vertex.getTaskVertices()[stateForTask.getSubtask()].getCurrentExecutionAttempt();
                    exec.setInitialState(stateForTask.getState());
                    Integer count = (Integer)stateCounts.get(vertex);
                    if (count != null) {
                        stateCounts.put(vertex, count + 1);
                        continue;
                    }
                    stateCounts.put(vertex, 1);
                }
                for (Map.Entry entry : stateCounts.entrySet()) {
                    vertex = (ExecutionJobVertex)entry.getKey();
                    if (((Integer)entry.getValue()).intValue() == vertex.getParallelism()) continue;
                    throw new IllegalStateException("The checkpoint contained state only for a subset of tasks for vertex " + vertex);
                }
            } else {
                for (StateForTask state : latest.getStates()) {
                    ExecutionJobVertex executionJobVertex = tasks.get((Object)state.getOperatorId());
                    Execution exec = executionJobVertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
                    exec.setInitialState(state.getState());
                }
            }
        }
    }

    public int getNumberOfPendingCheckpoints() {
        return this.pendingCheckpoints.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getNumberOfRetainedSuccessfulCheckpoints() {
        Object object = this.lock;
        synchronized (object) {
            return this.completedCheckpointStore.getNumberOfRetainedCheckpoints();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        Object object = this.lock;
        synchronized (object) {
            return new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            return this.completedCheckpointStore.getAllCheckpoints();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startPeriodicCheckpointScheduler(long interval) {
        Object object = this.lock;
        synchronized (object) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            this.stopPeriodicCheckpointScheduler();
            this.periodicScheduler = new TimerTask(){

                @Override
                public void run() {
                    try {
                        CheckpointCoordinator.this.triggerCheckpoint();
                    }
                    catch (Exception e) {
                        LOG.error("Exception while triggering checkpoint", e);
                    }
                }
            };
            this.timer.scheduleAtFixedRate(this.periodicScheduler, interval, interval);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopPeriodicCheckpointScheduler() {
        Object object = this.lock;
        synchronized (object) {
            if (this.periodicScheduler != null) {
                this.periodicScheduler.cancel();
                this.periodicScheduler = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ActorGateway createJobStatusListener(ActorSystem actorSystem, long checkpointInterval, UUID leaderSessionID) {
        Object object = this.lock;
        synchronized (object) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.jobStatusListener == null) {
                Props props = Props.create(CheckpointCoordinatorDeActivator.class, (Object[])new Object[]{this, checkpointInterval, leaderSessionID});
                this.jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
            }
            return this.jobStatusListener;
        }
    }
}

