package org.mule.extension.s3.internal.connection.publisher;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.mule.extension.s3.internal.error.exception.S3RuntimeException;
import org.mule.runtime.api.util.DataUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/extension/s3/internal/connection/publisher/InputStreamPublisher.class */
public class InputStreamPublisher implements Publisher<ByteBuffer> {
    private static final Logger log = LoggerFactory.getLogger(InputStreamPublisher.class);
    private static final int DEFAULT_BUFFER_SIZE = DataUnit.MB.toBytes(8);
    private final InputStream stream;
    private final int bufferSize;

    /* loaded from: input_file:org/mule/extension/s3/internal/connection/publisher/InputStreamPublisher$StreamSubscription.class */
    class StreamSubscription implements Subscription {
        private boolean done = false;
        private final Subscriber<? super ByteBuffer> subscriber;

        public StreamSubscription(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        public void request(long j) {
            if (this.done) {
                return;
            }
            try {
                if (j > 0) {
                    int i = 0;
                    while (true) {
                        if (i >= j) {
                            break;
                        }
                        ByteBuffer allocate = ByteBuffer.allocate(InputStreamPublisher.this.bufferSize);
                        int readNext = InputStreamPublisher.this.readNext(InputStreamPublisher.this.stream, allocate);
                        if (readNext > 0) {
                            allocate.limit(readNext);
                            this.subscriber.onNext(allocate);
                        } else if (readNext < 0) {
                            this.done = true;
                            this.subscriber.onComplete();
                            break;
                        }
                        i++;
                    }
                } else {
                    this.subscriber.onError(new IllegalArgumentException("3.9: non-positive requests are not allowed!"));
                }
            } catch (Exception e) {
                this.subscriber.onError(e);
            }
        }

        public void cancel() {
            synchronized (this) {
                this.done = true;
            }
        }
    }

    public InputStreamPublisher(InputStream inputStream) {
        this(inputStream, DEFAULT_BUFFER_SIZE);
    }

    public InputStreamPublisher(InputStream inputStream, int i) {
        this.bufferSize = i;
        this.stream = inputStream;
    }

    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("Subscription MUST NOT be null.");
        }
        try {
            subscriber.onSubscribe(new StreamSubscription(subscriber));
        } catch (Throwable th) {
            log.error(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", th);
        }
    }

    protected int readNext(InputStream inputStream, ByteBuffer byteBuffer) {
        try {
            int read = inputStream.read(byteBuffer.array(), 0, this.bufferSize);
            if (read < 0) {
                inputStream.close();
            }
            return read;
        } catch (IOException e) {
            throw new S3RuntimeException(e);
        }
    }
}
