package org.elasticsearch.cluster.coordination;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.PositionTrackingOutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Strings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

/* loaded from: input_file:org/elasticsearch/cluster/coordination/PublicationTransportHandler.class */
public class PublicationTransportHandler {
    /** Class logger; assigned in the static initializer. */
    private static final Logger logger;
    /** Transport action name under which serialized cluster states are sent and the incoming handler is registered. */
    public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
    /** Transport used to register the publish handler, send publish requests and allocate network byte streams. */
    private final TransportService transportService;
    /** Registry handed to the stream wrapper used when deserializing incoming cluster states and diffs. */
    private final NamedWriteableRegistry namedWriteableRegistry;
    /** Callback applied to every publish request, whether it is handled locally or was received over the wire. */
    private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;
    /** Request options used for every outgoing publish request; assigned in the static initializer. */
    private static final TransportRequestOptions STATE_REQUEST_OPTIONS;
    /** Compiler-style assertion switch: when {@code true} the explicit assertion checks in this class are skipped. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Most recent cluster state accepted from the wire; incoming diffs are applied on top of it. */
    private final AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
    /** Number of full cluster states received and successfully deserialized. */
    private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
    /** Number of received diffs that could not be applied (no local state, or an incompatible version). */
    private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
    /** Number of received diffs that were successfully applied to the last seen state. */
    private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
    /** Accumulates counts and byte sizes of the states and diffs serialized by this handler; reported via {@code stats()}. */
    private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker();

    /* loaded from: input_file:org/elasticsearch/cluster/coordination/PublicationTransportHandler$PublicationContext.class */
    public class PublicationContext extends AbstractRefCounted {
        /** Nodes of the state being published (taken from the new state). */
        private final DiscoveryNodes discoveryNodes;
        /** The cluster state being published. */
        private final ClusterState newState;
        /** The state the publication starts from; diffs are computed against it. */
        private final ClusterState previousState;
        /** Task passed as the parent of every outgoing publish request. */
        private final Task task;
        /** When {@code true}, every node receives the full state instead of a diff. */
        private final boolean sendFullVersion;
        /** Serialized full states keyed by node version; filled up front and, on demand, by {@code sendFullClusterState}. */
        private final Map<Version, ReleasableBytesReference> serializedStates = new ConcurrentHashMap();
        // NOTE(review): plain HashMap - within this class it is only written by buildDiffAndSerializeStates and
        // only read afterwards; presumably that is why no concurrent map is needed here - confirm.
        /** Serialized diffs keyed by node version. */
        private final Map<Version, ReleasableBytesReference> serializedDiffs = new HashMap();
        /** Compiler-style assertion switch: when {@code true} the explicit assertion checks in this class are skipped. */
        static final /* synthetic */ boolean $assertionsDisabled;

        PublicationContext(ClusterStatePublicationEvent clusterStatePublicationEvent) {
            this.discoveryNodes = clusterStatePublicationEvent.getNewState().nodes();
            this.newState = clusterStatePublicationEvent.getNewState();
            this.previousState = clusterStatePublicationEvent.getOldState();
            this.task = clusterStatePublicationEvent.getTask();
            this.sendFullVersion = this.previousState.getBlocks().disableStatePersistence();
        }

        /**
         * Serializes the new cluster state up front, once per distinct node version.
         *
         * <p>Nodes that were not part of the previous state (or all nodes, when full states are
         * forced) get a full state; every other remote node gets a diff against the previous
         * state. The local node is skipped. Results are cached in {@code serializedStates} and
         * {@code serializedDiffs}.
         */
        void buildDiffAndSerializeStates() {
            if (!$assertionsDisabled && refCount() <= 0) {
                throw new AssertionError();
            }
            // The diff is only requested from inside the diff branch below, so it is never built
            // when every node receives a full state.
            final LazyInitializable<Diff<ClusterState>, RuntimeException> diffSupplier = new LazyInitializable<>(
                () -> this.newState.diff(this.previousState)
            );
            for (final DiscoveryNode node : this.discoveryNodes) {
                if (node.equals(PublicationTransportHandler.this.transportService.getLocalNode())) {
                    // Nothing to serialize for the local node.
                    continue;
                }
                if (this.sendFullVersion || !this.previousState.nodes().nodeExists(node)) {
                    this.serializedStates.computeIfAbsent(
                        node.getVersion(),
                        nodeVersion -> PublicationTransportHandler.this.serializeFullClusterState(this.newState, node)
                    );
                } else {
                    this.serializedDiffs.computeIfAbsent(
                        node.getVersion(),
                        nodeVersion -> PublicationTransportHandler.this.serializeDiffClusterState(
                            this.newState.version(),
                            diffSupplier.getOrCompute(),
                            node
                        )
                    );
                }
            }
        }

