/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Observes the savepoint state of a Flink resource and keeps the savepoint bookkeeping stored in
 * the resource status up to date.
 *
 * <p>One observation pass consists of up to three steps: checking on a savepoint that is currently
 * in progress, recording the last checkpoint reported by the {@link FlinkService} once the job is
 * in a terminal state, and pruning the savepoint history by count and by age.
 *
 * @param <CR> type of the custom resource being observed
 * @param <STATUS> status type carried by the custom resource
 */
public class SavepointObserver<CR extends AbstractFlinkResource<?, STATUS>, STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointObserver.class);

    private final FlinkService flinkService;
    private final FlinkConfigManager configManager;
    private final EventRecorder eventRecorder;

    /**
     * Creates a new observer.
     *
     * @param flinkService service used to fetch savepoint results, look up the last checkpoint
     *     and dispose savepoints
     * @param configManager source of the operator configuration that supplies the history
     *     count/age thresholds
     * @param eventRecorder recorder used to emit a warning event when a savepoint fails
     */
    public SavepointObserver(FlinkService flinkService, FlinkConfigManager configManager, EventRecorder eventRecorder) {
        this.flinkService = flinkService;
        this.configManager = configManager;
        this.eventRecorder = eventRecorder;
    }

    /**
     * Runs one observation pass over the savepoint information of the given resource.
     *
     * @param resource resource whose job status is inspected and updated
     * @param deployedConfig configuration passed through to the {@link FlinkService} calls and
     *     used to resolve the history limits
     */
    public void observeSavepointStatus(CR resource, Configuration deployedConfig) {
        // The raw CommonStatus casts are kept on purpose: the generic bounds of
        // AbstractFlinkResource are declared outside this file.
        JobStatus jobStatus = ((CommonStatus) resource.getStatus()).getJobStatus();
        SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
        String jobId = jobStatus.getJobId();

        // A savepoint is currently in progress: check on it.
        if (SavepointUtils.savepointInProgress(jobStatus)) {
            observeTriggeredSavepoint(resource, jobId, deployedConfig);
        }

        // The job is in a terminal state: record the last checkpoint reported by the service.
        if (ReconciliationUtils.isJobInTerminalState((CommonStatus) resource.getStatus())) {
            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
        }

        cleanupSavepointHistory(savepointInfo, deployedConfig);
    }

    /**
     * Fetches the result of the savepoint identified by the trigger id stored in the resource
     * status and updates the status accordingly.
     *
     * <ul>
     *   <li>Pending: nothing is changed.
     *   <li>Error: a warning event is emitted and the trigger is reset. If the grace period has
     *       ended, the last reconciled savepoint trigger nonce is updated as well.
     *   <li>Otherwise: a new {@link Savepoint} is recorded as the last savepoint.
     * </ul>
     *
     * @param resource resource that owns the savepoint
     * @param jobID id of the job the savepoint was triggered for
     * @param deployedConfig configuration used for the service call and the grace period check
     */
    private void observeTriggeredSavepoint(AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
        SavepointInfo savepointInfo = ((CommonStatus) resource.getStatus()).getJobStatus().getSavepointInfo();

        LOG.info("Observing savepoint status.");
        SavepointFetchResult savepointFetchResult =
                flinkService.fetchSavepointInfo(savepointInfo.getTriggerId(), jobID, deployedConfig);

        if (savepointFetchResult.isPending()) {
            LOG.info("Savepoint operation not finished yet...");
            return;
        }

        if (savepointFetchResult.getError() != null) {
            String err = savepointFetchResult.getError();
            if (SavepointUtils.gracePeriodEnded(deployedConfig, savepointInfo)) {
                LOG.error("Savepoint attempt failed after grace period. Won't be retried again: {}", err);
                ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
            } else {
                LOG.warn("Savepoint failed within grace period, retrying: {}", err);
            }
            // The warning event is emitted and the trigger is cleared in both failure cases.
            eventRecorder.triggerEvent(
                    resource,
                    EventRecorder.Type.Warning,
                    EventRecorder.Reason.SavepointError,
                    EventRecorder.Component.Operator,
                    SavepointUtils.createSavepointError(
                            savepointInfo,
                            ((AbstractFlinkSpec) resource.getSpec()).getJob().getSavepointTriggerNonce()));
            savepointInfo.resetTrigger();
            return;
        }

        // Only manually triggered savepoints carry the trigger nonce from the spec.
        Savepoint savepoint =
                new Savepoint(
                        savepointInfo.getTriggerTimestamp().longValue(),
                        savepointFetchResult.getLocation(),
                        savepointInfo.getTriggerType(),
                        savepointInfo.getFormatType(),
                        SavepointTriggerType.MANUAL == savepointInfo.getTriggerType()
                                ? ((AbstractFlinkSpec) resource.getSpec()).getJob().getSavepointTriggerNonce()
                                : null);

        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
        savepointInfo.updateLastSavepoint(savepoint);
    }

    /**
     * Prunes the savepoint history, disposing every savepoint that is removed.
     *
     * <p>Histories with fewer than two entries are left untouched. Otherwise entries are removed
     * from the front of the list until at most {@code maxCount} (never less than 1) remain, and
     * afterwards every remaining entry older than the configured maximum age is removed, except
     * for the entry that was last in the list when the method was called.
     *
     * @param currentSavepointInfo savepoint info whose history list is pruned in place
     * @param deployedConfig configuration used to resolve the limits and to dispose savepoints
     */
    @VisibleForTesting
    void cleanupSavepointHistory(SavepointInfo currentSavepointInfo, Configuration deployedConfig) {
        List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
        if (savepointHistory.size() < 2) {
            return;
        }
        // Captured before pruning so the age-based pass below can always spare it.
        Savepoint lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);

        // Count-based cleanup; at least one entry is always retained.
        int maxCount = Math.max(1, ConfigOptionUtils.getValueWithThreshold(deployedConfig, KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, this.configManager.getOperatorConfiguration().getSavepointHistoryCountThreshold()));
        while (savepointHistory.size() > maxCount) {
            disposeSavepointQuietly(savepointHistory.remove(0), deployedConfig);
        }

        // Age-based cleanup.
        Duration maxAge = ConfigOptionUtils.getValueWithThreshold(deployedConfig, KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, this.configManager.getOperatorConfiguration().getSavepointHistoryAgeThreshold());
        long maxTms = System.currentTimeMillis() - maxAge.toMillis();
        Iterator<Savepoint> it = savepointHistory.iterator();
        while (it.hasNext()) {
            Savepoint sp = it.next();
            // Identity comparison is intentional: only the captured last entry is protected.
            if (sp.getTimeStamp() < maxTms && sp != lastSavepoint) {
                it.remove();
                disposeSavepointQuietly(sp, deployedConfig);
            }
        }
    }

    /**
     * Disposes the given savepoint through the {@link FlinkService}. Any exception is logged and
     * not propagated, so a failed disposal does not interrupt the caller.
     *
     * @param sp savepoint to dispose
     * @param conf configuration passed to the service
     */
    private void disposeSavepointQuietly(Savepoint sp, Configuration conf) {
        try {
            LOG.info("Disposing savepoint {}", sp);
            flinkService.disposeSavepoint(sp.getLocation(), conf);
        } catch (Exception e) {
            // Trailing throwable is logged with its stack trace by SLF4J.
            LOG.error("Exception while disposing savepoint {}", sp.getLocation(), e);
        }
    }

    /**
     * Asks the {@link FlinkService} for the last checkpoint of the job and, if one is present,
     * records it as the last savepoint.
     *
     * @param savepointInfo savepoint info to update
     * @param jobID hex string form of the job id
     * @param deployedConfig configuration passed to the service
     * @throws ReconciliationException wrapping any exception raised while fetching or recording
     */
    private void observeLatestSavepoint(SavepointInfo savepointInfo, String jobID, Configuration deployedConfig) {
        try {
            flinkService
                    .getLastCheckpoint(JobID.fromHexString(jobID), deployedConfig)
                    .ifPresent(latest -> savepointInfo.updateLastSavepoint(latest));
        } catch (Exception e) {
            LOG.error("Could not observe latest savepoint information.", e);
            throw new ReconciliationException(e);
        }
    }
}

