/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.opensearch2.org.opensearch.indices.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.IntSupplier;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.graylog.shaded.opensearch2.org.apache.lucene.index.CorruptIndexException;
import org.graylog.shaded.opensearch2.org.apache.lucene.store.IOContext;
import org.graylog.shaded.opensearch2.org.apache.lucene.store.IndexInput;
import org.graylog.shaded.opensearch2.org.apache.lucene.util.ArrayUtil;
import org.graylog.shaded.opensearch2.org.opensearch.ExceptionsHelper;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.node.DiscoveryNode;
import org.graylog.shaded.opensearch2.org.opensearch.common.lease.Releasable;
import org.graylog.shaded.opensearch2.org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.CancellableThreads;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.io.IOUtils;
import org.graylog.shaded.opensearch2.org.opensearch.core.action.ActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.bytes.BytesArray;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.bytes.BytesReference;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.IndexShard;
import org.graylog.shaded.opensearch2.org.opensearch.index.store.Store;
import org.graylog.shaded.opensearch2.org.opensearch.index.store.StoreFileMetadata;
import org.graylog.shaded.opensearch2.org.opensearch.indices.recovery.FileChunkWriter;
import org.graylog.shaded.opensearch2.org.opensearch.indices.recovery.MultiChunkTransfer;
import org.graylog.shaded.opensearch2.org.opensearch.threadpool.ThreadPool;
import org.graylog.shaded.opensearch2.org.opensearch.transport.RemoteTransportException;
import org.graylog.shaded.opensearch2.org.opensearch.transport.Transports;

/**
 * Builds chunked transfers that read segment files from a local {@link Store} and hand each
 * chunk to a {@link FileChunkWriter}, and handles errors raised while those files are sent.
 *
 * <p>All collaborators are supplied through the constructor and never replaced afterwards.
 */
public final class SegmentFileTransferHandler {
    private final Logger logger;
    // Shard whose id is logged and which is failed when a local file turns out to be corrupted.
    private final IndexShard shard;
    // Sink that receives every chunk produced by a transfer.
    private final FileChunkWriter chunkWriter;
    // Only used to obtain the thread context handed to MultiChunkTransfer.
    private final ThreadPool threadPool;
    // Size of each read buffer, i.e. the upper bound for a single chunk's payload.
    private final int chunkSizeInBytes;
    // Forwarded unchanged to the MultiChunkTransfer constructor.
    private final int maxConcurrentFileChunks;
    // Only used in the log message emitted when local checksums are intact.
    private final DiscoveryNode targetNode;
    // Polled via checkForCancel() before reading, before sending and while verifying files.
    private final CancellableThreads cancellableThreads;

    /**
     * Creates a handler bound to the given shard, target and collaborators.
     *
     * @param shard                   shard that owns the files; failed if local corruption is detected
     * @param targetNode              node named in the log message about remote corruption
     * @param chunkWriter             receives every file chunk
     * @param logger                  logger used by this handler and passed to created transfers
     * @param threadPool              source of the thread context passed to created transfers
     * @param cancellableThreads      checked for cancellation during reads, sends and integrity checks
     * @param fileChunkSizeInBytes    size in bytes of each buffer allocated for reading chunks
     * @param maxConcurrentFileChunks value passed to {@link MultiChunkTransfer} as its concurrency limit
     */
    public SegmentFileTransferHandler(IndexShard shard, DiscoveryNode targetNode, FileChunkWriter chunkWriter, Logger logger, ThreadPool threadPool, CancellableThreads cancellableThreads, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
        this.shard = shard;
        this.targetNode = targetNode;
        this.chunkWriter = chunkWriter;
        this.logger = logger;
        this.threadPool = threadPool;
        this.cancellableThreads = cancellableThreads;
        this.chunkSizeInBytes = fileChunkSizeInBytes;
        this.maxConcurrentFileChunks = maxConcurrentFileChunks;
    }

