/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader;

import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;

/**
 * Static factory that builds the Pulsar {@link SourceReader} matching the subscription type
 * returned by {@link SourceConfiguration#getSubscriptionType()}.
 *
 * <p>{@code Failover} and {@code Exclusive} subscriptions get a {@link PulsarOrderedSourceReader};
 * {@code Shared} and {@code Key_Shared} subscriptions get a {@link PulsarUnorderedSourceReader}.
 *
 * <p>NOTE(review): this source was recovered by a decompiler, so the generic type arguments of
 * the queue, the suppliers and the readers were erased and are left raw here. Restore them from
 * the real declarations of those classes rather than guessing.
 */
@Internal
public final class PulsarSourceReaderFactory {
    private PulsarSourceReaderFactory() {
        // Static utility class; never instantiated.
    }

    /**
     * Creates the Pulsar client and admin handles and hands them to a new source reader.
     *
     * <p>If anything fails after a handle has been created (creating the admin, validating the
     * configuration, constructing the reader), the handles created so far are closed before the
     * failure is rethrown, so a failed call does not leave them open. Failures raised while
     * closing are attached to the original failure as suppressed exceptions.
     *
     * @param readerContext the context passed through to the created reader
     * @param deserializationSchema the schema passed to every split reader the reader creates
     * @param sourceConfiguration supplies the queue capacity, the subscription type and the
     *     auto-acknowledge flag, and is passed through to the client, admin and readers
     * @param <OUT> the element type the returned reader is declared to produce
     * @return an ordered or unordered reader, depending on the subscription type
     * @throws IllegalStateException if the subscription is {@code Shared} or {@code Key_Shared},
     *     the client exposes no transaction coordinator client, and auto acknowledgement is off
     * @throws UnsupportedOperationException if the subscription type is none of the four
     *     handled ones (including {@code null})
     */
    public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(SourceReaderContext readerContext, PulsarDeserializationSchema<OUT> deserializationSchema, SourceConfiguration sourceConfiguration) {
        PulsarClient pulsarClient = PulsarClientFactory.createClient(sourceConfiguration);
        PulsarAdmin pulsarAdmin;
        try {
            pulsarAdmin = PulsarClientFactory.createAdmin(sourceConfiguration);
        } catch (RuntimeException | Error failure) {
            // Only the client exists at this point; release it before propagating.
            closeOnFailure(pulsarClient, failure);
            throw failure;
        }
        try {
            return createReader(readerContext, deserializationSchema, sourceConfiguration, pulsarClient, pulsarAdmin);
        } catch (RuntimeException | Error failure) {
            // No reader was returned, so nobody else can close these handles.
            closeOnFailure(pulsarAdmin, failure);
            closeOnFailure(pulsarClient, failure);
            throw failure;
        }
    }

    /** Picks and constructs the reader implementation for the configured subscription type. */
    private static <OUT> SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext, PulsarDeserializationSchema<OUT> deserializationSchema, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) {
        // Queue sized from the source option; it is handed to whichever reader gets built.
        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue(queueCapacity);

        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
        if (subscriptionType == SubscriptionType.Failover || subscriptionType == SubscriptionType.Exclusive) {
            Supplier splitReaderSupplier = () -> new PulsarOrderedPartitionSplitReader(pulsarClient, pulsarAdmin, sourceConfiguration, deserializationSchema);
            return new PulsarOrderedSourceReader(elementsQueue, splitReaderSupplier, readerContext, sourceConfiguration, pulsarClient, pulsarAdmin);
        }
        if (subscriptionType == SubscriptionType.Shared || subscriptionType == SubscriptionType.Key_Shared) {
            // Declared as the interface type so it can be passed on without casts.
            TransactionCoordinatorClient coordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
            // A missing coordinator client is only tolerated when auto acknowledgement is on.
            if (coordinatorClient == null && !sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
                throw new IllegalStateException("Transaction is required but didn't enabled");
            }
            Supplier splitReaderSupplier = () -> new PulsarUnorderedPartitionSplitReader(pulsarClient, pulsarAdmin, sourceConfiguration, deserializationSchema, coordinatorClient);
            return new PulsarUnorderedSourceReader(elementsQueue, splitReaderSupplier, readerContext, sourceConfiguration, pulsarClient, pulsarAdmin, coordinatorClient);
        }
        throw new UnsupportedOperationException("Subscription type " + subscriptionType + " is not supported currently.");
    }

    /**
     * Closes {@code resource} while {@code failure} is being propagated, recording any exception
     * thrown by the close as a suppressed exception instead of masking the original failure.
     */
    private static void closeOnFailure(AutoCloseable resource, Throwable failure) {
        try {
            resource.close();
        } catch (Exception closeFailure) {
            // addSuppressed rejects self-suppression, so skip the (unlikely) identical instance.
            if (closeFailure != failure) {
                failure.addSuppressed(closeFailure);
            }
        }
    }
}

