package com.netflix.conductor.client.task;

import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.PropertyFactory;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.discovery.EurekaClient;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/conductor/client/task/WorkflowTaskCoordinator.class */
/**
 * Polls for tasks on behalf of a set of {@link Worker}s and runs them on a bounded thread pool.
 *
 * <p>Each registered worker gets its own periodic poll, scheduled with the worker's polling
 * interval. Polled tasks are handed to a fixed-size executor backed by a bounded queue; the
 * outcome of each task is reported through the {@link TaskClient}.
 *
 * <p>Typical use: build an instance via {@link Builder}, call {@link #init()} once to start
 * polling, and {@link #shutdown()} to stop.
 */
public class WorkflowTaskCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowTaskCoordinator.class);

    /** Property name used to look up the task domain of a worker. */
    private static final String DOMAIN = "domain";

    /** Fallback property scope used when a worker has no domain of its own. */
    private static final String ALL_WORKERS = "all";

    /** Seconds to wait for each executor to drain before forcing it down. */
    private static final long SHUTDOWN_WAIT_TIME_IN_SEC = 10;

    private final TaskClient taskClient;
    private final EurekaClient eurekaClient;
    private final List<Worker> workers = new LinkedList<>();
    private final int sleepWhenRetry;
    private final int updateRetryCount;
    private final int workerQueueSize;
    private final String workerNamePrefix;

    /** Resolved in {@link #init()} when it was left at {@code -1}. */
    private int threadCount;

    // The following are created by init() and are null until then.
    private ExecutorService executorService;
    private ScheduledExecutorService scheduledExecutorService;
    private LinkedBlockingQueue<Runnable> workerQueue;

    /** Fluent builder for {@link WorkflowTaskCoordinator}. */
    public static class Builder {
        private String workerNamePrefix = "workflow-worker-";
        private int sleepWhenRetry = 500;
        private int updateRetryCount = 3;
        private int workerQueueSize = 100;
        private int threadCount = -1;
        private Iterable<Worker> taskWorkers;
        private EurekaClient eurekaClient;
        private TaskClient taskClient;

        /**
         * @param workerNamePrefix prefix for the names of the task execution threads; a running
         *     counter is appended to it
         * @return this builder
         */
        public Builder withWorkerNamePrefix(String workerNamePrefix) {
            this.workerNamePrefix = workerNamePrefix;
            return this;
        }

        /**
         * @param sleepWhenRetry value exposed through {@link WorkflowTaskCoordinator#getSleepWhenRetry()}
         * @return this builder
         */
        public Builder withSleepWhenRetry(int sleepWhenRetry) {
            this.sleepWhenRetry = sleepWhenRetry;
            return this;
        }

        /**
         * @param updateRetryCount retry count passed to the retry helper when reporting a task result
         * @return this builder
         */
        public Builder withUpdateRetryCount(int updateRetryCount) {
            this.updateRetryCount = updateRetryCount;
            return this;
        }

        /**
         * @param workerQueueSize capacity of the queue that buffers polled tasks awaiting a thread
         * @return this builder
         */
        public Builder withWorkerQueueSize(int workerQueueSize) {
            this.workerQueueSize = workerQueueSize;
            return this;
        }

        /**
         * @param threadCount number of task execution threads; must be at least 1
         * @return this builder
         * @throws IllegalArgumentException if {@code threadCount} is less than 1
         */
        public Builder withThreadCount(int threadCount) {
            if (threadCount < 1) {
                throw new IllegalArgumentException("No. of threads cannot be less than 1");
            }
            this.threadCount = threadCount;
            return this;
        }

        /**
         * @param taskClient client used to poll, ack and update tasks; required
         * @return this builder
         */
        public Builder withTaskClient(TaskClient taskClient) {
            this.taskClient = taskClient;
            return this;
        }

        /**
         * @param eurekaClient optional discovery client; when set, polling only happens while the
         *     instance's remote status is UP
         * @return this builder
         */
        public Builder withEurekaClient(EurekaClient eurekaClient) {
            this.eurekaClient = eurekaClient;
            return this;
        }

        /**
         * @param taskWorkers workers to poll for; required
         * @return this builder
         */
        public Builder withWorkers(Iterable<Worker> taskWorkers) {
            this.taskWorkers = taskWorkers;
            return this;
        }

        /**
         * @param taskWorkers workers to poll for; required
         * @return this builder
         */
        public Builder withWorkers(Worker... taskWorkers) {
            this.taskWorkers = Arrays.asList(taskWorkers);
            return this;
        }

        /**
         * Creates the coordinator. The returned instance does not poll until
         * {@link WorkflowTaskCoordinator#init()} is called.
         *
         * @return a new coordinator configured from this builder
         * @throws IllegalArgumentException if no workers or no task client were supplied
         */
        public WorkflowTaskCoordinator build() {
            if (taskWorkers == null) {
                throw new IllegalArgumentException(
                        "No task workers are specified. use withWorkers() to add one or more task workers");
            }
            if (taskClient == null) {
                throw new IllegalArgumentException("No TaskClient provided. use withTaskClient() to provide one");
            }
            return new WorkflowTaskCoordinator(
                    eurekaClient,
                    taskClient,
                    threadCount,
                    sleepWhenRetry,
                    updateRetryCount,
                    workerQueueSize,
                    taskWorkers,
                    workerNamePrefix);
        }
    }

    /**
     * @param eurekaClient optional discovery client; may be {@code null}
     * @param taskClient client used to poll, ack and update tasks
     * @param threadCount number of execution threads, or {@code -1} to use one per worker
     * @param sleepWhenRetry value exposed through {@link #getSleepWhenRetry()}
     * @param updateRetryCount retry count used when reporting task results
     * @param workerQueueSize capacity of the execution queue
     * @param taskWorkers workers to poll for; copied into an internal list
     * @param workerNamePrefix prefix for execution thread names
     */
    public WorkflowTaskCoordinator(
            EurekaClient eurekaClient,
            TaskClient taskClient,
            int threadCount,
            int sleepWhenRetry,
            int updateRetryCount,
            int workerQueueSize,
            Iterable<Worker> taskWorkers,
            String workerNamePrefix) {
        this.eurekaClient = eurekaClient;
        this.taskClient = taskClient;
        this.threadCount = threadCount;
        this.sleepWhenRetry = sleepWhenRetry;
        this.updateRetryCount = updateRetryCount;
        this.workerQueueSize = workerQueueSize;
        this.workerNamePrefix = workerNamePrefix;
        taskWorkers.forEach(this.workers::add);
    }

    /**
     * Creates the execution and polling executors and schedules one periodic poll per worker.
     *
     * <p>If the thread count was left at {@code -1}, it is set to the number of workers.
     */
    public synchronized void init() {
        if (threadCount == -1) {
            threadCount = workers.size();
        }
        logger.info("Initialized the worker with {} threads", threadCount);

        workerQueue = new LinkedBlockingQueue<>(workerQueueSize);
        AtomicInteger threadIndex = new AtomicInteger(0);
        executorService = new ThreadPoolExecutor(
                threadCount,
                threadCount,
                0L,
                TimeUnit.MILLISECONDS,
                workerQueue,
                runnable -> new Thread(runnable, workerNamePrefix + threadIndex.getAndIncrement()));

        scheduledExecutorService = Executors.newScheduledThreadPool(workers.size());
        for (Worker worker : workers) {
            scheduledExecutorService.scheduleWithFixedDelay(
                    () -> {
                        try {
                            pollForTask(worker);
                        } catch (Exception e) {
                            // An exception escaping a fixed-delay task suppresses all of its
                            // subsequent executions, which would silently stop polling for
                            // this worker. Log and keep the schedule alive.
                            logger.error("Unexpected error while polling for worker {}", worker.getClass(), e);
                        }
                    },
                    worker.getPollingInterval(),
                    worker.getPollingInterval(),
                    TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops polling and task execution. Both executors are asked to shut down, then each is
     * given up to {@value #SHUTDOWN_WAIT_TIME_IN_SEC} seconds to terminate before being forced.
     * Does nothing if {@link #init()} was never called.
     */
    public void shutdown() {
        if (scheduledExecutorService == null || executorService == null) {
            // init() has not run, so there is nothing to stop.
            return;
        }
        scheduledExecutorService.shutdown();
        executorService.shutdown();
        shutdownExecutorService(scheduledExecutorService, SHUTDOWN_WAIT_TIME_IN_SEC);
        shutdownExecutorService(executorService, SHUTDOWN_WAIT_TIME_IN_SEC);
    }

    /**
     * Waits for an executor that has already been asked to shut down, forcing it down on timeout
     * or interruption.
     *
     * @param service executor to wait for
     * @param timeoutInSec maximum number of seconds to wait
     */
    private void shutdownExecutorService(ExecutorService service, long timeoutInSec) {
        try {
            if (service.awaitTermination(timeoutInSec, TimeUnit.SECONDS)) {
                logger.debug("tasks completed, shutting down");
            } else {
                logger.warn("forcing shutdown after waiting for {} second", timeoutInSec);
                service.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.warn("shutdown interrupted, invoking shutdownNow");
            service.shutdownNow();
            // Preserve the interrupt for whoever called shutdown().
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Polls a batch of tasks for the given worker and submits each one for execution.
     *
     * <p>Nothing is polled when discovery reports the instance as not UP, when the worker is
     * paused, or when the execution queue has no remaining capacity.
     *
     * @param worker worker to poll on behalf of
     */
    private void pollForTask(Worker worker) {
        // Identity comparison is safe for enum constants and also tolerates a null status.
        if (eurekaClient != null && eurekaClient.getInstanceRemoteStatus() != InstanceInfo.InstanceStatus.UP) {
            logger.debug("Instance is NOT UP in discovery - will not poll");
            return;
        }
        if (worker.paused()) {
            WorkflowTaskMetrics.incrementTaskPausedCount(worker.getTaskDefName());
            logger.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
            return;
        }

        // A worker-specific domain wins; otherwise fall back to the one configured for all workers.
        String domain = Optional.ofNullable(PropertyFactory.getString(worker.getTaskDefName(), DOMAIN, null))
                .orElse(PropertyFactory.getString(ALL_WORKERS, DOMAIN, null));
        logger.debug(
                "Polling {}, domain={}, count = {} timeout = {} ms",
                worker.getTaskDefName(),
                domain,
                worker.getPollCount(),
                worker.getLongPollTimeoutInMS());

        List<Task> tasks = Collections.emptyList();
        try {
            // Never ask for more tasks than the execution queue can still accept.
            int pollCount = Math.min(workerQueue.remainingCapacity(), worker.getPollCount());
            if (pollCount <= 0) {
                logger.warn(
                        "All workers are busy, not polling. queue size = {}, max = {}",
                        workerQueue.size(),
                        workerQueueSize);
                return;
            }
            String taskType = worker.getTaskDefName();
            tasks = WorkflowTaskMetrics.getPollTimer(taskType).record(() -> {
                return taskClient.batchPollTasksInDomain(
                        taskType, domain, worker.getIdentity(), pollCount, worker.getLongPollTimeoutInMS());
            });
            WorkflowTaskMetrics.incrementTaskPollCount(taskType, tasks.size());
            logger.debug(
                    "Polled {}, domain {}, received {} tasks in worker - {}",
                    worker.getTaskDefName(),
                    domain,
                    tasks.size(),
                    worker.getIdentity());
        } catch (Exception e) {
            // A failed poll is counted and logged; the next scheduled poll simply tries again.
            WorkflowTaskMetrics.incrementTaskPollErrorCount(worker.getTaskDefName(), e);
            logger.error("Error when polling for tasks", e);
        }

        for (Task task : tasks) {
            try {
                executorService.submit(() -> {
                    try {
                        logger.debug(
                                "Executing task {}, taskId - {} in worker - {}",
                                task.getTaskDefName(),
                                task.getTaskId(),
                                worker.getIdentity());
                        execute(worker, task);
                    } catch (Throwable t) {
                        task.setStatus(Task.Status.FAILED);
                        handleException(t, new TaskResult(task), worker, task);
                    }
                });
            } catch (RejectedExecutionException e) {
                WorkflowTaskMetrics.incrementTaskExecutionQueueFullCount(worker.getTaskDefName());
                logger.error("Execution queue is full, returning task: {}", task.getTaskId(), e);
                returnTask(worker, task);
            }
        }
    }

    /**
     * Acks the task, runs it through the worker, records the execution time and reports the result.
     *
     * @param worker worker that executes the task
     * @param task task to execute
     */
    private void execute(Worker worker, Task task) {
        if (!acknowledge(worker, task)) {
            return;
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        TaskResult result = null;
        try {
            logger.debug(
                    "Executing task {} in worker {} at {}",
                    task,
                    worker.getClass().getSimpleName(),
                    worker.getIdentity());
            result = worker.execute(task);
            result.setWorkflowInstanceId(task.getWorkflowInstanceId());
            result.setTaskId(task.getTaskId());
            result.setWorkerId(worker.getIdentity());
        } catch (Exception e) {
            logger.error("Unable to execute task {}", task, e);
            if (result == null) {
                task.setStatus(Task.Status.FAILED);
                result = new TaskResult(task);
            }
            handleException(e, result, worker, task);
        } finally {
            // Stop and record exactly once, whatever happened above; stopping a stopwatch
            // that is already stopped throws and would mask the real failure.
            stopwatch.stop();
            WorkflowTaskMetrics.getExecutionTimer(worker.getTaskDefName())
                    .record(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }

        logger.debug(
                "Task {} executed by worker {} at {} with status {}",
                task.getTaskId(),
                worker.getClass().getSimpleName(),
                worker.getIdentity(),
                task.getStatus());
        // NOTE(review): when worker.execute() failed, handleException() has already sent the
        // failed result, so this sends it a second time. Kept as-is to preserve behavior;
        // confirm whether the duplicate update is intended.
        updateWithRetry(updateRetryCount, task, result, worker);
    }

    /**
     * Runs the worker's pre-ack check and acks the task with the server.
     *
     * @param worker worker about to execute the task
     * @param task task to ack
     * @return {@code true} only if the worker accepted the task and the ack succeeded
     */
    private boolean acknowledge(Worker worker, Task task) {
        String taskType = task.getTaskDefName();
        try {
            if (!worker.preAck(task)) {
                logger.debug("Worker decided not to ack the task {}, taskId = {}", taskType, task.getTaskId());
                return false;
            }
            if (!taskClient.ack(task.getTaskId(), worker.getIdentity())) {
                WorkflowTaskMetrics.incrementTaskAckFailedCount(worker.getTaskDefName());
                logger.error("Ack failed for {}, taskId = {}", taskType, task.getTaskId());
                return false;
            }
            logger.debug("Ack successful for {}, taskId = {}", taskType, task.getTaskId());
            return true;
        } catch (Exception e) {
            logger.error(
                    "ack exception for task {}, taskId = {} in worker - {}",
                    task.getTaskDefName(),
                    task.getTaskId(),
                    worker.getIdentity(),
                    e);
            WorkflowTaskMetrics.incrementTaskAckErrorCount(worker.getTaskDefName(), e);
            return false;
        }
    }

    /** @return the number of task execution threads; {@code -1} until resolved by {@link #init()} if unset */
    public int getThreadCount() {
        return threadCount;
    }

    /** @return the capacity of the execution queue */
    public int getWorkerQueueSize() {
        return workerQueueSize;
    }

    /** @return the configured sleep-when-retry value */
    public int getSleepWhenRetry() {
        return sleepWhenRetry;
    }

    /** @return the retry count used when reporting task results */
    public int getUpdateRetryCount() {
        return updateRetryCount;
    }

    /** @return the prefix used for execution thread names */
    public String getWorkerNamePrefix() {
        return workerNamePrefix;
    }

    /**
     * Uploads the result payload if needed and reports the result, each step going through the
     * retry helper. Failures are counted, logged and passed to {@link Worker#onErrorUpdate(Task)};
     * they are not rethrown.
     *
     * @param count retry count handed to the retry helper
     * @param task task the result belongs to
     * @param result result to report
     * @param worker worker that produced the result
     */
    private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {
        String operationName = "updateWithRetry";
        try {
            String updateDescription = String.format(
                    "Retry updating task result: %s for task: %s in worker: %s",
                    result, task.getTaskDefName(), worker.getIdentity());
            String payloadDescription = String.format(
                    "Evaluate Task payload for task: %s in worker: %s",
                    task.getTaskDefName(), worker.getIdentity());

            new RetryUtil<>().retryOnException(
                    () -> {
                        taskClient.evaluateAndUploadLargePayload(result, task.getTaskType());
                        return null;
                    },
                    null,
                    null,
                    count,
                    payloadDescription,
                    operationName);

            new RetryUtil<>().retryOnException(
                    () -> {
                        taskClient.updateTask(result);
                        return null;
                    },
                    null,
                    null,
                    count,
                    updateDescription,
                    operationName);
        } catch (Exception e) {
            worker.onErrorUpdate(task);
            WorkflowTaskMetrics.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            logger.error(
                    "Failed to update result: {} for task: {} in worker: {}",
                    result,
                    task.getTaskDefName(),
                    worker.getIdentity(),
                    e);
        }
    }

    /**
     * Marks the result as failed, attaches the failure reason and stack trace, and reports it.
     *
     * @param t failure that occurred while executing the task
     * @param result result to mark as failed and send
     * @param worker worker that was executing the task
     * @param task task that failed
     */
    private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
        logger.error("Error while executing task {}", task, t);
        WorkflowTaskMetrics.incrementTaskExecutionErrorCount(worker.getTaskDefName(), t);
        result.setStatus(TaskResult.Status.FAILED);
        result.setReasonForIncompletion("Error while executing the task: " + t);

        // Capture the stack trace as text so it travels with the task result.
        StringWriter stackTrace = new StringWriter();
        t.printStackTrace(new PrintWriter(stackTrace));
        result.log(stackTrace.toString());

        updateWithRetry(updateRetryCount, task, result, worker);
    }

    /**
     * Reports a task that could not be queued for execution by sending a result built directly
     * from the unmodified task.
     *
     * @param worker worker the task was polled for
     * @param task task that could not be queued
     */
    private void returnTask(Worker worker, Task task) {
        logger.warn("Returning task {} back to conductor", task.getTaskId());
        updateWithRetry(updateRetryCount, task, new TaskResult(task), worker);
    }
}
