package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.PollingStrategyConfig;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/kagkarlsson/scheduler/Scheduler.class */
/**
 * Polls for due executions, keeps heartbeats of running executions up to date and detects dead
 * executions, while also acting as a {@link SchedulerClient} by forwarding every client operation
 * to an internal delegate.
 *
 * <p>Lifecycle: {@link #start()} launches the background work, {@link #pause()} and
 * {@link #resume()} toggle the paused flag on the scheduler state, and {@link #stop()} shuts the
 * executors down.
 */
public class Scheduler implements SchedulerClient {
    /**
     * Not read anywhere in this class.
     * NOTE(review): presumably consumed by the polling strategies to decide when to fetch the next
     * batch - confirm against their implementation.
     */
    public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5d;
    /** Not read anywhere in this class. NOTE(review): presumably a thread-name prefix - confirm. */
    public static final String THREAD_PREFIX = "db-scheduler";
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    /** Client that all {@link SchedulerClient} methods of this class forward to. */
    private final SchedulerClient delegate;
    final Clock clock;
    /** Repository used by the scheduler's own background work (polling, heartbeats, dead detection). */
    final TaskRepository schedulerTaskRepository;
    final TaskResolver taskResolver;
    /** Strategy that performs one round of "check for due executions". */
    protected final PollStrategy executeDueStrategy;
    protected final Executor executor;
    /** Runs the periodic dead-execution detection and heartbeat updates. */
    private final ScheduledExecutorService housekeeperExecutor;
    private final HeartbeatConfig heartbeatConfig;
    private final int numberOfMissedHeartbeatsBeforeDead;
    int threadpoolSize;
    private final Waiter executeDueWaiter;
    private final Duration deleteUnresolvedAfter;
    private final Duration shutdownMaxWait;
    protected final List<OnStartup> onStartup;
    private final Waiter detectDeadWaiter;
    private final Duration heartbeatInterval;
    final StatsRegistry statsRegistry;
    /** Runs the due-polling loop submitted by {@link #start()}. */
    private final ExecutorService dueExecutor;
    private final Waiter heartbeatWaiter;
    final SchedulerState.SettableSchedulerState schedulerState = new SchedulerState.SettableSchedulerState();
    final ConfigurableLogger failureLogger;
    /** Assigned exactly once in the constructor, hence final. */
    private final SchedulerClientEventListener earlyExecutionListener;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Wires up the scheduler and selects the polling strategy.
     *
     * @param clock time source, stored and handed to the collaborators created here
     * @param taskRepository repository used by the scheduler itself and by the polling strategy
     * @param taskRepository2 repository backing the delegate {@link SchedulerClient}
     * @param taskResolver resolves task names to task implementations
     * @param i thread-pool size; stored in {@code threadpoolSize} and passed to the polling strategy
     * @param executorService executor wrapped by the internal {@code Executor}
     * @param schedulerName not used by this constructor
     * @param waiter waiter used between checks for due executions
     * @param duration heartbeat interval; dead-execution detection is scheduled at twice this interval
     * @param i2 number of missed heartbeats before an execution is considered dead
     * @param z when {@code true} a {@code TriggerCheckForDueExecutions} listener is installed,
     *     otherwise {@code SchedulerClientEventListener.NOOP} is used
     * @param statsRegistry receiver of scheduler statistics events
     * @param pollingStrategyConfig selects and configures the polling strategy
     * @param duration2 age after which executions of unresolved tasks are deleted
     * @param duration3 maximum wait handed to the internal {@code Executor} on {@link #stop()}
     * @param logLevel forwarded to {@code ConfigurableLogger.create}
     * @param z2 forwarded to {@code ConfigurableLogger.create}
     *     (NOTE(review): presumably toggles stack-trace logging - confirm)
     * @param list hooks run by {@link #executeOnStartup()}
     * @param executorService2 executor that runs the due-polling loop
     * @param scheduledExecutorService executor for heartbeat updates and dead-execution detection
     * @throws IllegalArgumentException if the polling-strategy type is neither
     *     {@code LOCK_AND_FETCH} nor {@code FETCH}
     */
    public Scheduler(Clock clock, TaskRepository taskRepository, TaskRepository taskRepository2, TaskResolver taskResolver, int i, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration duration, int i2, boolean z, StatsRegistry statsRegistry, PollingStrategyConfig pollingStrategyConfig, Duration duration2, Duration duration3, LogLevel logLevel, boolean z2, List<OnStartup> list, ExecutorService executorService2, ScheduledExecutorService scheduledExecutorService) {
        this.clock = clock;
        this.schedulerTaskRepository = taskRepository;
        this.taskResolver = taskResolver;
        this.threadpoolSize = i;
        this.executor = new Executor(executorService, clock);
        this.executeDueWaiter = waiter;
        this.deleteUnresolvedAfter = duration2;
        this.shutdownMaxWait = duration3;
        this.onStartup = list;
        this.detectDeadWaiter = new Waiter(duration.multipliedBy(2L), clock);
        this.heartbeatInterval = duration;
        this.numberOfMissedHeartbeatsBeforeDead = i2;
        this.heartbeatWaiter = new Waiter(duration, clock);
        // getMaxAgeBeforeConsideredDead() reads heartbeatInterval and
        // numberOfMissedHeartbeatsBeforeDead, so it must run after both are assigned.
        this.heartbeatConfig = new HeartbeatConfig(duration, i2, getMaxAgeBeforeConsideredDead());
        this.statsRegistry = statsRegistry;
        this.dueExecutor = executorService2;
        this.housekeeperExecutor = scheduledExecutorService;
        this.earlyExecutionListener = z ? new TriggerCheckForDueExecutions(this.schedulerState, clock, waiter) : SchedulerClientEventListener.NOOP;
        this.delegate = new SchedulerClient.StandardSchedulerClient(taskRepository2, this.earlyExecutionListener, clock);
        this.failureLogger = ConfigurableLogger.create(LOG, logLevel, z2);

        if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
            taskRepository.verifySupportsLockAndFetch();
            this.executeDueStrategy = new LockAndFetchCandidates(this.executor, taskRepository, this, this.earlyExecutionListener, i, statsRegistry, this.schedulerState, this.failureLogger, taskResolver, clock, pollingStrategyConfig, this::triggerCheckForDueExecutions, this.heartbeatConfig);
        } else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) {
            this.executeDueStrategy = new FetchCandidates(this.executor, taskRepository, this, this.earlyExecutionListener, i, statsRegistry, this.schedulerState, this.failureLogger, taskResolver, clock, pollingStrategyConfig, this::triggerCheckForDueExecutions, this.heartbeatConfig);
        } else {
            throw new IllegalArgumentException("Unknown polling-strategy type: " + pollingStrategyConfig.type);
        }
        LOG.info("Using polling-strategy: {}", pollingStrategyConfig.describe());
    }

