/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.pulsar;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
import org.apache.paimon.utils.Preconditions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

public class PulsarActionUtils {
    /** Identifier of the data format used for the value part of a message. */
    public static final ConfigOption<String> VALUE_FORMAT =
            ConfigOptions.key("value.format")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Defines the format identifier for encoding value data.");

    /** Explicit list of topics to read; mutually exclusive with {@link #TOPIC_PATTERN}. */
    public static final ConfigOption<List<String>> TOPIC =
            ConfigOptions.key("topic")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            "Topic name(s) from which the data is read. "
                                    + "It also supports topic list by separating topic by semicolon like 'topic-1;topic-2'. "
                                    + "Note, only one of \"topic-pattern\" and \"topic\" can be specified.");

    /** Regular expression selecting the topics to read; mutually exclusive with {@link #TOPIC}. */
    public static final ConfigOption<String> TOPIC_PATTERN =
            ConfigOptions.key("topic-pattern")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The regular expression for a pattern of topic names to read from. "
                                    + "All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. "
                                    + "Note, only one of \"topic-pattern\" and \"topic\" can be specified.");

    /** Start position expressed as a message id; defaults to {@code EARLIEST}. */
    static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
            ConfigOptions.key("pulsar.startCursor.fromMessageId")
                    .stringType()
                    .defaultValue("EARLIEST")
                    .withDescription(
                            "Using a unique identifier of a single message to seek the start position. "
                                    + "The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. "
                                    + "Specially, you can set it to EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");

    /**
     * Start position expressed as a publish timestamp.
     *
     * <p>The misspelt identifier ("FORM") is kept because code outside this block may reference it.
     */
    static final ConfigOption<Long> PULSAR_START_CURSOR_FORM_PUBLISH_TIME =
            ConfigOptions.key("pulsar.startCursor.fromPublishTime")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Using the message publish time to seek the start position.");

    /** Whether the start message id itself is included; defaults to {@code true}. */
    static final ConfigOption<Boolean> PULSAR_START_CURSOR_FROM_MESSAGE_ID_INCLUSIVE =
            ConfigOptions.key("pulsar.startCursor.fromMessageIdInclusive")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether to include the given message id. "
                                    + "This option only works when the message id is not EARLIEST or LATEST.");

    /** Stop position: the given message id, exclusive. */
    static final ConfigOption<String> PULSAR_STOP_CURSOR_AT_MESSAGE_ID =
            ConfigOptions.key("pulsar.stopCursor.atMessageId")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Stop consuming when the message id is equal or greater than the specified message id. "
                                    + "Message that is equal to the specified message id will not be consumed. "
                                    + "The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. "
                                    + "Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");

    /** Stop position: the given message id, inclusive. */
    static final ConfigOption<String> PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID =
            ConfigOptions.key("pulsar.stopCursor.afterMessageId")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Stop consuming when the message id is greater than the specified message id. "
                                    + "Message that is equal to the specified message id will be consumed. "
                                    + "The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. "
                                    + "Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");

    /** Stop position: the given event timestamp, exclusive. */
    static final ConfigOption<Long> PULSAR_STOP_CURSOR_AT_EVENT_TIME =
            ConfigOptions.key("pulsar.stopCursor.atEventTime")
                    .longType()
                    .noDefaultValue()
                    .withDescription(
                            "Stop consuming when message event time is greater than or equals the specified timestamp. "
                                    + "Message that event time is equal to the specified timestamp will not be consumed.");

    /** Stop position: the given event timestamp, inclusive. */
    static final ConfigOption<Long> PULSAR_STOP_CURSOR_AFTER_EVENT_TIME =
            ConfigOptions.key("pulsar.stopCursor.afterEventTime")
                    .longType()
                    .noDefaultValue()
                    .withDescription(
                            "Stop consuming when message event time is greater than the specified timestamp. "
                                    + "Message that event time is equal to the specified timestamp will be consumed.");

    /** Selects unbounded ({@code true}, the default) or bounded reading. */
    static final ConfigOption<Boolean> PULSAR_SOURCE_UNBOUNDED =
            ConfigOptions.key("pulsar.source.unbounded")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("To specify the boundedness of a stream.");

    /**
     * Builds a {@link PulsarSource} of {@link CdcSourceRecord}s from the given configuration.
     *
     * <p>The method wires, in order: connection settings and the deserialization schema, the
     * optional topic list / topic pattern / consumer name, the start cursor, the stop cursor
     * (bounded or unbounded depending on {@code pulsar.source.unbounded}), optional
     * authentication, and finally passes the whole configuration to the builder.
     *
     * <p>Validation is done with {@code Preconditions.checkArgument}: the start cursor may be
     * given either as a publish time or as a message id (not both), at most one stop cursor
     * option may be set, and when an auth plugin class is configured exactly one of the auth
     * params string and the auth params map must be present.
     *
     * @param pulsarConfig configuration holding both the Flink Pulsar connector options and the
     *     options declared in this class
     * @param deserializationSchema schema used to turn message payloads into records
     * @return the configured source
     */
    public static PulsarSource<CdcSourceRecord> buildPulsarSource(Configuration pulsarConfig, DeserializationSchema<CdcSourceRecord> deserializationSchema) {
        PulsarSourceBuilder<CdcSourceRecord> pulsarSourceBuilder = PulsarSource.builder();

        // Minimum setup: where to connect, which subscription to use, how to decode.
        pulsarSourceBuilder
                .setServiceUrl(pulsarConfig.get(PulsarOptions.PULSAR_SERVICE_URL))
                .setAdminUrl(pulsarConfig.get(PulsarOptions.PULSAR_ADMIN_URL))
                .setSubscriptionName(pulsarConfig.get(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME))
                .setDeserializationSchema(deserializationSchema);

        pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
        pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
        pulsarConfig
                .getOptional(PulsarSourceOptions.PULSAR_CONSUMER_NAME)
                .ifPresent(pulsarSourceBuilder::setConsumerName);

        // Start cursor: an explicit publish time wins, otherwise fall back to the message id
        // option (whose default is EARLIEST).
        if (pulsarConfig.contains(PULSAR_START_CURSOR_FORM_PUBLISH_TIME)) {
            Preconditions.checkArgument(
                    !pulsarConfig.contains(PULSAR_START_CURSOR_FROM_MESSAGE_ID),
                    "You can only set one of '%s' and '%s'.",
                    PULSAR_START_CURSOR_FROM_MESSAGE_ID.key(),
                    PULSAR_START_CURSOR_FORM_PUBLISH_TIME.key());
            long publishTime = pulsarConfig.get(PULSAR_START_CURSOR_FORM_PUBLISH_TIME);
            pulsarSourceBuilder.setStartCursor(StartCursor.fromPublishTime(publishTime));
        } else {
            String messageId = pulsarConfig.get(PULSAR_START_CURSOR_FROM_MESSAGE_ID);
            if (messageId.equalsIgnoreCase("EARLIEST")) {
                pulsarSourceBuilder.setStartCursor(StartCursor.earliest());
            } else if (messageId.equalsIgnoreCase("LATEST")) {
                pulsarSourceBuilder.setStartCursor(StartCursor.latest());
            } else {
                boolean inclusive = pulsarConfig.get(PULSAR_START_CURSOR_FROM_MESSAGE_ID_INCLUSIVE);
                pulsarSourceBuilder.setStartCursor(
                        StartCursor.fromMessageId(toMessageId(messageId), inclusive));
            }
        }

        // Stop cursor: each option maps to its own factory; count how many were supplied so
        // that conflicting settings are rejected instead of silently letting the last one win.
        StopCursor stopCursor = StopCursor.never();
        int stopCursorSet = 0;
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AT_MESSAGE_ID)) {
            MessageId messageId = toMessageId(pulsarConfig.get(PULSAR_STOP_CURSOR_AT_MESSAGE_ID));
            stopCursor = StopCursor.atMessageId(messageId);
            stopCursorSet++;
        }
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID)) {
            // Must read the AFTER option here, not the AT option.
            MessageId messageId = toMessageId(pulsarConfig.get(PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID));
            stopCursor = StopCursor.afterMessageId(messageId);
            stopCursorSet++;
        }
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AT_EVENT_TIME)) {
            long timestamp = pulsarConfig.get(PULSAR_STOP_CURSOR_AT_EVENT_TIME);
            stopCursor = StopCursor.atEventTime(timestamp);
            stopCursorSet++;
        }
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AFTER_EVENT_TIME)) {
            // "after" is inclusive of the boundary timestamp, so it needs afterEventTime.
            long timestamp = pulsarConfig.get(PULSAR_STOP_CURSOR_AFTER_EVENT_TIME);
            stopCursor = StopCursor.afterEventTime(timestamp);
            stopCursorSet++;
        }
        Preconditions.checkArgument(
                stopCursorSet <= 1, "You can set at most one of the stop cursor options.");

        if (pulsarConfig.get(PULSAR_SOURCE_UNBOUNDED)) {
            pulsarSourceBuilder.setUnboundedStopCursor(stopCursor);
        } else {
            pulsarSourceBuilder.setBoundedStopCursor(stopCursor);
        }

        // Authentication is optional, but once a plugin class is named exactly one way of
        // passing its parameters must be used.
        String authPluginClassName = pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME);
        if (authPluginClassName != null) {
            String authParamsString = pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAMS);
            Map<String, String> authParamsMap = pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
            Preconditions.checkArgument(
                    authParamsString != null || authParamsMap != null,
                    "You should set '%s' or '%s'",
                    PulsarOptions.PULSAR_AUTH_PARAMS.key(),
                    PulsarOptions.PULSAR_AUTH_PARAM_MAP.key());
            Preconditions.checkArgument(
                    authParamsString == null || authParamsMap == null,
                    "You can only set one of '%s' and '%s'",
                    PulsarOptions.PULSAR_AUTH_PARAMS.key(),
                    PulsarOptions.PULSAR_AUTH_PARAM_MAP.key());
            if (authParamsString != null) {
                pulsarSourceBuilder.setAuthentication(authPluginClassName, authParamsString);
            } else {
                pulsarSourceBuilder.setAuthentication(authPluginClassName, authParamsMap);
            }
        }

        // Hand over the complete configuration so remaining connector options are applied.
        pulsarSourceBuilder.setConfig(pulsarConfig);
        return pulsarSourceBuilder.build();
    }

    /**
     * Converts the textual message id used in the options into a {@link MessageId}.
     *
     * <p>{@code EARLIEST} and {@code LATEST} (case-insensitive) map to the corresponding
     * predefined ids. Anything else must be three comma-separated numbers, each trimmed before
     * parsing: ledger id, entry id and partition index.
     *
     * @param messageIdString the id as written in the configuration
     * @return the matching message id
     */
    private static MessageId toMessageId(String messageIdString) {
        if (messageIdString.equalsIgnoreCase("EARLIEST")) {
            return MessageId.earliest;
        } else if (messageIdString.equalsIgnoreCase("LATEST")) {
            return MessageId.latest;
        }

        final String[] parts = messageIdString.split(",");
        final boolean isTriple = parts.length == 3;
        Preconditions.checkArgument(
                isTriple,
                (Object) "Please use format '<long>ledgerId,<long>entryId,<int>partitionIndex' for message id");

        return DefaultImplementation.getDefaultImplementation()
                .newMessageId(
                        Long.parseLong(parts[0].trim()),
                        Long.parseLong(parts[1].trim()),
                        Integer.parseInt(parts[2].trim()));
    }

    /**
     * Creates the {@link DataFormat} for the identifier stored under {@link #VALUE_FORMAT}.
     *
     * @param pulsarConfig configuration to read the format identifier from
     * @return the data format produced by {@link DataFormatFactory} for that identifier
     */
    public static DataFormat getDataFormat(Configuration pulsarConfig) {
        final String formatIdentifier = pulsarConfig.get(VALUE_FORMAT);
        return DataFormatFactory.createDataFormat(formatIdentifier);
    }

    /**
     * Creates a consumer on one topic selected from the configuration and wraps it as a
     * {@link MessageQueueSchemaUtils.ConsumerWrapper}.
     *
     * <p>The consumer reads raw bytes, starts from the earliest position of the subscription and
     * is subscribed to the topic returned by {@code findOneTopic}. When the topic partition does
     * not cover the full range, a sticky key-shared policy restricted to its ranges is applied.
     *
     * <p>If anything fails after the client has been created, the client is closed before the
     * exception propagates so that it is not leaked.
     *
     * <p>NOTE(review): on success the client stays open; the returned wrapper's {@code close()}
     * only closes the consumer. Confirm whether the client should be closed with it.
     *
     * @param pulsarConfig configuration used for the client, the consumer and topic selection
     * @param deserializationSchema schema the wrapper uses to decode received payloads
     * @return a wrapper around the subscribed consumer
     * @throws RuntimeException wrapping any {@link PulsarClientException} raised while creating
     *     the client or subscribing
     */
    public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(Configuration pulsarConfig, DeserializationSchema<CdcSourceRecord> deserializationSchema) {
        try {
            SourceConfiguration pulsarSourceConfiguration = new SourceConfiguration(pulsarConfig);
            PulsarClient pulsarClient = PulsarClientFactory.createClient(pulsarSourceConfiguration);
            try {
                ConsumerBuilder<byte[]> consumerBuilder =
                        PulsarSourceConfigUtils.createConsumerBuilder(
                                pulsarClient, Schema.BYTES, pulsarSourceConfiguration);
                consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);

                // Reuse the client created above for pattern-based topic discovery.
                String topic = PulsarActionUtils.findOneTopic(pulsarConfig, () -> pulsarClient);
                TopicPartition topicPartition = new TopicPartition(topic);
                consumerBuilder.topic(topicPartition.getFullTopicName());

                if (!TopicRangeUtils.isFullTopicRanges(topicPartition.getRanges())) {
                    KeySharedPolicy.KeySharedPolicySticky policy =
                            KeySharedPolicy.stickyHashRange().ranges(topicPartition.getPulsarRanges());
                    policy.setAllowOutOfOrderDelivery(
                            pulsarSourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
                    consumerBuilder.keySharedPolicy(policy);
                }

                Consumer<byte[]> consumer = consumerBuilder.subscribe();
                return new PulsarConsumerWrapper(consumer, topic, deserializationSchema);
            } catch (PulsarClientException | RuntimeException e) {
                // Nothing will ever own the client if we fail here, so release it now and
                // keep a close failure attached to the primary error.
                try {
                    pulsarClient.close();
                } catch (PulsarClientException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns one topic name selected from the configuration.
     *
     * <p>When {@link #TOPIC} is set, its first entry is returned and no Pulsar client is
     * created. Otherwise a temporary client is created to resolve {@link #TOPIC_PATTERN}; that
     * client is closed before this method returns, so it is not leaked.
     *
     * @param pulsarConfig configuration holding the topic options and client settings
     * @return the first configured topic, or the first topic returned by the pattern lookup
     * @throws RuntimeException wrapping a {@link PulsarClientException} raised while creating or
     *     closing the temporary client, or any failure of the lookup itself
     */
    public static String findOneTopic(Configuration pulsarConfig) {
        if (pulsarConfig.contains(TOPIC)) {
            // Explicit topics need no broker round trip.
            return pulsarConfig.get(TOPIC).get(0);
        }
        try (PulsarClient pulsarClient =
                PulsarClientFactory.createClient(new SourceConfiguration(pulsarConfig))) {
            return PulsarActionUtils.findOneTopic(pulsarConfig, () -> pulsarClient);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns one topic name, using the supplied client only when a pattern lookup is needed.
     *
     * <p>When {@link #TOPIC} is set its first entry is returned and the supplier is never
     * invoked. Otherwise the namespace of {@link #TOPIC_PATTERN} is queried through the client's
     * lookup service and the first topic of the result is returned.
     *
     * <p>NOTE(review): the returned topics are not filtered against the pattern on the client
     * side. When the pattern does not end with {@code ".*"} no pattern is sent to the broker
     * either, so the first topic of the namespace is returned whether or not it matches.
     * Confirm this is acceptable for callers.
     *
     * @param pulsarConfig configuration holding the topic options
     * @param pulsarClientSupplier provides the client used for the lookup; the client is not
     *     closed here
     * @return the selected topic name
     * @throws RuntimeException if no topic is found, or wrapping a failed or interrupted lookup
     */
    private static String findOneTopic(Configuration pulsarConfig, Supplier<PulsarClient> pulsarClientSupplier) {
        if (pulsarConfig.contains(TOPIC)) {
            return pulsarConfig.get(TOPIC).get(0);
        }

        String topicPattern = pulsarConfig.get(TOPIC_PATTERN);
        Preconditions.checkArgument(
                topicPattern != null,
                "You should set '%s' or '%s'",
                TOPIC.key(),
                TOPIC_PATTERN.key());

        TopicName destination = TopicName.get(topicPattern);
        String pattern = destination.toString();
        // Drop the "<domain>://" prefix; compiling also validates the regular expression.
        Pattern shortenedPattern = Pattern.compile(pattern.split("://")[1]);
        String namespace = destination.getNamespaceObject().toString();

        LookupService lookupService = ((PulsarClientImpl) pulsarClientSupplier.get()).getLookup();
        NamespaceName namespaceName = NamespaceName.get(namespace);
        try {
            // Only a pattern ending in ".*" is forwarded to the broker.
            // NOTE(review): presumably broker-side filtering is only trusted for such
            // wildcard patterns - confirm.
            String queryPattern = shortenedPattern.toString();
            if (!queryPattern.endsWith(".*")) {
                queryPattern = null;
            }

            GetTopicsResult topicsResult =
                    lookupService
                            .getTopicsUnderNamespace(
                                    namespaceName,
                                    CommandGetTopicsOfNamespace.Mode.ALL,
                                    queryPattern,
                                    null)
                            .get();
            List<String> topics = topicsResult.getTopics();
            if (topics == null || topics.isEmpty()) {
                throw new RuntimeException("Cannot find topics match the topic-pattern " + pattern);
            }
            return topics.get(0);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Adapts a Pulsar {@code Consumer<byte[]>} to {@link MessageQueueSchemaUtils.ConsumerWrapper}.
     *
     * <p>Each poll receives at most one message and decodes its payload with the configured
     * deserialization schema. Received messages are not acknowledged by this class.
     */
    private static class PulsarConsumerWrapper
    implements MessageQueueSchemaUtils.ConsumerWrapper {
        /** Underlying Pulsar consumer delivering raw payloads. */
        private final Consumer<byte[]> consumer;
        /** Name of the topic reported by {@link #topic()}. */
        private final String topic;
        /** Decoder applied to every received payload. */
        private final DeserializationSchema<CdcSourceRecord> deserializationSchema;

        PulsarConsumerWrapper(Consumer<byte[]> consumer, String topic, DeserializationSchema<CdcSourceRecord> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            this.topic = topic;
            this.consumer = consumer;
        }

        /**
         * Receives a single message, waiting at most the given number of milliseconds.
         *
         * @param pollTimeOutMills maximum wait passed to the consumer, in milliseconds
         * @return an empty list when the consumer yields no message, otherwise a one-element
         *     list holding the deserialized record
         * @throws RuntimeException wrapping any {@link IOException} from receiving or decoding
         */
        @Override
        public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
            try {
                final Message<byte[]> received =
                        consumer.receive(pollTimeOutMills, TimeUnit.MILLISECONDS);
                if (received == null) {
                    return Collections.emptyList();
                }
                final CdcSourceRecord decoded = deserializationSchema.deserialize(received.getValue());
                return Collections.singletonList(decoded);
            }
            catch (IOException ioFailure) {
                throw new RuntimeException(ioFailure);
            }
        }

        /** Returns the topic name this wrapper was created with. */
        @Override
        public String topic() {
            return topic;
        }

        /** Closes the underlying consumer. */
        @Override
        public void close() throws PulsarClientException {
            consumer.close();
        }
    }
}

