/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.transform;

import alluxio.AlluxioURI;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.plan.transform.CompactConfig;
import alluxio.job.plan.transform.CompactTask;
import alluxio.job.plan.transform.Format;
import alluxio.job.plan.transform.format.TableReader;
import alluxio.job.plan.transform.format.TableRow;
import alluxio.job.plan.transform.format.TableSchema;
import alluxio.job.plan.transform.format.TableWriter;
import alluxio.job.util.SerializableVoid;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CompactDefinition
extends AbstractVoidPlanDefinition<CompactConfig, ArrayList<CompactTask>> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactDefinition.class);
    private static final int TASKS_PER_WORKER = 10;
    private static final String COMPACTED_FILE_PATTERN = "part-%d.parquet";
    private static final String SUCCESS_FILENAME = "_SUCCESS";
    private static final String CRC_FILENAME_SUFFIX = ".crc";
    private static final Map<Format, Double> COMPRESSION_RATIO = ImmutableMap.of((Object)Format.PARQUET, (Object)1.0, (Object)Format.CSV, (Object)5.0, (Object)Format.GZIP_CSV, (Object)2.5, (Object)Format.ORC, (Object)1.0);

    @Override
    public Class<CompactConfig> getJobConfigClass() {
        return CompactConfig.class;
    }

    private String getOutputPath(AlluxioURI outputDir, int outputIndex) {
        return outputDir.join(String.format(COMPACTED_FILE_PATTERN, outputIndex)).toString();
    }

    private boolean shouldIgnore(URIStatus status) {
        return status.isFolder() || status.getName().equals(SUCCESS_FILENAME) || status.getName().endsWith(CRC_FILENAME_SUFFIX);
    }

    @Override
    public Set<Pair<WorkerInfo, ArrayList<CompactTask>>> selectExecutors(CompactConfig config, List<WorkerInfo> jobWorkers, SelectExecutorsContext context) throws Exception {
        Preconditions.checkState((!jobWorkers.isEmpty() ? 1 : 0) != 0, (Object)"No job worker");
        AlluxioURI inputDir = new AlluxioURI(config.getInput());
        AlluxioURI outputDir = new AlluxioURI(config.getOutput());
        ArrayList files = Lists.newArrayList();
        double totalFileSize = 0.0;
        for (URIStatus status : context.getFileSystem().listStatus(inputDir)) {
            if (this.shouldIgnore(status)) continue;
            files.add(status);
            totalFileSize += (double)status.getLength();
        }
        HashMap assignments = Maps.newHashMap();
        int maxNumFiles = config.getMaxNumFiles();
        long groupMinSize = config.getMinFileSize();
        if (!files.isEmpty() && config.getInputPartitionInfo() != null) {
            groupMinSize = (long)((double)groupMinSize * COMPRESSION_RATIO.get(config.getInputPartitionInfo().getFormat(((URIStatus)files.get(0)).getName())));
        }
        if (totalFileSize / (double)groupMinSize > (double)maxNumFiles) {
            groupMinSize = Math.round(totalFileSize / (double)maxNumFiles);
        }
        ArrayList<String> group = new ArrayList<String>();
        int workerIndex = 0;
        int outputIndex = 0;
        int groupIndex = 0;
        long currentGroupSize = 0L;
        long halfGroupMinSize = groupMinSize / 2L;
        for (URIStatus file : files) {
            if (group.isEmpty() || groupIndex == maxNumFiles - 1 || currentGroupSize + file.getLength() <= halfGroupMinSize || Math.abs(currentGroupSize + file.getLength() - groupMinSize) <= Math.abs(currentGroupSize - groupMinSize)) {
                group.add(inputDir.join(file.getName()).toString());
                currentGroupSize += file.getLength();
                continue;
            }
            WorkerInfo worker = jobWorkers.get(workerIndex++);
            if (workerIndex == jobWorkers.size()) {
                workerIndex = 0;
            }
            if (!assignments.containsKey(worker)) {
                assignments.put(worker, new ArrayList());
            }
            ArrayList tasks = (ArrayList)assignments.get(worker);
            tasks.add(new CompactTask(group, this.getOutputPath(outputDir, outputIndex++)));
            group = new ArrayList();
            group.add(inputDir.join(file.getName()).toString());
            currentGroupSize = file.getLength();
            ++groupIndex;
        }
        if (!group.isEmpty()) {
            WorkerInfo worker = jobWorkers.get(workerIndex);
            if (!assignments.containsKey(worker)) {
                assignments.put(worker, new ArrayList());
            }
            ArrayList tasks = (ArrayList)assignments.get(worker);
            tasks.add(new CompactTask(group, this.getOutputPath(outputDir, outputIndex)));
        }
        HashSet result = Sets.newHashSet();
        for (Map.Entry assignment : assignments.entrySet()) {
            List partitioned = CommonUtils.partition((List)((List)assignment.getValue()), (int)10);
            for (List compactTasks : partitioned) {
                if (compactTasks.isEmpty()) continue;
                result.add(new Pair(assignment.getKey(), (Object)Lists.newArrayList((Iterable)compactTasks)));
            }
        }
        return result;
    }

    @Override
    public SerializableVoid runTask(CompactConfig config, ArrayList<CompactTask> tasks, RunTaskContext context) throws Exception {
        for (CompactTask task : tasks) {
            TableSchema schema;
            ArrayList<String> inputs = task.getInputs();
            if (inputs.isEmpty()) continue;
            AlluxioURI output = new AlluxioURI(task.getOutput());
            try (TableReader reader = TableReader.create(new AlluxioURI(inputs.get(0)), config.getInputPartitionInfo());){
                schema = reader.getSchema();
            }
            try {
                TableWriter writer = TableWriter.create(schema, output, config.getOutputPartitionInfo());
                var10_11 = null;
                try {
                    for (String input : inputs) {
                        TableReader reader = TableReader.create(new AlluxioURI(input), config.getInputPartitionInfo());
                        Throwable throwable = null;
                        try {
                            TableRow row = reader.read();
                            while (row != null) {
                                writer.write(row);
                                row = reader.read();
                            }
                        }
                        catch (Throwable throwable2) {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        finally {
                            if (reader == null) continue;
                            if (throwable != null) {
                                try {
                                    reader.close();
                                }
                                catch (Throwable throwable3) {
                                    throwable.addSuppressed(throwable3);
                                }
                                continue;
                            }
                            reader.close();
                        }
                    }
                }
                catch (Throwable throwable) {
                    var10_11 = throwable;
                    throw throwable;
                }
                finally {
                    if (writer == null) continue;
                    if (var10_11 != null) {
                        try {
                            writer.close();
                        }
                        catch (Throwable throwable) {
                            var10_11.addSuppressed(throwable);
                        }
                        continue;
                    }
                    writer.close();
                }
            }
            catch (Throwable e) {
                try {
                    context.getFileSystem().delete(output);
                }
                catch (Throwable t) {
                    e.addSuppressed(t);
                }
                throw e;
            }
        }
        return null;
    }
}

