/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.failure;

import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricherFactory;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for loading {@link FailureEnricher}s through the plugin mechanism and for
 * applying them to a failure in order to produce a merged map of labels.
 */
public class FailureEnricherUtils {
    private static final Logger LOG = LoggerFactory.getLogger(FailureEnricherUtils.class);

    /** An already-completed future holding an empty label map. */
    public static final CompletableFuture<Map<String, String>> EMPTY_FAILURE_LABELS = CompletableFuture.completedFuture(Collections.emptyMap());

    /** Splits the configured enricher list on commas, swallowing whitespace around each comma. */
    private static final Pattern enricherListPattern = Pattern.compile("\\s*,\\s*");

    /** Message template ({@link String#format} style) used when two label maps share a key. */
    static final String MERGE_EXCEPTION_MSG = "Trying to merge a label with a duplicate key %s. This is a bug that should be reported, because Flink shouldn't allow registering enrichers with the same output.";

    /**
     * Loads the failure enrichers selected by the given configuration, using a plugin manager
     * created from the plugin root folder.
     *
     * @param configuration the configuration naming the enrichers to include
     * @return the enrichers that are both configured and valid; possibly empty
     */
    public static Collection<FailureEnricher> getFailureEnrichers(Configuration configuration) {
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        return FailureEnricherUtils.getFailureEnrichers(configuration, pluginManager);
    }

    /**
     * Loads the failure enrichers selected by the given configuration from the given plugin
     * manager.
     *
     * <p>Every {@link FailureEnricherFactory} offered by the plugin manager is asked to create its
     * enricher; an enricher is kept only if its class name appears in the configured list.
     * Enrichers that clash on an output key are removed by {@link #filterInvalidEnrichers(Set)}.
     *
     * @param configuration the configuration naming the enrichers to include
     * @param pluginManager the plugin manager used to discover enricher factories
     * @return the enrichers that are both configured and valid; possibly empty
     */
    @VisibleForTesting
    static Collection<FailureEnricher> getFailureEnrichers(Configuration configuration, PluginManager pluginManager) {
        Set<String> includedEnrichers = FailureEnricherUtils.getIncludedFailureEnrichers(configuration);
        if (includedEnrichers.isEmpty()) {
            // Nothing configured: skip plugin loading altogether.
            return Collections.emptySet();
        }
        Iterator<FailureEnricherFactory> factoryIterator = pluginManager.load(FailureEnricherFactory.class);
        Set<FailureEnricher> failureEnrichers = new HashSet<>();
        while (factoryIterator.hasNext()) {
            FailureEnricherFactory failureEnricherFactory = factoryIterator.next();
            FailureEnricher failureEnricher = failureEnricherFactory.createFailureEnricher(configuration);
            if (includedEnrichers.contains(failureEnricher.getClass().getName())) {
                failureEnrichers.add(failureEnricher);
                LOG.info("Found failure enricher {} at {}.", failureEnricherFactory.getClass().getName(), FailureEnricherUtils.codeLocationOf(failureEnricher.getClass()));
            } else {
                LOG.debug("Excluding failure enricher {}, not configured in enricher list ({}).", failureEnricherFactory.getClass().getName(), includedEnrichers);
            }
        }
        return FailureEnricherUtils.filterInvalidEnrichers(failureEnrichers);
    }

    /**
     * Returns a printable code location for the given class without ever throwing.
     *
     * <p>Both the code source of a protection domain and the location of a code source may be
     * {@code null}; since the result is only used for logging, a placeholder is returned instead
     * of letting a {@link NullPointerException} abort enricher loading.
     */
    private static String codeLocationOf(Class<?> clazz) {
        CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
        if (codeSource == null || codeSource.getLocation() == null) {
            return "<unknown location>";
        }
        return codeSource.getLocation().getPath();
    }

