/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.shaded.guava30.com.google.common.base.Objects;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the partition counts of the topics a Pulsar sink writes to and exposes the resulting
 * list of concrete topic names.
 *
 * <p>Topics passed to the constructor are split in two groups: those for which
 * {@code TopicNameUtils.isPartitioned} returns {@code true} are kept as-is, all others are
 * registered in a metadata map whose partition count is queried through {@link PulsarAdmin} in
 * {@link #open} and then re-queried periodically through the {@link ProcessingTimeService}.
 *
 * <p>The computed topic list is cached in {@code availableTopics}; the cache is reset to an
 * empty list whenever a partition count changes and is lazily rebuilt by
 * {@link #availableTopics()}.
 */
@Internal
public class TopicMetadataListener implements Serializable, Closeable {
    private static final long serialVersionUID = 6186948471557507522L;
    private static final Logger LOG = LoggerFactory.getLogger(TopicMetadataListener.class);

    /** Placeholder partition count stored for a topic until its metadata has been queried. */
    private static final int UNKNOWN_PARTITIONS = -1;

    /** Partition count for which the topic is expanded with {@code topicNameWithNonPartition}. */
    private static final int NON_PARTITIONED = 0;

    /** Topics that were accepted by {@code TopicNameUtils.isPartitioned}; never re-queried. */
    private final ImmutableList<String> partitionedTopics;

    /** Topic name to last known partition count ({@link #UNKNOWN_PARTITIONS} before first query). */
    private final Map<String, Integer> topicMetadata;

    /** Cached result of {@link #availableTopics()}; an empty list means "needs to be rebuilt". */
    private volatile ImmutableList<String> availableTopics;

    private transient PulsarAdmin pulsarAdmin;
    private transient Long topicMetadataRefreshInterval;
    private transient ProcessingTimeService timeService;

    /** Creates a listener that tracks no topics. */
    public TopicMetadataListener() {
        this(Collections.emptyList());
    }

    /**
     * Creates a listener for the given topics.
     *
     * @param topics topic names to track; each one is either stored directly (when
     *     {@code TopicNameUtils.isPartitioned} accepts it) or registered for metadata lookup
     */
    public TopicMetadataListener(List<String> topics) {
        List<String> partitions = new ArrayList<>(topics.size());
        Map<String, Integer> metadata = new HashMap<>(topics.size());
        for (String topic : topics) {
            if (TopicNameUtils.isPartitioned(topic)) {
                partitions.add(topic);
            } else {
                metadata.put(topic, UNKNOWN_PARTITIONS);
            }
        }
        this.partitionedTopics = ImmutableList.copyOf(partitions);
        this.topicMetadata = metadata;
        this.availableTopics = ImmutableList.of();
    }

    /**
     * Creates the admin client, performs the first metadata query and schedules the periodic
     * refresh. Does nothing (besides logging) when no topic requires a metadata lookup.
     *
     * @param sinkConfiguration configuration used to create the {@link PulsarAdmin} and to read
     *     the metadata refresh interval
     * @param timeService service used to register the refresh timers
     */
    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) {
        if (this.topicMetadata.isEmpty()) {
            LOG.info("No topics have been provided, skip listener initialize.");
            return;
        }
        this.pulsarAdmin = PulsarClientFactory.createAdmin(sinkConfiguration);
        this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
        this.timeService = timeService;

        // Initial query goes through sneakyAdmin instead of the catch-and-log path used by the
        // periodic refresh. NOTE(review): presumably this propagates failures to the caller.
        PulsarExceptionUtils.sneakyAdmin(this::updateTopicMetadata);
        this.triggerNextTopicMetadataUpdate(true);
    }

    /**
     * Returns the topic names currently available for writing, rebuilding the cached list when it
     * is empty and at least one topic is known.
     *
     * @return the cached list of topic names; empty when no topic is known or no partition count
     *     has been resolved yet
     */
    public List<String> availableTopics() {
        boolean cacheEmpty = this.availableTopics.isEmpty();
        boolean hasTopics = !this.partitionedTopics.isEmpty() || !this.topicMetadata.isEmpty();
        if (cacheEmpty && hasTopics) {
            List<String> results = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : this.topicMetadata.entrySet()) {
                int partitionNums = entry.getValue();
                if (partitionNums == NON_PARTITIONED) {
                    results.add(TopicNameUtils.topicNameWithNonPartition(entry.getKey()));
                } else {
                    // A negative (unknown) count yields no names for this topic.
                    for (int i = 0; i < partitionNums; ++i) {
                        results.add(TopicNameUtils.topicNameWithPartition(entry.getKey(), i));
                    }
                }
            }
            results.addAll(this.partitionedTopics);
            this.availableTopics = ImmutableList.copyOf(results);
        }
        return this.availableTopics;
    }

    /**
     * Closes the admin client if {@link #open} created one.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }

    /**
     * Refreshes the metadata (unless this is the initial call) and registers a timer that invokes
     * this method again after the configured refresh interval.
     *
     * @param initial {@code true} when called from {@link #open}, where the metadata has just
     *     been queried and only the timer needs to be registered
     */
    private void triggerNextTopicMetadataUpdate(boolean initial) {
        if (!initial) {
            try {
                this.updateTopicMetadata();
            } catch (PulsarAdminException e) {
                // Best effort: keep the previous metadata and try again on the next timer.
                LOG.warn("Failed to update topic metadata, will retry after the refresh interval.", e);
            }
        }
        long currentProcessingTime = this.timeService.getCurrentProcessingTime();
        long triggerTime = currentProcessingTime + this.topicMetadataRefreshInterval;
        this.timeService.registerTimer(triggerTime, time -> this.triggerNextTopicMetadataUpdate(false));
    }

    /**
     * Queries the partition count of every tracked topic and resets the cached topic list when
     * any count differs from the stored one.
     *
     * @throws PulsarAdminException if querying the metadata of a topic fails; counts that were
     *     already updated before the failure stay updated
     */
    private void updateTopicMetadata() throws PulsarAdminException {
        boolean shouldUpdate = false;
        try {
            for (Map.Entry<String, Integer> entry : this.topicMetadata.entrySet()) {
                PartitionedTopicMetadata metadata =
                        this.pulsarAdmin.topics().getPartitionedTopicMetadata(entry.getKey());
                if (!Objects.equal(entry.getValue(), metadata.partitions)) {
                    entry.setValue(metadata.partitions);
                    shouldUpdate = true;
                }
            }
        } finally {
            // Invalidate even when a later lookup failed: the entries changed so far would
            // otherwise compare equal on the next refresh and the stale cache would never be
            // rebuilt.
            if (shouldUpdate) {
                this.availableTopics = ImmutableList.of();
            }
        }
    }
}

