/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.controller;

import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller for {@link FlinkDeployment} resources.
 *
 * <p>Each reconciliation observes the resource, runs the configured validators, delegates to the
 * reconciler selected by the {@link ReconcilerFactory} and finally patches the resulting status.
 * The controller also handles resource cleanup, error-status updates and event source
 * registration.
 */
@ControllerConfiguration
public class FlinkDeploymentController
        implements Reconciler<FlinkDeployment>,
                ErrorStatusHandler<FlinkDeployment>,
                EventSourceInitializer<FlinkDeployment>,
                Cleaner<FlinkDeployment> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);

    private final Set<FlinkResourceValidator> validators;
    private final FlinkResourceContextFactory ctxFactory;
    private final ReconcilerFactory reconcilerFactory;
    private final FlinkDeploymentObserverFactory observerFactory;
    private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
    private final EventRecorder eventRecorder;
    private final CanaryResourceManager<FlinkDeployment> canaryResourceManager;

    /**
     * Creates the controller from its collaborators. The arguments are stored as-is; no copies
     * are made and no validation is performed.
     *
     * @param validators validators applied to every deployment before it is reconciled
     * @param ctxFactory factory producing the per-resource {@link FlinkResourceContext}
     * @param reconcilerFactory factory selecting the reconciler for a deployment
     * @param observerFactory factory selecting the observer for a deployment
     * @param statusRecorder recorder used to read cached status and patch status changes
     * @param eventRecorder recorder used to emit events for the resource
     * @param canaryResourceManager manager that is given the first chance to handle a resource
     */
    public FlinkDeploymentController(
            Set<FlinkResourceValidator> validators,
            FlinkResourceContextFactory ctxFactory,
            ReconcilerFactory reconcilerFactory,
            FlinkDeploymentObserverFactory observerFactory,
            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
            EventRecorder eventRecorder,
            CanaryResourceManager<FlinkDeployment> canaryResourceManager) {
        this.validators = validators;
        this.ctxFactory = ctxFactory;
        this.reconcilerFactory = reconcilerFactory;
        this.observerFactory = observerFactory;
        this.statusRecorder = statusRecorder;
        this.eventRecorder = eventRecorder;
        this.canaryResourceManager = canaryResourceManager;
    }

    /**
     * Cleans up a deployment that is being deleted.
     *
     * <p>If the canary resource manager reports that it handled the deletion, the default delete
     * control is returned immediately. Otherwise a cleanup event is emitted, the lifecycle state
     * is set to {@code DELETING}, the resource is observed on a best-effort basis and the actual
     * cleanup is delegated to the reconciler.
     *
     * @param flinkApp the deployment being deleted
     * @param josdkContext the operator SDK context of this invocation
     * @return the delete control produced by the reconciler, or the default delete control when
     *     the canary resource manager handled the deletion
     */
    @Override
    public DeleteControl cleanup(FlinkDeployment flinkApp, Context<FlinkDeployment> josdkContext) {
        if (canaryResourceManager.handleCanaryResourceDeletion(flinkApp)) {
            return DeleteControl.defaultDelete();
        }

        eventRecorder.triggerEvent(
                flinkApp,
                EventRecorder.Type.Normal,
                EventRecorder.Reason.Cleanup,
                EventRecorder.Component.Operator,
                "Cleaning up FlinkDeployment",
                josdkContext.getClient());
        statusRecorder.updateStatusFromCache(flinkApp);
        ((FlinkDeploymentStatus) flinkApp.getStatus())
                .setLifecycleState(ResourceLifecycleState.DELETING);

        FlinkResourceContext<FlinkDeployment> ctx =
                ctxFactory.getResourceContext(flinkApp, josdkContext);
        try {
            observerFactory.getOrCreate(flinkApp).observe(ctx);
        } catch (Exception err) {
            // Deliberately best-effort: a failed observation must not block the cleanup below.
            LOG.error("Error while observing for cleanup", err);
        }

        DeleteControl deleteControl = reconcilerFactory.getOrCreate(flinkApp).cleanup(ctx);
        if (deleteControl.isRemoveFinalizer()) {
            // The finalizer is about to be removed, so no status patch is issued here; only the
            // recorder's and the context factory's cleanup hooks for this resource are invoked.
            ((FlinkDeploymentStatus) flinkApp.getStatus())
                    .setLifecycleState(ResourceLifecycleState.DELETED);
            statusRecorder.cleanupForDeletion(flinkApp);
            ctxFactory.cleanup(flinkApp);
        } else {
            // Cleanup is not finished yet: persist the intermediate status.
            statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
        }
        return deleteControl;
    }

    /**
     * Runs one reconciliation of the given deployment.
     *
     * <p>Returns {@code noUpdate} without further work when the canary resource manager handled
     * the resource or when the supported-version check fails. Otherwise the resource is observed,
     * validated and reconciled. {@link UpgradeFailureException} and {@link
     * DeploymentFailedException} are recorded on the resource and do not fail the reconciliation;
     * any other exception produces a warning event and is rethrown wrapped.
     *
     * @param flinkApp the deployment to reconcile
     * @param josdkContext the operator SDK context of this invocation
     * @return the update control derived from the deployment before and after this reconciliation
     * @throws Exception a {@link ReconciliationException} wrapping any failure other than the two
     *     exception types handled above
     */
    @Override
    public UpdateControl<FlinkDeployment> reconcile(
            FlinkDeployment flinkApp, Context<FlinkDeployment> josdkContext) throws Exception {
        if (canaryResourceManager.handleCanaryResourceReconciliation(
                flinkApp, josdkContext.getClient())) {
            return UpdateControl.noUpdate();
        }

        LOG.debug("Starting reconciliation");
        statusRecorder.updateStatusFromCache(flinkApp);
        // Snapshot taken before observing/reconciling; passed to toUpdateControl below.
        FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
        FlinkResourceContext<FlinkDeployment> ctx =
                ctxFactory.getResourceContext(flinkApp, josdkContext);

        if (!ValidatorUtils.validateSupportedVersion(ctx, eventRecorder)) {
            return UpdateControl.noUpdate();
        }

        try {
            observerFactory.getOrCreate(flinkApp).observe(ctx);
            if (!validateDeployment(ctx)) {
                statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
                return ReconciliationUtils.toUpdateControl(
                        ctx.getOperatorConfig(), flinkApp, previousDeployment, false);
            }
            // Persist the observed status before the reconciler acts on the resource.
            statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
            reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
        } catch (UpgradeFailureException ufe) {
            handleUpgradeFailure(ctx, ufe);
        } catch (DeploymentFailedException dfe) {
            handleDeploymentFailed(ctx, dfe);
        } catch (Exception e) {
            eventRecorder.triggerEvent(
                    flinkApp,
                    EventRecorder.Type.Warning,
                    "ClusterDeploymentException",
                    e.getMessage(),
                    EventRecorder.Component.JobManagerDeployment,
                    josdkContext.getClient());
            throw new ReconciliationException(e);
        }

        LOG.debug("End of reconciliation");
        statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
        return ReconciliationUtils.toUpdateControl(
                ctx.getOperatorConfig(), flinkApp, previousDeployment, true);
    }

    /**
     * Records a failed deployment on the resource: sets the JobManager deployment status to
     * {@code ERROR} and the job state to {@code RECONCILING}, stores the reconciliation error and
     * emits a warning event carrying the exception's reason and message.
     *
     * @param ctx context of the resource whose deployment failed
     * @param dfe the deployment failure
     */
    private void handleDeploymentFailed(
            FlinkResourceContext<FlinkDeployment> ctx, DeploymentFailedException dfe) {
        FlinkDeployment flinkApp = ctx.getResource();
        LOG.error("Flink Deployment failed", dfe);
        ((FlinkDeploymentStatus) flinkApp.getStatus())
                .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
        ((FlinkDeploymentStatus) flinkApp.getStatus())
                .getJobStatus()
                .setState(JobStatus.RECONCILING);
        ReconciliationUtils.updateForReconciliationError(ctx, dfe);
        eventRecorder.triggerEvent(
                flinkApp,
                EventRecorder.Type.Warning,
                dfe.getReason(),
                dfe.getMessage(),
                EventRecorder.Component.JobManagerDeployment,
                ctx.getKubernetesClient());
    }

    /**
     * Records a failed upgrade on the resource: stores the reconciliation error and emits a
     * warning event carrying the exception's reason and message.
     *
     * @param ctx context of the resource whose upgrade failed
     * @param ufe the upgrade failure
     */
    private void handleUpgradeFailure(
            FlinkResourceContext<FlinkDeployment> ctx, UpgradeFailureException ufe) {
        LOG.error("Error while upgrading Flink Deployment", ufe);
        FlinkDeployment flinkApp = ctx.getResource();
        ReconciliationUtils.updateForReconciliationError(ctx, ufe);
        eventRecorder.triggerEvent(
                flinkApp,
                EventRecorder.Type.Warning,
                ufe.getReason(),
                ufe.getMessage(),
                EventRecorder.Component.JobManagerDeployment,
                ctx.getKubernetesClient());
    }

    /**
     * Registers the event sources of this controller: the session job and deployment informers
     * and, only when the {@link FlinkStateSnapshot} CRD is installed, the state snapshot informer.
     *
     * @param context the event source context supplied by the operator SDK
     * @return the registered event sources keyed by the names assigned by {@link
     *     EventSourceInitializer#nameEventSources}
     */
    @Override
    public Map<String, EventSource> prepareEventSources(
            EventSourceContext<FlinkDeployment> context) {
        List<EventSource> eventSources = new ArrayList<>();
        eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
        eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));

        if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
            eventSources.add(
                    EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
        } else {
            LOG.warn(
                    "Could not initialize informer for snapshots as the CRD has not been installed!");
        }

        return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new));
    }

    /**
     * Builds the error status update for a failed reconciliation by delegating to {@link
     * ReconciliationUtils#toErrorStatusUpdateControl}.
     *
     * @param flinkDeployment the deployment whose reconciliation failed
     * @param context the operator SDK context of this invocation
     * @param e the exception that failed the reconciliation
     * @return the error status update control for the deployment
     */
    @Override
    public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
            FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) {
        FlinkResourceContext<FlinkDeployment> ctx =
                ctxFactory.getResourceContext(flinkDeployment, context);
        return ReconciliationUtils.toErrorStatusUpdateControl(ctx, e, statusRecorder);
    }

    /**
     * Runs the validators against the deployment, stopping at the first reported error.
     *
     * <p>On the first error a warning event is emitted and the result of {@link
     * ReconciliationUtils#applyValidationErrorAndResetSpec} is returned; later validators are not
     * consulted.
     *
     * @param ctx context of the deployment to validate
     * @return {@code true} when no validator reported an error, otherwise whatever {@code
     *     applyValidationErrorAndResetSpec} returns
     */
    private boolean validateDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
        FlinkDeployment deployment = ctx.getResource();
        for (FlinkResourceValidator validator : validators) {
            Optional<String> validationError = validator.validateDeployment(deployment);
            if (validationError.isPresent()) {
                String message = validationError.get();
                eventRecorder.triggerEvent(
                        deployment,
                        EventRecorder.Type.Warning,
                        EventRecorder.Reason.ValidationError,
                        EventRecorder.Component.Operator,
                        message,
                        ctx.getKubernetesClient());
                return ReconciliationUtils.applyValidationErrorAndResetSpec(ctx, message);
            }
        }
        return true;
    }
}

