package com.mulesoft.mule.runtime.module.batch.internal.engine.queue;

import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.buffer.BatchContextTransactionManager;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.internal.streaming.SerializationAwareQueueProducer;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.serialization.ObjectSerializer;
import org.mule.runtime.api.serialization.SerializationException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.QueueProfile;
import org.mule.runtime.core.api.streaming.iterator.ConsumerStreamingIterator;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.core.api.util.queue.QueueManager;
import org.mule.runtime.core.internal.streaming.object.iterator.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/mule/runtime/module/batch/internal/engine/queue/AbstractBatchQueueDelegate.class */
public abstract class AbstractBatchQueueDelegate implements BatchQueueDelegate {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AbstractBatchQueueDelegate.class);
    private String name;
    private final long queueTimeout;
    private final QueueManager queueManager;
    private final ObjectSerializer serializer;
    private final QueueProfile queueProfile = QueueProfile.newInstanceWithPersistentQueueStore();
    private final Set<String> configuredQueueNames = Collections.newSetFromMap(new ConcurrentHashMap());
    private final MuleContext muleContext;
    protected final BatchJobInstanceAdapter jobInstance;

    public AbstractBatchQueueDelegate(BatchJobInstanceAdapter batchJobInstanceAdapter, QueueManager queueManager, long j, ObjectSerializer objectSerializer, MuleContext muleContext) {
        this.jobInstance = batchJobInstanceAdapter;
        this.queueManager = queueManager;
        this.queueTimeout = j;
        this.serializer = objectSerializer;
        this.muleContext = muleContext;
    }

    private void assertMuleStarted(BatchTransactionContext batchTransactionContext) {
        if (this.muleContext.isStarted()) {
            return;
        }
        BatchUtils.rollback(batchTransactionContext);
        throw new IllegalStateException(String.format("Mule Context is Stopped or stopping. Execution context '%s' for instance '%s' of job '%s' has been rolled back", batchTransactionContext.getId(), batchTransactionContext.getJobInstance().getId(), batchTransactionContext.getJob().getName()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v11, types: [byte[], java.io.Serializable] */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public void dispatch(BatchTransactionContext batchTransactionContext, Collection<Record> collection) throws MuleException {
        assertMuleStarted(batchTransactionContext);
        if (CollectionUtils.isEmpty(collection)) {
            return;
        }
        Queue queue = getQueue(batchTransactionContext);
        try {
            if (!queue.offer(this.serializer.getInternalProtocol().serialize(collection), this.queueTimeout)) {
                throw new DefaultMuleException(I18nMessageFactory.createStaticMessage(String.format("Could not dispatch records to batch queue %s (%d elements already queued)", queue.getName(), Integer.valueOf(queue.size()))));
            }
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("dispatched %d records to Batch Queue %s", Integer.valueOf(collection.size()), batchTransactionContext.getJobInstance().getQueueName()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DefaultMuleException(String.format("Could not dispatch records to batch queue %s due to Thread interruption Exception", batchTransactionContext.getJobInstance().getQueueName()), e);
        } catch (SerializationException e2) {
            throw new DefaultMuleException(String.format("Could not dispatch records to batch queue %s due to Serialization Exception", batchTransactionContext.getJobInstance().getQueueName()), e2);
        }
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public void dispatchAndCommit(BatchTransactionContext batchTransactionContext, BatchJobInstanceAdapter batchJobInstanceAdapter, List<Record> list, BatchContextTransactionManager batchContextTransactionManager) {
        try {
            dispatch(batchTransactionContext, list);
            batchContextTransactionManager.commit(batchTransactionContext);
        } catch (MuleException e) {
            batchContextTransactionManager.rollback(batchTransactionContext, batchJobInstanceAdapter, list, e);
        }
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public List<Record> poll(BatchTransactionContext batchTransactionContext) throws MuleException {
        assertMuleStarted(batchTransactionContext);
        Queue queue = getQueue(batchTransactionContext);
        try {
            byte[] bArr = (byte[]) queue.poll(this.queueTimeout);
            if (bArr == null) {
                return null;
            }
            return (List) this.serializer.getInternalProtocol().deserialize(bArr);
        } catch (InterruptedException e) {
            throw new DefaultMuleException(String.format("Thread interrupted while trying to read from queue %s", queue.getName()), e);
        }
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public long size(BatchTransactionContext batchTransactionContext) throws MuleException {
        assertMuleStarted(batchTransactionContext);
        return getQueue(batchTransactionContext) != null ? r0.size() : 0;
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public Iterator<List<Record>> iterator(BatchTransactionContext batchTransactionContext) throws MuleException {
        assertMuleStarted(batchTransactionContext);
        return new ConsumerStreamingIterator(new SimpleConsumer(new SerializationAwareQueueProducer(getQueue(batchTransactionContext), this.serializer)));
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public void dispose(BatchTransactionContext batchTransactionContext) {
        try {
            getQueue(batchTransactionContext).dispose();
        } catch (Exception e) {
            logger.error(String.format("Exception found while trying to dispose queue %s for instance '%s' of job '%s", getQueueName(), batchTransactionContext.getJobInstance().getId(), batchTransactionContext.getJobInstance().getOwnerJobName()), (Throwable) e);
        }
    }

    private Queue getQueue(BatchTransactionContext batchTransactionContext) {
        String queueName = getQueueName();
        if (this.configuredQueueNames.add(queueName)) {
            try {
                this.queueProfile.configureQueue(queueName, this.queueManager);
            } catch (InitialisationException e) {
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Could not configure queue " + queueName), e);
            }
        }
        return batchTransactionContext.getQueue(queueName);
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate
    public final String getQueueName() {
        if (this.name == null) {
            this.name = buildQueueName();
        }
        return this.name;
    }

    protected abstract String buildQueueName();
}
