/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader;
import org.apache.flink.runtime.io.network.partition.BoundedData;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FileChannelBoundedData;
import org.apache.flink.runtime.io.network.partition.FileChannelMemoryMappedBoundedData;
import org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

final class BoundedBlockingSubpartition
extends ResultSubpartition {
    private final Object lock = new Object();
    @Nullable
    private BufferConsumer currentBuffer;
    private final BoundedData data;
    @GuardedBy(value="lock")
    private final Set<ResultSubpartitionView> readers;
    private final boolean useDirectFileTransfer;
    private int numDataBuffersWritten;
    private int numBuffersAndEventsWritten;
    private boolean isFinished;
    private boolean isReleased;

    public BoundedBlockingSubpartition(int index, ResultPartition parent, BoundedData data, boolean useDirectFileTransfer) {
        super(index, parent);
        this.data = Preconditions.checkNotNull(data);
        this.useDirectFileTransfer = useDirectFileTransfer;
        this.readers = new HashSet<ResultSubpartitionView>();
    }

    public boolean isFinished() {
        return this.isFinished;
    }

    @Override
    public boolean isReleased() {
        return this.isReleased;
    }

    @Override
    public int add(BufferConsumer bufferConsumer, int partialRecordLength) throws IOException {
        if (this.isFinished()) {
            bufferConsumer.close();
            return -1;
        }
        this.flushCurrentBuffer();
        this.currentBuffer = bufferConsumer;
        return Integer.MAX_VALUE;
    }

    @Override
    public void flush() {
        try {
            this.flushCurrentBuffer();
        }
        catch (IOException e) {
            throw new FlinkRuntimeException(e.getMessage(), e);
        }
    }

    private void flushCurrentBuffer() throws IOException {
        if (this.currentBuffer != null) {
            this.writeAndCloseBufferConsumer(this.currentBuffer);
            this.currentBuffer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
        try {
            Buffer buffer = bufferConsumer.build();
            try {
                if (this.parent.canBeCompressed(buffer)) {
                    Buffer compressedBuffer = this.parent.bufferCompressor.compressToIntermediateBuffer(buffer);
                    this.data.writeBuffer(compressedBuffer);
                    if (compressedBuffer != buffer) {
                        compressedBuffer.recycleBuffer();
                    }
                } else {
                    this.data.writeBuffer(buffer);
                }
                ++this.numBuffersAndEventsWritten;
                if (buffer.isBuffer()) {
                    ++this.numDataBuffersWritten;
                }
            }
            finally {
                buffer.recycleBuffer();
            }
        }
        finally {
            bufferConsumer.close();
        }
    }

    @Override
    public int finish() throws IOException {
        Preconditions.checkState(!this.isReleased, "data partition already released");
        Preconditions.checkState(!this.isFinished, "data partition already finished");
        this.isFinished = true;
        this.flushCurrentBuffer();
        BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE, false);
        this.writeAndCloseBufferConsumer(eventBufferConsumer);
        this.data.finishWrite();
        return eventBufferConsumer.getWrittenBytes();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void release() throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            this.isFinished = true;
            if (this.currentBuffer != null) {
                this.currentBuffer.close();
                this.currentBuffer = null;
            }
            this.checkReaderReferencesAndDispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ResultSubpartitionView createReadView(BufferAvailabilityListener availability) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState(!this.isReleased, "data partition already released");
            Preconditions.checkState(this.isFinished, "writing of blocking partition not yet finished");
            if (!Files.isReadable(this.data.getFilePath())) {
                throw new PartitionNotFoundException(this.parent.getPartitionId());
            }
            ResultSubpartitionView reader = this.useDirectFileTransfer ? new BoundedBlockingSubpartitionDirectTransferReader(this, this.data.getFilePath(), this.numDataBuffersWritten, this.numBuffersAndEventsWritten) : new BoundedBlockingSubpartitionReader(this, this.data, this.numDataBuffersWritten, availability);
            this.readers.add(reader);
            return reader;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void releaseReaderReference(ResultSubpartitionView reader) throws IOException {
        this.onConsumedSubpartition();
        Object object = this.lock;
        synchronized (object) {
            if (this.readers.remove(reader) && this.isReleased) {
                this.checkReaderReferencesAndDispose();
            }
        }
    }

    @GuardedBy(value="lock")
    private void checkReaderReferencesAndDispose() throws IOException {
        assert (Thread.holdsLock(this.lock));
        if (this.readers.isEmpty()) {
            this.data.close();
        }
    }

    @VisibleForTesting
    public BufferConsumer getCurrentBuffer() {
        return this.currentBuffer;
    }

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public void bufferSize(int desirableNewBufferSize) {
    }

    @Override
    protected long getTotalNumberOfBuffersUnsafe() {
        return this.numBuffersAndEventsWritten;
    }

    @Override
    protected long getTotalNumberOfBytesUnsafe() {
        return this.data.getSize();
    }

    @Override
    public void alignedBarrierTimeout(long checkpointId) {
    }

    @Override
    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
    }

    @Override
    int getBuffersInBacklogUnsafe() {
        return this.numDataBuffersWritten;
    }

    public static BoundedBlockingSubpartition createWithFileChannel(int index, ResultPartition parent, File tempFile, int readBufferSize, boolean sslEnabled) throws IOException {
        FileChannelBoundedData bd = FileChannelBoundedData.create(tempFile.toPath(), readBufferSize);
        return new BoundedBlockingSubpartition(index, parent, bd, !sslEnabled);
    }

    public static BoundedBlockingSubpartition createWithMemoryMappedFile(int index, ResultPartition parent, File tempFile) throws IOException {
        MemoryMappedBoundedData bd = MemoryMappedBoundedData.create(tempFile.toPath());
        return new BoundedBlockingSubpartition(index, parent, bd, false);
    }

    public static BoundedBlockingSubpartition createWithFileAndMemoryMappedReader(int index, ResultPartition parent, File tempFile) throws IOException {
        FileChannelMemoryMappedBoundedData bd = FileChannelMemoryMappedBoundedData.create(tempFile.toPath());
        return new BoundedBlockingSubpartition(index, parent, bd, false);
    }
}

