/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.local.upsert.UpsertViewManager;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public abstract class BasePartitionUpsertMetadataManager
implements PartitionUpsertMetadataManager {
    protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1L);
    protected static final double TTL_WATERMARK_NOT_SET = Double.NEGATIVE_INFINITY;
    protected final String _tableNameWithType;
    protected final int _partitionId;
    protected final UpsertContext _context;
    protected final List<String> _primaryKeyColumns;
    protected final List<String> _comparisonColumns;
    protected final String _deleteRecordColumn;
    protected final HashFunction _hashFunction;
    protected final PartialUpsertHandler _partialUpsertHandler;
    protected final boolean _enableSnapshot;
    protected final double _metadataTTL;
    protected final double _deletedKeysTTL;
    protected final File _tableIndexDir;
    protected final ServerMetrics _serverMetrics;
    protected final Logger _logger;
    protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
    protected final Set<IndexSegment> _updatedSegmentsSinceLastSnapshot = ConcurrentHashMap.newKeySet();
    protected volatile boolean _gotFirstConsumingSegment = false;
    protected final ReadWriteLock _snapshotLock;
    protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
    protected int _numOutOfOrderEvents = 0;
    protected final AtomicDouble _largestSeenComparisonValue;
    private boolean _stopped;
    private int _numPendingOperations = 1;
    private boolean _closed;
    private final Lock _preloadLock = new ReentrantLock();
    private volatile boolean _isPreloading;
    private final UpsertViewManager _upsertViewManager;
    private final Map<String, Long> _newlyAddedSegments = new ConcurrentHashMap<String, Long>();
    private final long _newSegmentTrackingTimeMs;

    protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
        this._tableNameWithType = tableNameWithType;
        this._partitionId = partitionId;
        this._context = context;
        this._primaryKeyColumns = context.getPrimaryKeyColumns();
        this._comparisonColumns = context.getComparisonColumns();
        this._deleteRecordColumn = context.getDeleteRecordColumn();
        this._hashFunction = context.getHashFunction();
        this._partialUpsertHandler = context.getPartialUpsertHandler();
        this._enableSnapshot = context.isSnapshotEnabled();
        this._snapshotLock = this._enableSnapshot ? new ReentrantReadWriteLock() : null;
        this._isPreloading = context.isPreloadEnabled();
        this._metadataTTL = context.getMetadataTTL();
        this._deletedKeysTTL = context.getDeletedKeysTTL();
        this._tableIndexDir = context.getTableIndexDir();
        long trackingTimeMs = context.getNewSegmentTrackingTimeMs();
        UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
        if (cmode == UpsertConfig.ConsistencyMode.SYNC || cmode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
            this._upsertViewManager = new UpsertViewManager(cmode, context);
            this._newSegmentTrackingTimeMs = trackingTimeMs > 0L ? trackingTimeMs : 10000L;
        } else {
            this._upsertViewManager = null;
            this._newSegmentTrackingTimeMs = trackingTimeMs;
        }
        this._serverMetrics = ServerMetrics.get();
        this._logger = LoggerFactory.getLogger((String)(tableNameWithType + "-" + partitionId + "-" + this.getClass().getSimpleName()));
        if (this.isTTLEnabled()) {
            Preconditions.checkState((this._comparisonColumns.size() == 1 ? 1 : 0) != 0, (Object)"Upsert TTL does not work with multiple comparison columns");
            Preconditions.checkState((this._metadataTTL <= 0.0 || this._enableSnapshot ? 1 : 0) != 0, (Object)"Upsert metadata TTL must have snapshot enabled");
            this._largestSeenComparisonValue = new AtomicDouble(WatermarkUtils.loadWatermark(this.getWatermarkFile(), Double.NEGATIVE_INFINITY));
        } else {
            this._largestSeenComparisonValue = new AtomicDouble(Double.NEGATIVE_INFINITY);
            WatermarkUtils.deleteWatermark(this.getWatermarkFile());
        }
    }

    @Override
    public List<String> getPrimaryKeyColumns() {
        return this._primaryKeyColumns;
    }

    @Nullable
    protected MutableRoaringBitmap getQueryableDocIds(IndexSegment segment, MutableRoaringBitmap validDocIds) {
        if (this._deleteRecordColumn == null) {
            return null;
        }
        MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();
        try (PinotSegmentColumnReader deleteRecordColumnReader = new PinotSegmentColumnReader(segment, this._deleteRecordColumn);){
            PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
            while (docIdIterator.hasNext()) {
                int docId = docIdIterator.next();
                if (BooleanUtils.toBoolean((Object)deleteRecordColumnReader.getValue(docId))) continue;
                queryableDocIds.add(docId);
            }
        }
        catch (IOException e) {
            this._logger.error("Failed to close column reader for delete record column: {} for segment: {} ", new Object[]{this._deleteRecordColumn, segment.getSegmentName(), e});
        }
        return queryableDocIds;
    }

    @Override
    public boolean isPreloading() {
        return this._isPreloading;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
        if (!this._isPreloading) {
            return;
        }
        TableDataManager tableDataManager = this._context.getTableDataManager();
        Preconditions.checkNotNull((Object)tableDataManager, (Object)"Preloading segments requires tableDataManager");
        HelixManager helixManager = tableDataManager.getHelixManager();
        ExecutorService segmentPreloadExecutor = tableDataManager.getSegmentPreloadExecutor();
        this._preloadLock.lock();
        try {
            if (!this._isPreloading) {
                return;
            }
            long startTime = System.currentTimeMillis();
            this.doPreloadSegments(tableDataManager, indexLoadingConfig, helixManager, segmentPreloadExecutor);
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.UPSERT_PRELOAD_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this._logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", new Object[]{this._partitionId, this._tableNameWithType, e});
            this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.UPSERT_PRELOAD_FAILURE, 1L);
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        finally {
            this._isPreloading = false;
            this._preloadLock.unlock();
        }
    }

    protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig, HelixManager helixManager, ExecutorService segmentPreloadExecutor) throws Exception {
        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
        SegmentPreloadUtils.preloadSegments(tableDataManager, this._partitionId, indexLoadingConfig, helixManager, segmentPreloadExecutor, (segmentName, segmentZKMetadata) -> {
            String tier = segmentZKMetadata.getTier();
            if (SegmentPreloadUtils.hasValidDocIdsSnapshot(tableDataManager, tableConfig, segmentName, tier)) {
                return true;
            }
            this._logger.info("Skip segment: {} on tier: {} as it has no validDocIds snapshot", segmentName, (Object)tier);
            return false;
        });
    }

    @Override
    public void addSegment(ImmutableSegment segment) {
        String segmentName = segment.getSegmentName();
        if (segment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", (Object)segmentName);
            return;
        }
        Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
        if (!this.startOperation()) {
            this._logger.info("Skip adding segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            this.doAddSegment((ImmutableSegmentImpl)segment);
            this._trackedSegments.add((IndexSegment)segment);
            if (this._enableSnapshot) {
                this._updatedSegmentsSinceLastSnapshot.add((IndexSegment)segment);
            }
        }
        finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            this.finishOperation();
        }
    }

    protected boolean isTTLEnabled() {
        return this._metadataTTL > 0.0 || this._deletedKeysTTL > 0.0;
    }

    protected double getMaxComparisonValue(IndexSegment segment) {
        return ((Number)((Object)((ColumnMetadata)segment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0))).getMaxValue())).doubleValue();
    }

    protected boolean isOutOfMetadataTTL(double maxComparisonValue) {
        return this._metadataTTL > 0.0 && this._largestSeenComparisonValue.get() != Double.NEGATIVE_INFINITY && maxComparisonValue < this._largestSeenComparisonValue.get() - this._metadataTTL;
    }

    protected boolean isOutOfMetadataTTL(IndexSegment segment) {
        if (this._metadataTTL > 0.0) {
            double maxComparisonValue = this.getMaxComparisonValue(segment);
            return this.isOutOfMetadataTTL(maxComparisonValue);
        }
        return false;
    }

    protected boolean skipAddSegmentOutOfTTL(ImmutableSegmentImpl segment) {
        String segmentName = segment.getSegmentName();
        this._logger.info("Skip adding segment: {} because it's out of TTL", (Object)segmentName);
        MutableRoaringBitmap validDocIdsSnapshot = segment.loadValidDocIdsFromSnapshot();
        if (validDocIdsSnapshot != null) {
            MutableRoaringBitmap queryableDocIds = this.getQueryableDocIds((IndexSegment)segment, validDocIdsSnapshot);
            segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
        } else {
            this._logger.warn("Failed to find validDocIds snapshot to add segment: {} out of TTL, treating all docs as valid", (Object)segmentName);
        }
        return true;
    }

    protected boolean skipPreloadSegmentOutOfTTL(ImmutableSegmentImpl segment, MutableRoaringBitmap validDocIdsSnapshot) {
        String segmentName = segment.getSegmentName();
        this._logger.info("Skip preloading segment: {} because it's out of TTL", (Object)segmentName);
        MutableRoaringBitmap queryableDocIds = this.getQueryableDocIds((IndexSegment)segment, validDocIdsSnapshot);
        segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
        return true;
    }

    protected void doAddSegment(ImmutableSegmentImpl segment) {
        String segmentName = segment.getSegmentName();
        this._logger.info("Adding segment: {}, current primary key count: {}", (Object)segmentName, (Object)this.getNumPrimaryKeys());
        if (this.isTTLEnabled()) {
            double maxComparisonValue = this.getMaxComparisonValue((IndexSegment)segment);
            this._largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
            if (this.isOutOfMetadataTTL(maxComparisonValue) && this.skipAddSegmentOutOfTTL(segment)) {
                return;
            }
        }
        long startTimeMs = System.currentTimeMillis();
        if (!this._enableSnapshot) {
            segment.deleteValidDocIdsSnapshot();
        }
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn);){
            Iterator<RecordInfo> recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
            this.addSegment(segment, null, null, recordInfoIterator);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while adding segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    protected abstract long getNumPrimaryKeys();

    protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, numPrimaryKeys);
    }

    protected void updatePrimaryKeyGauge() {
        this.updatePrimaryKeyGauge(this.getNumPrimaryKeys());
    }

    @Override
    public void preloadSegment(ImmutableSegment segment) {
        String segmentName = segment.getSegmentName();
        Preconditions.checkArgument((boolean)this._enableSnapshot, (String)"Snapshot must be enabled to preload segment: %s, table: %s", (Object)segmentName, (Object)this._tableNameWithType);
        Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
        if (!this.startOperation()) {
            this._logger.info("Skip preloading segment: {} because metadata manager is already stopped", (Object)segmentName);
            return;
        }
        this._snapshotLock.readLock().lock();
        try {
            this.doPreloadSegment((ImmutableSegmentImpl)segment);
            this._trackedSegments.add((IndexSegment)segment);
            this._updatedSegmentsSinceLastSnapshot.add((IndexSegment)segment);
        }
        finally {
            this._snapshotLock.readLock().unlock();
            this.finishOperation();
        }
    }

    protected void doPreloadSegment(ImmutableSegmentImpl segment) {
        String segmentName = segment.getSegmentName();
        this._logger.info("Preloading segment: {}, current primary key count: {}", (Object)segmentName, (Object)this.getNumPrimaryKeys());
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap validDocIds = segment.loadValidDocIdsFromSnapshot();
        Preconditions.checkState((validDocIds != null ? 1 : 0) != 0, (String)"Snapshot of validDocIds is required to preload segment: %s, table: %s", (Object)segmentName, (Object)this._tableNameWithType);
        if (validDocIds.isEmpty()) {
            this._logger.info("Skip preloading segment: {} without valid doc, current primary key count: {}", (Object)segment.getSegmentName(), (Object)this.getNumPrimaryKeys());
            segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
            return;
        }
        if (this.isTTLEnabled()) {
            double maxComparisonValue = this.getMaxComparisonValue((IndexSegment)segment);
            this._largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
            if (this.isOutOfMetadataTTL(maxComparisonValue) && this.skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
                return;
            }
        }
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn);){
            this.doPreloadSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while preloading segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    @VisibleForTesting
    void doPreloadSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        if (validDocIds == null) {
            validDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        if (queryableDocIds == null && this._deleteRecordColumn != null) {
            queryableDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        this.addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
    }

    @VisibleForTesting
    public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        if (validDocIds == null) {
            validDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        if (queryableDocIds == null && this._deleteRecordColumn != null) {
            queryableDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        this.addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
    }

    protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
        if (this._partialUpsertHandler != null) {
            recordInfoIterator = BasePartitionUpsertMetadataManager.resolveComparisonTies(recordInfoIterator, this._hashFunction);
        }
        this.doAddOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, oldSegment, validDocIdsForOldSegment);
    }

    protected abstract void doAddOrReplaceSegment(ImmutableSegmentImpl var1, ThreadSafeMutableRoaringBitmap var2, @Nullable ThreadSafeMutableRoaringBitmap var3, Iterator<RecordInfo> var4, @Nullable IndexSegment var5, @Nullable MutableRoaringBitmap var6);

    protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        this.addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
    }

    protected boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName, long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
        LLCSegmentName llcSegmentName = LLCSegmentName.of((String)segmentName);
        LLCSegmentName currentLLCSegmentName = LLCSegmentName.of((String)currentSegmentName);
        if (llcSegmentName != null && currentLLCSegmentName != null) {
            return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
        }
        int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
        if (creationTimeComparisonRes != 0) {
            return creationTimeComparisonRes > 0;
        }
        if (UploadedRealtimeSegmentName.of((String)currentSegmentName) != null) {
            return false;
        }
        return UploadedRealtimeSegmentName.of((String)segmentName) != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) {
        this._gotFirstConsumingSegment = true;
        if (!this.startOperation()) {
            this._logger.debug("Skip adding record to segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return false;
        }
        try {
            boolean addRecord = this.doAddRecord(segment, recordInfo);
            this._trackedSegments.add((IndexSegment)segment);
            boolean bl = addRecord;
            return bl;
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract boolean doAddRecord(MutableSegment var1, RecordInfo var2);

    @Override
    public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
        if (!this.startOperation()) {
            this._logger.info("Skip replacing segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            this.doReplaceSegment(segment, oldSegment);
            if (!(segment instanceof EmptyIndexSegment)) {
                this._trackedSegments.add((IndexSegment)segment);
                if (this._enableSnapshot) {
                    this._updatedSegmentsSinceLastSnapshot.add((IndexSegment)segment);
                }
            }
            this._trackedSegments.remove(oldSegment);
        }
        finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            this.finishOperation();
        }
    }

    protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
        String segmentName = segment.getSegmentName();
        Preconditions.checkArgument((boolean)segmentName.equals(oldSegment.getSegmentName()), (String)"Cannot replace segment with different name for table: %s, old segment: %s, new segment: %s", (Object)this._tableNameWithType, (Object)oldSegment.getSegmentName(), (Object)segmentName);
        this._logger.info("Replacing {} segment: {}, current primary key count: {}", new Object[]{oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, this.getNumPrimaryKeys()});
        long startTimeMs = System.currentTimeMillis();
        if (segment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", (Object)segmentName);
            this.replaceSegment(segment, null, null, null, oldSegment);
            return;
        }
        if (this.isTTLEnabled()) {
            double maxComparisonValue = this.getMaxComparisonValue((IndexSegment)segment);
            this._largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
        }
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn);){
            Iterator<RecordInfo> recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
            this.replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while replacing segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    @VisibleForTesting
    public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
        String segmentName = segment.getSegmentName();
        MutableRoaringBitmap validDocIdsForOldSegment = null;
        if (this._upsertViewManager == null) {
            validDocIdsForOldSegment = this.getValidDocIdsForOldSegment(oldSegment);
        }
        if (recordInfoIterator != null) {
            Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
            if (validDocIds == null) {
                validDocIds = new ThreadSafeMutableRoaringBitmap();
            }
            if (queryableDocIds == null && this._deleteRecordColumn != null) {
                queryableDocIds = new ThreadSafeMutableRoaringBitmap();
            }
            this.addOrReplaceSegment((ImmutableSegmentImpl)segment, validDocIds, queryableDocIds, recordInfoIterator, oldSegment, validDocIdsForOldSegment);
        }
        if (this._upsertViewManager != null) {
            validDocIdsForOldSegment = this.getValidDocIdsForOldSegment(oldSegment);
        }
        if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
            int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
            if (this._partialUpsertHandler != null) {
                this._logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This can potentially cause inconsistency between replicas", (Object)numKeysNotReplaced, (Object)segmentName);
                this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED, (long)numKeysNotReplaced);
            } else {
                this._logger.info("Found {} primary keys not replaced when replacing segment: {}", (Object)numKeysNotReplaced, (Object)segmentName);
            }
            this.removeSegment(oldSegment, validDocIdsForOldSegment);
        }
    }

    private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment oldSegment) {
        return oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
    }

    protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
        try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, this._primaryKeyColumns);){
            this.removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(), this._tableNameWithType), e);
        }
    }

    protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
        throw new UnsupportedOperationException("Both removeSegment(segment, validDocID) and removeSegment(segment, pkIterator) are not implemented. Implement one of them to support removal.");
    }

    @Override
    public void removeSegment(IndexSegment segment) {
        String segmentName = segment.getSegmentName();
        if (!this._trackedSegments.contains(segment)) {
            this._logger.info("Skip removing untracked (replaced or empty) segment: {}", (Object)segmentName);
            return;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip removing segment: {} because metadata manager is already stopped", (Object)segmentName);
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            if (this.isOutOfMetadataTTL(segment)) {
                this._logger.info("Skip removing segment: {} because it's out of TTL", (Object)segmentName);
            } else {
                this.doRemoveSegment(segment);
            }
            this._trackedSegments.remove(segment);
        }
        finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            this.finishOperation();
        }
    }

    protected void doRemoveSegment(IndexSegment segment) {
        MutableRoaringBitmap validDocIds;
        String segmentName = segment.getSegmentName();
        this._logger.info("Removing {} segment: {}, current primary key count: {}", new Object[]{segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, this.getNumPrimaryKeys()});
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap mutableRoaringBitmap = validDocIds = segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
        if (validDocIds == null || validDocIds.isEmpty()) {
            this._logger.info("Skip removing segment without valid docs: {}", (Object)segmentName);
            return;
        }
        this._logger.info("Removing {} primary keys for segment: {}", (Object)validDocIds.getCardinality(), (Object)segmentName);
        this.removeSegment(segment, validDocIds);
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
        if (this._partialUpsertHandler == null) {
            return record;
        }
        if (!this.startOperation()) {
            this._logger.debug("Skip updating record because metadata manager is already stopped");
            return record;
        }
        try {
            GenericRow genericRow = this.doUpdateRecord(record, recordInfo);
            return genericRow;
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract GenericRow doUpdateRecord(GenericRow var1, RecordInfo var2);

    protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) {
        boolean isPartialUpsertTable = this._partialUpsertHandler != null;
        this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)(isPartialUpsertTable ? ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER : ServerMeter.UPSERT_OUT_OF_ORDER), 1L);
        ++this._numOutOfOrderEvents;
        long currentTimeNs = System.nanoTime();
        if (currentTimeNs - this._lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
            this._logger.warn("Skipped {} out-of-order events for {} upsert table {} (the last event has current comparison value: {}, record comparison value: {})", new Object[]{this._numOutOfOrderEvents, isPartialUpsertTable ? "partial" : "full", this._tableNameWithType, currentComparisonValue, recordComparisonValue});
            this._lastOutOfOrderEventReportTimeNs = currentTimeNs;
            this._numOutOfOrderEvents = 0;
        }
    }

    protected static Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> recordInfoIterator, HashFunction hashFunction) {
        HashMap<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<Object, RecordInfo>();
        while (recordInfoIterator.hasNext()) {
            RecordInfo recordInfo = recordInfoIterator.next();
            Comparable newComparisonValue = recordInfo.getComparisonValue();
            deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), hashFunction), (key, maxComparisonValueRecordInfo) -> {
                if (maxComparisonValueRecordInfo == null) {
                    return recordInfo;
                }
                int comparisonResult = newComparisonValue.compareTo(maxComparisonValueRecordInfo.getComparisonValue());
                if (comparisonResult >= 0) {
                    return recordInfo;
                }
                return maxComparisonValueRecordInfo;
            });
        }
        return deDuplicatedRecordInfo.values().iterator();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void takeSnapshot() {
        if (!this._enableSnapshot) {
            return;
        }
        if (this._partialUpsertHandler == null && !this._gotFirstConsumingSegment) {
            this._logger.info("Skip taking snapshot before getting the first consuming segment for full-upsert table");
            return;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip taking snapshot because metadata manager is already stopped");
            return;
        }
        this._snapshotLock.writeLock().lock();
        try {
            long startTime = System.currentTimeMillis();
            this.doTakeSnapshot();
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.UPSERT_SNAPSHOT_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this._logger.warn("Caught exception while taking snapshot", (Throwable)e);
        }
        finally {
            this._snapshotLock.writeLock().unlock();
            this.finishOperation();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doTakeSnapshot() {
        boolean locked;
        Lock segmentLock;
        String segmentName;
        int numTrackedSegments = this._trackedSegments.size();
        long numPrimaryKeysInSnapshot = 0L;
        this._logger.info("Taking snapshot for {} segments", (Object)numTrackedSegments);
        long startTimeMs = System.currentTimeMillis();
        int numImmutableSegments = 0;
        int numConsumingSegments = 0;
        int numUnchangedSegments = 0;
        HashSet<ImmutableSegmentImpl> segmentsWithoutSnapshot = new HashSet<ImmutableSegmentImpl>();
        TableDataManager tableDataManager = this._context.getTableDataManager();
        boolean isSegmentSkipped = false;
        for (IndexSegment indexSegment : this._trackedSegments) {
            if (!(indexSegment instanceof ImmutableSegmentImpl)) {
                ++numConsumingSegments;
                continue;
            }
            if (!this._updatedSegmentsSinceLastSnapshot.contains(indexSegment)) {
                ++numUnchangedSegments;
                continue;
            }
            segmentName = indexSegment.getSegmentName();
            segmentLock = tableDataManager.getSegmentLock(segmentName);
            locked = segmentLock.tryLock();
            if (!locked) {
                this._logger.warn("Could not get segmentLock to take snapshot for segment: {}, skipping", (Object)segmentName);
                isSegmentSkipped = true;
                continue;
            }
            try {
                ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl)indexSegment;
                if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
                    segmentsWithoutSnapshot.add(immutableSegment);
                    continue;
                }
                immutableSegment.persistValidDocIdsSnapshot();
                this._updatedSegmentsSinceLastSnapshot.remove(indexSegment);
                ++numImmutableSegments;
                numPrimaryKeysInSnapshot += (long)immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
            }
            catch (Exception e) {
                this._logger.warn("Caught exception while taking snapshot for segment: {}, skipping", (Object)segmentName, (Object)e);
                isSegmentSkipped = true;
            }
            finally {
                segmentLock.unlock();
            }
        }
        if (!isSegmentSkipped) {
            for (ImmutableSegmentImpl immutableSegmentImpl : segmentsWithoutSnapshot) {
                segmentName = immutableSegmentImpl.getSegmentName();
                segmentLock = tableDataManager.getSegmentLock(segmentName);
                locked = segmentLock.tryLock();
                if (!locked) {
                    this._logger.warn("Could not get segmentLock to take snapshot for segment: {} w/o snapshot, skipping", (Object)segmentName);
                    continue;
                }
                try {
                    immutableSegmentImpl.persistValidDocIdsSnapshot();
                    this._updatedSegmentsSinceLastSnapshot.remove(immutableSegmentImpl);
                    ++numImmutableSegments;
                    numPrimaryKeysInSnapshot += (long)immutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap().getCardinality();
                }
                catch (Exception e) {
                    this._logger.warn("Caught exception while taking snapshot for segment: {} w/o snapshot, skipping", (Object)segmentName, (Object)e);
                }
                finally {
                    segmentLock.unlock();
                }
            }
        }
        this._updatedSegmentsSinceLastSnapshot.retainAll(this._trackedSegments);
        if (this.isTTLEnabled()) {
            WatermarkUtils.persistWatermark(this._largestSeenComparisonValue.get(), this.getWatermarkFile());
        }
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, (long)numImmutableSegments);
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, numPrimaryKeysInSnapshot);
        int numMissedSegments = numTrackedSegments - numImmutableSegments - numConsumingSegments - numUnchangedSegments;
        if (numMissedSegments > 0) {
            this._serverMetrics.addMeteredTableValue(this._tableNameWithType, String.valueOf(this._partitionId), (AbstractMetrics.Meter)ServerMeter.UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT, (long)numMissedSegments);
            this._logger.warn("Missed taking snapshot for {} immutable segments", (Object)numMissedSegments);
        }
        this._logger.info("Finished taking snapshot for {} immutable segments with {} primary keys (out of {} total segments, {} are consuming segments) in {} ms", new Object[]{numImmutableSegments, numPrimaryKeysInSnapshot, numTrackedSegments, numConsumingSegments, System.currentTimeMillis() - startTimeMs});
    }

    protected File getWatermarkFile() {
        return new File(this._tableIndexDir, "ttl.watermark.partition." + this._partitionId);
    }

    @VisibleForTesting
    double getWatermark() {
        return this._largestSeenComparisonValue.get();
    }

    @VisibleForTesting
    void setWatermark(double watermark) {
        this._largestSeenComparisonValue.set(watermark);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeExpiredPrimaryKeys() {
        if (!this.isTTLEnabled()) {
            return;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip removing expired primary keys because metadata manager is already stopped");
            return;
        }
        try {
            long startTime = System.currentTimeMillis();
            this.doRemoveExpiredPrimaryKeys();
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract void doRemoveExpiredPrimaryKeys();

    protected synchronized boolean startOperation() {
        if (this._stopped || this._numPendingOperations == 0) {
            return false;
        }
        ++this._numPendingOperations;
        return true;
    }

    protected synchronized void finishOperation() {
        --this._numPendingOperations;
        if (this._numPendingOperations == 0) {
            this.notifyAll();
        }
    }

    @Override
    public synchronized void stop() {
        if (this._stopped) {
            this._logger.warn("Metadata manager is already stopped");
            return;
        }
        this._stopped = true;
        --this._numPendingOperations;
        this._logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", (Object)this._numPendingOperations, (Object)this.getNumPrimaryKeys());
    }

    @Override
    public synchronized void close() throws IOException {
        Preconditions.checkState((boolean)this._stopped, (Object)"Must stop the metadata manager before closing it");
        if (this._closed) {
            this._logger.warn("Metadata manager is already closed");
            return;
        }
        this._closed = true;
        this._logger.info("Closing the metadata manager");
        while (this._numPendingOperations != 0) {
            this._logger.info("Waiting for {} pending operations to finish", (Object)this._numPendingOperations);
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(String.format("Interrupted while waiting for %d pending operations to finish", this._numPendingOperations), e);
            }
        }
        this.doClose();
        this.updatePrimaryKeyGauge(0L);
        this._logger.info("Closed the metadata manager");
    }

    protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) {
        if (this._upsertViewManager == null) {
            UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
            UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
        } else {
            this._upsertViewManager.replaceDocId(newSegment, validDocIds, queryableDocIds, oldSegment, oldDocId, newDocId, recordInfo);
        }
        this.trackUpdatedSegmentsSinceLastSnapshot(oldSegment);
    }

    protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
        if (this._upsertViewManager == null) {
            UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, recordInfo);
        } else {
            this._upsertViewManager.replaceDocId(segment, validDocIds, queryableDocIds, oldDocId, newDocId, recordInfo);
        }
    }

    protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
        if (this._upsertViewManager == null) {
            UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
        } else {
            this._upsertViewManager.addDocId(segment, validDocIds, queryableDocIds, docId, recordInfo);
        }
    }

    protected void removeDocId(IndexSegment segment, int docId) {
        if (this._upsertViewManager == null) {
            UpsertUtils.doRemoveDocId(segment, docId);
        } else {
            this._upsertViewManager.removeDocId(segment, docId);
        }
        this.trackUpdatedSegmentsSinceLastSnapshot(segment);
    }

    private void trackUpdatedSegmentsSinceLastSnapshot(IndexSegment segment) {
        if (this._enableSnapshot && segment instanceof ImmutableSegment) {
            this._snapshotLock.readLock().lock();
            try {
                this._updatedSegmentsSinceLastSnapshot.add(segment);
            }
            finally {
                this._snapshotLock.readLock().unlock();
            }
        }
    }

    protected void doClose() throws IOException {
    }

    public UpsertViewManager getUpsertViewManager() {
        return this._upsertViewManager;
    }

    @Override
    public void trackSegmentForUpsertView(IndexSegment segment) {
        if (this._upsertViewManager != null) {
            this._upsertViewManager.trackSegment(segment);
        }
    }

    @Override
    public void untrackSegmentForUpsertView(IndexSegment segment) {
        if (this._upsertViewManager != null) {
            this._upsertViewManager.untrackSegment(segment);
        }
    }

    @Override
    public void trackNewlyAddedSegment(String segmentName) {
        if (this._newSegmentTrackingTimeMs > 0L) {
            this._newlyAddedSegments.put(segmentName, System.currentTimeMillis() + this._newSegmentTrackingTimeMs);
            if (this._logger.isDebugEnabled()) {
                this._logger.debug("Tracked newly added segments: {}", this._newlyAddedSegments);
            }
        }
    }

    public Set<String> getNewlyAddedSegments() {
        if (this._newSegmentTrackingTimeMs > 0L) {
            long nowMs = System.currentTimeMillis();
            this._newlyAddedSegments.values().removeIf(v -> v < nowMs);
            return this._newlyAddedSegments.keySet();
        }
        return Collections.emptySet();
    }
}