    /**
     * Runs the startup hooks, submits the due-polling loop, schedules dead-execution detection and
     * heartbeat updates (both with no initial delay) and finally marks the scheduler as started.
     */
    public void start() {
        LOG.info("Starting scheduler.");
        executeOnStartup();

        this.dueExecutor.submit(new RunUntilShutdown(this.executeDueStrategy, this.executeDueWaiter, this.schedulerState, this.statsRegistry));

        long detectDeadDelayMillis = this.detectDeadWaiter.getWaitDuration().toMillis();
        this.housekeeperExecutor.scheduleWithFixedDelay(new RunAndLogErrors(this::detectDeadExecutions, this.statsRegistry), 0L, detectDeadDelayMillis, TimeUnit.MILLISECONDS);

        long heartbeatDelayMillis = this.heartbeatWaiter.getWaitDuration().toMillis();
        this.housekeeperExecutor.scheduleWithFixedDelay(new RunAndLogErrors(this::updateHeartbeats, this.statsRegistry), 0L, heartbeatDelayMillis, TimeUnit.MILLISECONDS);

        this.schedulerState.setStarted();
    }

    /** Runs the polling strategy once on the calling thread. */
    protected void executeDue() {
        this.executeDueStrategy.run();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Invokes every registered {@link OnStartup} hook with a client backed by the scheduler's own
     * repository. A hook that throws is logged and counted as an unexpected error; remaining hooks
     * still run.
     */
    public void executeOnStartup() {
        SchedulerClient.StandardSchedulerClient startupClient = new SchedulerClient.StandardSchedulerClient(this.schedulerTaskRepository, this.clock);
        this.onStartup.forEach(startupHook -> {
            try {
                startupHook.onStartup(startupClient, this.clock);
            } catch (Exception e) {
                LOG.error("Unexpected error while executing OnStartup tasks. Continuing.", e);
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        });
    }

    /** Stops the scheduler using waits of 1 second and 5 seconds for the utility executors. */
    public void stop() {
        stop(Duration.ofSeconds(1L), Duration.ofSeconds(5L));
    }

    /**
     * Shuts down, in order, the due-polling executor, the task executor and the housekeeper
     * executor. A second call while shutdown is already in progress only logs a warning.
     *
     * @param duration first wait forwarded to {@code ExecutorUtils.shutdownAndAwaitTermination}
     *     (NOTE(review): presumably the wait before interrupting - confirm)
     * @param duration2 second wait forwarded to {@code ExecutorUtils}; also the only wait used after
     *     {@code shutdownNow()} (NOTE(review): presumably the wait after interrupting - confirm)
     */
    void stop(Duration duration, Duration duration2) {
        if (this.schedulerState.isShuttingDown()) {
            LOG.warn("Multiple calls to 'stop()'. Scheduler is already stopping.");
            return;
        }
        this.schedulerState.setIsShuttingDown();
        LOG.info("Shutting down Scheduler.");

        boolean dueExecutorStopped;
        if (this.executeDueWaiter.isWaiting()) {
            // The polling loop is idle in its waiter: interrupt it right away.
            this.dueExecutor.shutdownNow();
            dueExecutorStopped = ExecutorUtils.awaitTermination(this.dueExecutor, duration2);
        } else {
            // The polling loop is not waiting: go through the two-step shutdown helper instead.
            dueExecutorStopped = ExecutorUtils.shutdownAndAwaitTermination(this.dueExecutor, duration, duration2);
        }
        if (!dueExecutorStopped) {
            LOG.warn("Failed to shutdown due-executor properly.");
        }

        this.executor.stop(this.shutdownMaxWait);

        // The housekeeper (heartbeats, dead detection) is shut down last.
        if (!ExecutorUtils.shutdownAndAwaitTermination(this.housekeeperExecutor, duration, duration2)) {
            LOG.warn("Failed to shutdown housekeeper-executor properly.");
        }
    }

    /** Sets the paused flag on the scheduler state. */
    public void pause() {
        LOG.info("Pausing scheduler.");
        this.schedulerState.setPaused(true);
    }

    /** Clears the paused flag on the scheduler state. */
    public void resume() {
        LOG.info("Resuming scheduler.");
        this.schedulerState.setPaused(false);
    }

    /** Returns the live state object of this scheduler. */
    public SchedulerState getSchedulerState() {
        return this.schedulerState;
    }

    /** Forwards to the delegate client. */
    @Override
    public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
        this.delegate.schedule(schedulableInstance);
    }

    /** Forwards to the delegate client. */
    @Override
    public <T> void schedule(TaskInstance<T> taskInstance, Instant instant) {
        this.delegate.schedule(taskInstance, instant);
    }

    /** Forwards to the delegate client. */
    @Override
    public void reschedule(TaskInstanceId taskInstanceId, Instant instant) {
        this.delegate.reschedule(taskInstanceId, instant);
    }

    /** Forwards to the delegate client. */
    @Override
    public <T> void reschedule(SchedulableInstance<T> schedulableInstance) {
        this.delegate.reschedule(schedulableInstance);
    }

    /** Forwards to the delegate client. */
    @Override
    public <T> void reschedule(TaskInstanceId taskInstanceId, Instant instant, T t) {
        this.delegate.reschedule(taskInstanceId, instant, t);
    }

    /** Forwards to the delegate client. */
    @Override
    public void cancel(TaskInstanceId taskInstanceId) {
        this.delegate.cancel(taskInstanceId);
    }

    /** Forwards to the delegate client. */
    @Override
    public void fetchScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(consumer);
    }

