package org.apache.pekko.stream.impl.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.impl.io.InputStreamSinkStage;
import org.apache.pekko.util.ByteString;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.collection.SeqOps;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: InputStreamSinkStage.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/impl/io/InputStreamAdapter.class */
public class InputStreamAdapter extends InputStream {
    private final BlockingQueue<InputStreamSinkStage.StreamToAdapterMessage> sharedBuffer;
    private final Function1<InputStreamSinkStage.AdapterToStageMessage, BoxedUnit> sendToStage;
    private final FiniteDuration readTimeout;
    private boolean isInitialized = false;
    private final AtomicBoolean isActive = new AtomicBoolean(true);
    private boolean isStageAlive = true;
    private Option detachedChunk = None$.MODULE$;

    public InputStreamAdapter(BlockingQueue<InputStreamSinkStage.StreamToAdapterMessage> blockingQueue, Function1<InputStreamSinkStage.AdapterToStageMessage, BoxedUnit> function1, FiniteDuration finiteDuration) {
        this.sharedBuffer = blockingQueue;
        this.sendToStage = function1;
        this.readTimeout = finiteDuration;
    }

    public boolean isInitialized() {
        return this.isInitialized;
    }

    public void isInitialized_$eq(boolean z) {
        this.isInitialized = z;
    }

    public AtomicBoolean isActive() {
        return this.isActive;
    }

    public boolean isStageAlive() {
        return this.isStageAlive;
    }

    public void isStageAlive_$eq(boolean z) {
        this.isStageAlive = z;
    }

    public IOException subscriberClosedException() {
        return new IOException("Reactive stream is terminated, no reads are possible");
    }

    public Option<ByteString> detachedChunk() {
        return this.detachedChunk;
    }

    public void detachedChunk_$eq(Option<ByteString> option) {
        this.detachedChunk = option;
    }

    private <T> T executeIfNotClosed(Function0<T> function0) throws IOException {
        if (!isActive().get()) {
            throw subscriberClosedException();
        }
        waitIfNotInitialized();
        return (T) function0.apply();
    }

