/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TestRuntimeEstimators;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises {@link DefaultSpeculator} with different {@link TaskRuntimeEstimator}
 * implementations against a job obtained from a test {@link AppContext}, using a
 * manually advanced clock, and checks how many speculations succeeded.
 *
 * <p>NOTE(review): this file is decompiler output. The helper types it uses
 * ({@code MockClock}, {@code MyAppContext}, {@code MyTaskImpl},
 * {@code MyTaskAttemptImpl}, {@code SpeculationRequestEventHandler}) are not
 * declared in this view; they are presumably nested classes that were emitted
 * separately and reach this class's private state through the synthetic
 * {@code access$NNN} methods at the bottom - confirm before renaming anything.
 */
public class TestRuntimeEstimators {
    /** Slot budget; an attempt is only started while enough of it is unused. */
    private static final int INITIAL_NUMBER_FREE_SLOTS = 600;
    /** Slots charged for a MAP task (see {@link #taskTypeSlots(TaskType)}). */
    private static final int MAP_SLOT_REQUIREMENT = 3;
    /** Slots charged for any non-MAP task (see {@link #taskTypeSlots(TaskType)}). */
    private static final int REDUCE_SLOT_REQUIREMENT = 4;
    /** Number of map tasks handed to the test app context. */
    private static final int MAP_TASKS = 200;
    /** Number of reduce tasks handed to the test app context. */
    private static final int REDUCE_TASKS = 150;
    /** Amount the mock clock is advanced per simulation pass, in clock units (ms). */
    private static final long CLOCK_TICK_MS = 1000L;
    /** The speculator is asked to scan whenever the mock clock is a multiple of this. */
    private static final long SPECULATION_SCAN_INTERVAL_MS = 10000L;
    /** Wall-clock limit for the speculator's event queue to become empty. */
    private static final long EVENT_QUEUE_DRAIN_TIMEOUT_MS = 130000L;

    MockClock clock;
    Job myJob;
    AppContext myAppContext;
    private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
    /** Slots currently taken; compared against {@link #INITIAL_NUMBER_FREE_SLOTS}. */
    private final AtomicInteger slotsInUse = new AtomicInteger(0);
    AsyncDispatcher dispatcher;
    DefaultSpeculator speculator;
    TaskRuntimeEstimator estimator;
    // The counters below are reset at the start of every run. In this view they
    // are otherwise only handed out through the synthetic accessors.
    private final AtomicInteger completedMaps = new AtomicInteger(0);
    private final AtomicInteger completedReduces = new AtomicInteger(0);
    private final AtomicInteger successfulSpeculations = new AtomicInteger(0);
    private final AtomicLong taskTimeSavedBySpeculation = new AtomicLong(0L);
    private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

    /**
     * Runs one complete simulation with the given estimator and asserts the
     * number of successful speculations.
     *
     * <p>The dispatcher and the speculator are stopped when the run ends,
     * whether it passes, fails or throws, so their threads do not outlive the
     * test method.
     *
     * @param testedEstimator the estimator to plug into the speculator
     * @param expectedSpeculations the expected value of the successful-speculation counter
     */
    private void coreTestEstimator(TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
        this.estimator = testedEstimator;
        this.clock = new MockClock();
        this.dispatcher = new AsyncDispatcher();
        this.myJob = null;
        this.slotsInUse.set(0);
        this.completedMaps.set(0);
        this.completedReduces.set(0);
        this.successfulSpeculations.set(0);
        this.taskTimeSavedBySpeculation.set(0L);
        this.clock.advanceTime(CLOCK_TICK_MS);
        Configuration conf = new Configuration();
        this.myAppContext = new MyAppContext(this, MAP_TASKS, REDUCE_TASKS);
        // The test only looks at the first job the context reports.
        this.myJob = this.myAppContext.getAllJobs().values().iterator().next();
        this.estimator.contextualize(conf, this.myAppContext);
        this.speculator = new DefaultSpeculator(conf, this.myAppContext, this.estimator, (Clock)this.clock);
        this.dispatcher.register(Speculator.EventType.class, (EventHandler)this.speculator);
        this.dispatcher.register(TaskEventType.class, (EventHandler)new SpeculationRequestEventHandler(this));
        this.dispatcher.init(conf);
        this.dispatcher.start();
        try {
            this.speculator.init(conf);
            this.speculator.start();
            try {
                this.runSimulation();
                Assert.assertEquals("We got the wrong number of successful speculations.", expectedSpeculations, this.successfulSpeculations.get());
            } finally {
                this.speculator.stop();
            }
        } finally {
            // Stopped last because it was started first.
            this.dispatcher.stop();
        }
    }

