package org.mule.service.http.netty.impl.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import junit.framework.AssertionFailedError;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mule.service.http.netty.impl.streaming.BlockingBidirectionalStream;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.PollingProber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/service/http/netty/impl/util/BlockingBufferTestCase.class */
public class BlockingBufferTestCase extends AbstractMuleTestCase {
    private static final String TEST_PAYLOAD = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
    private static final Logger LOGGER = LoggerFactory.getLogger(BlockingBufferTestCase.class);
    private OutputStream sink;
    private TestConsumer consumer;

    /* loaded from: input_file:org/mule/service/http/netty/impl/util/BlockingBufferTestCase$TestConsumer.class */
    private static class TestConsumer extends Thread {
        private final InputStream inputStream;
        private final int consumerBufferSize;
        private final ByteArrayOutputStream consumedData = new ByteArrayOutputStream();
        private final AtomicBoolean continueReading = new AtomicBoolean(true);
        private Exception errorWhileConsuming;

        public TestConsumer(InputStream inputStream, int i) {
            this.inputStream = inputStream;
            this.consumerBufferSize = i;
        }

        public synchronized byte[] getConsumedData() {
            return this.consumedData.toByteArray();
        }

        public boolean finishedReading() {
            return !this.continueReading.get();
        }

        public Optional<Exception> getErrorWhileConsuming() {
            return Optional.ofNullable(this.errorWhileConsuming);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.continueReading.get()) {
                byte[] bArr = new byte[this.consumerBufferSize];
                try {
                    int read = this.inputStream.read(bArr, 0, this.consumerBufferSize);
                    if (read == -1 || read == 0) {
                        this.continueReading.set(false);
                    } else {
                        synchronized (this) {
                            BlockingBufferTestCase.LOGGER.debug("Reading this chunk [{}]", new String(bArr, 0, read));
                            this.consumedData.write(bArr, 0, read);
                        }
                    }
                } catch (IOException e) {
                    BlockingBufferTestCase.LOGGER.error("Found error while consuming", e);
                    this.errorWhileConsuming = e;
                    this.continueReading.set(false);
                }
            }
        }
    }

    @Before
    public void setUp() {
        BlockingBidirectionalStream blockingBidirectionalStream = new BlockingBidirectionalStream();
        this.consumer = new TestConsumer(blockingBidirectionalStream.getInputStream(), 8);
        this.consumer.start();
        this.sink = blockingBidirectionalStream.getOutputStream();
    }

    @After
    public void tearDown() throws InterruptedException {
        if (this.consumer != null) {
            this.consumer.join();
        }
    }

    @Test
    public void consumerBlocksWhenBufferIsEmpty() throws InterruptedException, IOException {
        Thread.sleep(500L);
        MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(false));
        MatcherAssert.assertThat(Integer.valueOf(this.consumer.getConsumedData().length), Matchers.is(0));
        this.consumer.interrupt();
        PollingProber.probe(() -> {
            Exception orElseThrow = this.consumer.getErrorWhileConsuming().orElseThrow(errorShouldBePresent());
            MatcherAssert.assertThat(orElseThrow, Matchers.instanceOf(IOException.class));
            MatcherAssert.assertThat(orElseThrow.getCause(), Matchers.instanceOf(InterruptedException.class));
            return true;
        });
    }

    @Test
    public void consumerUnblocksWhenBufferIsClosedAndEmpty() throws InterruptedException, IOException {
        Thread.sleep(500L);
        MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(false));
        MatcherAssert.assertThat(Integer.valueOf(this.consumer.getConsumedData().length), Matchers.is(0));
        this.sink.close();
        PollingProber.probe(() -> {
            MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(true));
            MatcherAssert.assertThat(this.consumer.getErrorWhileConsuming(), Matchers.is(Optional.empty()));
            MatcherAssert.assertThat(Integer.valueOf(this.consumer.getConsumedData().length), Matchers.is(0));
            return true;
        });
    }

    @Test
    public void writeAndReadAPayloadWithDifferentChunkSizes() throws IOException {
        for (String str : TEST_PAYLOAD.split(" ")) {
            this.sink.write(str.getBytes(StandardCharsets.UTF_8));
        }
        this.sink.close();
        String replace = TEST_PAYLOAD.replace(" ", "");
        PollingProber.probe(() -> {
            MatcherAssert.assertThat(new String(this.consumer.getConsumedData()), Matchers.is(replace));
            MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(true));
            MatcherAssert.assertThat(this.consumer.getErrorWhileConsuming(), Matchers.is(Optional.empty()));
            return true;
        });
    }

    @Test
    public void writeBytePerByte() throws IOException {
        for (byte b : TEST_PAYLOAD.getBytes(StandardCharsets.UTF_8)) {
            this.sink.write(b);
        }
        this.sink.close();
        PollingProber.probe(() -> {
            MatcherAssert.assertThat(new String(this.consumer.getConsumedData()), Matchers.is(TEST_PAYLOAD));
            MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(true));
            MatcherAssert.assertThat(this.consumer.getErrorWhileConsuming(), Matchers.is(Optional.empty()));
            return true;
        });
    }

    @Test
    public void writeLessBytesThanBufferSize() throws IOException {
        this.sink.write("abc".getBytes(StandardCharsets.UTF_8));
        PollingProber.probe(() -> {
            MatcherAssert.assertThat(new String(this.consumer.getConsumedData()), Matchers.is("abc"));
            MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(false));
            return true;
        });
        this.sink.close();
        PollingProber.probe(() -> {
            MatcherAssert.assertThat(Boolean.valueOf(this.consumer.finishedReading()), Matchers.is(true));
            MatcherAssert.assertThat(this.consumer.getErrorWhileConsuming(), Matchers.is(Optional.empty()));
            return true;
        });
    }

    private Supplier<? extends Throwable> errorShouldBePresent() {
        return () -> {
            return new AssertionFailedError("Error should be present");
        };
    }
}
