/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.split;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessageCollector;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class PulsarPartitionSplitReaderBase<OUT>
implements SplitReader<PulsarMessage<OUT>, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);
    protected final PulsarClient pulsarClient;
    protected final PulsarAdmin pulsarAdmin;
    protected final SourceConfiguration sourceConfiguration;
    protected final PulsarDeserializationSchema<OUT> deserializationSchema;
    protected Consumer<byte[]> pulsarConsumer;
    protected PulsarPartitionSplit registeredSplit;

    protected PulsarPartitionSplitReaderBase(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> deserializationSchema) {
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.sourceConfiguration = sourceConfiguration;
        this.deserializationSchema = deserializationSchema;
    }

    public RecordsWithSplitIds<PulsarMessage<OUT>> fetch() throws IOException {
        RecordsBySplits.Builder builder = new RecordsBySplits.Builder();
        if (this.pulsarConsumer == null || this.registeredSplit == null) {
            return builder.build();
        }
        StopCursor stopCursor = this.registeredSplit.getStopCursor();
        String splitId = this.registeredSplit.splitId();
        PulsarMessageCollector collector = new PulsarMessageCollector(splitId, builder);
        Deadline deadline = Deadline.fromNow((Duration)this.sourceConfiguration.getMaxFetchTime());
        for (int messageNum = 0; messageNum < this.sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft(); ++messageNum) {
            try {
                Duration timeout = deadline.timeLeftIfAny();
                Message<byte[]> message = this.pollMessage(timeout);
                if (message == null) break;
                StopCursor.StopCondition condition = stopCursor.shouldStop(message);
                if (condition == StopCursor.StopCondition.CONTINUE || condition == StopCursor.StopCondition.EXACTLY) {
                    collector.setMessage(message);
                    this.deserializationSchema.deserialize(message, collector);
                    this.finishedPollMessage(message);
                }
                if (condition != StopCursor.StopCondition.EXACTLY && condition != StopCursor.StopCondition.TERMINATE) continue;
                builder.addFinishedSplit(splitId);
                break;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            catch (TimeoutException e) {
                break;
            }
            catch (ExecutionException e) {
                LOG.error("Error in polling message from pulsar consumer.", (Throwable)e);
                break;
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }
        return builder.build();
    }

    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges) {
        LOG.debug("Handle split changes {}", splitsChanges);
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChanges.getClass()));
        }
        if (this.registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }
        List newSplits = splitsChanges.splits();
        Preconditions.checkArgument((newSplits.size() == 1 ? 1 : 0) != 0, (Object)"This pulsar split reader only support one split.");
        this.registeredSplit = (PulsarPartitionSplit)newSplits.get(0);
        this.registeredSplit.open(this.pulsarAdmin);
        this.beforeCreatingConsumer(this.registeredSplit);
        this.pulsarConsumer = this.createPulsarConsumer(this.registeredSplit);
        this.afterCreatingConsumer(this.registeredSplit, this.pulsarConsumer);
        LOG.info("Register split {} consumer for current reader.", (Object)this.registeredSplit);
    }

    public void wakeUp() {
    }

    public void close() {
        if (this.pulsarConsumer != null) {
            PulsarExceptionUtils.sneakyClient(() -> this.pulsarConsumer.close());
        }
    }

    @Nullable
    protected abstract Message<byte[]> pollMessage(Duration var1) throws ExecutionException, InterruptedException, PulsarClientException;

    protected abstract void finishedPollMessage(Message<byte[]> var1);

    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
    }

    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
    }

    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
        return this.createPulsarConsumer(split.getPartition());
    }

    protected Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
        ConsumerBuilder consumerBuilder = PulsarSourceConfigUtils.createConsumerBuilder(this.pulsarClient, Schema.BYTES, this.sourceConfiguration);
        consumerBuilder.topic(new String[]{partition.getFullTopicName()});
        if (this.sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
            KeySharedPolicy.KeySharedPolicySticky policy = KeySharedPolicy.stickyHashRange().ranges(new Range[]{partition.getPulsarRange()});
            consumerBuilder.keySharedPolicy((KeySharedPolicy)policy);
        }
        return (Consumer)PulsarExceptionUtils.sneakyClient(() -> consumerBuilder.subscribe());
    }
}

