package alluxio.client.block.stream;

import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnavailableException;
import alluxio.resource.LockResource;
import alluxio.util.LogUtils;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/GrpcBlockingStream.class */
public class GrpcBlockingStream<ReqT, ResT> {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcBlockingStream.class);
    private final StreamObserver<ResT> mResponseObserver;
    private final ClientCallStreamObserver<ReqT> mRequestObserver;
    private final BlockingQueue<Object> mResponses;
    private final String mDescription;

    @GuardedBy("mLock")
    private Throwable mError;
    private volatile boolean mCompleted = false;
    private volatile boolean mClosed = false;
    private volatile boolean mCanceled = false;
    private final ReentrantLock mLock = new ReentrantLock();
    private final Condition mReadyOrFailed = this.mLock.newCondition();
    private volatile boolean mClosedFromRemote = false;

    /* loaded from: input_file:alluxio/client/block/stream/GrpcBlockingStream$ResponseStreamObserver.class */
    private final class ResponseStreamObserver implements ClientResponseObserver<ReqT, ResT> {
        private ResponseStreamObserver() {
        }

        public void onNext(ResT rest) {
            try {
                GrpcBlockingStream.this.mResponses.put(rest);
            } catch (InterruptedException e) {
                handleInterruptedException(e);
            }
        }

        public void onError(Throwable th) {
            LockResource lockResource = new LockResource(GrpcBlockingStream.this.mLock);
            Throwable th2 = null;
            try {
                try {
                    GrpcBlockingStream.LOG.warn("Received error on stream ({})", GrpcBlockingStream.this.mDescription, th);
                    updateException(th);
                    GrpcBlockingStream.this.mReadyOrFailed.signal();
                    if (lockResource != null) {
                        if (0 == 0) {
                            lockResource.close();
                            return;
                        }
                        try {
                            lockResource.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                } catch (Throwable th4) {
                    th2 = th4;
                    throw th4;
                }
            } catch (Throwable th5) {
                if (lockResource != null) {
                    if (th2 != null) {
                        try {
                            lockResource.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                throw th5;
            }
        }

        public void onCompleted() {
            try {
                GrpcBlockingStream.LOG.debug("Received completed event for stream ({})", GrpcBlockingStream.this.mDescription);
                GrpcBlockingStream.this.mResponses.put(this);
                GrpcBlockingStream.this.mClosedFromRemote = true;
            } catch (InterruptedException e) {
                handleInterruptedException(e);
            }
        }

        public void beforeStart(ClientCallStreamObserver<ReqT> clientCallStreamObserver) {
            clientCallStreamObserver.setOnReadyHandler(() -> {
                LockResource lockResource = new LockResource(GrpcBlockingStream.this.mLock);
                Throwable th = null;
                try {
                    GrpcBlockingStream.this.mReadyOrFailed.signal();
                    if (lockResource != null) {
                        if (0 == 0) {
                            lockResource.close();
                            return;
                        }
                        try {
                            lockResource.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    if (lockResource != null) {
                        if (0 != 0) {
                            try {
                                lockResource.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            lockResource.close();
                        }
                    }
                    throw th3;
                }
            });
        }

        private void handleInterruptedException(InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
            LockResource lockResource = new LockResource(GrpcBlockingStream.this.mLock);
            Throwable th = null;
            try {
                try {
                    updateException(interruptedException);
                    if (lockResource != null) {
                        if (0 != 0) {
                            try {
                                lockResource.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            lockResource.close();
                        }
                    }
                    throw new RuntimeException(interruptedException);
                } finally {
                }
            } catch (Throwable th3) {
                if (lockResource != null) {
                    if (th != null) {
                        try {
                            lockResource.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                throw th3;
            }
        }

        @GuardedBy("mLock")
        private void updateException(Throwable th) {
            if (GrpcBlockingStream.this.mError != null && GrpcBlockingStream.this.mError != th) {
                GrpcBlockingStream.this.mError.addSuppressed(th);
            } else {
                GrpcBlockingStream.this.mError = th;
                GrpcBlockingStream.this.mResponses.offer(th);
            }
        }
    }

    public GrpcBlockingStream(Function<StreamObserver<ResT>, StreamObserver<ReqT>> function, int i, String str) {
        LOG.debug("Opening stream ({})", str);
        this.mResponses = new ArrayBlockingQueue(i);
        this.mResponseObserver = new ResponseStreamObserver();
        this.mRequestObserver = function.apply(this.mResponseObserver);
        this.mDescription = str;
    }

    public void send(ReqT reqt, long j) throws IOException {
        if (this.mClosed || this.mCanceled || this.mClosedFromRemote) {
            throw new CancelledException(formatErrorMessage("Failed to send request %s: stream is already closed or cancelled. clientClosed: %s clientCancelled: %s serverClosed: %s", LogUtils.truncateMessageLineLength(reqt), Boolean.valueOf(this.mClosed), Boolean.valueOf(this.mCanceled), Boolean.valueOf(this.mClosedFromRemote)));
        }
        LockResource lockResource = new LockResource(this.mLock);
        Throwable th = null;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                checkError();
                if (this.mRequestObserver.isReady()) {
                    this.mRequestObserver.onNext(reqt);
                    return;
                }
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (currentTimeMillis2 >= j) {
                    throw new DeadlineExceededException(formatErrorMessage("Timeout sending request %s after %dms. clientClosed: %s clientCancelled: %s serverClosed: %s", LogUtils.truncateMessageLineLength(reqt), Long.valueOf(j), Boolean.valueOf(this.mClosed), Boolean.valueOf(this.mCanceled), Boolean.valueOf(this.mClosedFromRemote)));
                }
                try {
                    if (!this.mReadyOrFailed.await(Math.min(j - currentTimeMillis2, 60000L), TimeUnit.MILLISECONDS)) {
                        LOG.warn("Stream is not ready for client to send request, will wait again. totalWaitMs: {} clientClosed: {} clientCancelled: {} serverClosed: {} description: {}", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Boolean.valueOf(this.mClosed), Boolean.valueOf(this.mCanceled), Boolean.valueOf(this.mClosedFromRemote), this.mDescription});
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CancelledException(formatErrorMessage("Failed to send request %s: interrupted while waiting for server.", LogUtils.truncateMessageLineLength(reqt)), e);
                }
            }
        } finally {
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    lockResource.close();
                }
            }
        }
    }

    public void send(ReqT reqt) throws IOException {
        if (this.mClosed || this.mCanceled || this.mClosedFromRemote) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to send request {}: stream is already closed or cancelled. clientClosed: {} clientCancelled: {} serverClosed: {} ({})", new Object[]{LogUtils.truncateMessageLineLength(reqt), Boolean.valueOf(this.mClosed), Boolean.valueOf(this.mCanceled), Boolean.valueOf(this.mClosedFromRemote), this.mDescription});
                return;
            }
            return;
        }
        LockResource lockResource = new LockResource(this.mLock);
        Throwable th = null;
        try {
            try {
                checkError();
                if (lockResource != null) {
                    if (0 != 0) {
                        try {
                            lockResource.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                this.mRequestObserver.onNext(reqt);
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (lockResource != null) {
                if (th != null) {
                    try {
                        lockResource.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    lockResource.close();
                }
            }
            throw th4;
        }
    }

    public ResT receive(long j) throws IOException {
        if (this.mCompleted) {
            return null;
        }
        if (this.mCanceled) {
            throw new CancelledException(formatErrorMessage("Stream is already canceled.", new Object[0]));
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (currentTimeMillis2 >= j) {
                throw new DeadlineExceededException(formatErrorMessage("Timeout waiting for response after %dms. clientClosed: %s clientCancelled: %s serverClosed: %s", Long.valueOf(j), Boolean.valueOf(this.mClosed), Boolean.valueOf(this.mCanceled), Boolean.valueOf(this.mClosedFromRemote)));
            }
            try {
                ResT rest = (ResT) this.mResponses.poll(Math.min(j - currentTimeMillis2, 60000L), TimeUnit.MILLISECONDS);
                if (rest != null) {
                    if (rest == this.mResponseObserver) {
                        this.mCompleted = true;
                        return null;
                    }
                    checkError();
                    return rest;
                }
                checkError();
                LOG.warn("Client did not receive message from stream, will wait again. totalWaitMs: {} clientClosed: {} clientCancelled: {} serverClosed: {} description: {}", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Boolean.valueOf(this.mClosed), Boolean.valueOf(this.mCanceled), Boolean.valueOf(this.mClosedFromRemote), this.mDescription});
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CancelledException(formatErrorMessage("Interrupted while waiting for response.", new Object[0]), e);
            }
        }
    }

    public void close() {
        if (isOpen()) {
            LOG.debug("Closing stream ({})", this.mDescription);
            this.mClosed = true;
            this.mRequestObserver.onCompleted();
        }
    }

    public void cancel() {
        if (isOpen()) {
            LOG.debug("Cancelling stream ({})", this.mDescription);
            this.mCanceled = true;
            this.mRequestObserver.cancel("Request is cancelled by user.", (Throwable) null);
        }
    }

    public Optional<ResT> waitForComplete(long j) throws IOException {
        ResT rest;
        if (this.mCompleted || this.mCanceled) {
            return Optional.empty();
        }
        ResT rest2 = null;
        do {
            rest = rest2;
            rest2 = receive(j);
        } while (rest2 != null);
        return Optional.ofNullable(rest);
    }

    /* JADX WARN: Removed duplicated region for block: B:12:0x002d  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean isOpen() {
        /*
            r4 = this;
            alluxio.resource.LockResource r0 = new alluxio.resource.LockResource
            r1 = r0
            r2 = r4
            java.util.concurrent.locks.ReentrantLock r2 = r2.mLock
            r1.<init>(r2)
            r5 = r0
            r0 = 0
            r6 = r0
            r0 = r4
            boolean r0 = r0.mClosed     // Catch: java.lang.Throwable -> L49 java.lang.Throwable -> L4e
            if (r0 != 0) goto L27
            r0 = r4
            boolean r0 = r0.mCanceled     // Catch: java.lang.Throwable -> L49 java.lang.Throwable -> L4e
            if (r0 != 0) goto L27
            r0 = r4
            java.lang.Throwable r0 = r0.mError     // Catch: java.lang.Throwable -> L49 java.lang.Throwable -> L4e
            if (r0 != 0) goto L27
            r0 = 1
            goto L28
        L27:
            r0 = 0
        L28:
            r7 = r0
            r0 = r5
            if (r0 == 0) goto L47
            r0 = r6
            if (r0 == 0) goto L43
            r0 = r5
            r0.close()     // Catch: java.lang.Throwable -> L38
            goto L47
        L38:
            r8 = move-exception
            r0 = r6
            r1 = r8
            r0.addSuppressed(r1)
            goto L47
        L43:
            r0 = r5
            r0.close()
        L47:
            r0 = r7
            return r0
        L49:
            r7 = move-exception
            r0 = r7
            r6 = r0
            r0 = r7
            throw r0     // Catch: java.lang.Throwable -> L4e
        L4e:
            r9 = move-exception
            r0 = r5
            if (r0 == 0) goto L6e
            r0 = r6
            if (r0 == 0) goto L6a
            r0 = r5
            r0.close()     // Catch: java.lang.Throwable -> L5f
            goto L6e
        L5f:
            r10 = move-exception
            r0 = r6
            r1 = r10
            r0.addSuppressed(r1)
            goto L6e
        L6a:
            r0 = r5
            r0.close()
        L6e:
            r0 = r9
            throw r0
        */
        throw new UnsupportedOperationException("Method not decompiled: alluxio.client.block.stream.GrpcBlockingStream.isOpen():boolean");
    }

    public boolean isClosed() {
        return this.mClosed;
    }

    public boolean isCanceled() {
        return this.mCanceled;
    }

    private void checkError() throws IOException {
        LockResource lockResource = new LockResource(this.mLock);
        Throwable th = null;
        try {
            if (this.mError != null) {
                this.mCanceled = true;
                throw toAlluxioStatusException(this.mError);
            }
            if (lockResource != null) {
                if (0 == 0) {
                    lockResource.close();
                    return;
                }
                try {
                    lockResource.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    lockResource.close();
                }
            }
            throw th3;
        }
    }

    private AlluxioStatusException toAlluxioStatusException(Throwable th) {
        AlluxioStatusException fromThrowable;
        if (th instanceof StatusRuntimeException) {
            fromThrowable = AlluxioStatusException.fromStatusRuntimeException((StatusRuntimeException) th);
            if (fromThrowable.getStatusCode() == Status.Code.CANCELLED) {
                fromThrowable = new UnavailableException(formatErrorMessage("Stream is canceled by server.", new Object[0]), fromThrowable);
            }
        } else {
            fromThrowable = AlluxioStatusException.fromThrowable(this.mError);
        }
        return AlluxioStatusException.from(fromThrowable.getStatus().withDescription(formatErrorMessage(fromThrowable.getMessage(), new Object[0])));
    }

    private String formatErrorMessage(String str, Object... objArr) {
        return (str == null ? "Unknown error" : String.format(str, objArr)) + String.format(" (%s)", this.mDescription);
    }
}
