/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.examples.java.distcp;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.distcp.FileCopyTask;
import org.apache.flink.examples.java.distcp.FileCopyTaskInputFormat;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistCp {
    private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
    public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
    public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        if (!params.has("input") || !params.has("output")) {
            System.err.println("Usage: --input <path> --output <path> [--parallelism <n>]");
            return;
        }
        Path sourcePath = new Path(params.get("input"));
        final Path targetPath = new Path(params.get("output"));
        if (!(DistCp.isLocal(env) || DistCp.isOnDistributedFS(sourcePath) && DistCp.isOnDistributedFS(targetPath))) {
            System.out.println("In a distributed mode only HDFS input/output paths are supported");
            return;
        }
        int parallelism = params.getInt("parallelism", 10);
        if (parallelism <= 0) {
            System.err.println("Parallelism should be greater than 0");
            return;
        }
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)params);
        env.setParallelism(parallelism);
        long startTime = System.currentTimeMillis();
        LOGGER.info("Initializing copy tasks");
        List<FileCopyTask> tasks = DistCp.getCopyTasks(sourcePath);
        LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - startTime) + "ms");
        DataSource inputTasks = new DataSource(env, (InputFormat)new FileCopyTaskInputFormat(tasks), (TypeInformation)new GenericTypeInfo(FileCopyTask.class), "fileCopyTasks");
        FlatMapOperator res = inputTasks.flatMap((FlatMapFunction)new RichFlatMapFunction<FileCopyTask, Object>(){
            private LongCounter fileCounter;
            private LongCounter bytesCounter;

            public void open(Configuration parameters) throws Exception {
                this.bytesCounter = this.getRuntimeContext().getLongCounter(DistCp.BYTES_COPIED_CNT_NAME);
                this.fileCounter = this.getRuntimeContext().getLongCounter(DistCp.FILES_COPIED_CNT_NAME);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
                File outFile;
                File parentFile;
                LOGGER.info("Processing task: " + task);
                Path outPath = new Path(targetPath, task.getRelativePath());
                FileSystem targetFs = targetPath.getFileSystem();
                if (!(targetFs.isDistributedFS() || (parentFile = (outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString())).getParentFile()).mkdirs() || parentFile.exists())) {
                    throw new RuntimeException("Cannot create local file system directories: " + parentFile);
                }
                FSDataOutputStream outputStream = null;
                FSDataInputStream inputStream = null;
                try {
                    outputStream = targetFs.create(outPath, true);
                    inputStream = task.getPath().getFileSystem().open(task.getPath());
                    int bytes = IOUtils.copy((InputStream)inputStream, (OutputStream)outputStream);
                    this.bytesCounter.add((long)bytes);
                }
                catch (Throwable throwable) {
                    IOUtils.closeQuietly(inputStream);
                    IOUtils.closeQuietly((OutputStream)outputStream);
                    throw throwable;
                }
                IOUtils.closeQuietly((InputStream)inputStream);
                IOUtils.closeQuietly((OutputStream)outputStream);
                this.fileCounter.add(1L);
            }
        });
        res.print();
        Map accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
        LOGGER.info("== COUNTERS ==");
        for (Map.Entry e : accumulators.entrySet()) {
            LOGGER.info((String)e.getKey() + ": " + e.getValue());
        }
    }

    private static boolean isLocal(ExecutionEnvironment env) {
        return env instanceof LocalEnvironment;
    }

    private static boolean isOnDistributedFS(Path path) throws IOException {
        return path.getFileSystem().isDistributedFS();
    }

    private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
        ArrayList<FileCopyTask> tasks = new ArrayList<FileCopyTask>();
        DistCp.getCopyTasks(sourcePath, "", tasks);
        return tasks;
    }

    private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
        FileStatus[] res = p.getFileSystem().listStatus(p);
        if (res == null) {
            return;
        }
        for (FileStatus fs : res) {
            if (fs.isDir()) {
                DistCp.getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
                continue;
            }
            Path cp = fs.getPath();
            tasks.add(new FileCopyTask(cp, rel + cp.getName()));
        }
    }
}

