/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.batch.integration.job;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobInterruptedException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.batch.core.step.AbstractStep;
import org.springframework.batch.integration.job.JobExecutionRequest;
import org.springframework.batch.integration.job.StepExecutionTimeoutException;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.ExitStatus;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.util.Assert;

public class MessageOrientedStep
extends AbstractStep {
    public static final String WAITING = MessageOrientedStep.class.getName() + ".WAITING";
    private MessageChannel requestChannel;
    private MessageChannel replyChannel;
    private static int MINUTE = 60000;
    private long executionTimeout = 30 * MINUTE;
    private long pollingInterval = 5L;

    public void setExecutionTimeoutMinutes(int executionTimeoutMinutes) {
        this.executionTimeout = executionTimeoutMinutes * MINUTE;
    }

    public void setExecutionTimeout(long executionTimeout) {
        this.executionTimeout = executionTimeout;
    }

    public void setPollingInterval(long pollingInterval) {
        this.pollingInterval = pollingInterval;
    }

    @Required
    public void setRequestChannel(MessageChannel requestChannel) {
        this.requestChannel = requestChannel;
    }

    @Required
    public void setReplyChannel(MessageChannel replyChannel) {
        this.replyChannel = replyChannel;
    }

    public ExitStatus doExecute(StepExecution stepExecution) throws JobInterruptedException, UnexpectedJobExecutionException {
        JobExecutionRequest request = new JobExecutionRequest(stepExecution.getJobExecution());
        ExecutionContext executionContext = stepExecution.getExecutionContext();
        if (executionContext.containsKey(WAITING)) {
            this.waitForReply(request.getJobId());
        } else {
            executionContext.putString(WAITING, "true");
            this.getJobRepository().saveOrUpdate(stepExecution);
            this.requestChannel.send((Message)new GenericMessage((Object)request));
            this.waitForReply(request.getJobId());
        }
        return ExitStatus.FINISHED;
    }

    protected void open(ExecutionContext ctx) throws Exception {
    }

    protected void close(ExecutionContext ctx) throws Exception {
    }

    private void waitForReply(Long expectedJobId) {
        long timeout = this.pollingInterval;
        long maxCount = this.executionTimeout / timeout;
        long count = 0L;
        while (count++ < maxCount) {
            Message message = this.replyChannel.receive(timeout);
            if (message == null) continue;
            JobExecutionRequest payload = (JobExecutionRequest)message.getPayload();
            Long jobInstanceId = payload.getJobId();
            Assert.state((jobInstanceId != null ? 1 : 0) != 0, (String)"Message did not contain job instance id.");
            Assert.state((boolean)jobInstanceId.equals(expectedJobId), (String)("Message contained wrong job instance id [" + jobInstanceId + "] should have been [" + expectedJobId + "]."));
            if (payload.getStatus() == BatchStatus.COMPLETED) break;
            if (!payload.hasErrors()) continue;
            MessageOrientedStep.rethrow(payload.getLastThrowable());
        }
        if (count >= maxCount) {
            throw new StepExecutionTimeoutException("Timed out waiting for steps to execute.");
        }
    }

    private static void rethrow(Throwable t) throws RuntimeException {
        if (t instanceof RuntimeException) {
            throw (RuntimeException)t;
        }
        if (t instanceof Exception) {
            throw new UnexpectedJobExecutionException("Unexpected checked exception thrown by step.", t);
        }
        throw (Error)t;
    }
}

