package com.mulesoft.mule.runtime.module.batch.internal.engine.buffer;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchEngine;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueManager;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate;
import com.mulesoft.mule.runtime.module.batch.internal.engine.DefaultBatchEngine;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.BatchTransactionContextProvider;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.ManagedBatchTransactionContextProvider;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.internal.profiling.tracing.event.tracer.CoreEventTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/mule/runtime/module/batch/internal/engine/buffer/StreamingAggregatorBuffer.class */
/**
 * Aggregator buffer for a batch step whose aggregator consumes records as a stream.
 *
 * <p>Records handed to {@link #add} are staged in a {@link StreamingAggregatorInputQueueBuffer}.
 * When the buffer is flushed, the aggregator chain receives a
 * {@link StreamingAggregatorIteratorWrapper} over the step's streaming-aggregator input queue as
 * its message payload. Once the chain finishes (successfully or not) the remaining payload is
 * drained, the output queue buffer is flushed and the records found in the streaming-aggregator
 * output queue are handed to {@link BatchEngine#updateStatisticsAndRoute}.
 *
 * <p>Input and output sides each use their own {@link BatchTransactionContextProvider}.
 */
public class StreamingAggregatorBuffer extends AggregatorRecordBuffer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAggregatorBuffer.class);

    /** Staging buffer for records waiting to be streamed into the aggregator. */
    private final StreamingAggregatorInputQueueBuffer inputQueueBuffer;

    /** Staging buffer for records that already went through the aggregator iterator. */
    private final RecordBuffer outputQueueBuffer;

    private final BatchQueueManager queueManager;

    /** Job instances with a flush currently in progress; used to ignore overlapping flush requests. */
    private final Set<BatchJobInstanceAdapter> flushingInstances;

    /** Transaction contexts used when writing to / reading from the input queue. */
    private final BatchTransactionContextProvider inputContexts;

    /** Transaction contexts used when writing to / reading from the output queue. */
    private final BatchTransactionContextProvider outputContexts;

    /**
     * Creates the buffer for the given step.
     *
     * @param batchEngine            engine that owns the queues and routes records
     * @param batchStepAdapter       step this aggregator belongs to; its name is part of the buffer name
     * @param processor              aggregator processor chain
     * @param notificationDispatcher forwarded to the parent buffer
     * @param muleContext            forwarded to the parent buffer
     * @param coreEventTracer        forwarded to the parent buffer
     */
    public StreamingAggregatorBuffer(BatchEngine batchEngine, BatchStepAdapter batchStepAdapter, Processor processor, NotificationDispatcher notificationDispatcher, MuleContext muleContext, CoreEventTracer coreEventTracer) {
        // NOTE(review): the literal 0 is presumably the in-memory buffer size of the parent class,
        // unused here because records are staged in queues - confirm against AggregatorRecordBuffer.
        super(String.format("batch-step-%s-streaming-aggregator-buffer", batchStepAdapter.getName()), 0, batchEngine, batchStepAdapter, processor, batchStepAdapter.getLocation(), notificationDispatcher, muleContext, coreEventTracer);
        // Explicit type arguments instead of a raw ConcurrentHashMap: avoids the unchecked conversion.
        this.flushingInstances = Collections.newSetFromMap(new ConcurrentHashMap<BatchJobInstanceAdapter, Boolean>());
        this.queueManager = batchEngine.getBatchQueueManager();
        this.inputQueueBuffer = new StreamingAggregatorInputQueueBuffer(batchEngine, batchStepAdapter);
        this.outputQueueBuffer = new StreamingAggregatorOutputQueueBuffer(batchEngine, batchStepAdapter);
        this.inputContexts = newProvider();
        this.outputContexts = newProvider();
    }

    /**
     * Registers one more consumer on the record's completion callback and stages the record in the
     * input queue buffer through {@link BufferUtils#addToBufferInSplitTransaction}.
     *
     * @return whatever {@code BufferUtils.addToBufferInSplitTransaction} returns for this record
     */
    @Override
    public int add(BatchJobInstanceAdapter batchJobInstanceAdapter, BatchTransactionContext batchTransactionContext, Record record) {
        record.getCompletionCallback().incrementConsumers();
        return BufferUtils.addToBufferInSplitTransaction(batchJobInstanceAdapter, this.inputQueueBuffer, batchTransactionContext, this.inputContexts, record);
    }

    /**
     * Delegates to {@code flushAndForget} of the input queue buffer for the given job instance.
     */
    @Override
    public void forget(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        this.inputQueueBuffer.flushAndForget(batchJobInstanceAdapter);
    }

    /**
     * Same as {@link #flush(BatchJobInstanceAdapter)}, which already forgets the instance.
     */
    @Override
    public void flushAndForget(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        flush(batchJobInstanceAdapter);
    }

    /**
     * Flushes the staged input for the job instance and runs the inherited {@code doFlush}.
     *
     * <p>If a flush for the same instance is already in progress the call returns immediately
     * without doing anything.
     */
    @Override
    public void flush(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        if (!this.flushingInstances.add(batchJobInstanceAdapter)) {
            // Another flush for this instance is running; nothing to do.
            return;
        }
        try {
            forget(batchJobInstanceAdapter);
            doFlush(batchJobInstanceAdapter, ArrayListMultimap.create());
        } finally {
            this.flushingInstances.remove(batchJobInstanceAdapter);
        }
    }

    /**
     * @return the input queue buffer's current size plus its flushed record count for the instance
     */
    @Override
    public synchronized long size(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        return this.inputQueueBuffer.size(batchJobInstanceAdapter) + this.inputQueueBuffer.getFlushedRecordCount(batchJobInstanceAdapter);
    }

    /**
     * Builds the event sent to the aggregator chain, with a streaming iterator over the input
     * queue as payload.
     *
     * @param multimap not used by this implementation
     * @return a copy of {@code coreEvent} carrying the iterator, or {@code null} when the iterator
     *         has no elements (in which case the iterator is committed and closed here)
     * @throws MuleException declared by the overridden method
     */
    @Override
    protected CoreEvent configureChainEvent(CoreEvent coreEvent, BatchJobInstanceAdapter batchJobInstanceAdapter, Multimap<BatchTransactionContext, Record> multimap) throws MuleException {
        BatchTransactionContext inputContext = this.inputContexts.get(batchJobInstanceAdapter);
        StreamingAggregatorIteratorWrapper iterator = new StreamingAggregatorIteratorWrapper(
                this.queueManager.streamingAggregatorInputQueue(batchJobInstanceAdapter, this.step).iterator(inputContext),
                this.outputContexts,
                inputContext,
                batchJobInstanceAdapter,
                this.outputQueueBuffer,
                this.batchEngine.getBlockSize(batchJobInstanceAdapter));
        if (iterator.hasNext()) {
            return CoreEvent.builder(coreEvent).message(Message.builder(coreEvent.getMessage()).value(iterator).build()).build();
        }
        // Nothing to process: release the iterator. close() must run even if commit() fails,
        // otherwise the wrapper is leaked.
        try {
            iterator.commit();
        } finally {
            iterator.close();
        }
        return null;
    }

    /** Flushes the output queue buffer for the given job instance. */
    private void flushOutputBuffer(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        this.outputQueueBuffer.flushAndForget(batchJobInstanceAdapter);
    }

    /**
     * Drains the streaming-aggregator output queue, passing each polled element to
     * {@link #updateStatisticsAndRoute}.
     *
     * @param exc failure to mark on the records, or {@code null} when processing succeeded
     * @throws MuleException if routing fails; the output transaction context is rolled back first
     */
    private void backToSteppingQueue(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc) throws MuleException {
        BatchQueueDelegate outputQueue = this.queueManager.streamingAggregatorOutputQueue(batchJobInstanceAdapter, this.step);
        BatchTransactionContext outputContext = this.outputContexts.get(batchJobInstanceAdapter);
        while (outputQueue.size(outputContext) > 0) {
            try {
                updateStatisticsAndRoute(outputContext, batchJobInstanceAdapter, outputQueue.poll(outputContext), exc);
                // updateStatisticsAndRoute commits, so start a new transaction for the next poll.
                outputContext.beginTransaction();
            } catch (Exception e) {
                BatchUtils.rollback(outputContext);
                throw e;
            }
        }
        BatchUtils.commit(outputContext);
    }

    /**
     * Optionally marks the records as failed, hands them to the batch engine, empties the list and
     * commits the transaction context.
     *
     * @param exc when non-null, the records are marked with this error before routing
     */
    private void updateStatisticsAndRoute(BatchTransactionContext batchTransactionContext, BatchJobInstanceAdapter batchJobInstanceAdapter, List<Record> list, Exception exc) throws MuleException {
        if (exc != null) {
            markError(batchJobInstanceAdapter, list, exc);
        }
        this.batchEngine.updateStatisticsAndRoute(batchTransactionContext, list);
        list.clear();
        BatchUtils.commit(batchTransactionContext);
    }

    /**
     * Creates the processing template that runs the aggregator chain and, afterwards, routes the
     * records and commits or rolls back the streaming iterator.
     *
     * @param multimap not used by this implementation
     */
    @Override
    protected BatchProcessingTemplate makeProcessingTemplate(Multimap<BatchTransactionContext, Record> multimap, Processor processor, MuleContext muleContext) {
        return new BatchProcessingTemplate(processor, getLocation(), muleContext.getFlowTraceManager(), muleContext.getStreamCloserService()) {
            /** Iterator taken from the event payload in {@link #process}; used by the callbacks below. */
            private StreamingAggregatorIteratorWrapper streamingIterator;

            /** Remembers the payload iterator, then delegates to the parent template. */
            @Override
            public CoreEvent process(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException {
                this.streamingIterator = (StreamingAggregatorIteratorWrapper) coreEvent.getMessage().getPayload().getValue();
                return super.process(batchJobInstanceAdapter, coreEvent);
            }

            /** Routes the records without an error and commits the iterator. */
            @Override
            protected void onSuccess(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException {
                StreamingAggregatorBuffer.this.route(batchJobInstanceAdapter, this.streamingIterator, null);
                this.streamingIterator.commit();
            }

            /** Routes the records flagged with {@code exc} and rolls the iterator back. */
            @Override
            protected void onException(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc, CoreEvent coreEvent) throws MuleException {
                if (StreamingAggregatorBuffer.LOGGER.isDebugEnabled()) {
                    StreamingAggregatorBuffer.LOGGER.debug(String.format("Exception was found processing streaming aggregator on step %s for instance %s of job %s", StreamingAggregatorBuffer.this.step.getName(), batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName()));
                    StreamingAggregatorBuffer.LOGGER.debug(DefaultBatchEngine.buildExceptionLogMessage(exc));
                }
                // Roll back even when routing itself fails, so the iterator's transaction is not
                // left open; a routing failure still propagates to the caller.
                try {
                    StreamingAggregatorBuffer.this.route(batchJobInstanceAdapter, this.streamingIterator, exc);
                } finally {
                    this.streamingIterator.rollback();
                }
            }
        };
    }

    /**
     * Exhausts whatever the aggregator left unread in the iterator and, if the iterator reports a
     * previous record, stages that record in the output queue buffer.
     */
    private void consumePayload(BatchJobInstanceAdapter batchJobInstanceAdapter, StreamingAggregatorIteratorWrapper streamingAggregatorIteratorWrapper) {
        while (streamingAggregatorIteratorWrapper.hasNext()) {
            streamingAggregatorIteratorWrapper.next();
        }
        Record lastRecord = streamingAggregatorIteratorWrapper.getPrevious();
        if (lastRecord != null) {
            // No caller transaction is passed (null); the output context provider is used instead.
            BufferUtils.addToBufferInSplitTransaction(batchJobInstanceAdapter, this.outputQueueBuffer, null, this.outputContexts, lastRecord);
        }
    }

    /**
     * Finishes a processing round: drains the iterator, flushes the output buffer and routes the
     * records in the output queue.
     *
     * <p>Decompiler note: this method was originally {@code private}; it is public here only
     * because the decompiler widened it for the anonymous processing template.
     *
     * @param exc failure raised by the aggregator chain, or {@code null} on success
     */
    public void route(BatchJobInstanceAdapter batchJobInstanceAdapter, StreamingAggregatorIteratorWrapper streamingAggregatorIteratorWrapper, Exception exc) throws MuleException {
        consumePayload(batchJobInstanceAdapter, streamingAggregatorIteratorWrapper);
        flushOutputBuffer(batchJobInstanceAdapter);
        backToSteppingQueue(batchJobInstanceAdapter, exc);
    }

    /** Creates a managed transaction context provider bound to this buffer's batch engine. */
    private BatchTransactionContextProvider newProvider() {
        // NOTE(review): meaning of the boolean flag is not visible here - confirm against
        // ManagedBatchTransactionContextProvider.
        return new ManagedBatchTransactionContextProvider(this.batchEngine, true);
    }
}
