/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Arrays;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TestMapCollection;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class TestReduceFetch
extends TestCase {
    private static MiniMRCluster mrCluster = null;
    private static MiniDFSCluster dfsCluster = null;

    public static Test suite() {
        TestSetup setup = new TestSetup((Test)new TestSuite(TestReduceFetch.class)){

            protected void setUp() throws Exception {
                Configuration conf = new Configuration();
                dfsCluster = new MiniDFSCluster(conf, 2, true, null);
                mrCluster = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 1);
            }

            protected void tearDown() throws Exception {
                if (dfsCluster != null) {
                    dfsCluster.shutdown();
                }
                if (mrCluster != null) {
                    mrCluster.shutdown();
                }
            }
        };
        return setup;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Counters runJob(JobConf conf) throws Exception {
        conf.setMapperClass(MapMB.class);
        conf.setReducerClass(IdentityReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(1);
        conf.setInputFormat(TestMapCollection.FakeIF.class);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{new Path("/in")});
        Path outp = new Path("/out");
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)outp);
        RunningJob job = null;
        try {
            job = JobClient.runJob((JobConf)conf);
            TestReduceFetch.assertTrue((boolean)job.isSuccessful());
        }
        finally {
            FileSystem fs = dfsCluster.getFileSystem();
            if (fs.exists(outp)) {
                fs.delete(outp, true);
            }
        }
        return job.getCounters();
    }

    public void testReduceFromDisk() throws Exception {
        int MAP_TASKS = 8;
        JobConf job = mrCluster.createJobConf();
        job.set("mapred.job.reduce.input.buffer.percent", "0.0");
        job.setNumMapTasks(8);
        job.setInt("mapred.job.reduce.total.mem.bytes", 0x8000000);
        job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
        job.setInt("io.sort.factor", 2);
        job.setInt("mapred.inmem.merge.threshold", 4);
        Counters c = TestReduceFetch.runJob(job);
        long spill = c.findCounter((Enum)Task.Counter.SPILLED_RECORDS).getCounter();
        long out = c.findCounter((Enum)Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
        TestReduceFetch.assertTrue((String)("Expected all records spilled during reduce (" + spill + ")"), (spill >= 2L * out ? 1 : 0) != 0);
        TestReduceFetch.assertTrue((String)("Expected intermediate merges (" + spill + ")"), (spill >= 2L * out + out / 8L ? 1 : 0) != 0);
    }

    public void testReduceFromPartialMem() throws Exception {
        int MAP_TASKS = 7;
        JobConf job = mrCluster.createJobConf();
        job.setNumMapTasks(7);
        job.setInt("mapred.inmem.merge.threshold", 0);
        job.set("mapred.job.reduce.input.buffer.percent", "1.0");
        job.setInt("mapred.reduce.parallel.copies", 1);
        job.setInt("io.sort.mb", 10);
        job.setInt("mapred.job.reduce.total.mem.bytes", 0x8000000);
        job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
        job.setNumTasksToExecutePerJvm(1);
        job.set("mapred.job.shuffle.merge.percent", "1.0");
        Counters c = TestReduceFetch.runJob(job);
        long out = c.findCounter((Enum)Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
        long spill = c.findCounter((Enum)Task.Counter.SPILLED_RECORDS).getCounter();
        TestReduceFetch.assertTrue((String)("Expected some records not spilled during reduce" + spill + ")"), (spill < 2L * out ? 1 : 0) != 0);
    }

    public void testReduceFromMem() throws Exception {
        int MAP_TASKS = 3;
        JobConf job = mrCluster.createJobConf();
        job.set("mapred.job.reduce.input.buffer.percent", "1.0");
        job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
        job.setInt("mapred.job.reduce.total.mem.bytes", 0x8000000);
        job.setNumMapTasks(3);
        Counters c = TestReduceFetch.runJob(job);
        long spill = c.findCounter((Enum)Task.Counter.SPILLED_RECORDS).getCounter();
        long out = c.findCounter((Enum)Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
        TestReduceFetch.assertEquals((String)("Spilled records: " + spill), (long)out, (long)spill);
    }

    public static class MapMB
    implements Mapper<NullWritable, NullWritable, Text, Text> {
        public void map(NullWritable nk, NullWritable nv, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            Text key = new Text();
            Text val = new Text();
            key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
            byte[] b = new byte[1000];
            Arrays.fill(b, (byte)86);
            val.set(b);
            b = null;
            for (int i = 0; i < 4096; ++i) {
                output.collect((Object)key, (Object)val);
            }
        }

        public void configure(JobConf conf) {
        }

        public void close() throws IOException {
        }
    }
}

