package netflix.karyon.transport.util;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;

/* loaded from: input_file:netflix/karyon/transport/util/HttpContentInputStream.class */
/**
 * Blocking {@link InputStream} view over an {@code Observable<ByteBuf>}.
 *
 * <p>The constructor subscribes to the observable; every emitted buffer is appended to an
 * internal {@link ByteBuf}. The {@code read} methods block on a {@link Condition} until
 * either data is readable or the observable has terminated. If the observable terminated
 * with an error, {@code read} reports it as an {@link IOException}.
 *
 * <p>The observer callbacks and the {@code read} methods coordinate through {@link #lock}.
 * {@link #available()}, {@link #mark(int)}, {@link #reset()}, {@link #skip(long)} and
 * {@link #close()} do not take that lock.
 */
public class HttpContentInputStream extends InputStream {
    private static final Logger logger = LoggerFactory.getLogger(HttpContentInputStream.class);

    /** Guards the content buffer between the observer callbacks and the read methods. */
    private final Lock lock = new ReentrantLock();
    private volatile boolean isClosed = false;
    /** Set once the observable has signalled onCompleted or onError. */
    private volatile boolean isCompleted = false;
    /** The error passed to onError, or null if none was received. */
    private volatile Throwable completedWithError = null;
    /** Signalled whenever content is appended or the observable terminates. */
    private final Condition contentAvailabilityMonitor = this.lock.newCondition();
    private final ByteBuf contentBuffer;

    /**
     * Allocates the internal buffer and subscribes to the content source.
     *
     * @param byteBufAllocator allocator used to create the internal accumulation buffer
     * @param observable       source of content chunks; subscribed to immediately
     */
    public HttpContentInputStream(ByteBufAllocator byteBufAllocator, Observable<ByteBuf> observable) {
        this.contentBuffer = byteBufAllocator.buffer();
        observable.subscribe(new Observer<ByteBuf>() {
            public void onCompleted() {
                lock.lock();
                try {
                    isCompleted = true;
                    logger.debug("Processing complete");
                    // Wake any reader blocked in await() so it can observe end-of-stream.
                    contentAvailabilityMonitor.signalAll();
                } finally {
                    lock.unlock();
                }
            }

            public void onError(Throwable th) {
                lock.lock();
                try {
                    // Record the error before flagging completion so a woken reader sees both.
                    completedWithError = th;
                    isCompleted = true;
                    logger.error("Observer, got error: " + th.getMessage(), th);
                    contentAvailabilityMonitor.signalAll();
                } finally {
                    lock.unlock();
                }
            }

            public void onNext(ByteBuf byteBuf) {
                lock.lock();
                try {
                    if (byteBuf.readableBytes() > 0) {
                        contentBuffer.writeBytes(byteBuf);
                    }
                    contentAvailabilityMonitor.signalAll();
                } catch (Exception e) {
                    // Best effort: a failure to append is logged and not propagated upstream.
                    logger.error("Error on server", e);
                } finally {
                    lock.unlock();
                }
            }
        });
    }

    /**
     * Returns the number of buffered bytes, but only once the source has terminated;
     * while the source is still active this always reports {@code 0}.
     */
    @Override // java.io.InputStream
    public int available() throws IOException {
        if (this.isCompleted) {
            return this.contentBuffer.readableBytes();
        }
        return 0;
    }

    /**
     * Marks the current reader index of the internal buffer.
     *
     * @param i read limit; ignored by this implementation
     */
    @Override // java.io.InputStream
    public void mark(int i) {
        this.contentBuffer.markReaderIndex();
    }

    /** Always {@code true}: mark/reset delegate to the internal buffer's reader index. */
    @Override // java.io.InputStream
    public boolean markSupported() {
        return true;
    }

    /**
     * Reads one byte, blocking until a byte is available or the source has terminated.
     *
     * @return the byte as an unsigned value in {@code 0..255}, or {@code -1} at end of stream
     * @throws IOException if the wait is interrupted or the source terminated with an error
     */
    @Override // java.io.InputStream
    public int read() throws IOException {
        this.lock.lock();
        try {
            if (!await()) {
                return -1;
            }
            return this.contentBuffer.readByte() & 255;
        } finally {
            // Single release point: unlocking twice would throw IllegalMonitorStateException.
            this.lock.unlock();
        }
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}, blocking
     * until at least one byte is available or the source has terminated.
     *
     * @param bArr destination array
     * @param i    start offset in {@code bArr}
     * @param i2   maximum number of bytes to read
     * @return the number of bytes copied, {@code 0} if {@code i2} is zero, or {@code -1} at
     *         end of stream
     * @throws NullPointerException      if {@code bArr} is null
     * @throws IndexOutOfBoundsException if the offset/length do not fit inside {@code bArr}
     * @throws IOException               if the wait is interrupted or the source terminated
     *                                   with an error
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        if (bArr == null) {
            throw new NullPointerException("Null buffer");
        }
        if (i2 < 0 || i < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException("Invalid index");
        }
        if (i2 == 0) {
            return 0;
        }
        this.lock.lock();
        try {
            if (!await()) {
                return -1;
            }
            int count = Math.min(i2, this.contentBuffer.readableBytes());
            this.contentBuffer.readBytes(bArr, i, count);
            return count;
        } finally {
            // Single release point: unlocking twice would throw IllegalMonitorStateException.
            this.lock.unlock();
        }
    }

    /**
     * Releases the internal buffer. Calling this more than once has no further effect.
     *
     * <p>NOTE(review): this synchronizes on the instance monitor, not on {@code lock}, so it
     * is not coordinated with readers or with observer callbacks that arrive afterwards —
     * confirm whether that is intended.
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        synchronized (this) {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;
            this.contentBuffer.release();
        }
    }

    /** Moves the internal buffer's reader index back to the last marked position. */
    @Override // java.io.InputStream
    public void reset() throws IOException {
        this.contentBuffer.resetReaderIndex();
    }

    /**
     * Skips up to {@code j} bytes without blocking. Because the amount is capped by
     * {@link #available()}, nothing is skipped until the source has terminated.
     *
     * @param j requested number of bytes to skip; values above {@code Integer.MAX_VALUE}
     *          are clamped, non-positive values skip nothing
     * @return the number of bytes actually skipped
     */
    @Override // java.io.InputStream
    public long skip(long j) throws IOException {
        if (j <= 0) {
            return 0L;
        }
        return j > 2147483647L ? skipBytes(Integer.MAX_VALUE) : skipBytes((int) j);
    }

    /** Skips {@code min(available(), i)} bytes of the internal buffer and returns that count. */
    private int skipBytes(int i) throws IOException {
        int count = Math.min(available(), i);
        this.contentBuffer.skipBytes(count);
        return count;
    }

    /**
     * Blocks until the internal buffer is readable or the source has terminated. The read
     * methods call this while holding {@code lock}, which the condition wait requires.
     *
     * @return {@code true} if the caller may read from the buffer, {@code false} at end of
     *         stream (source terminated and nothing left to read)
     * @throws IOException if the thread is interrupted while waiting (the interrupt flag is
     *                     restored), or if the source terminated with an error — the error
     *                     is reported even when buffered bytes remain
     */
    private boolean await() throws IOException {
        while (!this.isCompleted && !this.contentBuffer.isReadable()) {
            try {
                this.contentAvailabilityMonitor.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("Interrupted: " + e.getMessage(), e);
                throw new IOException(e);
            }
        }
        if (this.completedWithError != null) {
            throw new IOException(this.completedWithError);
        }
        return !this.isCompleted || this.contentBuffer.isReadable();
    }
}
