/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatusRecorder<CR extends AbstractFlinkResource<?, STATUS>, STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(StatusRecorder.class);
    protected final ObjectMapper objectMapper = new ObjectMapper();
    protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache = new ConcurrentHashMap();
    private final KubernetesClient client;
    private final MetricManager<CR> metricManager;
    private final BiConsumer<CR, STATUS> statusUpdateListener;

    public StatusRecorder(KubernetesClient client, MetricManager<CR> metricManager, BiConsumer<CR, STATUS> statusUpdateListener) {
        this.client = client;
        this.statusUpdateListener = statusUpdateListener;
        this.metricManager = metricManager;
    }

    public void patchAndCacheStatus(CR resource) {
        ObjectNode newStatusNode = (ObjectNode)this.objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
        ResourceID resourceId = ResourceID.fromResource(resource);
        ObjectNode previousStatusNode = this.statusCache.get(resourceId);
        if (newStatusNode.equals((Object)previousStatusNode)) {
            LOG.debug("No status change.");
            return;
        }
        Class statusClass = resource instanceof FlinkDeployment ? FlinkDeploymentStatus.class : FlinkSessionJobStatus.class;
        CommonStatus prevStatus = (CommonStatus)this.objectMapper.convertValue((Object)previousStatusNode, statusClass);
        KubernetesClientException err = null;
        for (int i = 0; i < 3; ++i) {
            try {
                this.replaceStatus(resource, prevStatus);
                err = null;
                continue;
            }
            catch (KubernetesClientException e) {
                LOG.error("Error while patching status, retrying {}/3...", (Object)(i + 1), (Object)e);
                Thread.sleep(1000L);
                err = e;
            }
        }
        if (err != null) {
            throw err;
        }
        this.statusCache.put(resourceId, newStatusNode);
        this.statusUpdateListener.accept(resource, prevStatus);
        this.metricManager.onUpdate(resource);
    }

    private void replaceStatus(CR resource, STATUS prevStatus) throws JsonProcessingException {
        int retries = 0;
        while (true) {
            try {
                AbstractFlinkResource updated = (AbstractFlinkResource)this.client.resource(resource).lockResourceVersion().replaceStatus();
                resource.getMetadata().setResourceVersion(updated.getMetadata().getResourceVersion());
                return;
            }
            catch (KubernetesClientException kce) {
                if (kce.getCode() == 409) {
                    String currentVersion = resource.getMetadata().getResourceVersion();
                    LOG.debug("Could not apply status update for resource version {}", (Object)currentVersion);
                    AbstractFlinkResource latest = (AbstractFlinkResource)this.client.resource(resource).fromServer().get();
                    String latestVersion = latest.getMetadata().getResourceVersion();
                    if (latestVersion.equals(currentVersion)) {
                        LOG.error("Unable to fetch latest resource version");
                        throw kce;
                    }
                    if (((CommonStatus)latest.getStatus()).equals(prevStatus)) {
                        if (retries++ < 3) {
                            LOG.debug("Retrying status update for latest version {}", (Object)latestVersion);
                            resource.getMetadata().setResourceVersion(latestVersion);
                            continue;
                        }
                        throw kce;
                    }
                    throw new StatusConflictException("Status have been modified externally in version " + latestVersion + " Previous: " + this.objectMapper.writeValueAsString(prevStatus) + " Latest: " + this.objectMapper.writeValueAsString(latest.getStatus()));
                }
                throw kce;
            }
            break;
        }
    }

    public void updateStatusFromCache(CR resource) {
        ResourceID key = ResourceID.fromResource(resource);
        ObjectNode cachedStatus = this.statusCache.get(key);
        if (cachedStatus != null) {
            resource.setStatus((Object)((CommonStatus)this.objectMapper.convertValue((Object)cachedStatus, ((CommonStatus)resource.getStatus()).getClass())));
        } else {
            this.statusCache.put(key, (ObjectNode)this.objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
            if (ResourceLifecycleState.CREATED.equals((Object)((CommonStatus)resource.getStatus()).getLifecycleState())) {
                this.statusUpdateListener.accept(resource, (CommonStatus)resource.getStatus());
            }
        }
        this.metricManager.onUpdate(resource);
    }

    public void removeCachedStatus(CR resource) {
        this.statusCache.remove(ResourceID.fromResource(resource));
        this.metricManager.onRemove(resource);
    }

    public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>> StatusRecorder<CR, S> create(KubernetesClient kubernetesClient, MetricManager<CR> metricManager, Collection<FlinkResourceListener> listeners) {
        BiConsumer<AbstractFlinkResource, CommonStatus> consumer = (resource, previousStatus) -> {
            Instant now = Instant.now();
            FlinkResourceListener.StatusUpdateContext ctx = new FlinkResourceListener.StatusUpdateContext((CommonStatus)previousStatus, (AbstractFlinkResource)resource, kubernetesClient, now){
                final /* synthetic */ CommonStatus val$previousStatus;
                final /* synthetic */ AbstractFlinkResource val$resource;
                final /* synthetic */ KubernetesClient val$kubernetesClient;
                final /* synthetic */ Instant val$now;
                {
                    this.val$previousStatus = commonStatus;
                    this.val$resource = abstractFlinkResource;
                    this.val$kubernetesClient = kubernetesClient;
                    this.val$now = instant;
                }

                public S getPreviousStatus() {
                    return this.val$previousStatus;
                }

                public AbstractFlinkResource<?, S> getFlinkResource() {
                    return this.val$resource;
                }

                public KubernetesClient getKubernetesClient() {
                    return this.val$kubernetesClient;
                }

                public Instant getTimestamp() {
                    return this.val$now;
                }
            };
            listeners.forEach(arg_0 -> StatusRecorder.lambda$create$0(resource, ctx, arg_0));
            AuditUtils.logContext(ctx);
        };
        return new StatusRecorder<AbstractFlinkResource, CommonStatus>(kubernetesClient, metricManager, consumer);
    }

    private static /* synthetic */ void lambda$create$0(AbstractFlinkResource resource, 1 ctx, FlinkResourceListener listener) {
        if (resource instanceof FlinkDeployment) {
            listener.onDeploymentStatusUpdate((FlinkResourceListener.StatusUpdateContext)ctx);
        } else {
            listener.onSessionJobStatusUpdate((FlinkResourceListener.StatusUpdateContext)ctx);
        }
    }
}

