/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapred.lib.LazyOutputFormat;
import org.junit.Assert;
import org.junit.Test;

/**
 * Runs the same small map/reduce job on a mini DFS + MR cluster with and
 * without {@link LazyOutputFormat} and checks how many output files each
 * configuration leaves behind.
 *
 * <p>{@link TestMapper} and {@link TestReducer} deliberately emit nothing when
 * their task attempt id ends with {@code "0_0"}; the assertions in
 * {@link #testLazyOutput()} expect that silent task to produce no output file
 * only when lazy output creation is enabled.
 */
public class TestLazyOutput {
    private static final int NUM_HADOOP_WORKERS = 3;
    private static final int NUM_MAPS_PER_NODE = 2;
    private static final Path INPUTPATH = new Path("/testlazy/input");
    private static final List<String> INPUTLIST = Arrays.asList("All", "Roads", "Lead", "To", "Hadoop");

    /** Configuration key from which the mapper and reducer read their task attempt id. */
    private static final String TASK_ATTEMPT_ID_KEY = "mapreduce.task.attempt.id";

    /** Attempt-id suffix of the task that must stay silent (emit no records). */
    private static final String SILENT_ATTEMPT_SUFFIX = "0_0";

    /**
     * Configures {@code job} to read {@link #INPUTPATH}, write to {@code output},
     * and runs it through {@link JobClient#runJob(JobConf)}.
     *
     * @param job          job configuration to populate and run
     * @param output       output directory of the job
     * @param numReducers  number of reduce tasks (0 makes it a map-only job)
     * @param createLazily {@code true} to wrap {@link TextOutputFormat} in
     *                     {@link LazyOutputFormat}; {@code false} to use
     *                     {@link TextOutputFormat} directly
     * @throws Exception if configuring or running the job fails
     */
    private static void runTestLazyOutput(JobConf job, Path output, int numReducers, boolean createLazily) throws Exception {
        job.setJobName("test-lazy-output");
        FileInputFormat.setInputPaths(job, INPUTPATH);
        FileOutputFormat.setOutputPath(job, output);
        job.setInputFormat(TextInputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(TestMapper.class);
        job.setReducerClass(TestReducer.class);
        job.setNumReduceTasks(numReducers);
        if (createLazily) {
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        } else {
            job.setOutputFormat(TextOutputFormat.class);
        }
        JobClient.runJob(job);
    }

    /**
     * Writes {@code numMappers} input files named {@code text<i>.txt} under
     * {@link #INPUTPATH}, each containing the words of {@link #INPUTLIST}, one
     * per line.
     *
     * @param fs         file system to create the files on
     * @param numMappers number of input files to create
     * @throws Exception if a file cannot be created or written
     */
    public void createInput(FileSystem fs, int numMappers) throws Exception {
        for (int i = 0; i < numMappers; ++i) {
            Path inputFile = new Path(INPUTPATH, "text" + i + ".txt");
            // try-with-resources: the writer (and the stream it wraps) is closed
            // even when a write fails part-way through.
            try (Writer wr = new OutputStreamWriter(fs.create(inputFile), StandardCharsets.UTF_8)) {
                for (String inp : INPUTLIST) {
                    wr.write(inp + "\n");
                }
            }
        }
    }

    /**
     * Lists the files in {@code output} accepted by
     * {@link Utils.OutputFileUtils.OutputFilesFilter} and prints each one to
     * standard output, prefixed with {@code label}.
     *
     * @param fs     file system holding the job output
     * @param output job output directory
     * @param label  prefix used in the printed listing (e.g. {@code "Test1"})
     * @return the listed paths
     * @throws IOException if the directory cannot be listed
     */
    private static Path[] listOutputFiles(FileSystem fs, Path output, String label) throws IOException {
        Path[] fileList = FileUtil.stat2Paths(fs.listStatus(output, new Utils.OutputFileUtils.OutputFilesFilter()));
        for (int i = 0; i < fileList.length; ++i) {
            System.out.println(label + " File list[" + i + "]" + ": " + fileList[i]);
        }
        return fileList;
    }

    /**
     * Starts a mini DFS and a mini MR cluster, then runs three jobs and checks
     * the number of output files of each:
     * <ol>
     *   <li>lazy output with reducers: {@code numReducers - 1} files expected;</li>
     *   <li>lazy output, map-only: {@code numMappers - 1} files expected;</li>
     *   <li>eager output, map-only: {@code numMappers} files expected.</li>
     * </ol>
     * Both clusters are shut down afterwards, even if one shutdown fails.
     *
     * @throws Exception if cluster start-up, a job, or shutdown fails
     */
    @Test
    public void testLazyOutput() throws Exception {
        MiniDFSCluster dfs = null;
        MiniMRCluster mr = null;
        try {
            Configuration conf = new Configuration();
            dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_HADOOP_WORKERS).build();
            FileSystem fileSys = dfs.getFileSystem();
            mr = new MiniMRCluster(NUM_HADOOP_WORKERS, fileSys.getUri().toString(), 1);

            int numReducers = 2;
            int numMappers = NUM_HADOOP_WORKERS * NUM_MAPS_PER_NODE;
            createInput(fileSys, numMappers);

            // Test 1: lazy output with reducers.
            Path output1 = new Path("/testlazy/output1");
            runTestLazyOutput(mr.createJobConf(), output1, numReducers, true);
            Path[] fileList = listOutputFiles(fileSys, output1, "Test1");
            Assert.assertEquals("lazy output with reducers: unexpected number of output files",
                    numReducers - 1, fileList.length);

            // Test 2: lazy output, zero reducers.
            Path output2 = new Path("/testlazy/output2");
            runTestLazyOutput(mr.createJobConf(), output2, 0, true);
            fileList = listOutputFiles(fileSys, output2, "Test2");
            Assert.assertEquals("lazy map-only output: unexpected number of output files",
                    numMappers - 1, fileList.length);

            // Test 3: zero reducers with lazy output turned off.
            Path output3 = new Path("/testlazy/output3");
            runTestLazyOutput(mr.createJobConf(), output3, 0, false);
            fileList = listOutputFiles(fileSys, output3, "Test3");
            Assert.assertEquals("eager map-only output: unexpected number of output files",
                    numMappers, fileList.length);
        } finally {
            // Nested finally so that a failure shutting down DFS cannot prevent
            // the MR cluster from being shut down as well.
            try {
                if (dfs != null) {
                    dfs.shutdown();
                }
            } finally {
                if (mr != null) {
                    mr.shutdown();
                }
            }
        }
    }

