package org.apache.hadoop.hive.ql.exec.spark.session;

import io.trino.hive.$internal.com.google.common.annotations.VisibleForTesting;
import io.trino.hive.$internal.com.google.common.base.Preconditions;
import io.trino.hive.$internal.com.google.common.base.Throwables;
import io.trino.hive.$internal.com.google.common.collect.ImmutableMap;
import io.trino.hive.$internal.com.google.common.collect.Maps;
import io.trino.hive.$internal.org.slf4j.Logger;
import io.trino.hive.$internal.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.spark.SparkConf;
import org.apache.spark.util.Utils;

/* loaded from: input_file:org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.class */
public class SparkSessionImpl implements SparkSession {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SparkSession.class);
    private static final String SPARK_DIR = "_spark_session_dir";
    private static final String AM_TIMEOUT_ERR = ".*ApplicationMaster for attempt.*timed out.*";
    private static final String UNKNOWN_QUEUE_ERR = "(submitted by user.*to unknown queue:.*)\n";
    private static final String STOPPED_QUEUE_ERR = "(Queue.*is STOPPED)";
    private static final String FULL_QUEUE_ERR = "(Queue.*already has.*applications)";
    private static final String INVALILD_MEM_ERR = "(Required executor memory.*is above the max threshold.*) of this";
    private static final String INVALID_CORE_ERR = "(initial executor number.*must between min executor.*and max executor number.*)\n";
    private static Map<String, Pattern> errorPatterns;
    private HiveConf conf;
    private boolean isOpen;
    private HiveSparkClient hiveSparkClient;
    private Path scratchDir;
    private final Object dirLock = new Object();
    private String matchedString = null;
    private final String sessionId = makeSessionId();

    public SparkSessionImpl() {
        initErrorPatterns();
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public void open(HiveConf hiveConf) throws HiveException {
        LOG.info("Trying to open Spark session {}", this.sessionId);
        this.conf = hiveConf;
        this.isOpen = true;
        try {
            this.hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf, this.sessionId);
            LOG.info("Spark session {} is successfully opened", this.sessionId);
        } catch (Throwable th) {
            throw (this.isOpen ? getHiveException(th) : new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, this.sessionId));
        }
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
        Preconditions.checkState(this.isOpen, "Session is not open. Can't submit jobs.");
        return this.hiveSparkClient.execute(driverContext, sparkWork);
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public ObjectPair<Long, Integer> getMemoryAndCores() throws Exception {
        int max;
        SparkConf sparkConf = this.hiveSparkClient.getSparkConf();
        int executorCount = this.hiveSparkClient.getExecutorCount();
        if (executorCount <= 0) {
            return new ObjectPair<>(-1L, -1);
        }
        int memoryStringToMb = Utils.memoryStringToMb(sparkConf.get("spark.executor.memory", "512m"));
        double d = 1.0d - sparkConf.getDouble("spark.storage.memoryFraction", 0.6d);
        long j = (long) (executorCount * memoryStringToMb * d * 1024.0d * 1024.0d);
        String str = sparkConf.get("spark.master");
        if (str.startsWith("spark") || str.startsWith("local")) {
            max = Math.max(sparkConf.contains("spark.default.parallelism") ? sparkConf.getInt("spark.default.parallelism", 1) : this.hiveSparkClient.getDefaultParallelism(), executorCount);
        } else {
            max = executorCount * sparkConf.getInt("spark.executor.cores", 1);
        }
        int i = max / sparkConf.getInt("spark.task.cpus", 1);
        long j2 = j / i;
        LOG.info("Spark cluster current has executors: " + executorCount + ", total cores: " + i + ", memory per executor: " + memoryStringToMb + "M, memoryFraction: " + d);
        return new ObjectPair<>(Long.valueOf(j2), Integer.valueOf(i));
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public boolean isOpen() {
        return this.isOpen;
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public HiveConf getConf() {
        return this.conf;
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public String getSessionId() {
        return this.sessionId;
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public void close() {
        LOG.info("Trying to close Spark session {}", this.sessionId);
        this.isOpen = false;
        if (this.hiveSparkClient != null) {
            try {
                this.hiveSparkClient.close();
                LOG.info("Spark session {} is successfully closed", this.sessionId);
                cleanScratchDir();
            } catch (IOException e) {
                LOG.error("Failed to close spark session (" + this.sessionId + ").", (Throwable) e);
            }
        }
        this.hiveSparkClient = null;
    }

    private Path createScratchDir() throws IOException {
        Path path = new Path(new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR), this.sessionId);
        FileSystem fileSystem = path.getFileSystem(this.conf);
        fileSystem.mkdirs(path, new FsPermission(HiveConf.getVar(this.conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)));
        fileSystem.deleteOnExit(path);
        return path;
    }

    private static void initErrorPatterns() {
        errorPatterns = Maps.newHashMap(new ImmutableMap.Builder().put(AM_TIMEOUT_ERR, Pattern.compile(AM_TIMEOUT_ERR)).put(UNKNOWN_QUEUE_ERR, Pattern.compile(UNKNOWN_QUEUE_ERR)).put(STOPPED_QUEUE_ERR, Pattern.compile(STOPPED_QUEUE_ERR)).put(FULL_QUEUE_ERR, Pattern.compile(FULL_QUEUE_ERR)).put(INVALILD_MEM_ERR, Pattern.compile(INVALILD_MEM_ERR)).put(INVALID_CORE_ERR, Pattern.compile(INVALID_CORE_ERR)).build());
    }

    @VisibleForTesting
    HiveException getHiveException(Throwable th) {
        while (th != null) {
            if (th instanceof TimeoutException) {
                return new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
            }
            if (th instanceof InterruptedException) {
                return new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, this.sessionId);
            }
            if (th instanceof RuntimeException) {
                String stackTraceAsString = Throwables.getStackTraceAsString(th);
                return matches(stackTraceAsString, AM_TIMEOUT_ERR) ? new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT) : (matches(stackTraceAsString, UNKNOWN_QUEUE_ERR) || matches(stackTraceAsString, STOPPED_QUEUE_ERR)) ? new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, this.matchedString) : matches(stackTraceAsString, FULL_QUEUE_ERR) ? new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, this.matchedString) : (matches(stackTraceAsString, INVALILD_MEM_ERR) || matches(stackTraceAsString, INVALID_CORE_ERR)) ? new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, this.matchedString) : new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, this.sessionId);
            }
            th = th.getCause();
        }
        return new HiveException(th, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, this.sessionId);
    }

    @VisibleForTesting
    String getMatchedString() {
        return this.matchedString;
    }

    private boolean matches(String str, String str2) {
        if (!errorPatterns.containsKey(str2)) {
            LOG.warn("No error pattern found for regex: {}", str2);
            return false;
        }
        Matcher matcher = errorPatterns.get(str2).matcher(str);
        boolean find = matcher.find();
        if (find && matcher.groupCount() == 1) {
            this.matchedString = matcher.group(1);
        }
        return find;
    }

    private void cleanScratchDir() throws IOException {
        if (this.scratchDir != null) {
            this.scratchDir.getFileSystem(this.conf).delete(this.scratchDir, true);
            this.scratchDir = null;
        }
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSession
    public Path getHDFSSessionDir() throws IOException {
        if (this.scratchDir == null) {
            synchronized (this.dirLock) {
                if (this.scratchDir == null) {
                    this.scratchDir = createScratchDir();
                }
            }
        }
        return this.scratchDir;
    }

    public static String makeSessionId() {
        return UUID.randomUUID().toString();
    }

    @VisibleForTesting
    HiveSparkClient getHiveSparkClient() {
        return this.hiveSparkClient;
    }
}
