/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.shortcircuit;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShm;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;

@InterfaceAudience.Private
public class DfsClientShmManager
implements Closeable {
    private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
    private boolean closed = false;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition finishedLoading = this.lock.newCondition();
    private final HashMap<DatanodeInfo, EndpointShmManager> datanodes = new HashMap(1);
    private final DomainSocketWatcher domainSocketWatcher;

    DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
        this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs, "client");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ShortCircuitShm.Slot allocSlot(DatanodeInfo datanode, DomainPeer peer, MutableBoolean usedPeer, ExtendedBlockId blockId, String clientName) throws IOException {
        this.lock.lock();
        try {
            if (this.closed) {
                LOG.trace((Object)(this + ": the DfsClientShmManager isclosed."));
                ShortCircuitShm.Slot slot = null;
                return slot;
            }
            EndpointShmManager shmManager = this.datanodes.get(datanode);
            if (shmManager == null) {
                shmManager = new EndpointShmManager(datanode);
                this.datanodes.put(datanode, shmManager);
            }
            ShortCircuitShm.Slot slot = shmManager.allocSlot(peer, usedPeer, clientName, blockId);
            return slot;
        }
        finally {
            this.lock.unlock();
        }
    }

    public void freeSlot(ShortCircuitShm.Slot slot) {
        this.lock.lock();
        try {
            DfsClientShm shm = (DfsClientShm)slot.getShm();
            shm.getEndpointShmManager().freeSlot(slot);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void visit(Visitor visitor) throws IOException {
        this.lock.lock();
        try {
            HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
            for (Map.Entry<DatanodeInfo, EndpointShmManager> entry : this.datanodes.entrySet()) {
                info.put(entry.getKey(), entry.getValue().getVisitorInfo());
            }
            visitor.visit(info);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void close() throws IOException {
        this.lock.lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        finally {
            this.lock.unlock();
        }
        IOUtils.cleanup(LOG, this.domainSocketWatcher);
    }

    public String toString() {
        return String.format("ShortCircuitShmManager(%08x)", System.identityHashCode(this));
    }

    @VisibleForTesting
    public DomainSocketWatcher getDomainSocketWatcher() {
        return this.domainSocketWatcher;
    }

    @VisibleForTesting
    public static interface Visitor {
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> var1) throws IOException;
    }

    @VisibleForTesting
    public static class PerDatanodeVisitorInfo {
        public final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full;
        public final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull;
        public final boolean disabled;

        PerDatanodeVisitorInfo(TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full, TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull, boolean disabled) {
            this.full = full;
            this.notFull = notFull;
            this.disabled = disabled;
        }
    }

    class EndpointShmManager {
        private final DatanodeInfo datanode;
        private final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full = new TreeMap();
        private final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull = new TreeMap();
        private boolean disabled = false;
        private boolean loading = false;

        EndpointShmManager(DatanodeInfo datanode) {
            this.datanode = datanode;
        }

        private ShortCircuitShm.Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
            if (this.notFull.isEmpty()) {
                return null;
            }
            Map.Entry<ShortCircuitShm.ShmId, DfsClientShm> entry = this.notFull.firstEntry();
            DfsClientShm shm = entry.getValue();
            ShortCircuitShm.ShmId shmId = shm.getShmId();
            ShortCircuitShm.Slot slot = shm.allocAndRegisterSlot(blockId);
            if (shm.isFull()) {
                DfsClientShm removedShm;
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)(this + ": pulled the last slot " + slot.getSlotIdx() + " out of " + shm));
                }
                Preconditions.checkState((removedShm = this.notFull.remove(shmId)) == shm);
                this.full.put(shmId, shm);
            } else if (LOG.isTraceEnabled()) {
                LOG.trace((Object)(this + ": pulled slot " + slot.getSlotIdx() + " out of " + shm));
            }
            return slot;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private DfsClientShm requestNewShm(String clientName, DomainPeer peer) throws IOException {
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
            new Sender(out).requestShortCircuitShm(clientName);
            DataTransferProtos.ShortCircuitShmResponseProto resp = DataTransferProtos.ShortCircuitShmResponseProto.parseFrom(PBHelper.vintPrefixed(peer.getInputStream()));
            String error = resp.hasError() ? resp.getError() : "(unknown)";
            switch (resp.getStatus()) {
                case SUCCESS: {
                    DfsClientShm dfsClientShm;
                    DomainSocket sock = peer.getDomainSocket();
                    byte[] buf = new byte[1];
                    FileInputStream[] fis = new FileInputStream[1];
                    if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
                        throw new EOFException("got EOF while trying to transfer the file descriptor for the shared memory segment.");
                    }
                    if (fis[0] == null) {
                        throw new IOException("the datanode " + this.datanode + " failed to " + "pass a file descriptor for the shared memory segment.");
                    }
                    try {
                        DfsClientShm shm = new DfsClientShm(PBHelper.convert(resp.getId()), fis[0], this, peer);
                        if (LOG.isTraceEnabled()) {
                            LOG.trace((Object)(this + ": createNewShm: created " + shm));
                        }
                        dfsClientShm = shm;
                    }
                    catch (Throwable throwable) {
                        IOUtils.cleanup(LOG, fis[0]);
                        throw throwable;
                    }
                    IOUtils.cleanup(LOG, fis[0]);
                    return dfsClientShm;
                }
                case ERROR_UNSUPPORTED: {
                    LOG.info((Object)(this + ": datanode does not support short-circuit " + "shared memory access: " + error));
                    this.disabled = true;
                    return null;
                }
            }
            LOG.warn((Object)(this + ": error requesting short-circuit shared memory " + "access: " + error));
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        ShortCircuitShm.Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, String clientName, ExtendedBlockId blockId) throws IOException {
            while (true) {
                DfsClientShm shm;
                if (DfsClientShmManager.this.closed) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)(this + ": the DfsClientShmManager has been closed."));
                    }
                    return null;
                }
                if (this.disabled) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)(this + ": shared memory segment access is disabled."));
                    }
                    return null;
                }
                ShortCircuitShm.Slot slot = this.allocSlotFromExistingShm(blockId);
                if (slot != null) {
                    return slot;
                }
                if (this.loading) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)(this + ": waiting for loading to finish..."));
                    }
                    DfsClientShmManager.this.finishedLoading.awaitUninterruptibly();
                    continue;
                }
                this.loading = true;
                DfsClientShmManager.this.lock.unlock();
                try {
                    shm = this.requestNewShm(clientName, peer);
                    if (shm == null) continue;
                    DfsClientShmManager.this.domainSocketWatcher.add(peer.getDomainSocket(), shm);
                    usedPeer.setValue(true);
                }
                finally {
                    DfsClientShmManager.this.lock.lock();
                    this.loading = false;
                    DfsClientShmManager.this.finishedLoading.signalAll();
                    continue;
                }
                if (shm.isDisconnected()) {
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug((Object)(this + ": the UNIX domain socket associated with " + "this short-circuit memory closed before we could make " + "use of the shm."));
                    continue;
                }
                this.notFull.put(shm.getShmId(), shm);
            }
        }

        void freeSlot(ShortCircuitShm.Slot slot) {
            DfsClientShm shm = (DfsClientShm)slot.getShm();
            shm.unregisterSlot(slot.getSlotIdx());
            if (shm.isDisconnected()) {
                Preconditions.checkState(!this.full.containsKey(shm.getShmId()));
                Preconditions.checkState(!this.notFull.containsKey(shm.getShmId()));
                if (shm.isEmpty()) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)(this + ": freeing empty stale " + shm));
                    }
                    shm.free();
                }
            } else {
                ShortCircuitShm.ShmId shmId = shm.getShmId();
                this.full.remove(shmId);
                if (shm.isEmpty()) {
                    this.notFull.remove(shmId);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)(this + ": shutting down UNIX domain socket for " + "empty " + shm));
                    }
                    this.shutdown(shm);
                } else {
                    this.notFull.put(shmId, shm);
                }
            }
        }

        void unregisterShm(ShortCircuitShm.ShmId shmId) {
            DfsClientShmManager.this.lock.lock();
            try {
                this.full.remove(shmId);
                this.notFull.remove(shmId);
            }
            finally {
                DfsClientShmManager.this.lock.unlock();
            }
        }

        public String toString() {
            return String.format("EndpointShmManager(%s, parent=%s)", this.datanode, DfsClientShmManager.this);
        }

        PerDatanodeVisitorInfo getVisitorInfo() {
            return new PerDatanodeVisitorInfo(this.full, this.notFull, this.disabled);
        }

        final void shutdown(DfsClientShm shm) {
            try {
                shm.getPeer().getDomainSocket().shutdown();
            }
            catch (IOException e) {
                LOG.warn((Object)(this + ": error shutting down shm: got IOException calling " + "shutdown(SHUT_RDWR)"), (Throwable)e);
            }
        }
    }
}