    /**
     * Reducer that passes every value through unchanged, except in the task
     * whose attempt id ends with {@code "0_0"}, which emits nothing.
     */
    static class TestReducer
    extends MapReduceBase
    implements Reducer<LongWritable, Text, LongWritable, Text> {
        private String id;

        /** Remembers this task's attempt id as read from the job configuration. */
        @Override
        public void configure(JobConf job) {
            this.id = job.get(TASK_ATTEMPT_ID_KEY);
        }

        /**
         * Consumes all {@code values} for {@code key} and forwards each one to
         * {@code output} unless this task is the silent one.
         */
        @Override
        public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
            while (values.hasNext()) {
                // The iterator is always drained, even when nothing is emitted.
                Text v = values.next();
                if (!this.id.endsWith(SILENT_ATTEMPT_SUFFIX)) {
                    output.collect(key, v);
                }
            }
        }
    }

    /**
     * Identity mapper, except in the task whose attempt id ends with
     * {@code "0_0"}, which emits nothing.
     */
    static class TestMapper
    extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text> {
        private String id;

        /** Remembers this task's attempt id as read from the job configuration. */
        @Override
        public void configure(JobConf job) {
            this.id = job.get(TASK_ATTEMPT_ID_KEY);
        }

        /** Forwards the record to {@code output} unless this task is the silent one. */
        @Override
        public void map(LongWritable key, Text val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
            if (!this.id.endsWith(SILENT_ATTEMPT_SUFFIX)) {
                output.collect(key, val);
            }
        }
    }
}

