/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import java.time.Clock;
import java.util.List;
import java.util.Map;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads restart and checkpoint metrics of the job belonging to a {@link FlinkDeployment} and
 * hands them to a {@link ClusterHealthEvaluator}.
 */
public class ClusterHealthObserver {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthObserver.class);
    private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
    private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME = "numberOfCompletedCheckpoints";
    private final ClusterHealthEvaluator clusterHealthEvaluator = new ClusterHealthEvaluator(Clock.systemDefaultZone());

    /**
     * Observes the cluster health of the deployment held by the given context.
     *
     * <p>The restart count is taken from the {@code numRestarts} metric when present, otherwise
     * from {@code fullRestarts}. The completed checkpoint count is taken from
     * {@code numberOfCompletedCheckpoints}. The collected values are passed to the evaluator
     * together with the observe configuration and the cluster info stored in the status.
     *
     * <p>Any exception raised while fetching, parsing or evaluating the metrics is logged and
     * not propagated to the caller.
     *
     * @param ctx resource context providing the deployment, the Flink service and the observe
     *     configuration
     */
    public void observe(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment flinkApp = ctx.getResource();
        try {
            LOG.debug("Observing cluster health");
            FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) flinkApp.getStatus();
            JobStatus jobStatus = deploymentStatus.getJobStatus();
            String jobId = jobStatus.getJobId();
            Map<String, String> metrics = ctx.getFlinkService().getMetrics(
                    ctx.getObserveConfig(),
                    jobId,
                    List.of(
                            FULL_RESTARTS_METRIC_NAME,
                            NUM_RESTARTS_METRIC_NAME,
                            NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));

            ClusterHealthInfo observedClusterHealthInfo = new ClusterHealthInfo();
            if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
                LOG.debug("numRestarts metric is used");
                observedClusterHealthInfo.setNumRestarts(
                        parseIntMetric(metrics, NUM_RESTARTS_METRIC_NAME));
            } else if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
                LOG.debug("fullRestarts metric is used because numRestarts is missing");
                observedClusterHealthInfo.setNumRestarts(
                        parseIntMetric(metrics, FULL_RESTARTS_METRIC_NAME));
            } else {
                throw new IllegalStateException("No job restart metric found. Either fullRestarts(old and deprecated in newer Flink versions) or numRestarts(new) must exist.");
            }
            observedClusterHealthInfo.setNumCompletedCheckpoints(
                    parseIntMetric(metrics, NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));

            LOG.debug("Observed cluster health: {}", observedClusterHealthInfo);
            this.clusterHealthEvaluator.evaluate(
                    ctx.getObserveConfig(),
                    deploymentStatus.getClusterInfo(),
                    observedClusterHealthInfo);
        } catch (Exception e) {
            // Deliberately not rethrown: a failed observation is only reported through the log.
            LOG.warn("Exception while observing cluster health: {}", e.getMessage());
            // The message alone can be empty or ambiguous (e.g. for a NullPointerException),
            // so keep the exception type and stack trace available at debug level.
            LOG.debug("Cluster health observation failure details", e);
        }
    }

    /**
     * Reads an integer metric from the fetched metrics map.
     *
     * @param metrics metric name to raw string value mapping
     * @param metricName name of the metric to read
     * @return the metric value parsed as an {@code int}
     * @throws IllegalStateException if the metric has no value or the value is not a valid
     *     integer; the message names the offending metric
     */
    private static int parseIntMetric(Map<String, String> metrics, String metricName) {
        String rawValue = metrics.get(metricName);
        if (rawValue == null) {
            throw new IllegalStateException(
                    "Metric " + metricName + " is missing from the observed job metrics.");
        }
        try {
            return Integer.parseInt(rawValue);
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "Metric " + metricName + " has a non-integer value: " + rawValue, e);
        }
    }
}

