/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.upsert.BasePartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;

/**
 * Partition-level upsert metadata manager backed by a {@link ConcurrentHashMap}.
 *
 * <p>The map is keyed by the hashed primary key (the result of {@code HashUtils.hashPrimaryKey}) and stores the
 * {@link RecordLocation} (segment, doc id, comparison value) of the record that currently wins for that key. All
 * per-key read-modify-write steps are done inside {@code compute}/{@code computeIfPresent} so that the map entry and
 * the valid/queryable doc id bitmaps are updated together for a given key.
 */
// Comparison values are handled as raw Comparable throughout this class (stored, returned and compared), which
// unavoidably produces rawtypes/unchecked warnings.
@SuppressWarnings({"rawtypes", "unchecked"})
@ThreadSafe
public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
    /** Hashed primary key -> location of the record currently kept for that key. */
    @VisibleForTesting
    final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();

    /**
     * Creates the manager; every argument is forwarded unchanged to the base class constructor.
     */
    public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
            List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
            HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
            double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
        super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
                partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
    }

    /**
     * Returns the number of primary keys currently tracked, i.e. the size of the key-to-location map.
     */
    @Override
    protected long getNumPrimaryKeys() {
        return _primaryKeyToRecordLocationMap.size();
    }

    /**
     * Enables upsert on {@code segment} and registers every record supplied by {@code recordInfoIterator}.
     *
     * <p>For each record the map entry of its primary key is resolved as follows:
     * <ul>
     *   <li>no entry yet: the new doc is marked valid and becomes the entry;</li>
     *   <li>entry points into {@code segment} itself: the doc id is replaced when the new comparison value is
     *       greater than or equal to the current one;</li>
     *   <li>entry points into {@code oldSegment}: when the new comparison value is greater than or equal to the
     *       current one, the new doc is marked valid and the current doc id is removed from
     *       {@code validDocIdsForOldSegment} (if provided);</li>
     *   <li>entry points into a different segment object with the same name: the key is counted as being in the
     *       wrong segment, and the new doc wins when its comparison value is greater than or equal to the current
     *       one;</li>
     *   <li>entry points into any other segment: the new doc wins when its comparison value is strictly greater,
     *       or on a tie when both segment names are LLC names and the new segment has the larger sequence number;
     *       the current doc is then invalidated in its own segment.</li>
     * </ul>
     * In all other cases the existing entry is kept. If any key was found in the wrong segment, a warning is logged
     * and the {@code UPSERT_KEYS_IN_WRONG_SEGMENT} meter is incremented by that count.
     *
     * @param segment segment being added (or the replacement segment)
     * @param validDocIds valid doc ids bitmap of {@code segment}
     * @param queryableDocIds queryable doc ids bitmap of {@code segment}, may be {@code null}
     * @param recordInfoIterator records of {@code segment} to register
     * @param oldSegment segment being replaced, or {@code null}
     * @param validDocIdsForOldSegment bitmap from which superseded docs of {@code oldSegment} are removed, or
     *     {@code null}
     */
    @Override
    protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
            @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
            @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
        String segmentName = segment.getSegmentName();
        segment.enableUpsert(this, validDocIds, queryableDocIds);
        // A mutable holder is needed because the counter is incremented from inside the lambda below.
        AtomicInteger numKeysInWrongSegment = new AtomicInteger();
        while (recordInfoIterator.hasNext()) {
            RecordInfo recordInfo = recordInfoIterator.next();
            int newDocId = recordInfo.getDocId();
            Comparable newComparisonValue = recordInfo.getComparisonValue();
            _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (primaryKey, currentRecordLocation) -> {
                // First time this primary key is seen.
                if (currentRecordLocation == null) {
                    addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                    return new RecordLocation(segment, newDocId, newComparisonValue);
                }

                IndexSegment currentSegment = currentRecordLocation.getSegment();
                int currentDocId = currentRecordLocation.getDocId();
                int comparisonResult = newComparisonValue.compareTo(currentRecordLocation.getComparisonValue());

                // The current record lives in the segment being added: swap doc ids within the same bitmaps.
                if (currentSegment == segment) {
                    if (comparisonResult >= 0) {
                        replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
                        return new RecordLocation(segment, newDocId, newComparisonValue);
                    }
                    return currentRecordLocation;
                }

                // The current record lives in the segment being replaced.
                if (currentSegment == oldSegment) {
                    if (comparisonResult >= 0) {
                        addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                        if (validDocIdsForOldSegment != null) {
                            validDocIdsForOldSegment.remove(currentDocId);
                        }
                        return new RecordLocation(segment, newDocId, newComparisonValue);
                    }
                    return currentRecordLocation;
                }

                // The current record lives in a different segment object that carries the same name. This is
                // tracked as a key in the wrong segment; the other segment's bitmaps are not touched here.
                String currentSegmentName = currentSegment.getSegmentName();
                if (currentSegmentName.equals(segmentName)) {
                    numKeysInWrongSegment.getAndIncrement();
                    if (comparisonResult >= 0) {
                        addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                        return new RecordLocation(segment, newDocId, newComparisonValue);
                    }
                    return currentRecordLocation;
                }

                // The current record lives in an unrelated segment. The new record wins on a strictly larger
                // comparison value; a tie is broken by LLC sequence number when both names are LLC names.
                boolean newRecordWins = comparisonResult > 0
                        || (comparisonResult == 0
                                && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
                                && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
                                && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(currentSegmentName));
                if (newRecordWins) {
                    removeDocId(currentSegment, currentDocId);
                    addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                    return new RecordLocation(segment, newDocId, newComparisonValue);
                }
                return currentRecordLocation;
            });
        }
        int numKeys = numKeysInWrongSegment.get();
        if (numKeys > 0) {
            _logger.warn("Found {} primary keys in the wrong segment when adding segment: {}", numKeys, segmentName);
            _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
        }
    }

    /**
     * Enables upsert on {@code segment} and registers every supplied record unconditionally: each doc is marked
     * valid (and queryable unless it is a delete record) and its location overwrites any existing map entry for
     * the same primary key without comparing comparison values.
     */
    @Override
    protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
            @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        segment.enableUpsert(this, validDocIds, queryableDocIds);
        while (recordInfoIterator.hasNext()) {
            RecordInfo recordInfo = recordInfoIterator.next();
            int newDocId = recordInfo.getDocId();
            Comparable newComparisonValue = recordInfo.getComparisonValue();
            addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
            _primaryKeyToRecordLocationMap.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
                    new RecordLocation(segment, newDocId, newComparisonValue));
        }
    }

    /**
     * Replaces {@code oldDocId} with {@code newDocId} in {@code validDocIds}. When {@code queryableDocIds} is
     * present, a delete record only removes the old doc id from it, otherwise the old doc id is replaced by the
     * new one there as well.
     */
    private static void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
            @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId,
            RecordInfo recordInfo) {
        validDocIds.replace(oldDocId, newDocId);
        if (queryableDocIds == null) {
            return;
        }
        if (recordInfo.isDeleteRecord()) {
            // A delete record stays valid but must not become queryable.
            queryableDocIds.remove(oldDocId);
        } else {
            queryableDocIds.replace(oldDocId, newDocId);
        }
    }

    /**
     * Adds {@code docId} to {@code validDocIds}, and to {@code queryableDocIds} when that bitmap is present and the
     * record is not a delete record.
     */
    private static void addDocId(ThreadSafeMutableRoaringBitmap validDocIds,
            @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
        validDocIds.add(docId);
        if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
            queryableDocIds.add(docId);
        }
    }

    /**
     * Removes {@code docId} from the valid doc ids of {@code segment}, and from its queryable doc ids when present.
     *
     * @throws NullPointerException if {@code segment} has no valid doc ids bitmap
     */
    private static void removeDocId(IndexSegment segment, int docId) {
        Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
        ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds();
        if (currentQueryableDocIds != null) {
            currentQueryableDocIds.remove(docId);
        }
    }

    /**
     * Removes the map entries that still point into {@code segment}. The primary key of every doc id in
     * {@code validDocIds} is read from the segment, and its entry is dropped only when it references this exact
     * segment object; entries pointing elsewhere are left alone.
     *
     * @throws RuntimeException wrapping any exception raised while reading primary keys or closing the reader
     */
    @Override
    protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
        assert !validDocIds.isEmpty();
        // Single PrimaryKey instance that the reader fills in for every doc id.
        PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
        PeekableIntIterator iterator = validDocIds.getIntIterator();
        try (UpsertUtils.PrimaryKeyReader primaryKeyReader = new UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
            while (iterator.hasNext()) {
                primaryKeyReader.getPrimaryKey(iterator.next(), primaryKey);
                // Returning null from the remapping function removes the entry.
                _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
                        (pk, recordLocation) -> recordLocation.getSegment() == segment ? null : recordLocation);
            }
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(), _tableNameWithType), e);
        }
    }

    /**
     * Drops every entry whose comparison value (read as a {@code double}) is below
     * {@code _largestSeenComparisonValue - _metadataTTL}, then persists {@code _largestSeenComparisonValue} via
     * {@code persistWatermark}.
     *
     * <p>Comparison values are cast to {@link Number}; a non-numeric comparison value fails with a
     * {@link ClassCastException}.
     */
    @Override
    public void doRemoveExpiredPrimaryKeys() {
        double threshold = _largestSeenComparisonValue - _metadataTTL;
        _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
            if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
                // Two-argument remove: only removes if the entry still maps to this exact location.
                _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
            }
        });
        persistWatermark(_largestSeenComparisonValue);
    }

    /**
     * Registers a record ingested into the mutable {@code segment}.
     *
     * <p>When {@code _metadataTTL} is positive, the largest comparison value seen so far is updated first. The map
     * entry for the primary key is then resolved: a new key is simply added; for an existing key the new record
     * wins when its comparison value is greater than or equal to the current one (the previous doc is replaced or
     * invalidated), otherwise the event is reported through {@code handleOutOfOrderEvent} and the entry is kept.
     * Finally the {@code UPSERT_PRIMARY_KEYS_COUNT} partition gauge is set to the current map size.
     *
     * @throws NullPointerException if {@code segment} has no valid doc ids bitmap
     */
    @Override
    protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
        ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
        ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
        int newDocId = recordInfo.getDocId();
        Comparable newComparisonValue = recordInfo.getComparisonValue();
        if (_metadataTTL > 0.0) {
            double comparisonValue = ((Number) newComparisonValue).doubleValue();
            _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
        }
        _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (primaryKey, currentRecordLocation) -> {
            // First time this primary key is seen.
            if (currentRecordLocation == null) {
                addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                return new RecordLocation(segment, newDocId, newComparisonValue);
            }
            // The incoming record is older than the one already tracked: keep the current location.
            if (newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) < 0) {
                handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
                return currentRecordLocation;
            }
            IndexSegment currentSegment = currentRecordLocation.getSegment();
            int currentDocId = currentRecordLocation.getDocId();
            if (segment == currentSegment) {
                replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
            } else {
                removeDocId(currentSegment, currentDocId);
                addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
            }
            return new RecordLocation(segment, newDocId, newComparisonValue);
        });
        _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, _primaryKeyToRecordLocationMap.size());
    }

    /**
     * Applies the partial upsert handler to {@code record} using the record currently tracked for the same primary
     * key, if any.
     *
     * <p>{@code _partialUpsertHandler.merge} is invoked only when an entry exists for the key, the incoming record
     * is not a delete record, its comparison value is greater than or equal to the tracked one, and the tracked doc
     * is queryable (the current segment has no queryable doc ids bitmap, or the bitmap contains the doc id). The
     * map entry itself is never modified here.
     *
     * @return the same {@code record} instance that was passed in (presumably updated in place by the handler;
     *     verify against {@code PartialUpsertHandler.merge})
     */
    @Override
    protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
        assert _partialUpsertHandler != null;
        _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
            if (!recordInfo.isDeleteRecord() && recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
                IndexSegment currentSegment = recordLocation.getSegment();
                ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
                int currentDocId = recordLocation.getDocId();
                if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
                    _partialUpsertHandler.merge(currentSegment, currentDocId, record);
                }
            }
            // Always keep the existing mapping; this call only reads it.
            return recordLocation;
        });
        return record;
    }

    /**
     * Immutable holder for where a record lives: its segment, its doc id within that segment, and the comparison
     * value it was registered with.
     */
    @VisibleForTesting
    static class RecordLocation {
        private final IndexSegment _segment;
        private final int _docId;
        private final Comparable _comparisonValue;

        public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) {
            _segment = indexSegment;
            _docId = docId;
            _comparisonValue = comparisonValue;
        }

        public IndexSegment getSegment() {
            return _segment;
        }

        public int getDocId() {
            return _docId;
        }

        public Comparable getComparisonValue() {
            return _comparisonValue;
        }
    }
}

