/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.rabbitmq.QueueingConsumer;
import org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchema;
import org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.rabbitmq.Util;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQSource<OUT>
extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
implements ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
    private final RMQConnectionConfig rmqConnectionConfig;
    protected final String queueName;
    private final boolean usesCorrelationId;
    protected RMQDeserializationSchema<OUT> deliveryDeserializer;
    protected transient Connection connection;
    protected transient Channel channel;
    protected transient QueueingConsumer consumer;
    protected transient boolean autoAck;
    private volatile transient boolean running;

    public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, DeserializationSchema<OUT> deserializationSchema) {
        this(rmqConnectionConfig, queueName, false, deserializationSchema);
    }

    public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, boolean usesCorrelationId, DeserializationSchema<OUT> deserializationSchema) {
        super(String.class);
        this.rmqConnectionConfig = rmqConnectionConfig;
        this.queueName = queueName;
        this.usesCorrelationId = usesCorrelationId;
        this.deliveryDeserializer = new RMQDeserializationSchemaWrapper<OUT>(deserializationSchema);
    }

    public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, RMQDeserializationSchema<OUT> deliveryDeserializer) {
        this(rmqConnectionConfig, queueName, false, deliveryDeserializer);
    }

    public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, boolean usesCorrelationId, RMQDeserializationSchema<OUT> deliveryDeserializer) {
        super(String.class);
        this.rmqConnectionConfig = rmqConnectionConfig;
        this.queueName = queueName;
        this.usesCorrelationId = usesCorrelationId;
        this.deliveryDeserializer = deliveryDeserializer;
    }

    protected ConnectionFactory setupConnectionFactory() throws Exception {
        return this.rmqConnectionConfig.getConnectionFactory();
    }

    @VisibleForTesting
    protected Connection setupConnection() throws Exception {
        return this.setupConnectionFactory().newConnection();
    }

    private Channel setupChannel(Connection connection) throws Exception {
        Channel chan = connection.createChannel();
        if (this.rmqConnectionConfig.getPrefetchCount().isPresent()) {
            chan.basicQos(this.rmqConnectionConfig.getPrefetchCount().get().intValue(), true);
        }
        return chan;
    }

    @VisibleForTesting
    protected void setupQueue() throws IOException {
        Util.declareQueueDefaults(this.channel, this.queueName);
    }

    public void open(Configuration config) throws Exception {
        super.open(config);
        try {
            this.connection = this.setupConnection();
            this.channel = this.setupChannel(this.connection);
            if (this.channel == null) {
                throw new RuntimeException("None of RabbitMQ channels are available");
            }
            this.setupQueue();
            this.consumer = new QueueingConsumer(this.channel);
            RuntimeContext runtimeContext = this.getRuntimeContext();
            if (runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext)runtimeContext).isCheckpointingEnabled()) {
                this.autoAck = false;
                this.channel.txSelect();
            } else {
                this.autoAck = true;
            }
            LOG.debug("Starting RabbitMQ source with autoAck status: " + this.autoAck);
            this.channel.basicConsume(this.queueName, this.autoAck, (Consumer)this.consumer);
        }
        catch (IOException e) {
            IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{this.channel, this.connection});
            throw new RuntimeException("Cannot create RMQ connection with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
        this.deliveryDeserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter((RuntimeContext)this.getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        this.running = true;
    }

    public void close() throws Exception {
        super.close();
        Exception exception = null;
        try {
            if (this.consumer != null && this.channel != null) {
                this.channel.basicCancel(this.consumer.getConsumerTag());
            }
        }
        catch (IOException e) {
            exception = new RuntimeException("Error while cancelling RMQ consumer on " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
        try {
            IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this.channel, this.connection});
        }
        catch (IOException e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)new RuntimeException("Error while closing RMQ source with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e), (Throwable)exception);
        }
        if (exception != null) {
            throw exception;
        }
    }

    private void processMessage(Delivery delivery, RMQCollectorImpl collector) throws IOException {
        AMQP.BasicProperties properties = delivery.getProperties();
        byte[] body = delivery.getBody();
        Envelope envelope = delivery.getEnvelope();
        collector.setFallBackIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag());
        this.deliveryDeserializer.deserialize(envelope, properties, body, collector);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        RMQCollectorImpl collector = new RMQCollectorImpl(ctx);
        long timeout = this.rmqConnectionConfig.getDeliveryTimeout();
        while (this.running) {
            Delivery delivery = this.consumer.nextDelivery(timeout);
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                if (delivery != null) {
                    this.processMessage(delivery, collector);
                }
                if (collector.isEndOfStreamSignalled()) {
                    this.running = false;
                    return;
                }
            }
        }
    }

    public void cancel() {
        this.running = false;
    }

    protected void acknowledgeSessionIDs(List<Long> sessionIds) {
        try {
            for (long id : sessionIds) {
                this.channel.basicAck(id, false);
            }
            this.channel.txCommit();
        }
        catch (IOException e) {
            throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
        }
    }

    public TypeInformation<OUT> getProducedType() {
        return this.deliveryDeserializer.getProducedType();
    }

    private class RMQCollectorImpl
    implements RMQDeserializationSchema.RMQCollector<OUT> {
        private final SourceFunction.SourceContext<OUT> ctx;
        private boolean endOfStreamSignalled = false;
        private String correlationId;
        private long deliveryTag;
        private boolean customIdentifiersSet = false;

        private RMQCollectorImpl(SourceFunction.SourceContext<OUT> ctx) {
            this.ctx = ctx;
        }

        public void collect(OUT record) {
            boolean newMessage;
            if (!this.customIdentifiersSet && !(newMessage = this.setMessageIdentifiers(this.correlationId, this.deliveryTag))) {
                return;
            }
            if (this.isEndOfStream(record)) {
                this.endOfStreamSignalled = true;
                return;
            }
            this.ctx.collect(record);
        }

        public void setFallBackIdentifiers(String correlationId, long deliveryTag) {
            this.correlationId = correlationId;
            this.deliveryTag = deliveryTag;
            this.customIdentifiersSet = false;
        }

        @Override
        public boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
            if (this.customIdentifiersSet) {
                throw new IllegalStateException("You can set only a single set of identifiers for a block of messages.");
            }
            this.customIdentifiersSet = true;
            if (!RMQSource.this.autoAck) {
                if (RMQSource.this.usesCorrelationId) {
                    Preconditions.checkNotNull((Object)correlationId, (String)"RabbitMQ source was instantiated with usesCorrelationId set to true yet we couldn't extract the correlation id from it!");
                    if (!RMQSource.this.addId(correlationId)) {
                        try {
                            RMQSource.this.channel.basicReject(deliveryTag, false);
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Message could not be acknowledged with basicReject.", e);
                        }
                        return false;
                    }
                }
                RMQSource.this.sessionIds.add(deliveryTag);
            }
            return true;
        }

        boolean isEndOfStream(OUT record) {
            return this.endOfStreamSignalled || RMQSource.this.deliveryDeserializer.isEndOfStream(record);
        }

        public boolean isEndOfStreamSignalled() {
            return this.endOfStreamSignalled;
        }

        public void close() {
        }
    }
}

