/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.query;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.PrioritizedExecutorService;
import org.apache.druid.query.PrioritizedQueryRunnerCallable;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ChainedExecutionQueryRunnerTest {
    /**
     * Lock taken in {@link #setup()}; nothing in this class releases it, so a
     * {@code lockInterruptibly()} call from any other thread blocks until that thread is interrupted.
     */
    private final Lock neverRelease = new ReentrantLock();

    /** Processing pool handed to the runners under test; assigned in {@link #setup()}. */
    private QueryProcessingPool processingPool;

    /**
     * Acquires {@link #neverRelease} on the calling thread and builds the processing pool from a
     * two-thread worker executor plus a single-threaded scheduled executor.
     */
    @Before
    public void setup() {
        neverRelease.lock();
        processingPool = new ForwardingQueryProcessingPool(
            Execs.multiThreaded(2, "ChainedExecutionQueryRunnerTestExecutor-%d"),
            Execs.scheduledSingleThreaded("ChainedExecutionQueryRunnerTestExecutor-Timeout-%d")
        );
    }

    /** Initiates shutdown of the processing pool after each test. */
    @After
    public void tearDown() {
        processingPool.shutdown();
    }

    /**
     * Cancels the future that the chained runner registers with the {@link QueryWatcher} and checks
     * that the runners which had already started are interrupted.
     *
     * <p>Three {@link DyingQueryRunner}s are chained on the shared processing pool. The test waits
     * until the query future has been registered and two runners have started, cancels the captured
     * future, and then expects the thread consuming the sequence to fail with a
     * {@link QueryInterruptedException}. None of the runners may report completion.
     */
    @Test(timeout=60000L)
    public void testQueryCancellation() throws Exception {
        CountDownLatch queriesStarted = new CountDownLatch(2);
        CountDownLatch queriesInterrupted = new CountDownLatch(2);
        final CountDownLatch queryIsRegistered = new CountDownLatch(1);

        // Strict mock that expects exactly one registration, captures the registered future and
        // signals the test thread once the registration has happened.
        final Capture<ListenableFuture> capturedFuture = EasyMock.newCapture();
        QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
        watcher.registerQueryFuture(
            (Query)EasyMock.anyObject(),
            (ListenableFuture)EasyMock.and((Object)EasyMock.anyObject(), (Object)EasyMock.capture(capturedFuture))
        );
        EasyMock.expectLastCall().andAnswer(() -> {
            queryIsRegistered.countDown();
            return null;
        }).once();
        EasyMock.replay(watcher);

        ArrayBlockingQueue<DyingQueryRunner> interrupted = new ArrayBlockingQueue<>(3);
        HashSet<DyingQueryRunner> runners = Sets.newHashSet(
            new DyingQueryRunner(queriesStarted, queriesInterrupted, interrupted),
            new DyingQueryRunner(queriesStarted, queriesInterrupted, interrupted),
            new DyingQueryRunner(queriesStarted, queriesInterrupted, interrupted)
        );
        ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner(processingPool, watcher, Lists.newArrayList(runners));
        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("test")
            .intervals("2014/2015")
            .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
            .build();
        final Sequence seq = chainedRunner.run(QueryPlus.wrap(query));

        // The sequence is consumed on a separate thread so this thread stays free to cancel the query.
        ExecutorService consumerExec = Execs.multiThreaded(1, "ChainedExecutionQueryRunnerTest-%d");
        try {
            Future<?> resultFuture = consumerExec.submit(() -> {
                seq.toList();
            });

            queryIsRegistered.await();
            queriesStarted.await();
            Assert.assertTrue(capturedFuture.hasCaptured());
            ListenableFuture future = capturedFuture.getValue();
            future.cancel(true);

            ExecutionException failure = Assert.assertThrows(ExecutionException.class, resultFuture::get);
            Assert.assertTrue(failure.getCause() instanceof QueryInterruptedException);

            queriesInterrupted.await();
            Assert.assertTrue(future.isCancelled());

            // Each interrupted runner enqueues itself before counting down queriesInterrupted, so two
            // entries are expected here; the null checks turn a missing entry into a readable failure.
            DyingQueryRunner interrupted1 = interrupted.poll();
            Assert.assertNotNull("first interrupted runner", interrupted1);
            synchronized (interrupted1) {
                Assert.assertTrue("runner 1 started", interrupted1.hasStarted);
                Assert.assertTrue("runner 1 interrupted", interrupted1.interrupted);
            }
            DyingQueryRunner interrupted2 = interrupted.poll();
            Assert.assertNotNull("second interrupted runner", interrupted2);
            synchronized (interrupted2) {
                Assert.assertTrue("runner 2 started", interrupted2.hasStarted);
                Assert.assertTrue("runner 2 interrupted", interrupted2.interrupted);
            }

            runners.remove(interrupted1);
            runners.remove(interrupted2);
            DyingQueryRunner remainingRunner = runners.iterator().next();
            synchronized (remainingRunner) {
                Assert.assertTrue("runner 3 should be interrupted or not have started", !remainingRunner.hasStarted || remainingRunner.interrupted);
            }

            Assert.assertFalse("runner 1 not completed", interrupted1.hasCompleted);
            Assert.assertFalse("runner 2 not completed", interrupted2.hasCompleted);
            Assert.assertFalse("runner 3 not completed", remainingRunner.hasCompleted);
            EasyMock.verify(watcher);
        }
        finally {
            // Release the consumer thread regardless of how the test ends.
            consumerExec.shutdownNow();
        }
    }

    /**
     * Runs a query whose context sets {@code timeout} to 100 against runners that never finish and
     * checks that the query fails with a {@link QueryTimeoutException}.
     *
     * <p>Three {@link DyingQueryRunner}s are chained on the shared processing pool. After the query
     * future has been registered and two runners have started, the thread consuming the sequence is
     * expected to fail with error code {@code "Query timeout"}, the registered future must be
     * cancelled, the started runners must have been interrupted, and no runner may report completion.
     */
    @Test(timeout=60000L)
    public void testQueryTimeout() throws Exception {
        CountDownLatch queriesStarted = new CountDownLatch(2);
        CountDownLatch queriesInterrupted = new CountDownLatch(2);
        final CountDownLatch queryIsRegistered = new CountDownLatch(1);

        // Strict mock that expects exactly one registration, captures the registered future and
        // signals the test thread once the registration has happened.
        final Capture<ListenableFuture> capturedFuture = Capture.newInstance();
        QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
        watcher.registerQueryFuture(
            (Query)EasyMock.anyObject(),
            (ListenableFuture)EasyMock.and((Object)EasyMock.anyObject(), (Object)EasyMock.capture(capturedFuture))
        );
        EasyMock.expectLastCall().andAnswer(() -> {
            queryIsRegistered.countDown();
            return null;
        }).once();
        EasyMock.replay(watcher);

        ArrayBlockingQueue<DyingQueryRunner> interrupted = new ArrayBlockingQueue<>(3);
        HashSet<DyingQueryRunner> runners = Sets.newHashSet(
            new DyingQueryRunner(queriesStarted, queriesInterrupted, interrupted),
            new DyingQueryRunner(queriesStarted, queriesInterrupted, interrupted),
            new DyingQueryRunner(queriesStarted, queriesInterrupted, interrupted)
        );
        ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner(processingPool, watcher, Lists.newArrayList(runners));
        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("test")
            .intervals("2014/2015")
            .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
            .context(ImmutableMap.<String, Object>of("timeout", 100, "queryId", "test"))
            .build();
        final Sequence seq = chainedRunner.run(QueryPlus.wrap(query));

        // The sequence is consumed on a separate thread so this thread can keep observing the latches.
        ExecutorService consumerExec = Execs.multiThreaded(1, "ChainedExecutionQueryRunnerTest-%d");
        try {
            Future<?> resultFuture = consumerExec.submit(() -> {
                seq.toList();
            });

            queryIsRegistered.await();
            queriesStarted.await();
            Assert.assertTrue(capturedFuture.hasCaptured());
            ListenableFuture future = capturedFuture.getValue();

            ExecutionException failure = Assert.assertThrows(ExecutionException.class, resultFuture::get);
            Assert.assertTrue(failure.getCause() instanceof QueryTimeoutException);
            Assert.assertEquals("Query timeout", ((QueryTimeoutException)failure.getCause()).getErrorCode());

            queriesInterrupted.await();
            Assert.assertTrue(future.isCancelled());

            // Each interrupted runner enqueues itself before counting down queriesInterrupted, so two
            // entries are expected here; the null checks turn a missing entry into a readable failure.
            DyingQueryRunner interrupted1 = interrupted.poll();
            Assert.assertNotNull("first interrupted runner", interrupted1);
            synchronized (interrupted1) {
                Assert.assertTrue("runner 1 started", interrupted1.hasStarted);
                Assert.assertTrue("runner 1 interrupted", interrupted1.interrupted);
            }
            DyingQueryRunner interrupted2 = interrupted.poll();
            Assert.assertNotNull("second interrupted runner", interrupted2);
            synchronized (interrupted2) {
                Assert.assertTrue("runner 2 started", interrupted2.hasStarted);
                Assert.assertTrue("runner 2 interrupted", interrupted2.interrupted);
            }

            runners.remove(interrupted1);
            runners.remove(interrupted2);
            DyingQueryRunner remainingRunner = runners.iterator().next();
            synchronized (remainingRunner) {
                Assert.assertTrue("runner 3 should be interrupted or not have started", !remainingRunner.hasStarted || remainingRunner.interrupted);
            }

            Assert.assertFalse("runner 1 not completed", interrupted1.hasCompleted);
            Assert.assertFalse("runner 2 not completed", interrupted2.hasCompleted);
            Assert.assertFalse("runner 3 not completed", remainingRunner.hasCompleted);
            EasyMock.verify(watcher);
        }
        finally {
            // Release the consumer thread regardless of how the test ends.
            consumerExec.shutdownNow();
        }
    }

    /**
     * Checks that running the chained runner submits one {@link PrioritizedQueryRunnerCallable} per
     * base runner to the processing pool, and that the callables wrap the base runners in the
     * order they were supplied.
     */
    @Test
    public void testSubmittedTaskType() {
        QueryProcessingPool queryProcessingPool = Mockito.mock(QueryProcessingPool.class);
        // NOTE(review): this EasyMock mock is never switched to replay mode or verified in this
        // test; presumably it only serves as a placeholder watcher - confirm.
        QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("test")
            .intervals("2014/2015")
            .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
            .context(ImmutableMap.<String, Object>of("timeout", 100, "queryId", "test"))
            .build();
        List<QueryRunner> runners = Arrays.asList(Mockito.mock(QueryRunner.class), Mockito.mock(QueryRunner.class));
        ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner(queryProcessingPool, watcher, runners);

        // Every submitted task resolves immediately to a canned single-element result.
        ListenableFuture<List<Integer>> cannedResult = Futures.immediateFuture(Collections.singletonList(123));
        Mockito.when((Object)queryProcessingPool.submitRunnerTask((PrioritizedQueryRunnerCallable)ArgumentMatchers.any()))
            .thenReturn(cannedResult);

        chainedRunner.run(QueryPlus.wrap(query)).toList();

        // The captor is typed so that the stream below has a known element type and the
        // getRunner method reference resolves against PrioritizedQueryRunnerCallable.
        ArgumentCaptor<PrioritizedQueryRunnerCallable> captor = ArgumentCaptor.forClass(PrioritizedQueryRunnerCallable.class);
        Mockito.verify(queryProcessingPool, Mockito.times(2)).submitRunnerTask(captor.capture());
        List<QueryRunner> actual = captor.getAllValues()
            .stream()
            .map(PrioritizedQueryRunnerCallable::getRunner)
            .collect(Collectors.toList());
        Assert.assertEquals(runners, actual);
    }

    /**
     * Chains a runner that sleeps for 500 ms with one that answers immediately, under a query whose
     * context sets {@code perSegmentTimeout} to 100 and {@code timeout} to 5000, and expects
     * consuming the sequence to fail with the per-segment timeout message.
     */
    @Test(timeout=10000L)
    public void testPerSegmentTimeout() {
        // Answers straight away.
        QueryRunner fastRunner = (queryPlus, responseContext) -> Sequences.of(1);
        // Sleeps well past the per-segment timeout before answering.
        QueryRunner slowRunner = (queryPlus, responseContext) -> {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return Sequences.of(2);
        };

        // The watcher may be asked to register any number of futures.
        QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
        watcher.registerQueryFuture((Query)EasyMock.anyObject(), (ListenableFuture)EasyMock.anyObject());
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(watcher);

        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("test")
            .intervals("2014/2015")
            .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
            .context(ImmutableMap.<String, Object>of("perSegmentTimeout", 100L, "timeout", 5000L))
            .queryId("test")
            .build();
        ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner(processingPool, watcher, Arrays.asList(slowRunner, fastRunner));
        Sequence seq = chainedRunner.run(QueryPlus.wrap(query));

        List rows = null;
        Exception failure = null;
        try {
            rows = seq.toList();
        }
        catch (Exception e) {
            failure = e;
        }

        Assert.assertNull("No results expected due to timeout", rows);
        Assert.assertNotNull("Exception should be thrown", failure);
        Throwable rootCause = Throwables.getRootCause(failure);
        Assert.assertTrue("Should be QueryTimeoutException or caused by it", rootCause instanceof QueryTimeoutException);
        Assert.assertEquals("Query timeout, cancelling pending results for query [test]. Per-segment timeout exceeded.", failure.getMessage());
        EasyMock.verify(watcher);
    }

    /**
     * Runs a "slow" query (two runners that sleep for 60 s, {@code perSegmentTimeout} 1000) and a
     * "fast" query (one immediate runner) against the same processing pool.
     *
     * <p>The fast query is submitted only after both slow runners have started, and the test
     * asserts that it has not begun within 500 ms. The slow query must then fail with the
     * per-segment timeout message, after which the fast query is expected to return its single row.
     */
    @Test(timeout=5000L)
    public void test_perSegmentTimeout_crossQuery() throws Exception {
        CountDownLatch slowStarted = new CountDownLatch(2);
        CountDownLatch fastStarted = new CountDownLatch(1);

        // Announces that it has started, then sleeps until interrupted.
        QueryRunner slowRunner = (queryPlus, responseContext) -> {
            slowStarted.countDown();
            try {
                Thread.sleep(60000L);
            }
            catch (InterruptedException e) {
                throw new QueryInterruptedException(e);
            }
            return Sequences.empty();
        };
        // Announces that it has started and immediately yields one row with count = 1.
        QueryRunner fastRunner = (queryPlus, responseContext) -> {
            fastStarted.countDown();
            Result<TimeseriesResultValue> row = new Result<>(null, new TimeseriesResultValue(ImmutableMap.<String, Object>of("count", 1)));
            return Sequences.simple(Collections.singletonList(row));
        };

        TimeseriesQuery slowQuery = Druids.newTimeseriesQueryBuilder()
            .dataSource("test")
            .intervals("2014/2015")
            .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
            .context(ImmutableMap.<String, Object>of("timeout", 300000L, "perSegmentTimeout", 1000L))
            .queryId("slow")
            .build();
        TimeseriesQuery fastQuery = Druids.newTimeseriesQueryBuilder()
            .dataSource("test")
            .intervals("2014/2015")
            .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
            .context(ImmutableMap.<String, Object>of("timeout", 5000L, "perSegmentTimeout", 3000L))
            .queryId("fast")
            .build();

        ChainedExecutionQueryRunner slowChainedRunner = new ChainedExecutionQueryRunner(processingPool, QueryRunnerTestHelper.NOOP_QUERYWATCHER, Arrays.asList(slowRunner, slowRunner));
        ChainedExecutionQueryRunner fastChainedRunner = new ChainedExecutionQueryRunner(processingPool, QueryRunnerTestHelper.NOOP_QUERYWATCHER, Collections.singletonList(fastRunner));

        // Separate two-thread executor that drives the two queries themselves.
        ExecutorService exec = Execs.multiThreaded(2, "QueryExecutor-%d");
        try {
            Future<List> slowFuture = exec.submit(() -> slowChainedRunner.run(QueryPlus.wrap(slowQuery)).toList());
            slowStarted.await();

            Future<List> fastFuture = exec.submit(() -> fastChainedRunner.run(QueryPlus.wrap(fastQuery)).toList());
            boolean fastStartedEarly = fastStarted.await(500L, TimeUnit.MILLISECONDS);
            Assert.assertFalse("Fast query should be blocked and not started while slow queries are running", fastStartedEarly);

            ExecutionException slowFailure = Assert.assertThrows(ExecutionException.class, slowFuture::get);
            Assert.assertTrue(Throwables.getRootCause(slowFailure) instanceof QueryTimeoutException);
            Assert.assertEquals("Query timeout, cancelling pending results for query [slow]. Per-segment timeout exceeded.", slowFailure.getCause().getMessage());

            Result<TimeseriesResultValue> expectedRow = new Result<>(null, new TimeseriesResultValue(ImmutableMap.<String, Object>of("count", 1)));
            Assert.assertEquals(Collections.singletonList(expectedRow), fastFuture.get());
        }
        finally {
            exec.shutdownNow();
        }
    }

    private class DyingQueryRunner
    implements QueryRunner<Integer> {
        private final CountDownLatch start;
        private final CountDownLatch stop;
        private final Queue<DyingQueryRunner> interruptedRunners;
        private volatile boolean hasStarted = false;
        private volatile boolean hasCompleted = false;
        private volatile boolean interrupted = false;

        public DyingQueryRunner(CountDownLatch start, CountDownLatch stop, Queue<DyingQueryRunner> interruptedRunners) {
            this.start = start;
            this.stop = stop;
            this.interruptedRunners = interruptedRunners;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Sequence<Integer> run(QueryPlus<Integer> queryPlus, ResponseContext responseContext) {
            DyingQueryRunner dyingQueryRunner = this;
            synchronized (dyingQueryRunner) {
                try {
                    this.hasStarted = true;
                    this.start.countDown();
                    ChainedExecutionQueryRunnerTest.this.neverRelease.lockInterruptibly();
                }
                catch (InterruptedException e) {
                    this.interrupted = true;
                    this.interruptedRunners.offer(this);
                    this.stop.countDown();
                    throw new QueryInterruptedException((Throwable)e);
                }
            }
            this.hasCompleted = true;
            this.stop.countDown();
            return Sequences.simple(Collections.singletonList(123));
        }
    }
}

