/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.ThreadRenamingRunnable;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.Rows;
import org.apache.druid.indexer.Bucket;
import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.HadoopDruidIndexerMapper;
import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexer.InputRowSerde;
import org.apache.druid.indexer.JobHelper;
import org.apache.druid.indexer.Jobby;
import org.apache.druid.indexer.SortableBytes;
import org.apache.druid.indexer.TaskMetricsUtils;
import org.apache.druid.indexer.Utils;
import org.apache.druid.indexer.hadoop.SegmentInputRow;
import org.apache.druid.indexer.path.DatasourcePathSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class IndexGeneratorJob
implements Jobby {
    private static final Logger log = new Logger(IndexGeneratorJob.class);
    private final HadoopDruidIndexerConfig config;
    private IndexGeneratorStats jobStats;
    private Job job;

    /**
     * Reads every segment descriptor file found in the job's descriptor-info directory.
     *
     * <p>A Hadoop {@link Configuration} is built from system properties and the job properties of
     * {@code config}; the directory returned by {@code config.makeDescriptorInfoDir()} is then listed and each
     * file is deserialized with {@code HadoopDruidIndexerConfig.JSON_MAPPER}.
     *
     * @param config indexer configuration that locates the descriptor directory
     * @return immutable list with one entry per descriptor file, in file-system listing order
     * @throws RuntimeException wrapping the {@link IOException} raised when the directory is missing or a
     *                          descriptor cannot be opened, parsed, or closed
     */
    public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config) {
        Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config);
        config.addJobProperties(conf);
        ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
        ImmutableList.Builder<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder();
        Path descriptorInfoDir = config.makeDescriptorInfoDir();
        try {
            FileSystem fs = descriptorInfoDir.getFileSystem(conf);
            for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
                DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath;
                // Close the stream explicitly instead of relying on the mapper's auto-close setting.
                try (InputStream descriptorStream = fs.open(status.getPath())) {
                    segmentAndIndexZipFilePath = jsonMapper.readValue(descriptorStream, DataSegmentAndIndexZipFilePath.class);
                }
                publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath);
                log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId());
            }
        }
        catch (FileNotFoundException e) {
            log.error("[%s] SegmentDescriptorInfo is not found usually when indexing process did not produce any segments meaning either there was no input data to process or all the input events were discarded due to some error", e.getMessage());
            throw new RuntimeException(e);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return publishedSegmentAndIndexZipFilePathsBuilder.build();
    }

    /**
     * Creates a job bound to the given indexer configuration, with a fresh statistics holder.
     *
     * @param config indexer configuration used by every later step of this job
     */
    public IndexGeneratorJob(HadoopDruidIndexerConfig config) {
        jobStats = new IndexGeneratorStats();
        this.config = config;
    }

    /**
     * Registers {@link IndexGeneratorReducer} as the reducer of the given job.
     *
     * <p>Called from {@link #run()}; being {@code protected}, a subclass may override it to install a
     * different reducer class.
     *
     * @param job the Hadoop job being configured
     */
    protected void setReducerClass(Job job) {
        job.setReducerClass(IndexGeneratorReducer.class);
    }

    /**
     * Builds the Hadoop job from the indexer configuration, submits it, and blocks until it finishes.
     *
     * <p>One reduce task is requested per bucket reported by {@code config.getAllBuckets()}. After the job
     * completes, the invalid-row counter (when present) is copied into {@code jobStats}. If waiting fails with
     * an {@link IOException}, {@code Utils.checkAppSuccessForJobIOException} decides whether the job is still
     * reported as successful.
     *
     * @return the value of {@code Job.waitForCompletion(true)}, or {@code true} when the fallback check passes
     * @throws RuntimeException wrapping any exception raised while configuring, submitting or waiting,
     *                          including the "no buckets" failure
     */
    public boolean run() {
        try {
            Configuration baseConf = new Configuration();
            String jobName = StringUtils.format("%s-index-generator-%s", this.config.getDataSource(), this.config.getIntervals());
            this.job = Job.getInstance(baseConf, jobName);

            this.job.getConfiguration().set("io.sort.record.percent", "0.23");
            JobHelper.injectSystemProperties(this.job.getConfiguration(), this.config);
            this.config.addJobProperties(this.job);
            JobHelper.injectDruidProperties(this.job.getConfiguration(), this.config);

            this.job.setMapperClass(IndexGeneratorMapper.class);
            this.job.setMapOutputValueClass(BytesWritable.class);
            SortableBytes.useSortableBytesAsMapOutputKey(this.job, IndexGeneratorPartitioner.class);

            // One reducer per bucket; zero buckets means there is nothing to index.
            int numReducers = Iterables.size(this.config.getAllBuckets().get());
            if (numReducers == 0) {
                throw new RuntimeException("No buckets?? seems there is no data to index.");
            }

            boolean useCombiner = this.config.getSchema().getTuningConfig().getUseCombiner();
            if (useCombiner) {
                this.job.setCombinerClass(IndexGeneratorCombiner.class);
                this.job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
            }

            this.job.setNumReduceTasks(numReducers);
            this.setReducerClass(this.job);
            this.job.setOutputKeyClass(BytesWritable.class);
            this.job.setOutputValueClass(Text.class);
            this.job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
            FileOutputFormat.setOutputPath(this.job, this.config.makeIntermediatePath());

            this.config.addInputPaths(this.job);
            this.config.intoConfiguration(this.job);
            JobHelper.setupClasspath(
                JobHelper.distributedClassPath(this.config.getWorkingPath()),
                JobHelper.distributedClassPath(this.config.makeIntermediatePath()),
                this.job
            );

            this.job.submit();
            log.info("Job %s submitted, status available at %s", this.job.getJobName(), this.job.getTrackingURL());
            if (this.job.getJobID() != null) {
                JobHelper.writeJobIdToFile(this.config.getHadoopJobIdFileName(), this.job.getJobID().toString());
            }

            try {
                boolean success = this.job.waitForCompletion(true);
                this.recordInvalidRowCount(this.job.getCounters());
                return success;
            }
            catch (IOException ioe) {
                // The job may still have succeeded even though the client lost track of it.
                if (Utils.checkAppSuccessForJobIOException(ioe, this.job, this.config.isUseYarnRMJobStatusFallback())) {
                    return true;
                }
                throw ioe;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Copies the invalid-row counter into {@code jobStats}, logging when the counters or that counter are absent.
     */
    private void recordInvalidRowCount(Counters counters) {
        if (counters == null) {
            log.info("No counters found for job [%s]", this.job.getJobName());
            return;
        }
        Counter invalidRowCount = counters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
        if (invalidRowCount == null) {
            log.info("No invalid row counter found for job [%s]", this.job.getJobName());
            return;
        }
        this.jobStats.setInvalidRowCount(invalidRowCount.getValue());
    }

    /**
     * Builds the ingestion row metrics from the Hadoop counters of the job.
     *
     * @return the map produced by {@code TaskMetricsUtils.makeIngestionRowMetrics}, or {@code null} when the job
     *         has not been created yet or its counters cannot be read
     */
    public Map<String, Object> getStats() {
        if (this.job == null) {
            return null;
        }
        try {
            Counters jobCounters = this.job.getCounters();
            long rowsProcessed = jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).getValue();
            long rowsProcessedWithErrors = jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER).getValue();
            long rowsUnparseable = jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_UNPARSEABLE_COUNTER).getValue();
            long rowsThrownAway = jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).getValue();
            return TaskMetricsUtils.makeIngestionRowMetrics(rowsProcessed, rowsProcessedWithErrors, rowsUnparseable, rowsThrownAway);
        }
        catch (IllegalStateException ise) {
            // Logged without the stack trace, unlike the generic handler below.
            log.debug("Couldn't get counters due to job state");
            return null;
        }
        catch (Exception e) {
            log.debug(e, "Encountered exception in getStats().");
            return null;
        }
    }

    /**
     * Returns the failure message of the Hadoop job as reported by {@code Utils.getFailureMessage}.
     *
     * @return the failure message, or {@code null} when the job has not been created yet
     */
    public String getErrorMessage() {
        return this.job == null
               ? null
               : Utils.getFailureMessage(this.job, HadoopDruidIndexerConfig.JSON_MAPPER);
    }

    /**
     * Creates an empty {@link IncrementalIndex} for one bucket.
     *
     * <p>The schema takes its minimum timestamp from the bucket and its timestamp spec, dimensions spec, query
     * granularity and rollup flag from the data schema of {@code config}; in-memory limits come from the tuning
     * config. When a previous dimension order is supplied and the dimensions spec does not have fixed
     * dimensions, the new index is seeded with that order and the old column formats.
     *
     * @param theBucket        bucket whose time becomes the minimum timestamp
     * @param aggs             aggregators used as the index metrics
     * @param config           indexer configuration supplying schema and tuning settings
     * @param oldDimOrder      dimension order of a previous index, or {@code null}
     * @param oldCapabilities  column formats of a previous index, or {@code null}
     * @return the newly built index
     */
    private static IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, HadoopDruidIndexerConfig config, @Nullable Iterable<String> oldDimOrder, @Nullable Map<String, ColumnFormat> oldCapabilities) {
        HadoopTuningConfig tuning = config.getSchema().getTuningConfig();

        IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
            .withMinTimestamp(theBucket.time.getMillis())
            .withTimestampSpec(config.getSchema().getDataSchema().getTimestampSpec())
            .withDimensionsSpec(config.getSchema().getDataSchema().getDimensionsSpec())
            .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
            .withMetrics(aggs)
            .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
            .build();

        IncrementalIndex freshIndex = tuning.getAppendableIndexSpec()
            .builder()
            .setIndexSchema(schema)
            .setMaxRowCount(tuning.getMaxRowsInMemory())
            .setMaxBytesInMemory(tuning.getMaxBytesInMemoryOrDefault())
            .setUseMaxMemoryEstimates(tuning.isUseMaxMemoryEstimates())
            .build();

        boolean seedFromPrevious = oldDimOrder != null && !schema.getDimensionsSpec().hasFixedDimensions();
        if (seedFromPrevious) {
            freshIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);
        }
        return freshIndex;
    }

    /**
     * Mutable holder for statistics collected after the index-generator job has run.
     */
    public static class IndexGeneratorStats {
        /** Number of invalid rows reported by the job; starts at zero. */
        private long invalidRowCount;

        /**
         * @return the most recently stored invalid-row count
         */
        public long getInvalidRowCount() {
            return invalidRowCount;
        }

        /**
         * @param invalidRowCount new invalid-row count to store
         */
        public void setInvalidRowCount(long invalidRowCount) {
            this.invalidRowCount = invalidRowCount;
        }
    }

    /**
     * Reducer that turns all rows of one reduce key into a single persisted and merged index, uploads it, and
     * writes a segment descriptor for it.
     */
    public static class IndexGeneratorReducer
    extends Reducer<BytesWritable, BytesWritable, BytesWritable, Text> {
        protected HadoopDruidIndexerConfig config;
        /** Names of the configured aggregators, in configuration order; filled in {@link #setup}. */
        private List<String> metricNames = new ArrayList<String>();
        /** Aggregators from the data schema; used to deserialize rows and for the final merge. */
        private AggregatorFactory[] aggregators;
        /** Combining form of each aggregator; used as metrics of the in-memory indexes. */
        private AggregatorFactory[] combiningAggs;
        private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

        /**
         * Creates a progress indicator that also reports progress to the Hadoop task context on every tick.
         */
        protected ProgressIndicator makeProgressIndicator(final Reducer.Context context) {
            return new BaseProgressIndicator(){

                public void progress() {
                    super.progress();
                    context.progress();
                }
            };
        }

        /**
         * Persists an in-memory index to {@code file} using the index spec for intermediate persists.
         */
        private File persist(IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator) throws IOException {
            return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(index, interval, file, this.config.getIndexSpecForIntermediatePersists(), progressIndicator, null);
        }

        /**
         * Merges previously persisted indexes into {@code file}, honouring the rollup flag of the granularity spec.
         */
        protected File mergeQueryableIndex(List<QueryableIndex> indexes, AggregatorFactory[] aggs, File file, ProgressIndicator progressIndicator) throws IOException {
            boolean rollup = this.config.getSchema().getDataSchema().getGranularitySpec().isRollup();
            return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(indexes, rollup, aggs, null, file, this.config.getIndexSpec(), this.config.getIndexSpecForIntermediatePersists(), progressIndicator, null, -1);
        }

        /**
         * Loads the indexer configuration from the task configuration and derives metric names, combining
         * aggregators and the row-serde type helpers from it.
         */
        protected void setup(Reducer.Context context) {
            this.config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
            this.aggregators = this.config.getSchema().getDataSchema().getAggregators();
            this.combiningAggs = new AggregatorFactory[this.aggregators.length];
            for (int i = 0; i < this.aggregators.length; ++i) {
                this.metricNames.add(this.aggregators[i].getName());
                this.combiningAggs[i] = this.aggregators[i].getCombiningFactory();
            }
            this.typeHelperMap = InputRowSerde.getTypeHelperMap(this.config.getSchema().getDataSchema().getDimensionsSpec());
        }

        /**
         * Builds the executor used for intermediate persists.
         *
         * <p>With a positive thread count a fixed pool over a {@link SynchronousQueue} is used; a rejected task
         * is put back on the queue, which blocks the submitter until a worker takes it. Otherwise persists run
         * on the executor returned by {@code Execs.directExecutor()}.
         */
        private static ListeningExecutorService makePersistExecutor(int numBackgroundPersistThreads) {
            if (numBackgroundPersistThreads <= 0) {
                return Execs.directExecutor();
            }
            SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(numBackgroundPersistThreads, numBackgroundPersistThreads, 0L, TimeUnit.MILLISECONDS, queue, Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"), new RejectedExecutionHandler(){

                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
                    }
                }
            });
            return MoreExecutors.listeningDecorator(executorService);
        }

        /**
         * Indexes every row of one reduce key, spilling to disk whenever the in-memory index is full, then
         * merges the spills, pushes the resulting index and writes its descriptor.
         *
         * @param key     sortable key whose group part identifies the bucket
         * @param values  serialized input rows for that bucket
         * @param context Hadoop task context, used for progress reporting and configuration
         * @throws IAE              if no rows were indexed at all
         * @throws RuntimeException wrapping a failed or timed-out intermediate persist
         */
        protected void reduce(BytesWritable key, Iterable<BytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
            Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
            final Interval interval = this.config.getGranularitySpec().bucketInterval(bucket.time).get();
            ListeningExecutorService persistExecutor = null;
            List<ListenableFuture<?>> persistFutures = new ArrayList<ListenableFuture<?>>();
            IncrementalIndex index = IndexGeneratorJob.makeIncrementalIndex(bucket, this.combiningAggs, this.config, null, null);
            try {
                File mergedBase;
                File baseFlushFile = FileUtils.createTempDir("base-flush");
                // Sorted so that intermediate indexes are merged in a stable order.
                TreeSet<File> toMerge = new TreeSet<File>();
                int indexCount = 0;
                int lineCount = 0;
                int runningTotalLineCount = 0;
                long startTime = System.currentTimeMillis();
                LinkedHashSet<String> allDimensionNames = new LinkedHashSet<String>();
                final ProgressIndicator progressIndicator = this.makeProgressIndicator(context);
                persistExecutor = makePersistExecutor(this.config.getSchema().getTuningConfig().getNumBackgroundPersistThreads());

                for (BytesWritable bw : values) {
                    context.progress();
                    InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(this.typeHelperMap, bw.getBytes(), this.aggregators));
                    int numRows = index.add(inputRow).getRowCount();
                    ++lineCount;
                    if (index.canAppendRow()) {
                        continue;
                    }

                    // The in-memory index is full: persist it in the background and start a new one.
                    allDimensionNames.addAll(index.getDimensionNames(false));
                    log.info(index.getOutOfRowsReason());
                    log.info("%,d lines to %,d rows in %,d millis", lineCount - runningTotalLineCount, numRows, System.currentTimeMillis() - startTime);
                    runningTotalLineCount = lineCount;
                    final File file = new File(baseFlushFile, StringUtils.format("index%,05d", indexCount));
                    toMerge.add(file);
                    context.progress();
                    final IncrementalIndex persistIndex = index;
                    // Read what the next index needs before the persist task can close this one.
                    Iterable<String> previousDimOrder = persistIndex.getDimensionOrder();
                    Map<String, ColumnFormat> previousColumnFormats = persistIndex.getColumnFormats();
                    persistFutures.add(persistExecutor.submit(new ThreadRenamingRunnable(StringUtils.format("%s-persist", file.getName())){

                        public void doRun() {
                            try {
                                // Qualified: persist() belongs to the reducer, not to this runnable.
                                IndexGeneratorReducer.this.persist(persistIndex, interval, file, progressIndicator);
                            }
                            catch (Exception e) {
                                log.error(e, "persist index error");
                                throw new RuntimeException(e);
                            }
                            finally {
                                persistIndex.close();
                            }
                        }
                    }));
                    index = IndexGeneratorJob.makeIncrementalIndex(bucket, this.combiningAggs, this.config, previousDimOrder, previousColumnFormats);
                    startTime = System.currentTimeMillis();
                    ++indexCount;
                }
                allDimensionNames.addAll(index.getDimensionNames(false));
                log.info("%,d lines completed.", lineCount);

                List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
                if (toMerge.isEmpty()) {
                    // Nothing was spilled: the single in-memory index is the whole segment.
                    if (index.isEmpty()) {
                        throw new IAE("If you try to persist empty indexes you are going to have a bad time");
                    }
                    mergedBase = new File(baseFlushFile, "merged");
                    this.persist(index, interval, mergedBase, progressIndicator);
                } else {
                    if (!index.isEmpty()) {
                        File finalFile = new File(baseFlushFile, "final");
                        this.persist(index, interval, finalFile, progressIndicator);
                        toMerge.add(finalFile);
                    }
                    // Every background persist must have finished before its output is loaded.
                    Futures.allAsList(persistFutures).get(1L, TimeUnit.HOURS);
                    persistExecutor.shutdown();
                    for (File file : toMerge) {
                        indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
                    }
                    log.info("starting merge of intermediate persisted segments.");
                    long mergeStartTime = System.currentTimeMillis();
                    mergedBase = this.mergeQueryableIndex(indexes, this.aggregators, new File(baseFlushFile, "merged"), progressIndicator);
                    log.info("finished merge of intermediate persisted segments. time taken [%d] ms.", System.currentTimeMillis() - mergeStartTime);
                }

                FileSystem outputFS = new Path(this.config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem(context.getConfiguration());
                ShardSpec shardSpec = this.config.getShardSpec(bucket).getActualSpec();
                ShardSpec shardSpecForPublishing = this.config.isForceExtendableShardSpecs()
                                                   ? new NumberedShardSpec(shardSpec.getPartitionNum(), this.config.getShardSpecCount(bucket))
                                                   : shardSpec;
                DataSegment segmentTemplate = new DataSegment(this.config.getDataSource(), interval, this.config.getSchema().getTuningConfig().getVersion(), null, ImmutableList.copyOf(allDimensionNames), this.metricNames, shardSpecForPublishing, Integer.valueOf(-1), 0L);
                DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
                    segmentTemplate,
                    context.getConfiguration(),
                    (Progressable)context,
                    mergedBase,
                    JobHelper.makeFileNamePath(new Path(this.config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, "index.zip", HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER),
                    JobHelper.makeTmpPath(new Path(this.config.getSchema().getIOConfig().getSegmentOutputPath()), outputFS, segmentTemplate, context.getTaskAttemptID(), HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER),
                    HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
                );
                Path descriptorPath = this.config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
                descriptorPath = JobHelper.prependFSIfNullScheme(FileSystem.get(descriptorPath.toUri(), context.getConfiguration()), descriptorPath);
                log.info("Writing descriptor to path[%s]", descriptorPath);
                JobHelper.writeSegmentDescriptor(this.config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()), segmentAndIndexZipFilePath, descriptorPath, (Progressable)context);
                for (File file : toMerge) {
                    FileUtils.deleteDirectory(file);
                }
            }
            catch (ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
            finally {
                index.close();
                if (persistExecutor != null) {
                    persistExecutor.shutdownNow();
                }
            }
        }
    }

    /**
     * Mapper that serializes each input row and emits it under a key made of the row's bucket plus a sort key
     * built from the truncated timestamp and a hash of the row's group key.
     */
    public static class IndexGeneratorMapper
    extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable> {
        private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
        private AggregatorFactory[] aggregators;
        /** Aggregators used when the incoming row is a {@link SegmentInputRow}. */
        private AggregatorFactory[] aggsForSerializingSegmentInputRow;
        private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

        /**
         * Resolves the aggregators and type helpers from the configuration loaded by the parent mapper.
         *
         * <p>Segment rows are serialized with the configured aggregators when
         * {@code DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled} says so, and with their combining
         * factories otherwise.
         */
        @Override
        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            super.setup(context);
            AggregatorFactory[] configuredAggs = this.config.getSchema().getDataSchema().getAggregators();
            this.aggregators = configuredAggs;
            if (!DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(this.config.getSchema().getIOConfig().getPathSpec())) {
                AggregatorFactory[] combining = new AggregatorFactory[configuredAggs.length];
                this.aggsForSerializingSegmentInputRow = combining;
                int slot = 0;
                for (AggregatorFactory agg : configuredAggs) {
                    combining[slot++] = agg.getCombiningFactory();
                }
            } else {
                this.aggsForSerializingSegmentInputRow = configuredAggs;
            }
            this.typeHelperMap = InputRowSerde.getTypeHelperMap(this.config.getSchema().getDataSchema().getDimensionsSpec());
        }

        /**
         * Emits one serialized row.
         *
         * <p>The row is written to the context before any parse exception collected during serialization is
         * thrown; the processed-rows counter is only incremented when no such exception exists.
         *
         * @throws ISE            if the configuration yields no bucket for the row
         * @throws ParseException if serialization reported parse problems
         */
        @Override
        protected void innerMap(InputRow inputRow, Mapper.Context context) throws IOException, InterruptedException {
            Optional<Bucket> bucket = this.getConfig().getBucket(inputRow);
            if (!bucket.isPresent()) {
                throw new ISE("No bucket found for row: %s", inputRow);
            }

            long truncatedTimestamp = this.granularitySpec.getQueryGranularity().bucketStart(inputRow.getTimestamp()).getMillis();
            byte[] groupKeyJson = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(Rows.toGroupKey(truncatedTimestamp, inputRow));
            byte[] hashedDimensions = HASH_FUNCTION.hashBytes(groupKeyJson).asBytes();

            AggregatorFactory[] serdeAggs = inputRow instanceof SegmentInputRow
                                            ? this.aggsForSerializingSegmentInputRow
                                            : this.aggregators;
            InputRowSerde.SerializeResult serializeResult = InputRowSerde.toBytes(this.typeHelperMap, inputRow, serdeAggs);

            // Sort key layout: 8-byte truncated timestamp followed by the dimension hash.
            byte[] bucketKey = bucket.get().toGroupKey(new byte[0][]);
            byte[] sortKey = ByteBuffer.allocate(8 + hashedDimensions.length)
                                       .putLong(truncatedTimestamp)
                                       .put(hashedDimensions)
                                       .array();
            BytesWritable outKey = new SortableBytes(bucketKey, sortKey).toBytesWritable();
            BytesWritable outValue = new BytesWritable(serializeResult.getSerializedRow());
            context.write(outKey, outValue);

            ParseException pe = IncrementalIndex.getCombinedParseException(inputRow, serializeResult.getParseExceptionMessages(), null);
            if (pe != null) {
                throw pe;
            }
            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1L);
        }
    }

    /**
     * Partitioner that routes a record to the partition named by the int stored at byte offset 4 of its key.
     */
    public static class IndexGeneratorPartitioner
    extends Partitioner<BytesWritable, Writable>
    implements Configurable {
        private Configuration config;

        /**
         * Picks the partition for a record.
         *
         * <p>When the job tracker address is {@code "local"} the shard number is wrapped with a modulo;
         * otherwise it is used as-is and must be smaller than {@code numPartitions}.
         *
         * @throws ISE if the shard number is not below {@code numPartitions} outside local mode
         */
        public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions) {
            ByteBuffer keyBuffer = ByteBuffer.wrap(bytesWritable.getBytes());
            // Skip the first four bytes; the shard number is the int that follows.
            keyBuffer.position(4);
            int shardNum = keyBuffer.getInt();

            boolean runningLocally = "local".equals(JobHelper.getJobTrackerAddress(this.config));
            if (runningLocally) {
                return shardNum % numPartitions;
            }
            if (shardNum < numPartitions) {
                return shardNum;
            }
            throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
        }

        /**
         * @return the configuration previously handed to {@link #setConf}
         */
        public Configuration getConf() {
            return config;
        }

        /**
         * @param config configuration later used to look up the job tracker address
         */
        public void setConf(Configuration config) {
            this.config = config;
        }
    }

    /**
     * Combiner that pre-aggregates the rows sharing one key by running them through an in-memory index and
     * re-emitting the index contents under the same key.
     */
    public static class IndexGeneratorCombiner
    extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
        private HadoopDruidIndexerConfig config;
        /** Aggregators from the data schema; used to deserialize incoming rows. */
        private AggregatorFactory[] aggregators;
        /** Combining form of each aggregator; used for the index metrics and to serialize outgoing rows. */
        private AggregatorFactory[] combiningAggs;
        private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

        /**
         * Loads the indexer configuration from the task configuration and derives the combining aggregators
         * and row-serde type helpers from it.
         */
        protected void setup(Reducer.Context context) {
            this.config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
            this.aggregators = this.config.getSchema().getDataSchema().getAggregators();
            this.combiningAggs = new AggregatorFactory[this.aggregators.length];
            for (int i = 0; i < this.aggregators.length; ++i) {
                this.combiningAggs[i] = this.aggregators[i].getCombiningFactory();
            }
            this.typeHelperMap = InputRowSerde.getTypeHelperMap(this.config.getSchema().getDataSchema().getDimensionsSpec());
        }

        /**
         * Combines all values of one key.
         *
         * <p>A single value is passed through untouched. Otherwise rows are added to an in-memory index; each
         * time the index cannot accept more rows its contents are flushed to the context and a new index is
         * started with the dimension order and column formats seen so far.
         */
        protected void reduce(BytesWritable key, Iterable<BytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            Iterator<BytesWritable> iter = values.iterator();
            BytesWritable first = iter.next();
            if (!iter.hasNext()) {
                // Nothing to combine with: forward the only value as-is.
                context.write(key, first);
                return;
            }

            LinkedHashSet<String> dimOrder = new LinkedHashSet<String>();
            SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
            Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
            IncrementalIndex index = IndexGeneratorJob.makeIncrementalIndex(bucket, this.combiningAggs, this.config, null, null);
            index.add(InputRowSerde.fromBytes(this.typeHelperMap, first.getBytes(), this.aggregators));
            while (iter.hasNext()) {
                context.progress();
                InputRow value = InputRowSerde.fromBytes(this.typeHelperMap, iter.next().getBytes(), this.aggregators);
                if (!index.canAppendRow()) {
                    dimOrder.addAll(index.getDimensionOrder());
                    log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
                    // Read the column formats while the index is still open; the flush below closes it.
                    Map<String, ColumnFormat> previousColumnFormats = index.getColumnFormats();
                    this.flushIndexToContextAndClose(key, index, context);
                    index = IndexGeneratorJob.makeIncrementalIndex(bucket, this.combiningAggs, this.config, dimOrder, previousColumnFormats);
                }
                index.add(value);
            }
            this.flushIndexToContextAndClose(key, index, context);
        }

        /**
         * Writes every row of {@code index} to the context under {@code key}, then closes the index.
         */
        private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Reducer.Context context) throws IOException, InterruptedException {
            List<String> dimensions = index.getDimensionNames(false);
            for (Row row : index) {
                context.progress();
                InputRow inputRow = this.getInputRowFromRow(row, dimensions);
                InputRowSerde.SerializeResult serializeResult = InputRowSerde.toBytes(this.typeHelperMap, inputRow, this.combiningAggs);
                context.write(key, new BytesWritable(serializeResult.getSerializedRow()));
            }
            index.close();
        }

        /**
         * Wraps a {@link Row} as an {@link InputRow} that reports the given dimension list and delegates every
         * other accessor to the wrapped row.
         */
        private InputRow getInputRowFromRow(final Row row, final List<String> dimensions) {
            return new InputRow(){

                public List<String> getDimensions() {
                    return dimensions;
                }

                public long getTimestampFromEpoch() {
                    return row.getTimestampFromEpoch();
                }

                public DateTime getTimestamp() {
                    return row.getTimestamp();
                }

                public List<String> getDimension(String dimension) {
                    return row.getDimension(dimension);
                }

                public Object getRaw(String dimension) {
                    return row.getRaw(dimension);
                }

                public Number getMetric(String metric) {
                    return row.getMetric(metric);
                }

                public int compareTo(Row o) {
                    return row.compareTo(o);
                }
            };
        }
    }

    /**
     * Text output format whose output-spec check only requires that an output directory is configured.
     */
    public static class IndexGeneratorOutputFormat
    extends TextOutputFormat {
        /**
         * Verifies that the job has an output path.
         *
         * @throws InvalidJobConfException if no output directory is set on the job
         */
        public void checkOutputSpecs(JobContext job) throws IOException {
            if (IndexGeneratorOutputFormat.getOutputPath(job) == null) {
                throw new InvalidJobConfException("Output directory not set.");
            }
        }
    }
}

