/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.InstantiationUtil;
import org.apache.pulsar.client.api.SubscriptionType;

@Internal
public class SplitsAssignmentState {
    private final StopCursor stopCursor;
    private final SourceConfiguration sourceConfiguration;
    private final Set<TopicPartition> appendedPartitions;
    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
    private final Map<Integer, Set<String>> readerAssignedSplits;
    private boolean initialized;

    public SplitsAssignmentState(StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
        this.stopCursor = stopCursor;
        this.sourceConfiguration = sourceConfiguration;
        this.appendedPartitions = new HashSet<TopicPartition>();
        this.pendingPartitionSplits = new HashSet<PulsarPartitionSplit>();
        this.sharedPendingPartitionSplits = new HashMap<Integer, Set<PulsarPartitionSplit>>();
        this.readerAssignedSplits = new HashMap<Integer, Set<String>>();
        this.initialized = false;
    }

    public SplitsAssignmentState(StopCursor stopCursor, SourceConfiguration sourceConfiguration, PulsarSourceEnumState sourceEnumState) {
        this.stopCursor = stopCursor;
        this.sourceConfiguration = sourceConfiguration;
        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
        this.initialized = sourceEnumState.isInitialized();
    }

    public PulsarSourceEnumState snapshotState() {
        return new PulsarSourceEnumState(this.appendedPartitions, this.pendingPartitionSplits, this.sharedPendingPartitionSplits, this.readerAssignedSplits, this.initialized);
    }

    public void appendTopicPartitions(Set<TopicPartition> fetchedPartitions) {
        for (TopicPartition partition : fetchedPartitions) {
            if (this.appendedPartitions.contains(partition)) continue;
            if (!this.sharePartition()) {
                this.pendingPartitionSplits.add(this.createSplit(partition));
            }
            this.appendedPartitions.add(partition);
        }
        if (!this.initialized) {
            this.initialized = true;
        }
    }

    public boolean containsTopic(String topicName) {
        return this.appendedPartitions.stream().anyMatch(partition -> Objects.equals(partition.getFullTopicName(), topicName));
    }

    public void putSplitsBackToPendingList(List<PulsarPartitionSplit> splits, int readerId) {
        if (!this.sharePartition()) {
            this.pendingPartitionSplits.addAll(splits);
        } else {
            Set pending = this.sharedPendingPartitionSplits.computeIfAbsent(readerId, id -> new HashSet());
            pending.addAll(splits);
        }
    }

    public Optional<SplitsAssignment<PulsarPartitionSplit>> assignSplits(List<Integer> pendingReaders) {
        if (pendingReaders.isEmpty()) {
            return Optional.empty();
        }
        Map<Integer, List<PulsarPartitionSplit>> assignMap = !this.sharePartition() ? this.assignNormalSplits(pendingReaders) : this.assignSharedSplits(pendingReaders);
        if (assignMap.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new SplitsAssignment(assignMap));
    }

    public boolean noMoreNewPartitionSplits() {
        return !this.sourceConfiguration.isEnablePartitionDiscovery() && this.initialized && this.pendingPartitionSplits.isEmpty();
    }

    private Map<Integer, List<PulsarPartitionSplit>> assignNormalSplits(List<Integer> pendingReaders) {
        HashMap<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<Integer, List<PulsarPartitionSplit>>();
        List<PulsarPartitionSplit> pendingSplits = this.drainPendingPartitionsSplits();
        for (int i = 0; i < pendingSplits.size(); ++i) {
            PulsarPartitionSplit split = pendingSplits.get(i);
            int readerId = pendingReaders.get(i % pendingReaders.size());
            assignMap.computeIfAbsent(readerId, id -> new ArrayList()).add(split);
        }
        return assignMap;
    }

    private Map<Integer, List<PulsarPartitionSplit>> assignSharedSplits(List<Integer> pendingReaders) {
        HashMap<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<Integer, List<PulsarPartitionSplit>>();
        for (Integer reader : pendingReaders) {
            Set<PulsarPartitionSplit> pendingSplits = this.sharedPendingPartitionSplits.remove(reader);
            if (pendingSplits == null) {
                pendingSplits = new HashSet<PulsarPartitionSplit>();
            }
            Set assignedSplits = this.readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet());
            for (TopicPartition partition : this.appendedPartitions) {
                String partitionName = partition.toString();
                if (assignedSplits.contains(partitionName)) continue;
                pendingSplits.add(this.createSplit(partition));
                assignedSplits.add(partitionName);
            }
            if (pendingSplits.isEmpty()) continue;
            assignMap.put(reader, new ArrayList<PulsarPartitionSplit>(pendingSplits));
        }
        return assignMap;
    }

    private PulsarPartitionSplit createSplit(TopicPartition partition) {
        try {
            StopCursor stop = (StopCursor)InstantiationUtil.clone((Serializable)this.stopCursor);
            return new PulsarPartitionSplit(partition, stop);
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private List<PulsarPartitionSplit> drainPendingPartitionsSplits() {
        ArrayList<PulsarPartitionSplit> splits = new ArrayList<PulsarPartitionSplit>(this.pendingPartitionSplits);
        this.pendingPartitionSplits.clear();
        return splits;
    }

    private boolean sharePartition() {
        return this.sourceConfiguration.getSubscriptionType() == SubscriptionType.Shared;
    }
}

