/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.gateway;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.elasticsearch6.shaded.com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchTimeoutException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ExceptionsHelper;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.FailedNodeException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNodes;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.Nullable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.lease.Releasable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.shard.ShardId;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public abstract class AsyncShardFetch<T extends BaseNodeResponse>
implements Releasable {
    /** Logger used for the trace and warning output of this fetch. */
    protected final Logger logger;
    /** Label for the kind of data being fetched; within this class it is only used in log messages. */
    protected final String type;
    /** The shard this fetch operates on. */
    protected final ShardId shardId;
    /** Issues the per-node listing request (see {@code asyncFetch}). */
    private final Lister<BaseNodesResponse<T>, T> action;
    /** Per-node fetch state keyed by node id. */
    private final Map<String, NodeEntry<T>> cache = new HashMap<>();
    /** Node ids handed to {@code fetchData} that have not yet been returned in a completed result. */
    private final Set<String> nodesToIgnore = new HashSet<>();
    /** Counter used to stamp every batch of requests with a fetching round. */
    private final AtomicLong round = new AtomicLong();
    /** Set by {@code close()}; checked by {@code fetchData} and {@code processAsyncFetch}. */
    private boolean closed;

    /**
     * Creates a fetch for a single shard.
     *
     * @param logger  logger used for this fetch's trace and warning output
     * @param type    label for the kind of data fetched; used in log messages
     * @param shardId the shard to fetch data for
     * @param action  the lister that performs the per-node listing request
     */
    @SuppressWarnings("unchecked")
    protected AsyncShardFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
        this.logger = logger;
        this.type = type;
        this.shardId = shardId;
        // The wildcard-typed parameter is not assignable to the invariant field type without an
        // explicit cast. The cast is unchecked but harmless: the only use of the field hands the
        // lister a listener that accepts any BaseNodesResponse<T>.
        this.action = (Lister<BaseNodesResponse<T>, T>) action;
    }

    /**
     * Marks this fetch as closed. After this call {@code fetchData} throws an
     * {@link IllegalStateException} and results delivered to {@code processAsyncFetch} are ignored.
     */
    @Override
    public synchronized void close() {
        closed = true;
    }

    /**
     * Returns the number of cached node entries that are currently marked as fetching.
     *
     * @return count of in-flight per-node fetches
     */
    public synchronized int getNumberOfInFlightFetches() {
        long inFlight = cache.values().stream().filter(NodeEntry::isFetching).count();
        return (int) inFlight;
    }

    /**
     * Starts fetching from every cached node that has neither data nor a request in flight, and
     * returns the data collected so far once no node is fetching any more.
     * <p>
     * While at least one node is still fetching, the returned result carries no data
     * ({@link FetchResult#hasData()} is {@code false}) and the accumulated ignore nodes are kept
     * for a later call. Once all nodes have answered, the result carries the non-null per-node
     * values and the accumulated ignore nodes, which are then cleared.
     *
     * @param nodes       the current nodes; used to populate and prune the cache and to resolve node ids
     * @param ignoreNodes node ids to add to the accumulated set of ignored nodes
     * @return the fetch result, with or without data as described above
     * @throws IllegalStateException if this fetch has been closed
     */
    public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
        if (this.closed) {
            throw new IllegalStateException(this.shardId + ": can't fetch data on closed async fetch");
        }
        this.nodesToIgnore.addAll(ignoreNodes);
        this.fillShardCacheWithDataNodes(this.cache, nodes);
        List<NodeEntry<T>> nodesToFetch = this.findNodesToFetch(this.cache);
        if (!nodesToFetch.isEmpty()) {
            // Stamp every entry of this batch with the same round so that late answers from an
            // earlier batch can be recognised in processAsyncFetch.
            long fetchingRound = this.round.incrementAndGet();
            for (NodeEntry<T> nodeEntry : nodesToFetch) {
                nodeEntry.markAsFetching(fetchingRound);
            }
            DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream()
                .map(NodeEntry::getNodeId)
                .map(nodes::get)
                .toArray(DiscoveryNode[]::new);
            this.asyncFetch(discoNodesToFetch, fetchingRound);
        }
        if (this.hasAnyNodeFetching(this.cache)) {
            // Still waiting for at least one node: report "no data yet".
            return new FetchResult<>(this.shardId, null, Collections.emptySet());
        }
        Map<DiscoveryNode, T> fetchData = new HashMap<>();
        Set<String> failedNodes = new HashSet<>();
        Iterator<Map.Entry<String, NodeEntry<T>>> it = this.cache.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, NodeEntry<T>> entry = it.next();
            NodeEntry<T> nodeEntry = entry.getValue();
            DiscoveryNode node = nodes.get(entry.getKey());
            if (node == null) {
                continue;
            }
            if (nodeEntry.isFailed()) {
                // Drop the failed entry so that a later call creates a fresh one and fetches again.
                it.remove();
                failedNodes.add(nodeEntry.getNodeId());
                continue;
            }
            T value = nodeEntry.getValue();
            if (value != null) {
                fetchData.put(node, value);
            }
        }
        Set<String> allIgnoreNodes = Collections.unmodifiableSet(new HashSet<>(this.nodesToIgnore));
        this.nodesToIgnore.clear();
        if (!failedNodes.isEmpty() || !allIgnoreNodes.isEmpty()) {
            this.reroute(this.shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]");
        }
        return new FetchResult<>(this.shardId, fetchData, allIgnoreNodes);
    }

    /**
     * Applies the outcome of one asynchronous fetch round to the per-node cache and then requests
     * a reroute with reason {@code "post_response"}.
     * <p>
     * Responses and failures are ignored for nodes that are no longer cached, for entries that
     * belong to a different fetching round, and for entries already marked as failed. A failure
     * whose unwrapped cause is a rejected execution or a timeout only resets the entry so that it
     * is fetched again; any other failure is logged as a warning and stored on the entry.
     * Nothing is processed (and no reroute is requested) if this fetch has been closed.
     *
     * @param responses     successful per-node responses, may be {@code null}
     * @param failures      per-node failures, may be {@code null}
     * @param fetchingRound the round the originating request was stamped with
     */
    protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
        if (this.closed) {
            this.logger.trace("{} ignoring fetched [{}] results, already closed", this.shardId, this.type);
            return;
        }
        this.logger.trace("{} processing fetched [{}] results", this.shardId, this.type);
        if (responses != null) {
            for (T response : responses) {
                NodeEntry<T> nodeEntry = this.cache.get(response.getNode().getId());
                if (nodeEntry == null) {
                    continue;
                }
                if (nodeEntry.getFetchingRound() != fetchingRound) {
                    assert (nodeEntry.getFetchingRound() > fetchingRound) : "node entries only replaced by newer rounds";
                    // Argument order follows the placeholders: shard, type, node, expected, actual.
                    this.logger.trace("{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", this.shardId, this.type, nodeEntry.getNodeId(), nodeEntry.getFetchingRound(), fetchingRound);
                    continue;
                }
                if (nodeEntry.isFailed()) {
                    this.logger.trace("{} node {} has failed for [{}] (failure [{}])", this.shardId, nodeEntry.getNodeId(), this.type, nodeEntry.getFailure());
                    continue;
                }
                this.logger.trace("{} marking {} as done for [{}], result is [{}]", this.shardId, nodeEntry.getNodeId(), this.type, response);
                nodeEntry.doneFetching(response);
            }
        }
        if (failures != null) {
            for (FailedNodeException failure : failures) {
                this.logger.trace("{} processing failure {} for [{}]", this.shardId, failure, this.type);
                NodeEntry<T> nodeEntry = this.cache.get(failure.nodeId());
                if (nodeEntry == null) {
                    continue;
                }
                if (nodeEntry.getFetchingRound() != fetchingRound) {
                    assert (nodeEntry.getFetchingRound() > fetchingRound) : "node entries only replaced by newer rounds";
                    // Argument order follows the placeholders: shard, type, node, expected, actual.
                    this.logger.trace("{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", this.shardId, this.type, nodeEntry.getNodeId(), nodeEntry.getFetchingRound(), fetchingRound);
                    continue;
                }
                if (nodeEntry.isFailed()) {
                    continue;
                }
                Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
                if (unwrappedCause instanceof EsRejectedExecutionException || unwrappedCause instanceof ReceiveTimeoutTransportException || unwrappedCause instanceof ElasticsearchTimeoutException) {
                    // Rejections and timeouts are not recorded as failures: the entry goes back to
                    // "not fetching, no data", so the next fetchData call requests it again.
                    nodeEntry.restartFetching();
                    continue;
                }
                this.logger.warn(() -> new ParameterizedMessage("{}: failed to list shard for {} on node [{}]", new Object[]{this.shardId, this.type, failure.nodeId()}), (Throwable)failure);
                nodeEntry.doneFetching(failure.getCause());
            }
        }
        this.reroute(this.shardId, "post_response");
    }

    /**
     * Hook invoked by this class when a reroute should be triggered: after a fetch round has
     * been processed, and from {@code fetchData} when nodes failed or ignore nodes were recorded.
     *
     * @param var1 the shard id of this fetch
     * @param var2 a short textual reason, e.g. {@code "post_response"}
     */
    protected abstract void reroute(ShardId var1, String var2);

    /**
     * Synchronises the cache with the given nodes: adds an empty entry for every data node that
     * is not cached yet and removes entries whose node id no longer exists in {@code nodes}.
     *
     * @param shardCache the per-node cache to update in place
     * @param nodes      the current nodes
     */
    private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, DiscoveryNodes nodes) {
        for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getDataNodes()) {
            DiscoveryNode node = cursor.value;
            // Existing entries are kept untouched; only missing ones are created.
            shardCache.computeIfAbsent(node.getId(), nodeId -> new NodeEntry<>(nodeId));
        }
        shardCache.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
    }

    /**
     * Collects the cache entries that still need a request: those that have no data (neither a
     * value nor a failure) and are not currently fetching.
     *
     * @param shardCache the per-node cache to scan
     * @return a new list with the entries to fetch, possibly empty
     */
    private List<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache) {
        ArrayList<NodeEntry<T>> pending = new ArrayList<>();
        for (NodeEntry<T> candidate : shardCache.values()) {
            if (!candidate.hasData() && !candidate.isFetching()) {
                pending.add(candidate);
            }
        }
        return pending;
    }

    /**
     * Tells whether at least one entry of the cache is currently marked as fetching.
     *
     * @param shardCache the per-node cache to scan
     * @return {@code true} if any entry is fetching, {@code false} otherwise (including when empty)
     */
    private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
        return shardCache.values().stream().anyMatch(NodeEntry::isFetching);
    }

    /**
     * Sends the listing request for the given nodes and routes the outcome to
     * {@code processAsyncFetch}. A failure of the whole request is reported as one
     * {@link FailedNodeException} per requested node.
     *
     * @param nodes         the nodes to request data from
     * @param fetchingRound the round stamped on the entries of these nodes
     */
    void asyncFetch(final DiscoveryNode[] nodes, final long fetchingRound) {
        this.logger.trace("{} fetching [{}] from {}", (Object)this.shardId, (Object)this.type, (Object)nodes);
        final ActionListener<BaseNodesResponse<T>> listener = new ActionListener<BaseNodesResponse<T>>() {

            @Override
            public void onResponse(BaseNodesResponse<T> nodesResponse) {
                AsyncShardFetch.this.processAsyncFetch(nodesResponse.getNodes(), nodesResponse.failures(), fetchingRound);
            }

            @Override
            public void onFailure(Exception e) {
                // The request failed as a whole: attribute the same cause to every requested node.
                List<FailedNodeException> perNodeFailures = new ArrayList<>(nodes.length);
                for (DiscoveryNode requested : nodes) {
                    perNodeFailures.add(new FailedNodeException(requested.getId(), "total failure in fetching", e));
                }
                AsyncShardFetch.this.processAsyncFetch(null, perNodeFailures, fetchingRound);
            }
        };
        this.action.list(this.shardId, nodes, listener);
    }

    /**
     * Fetch state of a single node: whether a request is in flight, which round it belongs to,
     * and the value or failure it eventually produced.
     */
    static class NodeEntry<T> {
        private final String nodeId;
        private boolean fetching;
        @Nullable
        private T value;
        /** Distinguishes "value fetched and {@code null}" from "no value fetched yet". */
        private boolean valueSet;
        private Throwable failure;
        private long fetchingRound;

        NodeEntry(String nodeId) {
            this.nodeId = nodeId;
        }

        /** Returns the id of the node this entry tracks. */
        String getNodeId() {
            return nodeId;
        }

        /** Returns {@code true} while a request for this node is in flight. */
        boolean isFetching() {
            return fetching;
        }

        /** Marks the entry as fetching and records the round the request belongs to. */
        void markAsFetching(long fetchingRound) {
            assert !fetching : "double marking a node as fetching";
            this.fetchingRound = fetchingRound;
            fetching = true;
        }

        /** Stores a successfully fetched value (which may be {@code null}) and ends fetching. */
        void doneFetching(T value) {
            assert fetching : "setting value but not in fetching mode";
            assert failure == null : "setting value when failure already set";
            this.value = value;
            valueSet = true;
            fetching = false;
        }

        /** Stores the failure of the fetch and ends fetching. */
        void doneFetching(Throwable failure) {
            assert fetching : "setting value but not in fetching mode";
            assert !valueSet : "setting failure when already set value";
            assert failure != null : "setting failure can't be null";
            this.failure = failure;
            fetching = false;
        }

        /** Ends fetching without recording a value or failure, so the node is fetched again. */
        void restartFetching() {
            assert fetching : "restarting fetching, but not in fetching mode";
            assert !valueSet : "value can't be set when restarting fetching";
            assert failure == null : "failure can't be set when restarting fetching";
            fetching = false;
        }

        /** Returns {@code true} if a failure has been recorded. */
        boolean isFailed() {
            return failure != null;
        }

        /** Returns {@code true} if either a value or a failure has been recorded. */
        boolean hasData() {
            return valueSet || isFailed();
        }

        /** Returns the recorded failure, or {@code null} if a value was recorded instead. */
        Throwable getFailure() {
            assert hasData() : "getting failure when data has not been fetched";
            return failure;
        }

        /** Returns the fetched value, which may be {@code null}. */
        @Nullable
        T getValue() {
            assert failure == null : "trying to fetch value, but its marked as failed, check isFailed";
            assert valueSet : "value is not set, hasn't been fetched yet";
            return value;
        }

        /** Returns the round recorded by the most recent {@code markAsFetching} call. */
        long getFetchingRound() {
            return fetchingRound;
        }
    }

    /**
     * Outcome of a {@code fetchData} call: the per-node data (or {@code null} when fetching has
     * not completed) together with the node ids to ignore for the shard.
     */
    public static class FetchResult<T extends BaseNodeResponse> {
        private final ShardId shardId;
        private final Map<DiscoveryNode, T> data;
        private final Set<String> ignoreNodes;

        /**
         * @param shardId     the shard the result belongs to
         * @param data        the fetched per-node data, or {@code null} if there is none yet
         * @param ignoreNodes ids of the nodes to ignore for this shard
         */
        public FetchResult(ShardId shardId, Map<DiscoveryNode, T> data, Set<String> ignoreNodes) {
            this.shardId = shardId;
            this.data = data;
            this.ignoreNodes = ignoreNodes;
        }

        /** Returns {@code true} if this result carries data, i.e. the data map is not {@code null}. */
        public boolean hasData() {
            return data != null;
        }

        /** Returns the fetched data; callers are expected to check {@link #hasData()} first. */
        public Map<DiscoveryNode, T> getData() {
            assert data != null : "getData should only be called if there is data to be fetched, please check hasData first";
            return data;
        }

        /** Registers every ignore node of this result on the given allocation for this shard. */
        public void processAllocation(RoutingAllocation allocation) {
            ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId));
        }
    }

    /**
     * Abstraction over the request that lists per-node data for a shard. {@code AsyncShardFetch}
     * calls it from {@code asyncFetch} and receives the outcome through the supplied listener.
     *
     * @param <NodesResponse> the aggregated response type delivered to the listener
     * @param <NodeResponse>  the per-node response type contained in the aggregated response
     */
    public static interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
        /**
         * Requests the data.
         *
         * @param var1 the shard to list data for
         * @param var2 the nodes to ask
         * @param var3 the listener notified with the aggregated response or with a failure
         */
        public void list(ShardId var1, DiscoveryNode[] var2, ActionListener<NodesResponse> var3);
    }
}

