/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.HeartbeatResponse;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskInProgress;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.split.JobSplit;

public class TestJobRetire
extends TestCase {
    // Logger shared by every test in this class and by the nested WaitingTaskTracker.
    static final Log LOG = LogFactory.getLog(TestJobRetire.class);
    // Scratch directory "job-expiry-testing" under the "test.build.data" system property
    // (falling back to "/tmp"); each test deletes it again in its finally block.
    static final Path testDir = new Path(System.getProperty("test.build.data", "/tmp"), "job-expiry-testing");

    /**
     * Tunes the given configuration for the retirement tests and starts a mini MR cluster with it.
     *
     * <p>The configuration is modified in place: the retired-jobs cache size is set to 1 and the
     * retire interval, retire check period and per-user completed-jobs maximum are all set to 0.
     * NOTE(review): the exact effect of each key is defined by JobTracker, which is not visible
     * here; the tests in this class rely on jobs retiring quickly and on only one retired job
     * being kept in the cache.
     *
     * @param conf configuration to adjust and pass to the cluster; mutated by this call
     * @param numTrackers number of task trackers handed to the {@link MiniMRCluster} constructor
     * @return the newly constructed cluster
     * @throws IOException propagated from the {@link MiniMRCluster} constructor
     */
    private MiniMRCluster startCluster(JobConf conf, int numTrackers) throws IOException {
        conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1L);
        conf.setLong("mapred.jobtracker.retirejob.interval", 0L);
        conf.setLong("mapred.jobtracker.retirejob.check", 0L);
        // This must be a set: a getLong() whose result is dropped leaves the key unconfigured.
        conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 0L);
        return new MiniMRCluster(0, 0, numTrackers, "file:///", 1, null, null, null, conf, 0);
    }

    /**
     * Runs two jobs one after the other on a single-tracker cluster, validating each retirement
     * via {@code validateJobRetire}. Afterwards the first job must no longer be reported by
     * {@code getJobStatus} and {@code getAllJobs} must report exactly one job.
     *
     * <p>The cluster is shut down and the scratch directory removed whether or not the
     * assertions pass.
     *
     * @throws Exception if cluster start-up, job execution or an HTTP check fails
     */
    public void testJobRetire() throws Exception {
        MiniMRCluster cluster = null;
        try {
            cluster = startCluster(new JobConf(), 1);
            JobConf jobConf = cluster.createJobConf();
            JobTracker tracker = cluster.getJobTrackerRunner().getJobTracker();
            Path input = new Path(testDir, "input1");

            JobID firstJob = validateJobRetire(jobConf, input, new Path(testDir, "output1"), tracker);
            // The second job's id is not needed; it only has to push the first one out of the cache.
            validateJobRetire(jobConf, input, new Path(testDir, "output2"), tracker);

            assertNull("Job not removed from cache", tracker.getJobStatus(firstJob));
            assertEquals("Total job in cache not correct", 1, tracker.getAllJobs().length);
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
            FileUtil.fullyDelete(new File(testDir.toString()));
        }
    }

    /**
     * Runs a job to successful completion, waits for the job tracker to retire it and verifies
     * what retirement leaves behind.
     *
     * <p>Checks performed, in order:
     * <ul>
     *   <li>the job succeeded and is no longer returned by {@code JobTracker.getJob};</li>
     *   <li>the retired-job entry exists and carries a non-empty history file;</li>
     *   <li>{@code getJobStatus} still returns a status for the job;</li>
     *   <li>the file at {@code JobTracker.getLocalJobFilePath(id)} no longer exists;</li>
     *   <li>the job's tracking URL answers with a 302 whose {@code Location} target answers 200.</li>
     * </ul>
     *
     * @param jobConf configuration used to submit the job
     * @param inDir job input directory
     * @param outDir job output directory
     * @param jobtracker job tracker that is expected to retire the job
     * @return the id of the job that was run
     * @throws IOException if job submission, completion polling or an HTTP request fails
     */
    private JobID validateJobRetire(JobConf jobConf, Path inDir, Path outDir, JobTracker jobtracker) throws IOException {
        RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
        rj.waitForCompletion();
        assertTrue(rj.isSuccessful());
        JobID id = rj.getID();

        waitTillRetire(id, jobtracker);
        JobTracker.RetireJobInfo retired = jobtracker.retireJobs.get(id);
        // Fail with a message instead of a NullPointerException when the entry is missing.
        assertNotNull("Retired job info not found", retired);
        String historyFile = retired.getHistoryFile();
        assertTrue("History url not set", historyFile != null && historyFile.length() > 0);
        assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));

        File localJobFile = new File(JobTracker.getLocalJobFilePath(id));
        assertFalse("JobConf file not deleted", localJobFile.exists());

        // First hop: the tracking URL must redirect rather than serve the page itself.
        String location;
        HttpURLConnection conn = (HttpURLConnection) new URL(rj.getTrackingURL()).openConnection();
        try {
            conn.setInstanceFollowRedirects(false);
            conn.connect();
            assertEquals(HttpURLConnection.HTTP_MOVED_TEMP, conn.getResponseCode());
            // Read the header while the connection is still open; disconnect() may discard it.
            location = conn.getHeaderField("Location");
        } finally {
            conn.disconnect();
        }
        assertNotNull("Redirect response has no Location header", location);

        // Second hop: the redirect target must be reachable.
        conn = (HttpURLConnection) new URL(location).openConnection();
        try {
            conn.connect();
            assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
        } finally {
            conn.disconnect();
        }
        return id;
    }

    /**
     * Polls the job tracker until it no longer returns the given job, then asserts that it is gone.
     *
     * <p>The job is looked up once up front and then after each of at most ten one-second waits.
     *
     * @param id job expected to retire
     * @param jobtracker job tracker to poll
     */
    private void waitTillRetire(JobID id, JobTracker jobtracker) {
        JobInProgress stillTracked = jobtracker.getJob(id);
        int polls = 0;
        while (stillTracked != null && polls < 10) {
            UtilsForTests.waitFor(1000L);
            stillTracked = jobtracker.getJob(id);
            polls++;
        }
        assertNull("Job did not retire", stillTracked);
    }

    /**
     * Verifies that retiring a killed job clears the job tracker's task-attempt-to-TIP map even
     * when a task attempt never reported a status.
     *
     * <p>Scenario:
     * <ol>
     *   <li>Start a cluster with one tracker configured for one map slot and no reduce slots, and
     *       submit a job with one map and one reduce; wait for the map to finish.</li>
     *   <li>Add a second tracker backed by {@link WaitingTaskTracker}, which blocks after its
     *       heartbeat, and wait until the job tracker sees both trackers.</li>
     *   <li>Stop that tracker, check that one reduce is counted as running, kill the job and
     *       check that the reduce's first attempt has no task status.</li>
     *   <li>Wait for retirement and assert that {@code taskidToTIPMap} is empty.</li>
     * </ol>
     *
     * @throws Exception if cluster start-up, tracker creation or job control fails
     */
    public void testJobRetireWithUnreportedTasks() throws Exception {
        MiniMRCluster mr = null;
        try {
            JobConf conf = new JobConf();
            conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 0);
            mr = startCluster(conf, 1);
            JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();

            RunningJob job = UtilsForTests.runJob(mr.createJobConf(), new Path(testDir, "in-1"), new Path(testDir, "out-1"), 1, 1);
            JobID id = JobID.downgrade(job.getID());
            JobInProgress jip = jobtracker.getJob(id);
            // Fail with a message instead of a NullPointerException in the polling loop below.
            assertNotNull("Submitted job is not known to the job tracker", jip);

            // Give the single map up to 1000 polls of 100 ms to finish.
            for (int i = 0; i < 1000 && jip.finishedMaps() < 1; ++i) {
                UtilsForTests.waitFor(100L);
            }
            assertEquals(1, jip.finishedMaps());

            LOG.info("Adding a waiting tracker");
            // TaskTrackerRunner is created against the running cluster instance, hence "mr.new".
            MiniMRCluster.TaskTrackerRunner testTrackerRunner = mr.new TaskTrackerRunner(1, 1, null, mr.createJobConf()) {
                @Override
                TaskTracker createTaskTracker(JobConf trackerConf) throws InterruptedException, IOException {
                    return new WaitingTaskTracker(trackerConf);
                }
            };
            mr.addTaskTracker(testTrackerRunner);
            LOG.info("Waiting tracker added");
            WaitingTaskTracker testTT = (WaitingTaskTracker) testTrackerRunner.getTaskTracker();

            // Wait until the job tracker has registered the second tracker.
            for (int i = 0; i < 1000 && jobtracker.taskTrackers().size() < 2; ++i) {
                UtilsForTests.waitFor(100L);
            }
            assertEquals(2, jobtracker.taskTrackers().size());
            LOG.info("Cluster is now up with 2 trackers");

            mr.stopTaskTracker(mr.getTaskTrackerID(testTT.getName()));
            assertEquals("TestTT contacted but no reduce task scheduled on it", 1, jip.runningReduces());

            LOG.info("Killing job " + id);
            job.killJob();

            // The reduce attempt exists but never reported a status.
            TaskInProgress tip = jip.getTasks(TaskType.REDUCE)[0];
            assertNull(tip.getTaskStatus(tip.getAllTaskAttemptIDs()[0]));

            waitTillRetire(id, jobtracker);
            // Log any leftovers so a failure of the assertion below can be diagnosed.
            for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
                LOG.info("TaskidToTIP : " + tid);
            }
            assertEquals("'taskid' to TIP mapping still exists", 0, jobtracker.taskidToTIPMap.size());
        } finally {
            if (mr != null) {
                mr.shutdown();
            }
            FileUtil.fullyDelete(new File(testDir.toString()));
        }
    }

    /**
     * Starts a cluster without task trackers and runs {@code testRemoveJobTasks} once for each of
     * the MAP, REDUCE, JOB_SETUP and JOB_CLEANUP task types, in that order.
     *
     * <p>The cluster is shut down and the scratch directory removed whether or not the
     * assertions pass.
     *
     * @throws Exception if the cluster cannot be started or a check fails
     */
    public void testJobRemoval() throws Exception {
        MiniMRCluster cluster = null;
        try {
            JobConf conf = new JobConf();
            cluster = startCluster(conf, 0);
            JobTracker tracker = cluster.getJobTrackerRunner().getJobTracker();

            TaskType[] typesUnderTest = {
                TaskType.MAP, TaskType.REDUCE, TaskType.JOB_SETUP, TaskType.JOB_CLEANUP
            };
            for (TaskType type : typesUnderTest) {
                testRemoveJobTasks(tracker, conf, type);
            }
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
            FileUtil.fullyDelete(new File(testDir.toString()));
        }
    }

    /**
     * Creates a {@link JobInProgress} and registers it directly in the job tracker's job map.
     *
     * <p>The job id combines the tracker identifier with the current size of the job map plus one.
     *
     * @param jobtracker job tracker whose {@code jobs} map receives the new job
     * @param conf configuration passed to the {@link JobInProgress} constructor
     * @return the job that was created and registered
     * @throws IOException propagated from the {@link JobInProgress} constructor
     */
    private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) throws IOException {
        String trackerIdentifier = jobtracker.getTrackerIdentifier();
        int jobNumber = jobtracker.jobs.size() + 1;
        JobID jobId = new JobID(trackerIdentifier, jobNumber);

        JobInProgress created = new JobInProgress(jobId, conf, jobtracker);
        jobtracker.jobs.put(jobId, created);
        return created;
    }

    /**
     * Creates a single dummy {@link TaskInProgress} of the requested type and installs it as the
     * only element of the matching task array on the job.
     *
     * <p>REDUCE uses the reduce-style constructor and is stored in {@code jip.reduces}. MAP,
     * JOB_SETUP and JOB_CLEANUP all use the split-based constructor with an empty split and are
     * stored in {@code jip.maps}, {@code jip.setup} and {@code jip.cleanup} respectively. Any
     * other type leaves the job untouched.
     *
     * @param jobtracker job tracker passed to the task constructor
     * @param jip job that owns the new task
     * @param type kind of task to create
     * @return the created task, or {@code null} if {@code type} is none of the four handled types
     */
    private TaskInProgress createAndAddTIP(JobTracker jobtracker, JobInProgress jip, TaskType type) {
        JobConf jobConf = jip.getJobConf();
        JobID jobId = jip.getJobID();

        if (type == TaskType.REDUCE) {
            TaskInProgress reduceTip = new TaskInProgress(jobId, "dummy", jip.desiredMaps(), 0, jobtracker, jobConf, jip, 1);
            jip.reduces = new TaskInProgress[]{reduceTip};
            return reduceTip;
        }

        boolean splitBased = type == TaskType.MAP || type == TaskType.JOB_SETUP || type == TaskType.JOB_CLEANUP;
        if (!splitBased) {
            return null;
        }

        TaskInProgress splitTip = new TaskInProgress(jobId, "dummy", JobSplit.EMPTY_TASK_SPLIT, jobtracker, jobConf, jip, 0, 1);
        TaskInProgress[] holder = new TaskInProgress[]{splitTip};
        if (type == TaskType.MAP) {
            jip.maps = holder;
        } else if (type == TaskType.JOB_SETUP) {
            jip.setup = holder;
        } else {
            jip.cleanup = holder;
        }
        return splitTip;
    }

    /**
     * Builds an attempt id for the given task and registers it on the task as running on the
     * tracker named "test-tt".
     *
     * @param tip task that receives the running attempt
     * @param attemptId attempt number used to build the attempt id
     * @return the attempt id that was registered
     */
    private TaskAttemptID createAndAddAttempt(TaskInProgress tip, int attemptId) {
        TaskAttemptID attempt = new TaskAttemptID(tip.getTIPId(), attemptId);
        tip.addRunningTask(attempt, "test-tt");
        return attempt;
    }

    /**
     * Registers a fake job with one task of the given type and one running attempt that has no
     * status, calls {@code JobTracker.removeJobTasks} and asserts that the tracker's
     * attempt-to-TIP map is empty afterwards.
     *
     * @param jobtracker job tracker under test
     * @param conf configuration used to create the fake job
     * @param type kind of task to create for the job
     * @throws IOException propagated from job creation
     */
    private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf, TaskType type) throws IOException {
        JobInProgress fakeJob = createAndAddJob(jobtracker, conf);
        TaskInProgress fakeTip = createAndAddTIP(jobtracker, fakeJob, type);
        TaskAttemptID attempt = createAndAddAttempt(fakeTip, 0);

        // Precondition: the attempt is registered but has never reported a status.
        assertNull(fakeTip.getTaskStatus(attempt));

        jobtracker.removeJobTasks(fakeJob);

        // Log any leftovers so a failure of the assertion below can be diagnosed.
        for (TaskAttemptID leftover : jobtracker.taskidToTIPMap.keySet()) {
            LOG.info("TaskidToTIP : " + leftover);
        }
        assertEquals("'taskid' to TIP mapping still exists", 0, jobtracker.taskidToTIPMap.size());
    }

    /**
     * Task tracker whose heartbeat never returns normally: it performs the regular heartbeat,
     * then waits for {@code Long.MAX_VALUE} milliseconds and throws if that wait ever ends.
     */
    class WaitingTaskTracker extends TaskTracker {
        /**
         * @param conf configuration passed straight to the {@link TaskTracker} constructor
         * @throws InterruptedException propagated from the superclass constructor
         * @throws IOException propagated from the superclass constructor
         */
        WaitingTaskTracker(JobConf conf) throws InterruptedException, IOException {
            super(conf);
        }

        /**
         * Sends the normal heartbeat, discards its response and then blocks.
         *
         * @param now timestamp forwarded to the superclass heartbeat
         * @return never returns normally
         * @throws IOException always, once the wait ends
         */
        HeartbeatResponse transmitHeartBeat(long now) throws IOException {
            // The response is deliberately dropped; this tracker never acts on it.
            super.transmitHeartBeat(now);
            LOG.info("WaitingTaskTracker waiting");
            UtilsForTests.waitFor(Long.MAX_VALUE);
            throw new IOException("WaitingTaskTracker interrupted. Bailing out");
        }
    }
}

