/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.split;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split reader that pulls messages from the Pulsar consumer with a bounded blocking
 * {@code receive} call and acknowledges consumed positions cumulatively when a checkpoint
 * completes.
 *
 * @param <OUT> the record type handled by the {@link PulsarDeserializationSchema} passed to the
 *     constructor.
 */
@Internal
public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplitReaderBase<OUT> {

    private static final Logger LOG =
            LoggerFactory.getLogger(PulsarOrderedPartitionSplitReader.class);

    /**
     * Creates the reader; all arguments are forwarded unchanged to the base class constructor.
     *
     * @param pulsarClient the Pulsar client handed to the base reader.
     * @param pulsarAdmin the Pulsar admin client handed to the base reader.
     * @param sourceConfiguration the source configuration handed to the base reader.
     * @param deserializationSchema the deserialization schema handed to the base reader.
     */
    public PulsarOrderedPartitionSplitReader(
            PulsarClient pulsarClient,
            PulsarAdmin pulsarAdmin,
            SourceConfiguration sourceConfiguration,
            PulsarDeserializationSchema<OUT> deserializationSchema) {
        super(pulsarClient, pulsarAdmin, sourceConfiguration, deserializationSchema);
    }

    /**
     * Receives a single message from the consumer, waiting at most the given timeout.
     *
     * @param timeout the maximum time to wait; converted to whole milliseconds.
     * @return whatever the consumer's timed {@code receive} call returns.
     * @throws PulsarClientException if the consumer's {@code receive} call fails.
     * @throws ArithmeticException if the timeout in milliseconds does not fit into an {@code int}.
     */
    @Override
    protected Message<byte[]> pollMessage(Duration timeout) throws PulsarClientException {
        // The consumer API takes an int timeout, so the conversion is overflow-checked.
        int timeoutMillis = Math.toIntExact(timeout.toMillis());
        return this.pulsarConsumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Logs the polled message at debug level and then releases it.
     *
     * @param message the message that has just been polled.
     */
    @Override
    protected void finishedPollMessage(Message<byte[]> message) {
        LOG.debug("Finished polling message {}", message);
        message.release();
    }

    /**
     * Repositions the consumer according to the split's latest consumed message id, if it has one.
     *
     * <p>When the split carries no latest consumed id, the consumer is left untouched. If seeking
     * fails, the outcome depends on the configured {@link CursorVerification}: with {@code
     * FAIL_ON_MISMATCH} the failure is rethrown wrapped in an {@link IllegalArgumentException};
     * with any other setting it is only logged as a warning.
     *
     * @param split the split whose latest consumed id and partition are used.
     * @param consumer the consumer to seek.
     * @throws IllegalArgumentException if seeking fails and verification is {@code
     *     FAIL_ON_MISMATCH}.
     */
    @Override
    protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
        MessageId lastConsumed = split.getLatestConsumedId();
        if (lastConsumed == null) {
            // Nothing recorded for this split, so there is no position to seek to.
            return;
        }

        StartCursor cursor = StartCursor.fromMessageId(lastConsumed, false);
        TopicPartition topicPartition = split.getPartition();
        try {
            cursor.seekPosition(
                    topicPartition.getTopic(), topicPartition.getPartitionId(), consumer);
        } catch (PulsarClientException seekFailure) {
            CursorVerification verification = this.sourceConfiguration.getVerifyInitialOffsets();
            if (verification != CursorVerification.FAIL_ON_MISMATCH) {
                // Lenient mode: report the failed seek and carry on with the consumer as it is.
                LOG.warn(
                        "Failed to reset cursor to {} on partition {}",
                        lastConsumed,
                        topicPartition,
                        seekFailure);
                return;
            }
            throw new IllegalArgumentException(seekFailure);
        }
    }

    /**
     * Cumulatively acknowledges the given message id on the consumer, creating the consumer for
     * the partition first when none has been assigned yet.
     *
     * @param partition the partition used to create the consumer if it is still {@code null}.
     * @param offsetsToCommit the message id passed to the cumulative acknowledgement.
     */
    public void notifyCheckpointComplete(TopicPartition partition, MessageId offsetsToCommit) {
        if (this.pulsarConsumer == null) {
            this.pulsarConsumer = this.createPulsarConsumer(partition);
        }

        // NOTE(review): sneakyClient presumably rethrows PulsarClientException without requiring
        // a throws clause here; confirm against PulsarExceptionUtils.
        PulsarExceptionUtils.sneakyClient(
                () -> this.pulsarConsumer.acknowledgeCumulative(offsetsToCommit));
    }
}

