/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarSourceEnumerator
implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
    private final PulsarAdmin pulsarAdmin;
    private final PulsarClient pulsarClient;
    private final PulsarSubscriber subscriber;
    private final StartCursor startCursor;
    private final RangeGenerator rangeGenerator;
    private final SourceConfiguration sourceConfiguration;
    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
    private final SplitsAssignmentState assignmentState;

    public PulsarSourceEnumerator(PulsarSubscriber subscriber, StartCursor startCursor, RangeGenerator rangeGenerator, SourceConfiguration sourceConfiguration, SplitEnumeratorContext<PulsarPartitionSplit> context, SplitsAssignmentState assignmentState) {
        this.pulsarAdmin = PulsarClientFactory.createAdmin(sourceConfiguration);
        this.pulsarClient = PulsarClientFactory.createClient(sourceConfiguration);
        this.subscriber = subscriber;
        this.startCursor = startCursor;
        this.rangeGenerator = rangeGenerator;
        this.sourceConfiguration = sourceConfiguration;
        this.context = context;
        this.assignmentState = assignmentState;
    }

    public void start() {
        this.rangeGenerator.open(this.sourceConfiguration);
        if (this.sourceConfiguration.isEnablePartitionDiscovery()) {
            LOG.info("Starting the PulsarSourceEnumerator for subscription {} with partition discovery interval of {} ms.", (Object)this.sourceConfiguration.getSubscriptionDesc(), (Object)this.sourceConfiguration.getPartitionDiscoveryIntervalMs());
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges, 0L, this.sourceConfiguration.getPartitionDiscoveryIntervalMs());
        } else {
            LOG.info("Starting the PulsarSourceEnumerator for subscription {} without periodic partition discovery.", (Object)this.sourceConfiguration.getSubscriptionDesc());
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
        }
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    }

    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        this.assignmentState.putSplitsBackToPendingList(splits, subtaskId);
        if (this.context.registeredReaders().containsKey(subtaskId)) {
            this.assignPendingPartitionSplits(Collections.singletonList(subtaskId));
        }
    }

    public void addReader(int subtaskId) {
        LOG.debug("Adding reader {} to PulsarSourceEnumerator for subscription {}.", (Object)subtaskId, (Object)this.sourceConfiguration.getSubscriptionDesc());
        this.assignPendingPartitionSplits(Collections.singletonList(subtaskId));
    }

    public PulsarSourceEnumState snapshotState(long checkpointId) {
        return this.assignmentState.snapshotState();
    }

    public void close() {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }

    private Set<TopicPartition> getSubscribedTopicPartitions() {
        int parallelism = this.context.currentParallelism();
        Set<TopicPartition> partitions = this.subscriber.getSubscribedTopicPartitions(this.pulsarAdmin, this.rangeGenerator, parallelism);
        this.seekStartPosition(partitions);
        return partitions;
    }

    private void seekStartPosition(Set<TopicPartition> partitions) {
        ConsumerBuilder<byte[]> consumerBuilder = this.consumerBuilder();
        HashSet<String> seekedTopics = new HashSet<String>();
        for (TopicPartition partition : partitions) {
            String topicName = partition.getFullTopicName();
            if (this.assignmentState.containsTopic(topicName) || !seekedTopics.add(topicName)) continue;
            try {
                Consumer consumer = (Consumer)PulsarExceptionUtils.sneakyClient(() -> consumerBuilder.clone().topic(new String[]{topicName}).subscribe());
                Throwable throwable = null;
                try {
                    this.startCursor.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (consumer == null) continue;
                    if (throwable != null) {
                        try {
                            consumer.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    consumer.close();
                }
            }
            catch (PulsarClientException e) {
                if (this.sourceConfiguration.getVerifyInitialOffsets() == CursorVerification.FAIL_ON_MISMATCH) {
                    throw new IllegalArgumentException(e);
                }
                LOG.warn("Failed to set initial consuming position for partition {}", (Object)partition, (Object)e);
            }
        }
    }

    private ConsumerBuilder<byte[]> consumerBuilder() {
        ConsumerBuilder builder = PulsarSourceConfigUtils.createConsumerBuilder(this.pulsarClient, Schema.BYTES, this.sourceConfiguration);
        if (this.sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
            Range range = TopicRange.createFullRange().toPulsarRange();
            KeySharedPolicy.KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(new Range[]{range});
            builder.keySharedPolicy((KeySharedPolicy)keySharedPolicy);
        }
        return builder;
    }

    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable throwable) {
        if (throwable != null) {
            throw new FlinkRuntimeException("Failed to list subscribed topic partitions due to ", throwable);
        }
        this.assignmentState.appendTopicPartitions(fetchedPartitions);
        ArrayList<Integer> registeredReaders = new ArrayList<Integer>(this.context.registeredReaders().keySet());
        this.assignPendingPartitionSplits(registeredReaders);
    }

    private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
        pendingReaders.forEach(reader -> {
            if (!this.context.registeredReaders().containsKey(reader)) {
                throw new IllegalStateException("Reader " + reader + " is not registered to source coordinator");
            }
        });
        this.assignmentState.assignSplits(pendingReaders).ifPresent(arg_0 -> this.context.assignSplits(arg_0));
        if (this.assignmentState.noMoreNewPartitionSplits()) {
            LOG.debug("No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in subscription {}.", pendingReaders, (Object)this.sourceConfiguration.getSubscriptionDesc());
            pendingReaders.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
        }
    }
}