    /** Forwards to the delegate client. */
    @Override
    public void fetchScheduledExecutions(ScheduledExecutionsFilter scheduledExecutionsFilter, Consumer<ScheduledExecution<Object>> consumer) {
        this.delegate.fetchScheduledExecutions(scheduledExecutionsFilter, consumer);
    }

    /** Forwards to the delegate client. */
    @Override
    public <T> void fetchScheduledExecutionsForTask(String str, Class<T> cls, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(str, cls, consumer);
    }

    /** Forwards to the delegate client. */
    @Override
    public <T> void fetchScheduledExecutionsForTask(String str, Class<T> cls, ScheduledExecutionsFilter scheduledExecutionsFilter, Consumer<ScheduledExecution<T>> consumer) {
        this.delegate.fetchScheduledExecutionsForTask(str, cls, scheduledExecutionsFilter, consumer);
    }

    /** Forwards to the delegate client. */
    @Override
    public Optional<ScheduledExecution<Object>> getScheduledExecution(TaskInstanceId taskInstanceId) {
        return this.delegate.getScheduledExecution(taskInstanceId);
    }

    /**
     * Returns the repository's answer to "executions failing longer than {@code duration}".
     *
     * @param duration threshold forwarded to the repository
     */
    public List<Execution> getFailingExecutions(Duration duration) {
        return this.schedulerTaskRepository.getExecutionsFailingLongerThan(duration);
    }

