/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.web;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobSubmissionServlet
extends HttpServlet {
    private static final long serialVersionUID = 8447312301029847397L;
    public static final String START_PAGE_URL = "launch.html";
    private static final String ACTION_PARAM_NAME = "action";
    private static final String ACTION_SUBMIT_VALUE = "submit";
    private static final String ACTION_RUN_SUBMITTED_VALUE = "runsubmitted";
    private static final String ACTION_BACK_VALUE = "back";
    private static final String OPTIONS_PARAM_NAME = "options";
    private static final String JOB_PARAM_NAME = "job";
    private static final String CLASS_PARAM_NAME = "assemblerClass";
    private static final String ARGUMENTS_PARAM_NAME = "arguments";
    private static final String SHOW_PLAN_PARAM_NAME = "show_plan";
    private static final String SUSPEND_PARAM_NAME = "suspend";
    private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
    private final File jobStoreDirectory;
    private final File planDumpDirectory;
    private final Map<Long, Tuple2<PackagedProgram, FlinkPlan>> submittedJobs;
    private final Random rand;
    private final CliFrontend cli;

    public JobSubmissionServlet(CliFrontend cli, File jobDir, File planDir) {
        this.cli = cli;
        this.jobStoreDirectory = jobDir;
        this.planDumpDirectory = planDir;
        this.submittedJobs = Collections.synchronizedMap(new HashMap());
        this.rand = new Random(System.currentTimeMillis());
    }

    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String action = req.getParameter(ACTION_PARAM_NAME);
        if (this.checkParameterSet(resp, action, ACTION_PARAM_NAME)) {
            return;
        }
        if (action.equals(ACTION_SUBMIT_VALUE)) {
            FlinkPlan optPlan;
            List<String> cliArguments;
            List<String> cliOptions;
            String options = req.getParameter(OPTIONS_PARAM_NAME);
            String jobName = req.getParameter(JOB_PARAM_NAME);
            String assemblerClass = req.getParameter(CLASS_PARAM_NAME);
            String arguments = req.getParameter(ARGUMENTS_PARAM_NAME);
            String showPlan = req.getParameter(SHOW_PLAN_PARAM_NAME);
            String suspendPlan = req.getParameter(SUSPEND_PARAM_NAME);
            if (this.checkParameterSet(resp, jobName, JOB_PARAM_NAME) || this.checkParameterSet(resp, arguments, ARGUMENTS_PARAM_NAME) || this.checkParameterSet(resp, showPlan, SHOW_PLAN_PARAM_NAME) || this.checkParameterSet(resp, suspendPlan, SUSPEND_PARAM_NAME)) {
                return;
            }
            boolean show = Boolean.parseBoolean(showPlan);
            boolean suspend = Boolean.parseBoolean(suspendPlan);
            try {
                cliOptions = JobSubmissionServlet.tokenizeArguments(options);
            }
            catch (IllegalArgumentException iaex) {
                this.showErrorPage(resp, "Flink options contain an unterminated quoted string.");
                return;
            }
            try {
                cliArguments = JobSubmissionServlet.tokenizeArguments(arguments);
            }
            catch (IllegalArgumentException iaex) {
                this.showErrorPage(resp, "Program arguments contain an unterminated quoted string.");
                return;
            }
            String[] args = new String[1 + (assemblerClass == null ? 0 : 2) + cliOptions.size() + 1 + cliArguments.size()];
            ArrayList<String> parameters = new ArrayList<String>(args.length);
            parameters.add("info");
            parameters.addAll(cliOptions);
            if (assemblerClass != null) {
                parameters.add("-" + CliFrontendParser.CLASS_OPTION.getOpt());
                parameters.add(assemblerClass);
            }
            parameters.add(this.jobStoreDirectory + File.separator + jobName);
            parameters.addAll(cliArguments);
            try {
                this.cli.parseParameters(parameters.toArray(args));
                optPlan = this.cli.getFlinkPlan();
                if (optPlan == null) {
                    throw new RuntimeException(new Exception("The optimized plan could not be produced."));
                }
            }
            catch (RuntimeException e) {
                Throwable t = e.getCause();
                if (t instanceof ProgramInvocationException) {
                    StringWriter sw = new StringWriter();
                    PrintWriter w = new PrintWriter(sw);
                    if (t.getCause() == null) {
                        t.printStackTrace(w);
                    } else {
                        t.getCause().printStackTrace(w);
                    }
                    String message = sw.toString();
                    message = StringEscapeUtils.escapeHtml4((String)message);
                    this.showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>" + t.getMessage() + "<br/>" + "<br/><br/><pre>" + message + "</pre>");
                    return;
                }
                if (t instanceof CompilerException) {
                    StringWriter sw = new StringWriter();
                    PrintWriter w = new PrintWriter(sw);
                    t.printStackTrace(w);
                    String message = sw.toString();
                    message = StringEscapeUtils.escapeHtml4((String)message);
                    this.showErrorPage(resp, "An error occurred in the compiler:<br/><br/>" + t.getMessage() + "<br/>" + (t.getCause() != null ? "Caused by: " + t.getCause().getMessage() : "") + "<br/><br/><pre>" + message + "</pre>");
                    return;
                }
                StringWriter sw = new StringWriter();
                PrintWriter w = new PrintWriter(sw);
                t.printStackTrace(w);
                String message = sw.toString();
                message = StringEscapeUtils.escapeHtml4((String)message);
                this.showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>" + message + "</pre>");
                return;
            }
            if (show) {
                Long uid;
                while (this.submittedJobs.containsKey(uid = Long.valueOf(Math.abs(this.rand.nextLong())))) {
                }
                String planName = uid + ".json";
                File jsonFile = new File(this.planDumpDirectory, planName);
                if (optPlan instanceof StreamingPlan) {
                    ((StreamingPlan)optPlan).dumpStreamingPlanAsJSON(jsonFile);
                } else {
                    PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
                    jsonGen.setEncodeForHTML(true);
                    jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan)optPlan, jsonFile);
                }
                if (!suspend) {
                    parameters.set(0, "run");
                    try {
                        this.cli.parseParameters(parameters.toArray(args));
                    }
                    catch (RuntimeException e) {
                        LOG.error("Error submitting job to the job-manager.", e.getCause());
                        this.showErrorPage(resp, e.getCause().getMessage());
                        return;
                    }
                } else {
                    this.submittedJobs.put(uid, (Tuple2<PackagedProgram, FlinkPlan>)new Tuple2((Object)this.cli.getPackagedProgram(), (Object)optPlan));
                }
                resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
            } else {
                parameters.set(0, "run");
                try {
                    this.cli.parseParameters(parameters.toArray(args));
                }
                catch (RuntimeException e) {
                    LOG.error("Error submitting job to the job-manager.", e.getCause());
                    String errorMessage = e.getCause().getMessage().split("\n")[0];
                    this.showErrorPage(resp, errorMessage);
                    return;
                }
                resp.sendRedirect(START_PAGE_URL);
            }
        } else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
            Long uid;
            String id = req.getParameter("id");
            if (this.checkParameterSet(resp, id, "id")) {
                return;
            }
            try {
                uid = Long.parseLong(id);
            }
            catch (NumberFormatException nfex) {
                this.showErrorPage(resp, "An invalid id for the job was provided.");
                return;
            }
            Tuple2<PackagedProgram, FlinkPlan> job = this.submittedJobs.remove(uid);
            if (job == null) {
                resp.sendError(400, "No job with the given uid was retained for later submission.");
                return;
            }
            try {
                Client client = new Client(GlobalConfiguration.getConfiguration());
                client.runDetached(client.getJobGraph((PackagedProgram)job.f0, (FlinkPlan)job.f1), ((PackagedProgram)job.f0).getUserCodeClassLoader());
            }
            catch (Exception ex) {
                LOG.error("Error submitting job to the job-manager.", (Throwable)ex);
                resp.setStatus(400);
                String errorMessage = ex.getMessage().split("\n")[0];
                resp.getWriter().print(errorMessage);
                return;
            }
            resp.sendRedirect(START_PAGE_URL);
        } else if (action.equals(ACTION_BACK_VALUE)) {
            Long uid;
            String id = req.getParameter("id");
            if (this.checkParameterSet(resp, id, "id")) {
                return;
            }
            try {
                uid = Long.parseLong(id);
            }
            catch (NumberFormatException nfex) {
                this.showErrorPage(resp, "An invalid id for the job was provided.");
                return;
            }
            this.submittedJobs.remove(uid);
            resp.sendRedirect(START_PAGE_URL);
        } else {
            this.showErrorPage(resp, "Invalid action specified.");
        }
    }

    private void showErrorPage(HttpServletResponse resp, String message) throws IOException {
        resp.setStatus(200);
        resp.setContentType("text/html;charset=utf-8");
        PrintWriter writer = resp.getWriter();
        writer.println("<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n        \"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">");
        writer.println("<html>");
        writer.println("<head>");
        writer.println("  <title>Launch Job - Error</title>");
        writer.println("  <meta http-equiv=\"content-type\" content=\"text/html; charset=UTF-8\" />");
        writer.println("  <link rel=\"stylesheet\" type=\"text/css\" href=\"css/nephelefrontend.css\" />");
        writer.println("</head>");
        writer.println("<body>");
        writer.println("  <div class=\"mainHeading\">");
        writer.println("    <h1><img src=\"img/flink-logo.png\" width=\"100\" height=\"100\" alt=\"Flink Logo\" align=\"middle\"/>Flink Web Submission Client</h1>");
        writer.println("  </div>");
        writer.println("  <div style=\"margin-top: 50px; text-align: center;\">");
        writer.println("    <p class=\"error_text\" style=\"font-size: 18px;\">");
        writer.println(message);
        writer.println("    </p><br/><br/>");
        writer.println("    <form action=\"launch.html\" method=\"GET\">");
        writer.println("      <input type=\"submit\" value=\"back\">");
        writer.println("    </form>");
        writer.println("  </div>");
        writer.println("</body>");
        writer.println("</html>");
    }

    private boolean checkParameterSet(HttpServletResponse resp, String parameter, String parameterName) throws IOException {
        if (parameter == null) {
            this.showErrorPage(resp, "The parameter '" + parameterName + "' is not set.");
            return true;
        }
        return false;
    }

    private static List<String> tokenizeArguments(String args) {
        ArrayList<String> list = new ArrayList<String>();
        StringBuilder curr = new StringBuilder();
        boolean quoted = false;
        for (int pos = 0; pos < args.length(); ++pos) {
            char c = args.charAt(pos);
            if (!(c != ' ' && c != '\t' || quoted)) {
                if (curr.length() <= 0) continue;
                list.add(curr.toString());
                curr.setLength(0);
                continue;
            }
            if (c == '\"') {
                quoted = !quoted;
                continue;
            }
            curr.append(c);
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated quoted string.");
        }
        if (curr.length() > 0) {
            list.add(curr.toString());
        }
        return list;
    }
}

