/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kinesis.source.enumerator;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator for the Kinesis Data Streams source.
 *
 * <p>The enumerator lists the shards of a single stream through a {@link StreamProxy}, converts
 * every listed shard into a {@link KinesisShardSplit}, and pushes the splits to reader subtasks
 * chosen by the configured {@link KinesisShardAssigner}. Discovery runs once on startup (only when
 * no shard has been seen yet) and then periodically.
 *
 * <p>Only the splits that have not been handed to a reader yet and the id of the last shard seen
 * by discovery are checkpointed; the per-subtask assignment bookkeeping is kept in memory only.
 */
@Internal
public class KinesisStreamsSourceEnumerator
        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {

    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);

    private final SplitEnumeratorContext<KinesisShardSplit> context;
    private final String streamArn;
    private final Configuration sourceConfig;
    private final StreamProxy streamProxy;
    private final KinesisShardAssigner shardAssigner;
    private final ShardAssignerContext shardAssignerContext;

    /** Splits handed out by this enumerator instance, keyed by the subtask that received them. */
    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();

    /** Ids of all splits currently assigned; used to skip shards that are discovered twice. */
    private final Set<String> assignedSplitIds = new HashSet<>();

    /** Splits that are known but not assigned to any reader yet. Part of the checkpointed state. */
    private final Set<KinesisShardSplit> unassignedSplits;

    /** Id of the last shard returned by discovery, or {@code null} before the first discovery. */
    private String lastSeenShardId;

    /**
     * Creates the enumerator, optionally restoring it from a checkpointed state.
     *
     * @param context enumerator context used to schedule discovery and to assign splits
     * @param streamArn ARN of the stream whose shards are enumerated
     * @param sourceConfig source configuration (discovery interval, initial position, ...)
     * @param streamProxy proxy used to list the shards of the stream
     * @param shardAssigner strategy that selects the subtask for every split
     * @param state state to restore from, or {@code null} for a fresh start
     */
    public KinesisStreamsSourceEnumerator(
            SplitEnumeratorContext<KinesisShardSplit> context,
            String streamArn,
            Configuration sourceConfig,
            StreamProxy streamProxy,
            KinesisShardAssigner shardAssigner,
            KinesisStreamsSourceEnumeratorState state) {
        this.context = context;
        this.streamArn = streamArn;
        this.sourceConfig = sourceConfig;
        this.streamProxy = streamProxy;
        this.shardAssigner = shardAssigner;
        this.shardAssignerContext = new ShardAssignerContext(this.splitAssignment, context);
        if (state == null) {
            this.lastSeenShardId = null;
            this.unassignedSplits = new HashSet<>();
        } else {
            this.lastSeenShardId = state.getLastSeenShardId();
            // Defensive copy: this set is mutated later (add/addAll/clear), so it must neither
            // alias the restored state object nor depend on the restored set being modifiable.
            Set<KinesisShardSplit> restoredSplits = state.getUnassignedSplits();
            this.unassignedSplits =
                    restoredSplits == null ? new HashSet<>() : new HashSet<>(restoredSplits);
        }
    }

    /**
     * Schedules shard discovery: a one-off initial discovery when no shard has been seen yet,
     * followed by periodic discovery using the configured interval as both initial delay and
     * period.
     */
    @Override
    public void start() {
        if (lastSeenShardId == null) {
            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
        }
        long shardDiscoveryInterval =
                sourceConfig.get(KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS);
        context.callAsync(
                this::periodicallyDiscoverSplits,
                this::assignSplits,
                shardDiscoveryInterval,
                shardDiscoveryInterval);
    }

    /**
     * Intentionally a no-op: this enumerator pushes splits to readers from {@code assignSplits}
     * and does not react to pull requests.
     */
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // Nothing to do, see Javadoc.
    }

    /**
     * Takes back splits from a subtask, marks them as unassigned and immediately attempts to
     * assign all pending splits again.
     *
     * @param splits splits returned by the subtask
     * @param subtaskId subtask the splits were assigned to
     */
    @Override
    public void addSplitsBack(List<KinesisShardSplit> splits, int subtaskId) {
        Set<KinesisShardSplit> assignedToSubtask = splitAssignment.get(subtaskId);
        if (assignedToSubtask == null) {
            LOG.warn(
                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
                    subtaskId,
                    splits);
            return;
        }
        for (KinesisShardSplit split : splits) {
            assignedToSubtask.remove(split);
            assignedSplitIds.remove(split.splitId());
            unassignedSplits.add(split);
        }
        // No newly discovered splits here; this only re-distributes what is pending.
        assignSplits(Collections.emptyList(), null);
    }

    /** Registers an (initially empty) assignment entry for the subtask if it has none yet. */
    @Override
    public void addReader(int subtaskId) {
        splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
    }

    /**
     * Returns the checkpoint state: the currently unassigned splits and the last seen shard id.
     *
     * <p>The split set is copied so that later assignments do not modify a snapshot that has
     * already been handed out.
     */
    @Override
    public KinesisStreamsSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
        return new KinesisStreamsSourceEnumeratorState(
                new HashSet<KinesisShardSplit>(unassignedSplits), lastSeenShardId);
    }

    /** Closes the underlying {@link StreamProxy}. */
    @Override
    public void close() throws IOException {
        streamProxy.close();
    }

    /** Lists shards and maps them to splits starting at the configured initial position. */
    private List<KinesisShardSplit> initialDiscoverSplits() {
        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
        return mapToSplits(
                shards, sourceConfig.get(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION));
    }

    /**
     * Lists shards and maps them to splits that are always read from the start
     * ({@code TRIM_HORIZON}), independent of the configured initial position.
     */
    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
        return mapToSplits(shards, KinesisStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON);
    }

    /**
     * Converts shards into splits that all share one starting position.
     *
     * @param shards shards to convert
     * @param initialPosition {@code LATEST} maps to a timestamp position at the current time,
     *     {@code AT_TIMESTAMP} to the timestamp parsed from the source configuration, and any
     *     other value to the start of the shard
     * @return one split per shard, in the order of {@code shards}
     */
    private List<KinesisShardSplit> mapToSplits(
            List<Shard> shards, KinesisStreamsSourceConfigConstants.InitialPosition initialPosition) {
        StartingPosition startingPosition;
        switch (initialPosition) {
            case LATEST:
                startingPosition = StartingPosition.fromTimestamp(Instant.now());
                break;
            case AT_TIMESTAMP:
                startingPosition =
                        StartingPosition.fromTimestamp(
                                KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition(
                                                sourceConfig)
                                        .toInstant());
                break;
            default:
                startingPosition = StartingPosition.fromStart();
        }

        List<KinesisShardSplit> splits = new ArrayList<>(shards.size());
        for (Shard shard : shards) {
            splits.add(new KinesisShardSplit(streamArn, shard.shardId(), startingPosition));
        }
        return splits;
    }

    /**
     * Completion handler of shard discovery: assigns all pending and newly discovered splits, or
     * parks the discovered splits when not every reader has registered yet.
     *
     * @param discoveredSplits splits produced by the discovery call
     * @param throwable failure of the discovery call, or {@code null} on success
     * @throws KinesisStreamsSourceException if discovery failed
     */
    private void assignSplits(List<KinesisShardSplit> discoveredSplits, Throwable throwable) {
        if (throwable != null) {
            throw new KinesisStreamsSourceException("Failed to list shards.", throwable);
        }

        if (context.registeredReaders().size() < context.currentParallelism()) {
            LOG.info(
                    "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, Registered readers: {}",
                    context.currentParallelism(),
                    context.registeredReaders().size());
            unassignedSplits.addAll(discoveredSplits);
            // The parked splits are tracked (and checkpointed) via unassignedSplits, so the
            // discovery marker must advance as well. Otherwise the next discovery is issued with
            // the same marker and may yield the same shards again with a different starting
            // position (presumably listShards resumes after the given shard id; verify against
            // StreamProxy).
            updateLastSeenShardId(discoveredSplits);
            return;
        }

        Map<Integer, List<KinesisShardSplit>> newSplitAssignments = new HashMap<>();
        // Pending splits first so they take precedence over a re-discovered copy of the shard.
        for (KinesisShardSplit split : unassignedSplits) {
            assignSplitToSubtask(split, newSplitAssignments);
        }
        unassignedSplits.clear();
        for (KinesisShardSplit split : discoveredSplits) {
            assignSplitToSubtask(split, newSplitAssignments);
        }

        updateLastSeenShardId(discoveredSplits);
        updateSplitAssignment(newSplitAssignments);
        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
    }

    /**
     * Asks the shard assigner for a subtask and records the split in the pending assignment.
     * Splits whose id is already assigned are skipped.
     */
    private void assignSplitToSubtask(
            KinesisShardSplit split, Map<Integer, List<KinesisShardSplit>> newSplitAssignments) {
        if (assignedSplitIds.contains(split.splitId())) {
            LOG.info(
                    "Skipping assignment of shard {} from stream {} because it is already assigned.",
                    split.getShardId(),
                    split.getStreamArn());
            return;
        }

        int selectedSubtask =
                shardAssigner.assign(
                        split, shardAssignerContext.withPendingSplitAssignments(newSplitAssignments));
        LOG.info(
                "Assigning shard {} from stream {} to subtask {}.",
                split.getShardId(),
                split.getStreamArn(),
                selectedSubtask);

        newSplitAssignments.computeIfAbsent(selectedSubtask, ignored -> new ArrayList<>()).add(split);
        assignedSplitIds.add(split.splitId());
    }

    /** Remembers the shard id of the last discovered split; no-op for an empty list. */
    private void updateLastSeenShardId(List<KinesisShardSplit> discoveredSplits) {
        if (!discoveredSplits.isEmpty()) {
            KinesisShardSplit lastSplit = discoveredSplits.get(discoveredSplits.size() - 1);
            lastSeenShardId = lastSplit.getShardId();
        }
    }

    /** Merges a freshly computed assignment into the per-subtask bookkeeping. */
    private void updateSplitAssignment(Map<Integer, List<KinesisShardSplit>> newSplitsAssignment) {
        newSplitsAssignment.forEach(
                (subtaskId, newSplits) ->
                        splitAssignment
                                .computeIfAbsent(subtaskId, ignored -> new HashSet<>())
                                .addAll(newSplits));
    }

    /**
     * Context handed to the {@link KinesisShardAssigner}. It exposes read-only copies of the
     * current and the pending assignment plus the registered readers.
     */
    @Internal
    private static class ShardAssignerContext implements KinesisShardAssigner.Context {

        private final Map<Integer, Set<KinesisShardSplit>> splitAssignment;
        private final SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext;
        private Map<Integer, List<KinesisShardSplit>> pendingSplitAssignments = Collections.emptyMap();

        private ShardAssignerContext(
                Map<Integer, Set<KinesisShardSplit>> splitAssignment,
                SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext) {
            this.splitAssignment = splitAssignment;
            this.splitEnumeratorContext = splitEnumeratorContext;
        }

        /**
         * Replaces the pending assignment with an unmodifiable snapshot of the given map and
         * returns this same (mutated) context instance.
         */
        private ShardAssignerContext withPendingSplitAssignments(
                Map<Integer, List<KinesisShardSplit>> pendingSplitAssignments) {
            Map<Integer, List<KinesisShardSplit>> copyPendingSplitAssignments = new HashMap<>();
            for (Map.Entry<Integer, List<KinesisShardSplit>> entry :
                    pendingSplitAssignments.entrySet()) {
                copyPendingSplitAssignments.put(
                        entry.getKey(),
                        Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
            }
            this.pendingSplitAssignments = Collections.unmodifiableMap(copyPendingSplitAssignments);
            return this;
        }

        /** Returns an unmodifiable deep copy of the assignment tracked by the enumerator. */
        @Override
        public Map<Integer, Set<KinesisShardSplit>> getCurrentSplitAssignment() {
            Map<Integer, Set<KinesisShardSplit>> copyCurrentSplitAssignment = new HashMap<>();
            for (Map.Entry<Integer, Set<KinesisShardSplit>> entry : splitAssignment.entrySet()) {
                copyCurrentSplitAssignment.put(
                        entry.getKey(), Collections.unmodifiableSet(new HashSet<>(entry.getValue())));
            }
            return Collections.unmodifiableMap(copyCurrentSplitAssignment);
        }

        /** Returns the snapshot set by the last {@code withPendingSplitAssignments} call. */
        @Override
        public Map<Integer, List<KinesisShardSplit>> getPendingSplitAssignments() {
            return pendingSplitAssignments;
        }

        /** Returns the readers currently registered with the enumerator context. */
        @Override
        public Map<Integer, ReaderInfo> getRegisteredReaders() {
            return splitEnumeratorContext.registeredReaders();
        }
    }
}

