/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.operator.autoscaler;

import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ScalingRealizer} that applies autoscaler decisions by mutating the spec of the
 * Kubernetes resource held by the {@link KubernetesJobAutoScalerContext}.
 *
 * <p>Parallelism overrides are written into the spec's Flink configuration map. Config changes
 * are applied only to {@link FlinkDeployment} resources and may additionally update the task
 * manager memory stored in the spec.
 */
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesScalingRealizer.class);

    /**
     * Stores the parallelism overrides in the Flink configuration of the resource spec, under the
     * key of {@link PipelineOptions#PARALLELISM_OVERRIDES}.
     *
     * @param context autoscaler context providing the resource whose spec is modified
     * @param parallelismOverrides the overrides to store
     */
    public void realizeParallelismOverrides(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        AbstractFlinkSpec resourceSpec = (AbstractFlinkSpec) context.getResource().getSpec();
        Map<String, String> specConfig = resourceSpec.getFlinkConfiguration();
        String overridesKey = PipelineOptions.PARALLELISM_OVERRIDES.key();
        String overridesValue = getOverrideString(context, parallelismOverrides);
        specConfig.put(overridesKey, overridesValue);
    }

    /**
     * Applies the given config changes to the Flink configuration of a {@link FlinkDeployment}
     * spec and, when the resulting total memory is positive and differs from the memory currently
     * set on the task manager resource, rewrites that memory value in bytes.
     *
     * <p>Does nothing when the context's resource is not a {@link FlinkDeployment}. Note that the
     * removals and overrides are written to the spec before the total memory is checked, so they
     * remain applied even when the memory override is rejected.
     *
     * @param context autoscaler context providing the resource whose spec is modified
     * @param configChanges keys to remove and key/value pairs to put into the Flink configuration
     */
    public void realizeConfigOverrides(
            KubernetesJobAutoScalerContext context, ConfigChanges configChanges) {
        if (!(context.getResource() instanceof FlinkDeployment)) {
            return;
        }
        FlinkDeployment deployment = (FlinkDeployment) context.getResource();
        FlinkDeploymentSpec deploymentSpec = (FlinkDeploymentSpec) deployment.getSpec();

        // Removals are applied first, then overrides, directly on the spec's configuration map.
        Map<String, String> specConfig = deploymentSpec.getFlinkConfiguration();
        for (String removedKey : configChanges.getRemovals()) {
            specConfig.remove(removedKey);
        }
        specConfig.putAll(configChanges.getOverrides());

        // The total memory is derived from the already-updated configuration.
        MemorySize newTotalMemory =
                MemoryTuning.getTotalMemory(Configuration.fromMap(specConfig), context);
        boolean isPositive = newTotalMemory.compareTo(MemorySize.ZERO) > 0;
        if (!isPositive) {
            LOG.warn("Total memory override {} is not valid", newTotalMemory);
            return;
        }

        // Compare parsed sizes rather than the raw spec string, and only touch the spec when the
        // value actually differs.
        Resource taskManagerResource = deploymentSpec.getTaskManager().getResource();
        String configuredMemory =
                FlinkConfigBuilder.parseResourceMemoryString(taskManagerResource.getMemory());
        MemorySize configuredSize = MemorySize.parse(configuredMemory);
        if (newTotalMemory.equals(configuredSize)) {
            return;
        }
        taskManagerResource.setMemory(String.valueOf(newTotalMemory.getBytes()));
    }

    /**
     * Produces the string to store for the parallelism overrides.
     *
     * <p>Before the first deployment the new overrides are always serialized. Afterwards, if the
     * overrides read from the observe config are equal (as maps) to the new ones, the value
     * already present in that config is returned unchanged instead of a fresh serialization.
     * NOTE(review): presumably this avoids a spec change when only the textual form would differ
     * - confirm against the reconciler.
     *
     * @param context autoscaler context providing the resource status and the observe config
     * @param newOverrides the overrides requested by the autoscaler
     * @return the string form of the overrides to write into the spec
     */
    @Nullable
    private static String getOverrideString(
            KubernetesJobAutoScalerContext context, Map<String, String> newOverrides) {
        CommonStatus resourceStatus = (CommonStatus) context.getResource().getStatus();
        boolean alreadyDeployed =
                !resourceStatus.getReconciliationStatus().isBeforeFirstDeployment();

        if (alreadyDeployed) {
            Configuration observedConfig = context.getResourceContext().getObserveConfig();
            Map<String, String> observedOverrides =
                    observedConfig.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).orElse(Map.of());
            if (observedOverrides.equals(newOverrides)) {
                // Same content: hand back the existing string as-is.
                return observedConfig.getValue(PipelineOptions.PARALLELISM_OVERRIDES);
            }
        }
        return (String) ConfigurationUtils.convertValue(newOverrides, String.class);
    }
}

