/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.source.enumerator.assigner;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
import org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.MongoSplitters;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
import org.apache.flink.mongodb.shaded.com.mongodb.MongoNamespace;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClient;
import org.apache.flink.mongodb.shaded.com.mongodb.client.MongoClients;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MongoScanSplitAssigner
implements MongoSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MongoScanSplitAssigner.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoReadOptions readOptions;
    private final LinkedList<String> remainingCollections;
    private final List<String> alreadyProcessedCollections;
    private final LinkedList<MongoScanSourceSplit> remainingScanSplits;
    private final Map<String, MongoScanSourceSplit> assignedScanSplits;
    private boolean initialized;
    private MongoClient mongoClient;

    public MongoScanSplitAssigner(MongoConnectionOptions connectionOptions, MongoReadOptions readOptions, MongoSourceEnumState sourceEnumState) {
        this.connectionOptions = connectionOptions;
        this.readOptions = readOptions;
        this.remainingCollections = new LinkedList<String>(sourceEnumState.getRemainingCollections());
        this.alreadyProcessedCollections = sourceEnumState.getAlreadyProcessedCollections();
        this.remainingScanSplits = new LinkedList<MongoScanSourceSplit>(sourceEnumState.getRemainingScanSplits());
        this.assignedScanSplits = sourceEnumState.getAssignedScanSplits();
        this.initialized = sourceEnumState.isInitialized();
    }

    @Override
    public void open() {
        LOG.info("Mongo scan split assigner is opening.");
        if (!this.initialized) {
            String collectionId = String.format("%s.%s", this.connectionOptions.getDatabase(), this.connectionOptions.getCollection());
            this.remainingCollections.add(collectionId);
            this.mongoClient = MongoClients.create(this.connectionOptions.getUri());
            this.initialized = true;
        }
    }

    @Override
    public Optional<MongoSourceSplit> getNext() {
        if (!this.remainingScanSplits.isEmpty()) {
            MongoScanSourceSplit split = this.remainingScanSplits.poll();
            this.assignedScanSplits.put(split.splitId(), split);
            return Optional.of(split);
        }
        String nextCollection = this.remainingCollections.poll();
        if (nextCollection != null) {
            Collection<MongoScanSourceSplit> splits = MongoSplitters.split(this.mongoClient, this.readOptions, new MongoNamespace(nextCollection));
            this.remainingScanSplits.addAll(splits);
            this.alreadyProcessedCollections.add(nextCollection);
            return this.getNext();
        }
        return Optional.empty();
    }

    @Override
    public void addSplitsBack(Collection<MongoSourceSplit> splits) {
        for (MongoSourceSplit split : splits) {
            if (!(split instanceof MongoScanSourceSplit)) continue;
            this.remainingScanSplits.add((MongoScanSourceSplit)split);
            this.assignedScanSplits.remove(split.splitId());
        }
    }

    @Override
    public MongoSourceEnumState snapshotState(long checkpointId) {
        return new MongoSourceEnumState(this.remainingCollections, this.alreadyProcessedCollections, this.remainingScanSplits, this.assignedScanSplits, this.initialized);
    }

    @Override
    public boolean noMoreSplits() {
        Preconditions.checkState((boolean)this.initialized, (Object)"The noMoreSplits method was called but not initialized.");
        return this.remainingCollections.isEmpty() && this.remainingScanSplits.isEmpty();
    }

    @Override
    public void close() throws IOException {
        if (this.mongoClient != null) {
            this.mongoClient.close();
            LOG.info("Mongo scan split assigner is closed.");
        }
    }
}

