/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.leases;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.Lease;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import software.amazon.kinesis.shaded.org.apache.commons.logging.Log;
import software.amazon.kinesis.shaded.org.apache.commons.logging.LogFactory;

public final class StreamsLeaseTaker<T extends Lease>
implements ILeaseTaker<T> {
    private static final Log LOG = LogFactory.getLog(StreamsLeaseTaker.class);
    private static final int SCAN_RETRIES = 1;
    private static final int TAKE_RETRIES = 3;
    private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
    private final ILeaseManager<T> leaseManager;
    private final String workerIdentifier;
    private final long leaseDurationNanos;
    private int maxLeasesForWorker = Integer.MAX_VALUE;
    private final Map<String, T> allLeases = new HashMap<String, T>();
    private long lastScanTimeNanos = 0L;
    private static String SHARD_END = SentinelCheckpoint.SHARD_END.toString();

    public StreamsLeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
        this.leaseManager = leaseManager;
        this.workerIdentifier = workerIdentifier;
        this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
    }

    public StreamsLeaseTaker<T> maxLeasesForWorker(int maxLeasesForWorker) {
        if (maxLeasesForWorker <= 0) {
            throw new IllegalArgumentException("maxLeasesForWorker should be >= 1");
        }
        this.maxLeasesForWorker = maxLeasesForWorker;
        return this;
    }

    @Override
    public synchronized Map<String, T> takeLeases() throws DependencyException, InvalidStateException {
        long startTime = System.currentTimeMillis();
        this.refreshAllLeases(SYSTEM_CLOCK_CALLABLE);
        Set<T> expiredLeases = this.getExpiredLeases();
        Map<String, Integer> leaseCountsByHost = this.getUnfinishedLeaseCountsByHost(expiredLeases);
        int numWorkers = leaseCountsByHost.size();
        List<T> allUnfinishedLeases = this.getUnfinishedLeases(this.allLeases.values());
        List<T> allFinishedLeases = this.getFinishedLeases(this.allLeases.values());
        int myUnfinishedLeasesCount = this.getMyLeaseCount(allUnfinishedLeases, expiredLeases);
        int myFinishedLeasesCount = this.getMyLeaseCount(allFinishedLeases, expiredLeases);
        int targetUnfinishedLeasesCount = this.getTargetUnfinishedLeasesCount(allUnfinishedLeases.size(), numWorkers);
        int targetFinishedLeasesCount = this.getTargetFinishedLeasesCount(allFinishedLeases.size(), numWorkers);
        int remainingSlotsForUnfinishedLeases = targetUnfinishedLeasesCount - myUnfinishedLeasesCount;
        int remainingSlotsForFinishedLeases = targetFinishedLeasesCount - myFinishedLeasesCount;
        List<T> unfinishedExpiredLeases = this.getUnfinishedLeases(expiredLeases);
        List<T> leasesToTake = this.getLeasesToTakeFromExpiredLeases(unfinishedExpiredLeases, remainingSlotsForUnfinishedLeases);
        leasesToTake.addAll(this.getLeasesToTakeFromExpiredLeases(this.getFinishedLeases(expiredLeases), remainingSlotsForFinishedLeases));
        leasesToTake.addAll(this.getLeasesToSteal(leaseCountsByHost, remainingSlotsForUnfinishedLeases - unfinishedExpiredLeases.size(), targetUnfinishedLeasesCount, allUnfinishedLeases));
        LOG.info(String.format("Worker %s saw %d total leases, %d expired leases, %d workers.Unfinished lease target: %d leases, I have %d unfinished leases. Finished leases target is %d and I have %d finished leases. I will take %d leases in total.", this.workerIdentifier, this.allLeases.size(), expiredLeases.size(), numWorkers, targetUnfinishedLeasesCount, myUnfinishedLeasesCount, targetFinishedLeasesCount, myFinishedLeasesCount, leasesToTake.size()));
        Map<String, T> takenLeases = this.takeLeases(leasesToTake);
        if (remainingSlotsForFinishedLeases < 0) {
            this.evictLeases(this.getLeasesToEvict(Math.abs(remainingSlotsForFinishedLeases)));
        }
        LOG.info(String.format("TakeLeases took %d seconds.", (System.currentTimeMillis() - startTime) / 1000L));
        return takenLeases;
    }

    private void refreshAllLeases(Callable<Long> timeProvider) throws InvalidStateException, DependencyException {
        ProvisionedThroughputException lastException = null;
        for (int i = 1; i <= 1; ++i) {
            try {
                this.updateAllLeases(timeProvider);
                lastException = null;
                continue;
            }
            catch (ProvisionedThroughputException e) {
                LOG.info(String.format("Worker %s could not find expired leases on try %d out of %d", this.workerIdentifier, i, 1));
                lastException = e;
            }
        }
        if (lastException != null) {
            LOG.error("Worker " + this.workerIdentifier + " could not scan leases table, aborting takeLeases. Exception caught by last retry:", lastException);
        }
    }

    private void updateAllLeases(Callable<Long> timeProvider) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        List<T> freshList = this.leaseManager.listLeases();
        try {
            this.lastScanTimeNanos = timeProvider.call();
        }
        catch (Exception e) {
            throw new DependencyException("Exception caught from timeProvider", e);
        }
        HashSet<String> notUpdated = new HashSet<String>(this.allLeases.keySet());
        for (Lease lease : freshList) {
            String leaseKey = lease.getLeaseKey();
            Lease oldLease = (Lease)this.allLeases.get(leaseKey);
            this.allLeases.put(leaseKey, lease);
            notUpdated.remove(leaseKey);
            if (oldLease != null) {
                if (oldLease.getLeaseCounter().equals(lease.getLeaseCounter())) {
                    lease.setLastCounterIncrementNanos(oldLease.getLastCounterIncrementNanos());
                    continue;
                }
                lease.setLastCounterIncrementNanos(this.lastScanTimeNanos);
                continue;
            }
            if (lease.getLeaseOwner() == null) {
                lease.setLastCounterIncrementNanos(0L);
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug("Treating new lease with key " + leaseKey + " as never renewed because it is new and unowned.");
                continue;
            }
            lease.setLastCounterIncrementNanos(this.lastScanTimeNanos);
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Treating new lease with key " + leaseKey + " as recently renewed because it is new and owned.");
        }
        for (String key : notUpdated) {
            this.allLeases.remove(key);
        }
    }

    private Set<T> getExpiredLeases() {
        return this.allLeases.values().stream().filter(lease -> lease.isExpired(this.leaseDurationNanos, this.lastScanTimeNanos)).collect(Collectors.toSet());
    }

    private Map<String, Integer> getUnfinishedLeaseCountsByHost(Set<T> expiredLeases) {
        HashMap<String, Integer> leaseCounts = new HashMap<String, Integer>();
        this.allLeases.values().stream().filter(lease -> !expiredLeases.contains(lease) && lease.getLeaseOwner() != null).filter(KinesisClientLease.class::isInstance).map(KinesisClientLease.class::cast).forEach(lease -> {
            int numToAdd = 0;
            if (lease.getCheckpoint() != null && !SHARD_END.equals(lease.getCheckpoint().getSequenceNumber())) {
                numToAdd = 1;
            }
            leaseCounts.merge(lease.getLeaseOwner(), numToAdd, Integer::sum);
        });
        leaseCounts.putIfAbsent(this.workerIdentifier, 0);
        return leaseCounts;
    }

    private List<T> getUnfinishedLeases(Collection<T> leases) {
        return leases.stream().filter(lease -> lease instanceof KinesisClientLease).filter(lease -> ((KinesisClientLease)lease).getCheckpoint() != null).filter(lease -> !SHARD_END.equals(((KinesisClientLease)lease).getCheckpoint().getSequenceNumber())).collect(Collectors.toList());
    }

    private List<T> getFinishedLeases(Collection<T> leases) {
        return leases.stream().filter(lease -> lease instanceof KinesisClientLease).filter(lease -> ((KinesisClientLease)lease).getCheckpoint() != null).filter(lease -> SHARD_END.equals(((KinesisClientLease)lease).getCheckpoint().getSequenceNumber())).collect(Collectors.toList());
    }

    private int getTargetUnfinishedLeasesCount(int allUnfinishedLeasesCount, int numWorkers) {
        int leaseCount = numWorkers >= allUnfinishedLeasesCount ? 1 : allUnfinishedLeasesCount / numWorkers + (allUnfinishedLeasesCount % numWorkers == 0 ? 0 : 1);
        int leaseSpillover = Math.max(0, leaseCount - this.maxLeasesForWorker);
        if (leaseSpillover > 0) {
            LOG.warn(String.format("Worker %s : target is %d unfinished shard leases and maxLeasesForWorker is %d. Resetting target to %d, lease spillover is %d.  Note that some shards may not be processed if no other workers are able to pick them up resulting in a possible stall.", this.workerIdentifier, leaseCount, this.maxLeasesForWorker, this.maxLeasesForWorker, leaseSpillover));
            leaseCount = this.maxLeasesForWorker;
        }
        return leaseCount;
    }

    private int getTargetFinishedLeasesCount(int allFinishedLeasesCount, int numWorkers) {
        int leaseCount = numWorkers >= allFinishedLeasesCount ? 1 : allFinishedLeasesCount / numWorkers + (allFinishedLeasesCount % numWorkers == 0 ? 0 : 1);
        return leaseCount;
    }

    private int getMyLeaseCount(List<T> leases, Set<T> expiredLeases) {
        return Math.toIntExact(leases.stream().filter(lease -> !expiredLeases.contains(lease)).filter(lease -> this.workerIdentifier.equals(lease.getLeaseOwner())).count());
    }

    private List<T> getLeasesToTakeFromExpiredLeases(List<T> expiredLeases, int remainingSlots) {
        if (remainingSlots <= 0) {
            return new ArrayList();
        }
        Collections.shuffle(expiredLeases);
        return expiredLeases.stream().limit(remainingSlots).collect(Collectors.toList());
    }

    private List<T> getLeasesToSteal(Map<String, Integer> leaseCountsForHosts, int needed, int target, List<T> activeLeases) {
        ArrayList<Lease> leasesToSteal = new ArrayList<Lease>();
        if (needed <= 0) {
            return leasesToSteal;
        }
        HashMap<String, Integer> leasesAddedSoFar = new HashMap<String, Integer>();
        Map<String, Integer> extraLeasesWithHosts = leaseCountsForHosts.entrySet().stream().filter(entry -> !((String)entry.getKey()).equals(this.workerIdentifier)).filter(entry -> (Integer)entry.getValue() > target).collect(Collectors.toMap(Map.Entry::getKey, entry -> (Integer)entry.getValue() - target));
        int numWorkersToStealFrom = extraLeasesWithHosts.size();
        if (numWorkersToStealFrom <= 0) {
            return leasesToSteal;
        }
        int leaseCountToStealPerWorker = needed / numWorkersToStealFrom + (needed % numWorkersToStealFrom > 0 ? 1 : 0);
        Collections.shuffle(activeLeases);
        for (Lease lease : activeLeases) {
            String leaseOwner = lease.getLeaseOwner();
            if (leaseOwner == null) continue;
            int extraLeasesWithCurrentOwner = extraLeasesWithHosts.getOrDefault(leaseOwner, 0);
            int addedSoFarFromCurrentOwner = leasesAddedSoFar.getOrDefault(leaseOwner, 0);
            if (extraLeasesWithCurrentOwner > 0 && addedSoFarFromCurrentOwner < leaseCountToStealPerWorker) {
                extraLeasesWithHosts.put(leaseOwner, extraLeasesWithCurrentOwner - 1);
                leasesAddedSoFar.put(leaseOwner, addedSoFarFromCurrentOwner + 1);
                leasesToSteal.add(lease);
            }
            if (leasesToSteal.size() < needed) continue;
            break;
        }
        for (Lease leaseToSteal : leasesToSteal) {
            LOG.info(String.format("Worker %s needs %d leases. It will steal lease %s from %s", this.workerIdentifier, needed, leaseToSteal.getLeaseKey(), leaseToSteal.getLeaseOwner()));
        }
        LOG.info(String.format("Worker %s will try to steal total %d leases", this.workerIdentifier, leasesToSteal.size()));
        return leasesToSteal;
    }

    private Map<String, T> takeLeases(List<T> leasesToTake) throws DependencyException, InvalidStateException {
        HashMap<String, Lease> takenLeases = new HashMap<String, Lease>();
        HashSet<String> untakenLeaseKeys = new HashSet<String>();
        block2: for (Lease lease : leasesToTake) {
            String leaseKey = lease.getLeaseKey();
            for (int i = 0; i < 3; ++i) {
                try {
                    if (this.leaseManager.takeLease(lease, this.workerIdentifier)) {
                        lease.setLastCounterIncrementNanos(System.nanoTime());
                        takenLeases.put(leaseKey, lease);
                        continue block2;
                    }
                    untakenLeaseKeys.add(leaseKey);
                    continue block2;
                }
                catch (ProvisionedThroughputException e) {
                    LOG.info(String.format("Could not take lease with key %s for worker %s on try %d out of %d due to capacity", leaseKey, this.workerIdentifier, i, 3));
                    continue;
                }
            }
        }
        if (takenLeases.size() > 0) {
            LOG.info(String.format("Worker %s successfully took %d leases: %s", this.workerIdentifier, takenLeases.size(), StreamsLeaseTaker.stringJoin(takenLeases.keySet(), ", ")));
        }
        if (untakenLeaseKeys.size() > 0) {
            LOG.info(String.format("Worker %s failed to take %d leases: %s", this.workerIdentifier, untakenLeaseKeys.size(), StreamsLeaseTaker.stringJoin(untakenLeaseKeys, ", ")));
        }
        return takenLeases;
    }

    private List<T> getLeasesToEvict(int numLeasesToEvict) {
        ArrayList<Lease> leasesToEvict = new ArrayList<Lease>();
        for (Lease lease : this.allLeases.values()) {
            String sequenceNumber;
            KinesisClientLease kinesisClientLease;
            if (numLeasesToEvict <= 0) {
                return leasesToEvict;
            }
            if (!this.workerIdentifier.equals(lease.getLeaseOwner()) || !(lease instanceof KinesisClientLease) || (kinesisClientLease = (KinesisClientLease)lease).getCheckpoint() == null || !SHARD_END.equals(sequenceNumber = kinesisClientLease.getCheckpoint().getSequenceNumber())) continue;
            leasesToEvict.add(lease);
            --numLeasesToEvict;
        }
        return leasesToEvict;
    }

    private void evictLeases(List<T> leases) throws DependencyException, InvalidStateException {
        for (Lease lease : leases) {
            LOG.info(String.format("Worker %s : LeaseTaker will try to evict lease %s", this.workerIdentifier, lease.getLeaseKey()));
            try {
                this.leaseManager.evictLease(lease);
            }
            catch (ProvisionedThroughputException e) {
                LOG.info(String.format("Worker %s could not evict leases to take due to capacity", this.workerIdentifier));
                return;
            }
        }
        LOG.info(String.format("Worker %s : LeaseTaker evicted %d leases", this.workerIdentifier, leases.size()));
    }

    static String stringJoin(Collection<String> strings, String delimiter) {
        StringBuilder builder = new StringBuilder();
        boolean needDelimiter = false;
        for (String string : strings) {
            if (needDelimiter) {
                builder.append(delimiter);
            }
            builder.append(string);
            needDelimiter = true;
        }
        return builder.toString();
    }

    @Override
    public String getWorkerIdentifier() {
        return this.workerIdentifier;
    }
}