        /**
         * Sends the publication to a single node.
         *
         * <p>For the local node the request is handed to the publish-request callback on the
         * cluster coordination executor, without any serialization. A remote node receives the
         * full state if full states are forced or the node was not in the previous state, and a
         * diff otherwise.
         *
         * @param discoveryNode  the node to publish to
         * @param publishRequest the request; its accepted state must be this context's new state
         * @param actionListener notified with the node's response or with the failure
         */
        public void sendPublishRequest(final DiscoveryNode discoveryNode, final PublishRequest publishRequest, ActionListener<PublishWithJoinResponse> actionListener) {
            if (!$assertionsDisabled && refCount() <= 0) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && publishRequest.getAcceptedState() != this.newState) {
                throw new AssertionError("state got switched on us");
            }
            if (!$assertionsDisabled && !PublicationTransportHandler.this.transportService.getThreadPool().getThreadContext().isSystemContext()) {
                throw new AssertionError();
            }
            final long version = this.newState.version();
            if (discoveryNode.equals(this.discoveryNodes.getLocalNode())) {
                final boolean isVotingOnlyNode = this.discoveryNodes.getLocalNode().getRoles().contains(DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE);
                PublicationTransportHandler.logger.trace("handling cluster state version [{}] locally on [{}]", version, discoveryNode);
                final ThreadPool threadPool = PublicationTransportHandler.this.transportService.getThreadPool();
                threadPool.executor(ThreadPool.Names.CLUSTER_COORDINATION).execute(threadPool.getThreadContext().preserveContext(ActionRunnable.supply(actionListener, new CheckedSupplier<PublishWithJoinResponse, Exception>() {
                    @Override
                    public PublishWithJoinResponse get() {
                        if (isVotingOnlyNode) {
                            // A voting-only local node fails the local publication instead of applying it.
                            throw new TransportException(new ElasticsearchException("voting-only node skipping local publication to " + discoveryNode));
                        }
                        return PublicationTransportHandler.this.handlePublishRequest.apply(publishRequest);
                    }

                    @Override
                    public String toString() {
                        // Same wording as the trace message above: the version first, then the destination node.
                        return "handling cluster state version [" + version + "] locally on [" + discoveryNode + "]";
                    }
                })));
            } else if (this.sendFullVersion || !this.previousState.nodes().nodeExists(discoveryNode)) {
                PublicationTransportHandler.logger.trace("sending full cluster state version [{}] to [{}]", version, discoveryNode);
                sendFullClusterState(discoveryNode, actionListener);
            } else {
                PublicationTransportHandler.logger.trace("sending cluster state diff for version [{}] to [{}]", version, discoveryNode);
                sendClusterStateDiff(discoveryNode, actionListener);
            }
        }

        /**
         * Sends the full cluster state to the given node, serializing it first if no serialized
         * copy for the node's version is cached yet.
         *
         * @param discoveryNode  the destination node
         * @param actionListener notified with the response, or with the serialization failure
         */
        private void sendFullClusterState(DiscoveryNode discoveryNode, ActionListener<PublishWithJoinResponse> actionListener) {
            if (!$assertionsDisabled && refCount() <= 0) {
                throw new AssertionError();
            }
            final ReleasableBytesReference cached = this.serializedStates.get(discoveryNode.getVersion());
            if (cached != null) {
                sendClusterState(discoveryNode, cached, actionListener);
                return;
            }
            // Cache miss: serialize now and remember the result for other nodes of the same version.
            final ReleasableBytesReference freshlySerialized;
            try {
                freshlySerialized = this.serializedStates.computeIfAbsent(
                    discoveryNode.getVersion(),
                    nodeVersion -> PublicationTransportHandler.this.serializeFullClusterState(this.newState, discoveryNode)
                );
            } catch (Exception failure) {
                PublicationTransportHandler.logger.warn(
                    () -> Strings.format("failed to serialize cluster state before publishing it to node %s", discoveryNode),
                    failure
                );
                actionListener.onFailure(failure);
                return;
            }
            sendClusterState(discoveryNode, freshlySerialized, actionListener);
        }

