/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.batch.integration.chunk;

import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.integration.chunk.AsynchronousFailureException;
import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.batch.integration.chunk.ChunkResponse;
import org.springframework.batch.item.ClearFailedException;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.FlushFailedException;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.ExitStatus;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ChunkMessageChannelItemWriter
extends StepExecutionListenerSupport
implements ItemWriter,
ItemStream {
    private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);
    private static final String ITEMS_PROCESSED = ChunkMessageChannelItemWriter.class.getName() + ".ITEMS_PROCESSED";
    static final String ACTUAL = "ACTUAL";
    static final String EXPECTED = "EXPECTED";
    private static final long DEFAULT_THROTTLE_LIMIT = 6L;
    private MessageChannel requestChannel;
    private MessageChannel replyChannel;
    private LocalState localState = new LocalState();
    private long throttleLimit = 6L;

    public void setThrottleLimit(long throttleLimit) {
        this.throttleLimit = throttleLimit;
    }

    public void setReplyChannel(MessageChannel replyChannel) {
        this.replyChannel = replyChannel;
    }

    public void setRequestChannel(MessageChannel requestChannel) {
        this.requestChannel = requestChannel;
    }

    public void write(Object item) throws Exception {
        this.bindTransactionResources();
        this.getProcessed().add(item);
        logger.debug((Object)("Added item to chunk: " + item));
    }

    public void flush() throws FlushFailedException {
        this.bindTransactionResources();
        while (this.localState.getExpecting() > this.throttleLimit) {
            this.getNextResult(100L);
        }
        List<Object> processed = this.getProcessed();
        if (!processed.isEmpty()) {
            logger.debug((Object)("Dispatching chunk: " + processed));
            ChunkRequest request = new ChunkRequest(processed, this.localState.getJobId(), this.localState.getSkipCount());
            GenericMessage message = new GenericMessage((Object)request);
            this.requestChannel.send((Message)message);
            this.localState.expected++;
        }
        this.getNextResult(1L);
        this.unbindTransactionResources();
    }

    public void beforeStep(StepExecution stepExecution) {
        this.localState.setStepExecution(stepExecution);
    }

    public ExitStatus afterStep(StepExecution stepExecution) {
        boolean timedOut;
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            return ExitStatus.CONTINUABLE;
        }
        long expecting = this.localState.getExpecting();
        try {
            logger.debug((Object)"Waiting for results in step listener...");
            timedOut = !this.waitForResults();
            logger.debug((Object)"Finished waiting for results in step listener.");
        }
        catch (RuntimeException e) {
            logger.debug((Object)"Detected failure waiting for results in step listener.");
            stepExecution.setStatus(BatchStatus.FAILED);
            return ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage());
        }
        if (timedOut) {
            stepExecution.setStatus(BatchStatus.FAILED);
            throw new ItemStreamException("Timed out waiting for back log at end of step");
        }
        return ExitStatus.FINISHED.addExitDescription("Waited for " + expecting + " results.");
    }

    public void close(ExecutionContext executionContext) throws ItemStreamException {
        this.localState.reset();
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(EXPECTED)) {
            this.localState.expected = executionContext.getLong(EXPECTED);
            this.localState.actual = executionContext.getLong(ACTUAL);
            if (!this.waitForResults()) {
                throw new ItemStreamException("Timed out waiting for back log on open");
            }
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(EXPECTED, this.localState.expected);
        executionContext.putLong(ACTUAL, this.localState.actual);
    }

    private boolean waitForResults() {
        int count = 0;
        int maxCount = 40;
        while (this.localState.getExpecting() > 0L && count++ < maxCount) {
            this.getNextResult(100L);
        }
        return count < maxCount;
    }

    private void getNextResult(long timeout) {
        Message message = this.replyChannel.receive(timeout);
        if (message != null) {
            ChunkResponse payload = (ChunkResponse)message.getPayload();
            Long jobInstanceId = payload.getJobId();
            Assert.state((jobInstanceId != null ? 1 : 0) != 0, (String)"Message did not contain job instance id.");
            Assert.state((boolean)jobInstanceId.equals(this.localState.getJobId()), (String)("Message contained wrong job instance id [" + jobInstanceId + "] should have been [" + this.localState.getJobId() + "]."));
            this.localState.actual++;
            ExitStatus result = payload.getExitStatus();
            if (!result.isContinuable()) {
                throw new AsynchronousFailureException("Failure or early completion detected in handler: " + result);
            }
        }
    }

    private List<Object> getProcessed() {
        Assert.state((boolean)TransactionSynchronizationManager.hasResource((Object)ITEMS_PROCESSED), (String)"Processed items not bound to transaction.");
        List processed = (List)TransactionSynchronizationManager.getResource((Object)ITEMS_PROCESSED);
        return processed;
    }

    private void bindTransactionResources() {
        if (TransactionSynchronizationManager.hasResource((Object)ITEMS_PROCESSED)) {
            return;
        }
        TransactionSynchronizationManager.bindResource((Object)ITEMS_PROCESSED, new ArrayList());
    }

    private void unbindTransactionResources() {
        if (!TransactionSynchronizationManager.hasResource((Object)ITEMS_PROCESSED)) {
            return;
        }
        TransactionSynchronizationManager.unbindResource((Object)ITEMS_PROCESSED);
    }

    public void clear() throws ClearFailedException {
        this.unbindTransactionResources();
    }

    private static class LocalState {
        private long actual;
        private long expected;
        private StepExecution stepExecution;

        private LocalState() {
        }

        public long getExpecting() {
            return this.expected - this.actual;
        }

        public int getSkipCount() {
            return this.stepExecution.getSkipCount();
        }

        public Long getJobId() {
            return this.stepExecution.getJobExecution().getJobId();
        }

        public void setStepExecution(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
        }

        public void reset() {
            this.actual = 0L;
            this.expected = 0L;
        }
    }
}

