/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.com.google.common.collect.Lists;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.logging.Log;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.logging.LogFactory;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.http.client.utils.URIBuilder;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.classification.InterfaceAudience;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FileUtil;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.namenode.CheckpointFaultInjector;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.util.Canceler;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.io.IOUtils;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.io.MD5Hash;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.UserGroupInformation;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.util.Time;
import org.mortbay.jetty.EofException;

@InterfaceAudience.Private
public class TransferFsImage {
    public static final String CONTENT_LENGTH = "Content-Length";
    public static final String FILE_LENGTH = "File-Length";
    public static final String MD5_HEADER = "X-MD5-Digest";
    private static final String CONTENT_TYPE = "Content-Type";
    private static final String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
    @VisibleForTesting
    static int timeout = 0;
    private static final URLConnectionFactory connectionFactory;
    private static final boolean isSpnegoEnabled;
    private static final Log LOG;

    public static void downloadMostRecentImageToDirectory(URL infoServer, File dir) throws IOException {
        String fileId = ImageServlet.getParamStringForMostRecentImage();
        TransferFsImage.getFileClient(infoServer, fileId, Lists.newArrayList(dir), null, false);
    }

    public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId, Storage dstStorage, boolean needDigest) throws IOException {
        String fileid = ImageServlet.getParamStringForImage(null, imageTxId, dstStorage);
        String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
        List<File> dstFiles = dstStorage.getFiles(NNStorage.NameNodeDirType.IMAGE, fileName);
        if (dstFiles.isEmpty()) {
            throw new IOException("No targets in destination storage!");
        }
        MD5Hash hash = TransferFsImage.getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
        LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " + dstFiles.get(0).length() + " bytes.");
        return hash;
    }

    static MD5Hash handleUploadImageRequest(HttpServletRequest request, long imageTxId, Storage dstStorage, InputStream stream, long advertisedSize, DataTransferThrottler throttler) throws IOException {
        String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
        List<File> dstFiles = dstStorage.getFiles(NNStorage.NameNodeDirType.IMAGE, fileName);
        if (dstFiles.isEmpty()) {
            throw new IOException("No targets in destination storage!");
        }
        MD5Hash advertisedDigest = TransferFsImage.parseMD5Header(request);
        MD5Hash hash = TransferFsImage.receiveFile(fileName, dstFiles, dstStorage, true, advertisedSize, advertisedDigest, fileName, stream, throttler);
        LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " + dstFiles.get(0).length() + " bytes.");
        return hash;
    }

    static void downloadEditsToStorage(URL fsName, RemoteEditLog log, NNStorage dstStorage) throws IOException {
        assert (log.getStartTxId() > 0L && log.getEndTxId() > 0L) : "bad log: " + log;
        String fileid = ImageServlet.getParamStringForLog(log, dstStorage);
        String finalFileName = NNStorage.getFinalizedEditsFileName(log.getStartTxId(), log.getEndTxId());
        List<File> finalFiles = dstStorage.getFiles(NNStorage.NameNodeDirType.EDITS, finalFileName);
        assert (!finalFiles.isEmpty()) : "No checkpoint targets.";
        for (File f : finalFiles) {
            if (f.exists() && FileUtil.canRead(f)) {
                LOG.info("Skipping download of remote edit log " + log + " since it already is stored locally at " + f);
                return;
            }
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Dest file: " + f);
        }
        long milliTime = Time.monotonicNow();
        String tmpFileName = NNStorage.getTemporaryEditsFileName(log.getStartTxId(), log.getEndTxId(), milliTime);
        List<File> tmpFiles = dstStorage.getFiles(NNStorage.NameNodeDirType.EDITS, tmpFileName);
        TransferFsImage.getFileClient(fsName, fileid, tmpFiles, dstStorage, false);
        LOG.info("Downloaded file " + tmpFiles.get(0).getName() + " size " + finalFiles.get(0).length() + " bytes.");
        CheckpointFaultInjector.getInstance().beforeEditsRename();
        for (Storage.StorageDirectory sd : dstStorage.dirIterable(NNStorage.NameNodeDirType.EDITS)) {
            boolean success;
            File tmpFile = NNStorage.getTemporaryEditsFile(sd, log.getStartTxId(), log.getEndTxId(), milliTime);
            File finalizedFile = NNStorage.getFinalizedEditsFile(sd, log.getStartTxId(), log.getEndTxId());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Renaming " + tmpFile + " to " + finalizedFile);
            }
            if (success = tmpFile.renameTo(finalizedFile)) continue;
            LOG.warn("Unable to rename edits file from " + tmpFile + " to " + finalizedFile);
        }
    }

    public static void uploadImageFromStorage(URL fsName, Configuration conf, NNStorage storage, NNStorage.NameNodeFile nnf, long txid) throws IOException {
        TransferFsImage.uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
    }

    public static void uploadImageFromStorage(URL fsName, Configuration conf, NNStorage storage, NNStorage.NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
        URL url = new URL(fsName, "/imagetransfer");
        long startTime = Time.monotonicNow();
        try {
            TransferFsImage.uploadImage(url, conf, storage, nnf, txid, canceler);
        }
        catch (HttpPutFailedException e) {
            if (e.getResponseCode() == 409) {
                LOG.info("Image upload with txid " + txid + " conflicted with a previous image upload to the " + "same NameNode. Continuing...", e);
                return;
            }
            throw e;
        }
        double xferSec = Math.max((double)(Time.monotonicNow() - startTime) / 1000.0, 0.001);
        LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName + " in " + xferSec + " seconds");
    }

    private static void uploadImage(URL url, Configuration conf, NNStorage storage, NNStorage.NameNodeFile nnf, long txId, Canceler canceler) throws IOException {
        File imageFile = storage.findImageFile(nnf, txId);
        if (imageFile == null) {
            throw new IOException("Could not find image with txid " + txId);
        }
        HttpURLConnection connection = null;
        try {
            URIBuilder uriBuilder = new URIBuilder(url.toURI());
            Map<String, String> params = ImageServlet.getParamsForPutImage(storage, txId, imageFile.length(), nnf);
            for (Map.Entry<String, String> entry : params.entrySet()) {
                uriBuilder.addParameter(entry.getKey(), entry.getValue());
            }
            URL urlWithParams = uriBuilder.build().toURL();
            connection = (HttpURLConnection)connectionFactory.openConnection(urlWithParams, UserGroupInformation.isSecurityEnabled());
            connection.setRequestMethod("PUT");
            connection.setDoOutput(true);
            int chunkSize = conf.getInt("dfs.image.transfer.chunksize", 65536);
            if (imageFile.length() > (long)chunkSize) {
                connection.setChunkedStreamingMode(chunkSize);
            }
            TransferFsImage.setTimeout(connection);
            ImageServlet.setVerificationHeadersForPut(connection, imageFile);
            TransferFsImage.writeFileToPutRequest(conf, connection, imageFile, canceler);
            int responseCode = connection.getResponseCode();
            if (responseCode != 200) {
                throw new HttpPutFailedException(String.format("Image uploading failed, status: %d, url: %s, message: %s", responseCode, urlWithParams, connection.getResponseMessage()), responseCode);
            }
        }
        catch (AuthenticationException e) {
            throw new IOException(e);
        }
        catch (URISyntaxException e) {
            throw new IOException(e);
        }
        finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void writeFileToPutRequest(Configuration conf, HttpURLConnection connection, File imageFile, Canceler canceler) throws FileNotFoundException, IOException {
        connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
        connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
        OutputStream output = connection.getOutputStream();
        FileInputStream input = new FileInputStream(imageFile);
        try {
            TransferFsImage.copyFileToStream(output, imageFile, input, ImageServlet.getThrottler(conf), canceler);
        }
        finally {
            IOUtils.closeStream(input);
            IOUtils.closeStream(output);
        }
    }

    public static void copyFileToStream(OutputStream out, File localfile, FileInputStream infile, DataTransferThrottler throttler) throws IOException {
        TransferFsImage.copyFileToStream(out, localfile, infile, throttler, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void copyFileToStream(OutputStream out, File localfile, FileInputStream infile, DataTransferThrottler throttler, Canceler canceler) throws IOException {
        byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
        try {
            CheckpointFaultInjector.getInstance().aboutToSendFile(localfile);
            if (CheckpointFaultInjector.getInstance().shouldSendShortFile(localfile)) {
                long len = localfile.length();
                buf = new byte[(int)Math.min(len / 2L, (long)HdfsConstants.IO_FILE_BUFFER_SIZE)];
                infile.read(buf);
            }
            int num = 1;
            while (num > 0) {
                if (canceler != null && canceler.isCancelled()) {
                    throw new SaveNamespaceCancelledException(canceler.getCancellationReason());
                }
                num = infile.read(buf);
                if (num <= 0) {
                    break;
                }
                if (CheckpointFaultInjector.getInstance().shouldCorruptAByte(localfile)) {
                    LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!");
                    buf[0] = (byte)(buf[0] + 1);
                }
                out.write(buf, 0, num);
                if (throttler == null) continue;
                throttler.throttle(num, canceler);
            }
        }
        catch (EofException e) {
            LOG.info("Connection closed by client");
            out = null;
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

    static MD5Hash getFileClient(URL infoServer, String queryString, List<File> localPaths, Storage dstStorage, boolean getChecksum) throws IOException {
        URL url = new URL(infoServer, "/imagetransfer?" + queryString);
        LOG.info("Opening connection to " + url);
        return TransferFsImage.doGetUrl(url, localPaths, dstStorage, getChecksum);
    }

    public static MD5Hash doGetUrl(URL url, List<File> localPaths, Storage dstStorage, boolean getChecksum) throws IOException {
        HttpURLConnection connection;
        try {
            connection = (HttpURLConnection)connectionFactory.openConnection(url, isSpnegoEnabled);
        }
        catch (AuthenticationException e) {
            throw new IOException(e);
        }
        TransferFsImage.setTimeout(connection);
        if (connection.getResponseCode() != 200) {
            throw new HttpGetFailedException("Image transfer servlet at " + url + " failed with status code " + connection.getResponseCode() + "\nResponse message:\n" + connection.getResponseMessage(), connection);
        }
        String contentLength = connection.getHeaderField(CONTENT_LENGTH);
        if (contentLength == null) {
            throw new IOException("Content-Length header is not provided by the namenode when trying to fetch " + url);
        }
        long advertisedSize = Long.parseLong(contentLength);
        MD5Hash advertisedDigest = TransferFsImage.parseMD5Header(connection);
        String fsImageName = connection.getHeaderField("X-Image-Edits-Name");
        InputStream stream = connection.getInputStream();
        return TransferFsImage.receiveFile(url.toExternalForm(), localPaths, dstStorage, getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, null);
    }

    private static void setTimeout(HttpURLConnection connection) {
        if (timeout <= 0) {
            HdfsConfiguration conf = new HdfsConfiguration();
            timeout = conf.getInt("dfs.image.transfer.timeout", 60000);
            LOG.info("Image Transfer timeout configured to " + timeout + " milliseconds");
        }
        if (timeout > 0) {
            connection.setConnectTimeout(timeout);
            connection.setReadTimeout(timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static MD5Hash receiveFile(String url, List<File> localPaths, Storage dstStorage, boolean getChecksum, long advertisedSize, MD5Hash advertisedDigest, String fsImageName, InputStream stream, DataTransferThrottler throttler) throws IOException {
        long startTime = Time.monotonicNow();
        if (localPaths != null) {
            ArrayList<File> newLocalPaths = new ArrayList<File>();
            for (File localPath : localPaths) {
                if (localPath.isDirectory()) {
                    if (fsImageName == null) {
                        throw new IOException("No filename header provided by server");
                    }
                    newLocalPaths.add(new File(localPath, fsImageName));
                    continue;
                }
                newLocalPaths.add(localPath);
            }
            localPaths = newLocalPaths;
        }
        long received = 0L;
        MessageDigest digester = null;
        if (getChecksum) {
            digester = MD5Hash.getDigester();
            stream = new DigestInputStream(stream, digester);
        }
        boolean finishedReceiving = false;
        ArrayList<FileOutputStream> outputStreams = Lists.newArrayList();
        try {
            if (localPaths != null) {
                for (File f : localPaths) {
                    try {
                        if (f.exists()) {
                            LOG.warn("Overwriting existing file " + f + " with file downloaded from " + url);
                        }
                        outputStreams.add(new FileOutputStream(f));
                    }
                    catch (IOException ioe) {
                        LOG.warn("Unable to download file " + f, ioe);
                        if (dstStorage == null || !(dstStorage instanceof StorageErrorReporter)) continue;
                        ((StorageErrorReporter)((Object)dstStorage)).reportErrorOnFile(f);
                    }
                }
                if (outputStreams.isEmpty()) {
                    throw new IOException("Unable to download to any storage directory");
                }
            }
            int num = 1;
            byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
            while (num > 0) {
                num = stream.read(buf);
                if (num <= 0) continue;
                received += (long)num;
                for (FileOutputStream fos : outputStreams) {
                    fos.write(buf, 0, num);
                }
                if (throttler == null) continue;
                throttler.throttle(num);
            }
            finishedReceiving = true;
        }
        finally {
            stream.close();
            for (FileOutputStream fos : outputStreams) {
                fos.getChannel().force(true);
                fos.close();
            }
            if (!finishedReceiving) {
                TransferFsImage.deleteTmpFiles(localPaths);
            }
            if (finishedReceiving && received != advertisedSize) {
                TransferFsImage.deleteTmpFiles(localPaths);
                throw new IOException("File " + url + " received length " + received + " is not of the advertised size " + advertisedSize);
            }
        }
        double xferSec = Math.max((double)(Time.monotonicNow() - startTime) / 1000.0, 0.001);
        long xferKb = received / 1024L;
        LOG.info(String.format("Transfer took %.2fs at %.2f KB/s", xferSec, (double)xferKb / xferSec));
        if (digester != null) {
            MD5Hash computedDigest = new MD5Hash(digester.digest());
            if (advertisedDigest != null && !computedDigest.equals(advertisedDigest)) {
                TransferFsImage.deleteTmpFiles(localPaths);
                throw new IOException("File " + url + " computed digest " + computedDigest + " does not match advertised digest " + advertisedDigest);
            }
            return computedDigest;
        }
        return null;
    }

    private static void deleteTmpFiles(List<File> files) {
        if (files == null) {
            return;
        }
        LOG.info("Deleting temporary files: " + files);
        for (File file : files) {
            if (file.delete()) continue;
            LOG.warn("Deleting " + file + " has failed");
        }
    }

    private static MD5Hash parseMD5Header(HttpURLConnection connection) {
        String header = connection.getHeaderField(MD5_HEADER);
        return header != null ? new MD5Hash(header) : null;
    }

    private static MD5Hash parseMD5Header(HttpServletRequest request) {
        String header = request.getHeader(MD5_HEADER);
        return header != null ? new MD5Hash(header) : null;
    }

    static {
        Configuration conf = new Configuration();
        connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf);
        isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
        LOG = LogFactory.getLog(TransferFsImage.class);
    }

    public static class HttpPutFailedException
    extends IOException {
        private static final long serialVersionUID = 1L;
        private final int responseCode;

        HttpPutFailedException(String msg, int responseCode) throws IOException {
            super(msg);
            this.responseCode = responseCode;
        }

        public int getResponseCode() {
            return this.responseCode;
        }
    }

    public static class HttpGetFailedException
    extends IOException {
        private static final long serialVersionUID = 1L;
        private final int responseCode;

        HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
            super(msg);
            this.responseCode = connection.getResponseCode();
        }

        public int getResponseCode() {
            return this.responseCode;
        }
    }
}