    /** Wakes the due-polling waiter, or makes it skip its next wait. */
    public void triggerCheckForDueExecutions() {
        this.executeDueWaiter.wakeOrSkipNextWait();
    }

    /** Returns the executions the internal executor currently reports as running. */
    public List<CurrentlyExecuting> getCurrentlyExecuting() {
        return this.executor.getCurrentlyExecuting();
    }

    /** Returns the currently running executions whose heartbeat state reports a stale heartbeat. */
    public List<CurrentlyExecuting> getCurrentlyExecutingWithStaleHeartbeat() {
        return this.executor.getCurrentlyExecuting().stream()
                .filter(running -> running.getHeartbeatState().hasStaleHeartbeat())
                .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Housekeeping round: first removes executions of tasks that have stayed unresolved for longer
     * than {@code deleteUnresolvedAfter}, then hands every dead execution to its task's
     * dead-execution handler, and finally registers {@code RAN_DETECT_DEAD}.
     */
    public void detectDeadExecutions() {
        LOG.debug("Deleting executions with unresolved tasks.");
        deleteExecutionsForUnresolvedTasks();

        LOG.debug("Checking for dead executions.");
        Instant now = this.clock.now();
        Instant oldestAcceptedHeartbeat = now.minus(getMaxAgeBeforeConsideredDead());
        List<Execution> deadExecutions = this.schedulerTaskRepository.getDeadExecutions(oldestAcceptedHeartbeat);
        if (deadExecutions.isEmpty()) {
            LOG.trace("No dead executions found.");
        } else {
            deadExecutions.forEach(execution -> handleDeadExecution(execution, now));
        }
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_DETECT_DEAD);
    }

    /** Removes all executions of each long-unresolved task name and clears its unresolved marker. */
    private void deleteExecutionsForUnresolvedTasks() {
        this.taskResolver.getUnresolvedTaskNames(this.deleteUnresolvedAfter).forEach(taskName -> {
            LOG.warn("Deleting all executions for task with name '{}'. They have been unresolved for more than {}", taskName, this.deleteUnresolvedAfter);
            int removedCount = this.schedulerTaskRepository.removeExecutions(taskName);
            LOG.info("Removed {} executions", removedCount);
            this.taskResolver.clearUnresolved(taskName);
        });
    }

