/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.util;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Striped;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.WritableShimSerialization;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs file-system operations (state serialization and deserialization, delete, rename, move) and
 * arbitrary {@link Callable}s in parallel on a fixed-size thread pool.
 *
 * <p>Every submitted task is tracked as a {@link NamedFuture}. {@link #waitForTasks()} blocks until
 * all tracked tasks have finished, and {@link #close()} waits for outstanding tasks before shutting
 * the pool down.
 *
 * <p>NOTE(review): the list of tracked futures is not guarded by any lock in this class, so
 * submitting and waiting are presumably meant to happen from a single thread - confirm before
 * sharing one instance between threads.
 */
public class ParallelRunner
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunner.class);

    /** Presumably the configuration key callers use to pick the thread count; it is not read in this class. */
    public static final String PARALLEL_RUNNER_THREADS_KEY = "parallel.runner.threads";

    /** Presumably the thread count callers fall back to when the key is unset; it is not read in this class. */
    public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;

    /** Pool on which every submitted task runs. */
    private final ExecutorService executor;

    /** File system used by the built-in operations; tasks read it when they run, not when they are submitted. */
    private FileSystem fs;

    /** Tasks submitted since the last successful {@link #waitForTasks(long)}. */
    private final List<NamedFuture> futures = Lists.newArrayList();

    /** Locks keyed by path string, used to serialize delete/rename/move operations on the same path. */
    private final Striped<Lock> locks = Striped.lazyWeakLock(Integer.MAX_VALUE);

    /** Decides whether a failed task makes {@link #waitForTasks(long)} throw. */
    private final FailPolicy failPolicy;

    /**
     * Creates a runner that uses {@link FailPolicy#FAIL_ONE_FAIL_ALL}.
     *
     * @param threads number of threads in the fixed-size pool
     * @param fs file system the built-in operations act on
     */
    public ParallelRunner(int threads, FileSystem fs) {
        this(threads, fs, FailPolicy.FAIL_ONE_FAIL_ALL);
    }

    /**
     * Creates a runner backed by a fixed-size thread pool.
     *
     * @param threads number of threads in the fixed-size pool
     * @param fs file system the built-in operations act on
     * @param failPolicy how task failures are reported by {@link #waitForTasks(long)}
     */
    public ParallelRunner(int threads, FileSystem fs, FailPolicy failPolicy) {
        this.executor = ExecutorsUtils.loggingDecorator(Executors.newFixedThreadPool(threads,
                ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ParallelRunner"))));
        this.fs = fs;
        this.failPolicy = failPolicy;
    }

    /**
     * Submits a task that serializes {@code state} to {@code outputFilePath}.
     *
     * @param state state to serialize
     * @param outputFilePath destination file
     */
    public <T extends State> void serializeToFile(final T state, final Path outputFilePath) {
        this.futures.add(new NamedFuture(this.executor.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                SerializationUtils.serializeState(ParallelRunner.this.fs, outputFilePath, state);
                return null;
            }
        }), "Serialize state to " + outputFilePath));
    }

    /**
     * Submits a task that deserializes the content of {@code inputFilePath} into {@code state}.
     *
     * @param state state object to populate
     * @param inputFilePath file to read
     */
    public <T extends State> void deserializeFromFile(final T state, final Path inputFilePath) {
        this.futures.add(new NamedFuture(this.executor.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                SerializationUtils.deserializeState(ParallelRunner.this.fs, inputFilePath, state);
                return null;
            }
        }), "Deserialize state from " + inputFilePath));
    }

    /**
     * Submits a task that reads every record of a sequence file and adds each value to {@code states}.
     *
     * <p>{@code states} is modified from a pool thread. If the same collection is passed to several
     * calls it presumably needs to be thread-safe - confirm against callers.
     *
     * @param keyClass class of the sequence file keys; instantiated reflectively
     * @param stateClass class of the sequence file values; instantiated reflectively, once per record
     * @param inputFilePath sequence file to read
     * @param states collection that receives the deserialized values
     * @param deleteAfter whether to delete {@code inputFilePath} once it has been fully read
     */
    public <T extends State> void deserializeFromSequenceFile(final Class<? extends Writable> keyClass, final Class<T> stateClass, final Path inputFilePath, final Collection<T> states, final boolean deleteAfter) {
        this.futures.add(new NamedFuture(this.executor.submit(new Callable<Void>() {

            @Override
            @SuppressWarnings("unchecked") // getCurrentValue returns Object; the value is expected to be a T
            public Void call() throws Exception {
                Configuration conf = new Configuration(ParallelRunner.this.fs.getConf());
                WritableShimSerialization.addToHadoopConfiguration(conf);
                try (SequenceFile.Reader reader = new SequenceFile.Reader(ParallelRunner.this.fs, inputFilePath, conf)) {
                    Writable key = keyClass.newInstance();
                    T state = stateClass.newInstance();
                    while (reader.next(key)) {
                        state = (T) reader.getCurrentValue(state);
                        states.add(state);
                        // Use a fresh instance for the next record so stored values are never overwritten.
                        state = stateClass.newInstance();
                    }
                }
                // Delete only after the reader has been closed, so no handle is open on the file.
                if (deleteAfter) {
                    HadoopUtils.deletePath(ParallelRunner.this.fs, inputFilePath, false);
                }
                return null;
            }
        }), "Deserialize state from file " + inputFilePath));
    }

    /**
     * Submits a task that deletes {@code path} while holding the lock for that path.
     *
     * @param path path to delete
     * @param recursive passed through to {@code HadoopUtils.deletePath}
     */
    public void deletePath(final Path path, final boolean recursive) {
        this.futures.add(new NamedFuture(this.executor.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                Lock lock = ParallelRunner.this.locks.get(path.toString());
                lock.lock();
                try {
                    HadoopUtils.deletePath(ParallelRunner.this.fs, path, recursive);
                    return null;
                } finally {
                    lock.unlock();
                }
            }
        }), "Delete path " + path));
    }

    /**
     * Submits a task that renames {@code src} to {@code dst} while holding the lock for {@code src}.
     *
     * <p>Nothing happens if {@code src} does not exist. A {@link FileAlreadyExistsException} is
     * logged as a warning and does not fail the task.
     *
     * @param src path to rename
     * @param dst new path
     * @param group if present, group to set on {@code dst} after the rename
     */
    public void renamePath(final Path src, final Path dst, final Optional<String> group) {
        this.futures.add(new NamedFuture(this.executor.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                Lock lock = ParallelRunner.this.locks.get(src.toString());
                lock.lock();
                try {
                    if (ParallelRunner.this.fs.exists(src)) {
                        HadoopUtils.renamePath(ParallelRunner.this.fs, src, dst);
                        if (group.isPresent()) {
                            HadoopUtils.setGroup(ParallelRunner.this.fs, dst, group.get());
                        }
                    }
                    return null;
                } catch (FileAlreadyExistsException e) {
                    LOGGER.warn(String.format("Failed to rename %s to %s: dst already exists", src, dst), e);
                    return null;
                } finally {
                    lock.unlock();
                }
            }
        }), "Rename " + src + " to " + dst));
    }

    /**
     * Submits a move of {@code src} to {@code dst} on {@code dstFs} without overwriting.
     *
     * @param src path on this runner's file system
     * @param dstFs destination file system
     * @param dst destination path
     * @param group if present, group to set on {@code dst} after the move
     */
    public void movePath(Path src, FileSystem dstFs, Path dst, Optional<String> group) {
        this.movePath(src, dstFs, dst, false, group);
    }

    /**
     * Submits a task that moves {@code src} to {@code dst} on {@code dstFs} while holding the lock
     * for {@code src}.
     *
     * <p>Nothing happens if {@code src} does not exist. A {@link FileAlreadyExistsException} is
     * logged as a warning and does not fail the task.
     *
     * @param src path on this runner's file system
     * @param dstFs destination file system
     * @param dst destination path
     * @param overwrite passed through to {@code HadoopUtils.movePath}
     * @param group if present, group to set on {@code dst} after the move
     */
    public void movePath(final Path src, final FileSystem dstFs, final Path dst, final boolean overwrite, final Optional<String> group) {
        this.futures.add(new NamedFuture(this.executor.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                Lock lock = ParallelRunner.this.locks.get(src.toString());
                lock.lock();
                try {
                    if (ParallelRunner.this.fs.exists(src)) {
                        HadoopUtils.movePath(ParallelRunner.this.fs, src, dstFs, dst, overwrite, dstFs.getConf());
                        if (group.isPresent()) {
                            HadoopUtils.setGroup(dstFs, dst, group.get());
                        }
                    }
                    return null;
                } catch (FileAlreadyExistsException e) {
                    LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", src, dst), e);
                    return null;
                } finally {
                    lock.unlock();
                }
            }
        }), "Move " + src + " to " + dst));
    }

    /**
     * Submits an arbitrary task to the pool and tracks it under {@code name}.
     *
     * @param callable task to run
     * @param name label used in log messages about this task
     */
    public void submitCallable(Callable<Void> callable, String name) {
        this.futures.add(new NamedFuture(this.executor.submit(callable), name));
    }

    /**
     * Waits for every tracked task, in submission order.
     *
     * <p>The timeout is applied to each task separately, so the total wait can exceed
     * {@code timeoutInMills}. If the calling thread is interrupted, all remaining tasks are
     * cancelled and the interrupt flag is restored before returning or throwing.
     *
     * <p>The tracked tasks are cleared only when this method returns normally; when it throws they
     * stay tracked and are examined again by the next call.
     *
     * @param timeoutInMills maximum time to wait for each individual task, in milliseconds
     * @throws IOException wrapping the first interruption, task failure or timeout encountered,
     *     but only under {@link FailPolicy#FAIL_ONE_FAIL_ALL}
     */
    public void waitForTasks(long timeoutInMills) throws IOException {
        boolean wasInterrupted = false;
        IOException exception = null;
        for (NamedFuture future : this.futures) {
            if (wasInterrupted) {
                // Once interrupted, stop waiting and cancel whatever has not been waited for yet.
                future.getFuture().cancel(true);
                continue;
            }
            try {
                future.getFuture().get(timeoutInMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie) {
                LOGGER.warn("Task was interrupted: " + future.getName());
                wasInterrupted = true;
                if (exception == null) {
                    exception = new IOException(ie);
                }
            } catch (ExecutionException ee) {
                LOGGER.warn("Task failed: " + future.getName(), ee.getCause());
                if (exception == null) {
                    exception = new IOException(ee.getCause());
                }
            } catch (TimeoutException te) {
                LOGGER.warn("Tasks not fully finished before Parallel runner waiting until timeout due to:", te);
                if (exception == null) {
                    // Wrap the timeout itself: its cause is normally null, which would leave the
                    // IOException with no information about what went wrong.
                    exception = new IOException(te);
                }
            }
        }
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
        if (exception != null && this.failPolicy == FailPolicy.FAIL_ONE_FAIL_ALL) {
            throw exception;
        }
        // Clear so that more tasks can be submitted and waited for on this runner.
        this.futures.clear();
    }

    /**
     * Waits for every tracked task using a per-task timeout of {@link Long#MAX_VALUE} milliseconds.
     *
     * @throws IOException see {@link #waitForTasks(long)}
     */
    public void waitForTasks() throws IOException {
        this.waitForTasks(Long.MAX_VALUE);
    }

    /**
     * Waits for outstanding tasks, then shuts the thread pool down even if waiting failed.
     *
     * @throws IOException see {@link #waitForTasks(long)}
     */
    @Override
    public void close() throws IOException {
        try {
            this.waitForTasks();
        } finally {
            ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(LOGGER));
        }
    }

    /** Returns the file system the built-in operations currently act on. */
    public FileSystem getFs() {
        return this.fs;
    }

    /**
     * Replaces the file system used by the built-in operations.
     *
     * <p>Tasks read the field when they execute, so tasks that were submitted earlier but have not
     * run yet will use the new file system.
     */
    public void setFs(FileSystem fs) {
        this.fs = fs;
    }

    /** A {@link Future} paired with a human-readable name used in log messages. */
    public static class NamedFuture {
        private final Future<?> future;
        private final String name;

        public NamedFuture(Future<?> future, String name) {
            this.future = future;
            this.name = name;
        }

        public Future<?> getFuture() {
            return this.future;
        }

        public String getName() {
            return this.name;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof NamedFuture)) {
                return false;
            }
            NamedFuture other = (NamedFuture) o;
            // canEqual lets a subclass refuse equality with a plain NamedFuture, keeping equals symmetric.
            if (!other.canEqual(this)) {
                return false;
            }
            return Objects.equals(this.getFuture(), other.getFuture())
                    && Objects.equals(this.getName(), other.getName());
        }

        protected boolean canEqual(Object other) {
            return other instanceof NamedFuture;
        }

        @Override
        public int hashCode() {
            final int PRIME = 59;
            int result = 1;
            Future<?> thisFuture = this.getFuture();
            result = result * PRIME + (thisFuture == null ? 43 : thisFuture.hashCode());
            String thisName = this.getName();
            result = result * PRIME + (thisName == null ? 43 : thisName.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "ParallelRunner.NamedFuture(future=" + this.getFuture() + ", name=" + this.getName() + ")";
        }
    }

    /** How {@link ParallelRunner#waitForTasks(long)} reacts to a task that failed, timed out or was interrupted. */
    public enum FailPolicy {
        /** Problems are logged, but {@code waitForTasks} does not throw. */
        ISOLATE_FAILURES,
        /** {@code waitForTasks} throws an {@link IOException} for the first problem it recorded. */
        FAIL_ONE_FAIL_ALL;
    }
}

