/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.security.MessageDigest;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BlobServerConnection
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class);
    private final Socket clientSocket;
    private final BlobServer blobServer;
    private final Lock readLock;

    BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
        super("BLOB connection for " + String.valueOf(clientSocket.getRemoteSocketAddress()));
        this.setDaemon(true);
        this.clientSocket = clientSocket;
        this.blobServer = (BlobServer)Preconditions.checkNotNull((Object)blobServer);
        ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
        this.readLock = readWriteLock.readLock();
    }

    /*
     * Exception decompiling
     */
    @Override
    public void run() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [12[UNCONDITIONALDOLOOP]], but top level block is 2[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    public void close() {
        BlobUtils.closeSilently(this.clientSocket, LOG);
        this.interrupt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
        BlobKey blobKey;
        JobID jobId;
        try {
            int mode = inputStream.read();
            if (mode < 0) {
                throw new EOFException("Premature end of GET request");
            }
            if (mode == 0) {
                jobId = null;
            } else if (mode == 2) {
                byte[] jidBytes = new byte[16];
                BlobUtils.readFully(inputStream, jidBytes, 0, 16, "JobID");
                jobId = JobID.fromByteArray((byte[])jidBytes);
            } else {
                throw new IOException("Unknown type of BLOB addressing: " + mode + ".");
            }
            blobKey = BlobKey.readFromInputStream(inputStream);
            Preconditions.checkArgument((blobKey instanceof TransientBlobKey || jobId != null ? 1 : 0) != 0, (Object)"Invalid BLOB addressing for permanent BLOBs");
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received GET request for BLOB {}/{} from {}.", new Object[]{jobId, blobKey, this.clientSocket.getInetAddress()});
            }
        }
        catch (Throwable t) {
            LOG.error("GET operation from {} failed.", (Object)this.clientSocket.getInetAddress(), (Object)t);
            try {
                BlobServerConnection.writeErrorToStream(outputStream, t);
            }
            catch (IOException jidBytes) {
                // empty catch block
            }
            this.clientSocket.close();
            return;
        }
        try {
            this.readLock.lock();
            try {
                File blobFile;
                try {
                    blobFile = this.blobServer.getFileInternal(jobId, blobKey);
                    if (blobFile.length() > Integer.MAX_VALUE) {
                        throw new IOException("BLOB size exceeds the maximum size (2 GB).");
                    }
                    outputStream.write(0);
                }
                catch (Throwable t) {
                    LOG.error("GET operation failed for BLOB {}/{} from {}.", new Object[]{jobId, blobKey, this.clientSocket.getInetAddress(), t});
                    try {
                        BlobServerConnection.writeErrorToStream(outputStream, t);
                    }
                    catch (IOException jidBytes) {
                        // empty catch block
                    }
                    this.clientSocket.close();
                    this.readLock.unlock();
                    return;
                }
                int blobLen = (int)blobFile.length();
                BlobUtils.writeLength(blobLen, outputStream);
                try (FileInputStream fis = new FileInputStream(blobFile);){
                    int read;
                    for (int bytesRemaining = blobLen; bytesRemaining > 0; bytesRemaining -= read) {
                        read = fis.read(buf);
                        if (read < 0) {
                            throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath());
                        }
                        outputStream.write(buf, 0, read);
                    }
                }
            }
            finally {
                this.readLock.unlock();
            }
            int result = inputStream.read();
            if (result < 0) {
                throw new EOFException("Premature end of GET request");
            }
            if (blobKey instanceof TransientBlobKey && result == 0 && !this.blobServer.deleteInternal(jobId, (TransientBlobKey)blobKey)) {
                LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", new Object[]{jobId, blobKey, this.clientSocket.getInetAddress()});
            }
        }
        catch (SocketException e) {
            LOG.debug("Socket connection closed", (Throwable)e);
        }
        catch (Throwable t) {
            LOG.error("GET operation failed", t);
            this.clientSocket.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
        File incomingFile = null;
        try {
            BlobKey.BlobType blobType;
            JobID jobId;
            int mode = inputStream.read();
            if (mode < 0) {
                throw new EOFException("Premature end of PUT request");
            }
            if (mode == 0) {
                jobId = null;
            } else if (mode == 2) {
                byte[] jidBytes = new byte[16];
                BlobUtils.readFully(inputStream, jidBytes, 0, 16, "JobID");
                jobId = JobID.fromByteArray((byte[])jidBytes);
            } else {
                throw new IOException("Unknown type of BLOB addressing.");
            }
            int read = inputStream.read();
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB type");
            }
            if (read == BlobKey.BlobType.TRANSIENT_BLOB.ordinal()) {
                blobType = BlobKey.BlobType.TRANSIENT_BLOB;
            } else if (read == BlobKey.BlobType.PERMANENT_BLOB.ordinal()) {
                blobType = BlobKey.BlobType.PERMANENT_BLOB;
                Preconditions.checkArgument((jobId != null ? 1 : 0) != 0, (Object)"Invalid BLOB addressing for permanent BLOBs");
            } else {
                throw new IOException("Invalid data received for the BLOB type: " + read);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT request for BLOB of job {} with from {}.", (Object)jobId, (Object)this.clientSocket.getInetAddress());
            }
            incomingFile = this.blobServer.createTemporaryFilename();
            byte[] digest = BlobServerConnection.readFileFully(inputStream, incomingFile, buf);
            BlobKey blobKey = this.blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);
            outputStream.write(0);
            blobKey.writeToOutputStream(outputStream);
        }
        catch (SocketException e) {
            LOG.debug("Socket connection closed", (Throwable)e);
        }
        catch (Throwable t) {
            LOG.error("PUT operation failed", t);
            try {
                BlobServerConnection.writeErrorToStream(outputStream, t);
            }
            catch (IOException iOException) {
                // empty catch block
            }
            this.clientSocket.close();
        }
        finally {
            if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
                LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
            }
        }
    }

    private static byte[] readFileFully(InputStream inputStream, File incomingFile, byte[] buf) throws IOException {
        MessageDigest md = BlobUtils.createMessageDigest();
        try (FileOutputStream fos = new FileOutputStream(incomingFile);){
            int bytesExpected;
            while ((bytesExpected = BlobUtils.readLength(inputStream)) != -1) {
                if (bytesExpected > 65536) {
                    throw new IOException("Unexpected number of incoming bytes: " + bytesExpected);
                }
                BlobUtils.readFully(inputStream, buf, 0, bytesExpected, "buffer");
                fos.write(buf, 0, bytesExpected);
                md.update(buf, 0, bytesExpected);
            }
            byte[] byArray = md.digest();
            return byArray;
        }
    }

    private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
        byte[] bytes = InstantiationUtil.serializeObject((Object)t);
        out.write(1);
        BlobUtils.writeLength(bytes.length, out);
        out.write(bytes);
    }
}