    /**
     * Delegates one dead execution to the dead-execution handler of its task. Any failure is logged
     * and counted, never propagated, so the remaining dead executions are still processed.
     */
    private void handleDeadExecution(Execution execution, Instant now) {
        LOG.info("Found dead execution. Delegating handling to task. Execution: {}", execution);
        try {
            Optional<Task> task = this.taskResolver.resolve(execution.taskInstance.getTaskName());
            if (!task.isPresent()) {
                LOG.error("Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.", execution.taskInstance.getTaskName());
                return;
            }
            this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.DEAD_EXECUTION);
            task.get().getDeadExecutionHandler().deadExecution(ExecutionComplete.failure(execution, now, now, null), new ExecutionOperations(this.schedulerTaskRepository, this.earlyExecutionListener, execution));
        } catch (Throwable th) {
            LOG.error("Failed while handling dead execution {}. Will be tried again later.", execution, th);
            this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
        }
    }

    /**
     * Updates the heartbeat of every currently running execution using a single timestamp, then
     * registers {@code RAN_UPDATE_HEARTBEATS}. Does nothing (and registers nothing) when no
     * execution is running.
     */
    void updateHeartbeats() {
        List<CurrentlyExecuting> running = this.executor.getCurrentlyExecuting();
        if (running.isEmpty()) {
            LOG.trace("No executions to update heartbeats for. Skipping.");
            return;
        }
        LOG.debug("Updating heartbeats for {} executions being processed.", running.size());
        Instant now = this.clock.now();
        running.forEach(entry -> updateHeartbeatForExecution(now, entry));
        this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS);
    }

    /**
     * Writes a heartbeat for one running execution and records the outcome on its heartbeat state.
     * Failures are logged and counted, never propagated.
     *
     * @param instant heartbeat timestamp to store
     * @param currentlyExecuting the running execution to update
     */
    protected void updateHeartbeatForExecution(Instant instant, CurrentlyExecuting currentlyExecuting) {
        Execution execution = currentlyExecuting.getExecution();
        // Parameterized so the execution is only rendered when TRACE is enabled.
        LOG.trace("Updating heartbeat for execution: {}", execution);
        try {
            // NOTE(review): the literal 3 is presumably the number of attempts - confirm in TaskRepository.
            boolean heartbeatStored = this.schedulerTaskRepository.updateHeartbeatWithRetry(execution, instant, 3);
            currentlyExecuting.heartbeat(heartbeatStored, instant);
            if (!heartbeatStored) {
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILED_HEARTBEAT);
            }
            HeartbeatState heartbeatState = currentlyExecuting.getHeartbeatState();
            if (heartbeatState.getFailedHeartbeats() > 1) {
                LOG.warn("Execution has more than 1 failed heartbeats. Should not happen. Risk of being considered dead. See heartbeat-state. Heartbeat-state={}, Execution={}", heartbeatState.describe(), execution);
                this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS);
            }
        } catch (Throwable th) {
            LOG.error("Unexpteced failure while while updating heartbeat for execution {}.", execution, th);
            this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILED_HEARTBEAT);
            this.statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
        }
    }

    /** Returns the heartbeat interval multiplied by the number of missed heartbeats before dead. */
    Duration getMaxAgeBeforeConsideredDead() {
        return this.heartbeatInterval.multipliedBy(this.numberOfMissedHeartbeatsBeforeDead);
    }

    /**
     * Starts building a scheduler for the given data source and tasks.
     *
     * @param dataSource data source handed to the builder
     * @param taskArr tasks handed to the builder
     * @return a new {@code SchedulerBuilder}
     */
    public static SchedulerBuilder create(DataSource dataSource, Task<?>... taskArr) {
        return create(dataSource, Arrays.asList(taskArr));
    }

    /**
     * Starts building a scheduler for the given data source and tasks.
     *
     * @param dataSource data source handed to the builder
     * @param list tasks handed to the builder
     * @return a new {@code SchedulerBuilder}
     */
    public static SchedulerBuilder create(DataSource dataSource, List<Task<?>> list) {
        return new SchedulerBuilder(dataSource, list);
    }
}