    /**
     * Creates a transfer that reads the given files from {@code store} chunk by chunk and writes
     * each chunk through the configured {@link FileChunkWriter}.
     *
     * <p>Note that {@code files} is sorted <em>in place</em> by ascending file length before the
     * transfer is created, so the caller's array is reordered.
     *
     * @param store       store whose directory the files are opened from
     * @param files       metadata of the files to send; reordered by this call
     * @param translogOps evaluated once per chunk and passed along to the chunk writer
     * @param listener    listener handed to the underlying {@link MultiChunkTransfer}
     * @return a transfer over the (sorted) files; this method itself does not start it
     */
    public MultiChunkTransfer<StoreFileMetadata, FileChunk> createTransfer(final Store store, StoreFileMetadata[] files, final IntSupplier translogOps, ActionListener<Void> listener) {
        // Ascending by length - presumably so that small files are transferred first.
        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length));
        return new MultiChunkTransfer<StoreFileMetadata, FileChunk>(this.logger, this.threadPool.getThreadContext(), listener, this.maxConcurrentFileChunks, Arrays.asList(files)) {
            // Pool of reusable read buffers; a buffer is pushed back when its chunk is closed.
            // NOTE(review): a concurrent deque is used, presumably because chunks may be
            // released from a different thread than the one reading - confirm against MultiChunkTransfer.
            final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
            // Input for the file currently being read; null before the first file and after close().
            InputStreamIndexInput currentInput = null;
            // Number of bytes already read from the current file.
            long offset = 0L;

            /**
             * Switches to a new file: resets the offset, closes the previous input (if any)
             * and opens {@code md} from the store's directory.
             */
            @Override
            protected void onNewResource(StoreFileMetadata md) throws IOException {
                this.offset = 0L;
                // The second "closeable" clears the reference once the first has been closed.
                IOUtils.close(this.currentInput, () -> {
                    this.currentInput = null;
                });
                final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
                this.currentInput = new InputStreamIndexInput(indexInput, md.length()) {

                    @Override
                    public void close() throws IOException {
                        // Make sure the underlying IndexInput is closed together with the wrapper.
                        IOUtils.close(indexInput, super::close);
                    }
                };
            }

            /** Returns a pooled buffer if one is available, otherwise allocates a new one. */
            private byte[] acquireBuffer() {
                byte[] buffer = this.buffers.pollFirst();
                if (buffer != null) {
                    return buffer;
                }
                return new byte[SegmentFileTransferHandler.this.chunkSizeInBytes];
            }

            /**
             * Reads the next chunk of the current file.
             *
             * @throws CorruptIndexException if the input reports end-of-stream ({@code -1})
             */
            @Override
            protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
                assert Transports.assertNotTransportThread("read file chunk");
                SegmentFileTransferHandler.this.cancellableThreads.checkForCancel();
                byte[] buffer = this.acquireBuffer();
                int bytesRead = this.currentInput.read(buffer);
                if (bytesRead == -1) {
                    throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + this.offset, md.name());
                }
                boolean lastChunk = this.offset + bytesRead == md.length();
                // Closing the chunk hands its buffer back to the pool for reuse.
                FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), this.offset, lastChunk, () -> this.buffers.addFirst(buffer));
                this.offset += bytesRead;
                return chunk;
            }

            /** Sends one chunk through the chunk writer; the chunk is closed before {@code listener1} runs. */
            @Override
            protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener1) {
                SegmentFileTransferHandler.this.cancellableThreads.checkForCancel();
                SegmentFileTransferHandler.this.chunkWriter.writeFileChunk(request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), ActionListener.runBefore(listener1, request::close));
            }

            /** Delegates to {@link SegmentFileTransferHandler#handleErrorOnSendFiles} for the single failed file. */
            @Override
            protected void handleError(StoreFileMetadata md, Exception e) throws Exception {
                SegmentFileTransferHandler.this.handleErrorOnSendFiles(store, e, new StoreFileMetadata[]{md});
            }

            /** Closes the input of the file currently being read, if any, and clears the reference. */
            @Override
            public void close() throws IOException {
                IOUtils.close(this.currentInput, () -> {
                    this.currentInput = null;
                });
            }
        };
    }

    /**
     * Handles a failure that occurred while sending files. This method always throws.
     *
     * <ul>
     *   <li>If {@code e} does not wrap a corruption exception, {@code e} is rethrown as is.</li>
     *   <li>Otherwise each file in {@code mds} is integrity-checked against {@code store}. For
     *       every file that fails the check the shard is failed, and after all files have been
     *       checked the unwrapped corruption exception is thrown.</li>
     *   <li>If all files pass, a {@link RemoteTransportException} is thrown with {@code e}
     *       attached as a suppressed exception.</li>
     * </ul>
     *
     * @param store store used to verify the files' integrity
     * @param e     the failure to handle
     * @param mds   metadata of the files involved in the failed send
     * @throws Exception always; see the cases above. {@code checkForCancel()} is also invoked per
     *                   file and may presumably abort the check early
     */
    public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception {
        IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
        assert Transports.assertNotTransportThread(this + "[handle error on send/clean files]");
        if (corruptIndexException != null) {
            IOException localException = null;
            for (StoreFileMetadata md : mds) {
                this.cancellableThreads.checkForCancel();
                this.logger.debug("checking integrity for file {} after remove corruption exception", md);
                if (!store.checkIntegrityNoException(md)) {
                    // The local copy itself is corrupted: fail the shard.
                    this.logger.warn("{} Corrupted file detected {} checksum mismatch", this.shard.shardId(), md);
                    if (localException == null) {
                        localException = corruptIndexException;
                    }
                    this.shard.failShard("error sending files", corruptIndexException);
                }
            }
            if (localException != null) {
                throw localException;
            }
            // Local checksums are fine, so the corruption is reported as a remote failure.
            RemoteTransportException remoteException = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
            remoteException.addSuppressed(e);
            this.logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", new Object[]{this.shard.shardId(), this.targetNode, mds}), corruptIndexException);
            throw remoteException;
        }
        throw e;
    }

    /**
     * One chunk of a file: the file's metadata, the chunk's bytes, its position within the file
     * and whether it is the file's final chunk. Closing the chunk runs the supplied
     * {@link Releasable}.
     */
    public static final class FileChunk
    implements MultiChunkTransfer.ChunkRequest,
    Releasable {
        final StoreFileMetadata md;
        final BytesReference content;
        // Offset of this chunk's first byte within the file.
        final long position;
        final boolean lastChunk;
        // Callback run by close().
        final Releasable onClose;

        /**
         * @param md        metadata of the file this chunk belongs to
         * @param content   the chunk's bytes
         * @param position  offset of the chunk within the file
         * @param lastChunk whether this is the final chunk of the file
         * @param onClose   callback invoked when the chunk is closed
         */
        FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) {
            this.md = md;
            this.content = content;
            this.position = position;
            this.lastChunk = lastChunk;
            this.onClose = onClose;
        }

        /** @return {@code true} if this is the final chunk of its file */
        @Override
        public boolean lastChunk() {
            return this.lastChunk;
        }

        /** Runs the {@code onClose} callback supplied at construction. */
        @Override
        public void close() {
            this.onClose.close();
        }
    }
}

