/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.opensearch2.org.opensearch.cluster.service;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.graylog.shaded.opensearch2.org.opensearch.Version;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.PlainActionFuture;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.AckedClusterStateTaskListener;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterChangedEvent;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterState;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterStateTaskConfig;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterStateTaskExecutor;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterStateTaskListener;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.coordination.ClusterStatePublisher;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.metadata.Metadata;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.node.DiscoveryNode;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.node.DiscoveryNodes;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.routing.RoutingTable;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.PendingClusterTask;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.SourcePrioritizedRunnable;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.TaskBatcher;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.TaskBatcherListener;
import org.graylog.shaded.opensearch2.org.opensearch.common.Nullable;
import org.graylog.shaded.opensearch2.org.opensearch.common.Priority;
import org.graylog.shaded.opensearch2.org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.ClusterSettings;
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Setting;
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Settings;
import org.graylog.shaded.opensearch2.org.opensearch.common.unit.TimeValue;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.CountDown;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.FutureUtils;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.ThreadContext;
import org.graylog.shaded.opensearch2.org.opensearch.core.Assertions;
import org.graylog.shaded.opensearch2.org.opensearch.core.action.ActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.text.Text;
import org.graylog.shaded.opensearch2.org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.graylog.shaded.opensearch2.org.opensearch.node.Node;
import org.graylog.shaded.opensearch2.org.opensearch.threadpool.Scheduler;
import org.graylog.shaded.opensearch2.org.opensearch.threadpool.ThreadPool;

