/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.tableOps;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.CleanUpBulkImport;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CopyFailed
extends MasterRepo {
    private static final Logger log = LoggerFactory.getLogger(CopyFailed.class);
    private static final long serialVersionUID = 1L;
    private String tableId;
    private String source;
    private String bulk;
    private String error;

    public CopyFailed(String tableId, String source, String bulk, String error) {
        this.tableId = tableId;
        this.source = source;
        this.bulk = bulk;
        this.error = error;
    }

    @Override
    public long isReady(long tid, Master master) throws Exception {
        HashSet<TServerInstance> finished = new HashSet<TServerInstance>();
        Set<TServerInstance> running = master.onlineTabletServers();
        for (TServerInstance server : running) {
            try {
                LiveTServerSet.TServerConnection client = master.getConnection(server);
                if (client == null || client.isActive(tid)) continue;
                finished.add(server);
            }
            catch (TException ex) {
                log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + (Object)((Object)ex));
            }
        }
        if (finished.containsAll(running)) {
            return 0L;
        }
        return 500L;
    }

    @Override
    public Repo<Master> call(long tid, Master master) throws Exception {
        master.updateBulkImportStatus(this.source, BulkImportState.COPY_FILES);
        VolumeManager fs = master.getFileSystem();
        if (!fs.exists(new Path(this.error, "failures.txt"))) {
            return new CleanUpBulkImport(this.tableId, this.source, this.bulk, this.error);
        }
        HashMap<FileRef, String> failures = new HashMap<FileRef, String>();
        HashMap<FileRef, String> loadedFailures = new HashMap<FileRef, String>();
        try (BufferedReader in = new BufferedReader(new InputStreamReader((InputStream)fs.open(new Path(this.error, "failures.txt")), StandardCharsets.UTF_8));){
            String line = null;
            while ((line = in.readLine()) != null) {
                Path path = new Path(line);
                if (fs.exists(new Path(this.error, path.getName()))) continue;
                failures.put(new FileRef(line, path), line);
            }
        }
        Connector conn = master.getConnector();
        try (IsolatedScanner mscanner = new IsolatedScanner(conn.createScanner("accumulo.metadata", Authorizations.EMPTY));){
            mscanner.setRange(new KeyExtent(this.tableId, null, null).toMetadataRange());
            mscanner.fetchColumnFamily(MetadataSchema.TabletsSection.BulkFileColumnFamily.NAME);
            for (Map.Entry entry : mscanner) {
                FileRef loadedFile;
                String absPath;
                if (Long.parseLong(((Value)entry.getValue()).toString()) != tid || (absPath = (String)failures.remove(loadedFile = new FileRef(fs, (Key)entry.getKey()))) == null) continue;
                loadedFailures.put(loadedFile, absPath);
            }
        }
        for (String failure : failures.values()) {
            Path orig = new Path(failure);
            Path dest = new Path(this.error, orig.getName());
            fs.rename(orig, dest);
            log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
        }
        if (loadedFailures.size() > 0) {
            DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue("/accumulo/" + master.getInstance().getInstanceID() + "/bulk_failed_copyq", master.getConfiguration());
            HashSet<String> workIds = new HashSet<String>();
            for (String failure : loadedFailures.values()) {
                Path orig = new Path(failure);
                Path dest = new Path(this.error, orig.getName());
                if (fs.exists(dest)) continue;
                bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(StandardCharsets.UTF_8));
                workIds.add(orig.getName());
                log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
            }
            bifCopyQueue.waitUntilDone(workIds);
        }
        fs.deleteRecursively(new Path(this.error, "failures.txt"));
        return new CleanUpBulkImport(this.tableId, this.source, this.bulk, this.error);
    }
}

