/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordinator.CuratorLoadQueuePeon;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class LoadQueuePeonTest
extends CuratorTestBase {
    private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234";
    private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
    private LoadQueuePeon loadQueuePeon;
    private PathChildrenCache loadQueueCache;

    @Before
    public void setUp() throws Exception {
        this.setupServerAndCurator();
        this.curator.start();
        this.curator.blockUntilConnected();
        this.curator.create().creatingParentsIfNeeded().forPath(LOAD_QUEUE_PATH);
        this.loadQueueCache = new PathChildrenCache(this.curator, LOAD_QUEUE_PATH, true, true, Execs.singleThreaded((String)"load_queue_cache-%d"));
    }

    @Test
    public void testMultipleLoadDropSegments() throws Exception {
        int i;
        this.loadQueuePeon = new CuratorLoadQueuePeon(this.curator, LOAD_QUEUE_PATH, this.jsonMapper, Execs.scheduledSingleThreaded((String)"test_load_queue_peon_scheduled-%d"), Execs.singleThreaded((String)"test_load_queue_peon-%d"), (DruidCoordinatorConfig)new TestDruidCoordinatorConfig.Builder().withCoordinatorKillMaxSegments(10).withCoordinatorKillIgnoreDurationToRetain(false).build());
        this.loadQueuePeon.start();
        final ConcurrentHashMap<SegmentId, CountDownLatch> loadRequestSignals = new ConcurrentHashMap<SegmentId, CountDownLatch>(5);
        final ConcurrentHashMap<SegmentId, CountDownLatch> dropRequestSignals = new ConcurrentHashMap<SegmentId, CountDownLatch>(5);
        final ConcurrentHashMap<SegmentId, CountDownLatch> segmentLoadedSignals = new ConcurrentHashMap<SegmentId, CountDownLatch>(5);
        ConcurrentHashMap<SegmentId, CountDownLatch> segmentDroppedSignals = new ConcurrentHashMap<SegmentId, CountDownLatch>(5);
        List segmentToDrop = Lists.transform((List)ImmutableList.of((Object)"2014-10-26T00:00:00Z/P1D", (Object)"2014-10-25T00:00:00Z/P1D", (Object)"2014-10-24T00:00:00Z/P1D", (Object)"2014-10-23T00:00:00Z/P1D", (Object)"2014-10-22T00:00:00Z/P1D"), (Function)new Function<String, DataSegment>(){

            public DataSegment apply(String intervalStr) {
                DataSegment dataSegment = LoadQueuePeonTest.this.dataSegmentWithInterval(intervalStr);
                return dataSegment;
            }
        });
        CountDownLatch[] dropRequestLatches = new CountDownLatch[5];
        CountDownLatch[] dropSegmentLatches = new CountDownLatch[5];
        for (i = 0; i < 5; ++i) {
            dropRequestLatches[i] = new CountDownLatch(1);
            dropSegmentLatches[i] = new CountDownLatch(1);
        }
        i = 0;
        for (DataSegment s : segmentToDrop) {
            dropRequestSignals.put(s.getId(), dropRequestLatches[i]);
            segmentDroppedSignals.put(s.getId(), dropSegmentLatches[i++]);
        }
        List segmentToLoad = Lists.transform((List)ImmutableList.of((Object)"2014-10-27T00:00:00Z/P1D", (Object)"2014-10-29T00:00:00Z/P1M", (Object)"2014-10-31T00:00:00Z/P1D", (Object)"2014-10-30T00:00:00Z/P1D", (Object)"2014-10-28T00:00:00Z/P1D"), (Function)new Function<String, DataSegment>(){

            public DataSegment apply(String intervalStr) {
                DataSegment dataSegment = LoadQueuePeonTest.this.dataSegmentWithInterval(intervalStr);
                loadRequestSignals.put(dataSegment.getId(), new CountDownLatch(1));
                segmentLoadedSignals.put(dataSegment.getId(), new CountDownLatch(1));
                return dataSegment;
            }
        });
        CountDownLatch[] loadRequestLatches = new CountDownLatch[5];
        CountDownLatch[] segmentLoadedLatches = new CountDownLatch[5];
        for (i = 0; i < 5; ++i) {
            loadRequestLatches[i] = new CountDownLatch(1);
            segmentLoadedLatches[i] = new CountDownLatch(1);
        }
        i = 0;
        for (DataSegment s : segmentToDrop) {
            loadRequestSignals.put(s.getId(), loadRequestLatches[i]);
            segmentLoadedSignals.put(s.getId(), segmentLoadedLatches[i++]);
        }
        List expectedLoadOrder = Lists.transform((List)ImmutableList.of((Object)"2014-10-29T00:00:00Z/P1M", (Object)"2014-10-31T00:00:00Z/P1D", (Object)"2014-10-30T00:00:00Z/P1D", (Object)"2014-10-28T00:00:00Z/P1D", (Object)"2014-10-27T00:00:00Z/P1D"), intervalStr -> this.dataSegmentWithInterval((String)intervalStr));
        DataSegmentChangeHandler handler = new DataSegmentChangeHandler(){

            public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                ((CountDownLatch)loadRequestSignals.get(segment.getId())).countDown();
            }

            public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                ((CountDownLatch)dropRequestSignals.get(segment.getId())).countDown();
            }
        };
        this.loadQueueCache.getListenable().addListener((client, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                DataSegmentChangeRequest request = (DataSegmentChangeRequest)this.jsonMapper.readValue(event.getData().getData(), DataSegmentChangeRequest.class);
                request.go(handler, null);
            }
        });
        this.loadQueueCache.start();
        for (DataSegment segment : segmentToDrop) {
            this.loadQueuePeon.dropSegment(segment, success -> ((CountDownLatch)segmentDroppedSignals.get(segment.getId())).countDown());
        }
        for (DataSegment segment : segmentToLoad) {
            this.loadQueuePeon.loadSegment(segment, success -> ((CountDownLatch)segmentLoadedSignals.get(segment.getId())).countDown());
        }
        Assert.assertEquals((long)6000L, (long)this.loadQueuePeon.getLoadQueueSize());
        Assert.assertEquals((long)5L, (long)this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals((long)5L, (long)this.loadQueuePeon.getSegmentsToDrop().size());
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getTimedOutSegments().size());
        for (DataSegment segment : segmentToDrop) {
            String dropRequestPath = ZKPaths.makePath((String)LOAD_QUEUE_PATH, (String)segment.getId().toString());
            Assert.assertTrue((String)("Latch not counted down for " + dropRequestSignals.get(segment.getId())), (boolean)((CountDownLatch)dropRequestSignals.get(segment.getId())).await(10L, TimeUnit.SECONDS));
            Assert.assertNotNull((String)("Path " + dropRequestPath + " doesn't exist"), (Object)this.curator.checkExists().forPath(dropRequestPath));
            Assert.assertEquals((Object)segment, (Object)((SegmentChangeRequestDrop)this.jsonMapper.readValue((byte[])((GetDataWatchBackgroundStatable)this.curator.getData().decompressed()).forPath(dropRequestPath), DataSegmentChangeRequest.class)).getSegment());
            ((ChildrenDeletable)this.curator.delete().guaranteed()).forPath(dropRequestPath);
            Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch((CountDownLatch)segmentDroppedSignals.get(segment.getId())));
        }
        for (DataSegment segment : expectedLoadOrder) {
            String loadRequestPath = ZKPaths.makePath((String)LOAD_QUEUE_PATH, (String)segment.getId().toString());
            Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch((CountDownLatch)loadRequestSignals.get(segment.getId())));
            Assert.assertNotNull((Object)this.curator.checkExists().forPath(loadRequestPath));
            Assert.assertEquals((Object)segment, (Object)((SegmentChangeRequestLoad)this.jsonMapper.readValue((byte[])((GetDataWatchBackgroundStatable)this.curator.getData().decompressed()).forPath(loadRequestPath), DataSegmentChangeRequest.class)).getSegment());
            ((ChildrenDeletable)this.curator.delete().guaranteed()).forPath(loadRequestPath);
            Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch((CountDownLatch)segmentLoadedSignals.get(segment.getId())));
        }
    }

    @Test
    public void testFailAssignForNonTimeoutFailures() throws Exception {
        DataSegment segment = this.dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
        CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
        this.loadQueuePeon = new CuratorLoadQueuePeon(this.curator, LOAD_QUEUE_PATH, null, Execs.scheduledSingleThreaded((String)"test_load_queue_peon_scheduled-%d"), Execs.singleThreaded((String)"test_load_queue_peon-%d"), (DruidCoordinatorConfig)new TestDruidCoordinatorConfig.Builder().withLoadTimeoutDelay(new Duration(1L)).withCoordinatorKillMaxSegments(10).withCoordinatorKillIgnoreDurationToRetain(false).build());
        this.loadQueuePeon.start();
        this.loadQueueCache.start();
        this.loadQueuePeon.loadSegment(segment, success -> segmentLoadedSignal.countDown());
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(segmentLoadedSignal));
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getLoadQueueSize());
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getTimedOutSegments().size());
    }

    @Test
    public void testFailAssignForLoadDropTimeout() throws Exception {
        DataSegment segment = this.dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
        final CountDownLatch loadRequestSignal = new CountDownLatch(1);
        CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
        CountDownLatch delayedSegmentLoadedSignal = new CountDownLatch(2);
        final CountDownLatch loadRequestRemoveSignal = new CountDownLatch(1);
        this.loadQueuePeon = new CuratorLoadQueuePeon(this.curator, LOAD_QUEUE_PATH, this.jsonMapper, Execs.scheduledSingleThreaded((String)"test_load_queue_peon_scheduled-%d"), Execs.singleThreaded((String)"test_load_queue_peon-%d"), (DruidCoordinatorConfig)new TestDruidCoordinatorConfig.Builder().withLoadTimeoutDelay(new Duration(1L)).withCoordinatorKillMaxSegments(10).withCoordinatorKillIgnoreDurationToRetain(false).build());
        this.loadQueuePeon.start();
        this.loadQueueCache.getListenable().addListener((Object)new PathChildrenCacheListener(){

            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                switch (event.getType()) {
                    case CHILD_ADDED: {
                        loadRequestSignal.countDown();
                        break;
                    }
                    case CHILD_REMOVED: {
                        loadRequestRemoveSignal.countDown();
                        break;
                    }
                }
            }
        });
        this.loadQueueCache.start();
        this.loadQueuePeon.loadSegment(segment, success -> {
            segmentLoadedSignal.countDown();
            delayedSegmentLoadedSignal.countDown();
        });
        String loadRequestPath = ZKPaths.makePath((String)LOAD_QUEUE_PATH, (String)segment.getId().toString());
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(loadRequestSignal));
        Assert.assertNotNull((Object)this.curator.checkExists().forPath(loadRequestPath));
        Assert.assertEquals((Object)segment, (Object)((SegmentChangeRequestLoad)this.jsonMapper.readValue((byte[])((GetDataWatchBackgroundStatable)this.curator.getData().decompressed()).forPath(loadRequestPath), DataSegmentChangeRequest.class)).getSegment());
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(segmentLoadedSignal));
        Assert.assertEquals((long)1L, (long)this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals((long)1200L, (long)this.loadQueuePeon.getLoadQueueSize());
        Assert.assertEquals((long)1L, (long)this.loadQueuePeon.getTimedOutSegments().size());
        ((ChildrenDeletable)this.curator.delete().guaranteed()).forPath(loadRequestPath);
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(delayedSegmentLoadedSignal));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(loadRequestRemoveSignal));
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getLoadQueueSize());
        Assert.assertEquals((long)0L, (long)this.loadQueuePeon.getTimedOutSegments().size());
    }

    private DataSegment dataSegmentWithInterval(String intervalStr) {
        return DataSegment.builder().dataSource("test_load_queue_peon").interval(Intervals.of((String)intervalStr)).loadSpec((Map)ImmutableMap.of()).version("2015-05-27T03:38:35.683Z").dimensions((List)ImmutableList.of()).metrics((List)ImmutableList.of()).shardSpec((ShardSpec)NoneShardSpec.instance()).binaryVersion(Integer.valueOf(9)).size(1200L).build();
    }

    @After
    public void tearDown() throws Exception {
        this.loadQueueCache.close();
        this.loadQueuePeon.stop();
        this.tearDownServerAndCurator();
    }
}

