package org.mule.extension.sqs.internal.source;

import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import java.util.Map;
import org.mule.extension.sqs.internal.connection.SQSConnection;
import org.mule.extension.sqs.internal.operation.SQSModelFactory;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/extension/sqs/internal/source/MessageReceiver.class */
/**
 * Polling worker that repeatedly receives messages from one SQS queue and hands each of them
 * to a Mule {@link SourceCallback}.
 *
 * <p>The worker loops until {@link #stop()} is called, the thread is interrupted, or a poll
 * fails. A failure that happens while the worker is still supposed to run is reported through
 * {@link SourceCallback#onConnectionException(ConnectionException)}; a failure or interrupt that
 * arrives after {@link #stop()} is treated as part of the shutdown and is not reported.
 *
 * <p>{@link #stop()} may be called from a different thread than the one executing
 * {@link #run()}: the stop flag is {@code volatile}.
 */
public class MessageReceiver implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    /** Cleared by {@link #stop()}; read on every loop iteration of {@link #run()}. */
    private volatile boolean running = true;
    private final SourceCallback<String, Map<String, Object>> sourceCallback;
    private final SQSConnection connection;
    private final int visibilityTimeout;
    private final boolean preserveMessages;
    private final int numberOfMessages;
    private final String queueUrl;
    private final int waitTime;
    private final long pollingPeriodMilliseconds;

    /**
     * Creates a receiver for a single queue.
     *
     * @param sourceCallback callback that receives every message and any connection failure
     * @param connection connection used to receive and delete messages
     * @param visibilityTimeout value set as the visibility timeout of each receive request
     * @param preserveMessages when {@code true}, messages are left on the queue after being
     *     dispatched; when {@code false}, each message is deleted right after it is dispatched
     * @param numberOfMessages value set as the maximum number of messages per receive request
     * @param queueUrl URL of the queue to poll
     * @param waitTime value set as the wait time (seconds) of each receive request
     * @param pollingPeriodMilliseconds pause between two polls, in milliseconds; values
     *     {@code <= 0} disable the pause
     */
    public MessageReceiver(SourceCallback<String, Map<String, Object>> sourceCallback, SQSConnection connection, int visibilityTimeout, boolean preserveMessages, int numberOfMessages, String queueUrl, int waitTime, long pollingPeriodMilliseconds) {
        this.sourceCallback = sourceCallback;
        this.connection = connection;
        this.visibilityTimeout = visibilityTimeout;
        this.preserveMessages = preserveMessages;
        this.numberOfMessages = numberOfMessages;
        this.queueUrl = queueUrl;
        this.waitTime = waitTime;
        this.pollingPeriodMilliseconds = pollingPeriodMilliseconds;
    }

    /**
     * Polls the queue until the receiver is stopped, the thread is interrupted, or a poll fails.
     *
     * <p>The loop ends after the first failure; it does not retry on its own.
     */
    @Override
    public void run() {
        // NOTE(review): one callback context is created up front and reused for every
        // dispatched message - confirm this matches what the runtime expects.
        SourceCallbackContext callbackContext = this.sourceCallback.createContext();
        // The request never changes between polls, so it is built once and reused.
        ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
                .withAttributeNames(new String[]{"All"})
                .withMessageAttributeNames(new String[]{"All"})
                .withQueueUrl(this.queueUrl)
                .withMaxNumberOfMessages(Integer.valueOf(this.numberOfMessages))
                .withVisibilityTimeout(Integer.valueOf(this.visibilityTimeout))
                .withWaitTimeSeconds(Integer.valueOf(this.waitTime));
        while (this.running) {
            try {
                logger.debug("Handling received message from queue: {} ", this.queueUrl);
                handleRequest(receiveRequest, callbackContext);
                if (this.pollingPeriodMilliseconds > 0) {
                    Thread.sleep(this.pollingPeriodMilliseconds);
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                reportFailureUnlessStopped(e);
                return;
            } catch (Exception e) {
                if (this.running) {
                    logger.error("Handle message request failed with exception: ", e);
                }
                reportFailureUnlessStopped(e);
                return;
            }
        }
    }

    /**
     * Asks the polling loop to end. The loop checks the flag before each poll, so a poll that is
     * already in progress is not cut short by this call alone.
     */
    public void stop() {
        // A single write to a volatile field is already visible to the polling thread;
        // no lock is needed.
        this.running = false;
    }

    /**
     * Reports {@code cause} to the source callback as a connection failure, unless
     * {@link #stop()} has already been requested - in that case the failure is a side effect of
     * shutting down and must not be presented to the runtime as a broken connection.
     */
    private void reportFailureUnlessStopped(Exception cause) {
        if (!this.running) {
            logger.debug("Receiver for queue {} stopped; not reporting failure: {}", this.queueUrl, cause.toString());
            return;
        }
        this.sourceCallback.onConnectionException(new ConnectionException(cause, this.connection));
    }

    /**
     * Performs one receive call and dispatches every returned message to the source callback,
     * deleting each message afterwards unless {@code preserveMessages} is set.
     */
    private void handleRequest(ReceiveMessageRequest receiveMessageRequest, SourceCallbackContext sourceCallbackContext) {
        this.connection.receiveMessage(receiveMessageRequest).getMessages().forEach(message -> {
            this.sourceCallback.handle(Result.builder().output(message.getBody()).attributes(SQSModelFactory.wrapMessageAttributes(message)).build(), sourceCallbackContext);
            if (!this.preserveMessages) {
                this.connection.deleteMessage(new DeleteMessageRequest(this.queueUrl, message.getReceiptHandle()));
            }
        });
    }
}
