/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.GateNotificationHelper;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

public class UnionInputGate
extends InputGate {
    private final Map<Integer, InputGate> inputGatesByGateIndex;
    private final Set<IndexedInputGate> inputGatesWithRemainingData;
    private final Set<IndexedInputGate> inputGatesWithRemainingUserData;
    private boolean shouldDrainOnEndOfData = true;
    private final PrioritizedDeque<IndexedInputGate> inputGatesWithData = new PrioritizedDeque();
    private final int[] inputChannelToInputGateIndex;
    private final int[] inputGateChannelIndexOffsets;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public UnionInputGate(IndexedInputGate ... inputGates) {
        this.inputGatesByGateIndex = Arrays.stream(inputGates).collect(Collectors.toMap(IndexedInputGate::getGateIndex, ig -> ig));
        Preconditions.checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
        if (Arrays.stream(inputGates).map(IndexedInputGate::getGateIndex).distinct().count() != (long)inputGates.length) {
            throw new IllegalArgumentException("Union of two input gates with the same gate index. Given indices: " + Arrays.stream(inputGates).map(IndexedInputGate::getGateIndex).collect(Collectors.toList()));
        }
        this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length);
        this.inputGatesWithRemainingUserData = Sets.newHashSetWithExpectedSize(inputGates.length);
        int maxGateIndex = Arrays.stream(inputGates).mapToInt(IndexedInputGate::getGateIndex).max().orElse(0);
        int totalNumberOfInputChannels = Arrays.stream(inputGates).mapToInt(CheckpointableInput::getNumberOfInputChannels).sum();
        this.inputGateChannelIndexOffsets = new int[maxGateIndex + 1];
        this.inputChannelToInputGateIndex = new int[totalNumberOfInputChannels];
        int currentNumberOfInputChannels = 0;
        for (IndexedInputGate inputGate : inputGates) {
            this.inputGateChannelIndexOffsets[inputGate.getGateIndex()] = currentNumberOfInputChannels;
            int previousNumberOfInputChannels = currentNumberOfInputChannels;
            Arrays.fill(this.inputChannelToInputGateIndex, previousNumberOfInputChannels, currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(), inputGate.getGateIndex());
        }
        PrioritizedDeque<IndexedInputGate> prioritizedDeque = this.inputGatesWithData;
        synchronized (prioritizedDeque) {
            for (IndexedInputGate inputGate : inputGates) {
                this.inputGatesWithRemainingData.add(inputGate);
                this.inputGatesWithRemainingUserData.add(inputGate);
                CompletableFuture<?> available = inputGate.getAvailableFuture();
                if (available.isDone()) {
                    this.inputGatesWithData.add(inputGate);
                } else {
                    FutureUtils.assertNoException(available.thenRun(() -> this.queueInputGate(inputGate, false)));
                }
                FutureUtils.assertNoException(inputGate.getPriorityEventAvailableFuture().thenRun(() -> this.handlePriorityEventAvailable(inputGate)));
            }
            if (!this.inputGatesWithData.isEmpty()) {
                this.availabilityHelper.resetAvailable();
            }
        }
    }

    private void handlePriorityEventAvailable(IndexedInputGate inputGate) {
        this.queueInputGate(inputGate, true);
    }

    @Override
    public int getNumberOfInputChannels() {
        return this.inputChannelToInputGateIndex.length;
    }

    @Override
    public InputChannel getChannel(int channelIndex) {
        int gateIndex = this.inputChannelToInputGateIndex[channelIndex];
        return this.inputGatesByGateIndex.get(gateIndex).getChannel(channelIndex - this.inputGateChannelIndexOffsets[gateIndex]);
    }

    @Override
    public boolean isFinished() {
        return this.inputGatesWithRemainingData.isEmpty();
    }

    @Override
    public PullingAsyncDataInput.EndOfDataStatus hasReceivedEndOfData() {
        if (!this.inputGatesWithRemainingUserData.isEmpty()) {
            return PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA;
        }
        if (this.shouldDrainOnEndOfData) {
            return PullingAsyncDataInput.EndOfDataStatus.DRAINED;
        }
        return PullingAsyncDataInput.EndOfDataStatus.STOPPED;
    }

    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(true);
    }

    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(false);
    }

    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (this.inputGatesWithRemainingData.isEmpty()) {
            return Optional.empty();
        }
        Optional<InputGate.InputWithData<IndexedInputGate, BufferOrEvent>> next = this.waitAndGetNextData(blocking);
        if (!next.isPresent()) {
            return Optional.empty();
        }
        InputGate.InputWithData<IndexedInputGate, BufferOrEvent> inputWithData = next.get();
        this.handleEndOfPartitionEvent((BufferOrEvent)inputWithData.data, (InputGate)inputWithData.input);
        this.handleEndOfUserDataEvent((BufferOrEvent)inputWithData.data, (InputGate)inputWithData.input);
        if (!((BufferOrEvent)inputWithData.data).moreAvailable()) {
            ((BufferOrEvent)inputWithData.data).setMoreAvailable(inputWithData.moreAvailable);
        }
        return Optional.of(inputWithData.data);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Optional<InputGate.InputWithData<IndexedInputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException {
        while (true) {
            PrioritizedDeque<IndexedInputGate> prioritizedDeque = this.inputGatesWithData;
            synchronized (prioritizedDeque) {
                Optional<IndexedInputGate> inputGateOpt = this.getInputGate(blocking);
                if (!inputGateOpt.isPresent()) {
                    return Optional.empty();
                }
                IndexedInputGate inputGate = inputGateOpt.get();
                Optional<BufferOrEvent> nextOpt = inputGate.pollNext();
                if (nextOpt.isPresent()) {
                    return Optional.of(this.processBufferOrEvent(inputGate, nextOpt.get()));
                }
                FutureUtils.assertNoException(inputGate.getAvailableFuture().thenRun(() -> this.queueInputGate(inputGate, false)));
            }
        }
    }

    private InputGate.InputWithData<IndexedInputGate, BufferOrEvent> processBufferOrEvent(IndexedInputGate inputGate, BufferOrEvent bufferOrEvent) {
        boolean morePriorityEvents;
        assert (Thread.holdsLock(this.inputGatesWithData));
        if (bufferOrEvent.moreAvailable()) {
            this.inputGatesWithData.add(inputGate, bufferOrEvent.morePriorityEvents(), false);
        } else if (!inputGate.isFinished()) {
            FutureUtils.assertNoException(inputGate.getAvailableFuture().thenRun(() -> this.queueInputGate(inputGate, false)));
        }
        if (bufferOrEvent.hasPriority() && !bufferOrEvent.morePriorityEvents()) {
            FutureUtils.assertNoException(inputGate.getPriorityEventAvailableFuture().thenRun(() -> this.handlePriorityEventAvailable(inputGate)));
        }
        boolean bl = morePriorityEvents = this.inputGatesWithData.getNumPriorityElements() > 0;
        if (bufferOrEvent.hasPriority() && !morePriorityEvents) {
            this.priorityAvailabilityHelper.resetUnavailable();
        }
        return new InputGate.InputWithData<IndexedInputGate, BufferOrEvent>(inputGate, bufferOrEvent, !this.inputGatesWithData.isEmpty(), morePriorityEvents);
    }

    private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) {
        if (bufferOrEvent.isEvent() && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) {
            Preconditions.checkState(!bufferOrEvent.moreAvailable());
            if (!this.inputGatesWithRemainingData.remove(inputGate)) {
                throw new IllegalStateException("Couldn't find input gate in set of remaining input gates.");
            }
            if (this.isFinished()) {
                this.markAvailable();
            }
        }
    }

    private void handleEndOfUserDataEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) {
        if (bufferOrEvent.isEvent() && bufferOrEvent.getEvent().getClass() == EndOfData.class && inputGate.hasReceivedEndOfData() != PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA) {
            this.shouldDrainOnEndOfData &= inputGate.hasReceivedEndOfData() == PullingAsyncDataInput.EndOfDataStatus.DRAINED;
            if (!this.inputGatesWithRemainingUserData.remove(inputGate)) {
                throw new IllegalStateException("Couldn't find input gate in set of remaining input gates.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void markAvailable() {
        CompletableFuture<?> toNotify;
        PrioritizedDeque<IndexedInputGate> prioritizedDeque = this.inputGatesWithData;
        synchronized (prioritizedDeque) {
            toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
        }
        toNotify.complete(null);
    }

    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        for (InputGate inputGate : this.inputGatesByGateIndex.values()) {
            inputGate.sendTaskEvent(event);
        }
    }

    @Override
    public void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
        this.inputGatesByGateIndex.get(channelInfo.getGateIdx()).resumeConsumption(channelInfo);
    }

    @Override
    public void acknowledgeAllRecordsProcessed(InputChannelInfo channelInfo) throws IOException {
        this.inputGatesByGateIndex.get(channelInfo.getGateIdx()).acknowledgeAllRecordsProcessed(channelInfo);
    }

    @Override
    public void setup() {
    }

    @Override
    public CompletableFuture<Void> getStateConsumedFuture() {
        return CompletableFuture.allOf(this.inputGatesByGateIndex.values().stream().map(InputGate::getStateConsumedFuture).collect(Collectors.toList()).toArray(new CompletableFuture[0]));
    }

    @Override
    public void requestPartitions() throws IOException {
        for (InputGate inputGate : this.inputGatesByGateIndex.values()) {
            inputGate.requestPartitions();
        }
    }

    @Override
    public void close() throws IOException {
    }

    public String toString() {
        return "UnionInputGate{inputGates=" + this.inputGatesByGateIndex.values() + '}';
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    private void queueInputGate(IndexedInputGate inputGate, boolean priority) {
        boolean alreadyEnqueued;
        Throwable throwable;
        GateNotificationHelper notification;
        block25: {
            block26: {
                block23: {
                    block24: {
                        Preconditions.checkNotNull(inputGate);
                        notification = new GateNotificationHelper(this, this.inputGatesWithData);
                        throwable = null;
                        PrioritizedDeque<IndexedInputGate> prioritizedDeque = this.inputGatesWithData;
                        // MONITORENTER : prioritizedDeque
                        alreadyEnqueued = this.inputGatesWithData.contains(inputGate);
                        if (!alreadyEnqueued || priority && !this.inputGatesWithData.containsPriorityElement(inputGate)) break block23;
                        // MONITOREXIT : prioritizedDeque
                        if (notification == null) return;
                        if (throwable == null) break block24;
                        try {
                            notification.close();
                            return;
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                            return;
                        }
                    }
                    notification.close();
                    return;
                }
                try {
                    if (!priority || inputGate.getPriorityEventAvailableFuture().isDone()) break block25;
                    // MONITOREXIT : prioritizedDeque
                    if (notification == null) return;
                    if (throwable == null) break block26;
                }
                catch (Throwable throwable3) {
                    throwable = throwable3;
                    throw throwable3;
                }
                try {
                    notification.close();
                    return;
                }
                catch (Throwable throwable4) {
                    throwable.addSuppressed(throwable4);
                    return;
                }
            }
            notification.close();
            return;
        }
        try {
            this.inputGatesWithData.add(inputGate, priority, alreadyEnqueued);
            if (priority && this.inputGatesWithData.getNumPriorityElements() == 1) {
                notification.notifyPriority();
            }
            if (this.inputGatesWithData.size() == 1) {
                notification.notifyDataAvailable();
            }
            // MONITOREXIT : prioritizedDeque
            return;
        }
        catch (Throwable throwable5) {
            throw throwable5;
        }
        finally {
            if (notification != null) {
                if (throwable != null) {
                    try {
                        notification.close();
                    }
                    catch (Throwable throwable6) {
                        throwable.addSuppressed(throwable6);
                    }
                } else {
                    notification.close();
                }
            }
        }
    }

    private Optional<IndexedInputGate> getInputGate(boolean blocking) throws InterruptedException {
        assert (Thread.holdsLock(this.inputGatesWithData));
        while (this.inputGatesWithData.isEmpty()) {
            if (blocking) {
                this.inputGatesWithData.wait();
                continue;
            }
            this.availabilityHelper.resetUnavailable();
            return Optional.empty();
        }
        return Optional.of(this.inputGatesWithData.poll());
    }

    @Override
    public void finishReadRecoveredState() throws IOException {
        for (InputGate inputGate : this.inputGatesByGateIndex.values()) {
            inputGate.finishReadRecoveredState();
        }
    }
}