        /**
         * Sends the pre-serialized diff for the node's version, falling back to the full state
         * when the node rejects the diff with an {@code IncompatibleClusterStateVersionException}.
         *
         * <p>An extra reference on this context is held until the wrapped listener has completed,
         * and released through {@code decRef} afterwards.
         *
         * @param discoveryNode  the destination node
         * @param actionListener notified with the response or the failure
         */
        private void sendClusterStateDiff(DiscoveryNode discoveryNode, ActionListener<PublishWithJoinResponse> actionListener) {
            final ReleasableBytesReference diffBytes = this.serializedDiffs.get(discoveryNode.getVersion());
            if (!$assertionsDisabled && diffBytes == null) {
                throw new AssertionError("failed to find serialized diff for node " + discoveryNode + " of version [" + discoveryNode.getVersion() + "]");
            }
            if (!tryIncRef()) {
                // The context was already released: trips when assertions are on, fails the listener otherwise.
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                actionListener.onFailure(new IllegalStateException("publication context released before transmission"));
                return;
            }
            final ActionListener<PublishWithJoinResponse> fallbackListener = actionListener.delegateResponse((delegate, failure) -> {
                final TransportException transportFailure = failure instanceof TransportException ? (TransportException) failure : null;
                if (transportFailure != null && transportFailure.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                    PublicationTransportHandler.logger.debug(
                        () -> Strings.format("resending full cluster state to node %s reason %s", discoveryNode, transportFailure.getDetailedMessage())
                    );
                    sendFullClusterState(discoveryNode, delegate);
                } else {
                    PublicationTransportHandler.logger.debug(() -> Strings.format("failed to send cluster state to %s", discoveryNode), failure);
                    delegate.onFailure(failure);
                }
            });
            sendClusterState(discoveryNode, diffBytes, ActionListener.runAfter(fallbackListener, this::decRef));
        }

        /**
         * Sends already-serialized bytes (full state or diff) to the given node as a child request
         * of this context's task.
         *
         * <p>A reference on the bytes is taken before sending and released through {@code decRef}
         * after the listener has completed.
         *
         * @param discoveryNode            the destination node
         * @param releasableBytesReference the serialized payload
         * @param actionListener           notified with the response or the failure
         */
        private void sendClusterState(DiscoveryNode discoveryNode, ReleasableBytesReference releasableBytesReference, ActionListener<PublishWithJoinResponse> actionListener) {
            if (!$assertionsDisabled && refCount() <= 0) {
                throw new AssertionError();
            }
            if (releasableBytesReference.tryIncRef()) {
                final BytesTransportRequest request = new BytesTransportRequest(releasableBytesReference, discoveryNode.getVersion());
                final ActionListener<PublishWithJoinResponse> releasingListener = ActionListener.runAfter(actionListener, releasableBytesReference::decRef);
                PublicationTransportHandler.this.transportService.sendChildRequest(
                    discoveryNode,
                    PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
                    request,
                    this.task,
                    PublicationTransportHandler.STATE_REQUEST_OPTIONS,
                    new ActionListenerResponseHandler<>(releasingListener, PublishWithJoinResponse::new, ThreadPool.Names.CLUSTER_COORDINATION)
                );
                return;
            }
            // The bytes were already released: trips when assertions are on, fails the listener otherwise.
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            actionListener.onFailure(new IllegalStateException("serialized cluster state released before transmission"));
        }

