package uk.gov.gchq.gaffer.accumulostore.operation.hdfs.handler;

import java.io.IOException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.accumulostore.AccumuloStore;
import uk.gov.gchq.gaffer.accumulostore.operation.hdfs.handler.job.factory.AccumuloAddElementsFromHdfsJobFactory;
import uk.gov.gchq.gaffer.accumulostore.operation.hdfs.handler.job.tool.ImportElementsToAccumuloTool;
import uk.gov.gchq.gaffer.accumulostore.utils.AccumuloStoreConstants;
import uk.gov.gchq.gaffer.hdfs.operation.AddElementsFromHdfs;
import uk.gov.gchq.gaffer.hdfs.operation.SampleDataForSplitPoints;
import uk.gov.gchq.gaffer.hdfs.operation.handler.job.tool.AddElementsFromHdfsTool;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.SplitStoreFromFile;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

/* loaded from: input_file:uk/gov/gchq/gaffer/accumulostore/operation/hdfs/handler/AddElementsFromHdfsHandler.class */
public class AddElementsFromHdfsHandler implements OperationHandler<AddElementsFromHdfs> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) AddElementsFromHdfsHandler.class);

    @Override // uk.gov.gchq.gaffer.store.operation.handler.OperationHandler
    public Void doOperation(AddElementsFromHdfs addElementsFromHdfs, Context context, Store store) throws OperationException {
        doOperation(addElementsFromHdfs, context, (AccumuloStore) store);
        return null;
    }

    public void doOperation(AddElementsFromHdfs addElementsFromHdfs, Context context, AccumuloStore accumuloStore) throws OperationException {
        validateOperation(addElementsFromHdfs);
        if (null == addElementsFromHdfs.getSplitsFilePath()) {
            String str = getPathWithSlashSuffix(addElementsFromHdfs.getWorkingPath()) + context.getJobId() + "/splits";
            LOGGER.info("Using working directory for splits files: " + str);
            addElementsFromHdfs.setSplitsFilePath(str);
        }
        try {
            checkHdfsDirectories(addElementsFromHdfs, accumuloStore);
            if (!addElementsFromHdfs.isUseProvidedSplits() && needsSplitting(accumuloStore)) {
                sampleAndSplit(addElementsFromHdfs, context, accumuloStore);
            }
            fetchElements(addElementsFromHdfs, accumuloStore);
            String option = addElementsFromHdfs.getOption(AccumuloStoreConstants.ADD_ELEMENTS_FROM_HDFS_SKIP_IMPORT);
            if (null == option || !"TRUE".equalsIgnoreCase(option)) {
                importElements(addElementsFromHdfs, accumuloStore);
            } else {
                LOGGER.info("Skipping import as {} was {}", AccumuloStoreConstants.ADD_ELEMENTS_FROM_HDFS_SKIP_IMPORT, option);
            }
        } catch (IOException e) {
            throw new OperationException("Operation failed due to filesystem error: " + e.getMessage());
        }
    }

    private void validateOperation(AddElementsFromHdfs addElementsFromHdfs) {
        if (null != addElementsFromHdfs.getMinMapTasks()) {
            LOGGER.warn("minMapTasks field will be ignored");
        }
        if (null != addElementsFromHdfs.getMaxMapTasks()) {
            LOGGER.warn("maxMapTasks field will be ignored");
        }
        if (null != addElementsFromHdfs.getNumReduceTasks() && (null != addElementsFromHdfs.getMinReduceTasks() || null != addElementsFromHdfs.getMaxReduceTasks())) {
            throw new IllegalArgumentException("minReduceTasks and/or maxReduceTasks should not be set if numReduceTasks is");
        }
        if (null != addElementsFromHdfs.getMinReduceTasks() && null != addElementsFromHdfs.getMaxReduceTasks()) {
            LOGGER.warn("Logic for the minimum may result in more reducers than the maximum set");
            if (addElementsFromHdfs.getMinReduceTasks().intValue() > addElementsFromHdfs.getMaxReduceTasks().intValue()) {
                throw new IllegalArgumentException("Minimum number of reducers must be less than the maximum number of reducers");
            }
        }
        if (null == addElementsFromHdfs.getSplitsFilePath()) {
            throw new IllegalArgumentException("splitsFilePath is required");
        }
        if (null == addElementsFromHdfs.getWorkingPath()) {
            throw new IllegalArgumentException("workingPath is required");
        }
    }

    private boolean needsSplitting(AccumuloStore accumuloStore) throws OperationException {
        boolean z = false;
        try {
            if (accumuloStore.getConnection().tableOperations().listSplits(accumuloStore.getTableName(), 2).size() < 2) {
                try {
                    if (accumuloStore.getTabletServers().size() > 1) {
                        z = true;
                    }
                } catch (StoreException e) {
                    throw new OperationException("Unable to get accumulo's tablet servers", e);
                }
            }
            return z;
        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | StoreException e2) {
            throw new OperationException("Unable to get accumulo's split points", e2);
        }
    }

    private void sampleAndSplit(AddElementsFromHdfs addElementsFromHdfs, Context context, AccumuloStore accumuloStore) throws OperationException {
        LOGGER.info("Starting to sample input data to create splits points to set on the table");
        String workingPath = addElementsFromHdfs.getWorkingPath();
        if (null == workingPath) {
            throw new IllegalArgumentException("Prior to adding the data, the table needs to be split. To do this the workingPath must be set to a temporary directory");
        }
        String str = getPathWithSlashSuffix(workingPath) + context.getJobId();
        try {
            accumuloStore.execute((Output) new OperationChain.Builder().first(new SampleDataForSplitPoints.Builder().addInputMapperPairs(addElementsFromHdfs.getInputMapperPairs()).jobInitialiser(addElementsFromHdfs.getJobInitialiser()).mappers(addElementsFromHdfs.getNumMapTasks()).validate(addElementsFromHdfs.isValidate()).outputPath(str + "/sampleSplitsOutput").splitsFilePath(addElementsFromHdfs.getSplitsFilePath()).options(addElementsFromHdfs.getOptions()).build()).then(new SplitStoreFromFile.Builder().inputPath(addElementsFromHdfs.getSplitsFilePath()).options(addElementsFromHdfs.getOptions()).build()).build(), context);
            try {
                FileSystem fileSystem = FileSystem.get(new JobConf(new Configuration()));
                Path path = new Path(str);
                if (fileSystem.exists(path)) {
                    fileSystem.delete(path, true);
                }
            } catch (IOException e) {
                LOGGER.warn("Unable to delete temporary files used to calculate splits", (Throwable) e);
            }
        } catch (Throwable th) {
            try {
                FileSystem fileSystem2 = FileSystem.get(new JobConf(new Configuration()));
                Path path2 = new Path(str);
                if (fileSystem2.exists(path2)) {
                    fileSystem2.delete(path2, true);
                }
            } catch (IOException e2) {
                LOGGER.warn("Unable to delete temporary files used to calculate splits", (Throwable) e2);
            }
            throw th;
        }
    }

    private String getPathWithSlashSuffix(String str) {
        return str.endsWith("/") ? str : str + "/";
    }

    private void fetchElements(AddElementsFromHdfs addElementsFromHdfs, AccumuloStore accumuloStore) throws OperationException {
        AddElementsFromHdfsTool addElementsFromHdfsTool = new AddElementsFromHdfsTool(new AccumuloAddElementsFromHdfsJobFactory(), addElementsFromHdfs, accumuloStore);
        try {
            LOGGER.info("Running FetchElementsFromHdfsTool job");
            int run = ToolRunner.run(addElementsFromHdfsTool.getConfig(), addElementsFromHdfsTool, new String[0]);
            LOGGER.info("Finished running FetchElementsFromHdfsTool job");
            if (1 != run) {
                LOGGER.error("Failed to fetch elements from HDFS. Response code was {}", Integer.valueOf(run));
                throw new OperationException("Failed to fetch elements from HDFS. Response code was: " + run);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to fetch elements from HDFS: {}", e.getMessage());
            throw new OperationException("Failed to fetch elements from HDFS", e);
        }
    }

    private void importElements(AddElementsFromHdfs addElementsFromHdfs, AccumuloStore accumuloStore) throws OperationException {
        ImportElementsToAccumuloTool importElementsToAccumuloTool = new ImportElementsToAccumuloTool(addElementsFromHdfs.getOutputPath(), addElementsFromHdfs.getFailurePath(), accumuloStore, addElementsFromHdfs.getOptions());
        try {
            LOGGER.info("Running import job");
            int run = ToolRunner.run(importElementsToAccumuloTool, new String[0]);
            LOGGER.info("Finished running import job");
            if (0 != run) {
                LOGGER.error("Failed to import elements into Accumulo. Response code was {}", Integer.valueOf(run));
                throw new OperationException("Failed to import elements into Accumulo. Response code was: " + run);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to import elements into Accumulo: {}", e.getMessage());
            throw new OperationException("Failed to import elements into Accumulo", e);
        }
    }

    private void checkHdfsDirectories(AddElementsFromHdfs addElementsFromHdfs, AccumuloStore accumuloStore) throws IOException {
        AddElementsFromHdfsTool addElementsFromHdfsTool = new AddElementsFromHdfsTool(new AccumuloAddElementsFromHdfsJobFactory(), addElementsFromHdfs, accumuloStore);
        LOGGER.info("Checking that the correct HDFS directories exist");
        FileSystem fileSystem = FileSystem.get(addElementsFromHdfsTool.getConfig());
        Path path = new Path(addElementsFromHdfs.getOutputPath());
        LOGGER.info("Ensuring output directory {} doesn't exist", path);
        if (fileSystem.exists(path)) {
            if (fileSystem.listFiles(path, true).hasNext()) {
                LOGGER.error("Output directory exists and is not empty: {}", path);
                throw new IllegalArgumentException("Output directory exists and is not empty: " + path);
            }
            LOGGER.info("Output directory exists and is empty so deleting: {}", path);
            fileSystem.delete(path, true);
        }
    }
}
