/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/**
 * Static helpers used when restoring a RocksDB instance from incremental checkpoint state:
 * clipping a database to a target key-group range and picking the state handle that is the
 * best starting point for a given target range.
 */
public class RocksDBIncrementalCheckpointUtils {
    /**
     * Minimum fraction of a handle's key-groups that must fall inside the target range for the
     * handle to be considered as the initial database.
     */
    private static final double OVERLAP_FRACTION_THRESHOLD = 0.75;

    /**
     * Scores a state handle against a target key-group range. Handles whose overlap fraction is
     * below {@link #OVERLAP_FRACTION_THRESHOLD} get {@link Score#MIN}; all others are scored by
     * the number of intersecting key-groups and then by the overlap fraction.
     */
    private static final BiFunction<KeyedStateHandle, KeyGroupRange, Score> STATE_HANDLE_EVALUATOR = (stateHandle, targetKeyGroupRange) -> {
        final KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
        final KeyGroupRange intersectGroup = handleKeyGroupRange.getIntersection(targetKeyGroupRange);
        final int intersectingKeyGroups = intersectGroup.getNumberOfKeyGroups();
        // NOTE(review): a handle range with zero key-groups yields NaN here, which is not
        // "< threshold" and therefore is scored rather than rejected - confirm this is intended.
        final double overlapFraction = (double) intersectingKeyGroups / (double) handleKeyGroupRange.getNumberOfKeyGroups();
        if (overlapFraction < OVERLAP_FRACTION_THRESHOLD) {
            return Score.MIN;
        }
        return new Score(intersectingKeyGroups, overlapFraction);
    };

    /**
     * Deletes from every given column family the entries whose key-group prefix lies inside
     * {@code currentKeyGroupRange} but outside {@code targetKeyGroupRange}.
     *
     * @param db the database to clip.
     * @param columnFamilyHandles the column families to clip.
     * @param targetKeyGroupRange the key-group range that must remain in the database.
     * @param currentKeyGroupRange the key-group range currently contained in the database.
     * @param keyGroupPrefixBytes number of bytes used to encode the key-group prefix of a key.
     * @param writeBatchSize size passed to the write batch that collects the deletes.
     * @throws RocksDBException if iterating or deleting fails.
     */
    public static void clipDBWithKeyGroupRange(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull KeyGroupRange targetKeyGroupRange, @Nonnull KeyGroupRange currentKeyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnegative long writeBatchSize) throws RocksDBException {
        final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];

        // Surplus key-groups in front of the target range: [current.start, target.start).
        if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup(currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup(targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
        }

        // Surplus key-groups behind the target range: [target.end + 1, current.end + 1).
        if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup(targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup(currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
        }
    }

    /**
     * Removes, in each column family, every entry found by seeking to {@code beginKeyBytes} and
     * iterating forward while the key is still before the {@code endKeyBytes} prefix.
     *
     * <p>Resources are managed with try-with-resources so that a failure while iterating,
     * deleting or closing is propagated to the caller instead of being dropped, and processing
     * stops at the first failing column family.
     *
     * @param db the database to delete from.
     * @param columnFamilyHandles the column families to process.
     * @param beginKeyBytes inclusive start of the range to delete.
     * @param endKeyBytes exclusive end prefix of the range to delete.
     * @param writeBatchSize size passed to the write batch that collects the deletes.
     * @throws RocksDBException if iterating, deleting or closing the write batch fails.
     */
    private static void deleteRange(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, byte[] beginKeyBytes, byte[] endKeyBytes, @Nonnegative long writeBatchSize) throws RocksDBException {
        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
            // Closed in reverse order: write batch first, then iterator, then read options.
            // Presumably closing the write batch applies any still-pending deletes - confirm
            // against RocksDBWriteBatchWrapper.
            try (ReadOptions readOptions = new ReadOptions();
                    RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
                    RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
                iteratorWrapper.seek(beginKeyBytes);
                while (iteratorWrapper.isValid()) {
                    final byte[] currentKey = iteratorWrapper.key();
                    if (!beforeThePrefixBytes(currentKey, endKeyBytes)) {
                        // Reached the first key at or beyond the end prefix.
                        break;
                    }
                    writeBatchWrapper.remove(columnFamilyHandle, currentKey);
                    iteratorWrapper.next();
                }
            }
        }
    }

    /**
     * Checks whether {@code bytes} sorts strictly before {@code prefixBytes}, comparing the
     * first {@code prefixBytes.length} bytes as unsigned values.
     *
     * @param bytes the key to test; must be at least as long as {@code prefixBytes}, otherwise
     *     an {@link ArrayIndexOutOfBoundsException} is thrown when the common part is equal.
     * @param prefixBytes the prefix to compare against.
     * @return {@code true} if the first differing byte of {@code bytes} is smaller than the one
     *     in {@code prefixBytes}; {@code false} if it is larger or if {@code bytes} starts with
     *     {@code prefixBytes}.
     */
    public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) {
        final int prefixLength = prefixBytes.length;
        for (int i = 0; i < prefixLength; ++i) {
            final int diff = Byte.toUnsignedInt(prefixBytes[i]) - Byte.toUnsignedInt(bytes[i]);
            if (diff != 0) {
                return diff > 0;
            }
        }
        return false;
    }

    /**
     * Picks the state handle with the highest score for the target key-group range.
     *
     * @param restoreStateHandles the candidate state handles.
     * @param targetKeyGroupRange the key-group range the restored backend is responsible for.
     * @return the best-scoring handle, or {@code null} if no handle scores above
     *     {@link Score#MIN} (including when the collection is empty). On equal scores the first
     *     handle encountered wins.
     */
    @Nullable
    public static KeyedStateHandle chooseTheBestStateHandleForInitial(@Nonnull Collection<KeyedStateHandle> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange) {
        KeyedStateHandle bestStateHandle = null;
        Score bestScore = Score.MIN;
        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
            final Score handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle, targetKeyGroupRange);
            if (handleScore.compareTo(bestScore) > 0) {
                bestStateHandle = rawStateHandle;
                bestScore = handleScore;
            }
        }
        return bestStateHandle;
    }

    /**
     * Score of a state handle, ordered first by the number of intersecting key-groups and then
     * by the overlap fraction.
     */
    private static class Score
    implements Comparable<Score> {
        /** Lowest possible score; used for handles that do not qualify. */
        public static final Score MIN = new Score(Integer.MIN_VALUE, -1.0);

        /** Shared ordering so that {@link #compareTo(Score)} does not rebuild it on every call. */
        private static final Comparator<Score> ORDERING = Comparator.comparingInt(Score::getIntersectGroupRange).thenComparingDouble(Score::getOverlapFraction);

        private final int intersectGroupRange;
        private final double overlapFraction;

        public Score(int intersectGroupRange, double overlapFraction) {
            this.intersectGroupRange = intersectGroupRange;
            this.overlapFraction = overlapFraction;
        }

        public int getIntersectGroupRange() {
            return this.intersectGroupRange;
        }

        public double getOverlapFraction() {
            return this.overlapFraction;
        }

        @Override
        public int compareTo(@Nonnull Score other) {
            return ORDERING.compare(this, other);
        }
    }
}

