package org.apache.druid.frame.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/druid/frame/processor/FrameProcessorExecutor.class */
public class FrameProcessorExecutor {
    private static final Logger log = new Logger(FrameProcessorExecutor.class);

    /** Executor on which processor runs are scheduled. */
    private final ListeningExecutorService exec;

    /** Monitor guarding the cancellation bookkeeping below; cancel waits on it and processor runs notify it. */
    private final Object lock = new Object();

    /** Cancellation ids added by registerCancellationId and not yet removed by cancel(String). */
    @GuardedBy("lock")
    private final Set<String> activeCancellationIds = new HashSet<>();

    /**
     * Futures registered through registerCancelableFuture with the "return" flag unset, keyed by cancellation id.
     * Values are held in identity-based sets.
     */
    @GuardedBy("lock")
    private final SetMultimap<String, ListenableFuture<?>> cancelableFutures =
        Multimaps.newSetMultimap(new HashMap<>(), Sets::newIdentityHashSet);

    /**
     * Futures registered through registerCancelableFuture with the "return" flag set, keyed by cancellation id.
     * cancel(String) cancels these only after the processors themselves have been canceled.
     */
    @GuardedBy("lock")
    private final SetMultimap<String, ListenableFuture<?>> cancelableReturnFutures =
        Multimaps.newSetMultimap(new HashMap<>(), Sets::newIdentityHashSet);

    /**
     * Processors that may still be canceled, keyed by cancellation id. Whoever removes an entry from this map
     * is the one that calls the processor's cleanup.
     */
    @GuardedBy("lock")
    private final SetMultimap<String, FrameProcessor<?>> cancelableProcessors =
        Multimaps.newSetMultimap(new HashMap<>(), Sets::newIdentityHashSet);

    /** Processors currently inside runIncrementally, mapped (by identity) to the thread running them. */
    @GuardedBy("lock")
    private final Map<FrameProcessor<?>, Thread> runningProcessors = new IdentityHashMap<>();

    /**
     * Creates an executor that schedules all processor work on the given service.
     *
     * @param listeningExecutorService service used to run processors; also the one stopped by shutdownNow
     */
    public FrameProcessorExecutor(ListeningExecutorService listeningExecutorService) {
        exec = listeningExecutorService;
    }

    /**
     * Runs a processor until it is done and returns a future for its result.
     *
     * <p>The processor is driven incrementally on the executor service. Each pass first waits until every output
     * channel's writability future is done, then calls {@code runIncrementally} with the set of inputs that are
     * finished or readable, and finally either completes the returned future or reschedules itself once the
     * inputs the processor asked to await are ready.
     *
     * <p>On success the processor is cleaned up and the future is set to the returned value. On failure every
     * output channel is failed with the throwable, the processor is cleaned up, and the future is failed.
     *
     * <p>When {@code str} is non-null, canceling the returned future also cancels the processor. If {@code str}
     * is non-null but is not an active cancellation id, the returned future is canceled immediately and the
     * processor is never scheduled.
     *
     * @param frameProcessor processor to run
     * @param str            cancellation id under which the processor and its futures are registered, or null
     *                       to run without cancellation tracking
     * @param <T>            result type of the processor
     * @return future that resolves to the processor's return value
     */
    public <T> ListenableFuture<T> runFully(final FrameProcessor<T> frameProcessor, @Nullable final String str) {
        final List<ReadableFrameChannel> inputChannels = frameProcessor.inputChannels();
        final List<WritableFrameChannel> outputChannels = frameProcessor.outputChannels();
        final SettableFuture<T> finished = registerCancelableFuture(SettableFuture.create(), true, str);

        if (finished.isDone()) {
            // registerCancelableFuture canceled the future right away: "str" is not an active cancellation id.
            return finished;
        }

        /**
         * One scheduling unit for the processor. Each call to run() performs at most one runIncrementally pass
         * and then re-submits this same instance when more work remains.
         */
        final class ExecutorRunnable implements Runnable {
            private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels);

            @Override
            public void run() {
                try {
                    final List<ListenableFuture<?>> allWritabilityFutures = gatherWritabilityFutures();
                    final List<ListenableFuture<?>> writabilityFuturesToWaitFor =
                        allWritabilityFutures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());

                    logProcessorStatusString(frameProcessor, finished, allWritabilityFutures);

                    if (!writabilityFuturesToWaitFor.isEmpty()) {
                        // Some output is not writable yet; come back once all of them are.
                        runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor));
                        return;
                    }

