/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatusRecorder<CR extends CustomResource<?, STATUS>, STATUS> {
    private static final Logger LOG = LoggerFactory.getLogger(StatusRecorder.class);
    protected final ObjectMapper objectMapper = new ObjectMapper();
    protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache = new ConcurrentHashMap();
    private final MetricManager<CR> metricManager;
    private final BiConsumer<CR, STATUS> statusUpdateListener;

    public StatusRecorder(MetricManager<CR> metricManager, BiConsumer<CR, STATUS> statusUpdateListener) {
        this.statusUpdateListener = statusUpdateListener;
        this.metricManager = metricManager;
    }

    public void notifyListeners(CR resource, STATUS prevStatus) {
        this.statusUpdateListener.accept(resource, prevStatus);
    }

    public void patchAndCacheStatus(CR resource, KubernetesClient client) {
        ObjectNode newStatusNode = (ObjectNode)this.objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
        ResourceID resourceId = ResourceID.fromResource(resource);
        ObjectNode previousStatusNode = this.statusCache.get(resourceId);
        if (newStatusNode.equals((Object)previousStatusNode)) {
            LOG.debug("No status change.");
            return;
        }
        STATUS prevStatus = this.convertPreviousStatus(resource, previousStatusNode);
        KubernetesClientException err = null;
        for (int i = 0; i < 3; ++i) {
            try {
                this.replaceStatus(resource, prevStatus, client);
                err = null;
                continue;
            }
            catch (KubernetesClientException e) {
                LOG.error("Error while patching status, retrying {}/3...", (Object)(i + 1), (Object)e);
                Thread.sleep(1000L);
                err = e;
            }
        }
        if (err != null) {
            throw err;
        }
        this.statusCache.put(resourceId, newStatusNode);
        this.statusUpdateListener.accept(resource, prevStatus);
        this.metricManager.onUpdate(resource);
    }

    private STATUS convertPreviousStatus(CR resource, ObjectNode previousStatusNode) {
        Class<FlinkDeploymentStatus> statusClass;
        if (resource instanceof FlinkDeployment) {
            statusClass = FlinkDeploymentStatus.class;
        } else if (resource instanceof FlinkSessionJob) {
            statusClass = FlinkSessionJobStatus.class;
        } else if (resource instanceof FlinkStateSnapshot) {
            statusClass = FlinkStateSnapshotStatus.class;
        } else {
            throw new RuntimeException(String.format("Resource is unknown class: %s", resource.getClass()));
        }
        return (STATUS)this.objectMapper.convertValue((Object)previousStatusNode, statusClass);
    }

    private void replaceStatus(CR resource, STATUS prevStatus, KubernetesClient client) throws JsonProcessingException {
        int retries = 0;
        while (true) {
            try {
                CustomResource updated = (CustomResource)client.resource(resource).lockResourceVersion().updateStatus();
                resource.getMetadata().setResourceVersion(updated.getMetadata().getResourceVersion());
                return;
            }
            catch (KubernetesClientException kce) {
                if (kce.getCode() == 409) {
                    this.handleLockingError(resource, prevStatus, client, retries, kce);
                    ++retries;
                    continue;
                }
                throw kce;
            }
            break;
        }
    }

    @VisibleForTesting
    void handleLockingError(CR resource, STATUS prevStatus, KubernetesClient client, int retries, KubernetesClientException kce) throws JsonProcessingException {
        String currentVersion = resource.getMetadata().getResourceVersion();
        LOG.debug("Could not apply status update for resource version {}", (Object)currentVersion);
        CustomResource latest = (CustomResource)client.resource(resource).get();
        if (latest == null || latest.getMetadata() == null) {
            throw new KubernetesClientException(String.format("Failed to retrieve latest %s", latest == null ? "resource" : "metadata"), (Throwable)kce);
        }
        String latestVersion = latest.getMetadata().getResourceVersion();
        if (currentVersion.equals(latestVersion)) {
            LOG.error("Unable to fetch latest resource version");
            throw kce;
        }
        if (latest.getStatus().equals(prevStatus)) {
            if (retries >= 3) {
                throw kce;
            }
        } else {
            throw new StatusConflictException("Status have been modified externally in version " + latestVersion + " Previous: " + this.objectMapper.writeValueAsString(prevStatus) + " Latest: " + this.objectMapper.writeValueAsString(latest.getStatus()));
        }
        LOG.debug("Retrying status update for latest version {}", (Object)latestVersion);
        resource.getMetadata().setResourceVersion(latestVersion);
    }

    public void updateStatusFromCache(CR resource) {
        ResourceID key = ResourceID.fromResource(resource);
        ObjectNode cachedStatus = this.statusCache.get(key);
        if (cachedStatus != null) {
            resource.setStatus(this.objectMapper.convertValue((Object)cachedStatus, resource.getStatus().getClass()));
        } else {
            this.statusCache.put(key, (ObjectNode)this.objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
            if (resource.getStatus() instanceof CommonStatus && ResourceLifecycleState.CREATED.equals((Object)((CommonStatus)resource.getStatus()).getLifecycleState())) {
                this.statusUpdateListener.accept(resource, resource.getStatus());
            }
        }
        this.metricManager.onUpdate(resource);
    }

    public void cleanupForDeletion(CR resource) {
        ObjectNode prevJson = this.statusCache.remove(ResourceID.fromResource(resource));
        STATUS prevStatus = this.convertPreviousStatus(resource, prevJson);
        this.metricManager.onRemove(resource);
        this.statusUpdateListener.accept(resource, prevStatus);
    }

    public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>> StatusRecorder<CR, S> create(KubernetesClient kubernetesClient, MetricManager<CR> metricManager, Collection<FlinkResourceListener> listeners) {
        BiConsumer<AbstractFlinkResource, CommonStatus> consumer = (resource, previousStatus) -> {
            Instant now = Instant.now();
            FlinkResourceListener.StatusUpdateContext ctx = new FlinkResourceListener.StatusUpdateContext((CommonStatus)previousStatus, (AbstractFlinkResource)resource, kubernetesClient, now){
                final /* synthetic */ CommonStatus val$previousStatus;
                final /* synthetic */ AbstractFlinkResource val$resource;
                final /* synthetic */ KubernetesClient val$kubernetesClient;
                final /* synthetic */ Instant val$now;
                {
                    this.val$previousStatus = commonStatus;
                    this.val$resource = abstractFlinkResource;
                    this.val$kubernetesClient = kubernetesClient;
                    this.val$now = instant;
                }

                public S getPreviousStatus() {
                    return this.val$previousStatus;
                }

                public AbstractFlinkResource<?, S> getFlinkResource() {
                    return this.val$resource;
                }

                public KubernetesClient getKubernetesClient() {
                    return this.val$kubernetesClient;
                }

                public Instant getTimestamp() {
                    return this.val$now;
                }
            };
            listeners.forEach(arg_0 -> StatusRecorder.lambda$create$0(resource, ctx, arg_0));
            AuditUtils.logContext(ctx);
        };
        return new StatusRecorder<AbstractFlinkResource, CommonStatus>(metricManager, consumer);
    }

    public static StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus> createForFlinkStateSnapshot(KubernetesClient kubernetesClient, MetricManager<FlinkStateSnapshot> metricManager, Collection<FlinkResourceListener> listeners) {
        BiConsumer<FlinkStateSnapshot, FlinkStateSnapshotStatus> consumer = (resource, previousStatus) -> {
            Instant now = Instant.now();
            FlinkResourceListener.FlinkStateSnapshotStatusUpdateContext ctx = new FlinkResourceListener.FlinkStateSnapshotStatusUpdateContext((FlinkStateSnapshotStatus)previousStatus, kubernetesClient, (FlinkStateSnapshot)resource, now){
                final /* synthetic */ FlinkStateSnapshotStatus val$previousStatus;
                final /* synthetic */ KubernetesClient val$kubernetesClient;
                final /* synthetic */ FlinkStateSnapshot val$resource;
                final /* synthetic */ Instant val$now;
                {
                    this.val$previousStatus = flinkStateSnapshotStatus;
                    this.val$kubernetesClient = kubernetesClient;
                    this.val$resource = flinkStateSnapshot;
                    this.val$now = instant;
                }

                public FlinkStateSnapshotStatus getPreviousStatus() {
                    return this.val$previousStatus;
                }

                public KubernetesClient getKubernetesClient() {
                    return this.val$kubernetesClient;
                }

                public FlinkStateSnapshot getStateSnapshot() {
                    return this.val$resource;
                }

                public Instant getTimestamp() {
                    return this.val$now;
                }
            };
            listeners.forEach(arg_0 -> StatusRecorder.lambda$createForFlinkStateSnapshot$2(ctx, arg_0));
            AuditUtils.logContext(ctx);
        };
        return new StatusRecorder<FlinkStateSnapshot, FlinkStateSnapshotStatus>(metricManager, consumer);
    }

    private static /* synthetic */ void lambda$createForFlinkStateSnapshot$2(2 ctx, FlinkResourceListener listener) {
        listener.onStateSnapshotStatusUpdate((FlinkResourceListener.FlinkStateSnapshotStatusUpdateContext)ctx);
    }

    private static /* synthetic */ void lambda$create$0(AbstractFlinkResource resource, 1 ctx, FlinkResourceListener listener) {
        if (resource instanceof FlinkDeployment) {
            listener.onDeploymentStatusUpdate((FlinkResourceListener.StatusUpdateContext)ctx);
        } else {
            listener.onSessionJobStatusUpdate((FlinkResourceListener.StatusUpdateContext)ctx);
        }
    }
}

