/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import java.time.Clock;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Observes the health of a Flink cluster by reading the job restart counter exposed through the
 * {@link FlinkService} metrics API and handing the observation to a {@link ClusterHealthEvaluator}.
 *
 * <p>Observation is best-effort: any failure while fetching, parsing or evaluating the metrics is
 * logged and never propagated to the caller.
 */
public class ClusterHealthObserver {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthObserver.class);
    private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
    private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
    private final FlinkService flinkService;
    private final ClusterHealthEvaluator clusterHealthEvaluator;

    /**
     * Creates an observer whose evaluator is backed by the system default-zone clock.
     *
     * @param flinkService service used to query job metrics
     */
    public ClusterHealthObserver(FlinkService flinkService) {
        this.flinkService = flinkService;
        this.clusterHealthEvaluator = new ClusterHealthEvaluator(Clock.systemDefaultZone());
    }

    /**
     * Fetches the restart metrics of the deployment's job and passes the resulting health
     * information, together with the deployment's cluster info, to the evaluator.
     *
     * <p>This method does not throw: every exception raised during observation is caught and
     * logged (summary at WARN, full stack trace at DEBUG).
     *
     * @param flinkApp deployment whose status provides the job id and the cluster info
     * @param deployedConfig configuration used both for the metrics query and for the evaluation
     */
    public void observe(FlinkDeployment flinkApp, Configuration deployedConfig) {
        try {
            LOG.info("Observing cluster health");
            FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) flinkApp.getStatus();
            JobStatus jobStatus = deploymentStatus.getJobStatus();
            String jobId = jobStatus.getJobId();
            Map<String, String> metrics =
                    this.flinkService.getMetrics(
                            deployedConfig,
                            jobId,
                            List.of(FULL_RESTARTS_METRIC_NAME, NUM_RESTARTS_METRIC_NAME));

            ClusterHealthInfo observedClusterHealthInfo = toClusterHealthInfo(metrics);
            LOG.debug("Observed cluster health: {}", observedClusterHealthInfo);
            this.clusterHealthEvaluator.evaluate(
                    deployedConfig, deploymentStatus.getClusterInfo(), observedClusterHealthInfo);
        } catch (Exception e) {
            // Deliberately swallowed: observation is best-effort and must not fail the caller.
            LOG.warn("Exception while observing cluster health: {}", e.getMessage());
            // getMessage() alone hides the exception type and origin (and is frequently null),
            // so keep the full throwable available for troubleshooting without flooding WARN.
            LOG.debug("Cluster health observation failure details", e);
        }
    }

    /**
     * Builds the health info from the restart counter, preferring {@code numRestarts} and falling
     * back to {@code fullRestarts} when the former is absent.
     *
     * @param metrics metric name to raw string value, as returned by the metrics query
     * @return health info carrying the parsed restart count
     * @throws IllegalStateException if neither restart metric is present
     * @throws NumberFormatException if the selected metric value is not a valid integer
     */
    private static ClusterHealthInfo toClusterHealthInfo(Map<String, String> metrics) {
        if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
            LOG.debug("numRestarts metric is used");
            return ClusterHealthInfo.of(Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
        }
        if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
            LOG.debug("fullRestarts metric is used because numRestarts is missing");
            return ClusterHealthInfo.of(Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
        }
        throw new IllegalStateException(
                "No job restart metric found. Either fullRestarts(old and deprecated in newer Flink versions) or numRestarts(new) must exist.");
    }
}

