/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarPartitionSplitReader
implements SplitReader<Message<byte[]>, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReader.class);
    private final PulsarClient pulsarClient;
    private final SourceConfiguration sourceConfiguration;
    private final Schema<byte[]> schema;
    private final PulsarCrypto pulsarCrypto;
    private final SourceReaderMetricGroup metricGroup;
    private Consumer<byte[]> pulsarConsumer;
    private PulsarPartitionSplit registeredSplit;

    public PulsarPartitionSplitReader(PulsarClient pulsarClient, SourceConfiguration sourceConfiguration, Schema<byte[]> schema, PulsarCrypto pulsarCrypto, SourceReaderMetricGroup metricGroup) {
        this.pulsarClient = pulsarClient;
        this.sourceConfiguration = sourceConfiguration;
        this.schema = schema;
        this.pulsarCrypto = pulsarCrypto;
        this.metricGroup = metricGroup;
    }

    public RecordsWithSplitIds<Message<byte[]>> fetch() throws IOException {
        RecordsBySplits.Builder builder = new RecordsBySplits.Builder();
        if (this.pulsarConsumer == null || this.registeredSplit == null) {
            return builder.build();
        }
        StopCursor stopCursor = this.registeredSplit.getStopCursor();
        String splitId = this.registeredSplit.splitId();
        Deadline deadline = Deadline.fromNow((Duration)this.sourceConfiguration.getMaxFetchTime());
        for (int messageNum = 0; messageNum < this.sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft(); ++messageNum) {
            try {
                Message message;
                int fetchTime = this.sourceConfiguration.getFetchOneMessageTime();
                if (fetchTime <= 0) {
                    fetchTime = (int)deadline.timeLeftIfAny().toMillis();
                }
                if ((message = this.pulsarConsumer.receive(fetchTime, TimeUnit.MILLISECONDS)) == null) break;
                StopCursor.StopCondition condition = stopCursor.shouldStop(message);
                if (condition == StopCursor.StopCondition.CONTINUE || condition == StopCursor.StopCondition.EXACTLY) {
                    builder.add(splitId, (Object)message);
                    LOG.debug("Finished polling message {}", (Object)message);
                }
                if (condition != StopCursor.StopCondition.EXACTLY && condition != StopCursor.StopCondition.TERMINATE) continue;
                builder.addFinishedSplit(splitId);
                break;
            }
            catch (TimeoutException e) {
                break;
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }
        return builder.build();
    }

    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges) {
        LOG.debug("Handle split changes {}", splitsChanges);
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChanges.getClass()));
        }
        if (this.registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }
        List newSplits = splitsChanges.splits();
        Preconditions.checkArgument((newSplits.size() == 1 ? 1 : 0) != 0, (Object)"This pulsar split reader only supports one split.");
        this.registeredSplit = (PulsarPartitionSplit)newSplits.get(0);
        try {
            this.registeredSplit.open(this.pulsarClient);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
        MessageId latestConsumedId = this.registeredSplit.getLatestConsumedId();
        if (latestConsumedId != null) {
            LOG.info("Reset subscription position by the checkpoint {}", (Object)latestConsumedId);
            try {
                CursorPosition cursorPosition = latestConsumedId == MessageId.latest || latestConsumedId == MessageId.earliest ? new CursorPosition(latestConsumedId, true) : new CursorPosition(latestConsumedId, false);
                String topicName = this.registeredSplit.getPartition().getFullTopicName();
                String subscriptionName = this.sourceConfiguration.getSubscriptionName();
                cursorPosition.setupSubPosition(this.pulsarClient, topicName, subscriptionName);
            }
            catch (PulsarClientException e) {
                if (this.sourceConfiguration.getVerifyInitialOffsets() == CursorVerification.FAIL_ON_MISMATCH) {
                    throw new IllegalArgumentException(e);
                }
                LOG.warn("Failed to reset cursor to {} on partition {}", new Object[]{latestConsumedId, this.registeredSplit.getPartition(), e});
            }
        }
        try {
            this.pulsarConsumer = this.createPulsarConsumer(this.registeredSplit.getPartition());
        }
        catch (PulsarClientException e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
        LOG.info("Register split {} consumer for current reader.", (Object)this.registeredSplit);
    }

    public void pauseOrResumeSplits(Collection<PulsarPartitionSplit> splitsToPause, Collection<PulsarPartitionSplit> splitsToResume) {
        Preconditions.checkState((splitsToPause.size() + splitsToResume.size() <= 1 ? 1 : 0) != 0, (Object)"This pulsar split reader only supports one split.");
        if (!splitsToPause.isEmpty()) {
            this.pulsarConsumer.pause();
        } else if (!splitsToResume.isEmpty()) {
            this.pulsarConsumer.resume();
        }
    }

    public void wakeUp() {
    }

    public void close() throws PulsarClientException {
        if (this.pulsarConsumer != null) {
            this.pulsarConsumer.close();
        }
    }

    public void notifyCheckpointComplete(TopicPartition partition, MessageId offsetsToCommit) throws PulsarClientException {
        if (this.pulsarConsumer == null) {
            this.pulsarConsumer = this.createPulsarConsumer(partition);
        }
        this.pulsarConsumer.acknowledgeCumulative(offsetsToCommit);
    }

    private Consumer<byte[]> createPulsarConsumer(TopicPartition partition) throws PulsarClientException {
        ConsumerBuilder<byte[]> consumerBuilder = PulsarSourceConfigUtils.createConsumerBuilder(this.pulsarClient, this.schema, this.sourceConfiguration);
        consumerBuilder.topic(new String[]{partition.getFullTopicName()});
        CryptoKeyReader cryptoKeyReader = this.pulsarCrypto.cryptoKeyReader();
        if (cryptoKeyReader != null) {
            consumerBuilder.cryptoKeyReader(cryptoKeyReader);
            MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto = this.pulsarCrypto.messageCrypto();
            if (messageCrypto != null) {
                consumerBuilder.messageCrypto(messageCrypto);
            }
        }
        if (!TopicRangeUtils.isFullTopicRanges(partition.getRanges())) {
            KeySharedPolicy.KeySharedPolicySticky policy = KeySharedPolicy.stickyHashRange().ranges(partition.getPulsarRanges());
            policy.setAllowOutOfOrderDelivery(this.sourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
            consumerBuilder.keySharedPolicy((KeySharedPolicy)policy);
        }
        Consumer consumer = consumerBuilder.subscribe();
        this.exposeConsumerMetrics((Consumer<byte[]>)consumer);
        return consumer;
    }

    private void exposeConsumerMetrics(Consumer<byte[]> consumer) {
        if (this.sourceConfiguration.isEnableMetrics()) {
            String consumerIdentity = consumer.getConsumerName();
            if (Strings.isNullOrEmpty((String)consumerIdentity)) {
                consumerIdentity = UUID.randomUUID().toString();
            }
            MetricGroup group = this.metricGroup.addGroup("PulsarConsumer").addGroup(consumer.getTopic()).addGroup(consumerIdentity);
            ConsumerStats stats = consumer.getStats();
            group.gauge("numMsgsReceived", () -> ((ConsumerStats)stats).getNumMsgsReceived());
            group.gauge("numBytesReceived", () -> ((ConsumerStats)stats).getNumBytesReceived());
            group.gauge("rateMsgsReceived", () -> ((ConsumerStats)stats).getRateMsgsReceived());
            group.gauge("rateBytesReceived", () -> ((ConsumerStats)stats).getRateBytesReceived());
            group.gauge("numAcksSent", () -> ((ConsumerStats)stats).getNumAcksSent());
            group.gauge("numAcksFailed", () -> ((ConsumerStats)stats).getNumAcksFailed());
            group.gauge("numReceiveFailed", () -> ((ConsumerStats)stats).getNumReceiveFailed());
            group.gauge("numBatchReceiveFailed", () -> ((ConsumerStats)stats).getNumBatchReceiveFailed());
            group.gauge("totalMsgsReceived", () -> ((ConsumerStats)stats).getTotalMsgsReceived());
            group.gauge("totalBytesReceived", () -> ((ConsumerStats)stats).getTotalBytesReceived());
            group.gauge("totalReceivedFailed", () -> ((ConsumerStats)stats).getTotalReceivedFailed());
            group.gauge("totalBatchReceivedFailed", () -> ((ConsumerStats)stats).getTotaBatchReceivedFailed());
            group.gauge("totalAcksSent", () -> ((ConsumerStats)stats).getTotalAcksSent());
            group.gauge("totalAcksFailed", () -> ((ConsumerStats)stats).getTotalAcksFailed());
            group.gauge("msgNumInReceiverQueue", () -> ((ConsumerStats)stats).getMsgNumInReceiverQueue());
        }
    }

    @VisibleForTesting
    String getSubscriptionName() {
        return this.sourceConfiguration.getSubscriptionName();
    }
}

