/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.flink.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.hadoop.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.hadoop.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.hadoop.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.hadoop.shaded.com.google.common.cache.RemovalListener;
import org.apache.flink.hadoop.shaded.com.google.common.cache.RemovalNotification;
import org.apache.flink.hadoop.shaded.com.google.common.collect.ImmutableCollection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
class DataStreamer
extends Daemon {
    static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
    // Stop flag: set by close(), closeInternal() and on unrecoverable errors; polled by the
    // run loop and by callers blocked in waitForAckedSeqno()/waitAndQueuePacket().
    private volatile boolean streamerClosed = false;
    private final BlockToWrite block;
    private Token<BlockTokenIdentifier> accessToken;
    // Streams and responder for the current pipeline; nulled out when the pipeline is torn down.
    private DataOutputStream blockStream;
    private DataInputStream blockReplyStream;
    private ResponseProcessor response = null;
    // Current pipeline description; always replaced together through setPipeline(...).
    private volatile DatanodeInfo[] nodes = null;
    private volatile StorageType[] storageTypes = null;
    private volatile String[] storageIDs = null;
    private final ErrorState errorState;
    private BlockConstructionStage stage;
    // Highest "last byte offset in block" of any packet written so far (see run()).
    private long bytesSent = 0L;
    private final boolean isLazyPersistFile;
    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
    private List<DatanodeInfo> restartingNodes = new ArrayList<DatanodeInfo>();
    // Incremented on each pipeline recovery attempt in processDatanodeError(); reset to 0 there
    // once the end-of-block packet has been accounted for.
    private volatile int pipelineRecoveryCount = 0;
    private boolean isHflushed = false;
    private final boolean isAppend;
    private long currentSeqno = 0L;
    private long lastQueuedSeqno = -1L;
    private long lastAckedSeqno = -1L;
    private long bytesCurBlock = 0L;
    private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
    private Socket s;
    private final DFSClient dfsClient;
    private final String src;
    private final DataChecksum checksum4WriteBlock;
    private final Progressable progress;
    private final HdfsFileStatus stat;
    private volatile boolean appendChunk = false;
    // Packets waiting to be sent. This list is also the monitor used for every
    // wait()/notifyAll() hand-off in this class.
    private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
    // Packet seqno -> monotonic time at which the packet was moved to ackQueue.
    private final Map<Long, Long> packetSendTime = new HashMap<Long, Long>();
    // Packets that have been sent and are awaiting acknowledgement; guarded by dataQueue.
    private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
    private final AtomicReference<CachingStrategy> cachingStrategy;
    private final ByteArrayManager byteArrayManager;
    private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
    private boolean failPacket = false;
    private final long dfsclientSlowLogThresholdMs;
    // When non-zero, run() sleeps this many milliseconds after each packet.
    private long artificialSlowdown = 0L;
    private final List<DatanodeInfo> congestedNodes = new ArrayList<DatanodeInfo>();
    private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
    private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = 50000;
    private int lastCongestionBackoffTime;
    private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
    private final String[] favoredNodes;
    private final EnumSet<AddBlockFlag> addBlockFlags;

    /**
     * Creates a socket connected to the given datanode and configures it for
     * writing a block pipeline.
     *
     * <p>If connecting or configuring the socket fails, the socket is closed
     * before the exception propagates; the caller never receives a reference to
     * it and therefore could not close it itself.
     *
     * @param first  datanode to connect to
     * @param length value passed to {@code client.getDatanodeReadTimeout} to
     *               derive the socket read timeout
     * @param client DFS client supplying configuration and the socket factory
     * @return a connected, configured socket
     * @throws IOException if the socket cannot be created, connected or configured
     */
    static Socket createSocketForPipeline(DatanodeInfo first, int length, DFSClient client) throws IOException {
        DfsClientConf conf = client.getConf();
        String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
        LOG.debug("Connecting to datanode {}", (Object)dnAddr);
        InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
        Socket sock = client.socketFactory.createSocket();
        boolean ready = false;
        try {
            int timeout = client.getDatanodeReadTimeout(length);
            NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
            sock.setTcpNoDelay(conf.getDataTransferTcpNoDelay());
            sock.setSoTimeout(timeout);
            sock.setKeepAlive(true);
            if (conf.getSocketSendBufferSize() > 0) {
                sock.setSendBufferSize(conf.getSocketSendBufferSize());
            }
            LOG.debug("Send buf size {}", (Object)sock.getSendBufferSize());
            ready = true;
            return sock;
        } finally {
            if (!ready) {
                // Do not leak the descriptor when setup fails part-way through.
                IOUtils.closeSocket(sock);
            }
        }
    }

    /**
     * Tells whether the file's storage policy id equals 15.
     *
     * @param stat status of the file being written
     * @return {@code true} when {@code stat.getStoragePolicy()} is 15
     */
    static boolean isLazyPersist(HdfsFileStatus stat) {
        // NOTE(review): 15 is presumably the lazy-persist (memory) storage policy id;
        // confirm against the HDFS constants before replacing the literal.
        final int policyId = stat.getStoragePolicy();
        return policyId == 15;
    }

    /**
     * Releases the buffer of every packet in the list, then empties the list.
     *
     * @param packets packets whose buffers are handed back; cleared on return
     * @param bam     manager passed to each packet's {@code releaseBuffer} call
     */
    private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
        for (final DFSPacket packet : packets) {
            packet.releaseBuffer(bam);
        }
        packets.clear();
    }

    /**
     * Shared constructor: stores the collaborators and derives the
     * configuration-dependent state. The initial {@code stage} is left for the
     * delegating constructors to set.
     */
    private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, boolean isAppend, String[] favoredNodes, EnumSet<AddBlockFlag> flags) {
        // What is being written, and on whose behalf.
        this.block = new BlockToWrite(block);
        this.dfsClient = dfsClient;
        this.src = src;
        this.stat = stat;
        this.isAppend = isAppend;
        this.isLazyPersistFile = isLazyPersist(stat);

        // Collaborators supplied by the caller.
        this.progress = progress;
        this.checksum4WriteBlock = checksum;
        this.cachingStrategy = cachingStrategy;
        this.byteArrayManager = byteArrayManage;
        this.favoredNodes = favoredNodes;
        this.addBlockFlags = flags;

        // State derived from the client configuration.
        final DfsClientConf clientConf = dfsClient.getConf();
        this.dfsclientSlowLogThresholdMs = clientConf.getSlowIoWarningThresholdMs();
        this.excludedNodes = initExcludedNodes(clientConf.getExcludedNodesCacheExpiry());
        this.errorState = new ErrorState(clientConf.getDatanodeRestartTimeout());
    }

    /**
     * Builds a streamer that is not in append mode; it starts in the
     * {@code PIPELINE_SETUP_CREATE} stage.
     */
    DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, String[] favoredNodes, EnumSet<AddBlockFlag> flags) {
        this(
            stat, block, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage,
            false, // isAppend
            favoredNodes, flags);
        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

    /**
     * Builds a streamer in append mode for {@code lastBlock}; it starts in the
     * {@code PIPELINE_SETUP_APPEND} stage, with {@code bytesSent} primed from the
     * block's current length and the access token taken from {@code lastBlock}.
     * No favored nodes or add-block flags are used.
     */
    DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage) throws IOException {
        this(
            stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy,
            byteArrayManage,
            true, // isAppend
            null, // favoredNodes
            null); // flags
        accessToken = lastBlock.getBlockToken();
        bytesSent = this.block.getNumBytes();
        stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
    }

    /**
     * Adopts the pipeline described by {@code lastBlock} and verifies that it
     * contains at least one datanode.
     *
     * @param lastBlock block whose locations, storage types and storage IDs
     *                  become the current pipeline
     * @throws IOException if the block reports no locations
     */
    void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException {
        setPipeline(lastBlock);
        if (nodes.length >= 1) {
            return;
        }
        throw new IOException("Unable to retrieve blocks locations  for last block " + block + " of file " + src);
    }

    /**
     * Replaces the current pipeline with the locations, storage types and
     * storage IDs reported by the given located block.
     */
    private void setPipeline(LocatedBlock lb) {
        final DatanodeInfo[] locations = lb.getLocations();
        final StorageType[] types = lb.getStorageTypes();
        final String[] ids = lb.getStorageIDs();
        setPipeline(locations, types, ids);
    }

    /**
     * Replaces the current pipeline description. Passing {@code null} for all
     * three arguments clears it (see {@code endBlock()}).
     *
     * @param nodes        datanodes of the pipeline
     * @param storageTypes storage types for the pipeline
     * @param storageIDs   storage IDs for the pipeline
     */
    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) {
        // The three fields are volatile and written one after another, not atomically as a group.
        this.nodes = nodes;
        this.storageTypes = storageTypes;
        this.storageIDs = storageIDs;
    }

    /**
     * Switches the streamer into the {@code DATA_STREAMING} stage: renames the
     * thread after the file and block, then creates and starts a response
     * processor for the current pipeline nodes.
     */
    private void initDataStreaming() {
        final String threadName = "DataStreamer for file " + src + " block " + block;
        setName(threadName);
        response = new ResponseProcessor(nodes);
        response.start();
        stage = BlockConstructionStage.DATA_STREAMING;
    }

    /**
     * Finishes the current block: stops the responder, closes the streams,
     * clears the pipeline and returns to the {@code PIPELINE_SETUP_CREATE}
     * stage.
     */
    private void endBlock() {
        LOG.debug("Closing old block {}", (Object)block);
        // The thread name no longer mentions a block once the block is finished.
        setName("DataStreamer for file " + src);
        closeResponder();
        closeStream();
        setPipeline(null, null, null);
        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

    /**
     * @return {@code true} if the streamer has been closed, an error has been
     *         recorded, or the owning client is no longer running
     */
    private boolean shouldStop() {
        if (streamerClosed) {
            return true;
        }
        if (errorState.hasError()) {
            return true;
        }
        return !dfsClient.clientRunning;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Main loop of the streamer thread. Until the streamer is closed or the
     * client stops running, each iteration:
     * <ol>
     *   <li>shuts down the responder if an error has been recorded and runs
     *       datanode error handling;</li>
     *   <li>waits on {@code dataQueue} for a packet, or builds a heartbeat
     *       packet when the queue is empty;</li>
     *   <li>sets up the pipeline if the stage requires it;</li>
     *   <li>moves the packet to {@code ackQueue} and writes it to the block
     *       stream;</li>
     *   <li>after the last packet of a block, waits for all acks and ends the
     *       block.</li>
     * </ol>
     * Any throwable is recorded in {@code lastException} and flagged in
     * {@code errorState}; if no datanode was marked as the culprit the streamer
     * closes. {@code closeInternal()} runs once the loop exits.
     */
    @Override
    public void run() {
        long lastPacket = Time.monotonicNow();
        TraceScope scope = null;
        while (!this.streamerClosed && this.dfsClient.clientRunning) {
            // An error was recorded while a responder is still alive: stop it first.
            if (this.errorState.hasError() && this.response != null) {
                try {
                    this.response.close();
                    this.response.join();
                    this.response = null;
                }
                catch (InterruptedException e) {
                    LOG.warn("Caught exception", (Throwable)e);
                }
            }
            try {
                DFSPacket one;
                // Returns true when this iteration should wait once before trying again.
                boolean doSleep = this.processDatanodeError();
                int halfSocketTimeout = this.dfsClient.getConf().getSocketTimeout() / 2;
                LinkedList<DFSPacket> linkedList = this.dataQueue;
                synchronized (linkedList) {
                    block67: {
                        long now = Time.monotonicNow();
                        // Wait while there is nothing to send and either we are not streaming
                        // or less than half the socket timeout has passed since the last
                        // packet -- or while a sleep was requested by error handling.
                        while (!this.shouldStop() && this.dataQueue.size() == 0 && (this.stage != BlockConstructionStage.DATA_STREAMING || this.stage == BlockConstructionStage.DATA_STREAMING && now - lastPacket < (long)halfSocketTimeout) || doSleep) {
                            long timeout = (long)halfSocketTimeout - (now - lastPacket);
                            timeout = timeout <= 0L ? 1000L : timeout;
                            timeout = this.stage == BlockConstructionStage.DATA_STREAMING ? timeout : 1000L;
                            try {
                                this.dataQueue.wait(timeout);
                            }
                            catch (InterruptedException e) {
                                LOG.warn("Caught exception", (Throwable)e);
                            }
                            doSleep = false;
                            now = Time.monotonicNow();
                        }
                        // Decompiler shape: fall out of the labelled block to proceed,
                        // otherwise restart the outer loop.
                        if (!this.shouldStop()) break block67;
                        continue;
                    }
                    if (this.dataQueue.isEmpty()) {
                        // Nothing queued: send a heartbeat packet instead.
                        one = this.createHeartbeatPacket();
                    } else {
                        try {
                            this.backOffIfNecessary();
                        }
                        catch (InterruptedException e) {
                            LOG.warn("Caught exception", (Throwable)e);
                        }
                        // Peek only; the packet is removed from dataQueue further below.
                        one = this.dataQueue.getFirst();
                        SpanId[] parents = one.getTraceParents();
                        if (parents.length > 0) {
                            scope = this.dfsClient.getTracer().newScope("dataStreamer", parents[0]);
                            scope.getSpan().setParents(parents);
                        }
                    }
                }
                // Establish the pipeline if the current stage still requires it.
                if (this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                    LOG.debug("Allocating new block");
                    this.setPipeline(this.nextBlockOutputStream());
                    this.initDataStreaming();
                } else if (this.stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                    LOG.debug("Append to block {}", (Object)this.block);
                    this.setupPipelineForAppendOrRecovery();
                    if (this.streamerClosed) continue;
                    this.initDataStreaming();
                }
                long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
                if (lastByteOffsetInBlock > this.stat.getBlockSize()) {
                    throw new IOException("BlockSize " + this.stat.getBlockSize() + " is smaller than data size. " + " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + this.src);
                }
                if (one.isLastPacketInBlock()) {
                    // Before sending the final packet, wait until every earlier packet is acked.
                    LinkedList<DFSPacket> linkedList2 = this.dataQueue;
                    synchronized (linkedList2) {
                        while (!this.shouldStop() && this.ackQueue.size() != 0) {
                            try {
                                this.dataQueue.wait(1000L);
                            }
                            catch (InterruptedException e) {
                                LOG.warn("Caught exception", (Throwable)e);
                            }
                        }
                    }
                    if (this.shouldStop()) continue;
                    this.stage = BlockConstructionStage.PIPELINE_CLOSE;
                }
                SpanId spanId = SpanId.INVALID;
                LinkedList<DFSPacket> e = this.dataQueue;
                synchronized (e) {
                    // Heartbeats are never queued for acknowledgement.
                    if (!one.isHeartbeatPacket()) {
                        if (scope != null) {
                            // Hand the trace scope over to the packet.
                            spanId = scope.getSpanId();
                            scope.detach();
                            one.setTraceScope(scope);
                        }
                        scope = null;
                        // Move the packet from dataQueue to ackQueue and record its send time.
                        this.dataQueue.removeFirst();
                        this.ackQueue.addLast(one);
                        this.packetSendTime.put(one.getSeqno(), Time.monotonicNow());
                        this.dataQueue.notifyAll();
                    }
                }
                LOG.debug("DataStreamer block {} sending packet {}", (Object)this.block, (Object)one);
                try (TraceScope ignored = this.dfsClient.getTracer().newScope("DataStreamer#writeTo", spanId);){
                    one.writeTo(this.blockStream);
                    this.blockStream.flush();
                }
                catch (IOException e2) {
                    // A write failure is attributed to the first node unless another
                    // node has already been marked.
                    this.errorState.markFirstNodeIfNotMarked();
                    throw e2;
                }
                lastPacket = Time.monotonicNow();
                // bytesSent only ever grows.
                long tmpBytesSent = one.getLastByteOffsetBlock();
                if (this.bytesSent < tmpBytesSent) {
                    this.bytesSent = tmpBytesSent;
                }
                if (this.shouldStop()) continue;
                if (one.isLastPacketInBlock()) {
                    // Wait for the final packet to be acked, then end the block. An
                    // InterruptedException here falls through to the catch-all below.
                    LinkedList<DFSPacket> linkedList3 = this.dataQueue;
                    synchronized (linkedList3) {
                        while (!this.shouldStop() && this.ackQueue.size() != 0) {
                            this.dataQueue.wait(1000L);
                        }
                    }
                    if (this.shouldStop()) continue;
                    this.endBlock();
                }
                if (this.progress != null) {
                    this.progress.progress();
                }
                if (this.artificialSlowdown == 0L || !this.dfsClient.clientRunning) continue;
                Thread.sleep(this.artificialSlowdown);
            }
            catch (Throwable e) {
                // Logging is skipped while a datanode restart is in progress.
                if (!this.errorState.isRestartingNode()) {
                    if (e instanceof QuotaExceededException) {
                        LOG.debug("DataStreamer Quota Exception", e);
                    } else {
                        LOG.warn("DataStreamer Exception", e);
                    }
                }
                this.lastException.set(e);
                assert (!(e instanceof NullPointerException));
                this.errorState.setError(true);
                // With no datanode marked there is nothing to recover from: close the streamer.
                if (this.errorState.isNodeMarked()) continue;
                this.streamerClosed = true;
            }
            finally {
                // Close a trace scope that was not handed over to a packet.
                // NOTE(review): the `continue` inside finally is decompiler output; it
                // discards anything thrown from the catch block above when scope is null.
                if (scope == null) continue;
                scope.close();
                scope = null;
            }
        }
        this.closeInternal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Final teardown, called when the run loop exits: stops the responder,
     * closes the streams, marks the streamer closed, releases all queued packet
     * buffers and wakes every thread waiting on {@code dataQueue}.
     */
    private void closeInternal() {
        closeResponder();
        closeStream();
        streamerClosed = true;
        release();
        synchronized (dataQueue) {
            dataQueue.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Releases the buffers of all packets in both the data queue and the ack
     * queue and empties both queues, while holding the {@code dataQueue}
     * monitor.
     */
    void release() {
        synchronized (dataQueue) {
            releaseBuffer(dataQueue, byteArrayManager);
            releaseBuffer(ackQueue, byteArrayManager);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Blocks until {@code lastAckedSeqno} reaches {@code seqno} or the streamer
     * is closed, and logs a warning if the wait exceeded the slow-I/O
     * threshold.
     *
     * @param seqno sequence number to wait for
     * @throws IOException whatever {@code checkClosed()} raises once the
     *         streamer is closed, except {@link ClosedChannelException}, which
     *         is swallowed; {@link InterruptedIOException} if the wait is
     *         interrupted
     */
    void waitForAckedSeqno(long seqno) throws IOException {
        try (TraceScope ignored = dfsClient.getTracer().newScope("waitForAckedSeqno")) {
            LOG.debug("Waiting for ack for: {}", (Object)seqno);
            final long startMs = Time.monotonicNow();
            try {
                synchronized (dataQueue) {
                    while (!streamerClosed) {
                        checkClosed();
                        if (lastAckedSeqno >= seqno) {
                            break;
                        }
                        try {
                            // Re-check at least once a second.
                            dataQueue.wait(1000L);
                        } catch (InterruptedException ie) {
                            throw new InterruptedIOException("Interrupted while waiting for data to be acknowledged by pipeline");
                        }
                    }
                }
                checkClosed();
            } catch (ClosedChannelException cce) {
                // Deliberately ignored: a closed channel ends the wait without an error.
            }
            final long elapsedMs = Time.monotonicNow() - startMs;
            if (elapsedMs > dfsclientSlowLogThresholdMs) {
                LOG.warn("Slow waitForAckedSeqno took " + elapsedMs + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a packet for sending, first waiting while the combined size of the
     * data and ack queues exceeds the configured maximum number of packets.
     *
     * <p>If the wait is interrupted, the interrupt flag is restored and the
     * packet is queued immediately rather than waiting for space. A
     * {@link ClosedChannelException} from {@code checkClosed()} is swallowed.
     *
     * @param packet packet to enqueue
     * @throws IOException if the streamer is closed with a recorded failure
     */
    void waitAndQueuePacket(DFSPacket packet) throws IOException {
        synchronized (dataQueue) {
            try {
                boolean waited = false;
                try {
                    while (!streamerClosed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().getWriteMaxPackets()) {
                        if (!waited) {
                            // Annotate the current trace span once, on the first wait only.
                            final Span startSpan = Tracer.getCurrentSpan();
                            if (startSpan != null) {
                                startSpan.addTimelineAnnotation("dataQueue.wait");
                            }
                            waited = true;
                        }
                        try {
                            dataQueue.wait();
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the caller and stop waiting.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                } finally {
                    final Span endSpan = Tracer.getCurrentSpan();
                    if (endSpan != null && waited) {
                        endSpan.addTimelineAnnotation("end.wait");
                    }
                }
                checkClosed();
                queuePacket(packet);
            } catch (ClosedChannelException cce) {
                // Deliberately ignored: the packet is simply not queued.
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Requests that the streamer stop: sets the closed flag and wakes every
     * thread waiting on {@code dataQueue}.
     *
     * @param force when {@code true}, additionally interrupts the streamer
     *              thread
     */
    void close(boolean force) {
        streamerClosed = true;
        synchronized (dataQueue) {
            dataQueue.notifyAll();
        }
        if (!force) {
            return;
        }
        interrupt();
    }

    /**
     * Does nothing while the streamer is open. Once it is closed, delegates to
     * {@code lastException.throwException4Close()}.
     *
     * @throws IOException as raised by {@code throwException4Close()}
     */
    private void checkClosed() throws IOException {
        if (!streamerClosed) {
            return;
        }
        lastException.throwException4Close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the response processor, if there is one, and waits for its thread
     * to finish. The {@code response} field is cleared even if the wait is
     * interrupted.
     */
    private void closeResponder() {
        if (response == null) {
            return;
        }
        try {
            response.close();
            response.join();
        } catch (InterruptedException e) {
            LOG.warn("Caught exception", (Throwable)e);
        } finally {
            response = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the block output stream, the reply stream and the socket, in that
     * order, clearing each field whether or not its close succeeds. Every
     * {@link IOException} raised on the way is collected; if there is at least
     * one, the combined exception is stored in {@code lastException}.
     */
    private void closeStream() {
        final MultipleIOException.Builder failures = new MultipleIOException.Builder();

        if (blockStream != null) {
            try {
                blockStream.close();
            } catch (IOException e) {
                failures.add(e);
            } finally {
                blockStream = null;
            }
        }

        if (blockReplyStream != null) {
            try {
                blockReplyStream.close();
            } catch (IOException e) {
                failures.add(e);
            } finally {
                blockReplyStream = null;
            }
        }

        if (s != null) {
            try {
                s.close();
            } catch (IOException e) {
                failures.add(e);
            } finally {
                s = null;
            }
        }

        final IOException combined = failures.build();
        if (combined != null) {
            lastException.set(combined);
        }
    }

    /**
     * Decides whether the client should wait for the datanode at {@code index}
     * to come back after a restart.
     *
     * @param index position of the datanode in the current pipeline
     * @return {@code true} if the pipeline has exactly one node, or if the
     *         node's address is local to this host; {@code false} if the fault
     *         injector asks to skip the wait or the node is not local
     */
    boolean shouldWaitForRestart(int index) {
        // With a single node in the pipeline there is no other node to continue with.
        if (this.nodes.length == 1) {
            return true;
        }
        // Test hook: the fault injector can disable the wait entirely.
        if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
            return false;
        }
        InetAddress addr = null;
        try {
            addr = InetAddress.getByName(this.nodes[index].getIpAddr());
        }
        catch (UnknownHostException e) {
            // Treated as a programming error when assertions are enabled; otherwise the
            // node is simply reported as not local. (The decompiled source referenced the
            // compiler-synthesised $assertionsDisabled field here, which is not valid Java.)
            assert false;
        }
        return addr != null && NetUtils.isLocalAddress(addr);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a recorded datanode error by rebuilding the pipeline.
     *
     * <p>Sent-but-unacked packets are moved back to the front of the data
     * queue so they are sent again. Unless a datanode restart is in progress,
     * each call counts as a recovery attempt; after more than five attempts the
     * write is failed and the streamer closed.
     *
     * @return {@code true} if the responder is still running and the caller
     *         should wait; otherwise the value returned by
     *         {@code setupPipelineForAppendOrRecovery()}, or {@code false}
     *         when there is no datanode error or the write has been failed
     * @throws IOException if pipeline recovery fails
     */
    private boolean processDatanodeError() throws IOException {
        if (!errorState.hasDatanodeError()) {
            return false;
        }
        if (response != null) {
            LOG.info("Error Recovery for " + block + " waiting for responder to exit. ");
            return true;
        }
        closeStream();

        // Everything awaiting an ack has to be sent again.
        synchronized (dataQueue) {
            dataQueue.addAll(0, ackQueue);
            ackQueue.clear();
            packetSendTime.clear();
        }

        if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
            LOG.warn("Error recovering pipeline for writing " + block + ". Already retried 5 times for the same packet.");
            lastException.set(new IOException("Failing write. Tried pipeline recovery 5 times without success."));
            streamerClosed = true;
            return false;
        }

        final boolean doSleep = setupPipelineForAppendOrRecovery();
        if (streamerClosed || !dfsClient.clientRunning) {
            return doSleep;
        }
        if (stage != BlockConstructionStage.PIPELINE_CLOSE) {
            initDataStreaming();
            return doSleep;
        }

        // The block was already closing: drop the end-of-block packet, count it as
        // acknowledged and finish the block.
        synchronized (dataQueue) {
            final DFSPacket endOfBlockPacket = dataQueue.remove();
            final TraceScope packetScope = endOfBlockPacket.getTraceScope();
            if (packetScope != null) {
                packetScope.reattach();
                packetScope.close();
                endOfBlockPacket.setTraceScope(null);
            }
            assert (endOfBlockPacket.isLastPacketInBlock());
            assert (lastAckedSeqno == endOfBlockPacket.getSeqno() - 1L);
            lastAckedSeqno = endOfBlockPacket.getSeqno();
            pipelineRecoveryCount = 0;
            dataQueue.notifyAll();
        }
        endBlock();
        return doSleep;
    }

    /** Sets the {@code isHflushed} flag to {@code true}. */
    void setHflush() {
        isHflushed = true;
    }

    /**
     * Finds the index in the current pipeline of the one datanode that is not
     * in {@code original}.
     *
     * @param original pipeline nodes before a datanode was added
     * @return index into {@code nodes} of the first node absent from
     *         {@code original}
     * @throws IOException if the current pipeline is not exactly one node
     *         longer than {@code original}, or if every current node already
     *         appears in {@code original}
     */
    private int findNewDatanode(DatanodeInfo[] original) throws IOException {
        if (nodes.length != original.length + 1) {
            throw new IOException("Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=" + Arrays.asList(nodes) + ", original=" + Arrays.asList(original) + "). " + "The current failed datanode replacement policy is " + dfsClient.dtpReplaceDatanodeOnFailure + ", and a client may configure this via '" + "dfs.client.block.write.replace-datanode-on-failure.policy" + "' in its configuration.");
        }
        for (int i = 0; i < nodes.length; ++i) {
            boolean knownBefore = false;
            for (int j = 0; j < original.length; ++j) {
                if (nodes[i].equals(original[j])) {
                    knownBefore = true;
                    break;
                }
            }
            if (!knownBefore) {
                return i;
            }
        }
        throw new IOException("Failed: new datanode not found: nodes=" + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
    }

    /**
     * Asks the namenode for one additional datanode for the current pipeline
     * and has an existing pipeline node transfer the block to it.
     *
     * <p>Nothing is done for a non-append block in the create stage with no
     * acknowledged packet, or when the block is in a closing stage. Up to
     * three attempts are made. A failed transfer excludes the new node from
     * later attempts and restores the original pipeline. If no new node can be
     * identified but the pipeline still meets the configured minimum
     * replication, writing continues with the pipeline as it is.
     *
     * @throws IOException if no datanode could be added
     */
    private void addDatanode2ExistingPipeline() throws IOException {
        DataTransferProtocol.LOG.debug("lastAckedSeqno = {}", (Object)lastAckedSeqno);

        final boolean nothingAckedOnNewBlock = !isAppend && lastAckedSeqno < 0L && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
        if (nothingAckedOnNewBlock) {
            return;
        }
        final boolean blockIsClosing = stage == BlockConstructionStage.PIPELINE_CLOSE || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY;
        if (blockIsClosing) {
            return;
        }

        // Remember the pipeline so it can be restored after a failed transfer.
        final DatanodeInfo[] original = nodes;
        final StorageType[] originalTypes = storageTypes;
        final String[] originalIDs = storageIDs;
        final ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
        IOException lastTransferFailure = null;

        for (int tried = 0; tried < 3; ++tried) {
            final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(this.src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs, exclude.toArray(new DatanodeInfo[exclude.size()]), 1, dfsClient.clientName);
            setPipeline(lb);

            final int d;
            try {
                d = findNewDatanode(original);
            } catch (IOException ioe) {
                if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
                    DFSClient.LOG.warn("Failed to find a new datanode to add to the write pipeline,  continue to write to the pipeline with " + nodes.length + " nodes since it's no less than minimum replication: " + dfsClient.dtpReplaceDatanodeOnFailureReplication + " configured by " + "dfs.client.block.write.replace-datanode-on-failure.min-replication" + ".", (Throwable)ioe);
                    return;
                }
                throw ioe;
            }

            // Rotate through the original nodes as the transfer source on each attempt.
            final DatanodeInfo transferSource = original[tried % original.length];
            final DatanodeInfo[] targets = new DatanodeInfo[]{nodes[d]};
            final StorageType[] targetStorageTypes = new StorageType[]{storageTypes[d]};
            try {
                transfer(transferSource, targets, targetStorageTypes, lb.getBlockToken());
            } catch (IOException ioe) {
                DFSClient.LOG.warn("Error transferring data from " + transferSource + " to " + nodes[d] + ": " + ioe.getMessage());
                lastTransferFailure = ioe;
                exclude.add(nodes[d]);
                setPipeline(original, originalTypes, originalIDs);
                continue;
            }
            return;
        }

        if (lastTransferFailure != null) {
            throw lastTransferFailure;
        }
        throw new IOException("Failed to add a node");
    }

    /**
     * @return the client's datanode write timeout computed for the argument 2
     */
    private long computeTransferWriteTimeout() {
        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
        return writeTimeout;
    }

    /**
     * Computes the read timeout for a block transfer. The multiplier handed to
     * the client starts at 2 and grows by one for every 200 whole packets
     * (of the configured write-packet size) already sent.
     *
     * @return the client's datanode read timeout for that multiplier
     */
    private long computeTransferReadTimeout() {
        final long packetSize = (long)dfsClient.getConf().getWritePacketSize();
        final int packetsSent = (int)(bytesSent / packetSize);
        final int multi = 2 + packetsSent / 200;
        return dfsClient.getDatanodeReadTimeout(multi);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Asks {@code src} to transfer the block replica to {@code targets}.
     *
     * <p>Every attempt opens a fresh {@code StreamerStreams} to {@code src} and
     * releases it in the {@code finally} clause, which is the single close point
     * for both the success and the failure path. An
     * {@code InvalidEncryptionKeyException} is recorded with the retry policy and
     * the attempt is repeated for as long as
     * {@code policy.continueRetryingOrThrow()} returns {@code true}.
     *
     * @param src                the datanode asked to send the replica
     * @param targets            the datanodes that should receive the replica
     * @param targetStorageTypes storage types matching {@code targets}
     * @param blockToken         access token used for the connection and the request
     * @throws IOException if the transfer fails for any reason other than an
     *                     invalid encryption key, or if the retry policy throws
     */
    private void transfer(DatanodeInfo src, DatanodeInfo[] targets, StorageType[] targetStorageTypes, Token<BlockTokenIdentifier> blockToken) throws IOException {
        RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
        do {
            StreamerStreams streams = null;
            try {
                long writeTimeout = this.computeTransferWriteTimeout();
                long readTimeout = this.computeTransferReadTimeout();
                streams = new StreamerStreams(src, writeTimeout, readTimeout, blockToken);
                streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
                return;
            }
            catch (InvalidEncryptionKeyException e) {
                policy.recordFailure(e);
            }
            finally {
                // Runs on return, on retry and on failure; streams may still be
                // null here if the constructor itself threw.
                IOUtils.closeStream(streams);
            }
        } while (policy.continueRetryingOrThrow());
    }

    /**
     * Rebuilds the write pipeline for an append or after a pipeline failure.
     *
     * <p>If there are no datanodes left, the streamer is closed with an
     * {@link IOException} stored in {@code lastException}. Otherwise the method
     * loops (while the streamer and client are running) over: waiting for a
     * restarting node, dropping a bad node, optionally adding a replacement,
     * fetching a new generation stamp and token, and re-creating the block output
     * stream. When that finally succeeds the namenode is told about the new
     * pipeline.
     *
     * @return always {@code false}
     * @throws IOException propagated from datanode replacement or namenode calls
     */
    private boolean setupPipelineForAppendOrRecovery() throws IOException {
        final boolean noDatanodes = nodes == null || nodes.length == 0;
        if (noDatanodes) {
            final String message = "Could not get block locations. Source file \"" + src + "\" - Aborting...";
            LOG.warn(message);
            lastException.set(new IOException(message));
            streamerClosed = true;
            return false;
        }

        boolean pipelineReady = false;
        long generationStamp = 0L;
        while (!pipelineReady && !streamerClosed && dfsClient.clientRunning) {
            if (!handleRestartingDatanode()) {
                return false;
            }
            // Must be sampled before handleBadDatanode(), which may clear the error flag.
            final boolean recovering = errorState.hasError();
            if (!handleBadDatanode()) {
                return false;
            }
            handleDatanodeReplacement();

            final LocatedBlock located = updateBlockForPipeline();
            generationStamp = located.getBlock().getGenerationStamp();
            accessToken = located.getBlockToken();

            pipelineReady = createBlockOutputStream(nodes, storageTypes, generationStamp, recovering);
            failPacket4Testing();
            errorState.checkRestartingNodeDeadline(nodes);
        }

        if (pipelineReady) {
            updatePipeline(generationStamp);
        }
        return false;
    }

    /**
     * Deals with a datanode that has announced it is restarting.
     *
     * <p>When no node is restarting nothing happens. When the restart is not
     * worth waiting for, the restarting node is marked as the bad node. Otherwise
     * the calling thread sleeps for the smaller of the configured restart timeout
     * and 4000 ms.
     *
     * @return {@code false} only if the sleep was interrupted, in which case the
     *         streamer is closed and an exception recorded; {@code true} otherwise
     */
    private boolean handleRestartingDatanode() {
        if (!errorState.isRestartingNode()) {
            return true;
        }
        if (!errorState.doWaitForRestart()) {
            errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
            return true;
        }

        final long sleepMillis = Math.min(errorState.datanodeRestartTimeout, 4000L);
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException interrupted) {
            final DatanodeInfo restarting = nodes[errorState.getRestartingNodeIndex()];
            lastException.set(new IOException("Interrupted while waiting for restarting " + restarting));
            streamerClosed = true;
            return false;
        }
        return true;
    }

    /**
     * Removes the datanode currently marked bad from the pipeline.
     *
     * <p>If no node is marked, nothing happens. If the marked node is the only
     * one left, the streamer is closed with an "All datanodes ... are bad"
     * exception. Otherwise the node is added to {@code failed} (and to
     * {@code restartingNodes} when it is the restarting node), the pipeline
     * arrays are rebuilt without it, the error state is adjusted and the last
     * exception is cleared.
     *
     * @return {@code false} if the streamer had to be closed, {@code true} otherwise
     */
    private boolean handleBadDatanode() {
        final int badIdx = errorState.getBadNodeIndex();
        if (badIdx < 0) {
            return true;
        }
        if (nodes.length <= 1) {
            lastException.set(new IOException("All datanodes " + Arrays.toString(nodes) + " are bad. Aborting..."));
            streamerClosed = true;
            return false;
        }

        final boolean isRestarting = errorState.getRestartingNodeIndex() == badIdx;
        if (isRestarting) {
            restartingNodes.add(nodes[badIdx]);
        }
        final String reason = isRestarting ? "restarting." : "bad.";
        LOG.warn("Error Recovery for " + block + " in pipeline " + Arrays.toString(nodes) + ": datanode " + badIdx + "(" + nodes[badIdx] + ") is " + reason);
        failed.add(nodes[badIdx]);

        // Build the three parallel pipeline arrays with the bad slot removed.
        final int remaining = nodes.length - 1;
        final DatanodeInfo[] keptNodes = new DatanodeInfo[remaining];
        arraycopy(nodes, keptNodes, badIdx);
        final StorageType[] keptTypes = new StorageType[remaining];
        arraycopy(storageTypes, keptTypes, badIdx);
        final String[] keptIds = new String[remaining];
        arraycopy(storageIDs, keptIds, badIdx);
        setPipeline(keptNodes, keptTypes, keptIds);

        errorState.adjustState4RestartingNode();
        lastException.clear();
        return true;
    }

    /**
     * Adds a replacement datanode to the pipeline when the client's
     * replace-datanode-on-failure policy says one is required.
     *
     * @throws IOException if adding a datanode fails and the policy is not
     *                     best-effort; with best-effort the failure is only logged
     */
    private void handleDatanodeReplacement() throws IOException {
        final boolean replacementWanted = dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(), nodes, isAppend, isHflushed);
        if (!replacementWanted) {
            return;
        }
        try {
            addDatanode2ExistingPipeline();
        } catch (IOException ioe) {
            if (dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
                LOG.warn("Failed to replace datanode. Continue with the remaining datanodes since dfs.client.block.write.replace-datanode-on-failure.best-effort is set to true.", (Throwable) ioe);
                return;
            }
            throw ioe;
        }
    }

    /**
     * Test hook: when {@code failPacket} is set, clears the flag and pauses the
     * calling thread for 2000 ms. An interrupt during the pause is ignored.
     */
    private void failPacket4Testing() {
        if (!failPacket) {
            return;
        }
        failPacket = false;
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: this pause only exists for fault-injection tests.
        }
    }

    /**
     * Calls {@code updateBlockForPipeline} on the namenode for the current block
     * on behalf of this client.
     *
     * @return the located block returned by the namenode; callers in this class
     *         read its generation stamp and block token
     * @throws IOException if the namenode call fails
     */
    private LocatedBlock updateBlockForPipeline() throws IOException {
        final ExtendedBlock current = block.getCurrentBlock();
        return dfsClient.namenode.updateBlockForPipeline(current, dfsClient.clientName);
    }

    /**
     * Stores a new generation stamp on the block currently being written.
     *
     * @param newGS the generation stamp to set
     */
    void updateBlockGS(long newGS) {
        block.setGenerationStamp(newGS);
    }

    /**
     * Applies a new generation stamp to the current block and reports the
     * resulting pipeline (old block, new block, datanodes, storage IDs) to the
     * namenode.
     *
     * @param newGS the generation stamp obtained for the rebuilt pipeline
     * @throws IOException if the namenode call fails
     */
    @VisibleForTesting
    void updatePipeline(long newGS) throws IOException {
        // Snapshot taken before the generation stamp changes.
        final ExtendedBlock before = block.getCurrentBlock();
        updateBlockGS(newGS);
        final ExtendedBlock after = block.getCurrentBlock();
        dfsClient.namenode.updatePipeline(dfsClient.clientName, before, after, nodes, storageIDs);
    }

    /**
     * Returns the datanodes that are currently present in the excluded-nodes
     * cache.
     *
     * <p>The lookup goes through {@code getAllPresent} on the cache's current key
     * set and copies the resulting keys into a new array. The generic types are
     * kept throughout so that {@code toArray} is typed as {@code DatanodeInfo[]}
     * without any raw cast.
     *
     * @return a new array of excluded datanodes, possibly empty, never {@code null}
     */
    DatanodeInfo[] getExcludedNodes() {
        return this.excludedNodes
                .getAllPresent(this.excludedNodes.asMap().keySet())
                .keySet()
                .toArray(new DatanodeInfo[0]);
    }

    /**
     * Allocates the next block from the namenode and opens a write pipeline to
     * its datanodes.
     *
     * <p>Each attempt resets the error state, asks the namenode for a new block
     * (passing the currently excluded datanodes), and tries to create the block
     * output stream. On failure the block is abandoned, the datanode marked bad
     * (if any) is added to the excluded-nodes cache, and the attempt is repeated
     * until the configured number of block-write retries is used up.
     *
     * @return the located block whose pipeline was set up successfully
     * @throws IOException if the namenode calls fail or no pipeline could be
     *                     created within the retry budget
     */
    protected LocatedBlock nextBlockOutputStream() throws IOException {
        LocatedBlock lb;
        boolean success;
        int count = this.dfsClient.getConf().getNumBlockWriteRetry();
        ExtendedBlock oldBlock = this.block.getCurrentBlock();
        do {
            this.errorState.reset();
            this.lastException.clear();

            DatanodeInfo[] excluded = this.getExcludedNodes();
            lb = this.locateFollowingBlock((DatanodeInfo[])(excluded.length > 0 ? excluded : null), oldBlock);
            this.block.setCurrentBlock(lb.getBlock());
            this.block.setNumBytes(0L);
            this.bytesSent = 0L;
            this.accessToken = lb.getBlockToken();
            DatanodeInfo[] nodes = lb.getLocations();
            StorageType[] storageTypes = lb.getStorageTypes();

            success = this.createBlockOutputStream(nodes, storageTypes, 0L, false);
            if (!success) {
                LOG.warn("Abandoning " + this.block);
                this.dfsClient.namenode.abandonBlock(this.block.getCurrentBlock(), this.stat.getFileId(), this.src, this.dfsClient.clientName);
                this.block.setCurrentBlock(null);

                // The pipeline can fail without any node being marked (for
                // example when the namenode returned no locations), leaving the
                // index at -1. Indexing blindly would then throw
                // ArrayIndexOutOfBoundsException and mask the real failure.
                int badNodeIndex = this.errorState.getBadNodeIndex();
                if (badNodeIndex >= 0 && badNodeIndex < nodes.length) {
                    DatanodeInfo badNode = nodes[badNodeIndex];
                    LOG.warn("Excluding datanode " + badNode);
                    this.excludedNodes.put(badNode, badNode);
                } else {
                    LOG.warn("No bad datanode identified for " + this.src + "; nothing to exclude");
                }
            }
        } while (!success && --count >= 0);
        if (!success) {
            throw new IOException("Unable to create new block.");
        }
        return lb;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Connects to the first datanode of {@code nodes} and sends the write-block
     * request that sets up the whole pipeline.
     *
     * <p>On success {@code blockStream} and {@code blockReplyStream} are left
     * open for the streamer, the error state is reset and restarting nodes are
     * removed from {@code failed}. On failure the socket and both streams are
     * closed, the failing datanode is recorded in {@code errorState} (the node
     * named by the ack's first-bad-link, or node 0 when the ack named none), and
     * the exception is stored in {@code lastException}.
     *
     * <p>An {@code InvalidEncryptionKeyException} is retried once after the
     * client's data encryption key has been cleared; the connection of the
     * failed attempt is closed before the retry.
     *
     * @param nodes            datanodes of the pipeline, in order
     * @param nodeStorageTypes storage types matching {@code nodes}
     * @param newGS            generation stamp to send with the request
     * @param recoveryFlag     whether to use the recovery variant of the current stage
     * @return {@code true} if the pipeline was established, {@code false} otherwise
     */
    private boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
        if (nodes.length == 0) {
            LOG.info("nodes are empty for write pipeline of " + this.block);
            return false;
        }
        String firstBadLink = "";
        boolean checkRestart = false;
        if (LOG.isDebugEnabled()) {
            LOG.debug("pipeline = " + Arrays.asList(nodes));
        }
        this.persistBlocks.set(true);
        int refetchEncryptionKey = 1;
        while (true) {
            boolean result = false;
            DataOutputStream out = null;
            try {
                assert (null == this.s) : "Previous socket unclosed";
                assert (null == this.blockReplyStream) : "Previous blockReplyStream unclosed";
                this.s = DataStreamer.createSocketForPipeline(nodes[0], nodes.length, this.dfsClient);
                long writeTimeout = this.dfsClient.getDatanodeWriteTimeout(nodes.length);
                long readTimeout = this.dfsClient.getDatanodeReadTimeout(nodes.length);
                OutputStream unbufOut = NetUtils.getOutputStream(this.s, writeTimeout);
                InputStream unbufIn = NetUtils.getInputStream(this.s, readTimeout);
                IOStreamPair saslStreams = this.dfsClient.saslClient.socketSend(this.s, unbufOut, unbufIn, this.dfsClient, this.accessToken, nodes[0]);
                unbufOut = saslStreams.out;
                unbufIn = saslStreams.in;
                out = new DataOutputStream(new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(this.dfsClient.getConfiguration())));
                this.blockReplyStream = new DataInputStream(unbufIn);
                BlockConstructionStage bcs = recoveryFlag ? this.stage.getRecoveryStage() : this.stage;
                ExtendedBlock blockCopy = this.block.getCurrentBlock();
                blockCopy.setNumBytes(this.stat.getBlockSize());
                boolean[] targetPinnings = this.getPinnings(nodes);
                new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], this.accessToken, this.dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, this.block.getNumBytes(), this.bytesSent, newGS, this.checksum4WriteBlock, this.cachingStrategy.get(), this.isLazyPersistFile, targetPinnings != null && targetPinnings[0], targetPinnings);
                DataTransferProtos.BlockOpResponseProto resp = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(this.blockReplyStream));
                DataTransferProtos.Status pipelineStatus = resp.getStatus();
                firstBadLink = resp.getFirstBadLink();
                if (PipelineAck.isRestartOOBStatus(pipelineStatus) && !this.errorState.isRestartingNode()) {
                    checkRestart = true;
                    throw new IOException("A datanode is restarting.");
                }
                String logInfo = "ack with firstBadLink as " + firstBadLink;
                DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
                assert (null == this.blockStream) : "Previous blockStream unclosed";
                this.blockStream = out;
                result = true;
                this.errorState.reset();
                this.failed.removeAll(this.restartingNodes);
                this.restartingNodes.clear();
            }
            catch (IOException ie) {
                if (!this.errorState.isRestartingNode()) {
                    LOG.info("Exception in createBlockOutputStream", (Throwable)ie);
                }
                if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
                    LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + nodes[0] + " : " + ie);
                    --refetchEncryptionKey;
                    this.dfsClient.clearDataEncryptionKey();
                    // result is still false, so the finally clause releases this
                    // attempt's socket and streams before the loop retries.
                    continue;
                }
                if (firstBadLink.length() != 0) {
                    // Mark the node the ack blamed, if it is part of this pipeline.
                    for (int i = 0; i < nodes.length; ++i) {
                        if (firstBadLink.equals(nodes[i].getXferAddr())) {
                            this.errorState.setBadNodeIndex(i);
                            break;
                        }
                    }
                } else {
                    assert (!checkRestart);
                    this.errorState.setBadNodeIndex(0);
                }
                int badIndex = this.errorState.getBadNodeIndex();
                if (checkRestart) {
                    this.errorState.initRestartingNode(badIndex, "Datanode " + badIndex + " is restarting: " + nodes[badIndex], this.shouldWaitForRestart(badIndex));
                }
                this.errorState.setError(true);
                this.lastException.set(ie);
                result = false;
            }
            finally {
                // Only tear the connection down on failure; on success the
                // socket and streams are handed over to the streamer. The finally
                // clause must complete normally so that the return below (or the
                // retry 'continue' above) is what decides the control flow.
                if (!result) {
                    IOUtils.closeSocket(this.s);
                    this.s = null;
                    IOUtils.closeStream(out);
                    IOUtils.closeStream(this.blockReplyStream);
                    this.blockReplyStream = null;
                }
            }
            return result;
        }
    }

    /**
     * Works out which of the chosen datanodes were among the favored nodes.
     *
     * <p>Each favored-node entry is matched at most once against a node's
     * {@code getXferAddrWithHostname()} value. Favored nodes that matched no
     * chosen datanode are reported in a warning.
     *
     * @param nodes the datanodes chosen for the pipeline
     * @return {@code null} when no favored nodes were specified; otherwise an
     *         array parallel to {@code nodes} holding {@code true} for each node
     *         that matched a favored entry
     */
    private boolean[] getPinnings(DatanodeInfo[] nodes) {
        if (favoredNodes == null) {
            return null;
        }
        final boolean[] pinned = new boolean[nodes.length];
        final HashSet<String> unmatched = new HashSet<String>(Arrays.asList(favoredNodes));
        int slot = 0;
        for (DatanodeInfo node : nodes) {
            // remove() reports whether the address was still in the favored set.
            pinned[slot] = unmatched.remove(node.getXferAddrWithHostname());
            LOG.debug("{} was chosen by name node (favored={}).", (Object) node.getXferAddrWithHostname(), (Object) pinned[slot]);
            slot++;
        }
        if (unmatched.isEmpty()) {
            return pinned;
        }
        LOG.warn("These favored nodes were specified but not chosen: " + unmatched + " Specified favored nodes: " + Arrays.toString(favoredNodes));
        return pinned;
    }

    /**
     * Asks the namenode to add the next block to the file, retrying while the
     * previous block is not yet replicated.
     *
     * <p>A {@code RemoteException} wrapping one of the listed client-visible
     * exceptions (file not found, access control, quota, unresolved path) is
     * unwrapped and rethrown. A wrapped {@code NotReplicatedYetException} is
     * retried up to the configured number of times, sleeping between attempts
     * with a delay that doubles each time. Any other remote exception is rethrown
     * as is. The loop only ever exits by returning or throwing.
     *
     * @param excluded datanodes the namenode should not choose, or {@code null}
     * @param oldBlock the previous block of the file
     * @return the newly allocated block and its locations
     * @throws IOException if the namenode rejects the request or retries run out
     */
    protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded, ExtendedBlock oldBlock) throws IOException {
        DfsClientConf conf = this.dfsClient.getConf();
        int retries = conf.getNumBlockWriteLocateFollowingRetry();
        long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
        long localstart = Time.monotonicNow();
        while (true) {
            try {
                return this.dfsClient.namenode.addBlock(this.src, this.dfsClient.clientName, oldBlock, excluded, this.stat.getFileId(), this.favoredNodes, this.addBlockFlags);
            }
            catch (RemoteException e) {
                IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, UnresolvedPathException.class);
                if (ue != e) {
                    throw ue;
                }
                if (!NotReplicatedYetException.class.getName().equals(e.getClassName())) {
                    throw e;
                }
                if (retries == 0) {
                    throw e;
                }
                --retries;
                LOG.info("Exception while adding a block", (Throwable)e);
                long elapsed = Time.monotonicNow() - localstart;
                if (elapsed > 5000L) {
                    LOG.info("Waiting for replication for " + elapsed / 1000L + " seconds");
                }
                try {
                    LOG.warn("NotReplicatedYetException sleeping " + this.src + " retries left " + retries);
                    Thread.sleep(sleeptime);
                    sleeptime *= 2L;
                }
                catch (InterruptedException ie) {
                    // Existing best-effort behaviour: log and go straight to the next attempt.
                    LOG.warn("Caught exception", (Throwable)ie);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sleeps for a randomized back-off period if any datanode has been reported
     * as congested, then forgets the congested nodes.
     *
     * <p>The delay is {@code base + random * range}, where {@code base} is the
     * smaller of three times the previous back-off and 5000, and {@code range} is
     * the absolute difference between those two values; the result is capped at
     * 50000 ms and remembered for the next call. The sleep happens outside the
     * lock on {@code congestedNodes}.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void backOffIfNecessary() throws InterruptedException {
        int backoffMs = 0;
        synchronized (congestedNodes) {
            if (!congestedNodes.isEmpty()) {
                final StringBuilder message = new StringBuilder("DataNode");
                for (DatanodeInfo congested : congestedNodes) {
                    message.append(' ').append(congested);
                }
                final int range = Math.abs(lastCongestionBackoffTime * 3 - 5000);
                final int base = Math.min(lastCongestionBackoffTime * 3, 5000);
                final int randomized = (int) ((double) base + Math.random() * (double) range);
                backoffMs = Math.min(50000, randomized);
                lastCongestionBackoffTime = backoffMs;
                message.append(" are congested. Backing off for ").append(backoffMs).append(" ms");
                LOG.info(message.toString());
                congestedNodes.clear();
            }
        }
        if (backoffMs == 0) {
            return;
        }
        Thread.sleep(backoffMs);
    }

    /**
     * @return the result of {@code block.getCurrentBlock()} for the block being
     *         written
     */
    ExtendedBlock getBlock() {
        final ExtendedBlock current = block.getCurrentBlock();
        return current;
    }

    /**
     * @return the pipeline's datanode array itself (not a copy)
     */
    DatanodeInfo[] getNodes() {
        return nodes;
    }

    /**
     * @return the access token currently held for the block being written
     */
    Token<BlockTokenIdentifier> getBlockToken() {
        return accessToken;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends a packet to the data queue and wakes up threads waiting on it.
     *
     * <p>A {@code null} packet is ignored. Otherwise the current tracer span id
     * is attached to the packet, the packet becomes the last queued one, and
     * waiters on {@code dataQueue} are notified. All of this happens while
     * holding the {@code dataQueue} monitor.
     *
     * @param packet the packet to enqueue; may be {@code null}
     */
    void queuePacket(DFSPacket packet) {
        synchronized (dataQueue) {
            if (packet != null) {
                packet.addTraceParent(Tracer.getCurrentSpanId());
                dataQueue.addLast(packet);
                lastQueuedSeqno = packet.getSeqno();
                LOG.debug("Queued packet {}", (Object) packet.getSeqno());
                dataQueue.notifyAll();
            }
        }
    }

    /**
     * Builds the packet used as a pipeline heartbeat.
     *
     * @return a new {@code DFSPacket} over a buffer of
     *         {@code PacketHeader.PKT_MAX_HEADER_LEN} bytes, constructed with the
     *         arguments {@code (buf, 0, 0L, -1L, 0, false)}
     */
    private DFSPacket createHeartbeatPacket() {
        final byte[] headerOnly = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
        final DFSPacket heartbeat = new DFSPacket(headerOnly, 0, 0L, -1L, 0, false);
        return heartbeat;
    }

    /**
     * Creates the cache that remembers excluded datanodes for a limited time.
     *
     * <p>Entries expire {@code excludedNodesCacheExpiry} milliseconds after being
     * written; each removal is logged. The loader maps a key to itself.
     *
     * @param excludedNodesCacheExpiry entry lifetime in milliseconds
     * @return the configured loading cache
     */
    private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(long excludedNodesCacheExpiry) {
        final RemovalListener<DatanodeInfo, DatanodeInfo> logRemoval = new RemovalListener<DatanodeInfo, DatanodeInfo>(){

            @Override
            public void onRemoval(@Nonnull RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
                LOG.info("Removing node " + notification.getKey() + " from the excluded nodes list");
            }
        };
        final CacheLoader<DatanodeInfo, DatanodeInfo> identityLoader = new CacheLoader<DatanodeInfo, DatanodeInfo>(){

            @Override
            public DatanodeInfo load(DatanodeInfo key) throws Exception {
                return key;
            }
        };
        return CacheBuilder.newBuilder()
                .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
                .removalListener(logRemoval)
                .build(identityLoader);
    }

    /**
     * Copies {@code srcs} into {@code dsts} while leaving out one element.
     *
     * <p>Elements before {@code skipIndex} keep their position; elements after it
     * move down by one. {@code dsts.length - skipIndex} elements are copied from
     * behind the skipped slot, so {@code dsts} is expected to be one element
     * shorter than {@code srcs}.
     *
     * @param srcs      source array
     * @param dsts      destination array
     * @param skipIndex index in {@code srcs} of the element to leave out
     */
    private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
        // Head: everything in front of the skipped slot.
        System.arraycopy(srcs, 0, dsts, 0, skipIndex);
        // Tail: everything behind it, shifted down by one position.
        final int tailStart = skipIndex + 1;
        final int tailLength = dsts.length - skipIndex;
        System.arraycopy(srcs, tailStart, dsts, skipIndex, tailLength);
    }

    /**
     * @return the shared {@code persistBlocks} flag object (not a copy)
     */
    AtomicBoolean getPersistBlocks() {
        return persistBlocks;
    }

    /**
     * Sets the {@code appendChunk} flag.
     *
     * @param appendChunk the new flag value
     */
    void setAppendChunk(boolean appendChunk) {
        this.appendChunk = appendChunk;
    }

    /**
     * @return the current value of the {@code appendChunk} flag
     */
    boolean getAppendChunk() {
        return appendChunk;
    }

    /**
     * @return the holder object in which this streamer records its last exception
     */
    LastExceptionInStreamer getLastException() {
        return lastException;
    }

    /**
     * Drops the reference to the pipeline socket without closing it.
     */
    void setSocketToNull() {
        s = null;
    }

    /**
     * Advances the packet sequence counter by one.
     *
     * @return the counter value from before the increment
     */
    long getAndIncCurrentSeqno() {
        return currentSeqno++;
    }

    /**
     * @return the sequence number of the packet most recently passed to
     *         {@code queuePacket}
     */
    long getLastQueuedSeqno() {
        return lastQueuedSeqno;
    }

    /**
     * @return the current value of the {@code bytesCurBlock} counter
     */
    long getBytesCurBlock() {
        return bytesCurBlock;
    }

    /**
     * Overwrites the {@code bytesCurBlock} counter.
     *
     * @param bytesCurBlock the new counter value
     */
    void setBytesCurBlock(long bytesCurBlock) {
        this.bytesCurBlock = bytesCurBlock;
    }

    /**
     * Adds {@code len} to the {@code bytesCurBlock} counter.
     *
     * @param len the amount to add
     */
    void incBytesCurBlock(long len) {
        bytesCurBlock += len;
    }

    /**
     * Stores the artificial slowdown value.
     *
     * @param period the value to store; NOTE(review): the unit is not visible
     *               here, presumably milliseconds — confirm at the point of use
     */
    void setArtificialSlowdown(long period) {
        artificialSlowdown = period;
    }

    /**
     * @return {@code true} once the {@code streamerClosed} flag has been set
     */
    boolean streamerClosed() {
        return streamerClosed;
    }

    /**
     * @return the current pipeline recovery counter; the response processor
     *         resets it to 0 whenever a packet is acknowledged
     */
    @VisibleForTesting
    int getPipelineRecoveryCount() {
        return pipelineRecoveryCount;
    }

    /**
     * Closes the pipeline socket if one is set. The field itself is left
     * untouched.
     *
     * @throws IOException if closing the socket fails
     */
    void closeSocket() throws IOException {
        if (s == null) {
            return;
        }
        s.close();
    }

    /**
     * @return the current block's local block (or {@code "null"} when there is no
     *         current block), followed by {@code "@"} and the pipeline's datanodes
     */
    @Override
    public String toString() {
        final ExtendedBlock current = block.getCurrentBlock();
        final Object localBlock = current == null ? null : current.getLocalBlock();
        return localBlock + "@" + Arrays.toString(getNodes());
    }

    /**
     * Daemon thread that reads pipeline acks from {@code blockReplyStream} and
     * matches them against the packets waiting in the streamer's ack queue.
     *
     * <p>It runs until it is closed, the client stops running, or the ack for the
     * last packet of the block has been processed. Any exception while the
     * responder is still open is recorded in the streamer's {@code lastException}
     * and {@code errorState}, waiters on {@code dataQueue} are notified, and the
     * responder closes itself.
     */
    private class ResponseProcessor
    extends Daemon {
        private volatile boolean responderClosed = false;
        private DatanodeInfo[] targets = null;
        private boolean isLastPacketInBlock = false;

        /**
         * @param targets the datanodes of the pipeline, indexed like the replies
         *                in each ack
         */
        ResponseProcessor(DatanodeInfo[] targets) {
            this.targets = targets;
        }

        /**
         * Ack-processing loop; see the class description for the termination and
         * failure behaviour.
         */
        @Override
        public void run() {
            this.setName("ResponseProcessor for block " + DataStreamer.this.block);
            PipelineAck ack = new PipelineAck();
            TraceScope scope = null;
            while (!this.responderClosed && DataStreamer.this.dfsClient.clientRunning && !this.isLastPacketInBlock) {
                try {
                    ack.readFields(DataStreamer.this.blockReplyStream);

                    // Report acks that took longer than the slow-log threshold.
                    // Seqno -1 is skipped here and again below; presumably it is
                    // the heartbeat seqno (createHeartbeatPacket passes -1L).
                    if (ack.getSeqno() != -1L) {
                        Long begin = (Long)DataStreamer.this.packetSendTime.get(ack.getSeqno());
                        if (begin != null) {
                            long duration = Time.monotonicNow() - begin;
                            if (duration > DataStreamer.this.dfsclientSlowLogThresholdMs) {
                                LOG.info("Slow ReadProcessor read fields for block " + DataStreamer.this.block + " took " + duration + "ms (threshold=" + DataStreamer.this.dfsclientSlowLogThresholdMs + "ms); ack: " + ack + ", targets: " + Arrays.asList(this.targets));
                            }
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("DFSClient {}", (Object)ack);
                    }

                    long seqno = ack.getSeqno();
                    ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<DatanodeInfo>();
                    // Walk the replies from the last datanode back to the first.
                    for (int i = ack.getNumOfReplies() - 1; i >= 0 && DataStreamer.this.dfsClient.clientRunning; --i) {
                        DataTransferProtos.Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
                        if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) == PipelineAck.ECN.CONGESTED) {
                            congestedNodesFromAck.add(this.targets[i]);
                        }
                        if (PipelineAck.isRestartOOBStatus(reply)) {
                            String message = "Datanode " + i + " is restarting: " + this.targets[i];
                            DataStreamer.this.errorState.initRestartingNode(i, message, DataStreamer.this.shouldWaitForRestart(i));
                            throw new IOException(message);
                        }
                        if (reply != DataTransferProtos.Status.SUCCESS) {
                            DataStreamer.this.errorState.setBadNodeIndex(i);
                            throw new IOException("Bad response " + reply + " for " + DataStreamer.this.block + " from datanode " + this.targets[i]);
                        }
                    }

                    // Either publish the congested nodes for backOffIfNecessary()
                    // or, when nobody is congested, clear them and reset the
                    // back-off. The two branches are mutually exclusive: clearing
                    // unconditionally would wipe the list that was just published.
                    if (!congestedNodesFromAck.isEmpty()) {
                        synchronized (DataStreamer.this.congestedNodes) {
                            DataStreamer.this.congestedNodes.clear();
                            DataStreamer.this.congestedNodes.addAll(congestedNodesFromAck);
                        }
                    } else {
                        synchronized (DataStreamer.this.congestedNodes) {
                            DataStreamer.this.congestedNodes.clear();
                            DataStreamer.this.lastCongestionBackoffTime = 0;
                        }
                    }

                    assert (seqno != -2L) : "Ack for unknown seqno should be a failed ack: " + ack;
                    if (seqno == -1L) {
                        continue;
                    }

                    DFSPacket one;
                    synchronized (DataStreamer.this.dataQueue) {
                        one = (DFSPacket)DataStreamer.this.ackQueue.getFirst();
                    }
                    if (one.getSeqno() != seqno) {
                        throw new IOException("ResponseProcessor: Expecting seqno  for block " + DataStreamer.this.block + one.getSeqno() + " but received " + seqno);
                    }
                    this.isLastPacketInBlock = one.isLastPacketInBlock();
                    if (DFSClientFaultInjector.get().failPacket() && this.isLastPacketInBlock) {
                        DataStreamer.this.failPacket = true;
                        throw new IOException("Failing the last packet for testing.");
                    }
                    DataStreamer.this.block.setNumBytes(one.getLastByteOffsetBlock());

                    synchronized (DataStreamer.this.dataQueue) {
                        scope = one.getTraceScope();
                        if (scope != null) {
                            scope.reattach();
                            one.setTraceScope(null);
                        }
                        DataStreamer.this.lastAckedSeqno = seqno;
                        DataStreamer.this.pipelineRecoveryCount = 0;
                        DataStreamer.this.ackQueue.removeFirst();
                        DataStreamer.this.packetSendTime.remove(seqno);
                        DataStreamer.this.dataQueue.notifyAll();
                        one.releaseBuffer(DataStreamer.this.byteArrayManager);
                    }
                }
                catch (Exception e) {
                    // Failures after close() are expected and not recorded.
                    if (!this.responderClosed) {
                        DataStreamer.this.lastException.set(e);
                        DataStreamer.this.errorState.setError(true);
                        DataStreamer.this.errorState.markFirstNodeIfNotMarked();
                        synchronized (DataStreamer.this.dataQueue) {
                            DataStreamer.this.dataQueue.notifyAll();
                        }
                        if (!DataStreamer.this.errorState.isRestartingNode()) {
                            LOG.warn("Exception for " + DataStreamer.this.block, (Throwable)e);
                        }
                        this.responderClosed = true;
                    }
                }
                finally {
                    if (scope != null) {
                        scope.close();
                    }
                    scope = null;
                }
            }
        }

        /**
         * Marks the responder as closed and interrupts its thread.
         */
        void close() {
            this.responderClosed = true;
            this.interrupt();
        }
    }

    /**
     * Tracks the pipeline error condition: whether an error is pending, which
     * datanode (by pipeline index) is considered bad, and which one is restarting
     * and until when it is worth waiting for.
     *
     * <p>All state is guarded by this object's monitor; every accessor and
     * mutator is {@code synchronized}.
     */
    static class ErrorState {
        private boolean error = false;
        private int badNodeIndex = -1;
        private boolean waitForRestart = true;
        private int restartingNodeIndex = -1;
        private long restartingNodeDeadline = 0L;
        private final long datanodeRestartTimeout;

        /**
         * @param datanodeRestartTimeout how long, in milliseconds, a restarting
         *                               datanode is waited for
         */
        ErrorState(long datanodeRestartTimeout) {
            this.datanodeRestartTimeout = datanodeRestartTimeout;
        }

        /** Restores the initial "no error, no marked node" state. */
        synchronized void reset() {
            this.error = false;
            this.badNodeIndex = -1;
            this.restartingNodeIndex = -1;
            this.restartingNodeDeadline = 0L;
            this.waitForRestart = true;
        }

        /** @return whether an error is currently flagged */
        synchronized boolean hasError() {
            return this.error;
        }

        /** @return whether an error is flagged and a node is marked */
        synchronized boolean hasDatanodeError() {
            return this.error && this.isNodeMarked();
        }

        /** @param err the new value of the error flag */
        synchronized void setError(boolean err) {
            this.error = err;
        }

        /** @param index pipeline index of the bad datanode, or -1 for none */
        synchronized void setBadNodeIndex(int index) {
            this.badNodeIndex = index;
        }

        /** @return pipeline index of the bad datanode, or -1 if none is marked */
        synchronized int getBadNodeIndex() {
            return this.badNodeIndex;
        }

        /** @return pipeline index of the restarting datanode, or -1 if none */
        synchronized int getRestartingNodeIndex() {
            return this.restartingNodeIndex;
        }

        /**
         * Records that the datanode at index {@code i} is restarting.
         *
         * @param i          pipeline index of the restarting datanode
         * @param message    text logged at info level
         * @param shouldWait if {@code true}, a deadline is set and the bad-node
         *                   mark is cleared; otherwise waiting is switched off
         */
        synchronized void initRestartingNode(int i, String message, boolean shouldWait) {
            this.restartingNodeIndex = i;
            if (shouldWait) {
                this.restartingNodeDeadline = Time.monotonicNow() + this.datanodeRestartTimeout;
                this.badNodeIndex = -1;
            } else {
                this.waitForRestart = false;
            }
            LOG.info(message);
        }

        /** @return whether a restarting datanode is currently recorded */
        synchronized boolean isRestartingNode() {
            return this.restartingNodeIndex >= 0;
        }

        /**
         * @return whether a bad node is marked, or a restarting node is recorded
         *         that is being waited for
         */
        synchronized boolean isNodeMarked() {
            return this.badNodeIndex >= 0 || this.isRestartingNode() && this.doWaitForRestart();
        }

        /** Marks node 0 as bad unless some node is already marked. */
        synchronized void markFirstNodeIfNotMarked() {
            if (!this.isNodeMarked()) {
                this.badNodeIndex = 0;
            }
        }

        /**
         * Re-aligns the restarting-node index after the bad node has been removed
         * from the pipeline, then clears the bad-node mark.
         *
         * @throws IllegalStateException if the bad node is the restarting node
         *                               while it is still being waited for
         */
        synchronized void adjustState4RestartingNode() {
            if (this.restartingNodeIndex >= 0) {
                if (this.badNodeIndex > this.restartingNodeIndex) {
                    this.restartingNodeIndex = -1;
                } else if (this.badNodeIndex < this.restartingNodeIndex) {
                    // A node in front of the restarting one was removed.
                    --this.restartingNodeIndex;
                } else if (this.waitForRestart) {
                    throw new IllegalStateException("badNodeIndex = " + this.badNodeIndex + " = restartingNodeIndex = " + this.restartingNodeIndex);
                }
            }
            if (!this.isRestartingNode()) {
                this.error = false;
            }
            this.badNodeIndex = -1;
        }

        /**
         * If a restarting node is recorded and its deadline has passed, stops
         * treating it as restarting and marks it bad (unless another node is
         * already marked).
         *
         * @param nodes the current pipeline, used for the log message
         * @throws IllegalStateException if a restarting node is recorded but no
         *                               error is flagged
         */
        synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
            if (this.restartingNodeIndex >= 0) {
                if (!this.error) {
                    throw new IllegalStateException("error=false while checking restarting node deadline");
                }
                if (this.badNodeIndex == this.restartingNodeIndex) {
                    this.badNodeIndex = -1;
                }
                if (Time.monotonicNow() >= this.restartingNodeDeadline) {
                    this.restartingNodeDeadline = 0L;
                    int i = this.restartingNodeIndex;
                    this.restartingNodeIndex = -1;
                    LOG.warn("Datanode " + i + " did not restart within " + this.datanodeRestartTimeout + "ms: " + nodes[i]);
                    if (this.badNodeIndex == -1) {
                        this.badNodeIndex = i;
                    }
                }
            }
        }

        /**
         * @return whether the restarting datanode should be waited for
         */
        // Synchronized like every other accessor: the flag is written under this
        // monitor by reset() and initRestartingNode(), so an unguarded read from
        // another thread would not be guaranteed to see the latest value.
        synchronized boolean doWaitForRestart() {
            return this.waitForRestart;
        }
    }

    /**
     * Monitor-guarded holder for the most recent exception seen by the streamer.
     */
    static class LastExceptionInStreamer {
        private IOException thrown;

        LastExceptionInStreamer() {
        }

        /**
         * Records {@code t}; anything that is not an {@link IOException} is
         * wrapped in one.
         *
         * @param t the exception to record
         */
        synchronized void set(Throwable t) {
            assert (t != null);
            if (t instanceof IOException) {
                thrown = (IOException) t;
            } else {
                thrown = new IOException(t);
            }
        }

        /** Forgets the recorded exception, if any. */
        synchronized void clear() {
            thrown = null;
        }

        /**
         * Throws the recorded exception, if there is one.
         *
         * @param resetToNull whether to forget the exception before throwing it
         * @throws IOException the recorded exception
         */
        synchronized void check(boolean resetToNull) throws IOException {
            if (thrown == null) {
                return;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Got Exception while checking", new Throwable(thrown));
            }
            final IOException pending = thrown;
            if (resetToNull) {
                thrown = null;
            }
            throw pending;
        }

        /**
         * Always throws: the recorded exception if there is one (which stays
         * recorded), otherwise a {@link ClosedChannelException}.
         *
         * @throws IOException in every case
         */
        synchronized void throwException4Close() throws IOException {
            check(false);
            throw new ClosedChannelException();
        }
    }

    /**
     * Synchronized holder for a privately owned {@code ExtendedBlock} copy.
     * Blocks are copied on the way in and on the way out, so callers never
     * share the instance held here.
     */
    static class BlockToWrite {
        /** The held copy, or {@code null} when no block is set. */
        private ExtendedBlock currentBlock;

        /**
         * @param block initial block; handled exactly as in
         *              {@link #setCurrentBlock(ExtendedBlock)}
         */
        BlockToWrite(ExtendedBlock block) {
            this.setCurrentBlock(block);
        }

        /**
         * @return a fresh copy of the held block, or {@code null} if none is set
         */
        synchronized ExtendedBlock getCurrentBlock() {
            if (this.currentBlock == null) {
                return null;
            }
            return new ExtendedBlock(this.currentBlock);
        }

        /**
         * @return the held block's byte count, or {@code 0} if none is set
         */
        synchronized long getNumBytes() {
            if (this.currentBlock == null) {
                return 0L;
            }
            return this.currentBlock.getNumBytes();
        }

        /**
         * Replaces the held block with a copy of {@code block}. The held block
         * becomes {@code null} when {@code block} is {@code null} or its local
         * block is {@code null}.
         *
         * @param block the block to copy, may be {@code null}
         */
        synchronized void setCurrentBlock(ExtendedBlock block) {
            if (block != null && block.getLocalBlock() != null) {
                this.currentBlock = new ExtendedBlock(block);
            } else {
                this.currentBlock = null;
            }
        }

        /**
         * Updates the byte count on the held block, which is asserted to be set.
         *
         * @param numBytes the new byte count
         */
        synchronized void setNumBytes(long numBytes) {
            assert (this.currentBlock != null);
            this.currentBlock.setNumBytes(numBytes);
        }

        /**
         * Updates the generation stamp on the held block, which is asserted to
         * be set.
         *
         * @param generationStamp the new generation stamp
         */
        synchronized void setGenerationStamp(long generationStamp) {
            assert (this.currentBlock != null);
            this.currentBlock.setGenerationStamp(generationStamp);
        }

        /**
         * @return the held block's string form, or {@code "null"} if none is set
         */
        @Override
        public synchronized String toString() {
            return String.valueOf(this.currentBlock);
        }
    }

    /**
     * Bundles the socket opened to a source datanode with the data streams
     * layered on top of it, so that all three can be released together via
     * {@link #close()}.
     */
    private class StreamerStreams
    implements Closeable {
        private Socket sock = null;
        private DataOutputStream out = null;
        private DataInputStream in = null;

        /**
         * Opens a socket to {@code src}, runs the SASL client exchange over it
         * and wraps the resulting streams.
         *
         * <p>If any step after the socket has been created fails, the socket is
         * closed before the exception propagates. The caller never receives a
         * reference to a half-built instance, so it could not otherwise release
         * the socket.
         *
         * @param src          datanode to connect to
         * @param writeTimeout timeout passed to the socket output stream
         * @param readTimeout  timeout passed to the socket input stream
         * @param blockToken   token handed to the SASL client exchange
         * @throws IOException if connecting, stream setup or the SASL exchange
         *                     fails
         */
        StreamerStreams(DatanodeInfo src, long writeTimeout, long readTimeout, Token<BlockTokenIdentifier> blockToken) throws IOException {
            this.sock = DataStreamer.createSocketForPipeline(src, 2, DataStreamer.this.dfsClient);
            boolean initialized = false;
            try {
                OutputStream unbufOut = NetUtils.getOutputStream(this.sock, writeTimeout);
                InputStream unbufIn = NetUtils.getInputStream(this.sock, readTimeout);
                IOStreamPair saslStreams = DataStreamer.this.dfsClient.saslClient.socketSend(this.sock, unbufOut, unbufIn, DataStreamer.this.dfsClient, blockToken, src);
                unbufOut = saslStreams.out;
                unbufIn = saslStreams.in;
                this.out = new DataOutputStream(new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(DataStreamer.this.dfsClient.getConfiguration())));
                this.in = new DataInputStream(unbufIn);
                initialized = true;
            } finally {
                if (!initialized) {
                    // Construction failed part-way: release the socket here,
                    // because close() can never be called on this instance.
                    IOUtils.closeSocket(this.sock);
                }
            }
        }

        /**
         * Sends a transfer-block request for the streamer's current block and
         * waits for the response.
         *
         * @param targets            target datanodes passed in the request
         * @param targetStorageTypes storage types passed alongside the targets
         * @param blockToken         token passed in the request
         * @throws IOException if sending or reading fails, or if the response
         *                     status is not {@code SUCCESS}
         */
        void sendTransferBlock(DatanodeInfo[] targets, StorageType[] targetStorageTypes, Token<BlockTokenIdentifier> blockToken) throws IOException {
            new Sender(this.out).transferBlock(DataStreamer.this.block.getCurrentBlock(), blockToken, DataStreamer.this.dfsClient.clientName, targets, targetStorageTypes);
            this.out.flush();
            DataTransferProtos.BlockOpResponseProto transferResponse = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(this.in));
            if (DataTransferProtos.Status.SUCCESS != transferResponse.getStatus()) {
                throw new IOException("Failed to add a datanode. Response status: " + transferResponse.getStatus());
            }
        }

        /**
         * Closes the input stream, the output stream and then the socket, in
         * that order.
         */
        @Override
        public void close() throws IOException {
            IOUtils.closeStream(this.in);
            IOUtils.closeStream(this.out);
            IOUtils.closeSocket(this.sock);
        }
    }

    /**
     * Counts invalid-encryption-key failures against one source datanode and
     * decides whether another attempt is made or the last failure is rethrown.
     */
    private class RefetchEncryptionKeyPolicy {
        /** Recorded-failure count at which retrying stops. */
        private static final int MAX_FETCH_ENCRYPTION_KEY_TIMES = 2;

        private int fetchEncryptionKeyTimes = 0;
        private InvalidEncryptionKeyException lastException;
        private final DatanodeInfo src;

        /**
         * @param src datanode named in the retry log message
         */
        RefetchEncryptionKeyPolicy(DatanodeInfo src) {
            this.src = src;
        }

        /**
         * Decides whether to retry. While fewer than
         * {@code MAX_FETCH_ENCRYPTION_KEY_TIMES} failures have been recorded,
         * logs the retry, clears the client's data encryption key and returns
         * {@code true}. Otherwise rethrows the most recently recorded failure.
         *
         * @return {@code true} whenever this method returns normally
         * @throws InvalidEncryptionKeyException the last recorded failure, once
         *                                       the limit has been reached
         */
        boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
            if (this.fetchEncryptionKeyTimes < MAX_FETCH_ENCRYPTION_KEY_TIMES) {
                LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + this.src + ": ", (Throwable)this.lastException);
                DataStreamer.this.dfsClient.clearDataEncryptionKey();
                return true;
            }
            throw this.lastException;
        }

        /**
         * Records one more failure and remembers it as the latest one.
         *
         * @param e the failure to remember
         */
        void recordFailure(InvalidEncryptionKeyException e) throws InvalidEncryptionKeyException {
            this.lastException = e;
            this.fetchEncryptionKeyTimes += 1;
        }
    }
}