    /**
     * Repeats feed / drain / tick passes until no task of the job is unfinished.
     */
    private void runSimulation() {
        LinkedList<Task> allTasksSequence = new LinkedList<Task>();
        allTasksSequence.addAll(this.myJob.getTasks(TaskType.MAP).values());
        allTasksSequence.addAll(this.myJob.getTasks(TaskType.REDUCE).values());
        int undoneTasks = MAP_TASKS + REDUCE_TASKS;
        while (undoneTasks > 0) {
            undoneTasks = this.reportAttemptsToSpeculator(allTasksSequence);
            this.awaitSpeculatorQueueDrained();
            this.clock.advanceTime(CLOCK_TICK_MS);
            if (this.clock.getTime() % SPECULATION_SCAN_INTERVAL_MS == 0L) {
                this.speculator.scanForSpeculations();
            }
        }
    }

    /**
     * Makes one pass over every attempt of every task: starts NEW attempts that
     * fit in the remaining slot budget, and reports the status of all others.
     *
     * @param tasks the tasks to walk, in order
     * @return how many of the tasks were not yet finished when visited
     */
    private int reportAttemptsToSpeculator(LinkedList<Task> tasks) {
        int undoneTasks = 0;
        for (Task task : tasks) {
            if (!task.isFinished()) {
                ++undoneTasks;
            }
            for (TaskAttempt attempt : task.getAttempts().values()) {
                if (attempt.getState() == TaskAttemptState.NEW
                        && INITIAL_NUMBER_FREE_SLOTS - this.slotsInUse.get() >= this.taskTypeSlots(task.getType())) {
                    MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
                    // Tell the speculator about the attempt before starting it.
                    this.speculator.handle(new SpeculatorEvent(attempt.getID(), false, this.clock.getTime()));
                    attemptImpl.startUp();
                    continue;
                }
                TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
                status.id = attempt.getID();
                status.progress = attempt.getProgress();
                status.stateString = attempt.getState().name();
                status.taskState = attempt.getState();
                this.speculator.handle(new SpeculatorEvent(status, this.clock.getTime()));
            }
        }
        return undoneTasks;
    }

    /**
     * Spins until the speculator reports an empty event queue.
     *
     * <p>Fails the test if that takes longer than
     * {@link #EVENT_QUEUE_DRAIN_TIMEOUT_MS} of wall-clock time; returning
     * quietly here would let the test pass without ever reaching its assertion.
     */
    private void awaitSpeculatorQueueDrained() {
        long startTime = System.currentTimeMillis();
        while (!this.speculator.eventQueueEmpty()) {
            Thread.yield();
            if (System.currentTimeMillis() > startTime + EVENT_QUEUE_DRAIN_TIMEOUT_MS) {
                Assert.fail("Speculator event queue did not drain within " + EVENT_QUEUE_DRAIN_TIMEOUT_MS + " ms");
            }
        }
    }

    /** Runs the simulation with {@link LegacyTaskRuntimeEstimator}, expecting 3 successful speculations. */
    @Test
    public void testLegacyEstimator() throws Exception {
        TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
        this.coreTestEstimator(specificEstimator, 3);
    }

    /** Runs the simulation with {@link ExponentiallySmoothedTaskRuntimeEstimator}, expecting 3 successful speculations. */
    @Test
    public void testExponentialEstimator() throws Exception {
        TaskRuntimeEstimator specificEstimator = new ExponentiallySmoothedTaskRuntimeEstimator();
        this.coreTestEstimator(specificEstimator, 3);
    }

    /**
     * Returns the slot cost of a task of the given type.
     *
     * @param type the task type
     * @return {@link #MAP_SLOT_REQUIREMENT} for MAP, {@link #REDUCE_SLOT_REQUIREMENT} for anything else
     */
    int taskTypeSlots(TaskType type) {
        return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
    }

    /**
     * Adds an attempt to the given task.
     *
     * @param task the task to extend; must be a {@code MyTaskImpl}, otherwise the cast throws
     */
    void addAttempt(Task task) {
        MyTaskImpl myTask = (MyTaskImpl)task;
        myTask.addAttempt();
    }

    // Synthetic accessors emitted by the compiler and kept by the decompiler.
    // Each one only exposes a private field of this class. Names and signatures
    // must stay as they are: the separately emitted nested classes presumably
    // call them by these exact names.

    static /* synthetic */ RecordFactory access$000(TestRuntimeEstimators x0) {
        return x0.recordFactory;
    }

    static /* synthetic */ AtomicInteger access$100(TestRuntimeEstimators x0) {
        return x0.completedMaps;
    }

    static /* synthetic */ AtomicInteger access$200(TestRuntimeEstimators x0) {
        return x0.completedReduces;
    }

    static /* synthetic */ AtomicInteger access$300(TestRuntimeEstimators x0) {
        return x0.slotsInUse;
    }

    static /* synthetic */ AtomicInteger access$400(TestRuntimeEstimators x0) {
        return x0.successfulSpeculations;
    }

    static /* synthetic */ AtomicLong access$500(TestRuntimeEstimators x0) {
        return x0.taskTimeSavedBySpeculation;
    }

    static /* synthetic */ Log access$600() {
        return LOG;
    }
}

