/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterHealthEvaluator {
    private static final String CLUSTER_INFO_KEY = ClusterHealthInfo.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthEvaluator.class);
    private final Clock clock;

    public ClusterHealthEvaluator(Clock clock) {
        this.clock = clock;
    }

    public static ClusterHealthInfo getLastValidClusterHealthInfo(Map<String, String> clusterInfo) {
        LOG.debug("Getting last valid health check info");
        if (clusterInfo.containsKey(CLUSTER_INFO_KEY)) {
            return ClusterHealthInfo.deserialize(clusterInfo.get(CLUSTER_INFO_KEY));
        }
        LOG.debug("No last valid health check info");
        return null;
    }

    public static void setLastValidClusterHealthInfo(Map<String, String> clusterInfo, ClusterHealthInfo clusterHealthInfo) {
        LOG.debug("Setting last valid health check info");
        clusterInfo.put(CLUSTER_INFO_KEY, ClusterHealthInfo.serialize(clusterHealthInfo));
    }

    public static void removeLastValidClusterHealthInfo(Map<String, String> clusterInfo) {
        LOG.debug("Removing last valid health check info");
        clusterInfo.remove(CLUSTER_INFO_KEY);
    }

    public void evaluate(Configuration configuration, Map<String, String> clusterInfo, ClusterHealthInfo observedClusterHealthInfo) {
        if (ClusterHealthInfo.isValid(observedClusterHealthInfo)) {
            LOG.debug("Observed health info is valid");
            ClusterHealthInfo lastValidClusterHealthInfo = ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
            if (lastValidClusterHealthInfo == null) {
                LOG.debug("No last valid health info, skipping health check");
                observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(observedClusterHealthInfo.getTimeStamp());
                observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(observedClusterHealthInfo.getTimeStamp());
                ClusterHealthEvaluator.setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
            } else {
                if (observedClusterHealthInfo.getTimeStamp() < lastValidClusterHealthInfo.getTimeStamp()) {
                    String msg = "Observed health info timestamp is less than the last valid health info timestamp, this indicates a bug...";
                    LOG.error(msg);
                    throw new IllegalStateException(msg);
                }
                LOG.debug("Valid health info exist, checking cluster health");
                LOG.debug("Last valid health info: {}", (Object)lastValidClusterHealthInfo);
                LOG.debug("Observed health info: {}", (Object)observedClusterHealthInfo);
                boolean isHealthy = this.evaluateRestarts(configuration, clusterInfo, lastValidClusterHealthInfo, observedClusterHealthInfo) && this.evaluateCheckpoints(configuration, lastValidClusterHealthInfo, observedClusterHealthInfo);
                lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
                lastValidClusterHealthInfo.setHealthy(isHealthy);
                ClusterHealthEvaluator.setLastValidClusterHealthInfo(clusterInfo, lastValidClusterHealthInfo);
            }
        }
    }

    private boolean evaluateRestarts(Configuration configuration, Map<String, String> clusterInfo, ClusterHealthInfo lastValidClusterHealthInfo, ClusterHealthInfo observedClusterHealthInfo) {
        boolean isHealthy;
        if (observedClusterHealthInfo.getNumRestarts() < lastValidClusterHealthInfo.getNumRestarts()) {
            LOG.debug("Observed health info number of restarts is less than in the last valid health info, skipping health check");
            lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
            lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(observedClusterHealthInfo.getTimeStamp());
            return true;
        }
        long timestampDiffMs = observedClusterHealthInfo.getTimeStamp() - lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp();
        LOG.debug("Time difference between health infos: {}", (Object)Duration.ofMillis(timestampDiffMs));
        Duration restartCheckWindow = (Duration)configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
        long restartCheckWindowMs = restartCheckWindow.toMillis();
        double countMultiplier = (double)restartCheckWindowMs / (double)timestampDiffMs;
        if (countMultiplier > 1.0) {
            countMultiplier = 1.0;
        }
        long numRestarts = (long)((double)(observedClusterHealthInfo.getNumRestarts() - lastValidClusterHealthInfo.getNumRestarts()) * countMultiplier);
        LOG.debug("Calculated restart count for {} window: {}", (Object)restartCheckWindow, (Object)numRestarts);
        Integer restartThreshold = (Integer)configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
        boolean bl = isHealthy = numRestarts <= (long)restartThreshold.intValue();
        if (!isHealthy) {
            LOG.info("Restart count hit threshold: {}", (Object)restartThreshold);
        }
        if (lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp() < this.clock.millis() - restartCheckWindowMs) {
            LOG.debug("Last valid number of restarts evaluation timestamp is outside of the window");
            lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
            lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(observedClusterHealthInfo.getTimeStamp());
        }
        return isHealthy;
    }

    private boolean evaluateCheckpoints(Configuration configuration, ClusterHealthInfo lastValidClusterHealthInfo, ClusterHealthInfo observedClusterHealthInfo) {
        if (!configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
            return true;
        }
        Optional windowOpt = configuration.getOptional(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
        CheckpointConfig checkpointConfig = new CheckpointConfig();
        checkpointConfig.configure((ReadableConfig)configuration);
        long checkpointingInterval = checkpointConfig.getCheckpointInterval();
        long checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
        int tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
        Duration minCheckWindow = Duration.ofMillis(Math.max(checkpointingInterval * (long)tolerationFailureNumber, checkpointingTimeout * (long)tolerationFailureNumber));
        if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
            return true;
        }
        Duration completedCheckpointsCheckWindow = windowOpt.filter(d -> {
            if (d.compareTo(minCheckWindow) < 0) {
                LOG.debug("{} is not long enough. Default to max({} * {}, {} * {}): {}", new Object[]{KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(), ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key(), ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER.key(), ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key(), ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER.key(), minCheckWindow});
                return false;
            }
            return true;
        }).orElse(minCheckWindow);
        if (observedClusterHealthInfo.getNumCompletedCheckpoints() < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
            LOG.debug("Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
            lastValidClusterHealthInfo.setNumCompletedCheckpoints(observedClusterHealthInfo.getNumCompletedCheckpoints());
            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(observedClusterHealthInfo.getTimeStamp());
            return true;
        }
        long timestampDiffMs = observedClusterHealthInfo.getTimeStamp() - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
        LOG.debug("Time difference between health infos: {}", (Object)Duration.ofMillis(timestampDiffMs));
        boolean isHealthy = true;
        long completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();
        if (observedClusterHealthInfo.getNumCompletedCheckpoints() > lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
            LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
            lastValidClusterHealthInfo.setNumCompletedCheckpoints(observedClusterHealthInfo.getNumCompletedCheckpoints());
            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(observedClusterHealthInfo.getTimeStamp());
        } else if (lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() + completedCheckpointsCheckWindowMs < this.clock.millis()) {
            LOG.info("Cluster is not able to complete checkpoints");
            isHealthy = false;
        }
        return isHealthy;
    }
}