    @Override // java.io.InputStream
    public int read() throws IOException {
        byte[] bArr = new byte[1];
        int read = read(bArr, 0, 1);
        if (1 == read) {
            return bArr[0] & 255;
        }
        if (-1 == read) {
            return -1;
        }
        throw new IllegalStateException(new StringBuilder(17).append("Invalid length [").append(read).append("]").toString());
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr) throws IOException {
        return read(bArr, 0, bArr.length);
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        Predef$.MODULE$.require(bArr.length > 0, InputStreamAdapter::read$$anonfun$1);
        Predef$.MODULE$.require(i >= 0, InputStreamAdapter::read$$anonfun$2);
        Predef$.MODULE$.require(i2 >= 0, InputStreamAdapter::read$$anonfun$3);
        Predef$.MODULE$.require(i + i2 <= bArr.length, InputStreamAdapter::read$$anonfun$4);
        if (i2 == 0) {
            return 0;
        }
        return BoxesRunTime.unboxToInt(executeIfNotClosed(() -> {
            int i3;
            if (!isStageAlive()) {
                return -1;
            }
            Option<ByteString> detachedChunk = detachedChunk();
            if (!None$.MODULE$.equals(detachedChunk)) {
                if (detachedChunk instanceof Some) {
                    return readBytes(bArr, i, i2);
                }
                throw new MatchError(detachedChunk);
            }
            try {
                InputStreamSinkStage.StreamToAdapterMessage poll = this.sharedBuffer.poll(this.readTimeout.toMillis(), TimeUnit.MILLISECONDS);
                if (poll instanceof InputStreamSinkStage.Data) {
                    detachedChunk_$eq(Some$.MODULE$.apply(InputStreamSinkStage$Data$.MODULE$.unapply((InputStreamSinkStage.Data) poll)._1()));
                    i3 = readBytes(bArr, i, i2);
                } else {
                    if (!InputStreamSinkStage$Finished$.MODULE$.equals(poll)) {
                        if (poll instanceof InputStreamSinkStage.Failed) {
                            Throwable _1 = InputStreamSinkStage$Failed$.MODULE$.unapply((InputStreamSinkStage.Failed) poll)._1();
                            isStageAlive_$eq(false);
                            throw new IOException(_1);
                        }
                        if (poll == null) {
                            throw new IOException("Timeout on waiting for new data");
                        }
                        if (InputStreamSinkStage$Initialized$.MODULE$.equals(poll)) {
                            throw new IllegalStateException("message 'Initialized' must come first");
                        }
                        throw new MatchError(poll);
                    }
                    isStageAlive_$eq(false);
                    i3 = -1;
                }
                return i3;
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }));
    }

    private int readBytes(byte[] bArr, int i, int i2) {
        Predef$.MODULE$.require(detachedChunk().nonEmpty(), InputStreamAdapter::readBytes$$anonfun$1);
        int size = ((SeqOps) detachedChunk().get()).size();
        int data = getData(bArr, i, i2, 0);
        if (data >= size) {
            this.sendToStage.apply(InputStreamSinkStage$ReadElementAcknowledgement$.MODULE$);
        }
        return data;
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (isActive().getAndSet(false) && isStageAlive()) {
            this.sendToStage.apply(InputStreamSinkStage$Close$.MODULE$);
        }
    }

    private int getData(byte[] bArr, int i, int i2, int i3) {
        while (true) {
            Some grabDataChunk = grabDataChunk();
            if (!(grabDataChunk instanceof Some)) {
                if (None$.MODULE$.equals(grabDataChunk)) {
                    return i3;
                }
                throw new MatchError(grabDataChunk);
            }
            ByteString byteString = (ByteString) grabDataChunk.value();
            int size = byteString.size();
            if (size > i2) {
                byteString.copyToArray(bArr, i, i2);
                detachedChunk_$eq(Some$.MODULE$.apply(byteString.drop(i2)));
                return i3 + i2;
            }
            byteString.copyToArray(bArr, i, size);
            detachedChunk_$eq(None$.MODULE$);
            if (size == i2) {
                return i3 + size;
            }
            int i4 = i + size;
            i = i4;
            i2 -= size;
            i3 += size;
        }
    }

    private void waitIfNotInitialized() {
        if (isInitialized()) {
            return;
        }
        InputStreamSinkStage.StreamToAdapterMessage poll = this.sharedBuffer.poll(this.readTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (InputStreamSinkStage$Initialized$.MODULE$.equals(poll)) {
            isInitialized_$eq(true);
        } else {
            if (poll == null) {
                throw new IOException(new StringBuilder(57).append("Timeout after ").append(this.readTimeout).append(" waiting for Initialized message from stage").toString());
            }
            Predef$.MODULE$.require(false, () -> {
                return waitIfNotInitialized$$anonfun$1(r2);
            });
        }
    }

    private Option<ByteString> grabDataChunk() {
        Option<ByteString> detachedChunk = detachedChunk();
        if (!None$.MODULE$.equals(detachedChunk)) {
            if (detachedChunk instanceof Some) {
                return detachedChunk();
            }
            throw new MatchError(detachedChunk);
        }
        InputStreamSinkStage.StreamToAdapterMessage poll = this.sharedBuffer.poll();
        if (poll instanceof InputStreamSinkStage.Data) {
            detachedChunk_$eq(Some$.MODULE$.apply(InputStreamSinkStage$Data$.MODULE$.unapply((InputStreamSinkStage.Data) poll)._1()));
            return detachedChunk();
        }
        if (InputStreamSinkStage$Finished$.MODULE$.equals(poll)) {
            isStageAlive_$eq(false);
            return None$.MODULE$;
        }
        if (poll instanceof InputStreamSinkStage.Failed) {
            throw new IOException(InputStreamSinkStage$Failed$.MODULE$.unapply((InputStreamSinkStage.Failed) poll)._1());
        }
        return None$.MODULE$;
    }

    private static final Object read$$anonfun$1() {
        return "array size must be >= 0";
    }

    private static final Object read$$anonfun$2() {
        return "begin must be >= 0";
    }

    private static final Object read$$anonfun$3() {
        return "length must be >= 0";
    }

    private static final Object read$$anonfun$4() {
        return "begin + length must be smaller or equal to the array length";
    }

    private static final Object readBytes$$anonfun$1() {
        return "Chunk must be pulled from shared buffer";
    }

    private static final Object waitIfNotInitialized$$anonfun$1(InputStreamSinkStage.StreamToAdapterMessage streamToAdapterMessage) {
        return new StringBuilder(52).append("First message must be Initialized notification, got ").append(streamToAdapterMessage).toString();
    }
}
