/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferWithIdentity;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataSpiller;
import org.apache.flink.runtime.io.network.partition.hybrid.HsOutputMetrics;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumerInternalOperations;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionMemoryDataManager;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HsMemoryDataManager
implements HsSpillingInfoProvider,
HsMemoryDataManagerOperation {
    private static final Logger LOG = LoggerFactory.getLogger(HsMemoryDataManager.class);
    private final int numSubpartitions;
    private final HsSubpartitionMemoryDataManager[] subpartitionMemoryDataManagers;
    private final HsMemoryDataSpiller spiller;
    private final HsSpillingStrategy spillStrategy;
    private final HsFileDataIndex fileDataIndex;
    private final BufferPool bufferPool;
    private final Lock lock;
    private final AtomicInteger numRequestedBuffers = new AtomicInteger(0);
    private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
    private final List<Map<HsConsumerId, HsSubpartitionConsumerInternalOperations>> subpartitionViewOperationsMap;
    private final ScheduledExecutorService poolSizeChecker = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ExecutorThreadFactory("hybrid-shuffle-pool-size-checker-executor"));
    private final AtomicInteger poolSize;

    public HsMemoryDataManager(int numSubpartitions, int bufferSize, BufferPool bufferPool, HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex, Path dataFilePath, BufferCompressor bufferCompressor, long poolSizeCheckInterval) throws IOException {
        this.numSubpartitions = numSubpartitions;
        this.bufferPool = bufferPool;
        this.spiller = new HsMemoryDataSpiller(dataFilePath);
        this.spillStrategy = spillStrategy;
        this.fileDataIndex = fileDataIndex;
        this.subpartitionMemoryDataManagers = new HsSubpartitionMemoryDataManager[numSubpartitions];
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
        this.lock = readWriteLock.writeLock();
        this.subpartitionViewOperationsMap = new ArrayList<Map<HsConsumerId, HsSubpartitionConsumerInternalOperations>>(numSubpartitions);
        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
            this.subpartitionMemoryDataManagers[subpartitionId] = new HsSubpartitionMemoryDataManager(subpartitionId, bufferSize, readWriteLock.readLock(), bufferCompressor, this);
            this.subpartitionViewOperationsMap.add(new ConcurrentHashMap());
        }
        this.poolSize = new AtomicInteger(this.bufferPool.getNumBuffers());
        if (poolSizeCheckInterval > 0L) {
            this.poolSizeChecker.scheduleAtFixedRate(() -> {
                int newSize = this.bufferPool.getNumBuffers();
                int oldSize = this.poolSize.getAndSet(newSize);
                if (oldSize > newSize) {
                    this.handleDecision(Optional.empty());
                }
            }, poolSizeCheckInterval, poolSizeCheckInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void append(ByteBuffer record, int targetChannel, Buffer.DataType dataType) throws IOException {
        try {
            this.getSubpartitionMemoryDataManager(targetChannel).append(record, dataType);
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public HsDataView registerNewConsumer(int subpartitionId, HsConsumerId consumerId, HsSubpartitionConsumerInternalOperations viewOperations) {
        HsSubpartitionConsumerInternalOperations oldView = this.subpartitionViewOperationsMap.get(subpartitionId).put(consumerId, viewOperations);
        Preconditions.checkState((oldView == null ? 1 : 0) != 0, (Object)"Each subpartition view should have unique consumerId.");
        return this.getSubpartitionMemoryDataManager(subpartitionId).registerNewConsumer(consumerId);
    }

    public void close() {
        this.spillAndReleaseAllData();
        this.spiller.close();
        this.poolSizeChecker.shutdown();
    }

    private void spillAndReleaseAllData() {
        HsSpillingStrategy.Decision decision = (HsSpillingStrategy.Decision)this.callWithLock(() -> this.spillStrategy.onResultPartitionClosed(this));
        this.handleDecision(Optional.of(decision));
    }

    public void setOutputMetrics(HsOutputMetrics metrics) {
        for (int i = 0; i < this.numSubpartitions; ++i) {
            this.getSubpartitionMemoryDataManager(i).setOutputMetrics(metrics);
        }
    }

    @Override
    public int getPoolSize() {
        return this.poolSize.get();
    }

    @Override
    public int getNumSubpartitions() {
        return this.numSubpartitions;
    }

    @Override
    public int getNumTotalRequestedBuffers() {
        return this.numRequestedBuffers.get();
    }

    @Override
    public int getNumTotalUnSpillBuffers() {
        return this.numUnSpillBuffers.get();
    }

    @Override
    public Deque<BufferIndexAndChannel> getBuffersInOrder(int subpartitionId, HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatusWithId consumeStatusWithId) {
        HsSubpartitionMemoryDataManager targetSubpartitionDataManager = this.getSubpartitionMemoryDataManager(subpartitionId);
        return targetSubpartitionDataManager.getBuffersSatisfyStatus(spillStatus, consumeStatusWithId);
    }

    @Override
    public List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId) {
        ArrayList<Integer> consumeIndexes = new ArrayList<Integer>(this.numSubpartitions);
        for (int channel = 0; channel < this.numSubpartitions; ++channel) {
            HsSubpartitionConsumerInternalOperations viewOperation = this.subpartitionViewOperationsMap.get(channel).get(consumerId);
            consumeIndexes.add(viewOperation == null ? -1 : viewOperation.getConsumingOffset(false) + 1);
        }
        return consumeIndexes;
    }

    @Override
    public void markBufferReleasedFromFile(int subpartitionId, int bufferIndex) {
        this.fileDataIndex.markBufferReleased(subpartitionId, bufferIndex);
    }

    @Override
    public BufferBuilder requestBufferFromPool() throws InterruptedException {
        MemorySegment segment = this.bufferPool.requestMemorySegmentBlocking();
        Optional<HsSpillingStrategy.Decision> decisionOpt = this.spillStrategy.onMemoryUsageChanged(this.numRequestedBuffers.incrementAndGet(), this.getPoolSize());
        this.handleDecision(decisionOpt);
        return new BufferBuilder(segment, this::recycleBuffer);
    }

    @Override
    public void onBufferConsumed(BufferIndexAndChannel consumedBuffer) {
        Optional<HsSpillingStrategy.Decision> decision = this.spillStrategy.onBufferConsumed(consumedBuffer);
        this.handleDecision(decision);
    }

    @Override
    public void onBufferFinished() {
        Optional<HsSpillingStrategy.Decision> decision = this.spillStrategy.onBufferFinished(this.numUnSpillBuffers.incrementAndGet(), this.getPoolSize());
        this.handleDecision(decision);
    }

    @Override
    public void onDataAvailable(int subpartitionId, Collection<HsConsumerId> consumerIds) {
        Map<HsConsumerId, HsSubpartitionConsumerInternalOperations> consumerViewMap = this.subpartitionViewOperationsMap.get(subpartitionId);
        consumerIds.forEach(consumerId -> {
            HsSubpartitionConsumerInternalOperations consumerView = (HsSubpartitionConsumerInternalOperations)consumerViewMap.get(consumerId);
            if (consumerView != null) {
                consumerView.notifyDataAvailable();
            }
        });
    }

    @Override
    public void onConsumerReleased(int subpartitionId, HsConsumerId consumerId) {
        this.subpartitionViewOperationsMap.get(subpartitionId).remove(consumerId);
        this.getSubpartitionMemoryDataManager(subpartitionId).releaseConsumer(consumerId);
    }

    private void handleDecision(Optional<HsSpillingStrategy.Decision> decisionOpt) {
        HsSpillingStrategy.Decision decision = decisionOpt.orElseGet(() -> (HsSpillingStrategy.Decision)this.callWithLock(() -> this.spillStrategy.decideActionWithGlobalInfo(this)));
        if (!decision.getBufferToSpill().isEmpty()) {
            this.spillBuffers(decision.getBufferToSpill());
        }
        if (!decision.getBufferToRelease().isEmpty()) {
            this.releaseBuffers(decision.getBufferToRelease());
        }
    }

    private void spillBuffers(Map<Integer, List<BufferIndexAndChannel>> toSpill) {
        CompletableFuture spillingCompleteFuture = new CompletableFuture();
        ArrayList<BufferWithIdentity> bufferWithIdentities = new ArrayList<BufferWithIdentity>();
        toSpill.forEach((subpartitionId, bufferIndexAndChannels) -> {
            HsSubpartitionMemoryDataManager subpartitionDataManager = this.getSubpartitionMemoryDataManager((int)subpartitionId);
            bufferWithIdentities.addAll(subpartitionDataManager.spillSubpartitionBuffers((List<BufferIndexAndChannel>)bufferIndexAndChannels, spillingCompleteFuture));
            this.numUnSpillBuffers.getAndAdd(-bufferIndexAndChannels.size());
        });
        FutureUtils.assertNoException((CompletableFuture)this.spiller.spillAsync(bufferWithIdentities).thenAccept(spilledBuffers -> {
            this.fileDataIndex.addBuffers((List<HsFileDataIndex.SpilledBuffer>)spilledBuffers);
            spillingCompleteFuture.complete(null);
        }));
    }

    private void releaseBuffers(Map<Integer, List<BufferIndexAndChannel>> toRelease) {
        toRelease.forEach((subpartitionId, subpartitionBuffers) -> this.getSubpartitionMemoryDataManager((int)subpartitionId).releaseSubpartitionBuffers((List<BufferIndexAndChannel>)subpartitionBuffers));
    }

    private HsSubpartitionMemoryDataManager getSubpartitionMemoryDataManager(int targetChannel) {
        return this.subpartitionMemoryDataManagers[targetChannel];
    }

    private void recycleBuffer(MemorySegment buffer) {
        this.numRequestedBuffers.decrementAndGet();
        this.bufferPool.recycle(buffer);
    }

    private <T, R extends Exception> T callWithLock(SupplierWithException<T, R> callable) throws R {
        try {
            this.lock.lock();
            Object object = callable.get();
            return (T)object;
        }
        finally {
            this.lock.unlock();
        }
    }
}

