/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.shufflehandler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.shufflehandler.AttemptRegistrationListener;
import org.apache.hadoop.hive.llap.shufflehandler.DirWatcher;
import org.apache.hadoop.hive.llap.shufflehandler.FadvisedChunkedFile;
import org.apache.hadoop.hive.llap.shufflehandler.FadvisedFileRegion;
import org.apache.hadoop.hive.llap.shufflehandler.IndexCache;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShuffleHandler
implements AttemptRegistrationListener {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleHandler.class);
    public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "llap.shuffle.handler.local-dirs";
    public static final String SHUFFLE_MANAGE_OS_CACHE = "llap.shuffle.manage.os.cache";
    public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
    public static final String SHUFFLE_OS_CACHE_ALWAYS_EVICT = "llap.shuffle.os.cache.always.evict";
    public static final boolean DEFAULT_SHUFFLE_OS_CACHE_ALWAYS_EVICT = false;
    public static final String SHUFFLE_READAHEAD_BYTES = "llap.shuffle.readahead.bytes";
    public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 0x400000;
    public static final String SHUFFLE_DIR_WATCHER_ENABLED = "llap.shuffle.dir-watcher.enabled";
    public static final boolean SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;
    private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile("^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$", 2);
    private int port;
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workerGroup;
    private final ChannelGroup accepted = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    private final int sslFileBufferSize;
    private Shuffle SHUFFLE;
    private SSLFactory sslFactory;
    private final Configuration conf;
    private final String[] localDirs;
    private final DirWatcher dirWatcher;
    private final boolean manageOsCache;
    private final boolean shouldAlwaysEvictOsCache;
    private final int readaheadLength;
    private final int maxShuffleConnections;
    private final int shuffleBufferSize;
    private final boolean shuffleTransferToAllowed;
    private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
    private final ConcurrentMap<String, Integer> registeredApps = new ConcurrentHashMap<String, Integer>();
    private final ConcurrentMap<String, Integer> registeredDirectories = new ConcurrentHashMap<String, Integer>();
    private final ConcurrentMap<String, String> userRsrc;
    private JobTokenSecretManager secretManager;
    public static final String SHUFFLE_PORT_CONFIG_KEY = "llap.shuffle.port";
    public static final int DEFAULT_SHUFFLE_PORT = 15551;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = "llap.shuffle.connection-keep-alive.enable";
    public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = true;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = "llap.shuffle.connection-keep-alive.timeout";
    public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5;
    public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = "llap.shuffle.mapoutput-info.meta.cache.size";
    public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = 10000;
    public static final String CONNECTION_CLOSE = "close";
    public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "llap.shuffle.ssl.file.buffer.size";
    public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 61440;
    public static final String MAX_SHUFFLE_CONNECTIONS = "llap.shuffle.max.connections";
    public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0;
    public static final String MAX_SHUFFLE_THREADS = "llap.shuffle.max.threads";
    public static final int DEFAULT_MAX_SHUFFLE_THREADS = Runtime.getRuntime().availableProcessors() * 3;
    public static final String SHUFFLE_BUFFER_SIZE = "llap.shuffle.transfer.buffer.size";
    public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 131072;
    public static final String SHUFFLE_TRANSFERTO_ALLOWED = "llap.shuffle.transferTo.allowed";
    public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
    static final String DATA_FILE_NAME = "file.out";
    static final String INDEX_FILE_NAME = "file.out.index";
    private static final AtomicBoolean started = new AtomicBoolean(false);
    private static final AtomicBoolean initing = new AtomicBoolean(false);
    private static ShuffleHandler INSTANCE;
    private static final String TIMEOUT_HANDLER = "timeout";
    final boolean connectionKeepAliveEnabled;
    final int connectionKeepAliveTimeOut;
    final int mapOutputMetaInfoCacheSize;
    private final LocalDirAllocator lDirAlloc = new LocalDirAllocator("llap.shuffle.handler.local-dirs");
    private final Shuffle shuffle;
    private static final String USERCACHE_CONSTANT = "usercache";
    private static final String APPCACHE_CONSTANT = "appcache";

    @Override
    public void registerAttemptDirs(AttemptPathIdentifier identifier, AttemptPathInfo pathInfo) {
        this.shuffle.registerAttemptDirs(identifier, pathInfo);
    }

    @VisibleForTesting
    ShuffleHandler(Configuration conf) {
        this.conf = conf;
        this.manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, true);
        this.shouldAlwaysEvictOsCache = conf.getBoolean(SHUFFLE_OS_CACHE_ALWAYS_EVICT, false);
        this.readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, 0x400000);
        this.maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, 0);
        int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS, DEFAULT_MAX_SHUFFLE_THREADS);
        if (maxShuffleThreads == 0) {
            maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
        }
        this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, 15551);
        this.localDirs = conf.getTrimmedStrings(SHUFFLE_HANDLER_LOCAL_DIRS);
        this.shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE, 131072);
        this.shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED, true);
        String BOSS_THREAD_NAME_PREFIX = "ShuffleHandler Netty Boss #";
        final AtomicInteger bossThreadCounter = new AtomicInteger(0);
        this.bossGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ShuffleHandler Netty Boss #" + bossThreadCounter.incrementAndGet());
            }
        });
        String WORKER_THREAD_NAME_PREFIX = "ShuffleHandler Netty Worker #";
        final AtomicInteger workerThreadCounter = new AtomicInteger(0);
        this.workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ShuffleHandler Netty Worker #" + workerThreadCounter.incrementAndGet());
            }
        });
        this.sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, 61440);
        this.connectionKeepAliveEnabled = conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
        this.connectionKeepAliveTimeOut = Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, 5));
        this.mapOutputMetaInfoCacheSize = Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE, 10000));
        this.userRsrc = new ConcurrentHashMap<String, String>();
        this.secretManager = new JobTokenSecretManager();
        this.shuffle = new Shuffle(conf);
        if (conf.getBoolean(SHUFFLE_DIR_WATCHER_ENABLED, false)) {
            LOG.info("Attempting to start dirWatcher");
            DirWatcher localDirWatcher = null;
            try {
                localDirWatcher = new DirWatcher(this);
            }
            catch (IOException e) {
                LOG.warn("Unable to start DirWatcher. Active scans disabled");
            }
            this.dirWatcher = localDirWatcher;
        } else {
            LOG.info("DirWatcher disabled by config");
            this.dirWatcher = null;
        }
        LOG.info("manageOsCache:{}, shouldAlwaysEvictOsCache:{}, readaheadLength:{}, maxShuffleConnections:{}, localDirs:{}, shuffleBufferSize:{}, shuffleTransferToAllowed:{}, connectionKeepAliveEnabled:{}, connectionKeepAliveTimeOut:{}, mapOutputMetaInfoCacheSize:{}, sslFileBufferSize:{}", new Object[]{this.manageOsCache, this.shouldAlwaysEvictOsCache, this.readaheadLength, this.maxShuffleConnections, this.localDirs, this.shuffleBufferSize, this.shuffleTransferToAllowed, this.connectionKeepAliveEnabled, this.connectionKeepAliveTimeOut, this.mapOutputMetaInfoCacheSize, this.sslFileBufferSize});
    }

    public void start() throws Exception {
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().channel(NioServerSocketChannel.class)).group((EventLoopGroup)this.bossGroup, (EventLoopGroup)this.workerGroup).localAddress(this.port)).option(ChannelOption.SO_BACKLOG, (Object)NetUtil.SOMAXCONN)).childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        this.initPipeline(bootstrap, this.conf);
        Channel ch = bootstrap.bind().sync().channel();
        this.accepted.add((Object)ch);
        this.port = ((InetSocketAddress)ch.localAddress()).getPort();
        this.conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(this.port));
        this.SHUFFLE.setPort(this.port);
        if (this.dirWatcher != null) {
            this.dirWatcher.start();
        }
        LOG.info("LlapShuffleHandler listening on port {} (SOMAXCONN: {})", (Object)this.port, (Object)NetUtil.SOMAXCONN);
    }

    private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
        this.SHUFFLE = this.getShuffle(conf);
        ChannelInitializer<NioSocketChannel> channelInitializer = new ChannelInitializer<NioSocketChannel>(){

            public void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (ShuffleHandler.this.sslFactory != null) {
                    pipeline.addLast("ssl", (ChannelHandler)new SslHandler(ShuffleHandler.this.sslFactory.createSSLEngine()));
                }
                if (LOG.isDebugEnabled()) {
                    pipeline.addLast("loggingHandler", (ChannelHandler)new LoggingHandler(LogLevel.DEBUG));
                }
                pipeline.addLast("decoder", (ChannelHandler)new HttpRequestDecoder());
                pipeline.addLast("aggregator", (ChannelHandler)new HttpObjectAggregator(65536));
                pipeline.addLast("encoder", (ChannelHandler)new HttpResponseEncoder());
                pipeline.addLast("chunking", (ChannelHandler)new ChunkedWriteHandler());
                pipeline.addLast("shuffle", (ChannelHandler)ShuffleHandler.this.SHUFFLE);
                pipeline.addLast("idle", (ChannelHandler)new IdleStateHandler(0, ShuffleHandler.this.connectionKeepAliveTimeOut, 0));
                pipeline.addLast(ShuffleHandler.TIMEOUT_HANDLER, (ChannelHandler)new TimeoutHandler());
            }
        };
        bootstrap.childHandler((ChannelHandler)channelInitializer);
    }

    private void destroyPipeline() {
        if (this.sslFactory != null) {
            this.sslFactory.destroy();
        }
    }

    public static void initializeAndStart(Configuration conf) throws Exception {
        if (!initing.getAndSet(true)) {
            INSTANCE = new ShuffleHandler(conf);
            INSTANCE.start();
            started.set(true);
        }
    }

    public static void shutdown() throws Exception {
        if (INSTANCE != null) {
            INSTANCE.stop();
        }
    }

    public static ShuffleHandler get() {
        Preconditions.checkState((boolean)started.get(), (Object)"ShuffleHandler must be started before invoking get");
        return INSTANCE;
    }

    public static ByteBuffer serializeMetaData(int port) throws IOException {
        DataOutputBuffer portDob = new DataOutputBuffer();
        portDob.writeInt(port);
        ByteBuffer buf = ByteBuffer.wrap(portDob.getData(), 0, portDob.getLength());
        portDob.close();
        return buf;
    }

    public static int deserializeMetaData(ByteBuffer meta) throws IOException {
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(new ByteBuffer[]{meta});
        int port = in.readInt();
        return port;
    }

    public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
        DataOutputBuffer jobToken_dob = new DataOutputBuffer();
        jobToken.write((DataOutput)jobToken_dob);
        return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
    }

    static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(new ByteBuffer[]{secret});
        Token jt = new Token();
        jt.readFields((DataInput)in);
        return jt;
    }

    public int getPort() {
        return this.port;
    }

    public boolean isDirWatcherEnabled() {
        return this.dirWatcher != null;
    }

    public void registerDag(String applicationIdString, int dagIdentifier, Token<JobTokenIdentifier> appToken, String user, String[] appDirs) {
        Integer registeredDagIdentifier = this.registeredApps.putIfAbsent(applicationIdString, dagIdentifier);
        if (registeredDagIdentifier == null && appToken != null) {
            this.recordJobShuffleInfo(applicationIdString, user, appToken);
        }
        if (registeredDagIdentifier != null && !registeredDagIdentifier.equals(dagIdentifier)) {
            this.registeredApps.put(applicationIdString, dagIdentifier);
        }
        if (appDirs == null) {
            return;
        }
        registeredDagIdentifier = this.registeredDirectories.put(applicationIdString, dagIdentifier);
        if (registeredDagIdentifier != null && !registeredDagIdentifier.equals(dagIdentifier)) {
            this.registeredDirectories.put(applicationIdString, dagIdentifier);
        }
        if (!(registeredDagIdentifier != null && registeredDagIdentifier.equals(dagIdentifier) || this.dirWatcher == null)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering watches for AppDirs: appId={}, dagId={}", (Object)applicationIdString, (Object)dagIdentifier);
            }
            for (String appDir : appDirs) {
                try {
                    this.dirWatcher.registerDagDir(appDir, applicationIdString, dagIdentifier, user, 300000L);
                }
                catch (IOException e) {
                    LOG.warn("Unable to register dir: " + appDir + " with watcher");
                }
            }
        }
    }

    public void unregisterDag(String dir, String applicationIdString, int dagIdentifier) {
        Integer currentDagIdentifier = (Integer)this.registeredApps.get(applicationIdString);
        if (currentDagIdentifier != null && currentDagIdentifier.equals(dagIdentifier)) {
            this.registeredApps.remove(applicationIdString);
            this.registeredDirectories.remove(applicationIdString);
            this.removeJobShuffleInfo(applicationIdString);
        }
        if (this.dirWatcher != null) {
            this.dirWatcher.unregisterDagDir(dir, applicationIdString, dagIdentifier);
        }
    }

    protected void stop() throws Exception {
        this.accepted.close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
        this.destroyPipeline();
        if (this.dirWatcher != null) {
            this.dirWatcher.stop();
        }
    }

    @VisibleForTesting
    public Map getRegisteredApps() {
        return new HashMap<String, Integer>(this.registeredApps);
    }

    @VisibleForTesting
    public Map getRegisteredDirectories() {
        return new HashMap<String, Integer>(this.registeredDirectories);
    }

    protected Shuffle getShuffle(Configuration conf) {
        return this.shuffle;
    }

    private void addJobToken(String appIdString, String user, Token<JobTokenIdentifier> jobToken) {
        String jobIdString = appIdString.replace("application", "job");
        this.userRsrc.putIfAbsent(jobIdString, user);
        this.secretManager.addTokenForJob(jobIdString, jobToken);
        LOG.info("Added token for " + jobIdString);
    }

    private void recordJobShuffleInfo(String appIdString, String user, Token<JobTokenIdentifier> jobToken) {
        this.addJobToken(appIdString, user, jobToken);
    }

    private void removeJobShuffleInfo(String appIdString) {
        this.secretManager.removeTokenForJob(appIdString);
        this.userRsrc.remove(appIdString);
    }

    private static String getBaseLocation(String jobIdString, int dagId, String user) {
        String[] parts = jobIdString.split("_");
        Preconditions.checkArgument((parts.length == 3 ? 1 : 0) != 0, (Object)"Invalid jobId. Expecting 3 parts");
        ApplicationId appID = ApplicationId.newInstance((long)Long.parseLong(parts[1]), (int)Integer.parseInt(parts[2]));
        String baseStr = "usercache/" + user + "/" + APPCACHE_CONSTANT + "/" + ConverterUtils.toString((ApplicationId)appID) + "/" + dagId + "/output/";
        return baseStr;
    }

    static class AttemptPathIdentifier {
        private final String jobId;
        private final int dagId;
        private final String user;
        private final String attemptId;

        public AttemptPathIdentifier(String jobId, int dagId, String user, String attemptId) {
            this.jobId = jobId;
            this.dagId = dagId;
            this.user = user;
            this.attemptId = attemptId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AttemptPathIdentifier that = (AttemptPathIdentifier)o;
            if (this.dagId != that.dagId) {
                return false;
            }
            if (!this.jobId.equals(that.jobId)) {
                return false;
            }
            return this.attemptId.equals(that.attemptId);
        }

        public int hashCode() {
            int result = this.jobId.hashCode();
            result = 31 * result + this.dagId;
            result = 31 * result + this.attemptId.hashCode();
            return result;
        }

        public String toString() {
            return "AttemptPathIdentifier{jobId='" + this.jobId + '\'' + ", dagId=" + this.dagId + ", user='" + this.user + '\'' + ", attemptId='" + this.attemptId + '\'' + '}';
        }
    }

    static class AttemptPathInfo {
        private final Path indexPath;
        private final Path dataPath;

        public AttemptPathInfo(Path indexPath, Path dataPath) {
            this.indexPath = indexPath;
            this.dataPath = dataPath;
        }
    }

    @ChannelHandler.Sharable
    class Shuffle
    extends ChannelInboundHandlerAdapter {
        private final Configuration conf;
        private final IndexCache indexCache;
        private int port;
        private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache = CacheBuilder.newBuilder().expireAfterAccess(300L, TimeUnit.SECONDS).softValues().concurrencyLevel(16).removalListener((RemovalListener)new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>(){

            public void onRemoval(RemovalNotification<AttemptPathIdentifier, AttemptPathInfo> notification) {
                LOG.debug("PathCacheEviction: " + notification.getKey() + ", Reason=" + notification.getCause());
            }
        }).maximumWeight(0xA00000L).weigher((Weigher)new Weigher<AttemptPathIdentifier, AttemptPathInfo>(){

            public int weigh(AttemptPathIdentifier key, AttemptPathInfo value) {
                return key.jobId.length() + key.user.length() + key.attemptId.length() + value.indexPath.toString().length() + value.dataPath.toString().length();
            }
        }).build((CacheLoader)new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>(){

            public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception {
                String base = ShuffleHandler.getBaseLocation(key.jobId, key.dagId, key.user);
                String attemptBase = base + key.attemptId;
                Path indexFileName = ShuffleHandler.this.lDirAlloc.getLocalPathToRead(attemptBase + "/" + ShuffleHandler.INDEX_FILE_NAME, Shuffle.this.conf);
                Path mapOutputFileName = ShuffleHandler.this.lDirAlloc.getLocalPathToRead(attemptBase + "/" + ShuffleHandler.DATA_FILE_NAME, Shuffle.this.conf);
                LOG.debug("Loaded : " + key + " via loader");
                if (ShuffleHandler.this.dirWatcher != null) {
                    ShuffleHandler.this.dirWatcher.attemptInfoFound(key);
                }
                return new AttemptPathInfo(indexFileName, mapOutputFileName);
            }
        });

        public Shuffle(Configuration conf) {
            this.conf = conf;
            this.indexCache = new IndexCache(conf);
            this.port = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 15551);
        }

        public void setPort(int port) {
            this.port = port;
        }

        void registerAttemptDirs(AttemptPathIdentifier identifier, AttemptPathInfo pathInfo) {
            LOG.debug("Registering " + identifier + " via watcher");
            this.pathCache.put((Object)identifier, (Object)pathInfo);
        }

        private List<String> splitMaps(List<String> mapq) {
            if (null == mapq) {
                return null;
            }
            ArrayList<String> ret = new ArrayList<String>();
            for (String s : mapq) {
                Collections.addAll(ret, s.split(","));
            }
            return ret;
        }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            if (ShuffleHandler.this.maxShuffleConnections > 0 && ShuffleHandler.this.accepted.size() >= ShuffleHandler.this.maxShuffleConnections) {
                LOG.info(String.format("Current number of shuffle connections (%d) is greater than or equal to the max allowed shuffle connections (%d)", ShuffleHandler.this.accepted.size(), ShuffleHandler.this.maxShuffleConnections));
                ctx.channel().close();
                return;
            }
            ShuffleHandler.this.accepted.add((Object)ctx.channel());
            super.channelActive(ctx);
        }

        public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
            FullHttpRequest request = (FullHttpRequest)message;
            this.handleRequest(ctx, request);
            request.release();
        }

        private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws IOException {
            int dagId;
            String jobId;
            int reduceId;
            if (request.getMethod() != HttpMethod.GET) {
                this.sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
                return;
            }
            if (!"mapreduce".equals(request.headers().get("name")) || !"1.0.0".equals(request.headers().get("version"))) {
                this.sendError(ctx, "Incompatible shuffle request version", HttpResponseStatus.BAD_REQUEST);
            }
            Map q = new QueryStringDecoder(request.uri()).parameters();
            List keepAliveList = (List)q.get("keepAlive");
            boolean keepAliveParam = false;
            if (keepAliveList != null && keepAliveList.size() == 1) {
                keepAliveParam = Boolean.parseBoolean((String)keepAliveList.get(0));
                LOG.debug("KeepAliveParam : {} : {}", (Object)keepAliveList, (Object)keepAliveParam);
            }
            List<String> mapIds = this.splitMaps((List)q.get("map"));
            List reduceQ = (List)q.get("reduce");
            List jobQ = (List)q.get("job");
            List dagIdQ = (List)q.get("dag");
            if (LOG.isDebugEnabled()) {
                LOG.debug("RECV: " + request.uri() + "\n  mapId: " + mapIds + "\n  reduceId: " + reduceQ + "\n  jobId: " + jobQ + "\n  dagId: " + dagIdQ + "\n  keepAlive: " + keepAliveParam);
            }
            if (mapIds == null || reduceQ == null || jobQ == null | dagIdQ == null) {
                this.sendError(ctx, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (reduceQ.size() != 1 || jobQ.size() != 1 || dagIdQ.size() != 1) {
                this.sendError(ctx, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            try {
                reduceId = Integer.parseInt((String)reduceQ.get(0));
                jobId = (String)jobQ.get(0);
                dagId = Integer.parseInt((String)dagIdQ.get(0));
            }
            catch (NumberFormatException e) {
                this.sendError(ctx, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            catch (IllegalArgumentException e) {
                this.sendError(ctx, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            String reqUri = request.uri();
            if (null == reqUri) {
                this.sendError(ctx, HttpResponseStatus.FORBIDDEN);
                return;
            }
            DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            try {
                this.verifyRequest(jobId, ctx, (HttpRequest)request, (HttpResponse)response, new URL("http", "", this.port, reqUri));
            }
            catch (IOException e) {
                LOG.warn("Shuffle failure ", (Throwable)e);
                this.sendError(ctx, e.getMessage(), HttpResponseStatus.UNAUTHORIZED);
                return;
            }
            HashMap<String, MapOutputInfo> mapOutputInfoMap = new HashMap<String, MapOutputInfo>();
            Channel ch = ctx.channel();
            ChannelPipeline pipeline = ch.pipeline();
            final TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(ShuffleHandler.TIMEOUT_HANDLER);
            timeoutHandler.setEnabledTimeout(false);
            String user = (String)ShuffleHandler.this.userRsrc.get(jobId);
            try {
                this.populateHeaders(mapIds, jobId, dagId, user, reduceId, (HttpResponse)response, keepAliveParam, mapOutputInfoMap);
            }
            catch (DiskChecker.DiskErrorException e) {
                LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", (Throwable)e);
                String errorMessage = this.getErrorMessage(e);
                this.sendFakeShuffleHeaderWithError(ctx, "DISK_ERROR_EXCEPTION: " + errorMessage, (HttpResponse)response);
                return;
            }
            catch (IOException e) {
                ch.write((Object)response);
                LOG.error("Shuffle error in populating headers :", (Throwable)e);
                String errorMessage = this.getErrorMessage(e);
                this.sendError(ctx, errorMessage, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                return;
            }
            ch.write((Object)response);
            ChannelFuture lastMap = null;
            for (String mapId : mapIds) {
                try {
                    MapOutputInfo info = (MapOutputInfo)mapOutputInfoMap.get(mapId);
                    if (info == null) {
                        info = this.getMapOutputInfo(jobId, dagId, mapId, reduceId, user);
                    }
                    if (null != (lastMap = this.sendMapOutput(ctx, ch, user, mapId, reduceId, info))) continue;
                    this.sendError(ctx, HttpResponseStatus.NOT_FOUND);
                    return;
                }
                catch (IOException e) {
                    LOG.error("Shuffle error :", (Throwable)e);
                    String errorMessage = this.getErrorMessage(e);
                    this.sendError(ctx, errorMessage, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    return;
                }
            }
            ch.writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAliveParam && !ShuffleHandler.this.connectionKeepAliveEnabled) {
                lastMap.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
            } else {
                lastMap.addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            future.channel().close();
                            return;
                        }
                        timeoutHandler.setEnabledTimeout(true);
                    }
                });
            }
        }

        private String getErrorMessage(Throwable t) {
            StringBuilder sb = new StringBuilder(t.getMessage());
            while (t.getCause() != null) {
                sb.append(t.getCause().getMessage());
                t = t.getCause();
            }
            return sb.toString();
        }

        protected MapOutputInfo getMapOutputInfo(String jobId, int dagId, String mapId, int reduce, String user) throws IOException {
            AttemptPathInfo pathInfo;
            try {
                AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, dagId, user, mapId);
                pathInfo = (AttemptPathInfo)this.pathCache.get((Object)identifier);
                LOG.debug("Retrieved pathInfo for {} check for corresponding loaded messages to determine whether it was loaded or cached", (Object)identifier);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof IOException) {
                    throw (IOException)e.getCause();
                }
                throw new RuntimeException(e.getCause());
            }
            TezIndexRecord info = this.indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
            if (LOG.isDebugEnabled()) {
                LOG.debug("jobId=" + jobId + ", mapId=" + mapId + ",dataFile=" + pathInfo.dataPath + ", indexFile=" + pathInfo.indexPath);
            }
            MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
            return outputInfo;
        }

        protected void populateHeaders(List<String> mapIds, String jobId, int dagId, String user, int reduce, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
            long contentLength = 0L;
            for (String mapId : mapIds) {
                MapOutputInfo outputInfo = this.getMapOutputInfo(jobId, dagId, mapId, reduce, user);
                if (mapOutputInfoMap.size() < ShuffleHandler.this.mapOutputMetaInfoCacheSize) {
                    mapOutputInfoMap.put(mapId, outputInfo);
                }
                ShuffleHeader header = new ShuffleHeader(mapId, outputInfo.indexRecord.getPartLength(), outputInfo.indexRecord.getRawLength(), reduce);
                DataOutputBuffer dob = new DataOutputBuffer();
                header.write((DataOutput)dob);
                contentLength += outputInfo.indexRecord.getPartLength();
                contentLength += (long)dob.getLength();
            }
            this.setResponseHeaders(response, keepAliveParam, contentLength);
        }

        protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
            if (!ShuffleHandler.this.connectionKeepAliveEnabled && !keepAliveParam) {
                LOG.info("Setting connection close header...");
                response.headers().add("Connection", (Object)ShuffleHandler.CONNECTION_CLOSE);
            } else {
                response.headers().add("Content-Length", (Object)String.valueOf(contentLength));
                response.headers().add("Connection", (Object)"keep-alive");
                response.headers().add("keep-alive", (Object)("timeout=" + ShuffleHandler.this.connectionKeepAliveTimeOut));
                LOG.debug("Content Length in shuffle : " + contentLength);
            }
        }

        protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
            SecretKey tokenSecret = ShuffleHandler.this.secretManager.retrieveTokenSecret(appid);
            if (null == tokenSecret) {
                LOG.info("Request for unknown token " + appid);
                throw new IOException("could not find jobid");
            }
            String enc_str = SecureShuffleUtils.buildMsgFrom((URL)requestUri);
            String urlHashStr = request.headers().get("UrlHash");
            if (urlHashStr == null) {
                LOG.info("Missing header hash for " + appid);
                throw new IOException("fetcher cannot be authenticated");
            }
            if (LOG.isDebugEnabled()) {
                int len = urlHashStr.length();
                LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." + urlHashStr.substring(len - len / 2, len - 1));
            }
            SecureShuffleUtils.verifyReply((String)urlHashStr, (String)enc_str, (SecretKey)tokenSecret);
            String reply = SecureShuffleUtils.generateHash((byte[])urlHashStr.getBytes(Charsets.UTF_8), (SecretKey)tokenSecret);
            response.headers().add("ReplyHash", (Object)reply);
            response.headers().add("name", (Object)"mapreduce");
            response.headers().add("version", (Object)"1.0.0");
            if (LOG.isDebugEnabled()) {
                int len = reply.length();
                LOG.debug("Fetcher request verified. enc_str=" + enc_str + ";reply=" + reply.substring(len - len / 2, len - 1));
            }
        }

        protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, int reduce, MapOutputInfo mapOutputInfo) throws IOException {
            ChannelFuture writeFuture;
            RandomAccessFile spill;
            TezIndexRecord info = mapOutputInfo.indexRecord;
            ShuffleHeader header = new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
            DataOutputBuffer dob = new DataOutputBuffer();
            header.write((DataOutput)dob);
            ch.write((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
            File spillfile = new File(mapOutputInfo.mapOutputFileName.toString());
            try {
                spill = SecureIOUtils.openForRandomRead((File)spillfile, (String)"r", (String)user, null);
            }
            catch (FileNotFoundException e) {
                LOG.info(spillfile + " not found");
                return null;
            }
            if (ch.pipeline().get(SslHandler.class) == null) {
                boolean canEvictAfterTransfer = true;
                if (!ShuffleHandler.this.shouldAlwaysEvictOsCache) {
                    canEvictAfterTransfer = reduce > 0;
                }
                FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.getStartOffset(), info.getPartLength(), ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, spillfile.getAbsolutePath(), ShuffleHandler.this.shuffleBufferSize, ShuffleHandler.this.shuffleTransferToAllowed, canEvictAfterTransfer);
                writeFuture = ch.write((Object)partition);
            } else {
                FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, info.getStartOffset(), info.getPartLength(), ShuffleHandler.this.sslFileBufferSize, ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, spillfile.getAbsolutePath());
                writeFuture = ch.write((Object)chunk);
            }
            return writeFuture;
        }

        protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
            this.sendError(ctx, "", status);
        }

        protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
            this.sendError(ctx, message, (FullHttpResponse)response);
        }

        protected void sendError(ChannelHandlerContext ctx, String message, FullHttpResponse response) {
            this.sendError(ctx, Unpooled.copiedBuffer((CharSequence)message, (Charset)CharsetUtil.UTF_8), response);
        }

        private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message, HttpResponse response) throws IOException {
            DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(response.getProtocolVersion(), response.getStatus());
            fullResponse.headers().set(response.headers());
            ShuffleHeader header = new ShuffleHeader(message, -1L, -1L, -1);
            DataOutputBuffer out = new DataOutputBuffer();
            header.write((DataOutput)out);
            this.sendError(ctx, Unpooled.wrappedBuffer((byte[])out.getData(), (int)0, (int)out.getLength()), (FullHttpResponse)fullResponse);
        }

        protected void sendError(ChannelHandlerContext ctx, ByteBuf content, FullHttpResponse response) {
            response.headers().set("Content-Type", (Object)"text/plain; charset=UTF-8");
            response.headers().add("name", (Object)"mapreduce");
            response.headers().add("version", (Object)"1.0.0");
            response.content().writeBytes(content);
            ctx.channel().writeAndFlush((Object)response).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
            content.release();
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof TooLongFrameException) {
                this.sendError(ctx, HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (cause instanceof IOException) {
                if (cause instanceof ClosedChannelException) {
                    LOG.debug("Ignoring closed channel error", cause);
                    return;
                }
                String message = String.valueOf(cause.getMessage());
                if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
                    LOG.debug("Ignoring client socket close", cause);
                    return;
                }
            }
            LOG.error("Shuffle error: ", cause);
            if (ctx.channel().isActive()) {
                LOG.error("Shuffle error", cause);
                this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
            }
        }

        class MapOutputInfo {
            final Path mapOutputFileName;
            final TezIndexRecord indexRecord;

            MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) {
                this.mapOutputFileName = mapOutputFileName;
                this.indexRecord = indexRecord;
            }
        }
    }

    static class TimeoutHandler
    extends ChannelDuplexHandler {
        private boolean enabledTimeout;

        TimeoutHandler() {
        }

        void setEnabledTimeout(boolean enabledTimeout) {
            this.enabledTimeout = enabledTimeout;
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            IdleStateEvent e;
            if (evt instanceof IdleStateEvent && (e = (IdleStateEvent)evt).state() == IdleState.WRITER_IDLE && this.enabledTimeout) {
                ctx.channel().close();
            }
        }
    }

    @Metrics(about="Shuffle output metrics", context="mapred")
    static class ShuffleMetrics
    implements ChannelFutureListener {
        @Metric(value={"Shuffle output in bytes"})
        MutableCounterLong shuffleOutputBytes;
        @Metric(value={"# of failed shuffle outputs"})
        MutableCounterInt shuffleOutputsFailed;
        @Metric(value={"# of succeeded shuffle outputs"})
        MutableCounterInt shuffleOutputsOK;
        @Metric(value={"# of current shuffle connections"})
        MutableGaugeInt shuffleConnections;

        ShuffleMetrics() {
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.shuffleOutputsOK.incr();
            } else {
                this.shuffleOutputsFailed.incr();
            }
            this.shuffleConnections.decr();
        }
    }
}