    /**
     * Parses the comma-separated list of enricher class names from the configuration.
     *
     * @param configuration the configuration to read {@link JobManagerOptions#FAILURE_ENRICHERS_LIST}
     *     from; an unset option is treated as the empty string
     * @return the set of non-empty, trimmed entries; empty if nothing is configured
     */
    @VisibleForTesting
    static Set<String> getIncludedFailureEnrichers(Configuration configuration) {
        String includedEnrichersString = configuration.getString(JobManagerOptions.FAILURE_ENRICHERS_LIST, "");
        return enricherListPattern
                .splitAsStream(includedEnrichersString)
                // The pattern only removes whitespace adjacent to commas, so whitespace at the
                // very start or end of the list has to be stripped explicitly.
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toSet());
    }

    /**
     * Removes every enricher whose class shares an output key with another enricher class.
     *
     * @param failureEnrichers the candidate enrichers
     * @return the enrichers whose classes do not take part in any output-key clash
     */
    @VisibleForTesting
    static Collection<FailureEnricher> filterInvalidEnrichers(Set<FailureEnricher> failureEnrichers) {
        // Output key -> classes of all enrichers declaring that key.
        Map<String, Set<Class<?>>> enrichersByKey = new HashMap<>();
        for (FailureEnricher enricher : failureEnrichers) {
            for (String outputKey : enricher.getOutputKeys()) {
                enrichersByKey.computeIfAbsent(outputKey, ignored -> new HashSet<>()).add(enricher.getClass());
            }
        }

        // Any key claimed by more than one class invalidates all classes claiming it.
        Set<Class<?>> invalidEnrichers = new HashSet<>();
        for (Map.Entry<String, Set<Class<?>>> entry : enrichersByKey.entrySet()) {
            Set<Class<?>> claimants = entry.getValue();
            if (claimants.size() > 1) {
                LOG.warn(
                        "Following enrichers have registered duplicate output key [{}] and will be ignored: {}.",
                        entry.getKey(),
                        claimants.stream().map(Class::getName).collect(Collectors.joining(", ")));
                invalidEnrichers.addAll(claimants);
            }
        }

        return failureEnrichers.stream()
                .filter(enricher -> !invalidEnrichers.contains(enricher.getClass()))
                .collect(Collectors.toList());
    }

    /**
     * Runs every enricher against the given failure and merges the resulting labels.
     *
     * <p>Labels whose key is not among the producing enricher's declared output keys are dropped
     * with a warning. An enricher whose future completes exceptionally contributes an empty map.
     * The per-enricher futures are combined with {@link FutureUtils#combineAll(Collection)} and
     * the merge step is submitted to {@code mainThreadExecutor}; the merge function throws a
     * {@link FlinkRuntimeException} if two enrichers produce the same key.
     *
     * @param cause the failure to label
     * @param context the context handed to each enricher
     * @param mainThreadExecutor the executor on which the label maps are merged
     * @param failureEnrichers the enrichers to apply
     * @return a future holding the merged labels of all enrichers
     */
    public static CompletableFuture<Map<String, String>> labelFailure(Throwable cause, FailureEnricher.Context context, Executor mainThreadExecutor, Collection<FailureEnricher> failureEnrichers) {
        // One future per enricher, each already sanitised to that enricher's allowed keys.
        Collection<CompletableFuture<Map<String, String>>> enrichFutures = new ArrayList<>();
        for (FailureEnricher enricher : failureEnrichers) {
            enrichFutures.add(
                    enricher.processFailure(cause, context)
                            .thenApply(enricherLabels -> {
                                Map<String, String> validLabels = new HashMap<>();
                                enricherLabels.forEach((k, v) -> {
                                    if (!enricher.getOutputKeys().contains(k)) {
                                        LOG.warn("Ignoring label with key {} from enricher {} violating contract, keys allowed {}.", k, enricher.getClass(), enricher.getOutputKeys());
                                    } else {
                                        validLabels.put(k, v);
                                    }
                                });
                                return validLabels;
                            })
                            .exceptionally(t -> {
                                // A failing enricher must not prevent the others from labelling.
                                LOG.warn("Enricher {} threw an exception.", enricher.getClass(), t);
                                return Collections.emptyMap();
                            }));
        }
        return FutureUtils.combineAll(enrichFutures).thenApplyAsync(labelsToMerge -> {
            Map<String, String> mergedLabels = new HashMap<>();
            for (Map<String, String> labels : labelsToMerge) {
                // The remapping function only runs on a key conflict, which is reported as a bug.
                labels.forEach((k, v) -> mergedLabels.merge(k, v, (first, second) -> {
                    throw new FlinkRuntimeException(String.format(MERGE_EXCEPTION_MSG, k));
                }));
            }
            return mergedLabels;
        }, mainThreadExecutor);
    }
}