        /**
         * Releases every cached serialized diff and then every cached serialized full state.
         */
        @Override
        protected void closeInternal() {
            for (final ReleasableBytesReference diffBytes : this.serializedDiffs.values()) {
                Releasables.closeExpectNoException(diffBytes);
            }
            for (final ReleasableBytesReference stateBytes : this.serializedStates.values()) {
                Releasables.closeExpectNoException(stateBytes);
            }
        }

        static {
            // Assertions are "disabled" exactly when the outer class does not want them; the explicit
            // `!$assertionsDisabled && ...` checks in this class rely on this flag.
            $assertionsDisabled = !PublicationTransportHandler.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/cluster/coordination/PublicationTransportHandler$SerializationStatsTracker.class */
    /**
     * Running totals of how many full states and diffs were serialized, together with their
     * uncompressed and compressed sizes. All methods are {@code synchronized}.
     */
    public static class SerializationStatsTracker {
        private long fullStateCount;
        private long totalUncompressedFullStateBytes;
        private long totalCompressedFullStateBytes;
        private long diffCount;
        private long totalUncompressedDiffBytes;
        private long totalCompressedDiffBytes;

        private SerializationStatsTracker() {
        }

        /**
         * Records one serialized full cluster state.
         *
         * @param uncompressedBytes number of bytes written before compression
         * @param compressedBytes   length of the resulting compressed bytes
         */
        public synchronized void serializedFullState(long uncompressedBytes, int compressedBytes) {
            this.fullStateCount += 1;
            this.totalUncompressedFullStateBytes = this.totalUncompressedFullStateBytes + uncompressedBytes;
            this.totalCompressedFullStateBytes = this.totalCompressedFullStateBytes + compressedBytes;
        }

        /**
         * Records one serialized cluster state diff.
         *
         * @param uncompressedBytes number of bytes written before compression
         * @param compressedBytes   length of the resulting compressed bytes
         */
        public synchronized void serializedDiff(long uncompressedBytes, int compressedBytes) {
            this.diffCount += 1;
            this.totalUncompressedDiffBytes = this.totalUncompressedDiffBytes + uncompressedBytes;
            this.totalCompressedDiffBytes = this.totalCompressedDiffBytes + compressedBytes;
        }

        /**
         * @return a snapshot of the totals accumulated so far
         */
        public synchronized ClusterStateSerializationStats getSerializationStats() {
            return new ClusterStateSerializationStats(
                this.fullStateCount,
                this.totalUncompressedFullStateBytes,
                this.totalCompressedFullStateBytes,
                this.diffCount,
                this.totalUncompressedDiffBytes,
                this.totalCompressedDiffBytes
            );
        }
    }

    /**
     * Creates the handler and registers the request handler for {@link #PUBLISH_STATE_ACTION_NAME}
     * on the cluster coordination executor.
     *
     * @param transportService       transport used for sending and receiving publish requests
     * @param namedWriteableRegistry registry used when deserializing incoming states
     * @param function               callback applied to every publish request to produce its response
     */
    public PublicationTransportHandler(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, Function<PublishRequest, PublishWithJoinResponse> function) {
        this.transportService = transportService;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.handlePublishRequest = function;
        transportService.registerRequestHandler(
            PUBLISH_STATE_ACTION_NAME,
            ThreadPool.Names.CLUSTER_COORDINATION,
            false,
            false,
            BytesTransportRequest::new,
            (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
        );
    }

    /**
     * @return the receive counters (full states, incompatible diffs, compatible diffs) together
     *         with a snapshot of the serialization statistics
     */
    public PublishClusterStateStats stats() {
        final long fullStatesReceived = this.fullClusterStateReceivedCount.get();
        final long incompatibleDiffsReceived = this.incompatibleClusterStateDiffReceivedCount.get();
        final long compatibleDiffsReceived = this.compatibleClusterStateDiffReceivedCount.get();
        final ClusterStateSerializationStats serializationStats = this.serializationStatsTracker.getSerializationStats();
        return new PublishClusterStateStats(fullStatesReceived, incompatibleDiffsReceived, compatibleDiffsReceived, serializationStats);
    }

    /**
     * Deserializes an incoming publish request and hands the resulting cluster state to
     * {@link #acceptState}.
     *
     * <p>The payload starts with a boolean: {@code true} means a full cluster state follows,
     * {@code false} means a diff that is applied on top of the last seen cluster state.
     *
     * @param bytesTransportRequest the raw, possibly compressed, request
     * @return the response produced by accepting the received state
     * @throws IOException if reading from the payload fails
     * @throws IncompatibleClusterStateVersionException if a diff arrives while no local state is
     *         available, or the diff cannot be applied to the last seen state
     */
    private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest bytesTransportRequest) throws IOException {
        final Compressor compressor = CompressorFactory.compressor(bytesTransportRequest.bytes());
        StreamInput streamInput = bytesTransportRequest.bytes().streamInput();
        try {
            if (compressor != null) {
                streamInput = new InputStreamStreamInput(compressor.threadLocalInputStream(streamInput));
            }
            streamInput = new NamedWriteableAwareStreamInput(streamInput, this.namedWriteableRegistry);
            streamInput.setVersion(bytesTransportRequest.version());

            if (streamInput.readBoolean()) {
                // Full cluster state.
                final ClusterState incomingState;
                // Close the stream as soon as the state has been read; the outer finally closes it again.
                try (StreamInput input = streamInput) {
                    incomingState = ClusterState.readFrom(input, this.transportService.getLocalNode());
                } catch (Exception e) {
                    logger.warn("unexpected error while deserializing an incoming cluster state", e);
                    if (!$assertionsDisabled) {
                        throw new AssertionError(e);
                    }
                    throw e;
                }
                this.fullClusterStateReceivedCount.incrementAndGet();
                logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), bytesTransportRequest.bytes().length());
                final PublishWithJoinResponse response = acceptState(incomingState);
                this.lastSeenClusterState.set(incomingState);
                return response;
            }

            // Diff against the last seen cluster state.
            final ClusterState lastSeen = this.lastSeenClusterState.get();
            if (lastSeen == null) {
                logger.debug("received diff for but don't have any local cluster state - requesting full state");
                this.incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            }
            final ClusterState incomingState;
            try {
                final Diff<ClusterState> diff;
                // Close the stream as soon as the diff has been read; the outer finally closes it again.
                try (StreamInput input = streamInput) {
                    diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
                }
                incomingState = diff.apply(lastSeen);
            } catch (IncompatibleClusterStateVersionException e) {
                this.incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw e;
            } catch (Exception e) {
                logger.warn("unexpected error while deserializing an incoming cluster state", e);
                if (!$assertionsDisabled) {
                    throw new AssertionError(e);
                }
                throw e;
            }
            this.compatibleClusterStateDiffReceivedCount.incrementAndGet();
            logger.debug(
                "received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                incomingState.version(),
                incomingState.stateUUID(),
                bytesTransportRequest.bytes().length()
            );
            final PublishWithJoinResponse response = acceptState(incomingState);
            // Only advance the base state if nobody replaced it in the meantime.
            this.lastSeenClusterState.compareAndSet(lastSeen, incomingState);
            return response;
        } finally {
            // Covers every exit path, including failures before or while reading the leading boolean.
            IOUtils.close(streamInput);
        }
    }

    /**
     * Wraps a received cluster state in a {@code PublishRequest} and passes it to the
     * publish-request callback.
     *
     * <p>With assertions enabled, a state in which the local node is the elected master is
     * rejected with an {@code AssertionError}.
     *
     * @param clusterState the state received over the wire
     * @return the callback's response
     */
    private PublishWithJoinResponse acceptState(ClusterState clusterState) {
        if (!$assertionsDisabled && clusterState.nodes().isLocalNodeElectedMaster()) {
            throw new AssertionError("should handle local publications locally, but got " + clusterState);
        }
        final PublishRequest request = new PublishRequest(clusterState);
        return this.handlePublishRequest.apply(request);
    }

    /**
     * Creates a publication context for the given event and eagerly serializes the states and
     * diffs it will need.
     *
     * <p>If serialization fails, the reference on the freshly created context is released
     * through {@code decRef} before the failure is rethrown.
     *
     * @param clusterStatePublicationEvent the event describing the publication
     * @return the prepared context
     */
    public PublicationContext newPublicationContext(ClusterStatePublicationEvent clusterStatePublicationEvent) {
        final PublicationContext publicationContext = new PublicationContext(clusterStatePublicationEvent);
        try {
            publicationContext.buildDiffAndSerializeStates();
            return publicationContext;
        } catch (Throwable failure) {
            publicationContext.decRef();
            throw failure;
        }
    }

    /**
     * Serializes the full cluster state, compressed, in the wire format of the given node's version.
     *
     * <p>The payload starts with the boolean {@code true}, which the receiving side reads as
     * "full state follows". The sizes are recorded in the serialization statistics.
     *
     * @param clusterState  the state to serialize
     * @param discoveryNode the node whose version determines the wire format
     * @return the serialized bytes; the caller is responsible for releasing them
     * @throws ElasticsearchException if writing the state fails with an {@code IOException}
     */
    private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode discoveryNode) {
        final Version nodeVersion = discoveryNode.getVersion();
        final RecyclerBytesStreamOutput bytesStream = this.transportService.newNetworkBytesStream();
        boolean success = false;
        try {
            final long uncompressedBytes;
            try (
                PositionTrackingOutputStreamStreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
                    CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
                )
            ) {
                stream.setVersion(nodeVersion);
                stream.writeBoolean(true);
                clusterState.writeTo(stream);
                // Read the position before the stream is closed.
                uncompressedBytes = stream.position();
            } catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode);
            }
            final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
            this.serializationStatsTracker.serializedFullState(uncompressedBytes, result.length());
            logger.trace(
                "serialized full cluster state version [{}] for node version [{}] with size [{}]",
                clusterState.version(),
                nodeVersion,
                result.length()
            );
            success = true;
            return result;
        } finally {
            // On success the returned reference is built on bytesStream, so only close it on failure.
            if (!success) {
                bytesStream.close();
            }
        }
    }

    /**
     * Serializes a cluster state diff, compressed, in the wire format of the given node's version.
     *
     * <p>The payload starts with the boolean {@code false}, which the receiving side reads as
     * "diff follows". The sizes are recorded in the serialization statistics.
     *
     * @param j             version of the cluster state the diff leads to (used for logging only)
     * @param diff          the diff to serialize
     * @param discoveryNode the node whose version determines the wire format
     * @return the serialized bytes; the caller is responsible for releasing them
     * @throws ElasticsearchException if writing the diff fails with an {@code IOException}
     */
    private ReleasableBytesReference serializeDiffClusterState(long j, Diff<ClusterState> diff, DiscoveryNode discoveryNode) {
        final Version nodeVersion = discoveryNode.getVersion();
        final RecyclerBytesStreamOutput bytesStream = this.transportService.newNetworkBytesStream();
        boolean success = false;
        try {
            final long uncompressedBytes;
            try (
                PositionTrackingOutputStreamStreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
                    CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
                )
            ) {
                stream.setVersion(nodeVersion);
                stream.writeBoolean(false);
                diff.writeTo(stream);
                // Read the position before the stream is closed.
                uncompressedBytes = stream.position();
            } catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, discoveryNode);
            }
            final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
            this.serializationStatsTracker.serializedDiff(uncompressedBytes, result.length());
            logger.trace(
                "serialized cluster state diff for version [{}] for node version [{}] with size [{}]",
                j,
                nodeVersion,
                result.length()
            );
            success = true;
            return result;
        } finally {
            // On success the returned reference is built on bytesStream, so only close it on failure.
            if (!success) {
                bytesStream.close();
            }
        }
    }

    static {
        // Assertions are "disabled" exactly when this class does not want them; the explicit
        // `$assertionsDisabled` checks throughout the class rely on this flag.
        $assertionsDisabled = !PublicationTransportHandler.class.desiredAssertionStatus();
        logger = LogManager.getLogger(PublicationTransportHandler.class);
        // Options used for every outgoing publish request: request type STATE, first argument left null
        // (presumably "no explicit timeout" - confirm against TransportRequestOptions.of).
        STATE_REQUEST_OPTIONS = TransportRequestOptions.of(null, TransportRequestOptions.Type.STATE);
    }
}