                    final Optional<ReturnOrAwait<T>> maybeResult = runProcessorNow();

                    if (!maybeResult.isPresent()) {
                        // Processor was canceled; whoever canceled it is responsible for cleanup.
                        return;
                    }

                    final ReturnOrAwait<T> result = maybeResult.get();
                    logProcessorStatusString(frameProcessor, finished, null);

                    if (result.isReturn()) {
                        succeed(result.value());
                        return;
                    }

                    final IntSet awaitSet = result.awaitSet();

                    if (awaitSet.isEmpty()) {
                        // Nothing to wait for: run again as soon as the executor allows.
                        exec.execute(this);
                    } else if (result.isAwaitAll() || awaitSet.size() == 1) {
                        // Await all: collect readability futures for the inputs that are not ready yet.
                        final List<ListenableFuture<?>> readabilityFutures = new ArrayList<>();
                        final IntIterator it = awaitSet.iterator();

                        while (it.hasNext()) {
                            final ReadableFrameChannel channel = inputChannels.get(it.nextInt());
                            if (!channel.isFinished() && !channel.canRead()) {
                                readabilityFutures.add(channel.readabilityFuture());
                            }
                        }

                        if (readabilityFutures.isEmpty()) {
                            exec.execute(this);
                        } else {
                            runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
                        }
                    } else {
                        // Await any.
                        runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(awaitSet));
                    }
                } catch (Throwable e) {
                    fail(e);
                }
            }

            /** Returns the writability future of every output channel, in channel order. */
            private List<ListenableFuture<?>> gatherWritabilityFutures() {
                final List<ListenableFuture<?>> futures = new ArrayList<>();
                for (final WritableFrameChannel channel : outputChannels) {
                    futures.add(channel.writabilityFuture());
                }
                return futures;
            }

            /**
             * Executes a single runIncrementally pass on the calling thread.
             *
             * @return the processor's result, or empty if the processor was canceled before or during the pass
             *         (in which case cleanup is left to the canceling party)
             */
            private Optional<ReturnOrAwait<T>> runProcessorNow() {
                final IntSet readableInputs = new IntOpenHashSet(inputChannels.size());

                for (int i = 0; i < inputChannels.size(); i++) {
                    final ReadableFrameChannel channel = inputChannels.get(i);
                    if (channel.isFinished() || channel.canRead()) {
                        readableInputs.add(i);
                    }
                }

                if (str != null) {
                    // Once registered in runningProcessors, this thread may be interrupted by cancel.
                    synchronized (lock) {
                        if (!cancelableProcessors.containsEntry(str, frameProcessor)) {
                            return Optional.empty();
                        }
                        runningProcessors.put(frameProcessor, Thread.currentThread());
                    }
                }

                boolean canceled = false;
                Either<Throwable, ReturnOrAwait<T>> outcome;

                try {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }

                    outcome = Either.value(frameProcessor.runIncrementally(readableInputs));
                } catch (Throwable e) {
                    // Includes InterruptedException: the interrupt targeted the processor, not this runnable.
                    outcome = Either.error(e);
                } finally {
                    if (str != null) {
                        // After deregistering from runningProcessors, cancel no longer interrupts this thread.
                        synchronized (lock) {
                            if (Thread.interrupted()) {
                                // Deliberately ignored: clears an interrupt that arrived after the processor
                                // exited, so it does not leak into unrelated work on this thread.
                            }

                            runningProcessors.remove(frameProcessor);
                            // Wake up any cancel call waiting for this processor to stop running.
                            lock.notifyAll();

                            if (!cancelableProcessors.containsEntry(str, frameProcessor)) {
                                canceled = true;
                            }
                        }
                    }
                }

                if (canceled) {
                    return Optional.empty();
                }
                return Optional.of(outcome.valueOrThrow());
            }

            /**
             * Re-submits this runnable once the given future succeeds. The future is registered as cancelable
             * under "str"; a cancellation of it is ignored here, any other failure fails the processor.
             */
            private <V> void runProcessorAfterFutureResolves(final ListenableFuture<V> listenableFuture) {
                final ListenableFuture<V> cancelableFuture =
                    FrameProcessorExecutor.this.registerCancelableFuture(listenableFuture, false, str);

                Futures.addCallback(
                    cancelableFuture,
                    new FutureCallback<V>() {
                        @Override
                        public void onSuccess(final V ignored) {
                            try {
                                exec.execute(ExecutorRunnable.this);
                            } catch (Throwable e) {
                                fail(e);
                            }
                        }

                        @Override
                        public void onFailure(final Throwable t) {
                            if (!cancelableFuture.isCancelled()) {
                                fail(t);
                            }
                        }
                    },
                    MoreExecutors.directExecutor()
                );
            }

            /** Cleans up the processor and completes the result future; a cleanup error fails the future. */
            private void succeed(final T value) {
                try {
                    doProcessorCleanup();
                    finished.set(value);
                } catch (Throwable e) {
                    finished.setException(e);
                }
            }

            /**
             * Fails every output channel and the result future with the given throwable. Errors raised while
             * failing channels or cleaning up are attached to it as suppressed exceptions.
             */
            private void fail(final Throwable cause) {
                for (final WritableFrameChannel channel : outputChannels) {
                    try {
                        channel.fail(cause);
                    } catch (Throwable e) {
                        cause.addSuppressed(e);
                    }
                }

                try {
                    doProcessorCleanup();
                } catch (Throwable e) {
                    cause.addSuppressed(e);
                }

                finished.setException(cause);
            }

            /**
             * Calls the processor's cleanup unless a cancel call already took ownership of it by removing it
             * from cancelableProcessors.
             */
            void doProcessorCleanup() throws IOException {
                final boolean doCleanup;

                if (str != null) {
                    synchronized (lock) {
                        doCleanup = cancelableProcessors.remove(str, frameProcessor);
                    }
                } else {
                    doCleanup = true;
                }

                if (doCleanup) {
                    frameProcessor.cleanup();
                }
            }
        }

        final ExecutorRunnable runnable = new ExecutorRunnable();

        finished.addListener(
            () -> {
                logProcessorStatusString(frameProcessor, finished, null);

                // Only a canceled future on a cancelable processor needs further action.
                if (!finished.isCancelled() || str == null) {
                    return;
                }

                final boolean removed;
                synchronized (this.lock) {
                    removed = this.cancelableProcessors.remove(str, frameProcessor);
                }

                if (removed) {
                    try {
                        cancel(Collections.singleton(frameProcessor));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            },
            Execs.directExecutor()
        );

        logProcessorStatusString(frameProcessor, finished, null);
        registerCancelableProcessor(frameProcessor, str);
        this.exec.execute(runnable);
        return finished;
    }

    /**
     * Runs all processors supplied by a manager by delegating to a {@code RunAllFullyWidget} bound to this
     * executor.
     *
     * @param processorManager source of the processors and of the final result
     * @param i                passed through to the widget; presumably the maximum number of processors to
     *                         run at once (TODO confirm against RunAllFullyWidget)
     * @param bouncer          passed through to the widget
     * @param str              cancellation id, or null for no cancellation tracking
     * @return the future produced by the widget's {@code run()}
     */
    public <T, R> ListenableFuture<R> runAllFully(ProcessorManager<T, R> processorManager, int i, Bouncer bouncer, @Nullable String str) {
        // Diamond instead of a raw type, so the result is a checked ListenableFuture<R>.
        return new RunAllFullyWidget<>(processorManager, this, i, bouncer, str).run();
    }

    /**
     * Marks a cancellation id as active. Futures registered under an id that is not active are canceled
     * immediately by registerCancelableFuture.
     *
     * @param str cancellation id to activate
     */
    public void registerCancellationId(String str) {
        synchronized (lock) {
            activeCancellationIds.add(str);
        }
    }

    /**
     * Deactivates a cancellation id and cancels everything registered under it.
     *
     * <p>Order matters: the intermediate futures are canceled first, then the processors (interrupting any
     * that are running and waiting for them to stop), and only afterwards the return futures.
     *
     * @param str cancellation id; must not be null
     * @throws NullPointerException if {@code str} is null
     * @throws InterruptedException if interrupted while waiting for running processors to stop; in that case
     *                              the return futures are not canceled by this call
     */
    public void cancel(String str) throws InterruptedException {
        Preconditions.checkNotNull(str, "cancellationId");

        final Set<ListenableFuture<?>> futuresToCancel;
        final Set<FrameProcessor<?>> processorsToCancel;
        final Set<ListenableFuture<?>> returnFuturesToCancel;

        // Detach everything under the lock, then act on the detached sets outside of it.
        synchronized (this.lock) {
            this.activeCancellationIds.remove(str);
            futuresToCancel = this.cancelableFutures.removeAll(str);
            processorsToCancel = this.cancelableProcessors.removeAll(str);
            returnFuturesToCancel = this.cancelableReturnFutures.removeAll(str);
        }

        for (final ListenableFuture<?> future : futuresToCancel) {
            future.cancel(true);
        }

        cancel(processorsToCancel);

        for (final ListenableFuture<?> future : returnFuturesToCancel) {
            future.cancel(true);
        }
    }

    /**
     * Adapts this object to the plain {@link Executor} interface: each submitted task is wrapped in a
     * {@code RunnableFrameProcessor} and handed to runFully. The future returned by runFully is discarded.
     *
     * @param str cancellation id to run the tasks under, or null
     * @return an executor view of this object
     */
    public Executor asExecutor(@Nullable String str) {
        return task -> runFully(new RunnableFrameProcessor(task), str);
    }

    /** Calls {@code shutdownNow()} on the underlying executor service; its return value is discarded. */
    public void shutdownNow() {
        exec.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the executor service this object schedules work on. */
    public ListeningExecutorService getExecutorService() {
        return exec;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a future so that cancel(String) on the given id cancels it.
     *
     * <p>With a null id nothing is registered. With an id that is not currently active, the future is canceled
     * on the spot. Otherwise the future is tracked until it completes, at which point a listener removes it
     * from the tracking map again.
     *
     * @param futuretype future to register
     * @param z          true to track it as a return future, false to track it as an intermediate future
     * @param str        cancellation id, or null
     * @return the same future that was passed in
     */
    public <T, FutureType extends ListenableFuture<T>> FutureType registerCancelableFuture(FutureType futuretype, boolean z, @Nullable String str) {
        if (str == null) {
            return futuretype;
        }

        synchronized (lock) {
            if (activeCancellationIds.contains(str)) {
                final SetMultimap<String, ListenableFuture<?>> registry;
                if (z) {
                    registry = cancelableReturnFutures;
                } else {
                    registry = cancelableFutures;
                }

                registry.put(str, futuretype);

                // Stop tracking the future as soon as it completes, however it completes.
                futuretype.addListener(
                    () -> {
                        synchronized (lock) {
                            registry.remove(str, futuretype);
                        }
                    },
                    Execs.directExecutor()
                );
            } else {
                futuretype.cancel(true);
            }
        }

        return futuretype;
    }

    /** Returns the number of (cancellation id, processor) entries currently tracked as cancelable. */
    @VisibleForTesting
    int cancelableProcessorCount() {
        synchronized (lock) {
            return cancelableProcessors.size();
        }
    }

    /**
     * Cancels the given processors: interrupts the threads of those that are running, blocks until none of
     * them is running any more, then fails their output channels and calls their cleanup.
     *
     * <p>Errors from failing a channel or from cleanup are logged and otherwise ignored.
     *
     * @param set processors to cancel
     * @throws InterruptedException if interrupted while waiting for running processors to stop
     */
    private void cancel(Set<FrameProcessor<?>> set) throws InterruptedException {
        synchronized (lock) {
            for (final FrameProcessor<?> processor : set) {
                final Thread runner = runningProcessors.get(processor);
                if (runner != null) {
                    runner.interrupt();
                }
            }

            // Processor runs call notifyAll on the lock when they leave runningProcessors.
            while (anyIsRunning(set)) {
                lock.wait();
            }
        }

        for (final FrameProcessor<?> processor : set) {
            for (final WritableFrameChannel channel : processor.outputChannels()) {
                try {
                    channel.fail(new CancellationException("Canceled"));
                } catch (Throwable e) {
                    log.debug(e, "Exception encountered while marking output channel failed for processor [%s]", processor);
                }
            }

            try {
                processor.cleanup();
            } catch (Throwable e) {
                log.noStackTrace().warn(e, "Exception encountered while canceling processor [%s]", processor);
            }
        }
    }

    /**
     * Tracks a processor under a cancellation id so that it can be canceled later. Does nothing when the id
     * is null.
     *
     * @param frameProcessor processor to track
     * @param str            cancellation id, or null
     */
    private <T> void registerCancelableProcessor(FrameProcessor<T> frameProcessor, @Nullable String str) {
        if (str == null) {
            return;
        }

        synchronized (lock) {
            cancelableProcessors.put(str, frameProcessor);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits a one-line debug summary of a processor's state; does nothing unless debug logging is enabled.
     *
     * <p>Per input channel: {@code R} readable, {@code D} finished, {@code ~} neither. Per output future, when
     * a list is given: {@code W} done, {@code ~} not done. Followed by the cancel/done flags of the future.
     *
     * @param frameProcessor   processor being described
     * @param listenableFuture future whose cancel/done state is reported
     * @param list             output writability futures, or null to omit the "out" section
     */
    public static <T> void logProcessorStatusString(FrameProcessor<T> frameProcessor, ListenableFuture<?> listenableFuture, @Nullable List<ListenableFuture<?>> list) {
        if (!log.isDebugEnabled()) {
            return;
        }

        final StringBuilder status = new StringBuilder();
        status.append("Processor [").append(frameProcessor).append("]; in=[");

        for (final ReadableFrameChannel input : frameProcessor.inputChannels()) {
            final String marker;
            if (input.canRead()) {
                marker = "R";
            } else if (input.isFinished()) {
                marker = "D";
            } else {
                marker = "~";
            }
            status.append(marker);
        }

        status.append("]");

        if (list != null) {
            status.append("; out=[");
            for (final ListenableFuture<?> writability : list) {
                status.append(writability.isDone() ? "W" : "~");
            }
            status.append("]");
        }

        status.append("; cancel=");
        status.append(listenableFuture.isCancelled() ? "y" : "n");
        status.append("; done=");
        status.append(listenableFuture.isDone() ? "y" : "n");

        // The text is escaped for use as a format string because it is passed in the format position.
        log.debug(StringUtils.encodeForFormat(status.toString()), new Object[0]);
    }

    /**
     * Tells whether at least one of the given processors is currently registered in runningProcessors.
     * Must be called while holding the lock.
     */
    @GuardedBy("lock")
    private boolean anyIsRunning(Set<FrameProcessor<?>> set) {
        for (final FrameProcessor<?> processor : set) {
            if (runningProcessors.containsKey(processor)) {
                return true;
            }
        }
        return false;
    }
}
