/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.observer.snapshot;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
import org.apache.flink.kubernetes.operator.observer.CheckpointStatsResult;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateSnapshotObserver {
    private static final Logger LOG = LoggerFactory.getLogger(StateSnapshotObserver.class);
    private final FlinkResourceContextFactory ctxFactory;
    private final EventRecorder eventRecorder;

    public void observe(FlinkStateSnapshotContext ctx) {
        FlinkStateSnapshot resource = ctx.getResource();
        FlinkStateSnapshotStatus.State savepointState = ((FlinkStateSnapshotStatus)resource.getStatus()).getState();
        if (FlinkStateSnapshotStatus.State.IN_PROGRESS.equals((Object)savepointState)) {
            this.observeSnapshotState(ctx);
        }
    }

    private void observeSnapshotState(FlinkStateSnapshotContext ctx) {
        FlinkStateSnapshot resource = ctx.getResource();
        String resourceName = resource.getMetadata().getName();
        String triggerId = ((FlinkStateSnapshotStatus)resource.getStatus()).getTriggerId();
        if (StringUtils.isEmpty((CharSequence)triggerId)) {
            return;
        }
        LOG.debug("Observing snapshot state for resource {}...", (Object)resourceName);
        if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(ctx.getKubernetesClient(), ctx.getResource(), ctx.getSecondaryResource().orElse(null), this.eventRecorder)) {
            return;
        }
        String jobId = ((CommonStatus)ctx.getSecondaryResource().orElseThrow().getStatus()).getJobStatus().getJobId();
        FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment = this.ctxFactory.getResourceContext(ctx.getReferencedJobFlinkDeployment(), ctx.getJosdkContext());
        Configuration observeConfig = ctx.getReferencedJobObserveConfig();
        if (((FlinkStateSnapshotSpec)resource.getSpec()).isSavepoint()) {
            SavepointFetchResult savepointInfo = ctxFlinkDeployment.getFlinkService().fetchSavepointInfo(triggerId, jobId, observeConfig);
            this.handleSavepoint(ctx, savepointInfo);
        } else {
            CheckpointFetchResult checkpointInfo = ctxFlinkDeployment.getFlinkService().fetchCheckpointInfo(triggerId, jobId, observeConfig);
            this.handleCheckpoint(ctx, checkpointInfo, ctxFlinkDeployment, jobId);
        }
    }

    private void handleSavepoint(FlinkStateSnapshotContext ctx, SavepointFetchResult savepointInfo) {
        FlinkStateSnapshot resource = ctx.getResource();
        String resourceName = resource.getMetadata().getName();
        if (savepointInfo.isPending()) {
            LOG.debug("Savepoint '{}' with ID {} is pending", (Object)resourceName, (Object)((FlinkStateSnapshotStatus)resource.getStatus()).getTriggerId());
        } else {
            if (savepointInfo.getError() != null) {
                throw new ReconciliationException(savepointInfo.getError());
            }
            LOG.info("Savepoint {} successful: {}", (Object)resourceName, (Object)savepointInfo.getLocation());
            FlinkStateSnapshotUtils.snapshotSuccessful(resource, savepointInfo.getLocation(), false);
        }
    }

    private void handleCheckpoint(FlinkStateSnapshotContext ctx, CheckpointFetchResult checkpointInfo, FlinkResourceContext<FlinkDeployment> ctxFlinkDeployment, String jobId) {
        FlinkStateSnapshot resource = ctx.getResource();
        String resourceName = resource.getMetadata().getName();
        if (checkpointInfo.isPending()) {
            LOG.debug("Checkpoint for {} with ID {} is pending", (Object)resourceName, (Object)((FlinkStateSnapshotStatus)resource.getStatus()).getTriggerId());
            return;
        }
        if (checkpointInfo.getError() != null) {
            throw new ReconciliationException(checkpointInfo.getError());
        }
        LOG.debug("Checkpoint {} was successful, querying final checkpoint path...", (Object)resourceName);
        CheckpointStatsResult checkpointStatsResult = ctxFlinkDeployment.getFlinkService().fetchCheckpointStats(jobId, checkpointInfo.getCheckpointId(), ctx.getReferencedJobObserveConfig());
        if (checkpointStatsResult.isPending()) {
            return;
        }
        String path = checkpointStatsResult.getPath();
        if (checkpointStatsResult.getError() != null) {
            path = "";
            String error = String.format("Checkpoint %s was successful, but failed to fetch path. Flink webserver stores only a limited amount of checkpoints in its cache, try increasing '%s' config for this job.\n%s", resourceName, WebOptions.CHECKPOINTS_HISTORY_SIZE.key(), checkpointStatsResult.getError());
            this.eventRecorder.triggerSnapshotEvent(resource, EventRecorder.Type.Warning, EventRecorder.Reason.CheckpointError, EventRecorder.Component.Snapshot, error, ctx.getKubernetesClient());
        }
        LOG.info("Checkpoint {} successful: {}", (Object)resourceName, (Object)path);
        FlinkStateSnapshotUtils.snapshotSuccessful(resource, path, false);
    }

    public StateSnapshotObserver(FlinkResourceContextFactory ctxFactory, EventRecorder eventRecorder) {
        this.ctxFactory = ctxFactory;
        this.eventRecorder = eventRecorder;
    }
}