@Deprecated
public class MasterService
extends AbstractLifecycleComponent {
    /** Logger for this service; also handed to the batcher and the listener wrappers created here. */
    private static final Logger logger = LogManager.getLogger(MasterService.class);
    /**
     * Deprecated key for the slow-task logging threshold, declared with a 10 second default and the
     * {@code Dynamic}, {@code NodeScope} and {@code Deprecated} properties.
     */
    public static final Setting<TimeValue> MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_master_task_logging_threshold", TimeValue.timeValueSeconds(10L), Setting.Property.Dynamic, Setting.Property.NodeScope, Setting.Property.Deprecated);
    /**
     * Current key for the slow-task logging threshold. The deprecated setting above is passed as the
     * second argument (presumably as the fallback value; confirm against {@code Setting.positiveTimeSetting}).
     */
    public static final Setting<TimeValue> CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_cluster_manager_task_logging_threshold", MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, Setting.Property.Dynamic, Setting.Property.NodeScope);
    /** Name prefix given to update threads; also matched by the update-thread assertions. */
    static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask";
    /** Legacy thread-name fragment that the update-thread assertions still accept. */
    @Deprecated
    static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";
    // Publisher used by publish(); must be set before doStart() runs.
    ClusterStatePublisher clusterStatePublisher;
    // Node name read from Node.NODE_NAME_SETTING; used to name the update executor and its threads.
    private final String nodeName;
    // Source of the current cluster state, read by state(); must be set before doStart() runs.
    private Supplier<ClusterState> clusterStateSupplier;
    // Threshold above which logExecutionTime() logs at WARN; replaced by the settings update consumer.
    private volatile TimeValue slowTaskLoggingThreshold;
    // Supplies the thread context, scheduler, generic executor and the relative clock used for timings.
    protected final ThreadPool threadPool;
    // Prioritized executor created in doStart() and terminated in doStop(); unset before start.
    private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor;
    // Batcher created in doStart() on top of threadPoolExecutor; unset before start.
    private volatile Batcher taskBatcher;
    // Throttler handed to the batcher as its listener; also backs registerClusterManagerTask().
    protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
    // Throttling statistics object shared with clusterManagerTaskThrottler.
    private final ClusterManagerThrottlingStats throttlingStats;

    /**
     * Builds the service from node settings.
     *
     * <p>Reads the node name (which must be present) and the initial slow-task logging threshold,
     * subscribes to dynamic updates of that threshold, and creates the throttling stats and the
     * task throttler. The update executor itself is not created until {@link #doStart()}.
     *
     * @param settings        node settings supplying the node name and the initial threshold
     * @param clusterSettings registry on which the threshold update consumer is registered
     * @param threadPool      thread pool used for the thread context, scheduling and timing
     */
    public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
        nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
        slowTaskLoggingThreshold = CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
        clusterSettings.addSettingsUpdateConsumer(
            CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
            this::setSlowTaskLoggingThreshold
        );
        throttlingStats = new ClusterManagerThrottlingStats();
        clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(
            settings,
            clusterSettings,
            this::getMinNodeVersion,
            throttlingStats
        );
        this.threadPool = threadPool;
    }

    /**
     * Settings-update callback: replaces the threshold consulted by {@code logExecutionTime}.
     *
     * @param threshold the new slow-task logging threshold
     */
    private void setSlowTaskLoggingThreshold(TimeValue threshold) {
        slowTaskLoggingThreshold = threshold;
    }

    /**
     * Installs the publisher used to publish new cluster states. {@link #doStart()} refuses to
     * run until a non-null publisher has been set.
     *
     * @param publisher the cluster state publisher to use
     */
    public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {
        clusterStatePublisher = publisher;
    }

    /**
     * Installs the supplier that {@link #state()} reads the current cluster state from.
     * {@link #doStart()} refuses to run until a non-null supplier has been set.
     *
     * @param clusterStateSupplier source of the current cluster state
     */
    public synchronized void setClusterStateSupplier(Supplier<ClusterState> clusterStateSupplier) {
        this.clusterStateSupplier = clusterStateSupplier;
    }

    /**
     * Starts the service: verifies that a publisher and a state supplier were configured, then
     * creates the update executor and the batcher that runs on it.
     *
     * @throws NullPointerException if the publisher or the state supplier has not been set
     */
    @Override
    protected synchronized void doStart() {
        Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
        Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
        final PrioritizedOpenSearchThreadPoolExecutor executor = createThreadPoolExecutor();
        threadPoolExecutor = executor;
        taskBatcher = new Batcher(logger, executor, clusterManagerTaskThrottler);
    }

    /**
     * Creates the prioritized executor that runs cluster state update tasks. Its threads are
     * daemon threads named from the node name and {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME}.
     *
     * @return a new executor built by {@code OpenSearchExecutors.newSinglePrioritizing}
     */
    protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
        final String executorName = nodeName + "/clusterManagerService#updateTask";
        return OpenSearchExecutors.newSinglePrioritizing(
            executorName,
            OpenSearchExecutors.daemonThreadFactory(nodeName, CLUSTER_MANAGER_UPDATE_THREAD_NAME),
            threadPool.getThreadContext(),
            threadPool.scheduler()
        );
    }

    /** Stops the service by handing the update executor to {@code ThreadPool.terminate} with a 10 second timeout. */
    @Override
    protected synchronized void doStop() {
        ThreadPool.terminate(threadPoolExecutor, 10L, TimeUnit.SECONDS);
    }

    /** Lifecycle close hook; this service does nothing on close. */
    @Override
    protected synchronized void doClose() {
    }

    /**
     * Returns whatever the configured cluster state supplier currently yields.
     *
     * @return the cluster state obtained from the supplier
     */
    ClusterState state() {
        return clusterStateSupplier.get();
    }

    /**
     * Tells whether the calling thread's name contains either the current or the legacy
     * update-thread name fragment.
     *
     * @return {@code true} if the current thread is named like an update thread
     */
    private static boolean isClusterManagerUpdateThread() {
        final String threadName = Thread.currentThread().getName();
        if (threadName.contains(CLUSTER_MANAGER_UPDATE_THREAD_NAME)) {
            return true;
        }
        return threadName.contains(MASTER_UPDATE_THREAD_NAME);
    }

    /**
     * Asserts that the caller is running on the update thread. Intended to be used inside an
     * {@code assert} statement, which is why it always returns {@code true}.
     *
     * @return always {@code true}
     */
    public static boolean assertClusterManagerUpdateThread() {
        assert isClusterManagerUpdateThread() : "not called from the cluster-manager service thread";
        return true;
    }

    /**
     * Asserts that the caller is NOT running on the update thread.
     *
     * @param reason explanation included in the assertion message
     * @return always {@code true}, so the call can be wrapped in an {@code assert}
     */
    public static boolean assertNotClusterManagerUpdateThread(String reason) {
        assert !isClusterManagerUpdateThread() : "Expected current thread ["
            + Thread.currentThread()
            + "] to not be the cluster-manager service thread. Reason: ["
            + reason
            + "]";
        return true;
    }

    /**
     * Legacy alias that simply delegates.
     *
     * @return always {@code true}
     * @deprecated use {@link #assertClusterManagerUpdateThread()}
     */
    @Deprecated
    public static boolean assertMasterUpdateThread() {
        return assertClusterManagerUpdateThread();
    }

    /**
     * Legacy alias that simply delegates.
     *
     * @param reason explanation included in the assertion message
     * @return always {@code true}
     * @deprecated use {@link #assertNotClusterManagerUpdateThread(String)}
     */
    @Deprecated
    public static boolean assertNotMasterUpdateThread(String reason) {
        return assertNotClusterManagerUpdateThread(reason);
    }

    /**
     * Runs one batch of update tasks end to end: computes the new cluster state, notifies failed
     * tasks, and then either notifies listeners directly (state unchanged) or publishes the new
     * state. Each phase is timed and reported through {@code logExecutionTime}.
     *
     * <p>The batch is dropped when the service is not started, and handed to
     * {@code onNoLongerClusterManager} when the executor requires a cluster-manager and the local
     * node is no longer elected.
     *
     * @param taskInputs the batch to execute
     */
    private void runTasks(TaskInputs taskInputs) {
        final String summary = taskInputs.summary;
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
            return;
        }

        logger.debug("executing cluster state update for [{}]", summary);
        final ClusterState previousClusterState = state();
        if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
            logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
            taskInputs.onNoLongerClusterManager();
            return;
        }

        // Phase 1: compute the new state and report tasks that failed during computation.
        final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
        final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
        taskOutputs.notifyFailedTasks();
        logExecutionTime(getTimeSince(computationStartTime), "compute cluster state update", summary);

        // Phase 2a: nothing changed, so there is nothing to publish; notify and finish.
        if (taskOutputs.clusterStateUnchanged()) {
            final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
            taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
            logExecutionTime(getTimeSince(notificationStartTime), "notify listeners on unchanged cluster state", summary);
            return;
        }

        // Phase 2b: the state changed; log it and publish.
        final ClusterState newClusterState = taskOutputs.newClusterState;
        if (logger.isTraceEnabled()) {
            logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
        } else {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
        }

        final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
        try {
            final ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                final String nodesDeltaSummary = nodesDelta.shortSummary();
                if (nodesDeltaSummary.length() > 0) {
                    logger.info(
                        "{}, term: {}, version: {}, delta: {}",
                        summary,
                        newClusterState.term(),
                        newClusterState.version(),
                        nodesDeltaSummary
                    );
                }
            }
            logger.debug("publishing cluster state version [{}]", newClusterState.version());
            publish(clusterChangedEvent, taskOutputs, publicationStartTime);
        } catch (Exception e) {
            handleException(summary, publicationStartTime, newClusterState, e);
        }
    }

    /**
     * Measures the time elapsed since an earlier {@code preciseRelativeTimeInNanos()} reading.
     *
     * @param startTimeNanos earlier reading of the thread pool's relative nanosecond clock
     * @return the elapsed time, converted from nanoseconds to a millisecond {@link TimeValue}
     */
    private TimeValue getTimeSince(long startTimeNanos) {
        final long elapsedNanos = threadPool.preciseRelativeTimeInNanos() - startTimeNanos;
        return TimeValue.timeValueMillis(TimeValue.nsecToMSec(elapsedNanos));
    }

    /**
     * Publishes the new cluster state and waits for the publication to finish, then dispatches
     * to {@link #onPublicationSuccess} or {@link #onPublicationFailed}.
     *
     * @param clusterChangedEvent event describing the previous and new state
     * @param taskOutputs         outputs of the batch that produced the new state
     * @param startTimeMillis     publication start time, forwarded to the failure handler. Despite
     *                            the name, {@code runTasks} passes a reading of
     *                            {@code preciseRelativeTimeInNanos()}, i.e. nanoseconds.
     */
    protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
        final PlainActionFuture<Void> publicationFuture = new PlainActionFuture<Void>() {
            @Override
            protected boolean blockingAllowed() {
                // Blocking on this future is explicitly permitted on the update thread.
                return MasterService.isClusterManagerUpdateThread() || super.blockingAllowed();
            }
        };
        final ClusterStatePublisher.AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
        clusterStatePublisher.publish(clusterChangedEvent, publicationFuture, ackListener);

        try {
            FutureUtils.get(publicationFuture);
            onPublicationSuccess(clusterChangedEvent, taskOutputs);
        } catch (Exception e) {
            onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
        }
    }

    /**
     * Notifies task listeners and the executor after a successful publication, and logs how long
     * those notifications took. An exception thrown by the executor callback is logged and
     * otherwise ignored.
     *
     * @param clusterChangedEvent event describing the published change
     * @param taskOutputs         outputs of the batch that produced the new state
     */
    void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) {
        final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
        taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());

        try {
            taskOutputs.clusterStatePublished(clusterChangedEvent);
        } catch (Exception e) {
            logger.error(
                () -> new ParameterizedMessage(
                    "exception thrown while notifying executor of new cluster state publication [{}]",
                    clusterChangedEvent.source()
                ),
                e
            );
        }

        final TimeValue executionTime = getTimeSince(notificationStartTime);
        final String activity = "notify listeners on successful publication of cluster state (version: "
            + clusterChangedEvent.state().version()
            + ", uuid: "
            + clusterChangedEvent.state().stateUUID()
            + ")";
        logExecutionTime(executionTime, activity, clusterChangedEvent.source());
    }

    /**
     * Handles a failed publication. A {@link FailedToCommitClusterStateException} is logged and
     * reported to the non-failed tasks; any other exception is only logged via
     * {@code handleException}.
     *
     * @param clusterChangedEvent event describing the attempted change
     * @param taskOutputs         outputs of the batch that produced the new state
     * @param startTimeMillis     publication start time, forwarded to {@code handleException}
     *                            (which treats it as a nanosecond clock reading)
     * @param exception           the failure raised while publishing
     */
    void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis, Exception exception) {
        if (!(exception instanceof FailedToCommitClusterStateException)) {
            handleException(clusterChangedEvent.source(), startTimeMillis, clusterChangedEvent.state(), exception);
            return;
        }

        final long version = clusterChangedEvent.state().version();
        logger.warn(
            () -> new ParameterizedMessage(
                "failing [{}]: failed to commit cluster state version [{}]",
                clusterChangedEvent.source(),
                version
            ),
            exception
        );
        taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
    }

    /**
     * Logs, at WARN, a publication failure together with the elapsed time, the version and UUID
     * of the state that could not be published, and the full state.
     *
     * @param summary         description of the batch being processed
     * @param startTimeMillis start reading handed to {@code getTimeSince}; despite the name it is
     *                        interpreted there as nanoseconds
     * @param newClusterState the state whose publication failed
     * @param e               the failure
     */
    private void handleException(String summary, long startTimeMillis, ClusterState newClusterState, Exception e) {
        logger.warn(
            new ParameterizedMessage(
                "took [{}] and then failed to publish updated cluster state (version: {}, uuid: {}) for [{}]:\n{}",
                getTimeSince(startTimeMillis),
                newClusterState.version(),
                newClusterState.stateUUID(),
                summary,
                newClusterState.toString()
            ),
            e
        );
    }

    /**
     * Executes the batch against the previous state and packages the result: the version-patched
     * new state, the tasks that did not fail, and the per-task execution results.
     *
     * @param taskInputs           the batch to execute
     * @param previousClusterState the state the batch is applied to
     * @return the outputs of the batch
     */
    private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
        final ClusterStateTaskExecutor.ClusterTasksResult<Object> result = executeTasks(taskInputs, previousClusterState);
        final ClusterState patchedState = patchVersions(previousClusterState, result);
        final List<Batcher.UpdateTask> succeeded = getNonFailedTasks(taskInputs, result);
        return new TaskOutputs(taskInputs, previousClusterState, patchedState, succeeded, result.executionResults);
    }

    /**
     * Bumps version numbers on a state produced by an executor. If the executor returned the very
     * same instance, it is returned untouched. Otherwise the state version is incremented, and the
     * routing table and metadata versions are each incremented only when their instance differs
     * from the one in the previous state.
     *
     * @param previousClusterState the state the batch started from
     * @param executionResult      the executor's result holding the resulting state
     * @return the resulting state with patched versions, or the unchanged instance
     */
    private ClusterState patchVersions(ClusterState previousClusterState, ClusterStateTaskExecutor.ClusterTasksResult<?> executionResult) {
        final ClusterState resulting = executionResult.resultingState;
        if (previousClusterState == resulting) {
            return resulting;
        }

        final ClusterState.Builder builder = incrementVersion(resulting);
        if (previousClusterState.routingTable() != resulting.routingTable()) {
            final long nextRoutingVersion = resulting.routingTable().version() + 1L;
            builder.routingTable(RoutingTable.builder(resulting.routingTable()).version(nextRoutingVersion).build());
        }
        if (previousClusterState.metadata() != resulting.metadata()) {
            final long nextMetadataVersion = resulting.metadata().version() + 1L;
            builder.metadata(Metadata.builder(resulting.metadata()).version(nextMetadataVersion));
        }
        return builder.build();
    }

    /**
     * Starts a builder from the given state and increments its version.
     *
     * @param clusterState the state to copy from
     * @return the builder, after {@code incrementVersion()} has been applied
     */
    public ClusterState.Builder incrementVersion(ClusterState clusterState) {
        final ClusterState.Builder builder = ClusterState.builder(clusterState);
        return builder.incrementVersion();
    }

    /**
     * Submits a single update task that acts as its own config, executor and listener.
     *
     * @param source     description of what triggered the update
     * @param updateTask the task; it is also cast to {@link ClusterStateTaskListener}
     * @param <T>        the task type
     */
    public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T>> void submitStateUpdateTask(String source, T updateTask) {
        final ClusterStateTaskListener listener = (ClusterStateTaskListener) updateTask;
        submitStateUpdateTask(source, updateTask, updateTask, updateTask, listener);
    }

    /**
     * Submits a single task by wrapping it and its listener in a one-entry map and delegating to
     * {@link #submitStateUpdateTasks}.
     *
     * @param source   description of what triggered the update
     * @param task     the task payload
     * @param config   supplies the priority and timeout
     * @param executor executor that will process the task
     * @param listener listener notified about the task's outcome
     * @param <T>      the task type
     */
    public <T> void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
        final Map<T, ClusterStateTaskListener> singleTask = Collections.singletonMap(task, listener);
        submitStateUpdateTasks(source, singleTask, config, executor);
    }

    /**
     * Describes the entries the update executor reports as pending.
     *
     * @return one {@link PendingClusterTask} per pending executor entry
     */
    public List<PendingClusterTask> pendingTasks() {
        return Arrays.stream(threadPoolExecutor.getPending()).map(entry -> {
            assert entry.task instanceof SourcePrioritizedRunnable
                : "thread pool executor should only use SourcePrioritizedRunnable instances but found: " + entry.task.getClass().getName();
            final SourcePrioritizedRunnable runnable = (SourcePrioritizedRunnable) entry.task;
            final Text sourceText = new Text(runnable.source());
            return new PendingClusterTask(entry.insertionOrder, entry.priority, sourceText, runnable.getAgeInMillis(), entry.executing);
        }).collect(Collectors.toList());
    }

    /**
     * @return the total throttled task count reported by the throttling stats
     */
    public long numberOfThrottledPendingTasks() {
        return throttlingStats.getTotalThrottledTaskCount();
    }

    /**
     * @return the throttling stats instance shared with the task throttler
     */
    public ClusterManagerThrottlingStats getThrottlingStats() {
        return throttlingStats;
    }

    /**
     * @return the minimum node version reported by the nodes of the current cluster state
     */
    public Version getMinNodeVersion() {
        final ClusterState current = state();
        return current.getNodes().getMinNodeVersion();
    }

    /**
     * @return the number of pending tasks reported by the update executor
     */
    public int numberOfPendingTasks() {
        return threadPoolExecutor.getNumberOfPendingTasks();
    }

    /**
     * @return the maximum task wait time reported by the update executor
     */
    public TimeValue getMaxTaskWaitTime() {
        return threadPoolExecutor.getMaxTaskWaitTime();
    }

    /**
     * Wraps a listener in the matching exception-logging wrapper: the acked variant when the
     * listener is an {@link AckedClusterStateTaskListener}, the plain variant otherwise.
     *
     * @param listener        the listener to wrap
     * @param contextSupplier supplier of the stored thread context used around each callback
     * @return the wrapping listener
     */
    private SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, Supplier<ThreadContext.StoredContext> contextSupplier) {
        if (!(listener instanceof AckedClusterStateTaskListener)) {
            return new SafeClusterStateTaskListener(listener, contextSupplier, logger);
        }
        final AckedClusterStateTaskListener acked = (AckedClusterStateTaskListener) listener;
        return new SafeAckedClusterStateTaskListener(acked, contextSupplier, logger);
    }

    /**
     * Logs how long an activity took: at DEBUG normally, at WARN when the duration is strictly
     * greater than the slow-task logging threshold.
     *
     * @param executionTime measured duration
     * @param activity      what was being done, inserted into the message
     * @param summary       description of the batch the activity belonged to
     */
    private void logExecutionTime(TimeValue executionTime, String activity, String summary) {
        final boolean withinThreshold = executionTime.getMillis() <= slowTaskLoggingThreshold.getMillis();
        if (withinThreshold) {
            logger.debug("took [{}] to {} for [{}]", executionTime, activity, summary);
            return;
        }
        logger.warn("took [{}], which is over [{}], to {} for [{}]", executionTime, slowTaskLoggingThreshold, activity, summary);
    }

    /**
     * Runs the batch's executor over all task payloads. If the executor throws an
     * {@link Exception}, every task in the batch is marked as failed with that exception and the
     * previous state is kept as the resulting state.
     *
     * <p>An {@link AssertionError} is thrown (and not caught here) when the resulting state would
     * make the local node stop being the elected cluster-manager.
     *
     * @param taskInputs           the batch to execute
     * @param previousClusterState the state handed to the executor
     * @return the executor's result, or an all-failed result built on the previous state
     */
    private ClusterStateTaskExecutor.ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
        ClusterStateTaskExecutor.ClusterTasksResult<Object> result;
        try {
            final List<Object> inputs = taskInputs.updateTasks.stream()
                .map(updateTask -> updateTask.task)
                .collect(Collectors.toList());
            result = taskInputs.executor.execute(previousClusterState, inputs);
            if (previousClusterState != result.resultingState
                && previousClusterState.nodes().isLocalNodeElectedClusterManager()
                && !result.resultingState.nodes().isLocalNodeElectedClusterManager()) {
                throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager");
            }
        } catch (Exception e) {
            logger.trace(
                () -> new ParameterizedMessage(
                    "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
                    previousClusterState.version(),
                    previousClusterState.stateUUID(),
                    taskInputs.summary,
                    previousClusterState.nodes(),
                    previousClusterState.routingTable(),
                    previousClusterState.getRoutingNodes()
                ),
                e
            );
            result = ClusterStateTaskExecutor.ClusterTasksResult.<Object>builder()
                .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
                .build(previousClusterState);
        }

        assert result.executionResults != null;
        assert result.executionResults.size() == taskInputs.updateTasks.size() : String.format(
            Locale.ROOT,
            "expected [%d] task result%s but was [%d]",
            taskInputs.updateTasks.size(),
            taskInputs.updateTasks.size() == 1 ? "" : "s",
            result.executionResults.size()
        );
        if (Assertions.ENABLED) {
            for (Batcher.UpdateTask updateTask : taskInputs.updateTasks) {
                assert result.executionResults.containsKey(updateTask.task) : "missing task result for " + updateTask;
            }
        }
        return result;
    }

    /**
     * Selects the tasks of a batch whose execution result is a success.
     *
     * @param taskInputs         the executed batch
     * @param clusterTasksResult per-task results keyed by task payload
     * @return the tasks whose result reports success, in batch order
     */
    private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs, ClusterStateTaskExecutor.ClusterTasksResult<Object> clusterTasksResult) {
        return taskInputs.updateTasks.stream().filter(updateTask -> {
            final Object key = updateTask.task;
            assert clusterTasksResult.executionResults.containsKey(key) : "missing " + updateTask;
            return clusterTasksResult.executionResults.get(key).isSuccess();
        }).collect(Collectors.toList());
    }

    /**
     * Registers a task type with the task throttler.
     *
     * @param taskKey           key identifying the task type
     * @param throttlingEnabled flag forwarded unchanged to the throttler
     * @return the throttling key returned by the throttler
     */
    public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) {
        return clusterManagerTaskThrottler.registerClusterManagerTask(taskKey, throttlingEnabled);
    }

    public <T> void submitStateUpdateTasks(String source, Map<T, ClusterStateTaskListener> tasks, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor) {
        block9: {
            if (!this.lifecycle.started()) {
                return;
            }
            ThreadContext threadContext = this.threadPool.getThreadContext();
            Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
            try (ThreadContext.StoredContext ignore = threadContext.stashContext();){
                threadContext.markAsSystemContext();
                List safeTasks = tasks.entrySet().stream().map(e -> {
                    Batcher batcher = this.taskBatcher;
                    Objects.requireNonNull(batcher);
                    return batcher.new Batcher.UpdateTask(config.priority(), source, e.getKey(), this.safe((ClusterStateTaskListener)e.getValue(), supplier), executor);
                }).collect(Collectors.toList());
                this.taskBatcher.submitTasks(safeTasks, config.timeout());
            }
            catch (OpenSearchRejectedExecutionException e2) {
                if (this.lifecycle.stoppedOrClosed()) break block9;
                throw e2;
            }
        }
    }

    /**
     * Task batcher bound to this service: batches are executed through
     * {@code MasterService.runTasks}, and timed-out tasks are failed on the generic thread pool.
     */
    class Batcher extends TaskBatcher {
        Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) {
            super(logger, threadExecutor, taskBatcherListener);
        }

        /**
         * Fails every timed-out task with a {@link ProcessClusterEventTimeoutException}. The
         * listeners are invoked on the generic executor rather than on the calling thread.
         */
        @Override
        protected void onTimeout(List<? extends TaskBatcher.BatchedTask> tasks, TimeValue timeout) {
            MasterService.this.threadPool.generic()
                .execute(
                    () -> tasks.forEach(
                        task -> ((UpdateTask) task).listener.onFailure(
                            task.source,
                            new ProcessClusterEventTimeoutException(timeout, task.source)
                        )
                    )
                );
        }

        /**
         * Runs one batch. The batching key is the executor the tasks were submitted with, and
         * every task in the batch is an {@link UpdateTask} created by this service.
         */
        @Override
        @SuppressWarnings("unchecked")
        protected void run(Object batchingKey, List<? extends TaskBatcher.BatchedTask> tasks, String tasksSummary) {
            // Unchecked but safe: UpdateTask passes its executor as the batching key, and only
            // UpdateTask instances are ever submitted to this batcher.
            final ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
            final List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
            MasterService.this.runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
        }

        /** A batched task that additionally carries the listener to notify about its outcome. */
        class UpdateTask extends TaskBatcher.BatchedTask {
            final ClusterStateTaskListener listener;

            UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener, ClusterStateTaskExecutor<?> executor) {
                // The executor doubles as the batching key; the enclosing Batcher is supplied implicitly.
                super(priority, source, executor, task);
                this.listener = listener;
            }

            /** Delegates the description of a batch to the executor stored as the batching key. */
            @Override
            @SuppressWarnings("unchecked")
            public String describeTasks(List<? extends TaskBatcher.BatchedTask> tasks) {
                final ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) this.batchingKey;
                return taskExecutor.describeTasks(tasks.stream().map(TaskBatcher.BatchedTask::getTask).collect(Collectors.toList()));
            }
        }
    }

    /** The inputs of one batch: the executor, the tasks to run and a summary used in log messages. */
    private class TaskInputs {
        final String summary;
        final List<Batcher.UpdateTask> updateTasks;
        final ClusterStateTaskExecutor<Object> executor;

        TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
            this.executor = executor;
            this.updateTasks = updateTasks;
            this.summary = summary;
        }

        /** @return whether the executor only runs on the elected cluster-manager */
        boolean runOnlyWhenClusterManager() {
            return executor.runOnlyOnClusterManager();
        }

        /** Tells every task's listener that the local node is no longer cluster-manager. */
        void onNoLongerClusterManager() {
            for (Batcher.UpdateTask task : updateTasks) {
                task.listener.onNoLongerClusterManager(task.source());
            }
        }
    }

    /**
     * The outcome of executing one batch: the states before and after, the tasks that did not
     * fail, and the per-task execution results, plus helpers that notify the task listeners.
     */
    class TaskOutputs {
        final TaskInputs taskInputs;
        final ClusterState previousClusterState;
        final ClusterState newClusterState;
        final List<Batcher.UpdateTask> nonFailedTasks;
        final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults;

        TaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, ClusterState newClusterState, List<Batcher.UpdateTask> nonFailedTasks, Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults) {
            this.taskInputs = taskInputs;
            this.previousClusterState = previousClusterState;
            this.newClusterState = newClusterState;
            this.nonFailedTasks = nonFailedTasks;
            this.executionResults = executionResults;
        }

        /** Reports a commit failure to every task that had not already failed. */
        void publishingFailed(FailedToCommitClusterStateException t) {
            for (Batcher.UpdateTask task : nonFailedTasks) {
                task.listener.onFailure(task.source(), t);
            }
        }

        /** Tells every non-failed task that the cluster state moved from the old to the new state. */
        void processedDifferentClusterState(ClusterState previousClusterState, ClusterState newClusterState) {
            for (Batcher.UpdateTask task : nonFailedTasks) {
                task.listener.clusterStateProcessed(task.source(), previousClusterState, newClusterState);
            }
        }

        /** Forwards the publication event to the batch's executor. */
        void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
            taskInputs.executor.clusterStatePublished(clusterChangedEvent);
        }

        /**
         * Builds an ack listener that delegates to one count-down listener per non-failed task
         * whose listener is an {@link AckedClusterStateTaskListener}.
         */
        ClusterStatePublisher.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
            return new DelegatingAckListener(
                nonFailedTasks.stream()
                    .filter(task -> task.listener instanceof AckedClusterStateTaskListener)
                    .map(
                        task -> new AckCountDownListener(
                            (AckedClusterStateTaskListener) task.listener,
                            newClusterState.version(),
                            newClusterState.nodes(),
                            threadPool
                        )
                    )
                    .collect(Collectors.toList())
            );
        }

        /** @return {@code true} when the new state is the very same instance as the previous one */
        boolean clusterStateUnchanged() {
            return previousClusterState == newClusterState;
        }

        /** Notifies the listener of every task whose execution result is a failure. */
        void notifyFailedTasks() {
            for (Batcher.UpdateTask updateTask : taskInputs.updateTasks) {
                assert executionResults.containsKey(updateTask.task) : "missing " + updateTask;
                final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task);
                if (!taskResult.isSuccess()) {
                    updateTask.listener.onFailure(updateTask.source(), taskResult.getFailure());
                }
            }
        }

        /**
         * Notifies non-failed tasks when nothing was published: acked listeners are told that all
         * nodes acked (with no exception), then every listener is told the state was processed,
         * with the new state passed as both the old and the new state.
         */
        void notifySuccessfulTasksOnUnchangedClusterState() {
            for (Batcher.UpdateTask task : nonFailedTasks) {
                if (task.listener instanceof AckedClusterStateTaskListener) {
                    ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null);
                }
                task.listener.clusterStateProcessed(task.source(), newClusterState, newClusterState);
            }
        }
    }

    /**
     * Acked-listener wrapper: runs each ack callback of the delegate under the stored thread
     * context and logs, rather than propagates, any exception the delegate throws.
     */
    private static class SafeAckedClusterStateTaskListener extends SafeClusterStateTaskListener implements AckedClusterStateTaskListener {
        private final AckedClusterStateTaskListener listener;
        private final Logger logger;

        SafeAckedClusterStateTaskListener(AckedClusterStateTaskListener listener, Supplier<ThreadContext.StoredContext> context, Logger logger) {
            super(listener, context, logger);
            this.listener = listener;
            this.logger = logger;
        }

        @Override
        public boolean mustAck(DiscoveryNode discoveryNode) {
            return listener.mustAck(discoveryNode);
        }

        /**
         * Forwards the ack result to the delegate.
         *
         * @param e the ack failure, or {@code null} when there was none
         */
        @Override
        public void onAllNodesAcked(@Nullable Exception e) {
            try (ThreadContext.StoredContext ignore = context.get()) {
                listener.onAllNodesAcked(e);
            } catch (Exception inner) {
                // Throwable.addSuppressed rejects null and self-suppression; e is nullable here and
                // the delegate may rethrow e itself, so guard both to keep this handler from throwing.
                if (e != null && e != inner) {
                    inner.addSuppressed(e);
                }
                logger.error("exception thrown by listener while notifying on all nodes acked", inner);
            }
        }

        @Override
        public void onAckTimeout() {
            try (ThreadContext.StoredContext ignore = context.get()) {
                listener.onAckTimeout();
            } catch (Exception e) {
                logger.error("exception thrown by listener while notifying on ack timeout", e);
            }
        }

        @Override
        public TimeValue ackTimeout() {
            return listener.ackTimeout();
        }
    }

    /**
     * Listener wrapper: runs each callback of the delegate under the stored thread context and
     * logs, rather than propagates, any exception the delegate throws.
     */
    private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
        private final ClusterStateTaskListener listener;
        protected final Supplier<ThreadContext.StoredContext> context;
        private final Logger logger;

        SafeClusterStateTaskListener(ClusterStateTaskListener listener, Supplier<ThreadContext.StoredContext> context, Logger logger) {
            this.listener = listener;
            this.context = context;
            this.logger = logger;
        }

        @Override
        public void onFailure(String source, Exception e) {
            try (ThreadContext.StoredContext ignore = context.get()) {
                listener.onFailure(source, e);
            } catch (Exception inner) {
                // Throwable.addSuppressed rejects null and self-suppression; the delegate may rethrow
                // e itself, so guard both to keep this handler from throwing.
                if (e != null && e != inner) {
                    inner.addSuppressed(e);
                }
                logger.error(
                    () -> new ParameterizedMessage("exception thrown by listener notifying of failure from [{}]", source),
                    inner
                );
            }
        }

        @Override
        public void onNoLongerClusterManager(String source) {
            try (ThreadContext.StoredContext ignore = context.get()) {
                listener.onNoLongerClusterManager(source);
            } catch (Exception e) {
                logger.error(
                    () -> new ParameterizedMessage(
                        "exception thrown by listener while notifying no longer cluster-manager from [{}]",
                        source
                    ),
                    e
                );
            }
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            try (ThreadContext.StoredContext ignore = context.get()) {
                listener.clusterStateProcessed(source, oldState, newState);
            } catch (Exception e) {
                logger.error(
                    () -> new ParameterizedMessage(
                        "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n{}\nnew cluster state:\n{}",
                        source,
                        oldState,
                        newState
                    ),
                    e
                );
            }
        }
    }

    /**
     * Tracks acknowledgements for a single published cluster state version and notifies the
     * wrapped {@link AckedClusterStateTaskListener} exactly once: either through
     * {@code onAllNodesAcked} when the internal {@link CountDown} reaches zero, or through
     * {@code onAckTimeout} when the timeout path fast-forwards it first.
     */
    private static class AckCountDownListener
    implements ClusterStatePublisher.AckListener {
        private static final Logger logger = LogManager.getLogger(AckCountDownListener.class);
        private final AckedClusterStateTaskListener ackedTaskListener;
        private final CountDown countDown;
        private final DiscoveryNode clusterManagerNode;
        private final ThreadPool threadPool;
        private final long clusterStateVersion;
        /** Handle of the scheduled timeout task; stays {@code null} until one is scheduled in {@link #onCommit}. */
        private volatile Scheduler.Cancellable ackTimeoutCallback;
        /** Most recent exception delivered with a node ack, handed to {@code onAllNodesAcked}. */
        private Exception lastFailure;

        /**
         * @param ackedTaskListener   listener to notify and to ask which nodes must ack
         * @param clusterStateVersion version of the cluster state being acknowledged (used for logging)
         * @param nodes               nodes to inspect when counting expected acknowledgements
         * @param threadPool          thread pool used to schedule the ack timeout
         */
        AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
            this.ackedTaskListener = ackedTaskListener;
            this.clusterStateVersion = clusterStateVersion;
            this.threadPool = threadPool;
            this.clusterManagerNode = nodes.getClusterManagerNode();
            int expectedAcks = 0;
            for (DiscoveryNode candidate : nodes) {
                // The cluster-manager node always counts; other nodes count only if the listener requires them.
                if (candidate.equals(this.clusterManagerNode) || ackedTaskListener.mustAck(candidate)) {
                    expectedAcks++;
                }
            }
            logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", expectedAcks, clusterStateVersion);
            // One additional count is consumed by onCommit, which also calls countDown().
            this.countDown = new CountDown(expectedAcks + 1);
        }

        /**
         * Handles the commit notification: times out immediately when no ack time is left,
         * finishes when this was the last outstanding count, and otherwise schedules the
         * timeout task for the remaining time.
         *
         * @param commitTime time the commit took; subtracted from the listener's ack timeout
         */
        @Override
        public void onCommit(TimeValue commitTime) {
            TimeValue configuredTimeout = this.ackedTaskListener.ackTimeout();
            if (configuredTimeout == null) {
                configuredTimeout = TimeValue.ZERO;
            }
            final long remainingNanos = Math.max(0L, configuredTimeout.nanos() - commitTime.nanos());
            final TimeValue timeLeft = TimeValue.timeValueNanos(remainingNanos);
            if (timeLeft.nanos() == 0L) {
                this.onTimeout();
                return;
            }
            if (this.countDown.countDown()) {
                this.finish();
                return;
            }
            this.ackTimeoutCallback = this.threadPool.schedule(this::onTimeout, timeLeft, "generic");
            // Acks may have completed the count while the timeout was being scheduled; cancel it in that case.
            if (this.countDown.isCountedDown()) {
                this.ackTimeoutCallback.cancel();
            }
        }

        /**
         * Records an acknowledgement from {@code node}. Nodes that are neither the
         * cluster-manager node nor required by the listener are ignored.
         *
         * @param node the acknowledging node
         * @param e    failure reported with the ack, or {@code null} when there was none
         */
        @Override
        public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
            final boolean counted = node.equals(this.clusterManagerNode) || this.ackedTaskListener.mustAck(node);
            if (!counted) {
                return;
            }
            if (e != null) {
                this.lastFailure = e;
                logger.debug(() -> new ParameterizedMessage("ack received from node [{}], cluster_state update (version: {})", node, this.clusterStateVersion), e);
            } else {
                logger.trace("ack received from node [{}], cluster_state update (version: {})", node, this.clusterStateVersion);
            }
            if (this.countDown.countDown()) {
                this.finish();
            }
        }

        /** Cancels a scheduled timeout, if any, and reports completion together with the last recorded failure. */
        private void finish() {
            logger.trace("all expected nodes acknowledged cluster_state update (version: {})", this.clusterStateVersion);
            if (this.ackTimeoutCallback != null) {
                this.ackTimeoutCallback.cancel();
            }
            this.ackedTaskListener.onAllNodesAcked(this.lastFailure);
        }

        /** Notifies the listener of an ack timeout, but only when {@code fastForward()} on the count-down returns {@code true}. */
        public void onTimeout() {
            if (!this.countDown.fastForward()) {
                return;
            }
            logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", this.clusterStateVersion);
            this.ackedTaskListener.onAckTimeout();
        }
    }

    /**
     * Fans every {@link ClusterStatePublisher.AckListener} callback out to a list of
     * listeners, in list order. An exception thrown by one listener propagates to the caller
     * and prevents the remaining listeners from being notified.
     */
    private static class DelegatingAckListener
    implements ClusterStatePublisher.AckListener {
        /** Listeners that receive each callback; the list is used as given, not copied. */
        private final List<ClusterStatePublisher.AckListener> listeners;

        /**
         * @param listeners the listeners to notify on every callback
         */
        private DelegatingAckListener(List<ClusterStatePublisher.AckListener> listeners) {
            this.listeners = listeners;
        }

        /**
         * Passes the commit notification to every listener.
         *
         * @param commitTime value forwarded unchanged to each listener
         */
        @Override
        public void onCommit(TimeValue commitTime) {
            this.listeners.forEach(delegate -> delegate.onCommit(commitTime));
        }

        /**
         * Passes a node acknowledgement to every listener.
         *
         * @param node the acknowledging node, forwarded unchanged
         * @param e    failure reported with the ack, or {@code null}; forwarded unchanged
         */
        @Override
        public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
            this.listeners.forEach(delegate -> delegate.onNodeAck(node, e));
        }
    }
}

