/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.tableOps;

import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.ServerClient;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.BulkImport;
import org.apache.accumulo.master.tableOps.CompleteBulkImport;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.accumulo.trace.instrument.TraceExecutorService;
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

class LoadFiles
extends MasterRepo {
    private static final long serialVersionUID = 1L;
    private static ExecutorService threadPool = null;
    private static final Logger log = Logger.getLogger(BulkImport.class);
    private String tableId;
    private String source;
    private String bulk;
    private String errorDir;
    private boolean setTime;

    public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
        this.tableId = tableId;
        this.source = source;
        this.bulk = bulk;
        this.errorDir = errorDir;
        this.setTime = setTime;
    }

    @Override
    public long isReady(long tid, Master master) throws Exception {
        if (master.onlineTabletServers().size() == 0) {
            return 500L;
        }
        return 0L;
    }

    private static synchronized ExecutorService getThreadPool(Master master) {
        if (threadPool == null) {
            int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
            SimpleThreadPool pool = new SimpleThreadPool(threadPoolSize, "bulk import");
            pool.allowCoreThreadTimeOut(true);
            threadPool = new TraceExecutorService((ExecutorService)pool);
        }
        return threadPool;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Repo<Master> call(final long tid, final Master master) throws Exception {
        ExecutorService executor = LoadFiles.getThreadPool(master);
        SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
        VolumeManager fs = master.getFileSystem();
        ArrayList<FileStatus> files = new ArrayList<FileStatus>();
        for (FileStatus entry : fs.listStatus(new Path(this.bulk))) {
            files.add(entry);
        }
        log.debug((Object)("tid " + tid + " importing " + files.size() + " files"));
        Path writable = new Path(this.errorDir, ".iswritable");
        if (!fs.createNewFile(writable)) {
            fs.delete(writable);
            if (!fs.createNewFile(writable)) {
                throw new ThriftTableOperationException(this.tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, "Unable to write to " + this.errorDir);
            }
        }
        fs.delete(writable);
        Set<String> filesToLoad = Collections.synchronizedSet(new HashSet());
        for (FileStatus f : files) {
            filesToLoad.add(f.getPath().toString());
        }
        int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
        for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; ++attempt) {
            ArrayList<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
            if (master.onlineTabletServers().size() == 0) {
                log.warn((Object)("There are no tablet server to process bulk import, waiting (tid = " + tid + ")"));
            }
            while (master.onlineTabletServers().size() == 0) {
                UtilWaitThread.sleep((long)500L);
            }
            final List loaded = Collections.synchronizedList(new ArrayList());
            for (final String file : filesToLoad) {
                results.add(executor.submit(new Callable<List<String>>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public List<String> call() {
                        ClientService.Client client;
                        ArrayList<String> failures;
                        block5: {
                            failures = new ArrayList<String>();
                            client = null;
                            String server = null;
                            try {
                                long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
                                Pair pair = ServerClient.getConnection((Instance)master.getInstance(), (boolean)false, (long)timeInMillis);
                                client = (ClientService.Client)pair.getSecond();
                                server = (String)pair.getFirst();
                                List<String> attempt = Collections.singletonList(file);
                                log.debug((Object)("Asking " + (String)pair.getFirst() + " to bulk import " + file));
                                List fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, LoadFiles.this.tableId, attempt, LoadFiles.this.errorDir, LoadFiles.this.setTime);
                                if (fail.isEmpty()) {
                                    loaded.add(file);
                                    break block5;
                                }
                                failures.addAll(fail);
                            }
                            catch (Exception ex) {
                                try {
                                    log.error((Object)("rpc failed server:" + server + ", tid:" + tid + " " + ex));
                                }
                                catch (Throwable throwable) {
                                    ServerClient.close(client);
                                    throw throwable;
                                }
                                ServerClient.close((ClientService.Client)client);
                            }
                        }
                        ServerClient.close((ClientService.Client)client);
                        return failures;
                    }
                }));
            }
            HashSet failures = new HashSet();
            for (Future future : results) {
                failures.addAll((Collection)future.get());
            }
            filesToLoad.removeAll(loaded);
            if (filesToLoad.size() <= 0) continue;
            log.debug((Object)("tid " + tid + " attempt " + (attempt + 1) + " " + LoadFiles.sampleList(filesToLoad, 10) + " failed"));
            UtilWaitThread.sleep((long)100L);
        }
        FSDataOutputStream failFile = fs.create(new Path(this.errorDir, "failures.txt"), true);
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter((OutputStream)failFile, Constants.UTF8));
        try {
            for (String f : filesToLoad) {
                out.write(f);
                out.write("\n");
            }
        }
        finally {
            out.close();
        }
        return new CompleteBulkImport(this.tableId, this.source, this.bulk, this.errorDir);
    }

    static String sampleList(Collection<?> potentiallyLongList, int max) {
        StringBuffer result = new StringBuffer();
        result.append("[");
        int i = 0;
        for (Object obj : potentiallyLongList) {
            result.append(obj);
            if (i >= max) {
                result.append("...");
                break;
            }
            result.append(", ");
            ++i;
        }
        if (i < max) {
            result.delete(result.length() - 2, result.length());
        }
        result.append("]");
        return result.toString();
    }
}

