package org.mule.service.http.netty.impl.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.io.InputStream;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

/* loaded from: input_file:lib/mule-netty-http-service-0.2.0-SNAPSHOT.jar:org/mule/service/http/netty/impl/client/ChunkedHttpEntityPublisher.class */
public class ChunkedHttpEntityPublisher extends Flux<ByteBuf> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ChunkedHttpEntityPublisher.class);
    private final HttpEntity httpEntity;
    private final int bufferSize;

    /* loaded from: input_file:lib/mule-netty-http-service-0.2.0-SNAPSHOT.jar:org/mule/service/http/netty/impl/client/ChunkedHttpEntityPublisher$InputStreamSubscription.class */
    private static class InputStreamSubscription implements Subscription {
        private static final String HTTP_ENTITY_PRECONDITION = "If a HttpEntity returns true for isStreaming(), it has to provide a length in getBytesLength()";
        private final CoreSubscriber<? super ByteBuf> subscriber;
        private final HttpEntity httpEntity;
        private final int bufferSize;
        private final InputStream contentAsInputStream;

        InputStreamSubscription(CoreSubscriber<? super ByteBuf> coreSubscriber, HttpEntity httpEntity, int i) {
            this.subscriber = coreSubscriber;
            this.httpEntity = httpEntity;
            this.bufferSize = getBufferSize(httpEntity, i);
            if (this.httpEntity.isStreaming()) {
                this.contentAsInputStream = httpEntity.getContent();
            } else {
                this.contentAsInputStream = null;
            }
        }

        private static int getBufferSize(HttpEntity httpEntity, int i) {
            return httpEntity.isStreaming() ? Math.min(Math.toIntExact(httpEntity.getBytesLength().orElse(i)), i) : (int) httpEntity.getBytesLength().orElseThrow(() -> {
                return new IllegalStateException(HTTP_ENTITY_PRECONDITION);
            });
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            try {
                if (this.httpEntity.isStreaming()) {
                    sendRequestedChunksAndCompleteIfFullyConsumed(j);
                } else {
                    sendAllContentInOneNextAndComplete();
                }
            } catch (IOException e) {
                this.subscriber.onError(e);
            }
        }

        private void sendRequestedChunksAndCompleteIfFullyConsumed(long j) throws IOException {
            boolean z = false;
            int i = 0;
            while (!z && i < j) {
                byte[] bArr = new byte[this.bufferSize];
                int read = this.contentAsInputStream.read(bArr);
                z = read <= 0;
                if (!z) {
                    this.subscriber.onNext(createByteBuf(bArr, read));
                    i++;
                }
            }
            if (z) {
                this.subscriber.onComplete();
            }
        }

        private void sendAllContentInOneNextAndComplete() throws IOException {
            byte[] bytes = this.httpEntity.getBytes();
            if (bytes.length > 0) {
                this.subscriber.onNext(createByteBuf(bytes, bytes.length));
            }
            this.subscriber.onComplete();
        }

        private static ByteBuf createByteBuf(byte[] bArr, int i) {
            return ByteBufAllocator.DEFAULT.buffer(i, i).writeBytes(bArr, 0, i);
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            try {
                if (this.contentAsInputStream != null) {
                    this.contentAsInputStream.close();
                }
            } catch (IOException e) {
                ChunkedHttpEntityPublisher.LOGGER.warn("There was a problem while closing the request content stream", (Throwable) e);
            }
        }
    }

    public ChunkedHttpEntityPublisher(HttpEntity httpEntity) {
        this(httpEntity, 8192);
    }

    public ChunkedHttpEntityPublisher(HttpEntity httpEntity, int i) {
        this.httpEntity = httpEntity;
        this.bufferSize = i;
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        coreSubscriber.onSubscribe(new InputStreamSubscription(coreSubscriber, this.httpEntity, this.bufferSize));
    }
}
