package com.mulesoft.connector.googlepubsub.internal.source;

import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.mulesoft.connector.googlepubsub.internal.ResourceConstants;
import com.mulesoft.connector.googlepubsub.internal.config.PubSubConfiguration;
import com.mulesoft.connector.googlepubsub.internal.connection.PubSubConnection;
import com.mulesoft.connector.googlepubsub.internal.connection.provider.FlowControlParameters;
import com.mulesoft.connector.googlepubsub.internal.error.exception.GooglePubSubRuntimeException;
import com.mulesoft.connector.googlepubsub.internal.metadata.MessageListenerMetadataResolver;
import com.mulesoft.connector.googlepubsub.internal.operation.params.SubscriptionIdentifier;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.metadata.MetadataKeyId;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.OnBackPressure;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.BackPressureContext;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.runtime.source.SourceCompletionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mule message source that listens on a Google Pub/Sub subscription and hands every received
 * message to the owning flow.
 *
 * <p>The payload of each dispatched message is the Pub/Sub message data exposed as an
 * {@link InputStream}; the attributes are a {@code Map<String, String>} holding {@code messageId},
 * {@code publishTime}, {@code orderingKey} and the message's own attributes.
 *
 * <p>The {@link AckReplyConsumer} of each message is stored in the {@link SourceCallbackContext}
 * under {@link #CONSUMER}. The message is ACK-ed when the flow succeeds and NACK-ed when the flow
 * fails or rejects the message because of back-pressure.
 */
@ClusterSupport(SourceClusterSupport.DEFAULT_ALL_NODES)
@DisplayName("On message listener")
@MediaType("*/*")
@Alias("message-listener")
@BackPressure(defaultMode = BackPressureMode.WAIT, supportedModes = {BackPressureMode.WAIT, BackPressureMode.DROP, BackPressureMode.FAIL})
@Summary("Listen for messages on selected subscription")
@MetadataScope(outputResolver = MessageListenerMetadataResolver.class, attributesResolver = MessageListenerMetadataResolver.class)
public class MessageListenerSource extends Source<InputStream, Map<String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerSource.class);

    /** Name of the callback-context variable that carries the message's {@link AckReplyConsumer}. */
    public static final String CONSUMER = "consumer";

    @Config
    private PubSubConfiguration configuration;

    @Connection
    private ConnectionProvider<PubSubConnection> connectionProvider;

    /** Connection obtained in {@link #onStart}; handed back to the provider when the source stops. */
    private PubSubConnection pubSubConnection;

    /** Subscriber created in {@link #onStart}; {@code null} while the source is not started. */
    private Subscriber subscriber;

    @MetadataKeyId
    @ParameterGroup(name = "identifier")
    private SubscriptionIdentifier identifier;

    @Optional(defaultValue = "5")
    @Parameter
    @Summary("Provides specified amount of executor service for processing messages.")
    @Placement(tab = "Advanced")
    @DisplayName("Consumer count")
    private int consumerCount;

    @ParameterGroup(name = "Flow control parameters")
    private FlowControlParameters flowControlParameters;

    /**
     * Connects, builds the subscriber and blocks until it is running.
     *
     * <p>If anything fails after the connection was obtained, the subscriber and the connection
     * are released before the failure is propagated, so a failed start does not leak them.
     *
     * @param sourceCallback callback used to push messages into the flow and to report connection
     *                       failures of an already running subscriber
     * @throws MuleException if the connection cannot be established
     */
    @Override
    public void onStart(final SourceCallback<InputStream, Map<String, String>> sourceCallback) throws MuleException {
        this.pubSubConnection = this.connectionProvider.connect();
        try {
            this.subscriber = initSubscriber(sourceCallback);
            this.subscriber.addListener(new ApiService.Listener() {
                @Override
                public void failed(ApiService.State state, Throwable th) {
                    if (state == ApiService.State.STARTING) {
                        // NOTE(review): kept from the original. The startup failure is expected to
                        // reach the caller through awaitRunning() below; confirm what the service
                        // does with an exception thrown from a listener.
                        throw new GooglePubSubRuntimeException(th);
                    }
                    // A failure after startup is reported to the runtime as a connection problem.
                    sourceCallback.onConnectionException(new ConnectionException(th));
                }
            }, MoreExecutors.directExecutor());
            this.subscriber.startAsync().awaitRunning();
        } catch (RuntimeException e) {
            releaseResources();
            throw e;
        }
        logger.debug("Listening for messages on: {}", this.identifier.getSubscriptionName());
    }

    /**
     * Requests the subscriber to stop and returns the connection to its provider.
     *
     * <p>The stop is asynchronous: this method does not wait for the subscriber to terminate.
     */
    @Override
    public void onStop() {
        logger.debug("Stopping async subscriber for subscription: {}", this.identifier.getSubscriptionName());
        releaseResources();
    }

    /**
     * NACKs the message whose flow execution failed. Failures while NACK-ing are logged and
     * otherwise ignored.
     *
     * @param sourceCallbackContext context of the failed message, expected to hold {@link #CONSUMER}
     */
    @OnError
    public void onError(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Error processing message. Message will be NACK-ed");
        try {
            consumerFrom(sourceCallbackContext, "Consumer for message NACK not found!").nack();
        } catch (Exception e) {
            // Best effort: the exception is passed as last argument so the stack trace is logged too.
            logger.warn("An error {} occurred during the automatic not-acknowledgement of the current message. \n Continuing...", e.getMessage(), e);
        }
    }

    /**
     * ACKs the message whose flow execution succeeded. Failures while ACK-ing are logged and
     * otherwise ignored.
     *
     * @param sourceCallbackContext context of the processed message, expected to hold {@link #CONSUMER}
     */
    @OnSuccess
    public void onSuccess(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Message processed successfully.");
        try {
            consumerFrom(sourceCallbackContext, "Consumer for message ACK not found!").ack();
        } catch (Exception e) {
            logger.warn("An error {} occurred during the automatic acknowledgement of the current message. \n Continuing...", e.getMessage(), e);
        }
    }

    /**
     * NACKs a message the flow could not accept because of back-pressure. Failures while NACK-ing
     * are logged and otherwise ignored.
     *
     * @param backPressureContext      gives access to the callback context of the rejected message
     * @param sourceCompletionCallback completion callback supplied by the runtime; signalled once
     *                                 the NACK attempt is over
     */
    @OnBackPressure
    public void onBackPressure(BackPressureContext backPressureContext, SourceCompletionCallback sourceCompletionCallback) {
        try {
            logger.debug("Flow is unable to accept new messages at this time. Message will be NACK-ed");
            consumerFrom(backPressureContext.getSourceCallbackContext(), "Consumer for message NACK not found!").nack();
        } catch (Exception e) {
            logger.warn("An error {} occurred during the automatic not-acknowledgement of the current message. \n Continuing...", e.getMessage(), e);
        } finally {
            // NOTE(review): declaring a SourceCompletionCallback is understood to make the runtime
            // wait until it is signalled, and the original never signalled it. Confirm against the
            // Mule SDK documentation that this also applies to @OnBackPressure.
            if (sourceCompletionCallback != null) {
                sourceCompletionCallback.success();
            }
        }
    }

    /**
     * Invoked once the handling of a message has finished; only emits a trace log.
     *
     * @param sourceCallbackContext context of the finished message (unused)
     */
    @OnTerminate
    public void onTerminate(SourceCallbackContext sourceCallbackContext) {
        logger.trace("Message was processed by messageListenerSource flow");
    }

    /**
     * Builds (without starting) the subscriber for the configured subscription.
     *
     * @param sourceCallback callback that receives every message delivered by the subscriber
     * @return a new subscriber configured with the flow control settings, the connection's
     *         credentials and an executor sized by {@code consumerCount}
     */
    private Subscriber initSubscriber(SourceCallback<InputStream, Map<String, String>> sourceCallback) {
        String subscriptionPath = String.format(ResourceConstants.PROJECT_SUBSCRIPTION_NAME,
                this.identifier.getProjectId(), this.identifier.getSubscriptionName());

        FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
                .setLimitExceededBehavior(FlowController.LimitExceededBehavior.valueOf(this.flowControlParameters.getLimitExceededBehavior().getValue()))
                .setMaxOutstandingElementCount(Long.valueOf(this.flowControlParameters.getMaxOutstandingElementCount()))
                .setMaxOutstandingRequestBytes(Long.valueOf(this.flowControlParameters.getMaxOutstandingRequestSizeInBytes()))
                .build();

        InstantiatingExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
                .setExecutorThreadCount(this.consumerCount)
                .build();

        return Subscriber.newBuilder(subscriptionPath, (pubsubMessage, ackReplyConsumer) -> {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("messageId", pubsubMessage.getMessageId());
            attributes.put("publishTime", pubsubMessage.getPublishTime().toString());
            attributes.put("orderingKey", pubsubMessage.getOrderingKey());
            // NOTE(review): added last, so a message attribute named like one of the keys above
            // replaces that entry. Order kept from the original; confirm this is intended.
            attributes.putAll(pubsubMessage.getAttributesMap());

            // The reply consumer travels with the message so the completion callbacks can ACK/NACK it.
            SourceCallbackContext context = sourceCallback.createContext();
            context.addVariable(CONSUMER, ackReplyConsumer);

            sourceCallback.handle(Result.<InputStream, Map<String, String>>builder()
                    .output(pubsubMessage.getData().newInput())
                    .attributes(attributes)
                    .build(), context);
        })
                .setFlowControlSettings(flowControlSettings)
                .setCredentialsProvider(this.pubSubConnection.getCredentialsProvider())
                .setExecutorProvider(executorProvider)
                .build();
    }

    /**
     * Looks up the {@link AckReplyConsumer} stored under {@link #CONSUMER}.
     *
     * @param context        callback context of the message being completed
     * @param missingMessage message of the exception thrown when the variable is absent
     * @return the reply consumer of the message
     * @throws IllegalStateException if the context holds no {@link #CONSUMER} variable
     */
    private static AckReplyConsumer consumerFrom(SourceCallbackContext context, String missingMessage) {
        return context.<AckReplyConsumer>getVariable(CONSUMER)
                .orElseThrow(() -> new IllegalStateException(missingMessage));
    }

    /**
     * Requests the subscriber to stop and disconnects the connection, clearing both fields.
     * Safe to call repeatedly and when either resource was never created; failures are logged
     * so that one failing release does not prevent the other.
     */
    private void releaseResources() {
        if (this.subscriber != null) {
            try {
                this.subscriber.stopAsync();
            } catch (RuntimeException e) {
                logger.warn("Could not stop the subscriber: {}", e.getMessage(), e);
            } finally {
                this.subscriber = null;
            }
        }
        if (this.pubSubConnection != null) {
            try {
                this.connectionProvider.disconnect(this.pubSubConnection);
            } catch (RuntimeException e) {
                logger.warn("Could not disconnect the Pub/Sub connection: {}", e.getMessage(), e);
            } finally {
                this.pubSubConnection = null;
            }
        }
    }
}
