/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthTestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class EventReceiverFirehoseTest {
    // NOTE(review): the literals 300 and 100 appear inlined at the call sites in this file;
    // presumably they originate from these two constants - confirm against the original source.
    private static final int CAPACITY = 300;
    private static final int NUM_EVENTS = 100;
    // Third constructor argument of every EventReceiverFirehoseFactory built in this class (20 seconds).
    private static final long MAX_IDLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(20L);
    // Service name given to the factory; the tests also expect it as the key of the registered metric.
    private static final String SERVICE_NAME = "test_firehose";
    // JSON array holding a single event with a "timestamp" and a "d1" field.
    private final String inputRow = "[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]";
    // Factory and firehose under test; both are re-created in setUp().
    private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
    private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
    // Register handed to every factory; never reassigned, hence final.
    private final EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister();
    // EasyMock mock of the HTTP request passed to addAll() and shutdown(); re-created in setUp().
    private HttpServletRequest req;

    /**
     * Creates a fresh request mock, builds the factory under test and connects a firehose that
     * parses JSON maps with a {@code timestamp} column and a single {@code d1} dimension.
     */
    @Before
    public void setUp() {
        req = EasyMock.createMock(HttpServletRequest.class);

        eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
            SERVICE_NAME,
            CAPACITY,
            MAX_IDLE_TIME_MILLIS,
            null,
            new DefaultObjectMapper(),
            new DefaultObjectMapper(),
            register,
            AuthTestUtils.TEST_AUTHORIZER_MAPPER
        );

        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
        final DimensionsSpec dimensionsSpec =
            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null);
        final MapInputRowParser rowParser =
            new MapInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, null, null, null));

        firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory.connect(rowParser, null);
    }

    /**
     * Posts {@code NUM_EVENTS} one-row batches from the test thread and checks that the buffer
     * size reported by the firehose and by its registered metric grows by one per batch, shrinks
     * by one per {@code nextRow()} call, and that closing the firehose empties the register.
     *
     * @throws IOException if closing an input stream or the firehose fails
     * @throws InterruptedException if interrupted while waiting for the delayed-close thread
     */
    @Test(timeout=60000L)
    public void testSingleThread() throws IOException, InterruptedException {
        for (int i = 0; i < NUM_EVENTS; ++i) {
            setUpRequestExpectations(null, null);
            // try-with-resources closes the stream even if addAll or the assertion throws.
            try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
                firehose.addAll(inputStream, req);
                Assert.assertEquals(i + 1, firehose.getCurrentBufferSize());
            }
        }
        EasyMock.verify(req);

        final Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = register.getMetrics();
        Assert.assertEquals(1, Iterables.size(metrics));
        final Map.Entry<String, EventReceiverFirehoseMetric> entry = Iterables.getLast(metrics);
        Assert.assertEquals(SERVICE_NAME, entry.getKey());
        Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
        Assert.assertEquals(CAPACITY, firehose.getCapacity());
        Assert.assertEquals(NUM_EVENTS, entry.getValue().getCurrentBufferSize());
        Assert.assertEquals(NUM_EVENTS, firehose.getCurrentBufferSize());

        // Drain the buffer one row at a time; the reported size must track each removal.
        for (int i = NUM_EVENTS - 1; i >= 0; --i) {
            Assert.assertTrue(firehose.hasMore());
            Assert.assertNotNull(firehose.nextRow());
            Assert.assertEquals(i, firehose.getCurrentBufferSize());
        }

        Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
        Assert.assertEquals(CAPACITY, firehose.getCapacity());
        Assert.assertEquals(0, entry.getValue().getCurrentBufferSize());
        Assert.assertEquals(0, firehose.getCurrentBufferSize());

        firehose.close();
        Assert.assertFalse(firehose.hasMore());
        Assert.assertEquals(0, Iterables.size(register.getMetrics()));
        awaitDelayedExecutorThreadTerminated();
    }

    /**
     * Posts {@code NUM_EVENTS} one-row batches from a background thread while the test thread
     * posts another {@code NUM_EVENTS}, then checks that all {@code 2 * NUM_EVENTS} rows are
     * buffered, can be drained one by one, and that closing the firehose empties the register.
     *
     * @throws InterruptedException if interrupted while waiting for the producer or the delayed-close thread
     * @throws IOException if closing an input stream or the firehose fails
     * @throws TimeoutException if the background producer does not finish within 10 seconds
     * @throws ExecutionException if the background producer fails
     */
    @Test(timeout=60000L)
    public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException {
        final int totalEvents = 2 * NUM_EVENTS;

        EasyMock.expect(req.getAttribute("Druid-Authorization-Checked")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Allow-Unsecured-Path")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Authentication-Result")).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        req.setAttribute("Druid-Authorization-Checked", true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(req.getContentType()).andReturn("application/json").times(totalEvents);
        EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(null).times(totalEvents);
        EasyMock.replay(req);

        final ExecutorService executorService = Execs.singleThreaded("single_thread");
        // The finally block stops the worker thread even when an assertion below fails.
        try {
            final Future<Boolean> future = executorService.submit(new Callable<Boolean>() {

                @Override
                public Boolean call() throws Exception {
                    for (int i = 0; i < NUM_EVENTS; ++i) {
                        try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
                            firehose.addAll(inputStream, req);
                        }
                    }
                    return true;
                }
            });

            for (int i = 0; i < NUM_EVENTS; ++i) {
                try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
                    firehose.addAll(inputStream, req);
                }
            }
            future.get(10L, TimeUnit.SECONDS);
            EasyMock.verify(req);

            final Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = register.getMetrics();
            Assert.assertEquals(1, Iterables.size(metrics));
            final Map.Entry<String, EventReceiverFirehoseMetric> entry = Iterables.getLast(metrics);
            Assert.assertEquals(SERVICE_NAME, entry.getKey());
            Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
            Assert.assertEquals(CAPACITY, firehose.getCapacity());
            Assert.assertEquals(totalEvents, entry.getValue().getCurrentBufferSize());
            Assert.assertEquals(totalEvents, firehose.getCurrentBufferSize());

            // Drain the buffer one row at a time; the reported size must track each removal.
            for (int i = totalEvents - 1; i >= 0; --i) {
                Assert.assertTrue(firehose.hasMore());
                Assert.assertNotNull(firehose.nextRow());
                Assert.assertEquals(i, firehose.getCurrentBufferSize());
            }

            Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
            Assert.assertEquals(CAPACITY, firehose.getCapacity());
            Assert.assertEquals(0, entry.getValue().getCurrentBufferSize());
            Assert.assertEquals(0, firehose.getCurrentBufferSize());

            firehose.close();
            Assert.assertFalse(firehose.hasMore());
            Assert.assertEquals(0, Iterables.size(register.getMetrics()));
            awaitDelayedExecutorThreadTerminated();
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Builds a second factory with the same service name and the same register as the one
     * connected in {@code setUp()} and connects it; the test passes only if an {@link ISE} is thrown.
     */
    @Test(expected=ISE.class)
    public void testDuplicateRegistering() {
        final EventReceiverFirehoseFactory duplicateFactory = new EventReceiverFirehoseFactory(
            SERVICE_NAME,
            CAPACITY,
            MAX_IDLE_TIME_MILLIS,
            null,
            new DefaultObjectMapper(),
            new DefaultObjectMapper(),
            register,
            AuthTestUtils.TEST_AUTHORIZER_MAPPER
        );
        final MapInputRowParser rowParser = new MapInputRowParser(
            new JSONParseSpec(
                new TimestampSpec("timestamp", "auto", null),
                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null),
                null,
                null,
                null
            )
        );
        // The returned firehose is deliberately not kept: the call itself is expected to throw.
        duplicateFactory.connect(rowParser, null);
    }

    /**
     * Requests a shutdown with a shutoff time two minutes in the past, then waits until the
     * firehose reports closed and its delayed-close thread has finished.
     */
    @Test(timeout=60000L)
    public void testShutdownWithPrevTime() throws Exception {
        EasyMock.expect(req.getAttribute("Druid-Authorization-Checked")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Allow-Unsecured-Path")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Authentication-Result")).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        req.setAttribute("Druid-Authorization-Checked", true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(req);

        final String pastShutoffTime = DateTimes.nowUtc().minusMinutes(2).toString();
        firehose.shutdown(pastShutoffTime, req);

        awaitFirehoseClosed();
        awaitDelayedExecutorThreadTerminated();
    }

    /**
     * Blocks until {@code firehose.isClosed()} returns true, polling every 50 milliseconds.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void awaitFirehoseClosed() throws InterruptedException {
        for (;;) {
            if (firehose.isClosed()) {
                return;
            }
            Thread.sleep(50L);
        }
    }

    /**
     * Joins the thread returned by {@code firehose.getDelayedCloseExecutor()}, i.e. blocks until
     * that thread has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void awaitDelayedExecutorThreadTerminated() throws InterruptedException {
        final Thread delayedCloseThread = firehose.getDelayedCloseExecutor();
        delayedCloseThread.join();
    }

    /**
     * Requests a shutdown with a shutoff time 100 milliseconds in the future, then waits until
     * the firehose reports closed and its delayed-close thread has finished.
     */
    @Test(timeout=60000L)
    public void testShutdown() throws Exception {
        EasyMock.expect(req.getAttribute("Druid-Authorization-Checked")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Allow-Unsecured-Path")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Authentication-Result")).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        req.setAttribute("Druid-Authorization-Checked", true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(req);

        final String nearFutureShutoffTime = DateTimes.nowUtc().plusMillis(100).toString();
        firehose.shutdown(nearFutureShutoffTime, req);

        awaitFirehoseClosed();
        awaitDelayedExecutorThreadTerminated();
    }

    /**
     * Posts {@code NUM_EVENTS} one-row batches under producer id {@code "producer"} with the
     * sequence header set to 0, 1, 2, ... and checks that every batch is buffered, that the rows
     * can be drained one by one, and that closing the firehose empties the register.
     *
     * @throws IOException if closing an input stream or the firehose fails
     */
    @Test
    public void testProducerSequence() throws IOException {
        for (int i = 0; i < NUM_EVENTS; ++i) {
            setUpRequestExpectations("producer", String.valueOf(i));
            // try-with-resources closes the stream even if addAll or the assertion throws.
            try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
                firehose.addAll(inputStream, req);
                Assert.assertEquals(i + 1, firehose.getCurrentBufferSize());
            }
        }
        EasyMock.verify(req);

        final Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = register.getMetrics();
        Assert.assertEquals(1, Iterables.size(metrics));
        final Map.Entry<String, EventReceiverFirehoseMetric> entry = Iterables.getLast(metrics);
        Assert.assertEquals(SERVICE_NAME, entry.getKey());
        Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
        Assert.assertEquals(CAPACITY, firehose.getCapacity());
        Assert.assertEquals(NUM_EVENTS, entry.getValue().getCurrentBufferSize());
        Assert.assertEquals(NUM_EVENTS, firehose.getCurrentBufferSize());

        // Drain the buffer one row at a time; the reported size must track each removal.
        for (int i = NUM_EVENTS - 1; i >= 0; --i) {
            Assert.assertTrue(firehose.hasMore());
            Assert.assertNotNull(firehose.nextRow());
            Assert.assertEquals(i, firehose.getCurrentBufferSize());
        }

        Assert.assertEquals(CAPACITY, entry.getValue().getCapacity());
        Assert.assertEquals(CAPACITY, firehose.getCapacity());
        Assert.assertEquals(0, entry.getValue().getCurrentBufferSize());
        Assert.assertEquals(0, firehose.getCurrentBufferSize());

        firehose.close();
        Assert.assertFalse(firehose.hasMore());
        Assert.assertEquals(0, Iterables.size(register.getMetrics()));
    }

    /**
     * Posts {@code NUM_EVENTS} one-row batches that all carry producer id {@code "producer"} and
     * sequence {@code "1"}. Every response must be OK while the buffer size stays at 1.
     *
     * @throws IOException if closing an input stream or the firehose fails
     */
    @Test
    public void testLowProducerSequence() throws IOException {
        for (int i = 0; i < NUM_EVENTS; ++i) {
            setUpRequestExpectations("producer", "1");
            // try-with-resources closes the stream even if addAll or an assertion throws.
            try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
                final Response response = firehose.addAll(inputStream, req);
                Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                Assert.assertEquals(1, firehose.getCurrentBufferSize());
            }
        }
        EasyMock.verify(req);
        firehose.close();
    }

    /**
     * Posts one batch with producer id {@code "producer"} but a null sequence header and expects
     * a BAD_REQUEST response.
     *
     * @throws IOException if closing the input stream or the firehose fails
     */
    @Test
    public void testMissingProducerSequence() throws IOException {
        setUpRequestExpectations("producer", null);
        // try-with-resources closes the stream even if addAll or the assertion throws.
        try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
            final Response response = firehose.addAll(inputStream, req);
            Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
        }
        EasyMock.verify(req);
        firehose.close();
    }

    /**
     * Posts one batch from each of 9999 distinct producer ids, expecting OK and draining the row
     * each time, then posts from one more producer id and expects a FORBIDDEN response.
     *
     * @throws IOException if closing an input stream or the firehose fails
     */
    @Test
    public void testTooManyProducerIds() throws IOException {
        // NOTE(review): 9999 presumably sits just below the firehose's producer-id limit - confirm
        // against EventReceiverFirehoseFactory.
        for (int i = 0; i < 9999; ++i) {
            setUpRequestExpectations("producer-" + i, "0");
            // try-with-resources closes the stream even if addAll or the assertion throws.
            try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
                final Response response = firehose.addAll(inputStream, req);
                Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            }
            Assert.assertTrue(firehose.hasMore());
            Assert.assertNotNull(firehose.nextRow());
        }

        setUpRequestExpectations("toomany", "0");
        try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
            final Response response = firehose.addAll(inputStream, req);
            Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), response.getStatus());
        }
        EasyMock.verify(req);
        firehose.close();
    }

    /**
     * Posts one batch with producer id {@code "producer"} and the non-numeric sequence
     * {@code "foo"} and expects a BAD_REQUEST response.
     *
     * @throws IOException if closing the input stream or the firehose fails
     */
    @Test
    public void testNaNProducerSequence() throws IOException {
        setUpRequestExpectations("producer", "foo");
        // try-with-resources closes the stream even if addAll or the assertion throws.
        try (InputStream inputStream = IOUtils.toInputStream(inputRow, StandardCharsets.UTF_8)) {
            final Response response = firehose.addAll(inputStream, req);
            Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
        }
        EasyMock.verify(req);
        firehose.close();
    }

    /**
     * Resets the request mock and records the expectations for exactly one {@code addAll} call:
     * the three {@code Druid-*} attribute reads and the attribute write any number of times, one
     * content-type read returning {@code application/json}, and one producer-id header read.
     * The producer-sequence header read is expected only when a producer id is supplied.
     *
     * @param producerId value returned for the {@code X-Firehose-Producer-Id} header, may be null
     * @param producerSequenceValue value returned for the {@code X-Firehose-Producer-Seq} header;
     *                              not used when {@code producerId} is null
     */
    private void setUpRequestExpectations(String producerId, String producerSequenceValue) {
        EasyMock.reset(req);

        // Attribute accesses, allowed any number of times.
        EasyMock.expect(req.getAttribute("Druid-Authorization-Checked")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Allow-Unsecured-Path")).andReturn(null).anyTimes();
        EasyMock.expect(req.getAttribute("Druid-Authentication-Result")).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        req.setAttribute("Druid-Authorization-Checked", true);
        EasyMock.expectLastCall().anyTimes();

        // Content type and producer headers, each expected exactly once.
        EasyMock.expect(req.getContentType()).andReturn("application/json");
        EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(producerId);
        final boolean hasProducer = producerId != null;
        if (hasProducer) {
            EasyMock.expect(req.getHeader("X-Firehose-Producer-Seq")).andReturn(producerSequenceValue);
        }

        EasyMock.replay(req);
    }
}

