/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.upsert.UpsertUtils;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public abstract class BasePartitionUpsertMetadataManager
implements PartitionUpsertMetadataManager {
    protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1L);
    protected final String _tableNameWithType;
    protected final int _partitionId;
    protected final UpsertContext _context;
    protected final List<String> _primaryKeyColumns;
    protected final List<String> _comparisonColumns;
    protected final String _deleteRecordColumn;
    protected final HashFunction _hashFunction;
    protected final PartialUpsertHandler _partialUpsertHandler;
    protected final boolean _enableSnapshot;
    protected final double _metadataTTL;
    protected final double _deletedKeysTTL;
    protected final File _tableIndexDir;
    protected final ServerMetrics _serverMetrics;
    protected final Logger _logger;
    protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
    protected final Set<ImmutableSegment> _updatedSegmentsSinceLastSnapshot = ConcurrentHashMap.newKeySet();
    protected volatile boolean _gotFirstConsumingSegment = false;
    protected final ReadWriteLock _snapshotLock;
    protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
    protected int _numOutOfOrderEvents = 0;
    protected final AtomicDouble _largestSeenComparisonValue;
    private boolean _stopped;
    private int _numPendingOperations = 1;
    private boolean _closed;
    private final Lock _preloadLock = new ReentrantLock();
    private volatile boolean _isPreloading;
    private final UpsertConfig.ConsistencyMode _consistencyMode;
    private final long _upsertViewRefreshIntervalMs;
    private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock();
    private final Set<IndexSegment> _updatedSegmentsSinceLastRefresh = ConcurrentHashMap.newKeySet();
    private volatile long _lastUpsertViewRefreshTimeMs = 0L;
    private volatile Map<IndexSegment, MutableRoaringBitmap> _segmentQueryableDocIdsMap;

    protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
        this._tableNameWithType = tableNameWithType;
        this._partitionId = partitionId;
        this._context = context;
        this._primaryKeyColumns = context.getPrimaryKeyColumns();
        this._comparisonColumns = context.getComparisonColumns();
        this._deleteRecordColumn = context.getDeleteRecordColumn();
        this._hashFunction = context.getHashFunction();
        this._partialUpsertHandler = context.getPartialUpsertHandler();
        this._enableSnapshot = context.isSnapshotEnabled();
        this._snapshotLock = this._enableSnapshot ? new ReentrantReadWriteLock() : null;
        this._isPreloading = this._enableSnapshot && context.isPreloadEnabled();
        this._metadataTTL = context.getMetadataTTL();
        this._deletedKeysTTL = context.getDeletedKeysTTL();
        this._tableIndexDir = context.getTableIndexDir();
        UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
        this._consistencyMode = cmode != null ? cmode : UpsertConfig.ConsistencyMode.NONE;
        this._upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
        this._serverMetrics = ServerMetrics.get();
        this._logger = LoggerFactory.getLogger((String)(tableNameWithType + "-" + partitionId + "-" + this.getClass().getSimpleName()));
        if (this._metadataTTL > 0.0) {
            this._largestSeenComparisonValue = new AtomicDouble(this.loadWatermark());
        } else {
            this._largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
            this.deleteWatermark();
        }
    }

    @Override
    public List<String> getPrimaryKeyColumns() {
        return this._primaryKeyColumns;
    }

    @Nullable
    protected MutableRoaringBitmap getQueryableDocIds(IndexSegment segment, MutableRoaringBitmap validDocIds) {
        if (this._deleteRecordColumn == null) {
            return null;
        }
        MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();
        try (PinotSegmentColumnReader deleteRecordColumnReader = new PinotSegmentColumnReader(segment, this._deleteRecordColumn);){
            PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
            while (docIdIterator.hasNext()) {
                int docId = docIdIterator.next();
                if (BooleanUtils.toBoolean((Object)deleteRecordColumnReader.getValue(docId))) continue;
                queryableDocIds.add(docId);
            }
        }
        catch (IOException e) {
            this._logger.error("Failed to close column reader for delete record column: {} for segment: {} ", new Object[]{this._deleteRecordColumn, segment.getSegmentName(), e});
        }
        return queryableDocIds;
    }

    @Override
    public boolean isPreloading() {
        return this._isPreloading;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
        if (!this._isPreloading) {
            return;
        }
        TableDataManager tableDataManager = this._context.getTableDataManager();
        Preconditions.checkNotNull((Object)tableDataManager, (Object)"Preloading segments requires tableDataManager");
        HelixManager helixManager = tableDataManager.getHelixManager();
        ExecutorService segmentPreloadExecutor = tableDataManager.getSegmentPreloadExecutor();
        this._preloadLock.lock();
        try {
            if (!this._isPreloading) {
                return;
            }
            long startTime = System.currentTimeMillis();
            this.doPreloadSegments(tableDataManager, indexLoadingConfig, helixManager, segmentPreloadExecutor);
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.UPSERT_PRELOAD_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this._logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", new Object[]{this._partitionId, this._tableNameWithType, e});
            this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.UPSERT_PRELOAD_FAILURE, 1L);
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        finally {
            this._isPreloading = false;
            this._preloadLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig, HelixManager helixManager, ExecutorService segmentPreloadExecutor) throws Exception {
        this._logger.info("Preload segments from partition: {} of table: {} for fast upsert metadata recovery", (Object)this._partitionId, (Object)this._tableNameWithType);
        String instanceId = this.getInstanceId(tableDataManager);
        Map<String, Map<String, String>> segmentAssignment = this.getSegmentAssignment(helixManager);
        Map<String, SegmentZKMetadata> segmentMetadataMap = this.getSegmentsZKMetadata(helixManager);
        ArrayList futures = new ArrayList();
        for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
            String segmentName = entry.getKey();
            Map<String, String> instanceStateMap = entry.getValue();
            String state = instanceStateMap.get(instanceId);
            if (!"ONLINE".equals(state)) {
                if (state == null) {
                    this._logger.debug("Skip segment: {} as it's not assigned to instance: {}", (Object)segmentName, (Object)instanceId);
                    continue;
                }
                this._logger.info("Skip segment: {} as its ideal state: {} is not ONLINE for instance: {}", new Object[]{segmentName, state, instanceId});
                continue;
            }
            SegmentZKMetadata segmentZKMetadata = segmentMetadataMap.get(segmentName);
            Preconditions.checkState((segmentZKMetadata != null ? 1 : 0) != 0, (String)"Failed to find ZK metadata for segment: %s, table: %s", (Object)segmentName, (Object)this._tableNameWithType);
            Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId((String)segmentName, (SegmentZKMetadata)segmentZKMetadata, null);
            Preconditions.checkNotNull((Object)partitionId, (Object)String.format("Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName, this._tableNameWithType));
            if (partitionId != this._partitionId) {
                this._logger.debug("Skip segment: {} as its partition: {} is different from the requested partition: {}", new Object[]{segmentName, partitionId, this._partitionId});
                continue;
            }
            if (!BasePartitionUpsertMetadataManager.hasValidDocIdsSnapshot(tableDataManager, indexLoadingConfig.getTableConfig(), segmentName, segmentZKMetadata.getTier())) {
                this._logger.info("Skip segment: {} from partition: {} as no validDocIds snapshot exists", (Object)segmentName, (Object)this._partitionId);
                continue;
            }
            futures.add(segmentPreloadExecutor.submit(() -> this.doPreloadSegmentWithSnapshot(tableDataManager, segmentName, indexLoadingConfig, segmentZKMetadata)));
        }
        try {
            for (Future future : futures) {
                future.get();
            }
        }
        finally {
            for (Future future : futures) {
                if (future.isDone()) continue;
                future.cancel(true);
            }
        }
        this._logger.info("Preloaded segments from partition: {} of table: {} for fast upsert metadata recovery", (Object)this._partitionId, (Object)this._tableNameWithType);
    }

    private String getInstanceId(TableDataManager tableDataManager) {
        return tableDataManager.getInstanceDataManagerConfig().getInstanceId();
    }

    private static boolean hasValidDocIdsSnapshot(TableDataManager tableDataManager, TableConfig tableConfig, String segmentName, String segmentTier) {
        try {
            File indexDir = tableDataManager.getSegmentDataDir(segmentName, segmentTier, tableConfig);
            File snapshotFile = new File(SegmentDirectoryPaths.findSegmentDirectory((File)indexDir), "validdocids.bitmap.snapshot");
            return snapshotFile.exists();
        }
        catch (Exception e) {
            return false;
        }
    }

    @VisibleForTesting
    Map<String, Map<String, String>> getSegmentAssignment(HelixManager helixManager) {
        IdealState idealState = HelixHelper.getTableIdealState((HelixManager)helixManager, (String)this._tableNameWithType);
        Preconditions.checkState((idealState != null ? 1 : 0) != 0, (String)"Failed to find ideal state for table: %s", (Object)this._tableNameWithType);
        return idealState.getRecord().getMapFields();
    }

    @VisibleForTesting
    Map<String, SegmentZKMetadata> getSegmentsZKMetadata(HelixManager helixManager) {
        HashMap<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<String, SegmentZKMetadata>();
        ZKMetadataProvider.getSegmentsZKMetadata((ZkHelixPropertyStore)helixManager.getHelixPropertyStore(), (String)this._tableNameWithType).forEach(m -> segmentMetadataMap.put(m.getSegmentName(), (SegmentZKMetadata)m));
        return segmentMetadataMap;
    }

    @VisibleForTesting
    void doPreloadSegmentWithSnapshot(TableDataManager tableDataManager, String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) {
        try {
            this._logger.info("Preload segment: {} from partition: {} of table: {}", new Object[]{segmentName, this._partitionId, this._tableNameWithType});
            tableDataManager.tryLoadExistingSegment(segmentZKMetadata, indexLoadingConfig);
            this._logger.info("Preloaded segment: {} from partition: {} of table: {}", new Object[]{segmentName, this._partitionId, this._tableNameWithType});
        }
        catch (Exception e) {
            this._logger.warn("Failed to preload segment: {} from partition: {} of table: {}, skipping", new Object[]{segmentName, this._partitionId, this._tableNameWithType, e});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSegment(ImmutableSegment segment) {
        String segmentName = segment.getSegmentName();
        if (segment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", (Object)segmentName);
            return;
        }
        Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
        ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl)segment;
        if (this._deletedKeysTTL > 0.0) {
            double maxComparisonValue = ((Number)((Object)((ColumnMetadata)segment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0))).getMaxValue())).doubleValue();
            this._largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
        }
        if (this._metadataTTL > 0.0 && this._largestSeenComparisonValue.get() > 0.0) {
            Preconditions.checkState((boolean)this._enableSnapshot, (Object)"Upsert TTL must have snapshot enabled");
            Preconditions.checkState((this._comparisonColumns.size() == 1 ? 1 : 0) != 0, (Object)"Upsert TTL does not work with multiple comparison columns");
            Number maxComparisonValue = (Number)((Object)((ColumnMetadata)segment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0))).getMaxValue());
            if (maxComparisonValue.doubleValue() < this._largestSeenComparisonValue.get() - this._metadataTTL) {
                this._logger.info("Skip adding segment: {} because it's out of TTL", (Object)segmentName);
                MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
                if (validDocIdsSnapshot != null) {
                    MutableRoaringBitmap queryableDocIds = this.getQueryableDocIds((IndexSegment)segment, validDocIdsSnapshot);
                    immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
                } else {
                    this._logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid", (Object)segmentName);
                }
                return;
            }
        }
        if (!this.startOperation()) {
            this._logger.info("Skip adding segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            this.doAddSegment(immutableSegment);
            this._trackedSegments.add((IndexSegment)segment);
            if (this._enableSnapshot) {
                this._updatedSegmentsSinceLastSnapshot.add(segment);
            }
        }
        finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            this.finishOperation();
        }
    }

    protected void doAddSegment(ImmutableSegmentImpl segment) {
        MutableRoaringBitmap validDocIds;
        String segmentName = segment.getSegmentName();
        this._logger.info("Adding segment: {}, current primary key count: {}", (Object)segmentName, (Object)this.getNumPrimaryKeys());
        long startTimeMs = System.currentTimeMillis();
        if (this._enableSnapshot) {
            validDocIds = segment.loadValidDocIdsFromSnapshot();
            if (validDocIds != null && validDocIds.isEmpty()) {
                this._logger.info("Skip adding segment: {} without valid doc, current primary key count: {}", (Object)segment.getSegmentName(), (Object)this.getNumPrimaryKeys());
                segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
                return;
            }
        } else {
            validDocIds = null;
            segment.deleteValidDocIdsSnapshot();
        }
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn);){
            Iterator<RecordInfo> recordInfoIterator = validDocIds != null ? UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds) : UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
            this.addSegment(segment, null, null, recordInfoIterator);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while adding segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    protected abstract long getNumPrimaryKeys();

    protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, numPrimaryKeys);
    }

    protected void updatePrimaryKeyGauge() {
        this.updatePrimaryKeyGauge(this.getNumPrimaryKeys());
    }

    @Override
    public void preloadSegment(ImmutableSegment segment) {
        String segmentName = segment.getSegmentName();
        Preconditions.checkArgument((boolean)this._enableSnapshot, (String)"Snapshot must be enabled to preload segment: %s, table: %s", (Object)segmentName, (Object)this._tableNameWithType);
        Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
        if (!this.startOperation()) {
            this._logger.info("Skip preloading segment: {} because metadata manager is already stopped", (Object)segmentName);
            return;
        }
        this._snapshotLock.readLock().lock();
        try {
            this.doPreloadSegment((ImmutableSegmentImpl)segment);
            this._trackedSegments.add((IndexSegment)segment);
            this._updatedSegmentsSinceLastSnapshot.add(segment);
        }
        finally {
            this._snapshotLock.readLock().unlock();
            this.finishOperation();
        }
    }

    protected void doPreloadSegment(ImmutableSegmentImpl segment) {
        String segmentName = segment.getSegmentName();
        this._logger.info("Preloading segment: {}, current primary key count: {}", (Object)segmentName, (Object)this.getNumPrimaryKeys());
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap validDocIds = segment.loadValidDocIdsFromSnapshot();
        Preconditions.checkState((validDocIds != null ? 1 : 0) != 0, (String)"Snapshot of validDocIds is required to preload segment: %s, table: %s", (Object)segmentName, (Object)this._tableNameWithType);
        if (validDocIds.isEmpty()) {
            this._logger.info("Skip preloading segment: {} without valid doc, current primary key count: {}", (Object)segment.getSegmentName(), (Object)this.getNumPrimaryKeys());
            segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
            return;
        }
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn);){
            this.doPreloadSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while preloading segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    @VisibleForTesting
    void doPreloadSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        if (validDocIds == null) {
            validDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        if (queryableDocIds == null && this._deleteRecordColumn != null) {
            queryableDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        this.addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
    }

    @VisibleForTesting
    public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        if (validDocIds == null) {
            validDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        if (queryableDocIds == null && this._deleteRecordColumn != null) {
            queryableDocIds = new ThreadSafeMutableRoaringBitmap();
        }
        this.addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
    }

    protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
        if (this._partialUpsertHandler != null) {
            recordInfoIterator = BasePartitionUpsertMetadataManager.resolveComparisonTies(recordInfoIterator, this._hashFunction);
        }
        this.doAddOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, oldSegment, validDocIdsForOldSegment);
    }

    protected abstract void doAddOrReplaceSegment(ImmutableSegmentImpl var1, ThreadSafeMutableRoaringBitmap var2, @Nullable ThreadSafeMutableRoaringBitmap var3, Iterator<RecordInfo> var4, @Nullable IndexSegment var5, @Nullable MutableRoaringBitmap var6);

    protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
        this.addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
    }

    protected boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName, long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
        LLCSegmentName llcSegmentName = LLCSegmentName.of((String)segmentName);
        LLCSegmentName currentLLCSegmentName = LLCSegmentName.of((String)currentSegmentName);
        if (llcSegmentName != null && currentLLCSegmentName != null) {
            return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
        }
        int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
        if (creationTimeComparisonRes != 0) {
            return creationTimeComparisonRes > 0;
        }
        if (UploadedRealtimeSegmentName.of((String)currentSegmentName) != null) {
            return false;
        }
        return UploadedRealtimeSegmentName.of((String)segmentName) != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) {
        this._gotFirstConsumingSegment = true;
        if (!this.startOperation()) {
            this._logger.debug("Skip adding record to segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return false;
        }
        try {
            boolean addRecord = this.doAddRecord(segment, recordInfo);
            this._trackedSegments.add((IndexSegment)segment);
            boolean bl = addRecord;
            return bl;
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract boolean doAddRecord(MutableSegment var1, RecordInfo var2);

    @Override
    public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
        if (!this.startOperation()) {
            this._logger.info("Skip replacing segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            this.doReplaceSegment(segment, oldSegment);
            if (!(segment instanceof EmptyIndexSegment)) {
                this._trackedSegments.add((IndexSegment)segment);
                if (this._enableSnapshot) {
                    this._updatedSegmentsSinceLastSnapshot.add(segment);
                }
            }
            this._trackedSegments.remove(oldSegment);
        }
        finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            this.finishOperation();
        }
    }

    protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
        String segmentName = segment.getSegmentName();
        Preconditions.checkArgument((boolean)segmentName.equals(oldSegment.getSegmentName()), (String)"Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}", (Object)this._tableNameWithType, (Object)oldSegment.getSegmentName(), (Object)segmentName);
        this._logger.info("Replacing {} segment: {}, current primary key count: {}", new Object[]{oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, this.getNumPrimaryKeys()});
        long startTimeMs = System.currentTimeMillis();
        if (segment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", (Object)segmentName);
            this.replaceSegment(segment, null, null, null, oldSegment);
            return;
        }
        try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._comparisonColumns, this._deleteRecordColumn);){
            Iterator<RecordInfo> recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
            this.replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while replacing segment: %s, table: %s", segmentName, this._tableNameWithType), e);
        }
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    @VisibleForTesting
    public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
        MutableRoaringBitmap validDocIdsForOldSegment;
        String segmentName = segment.getSegmentName();
        MutableRoaringBitmap mutableRoaringBitmap = validDocIdsForOldSegment = oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
        if (recordInfoIterator != null) {
            Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
            if (validDocIds == null) {
                validDocIds = new ThreadSafeMutableRoaringBitmap();
            }
            if (queryableDocIds == null && this._deleteRecordColumn != null) {
                queryableDocIds = new ThreadSafeMutableRoaringBitmap();
            }
            this.addOrReplaceSegment((ImmutableSegmentImpl)segment, validDocIds, queryableDocIds, recordInfoIterator, oldSegment, validDocIdsForOldSegment);
        }
        if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
            int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
            if (this._partialUpsertHandler != null) {
                this._logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This can potentially cause inconsistency between replicas", (Object)numKeysNotReplaced, (Object)segmentName);
                this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED, (long)numKeysNotReplaced);
            } else {
                this._logger.info("Found {} primary keys not replaced when replacing segment: {}", (Object)numKeysNotReplaced, (Object)segmentName);
            }
            this.removeSegment(oldSegment, validDocIdsForOldSegment);
        }
    }

    protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
        try (UpsertUtils.PrimaryKeyReader primaryKeyReader = new UpsertUtils.PrimaryKeyReader(segment, this._primaryKeyColumns);){
            this.removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(), this._tableNameWithType), e);
        }
    }

    protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
        throw new UnsupportedOperationException("Both removeSegment(segment, validDocID) and removeSegment(segment, pkIterator) are not implemented. Implement one of them to support removal.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeSegment(IndexSegment segment) {
        Number maxComparisonValue;
        String segmentName = segment.getSegmentName();
        if (!this._trackedSegments.contains(segment)) {
            this._logger.info("Skip removing untracked (replaced or empty) segment: {}", (Object)segmentName);
            return;
        }
        boolean skipRemoveMetadata = false;
        if (this._metadataTTL > 0.0 && this._largestSeenComparisonValue.get() > 0.0 && (maxComparisonValue = (Number)((Object)((ColumnMetadata)segment.getSegmentMetadata().getColumnMetadataMap().get(this._comparisonColumns.get(0))).getMaxValue())).doubleValue() < this._largestSeenComparisonValue.get() - this._metadataTTL) {
            this._logger.info("Skip removing segment: {} because it's out of TTL", (Object)segmentName);
            skipRemoveMetadata = true;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip removing segment: {} because metadata manager is already stopped", (Object)segmentName);
            return;
        }
        if (this._enableSnapshot) {
            this._snapshotLock.readLock().lock();
        }
        try {
            if (!skipRemoveMetadata) {
                this.doRemoveSegment(segment);
            }
            this._trackedSegments.remove(segment);
        }
        finally {
            if (this._enableSnapshot) {
                this._snapshotLock.readLock().unlock();
            }
            this.finishOperation();
        }
    }

    protected void doRemoveSegment(IndexSegment segment) {
        MutableRoaringBitmap validDocIds;
        String segmentName = segment.getSegmentName();
        this._logger.info("Removing {} segment: {}, current primary key count: {}", new Object[]{segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, this.getNumPrimaryKeys()});
        long startTimeMs = System.currentTimeMillis();
        MutableRoaringBitmap mutableRoaringBitmap = validDocIds = segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
        if (validDocIds == null || validDocIds.isEmpty()) {
            this._logger.info("Skip removing segment without valid docs: {}", (Object)segmentName);
            return;
        }
        this._logger.info("Removing {} primary keys for segment: {}", (Object)validDocIds.getCardinality(), (Object)segmentName);
        this.removeSegment(segment, validDocIds);
        long numPrimaryKeys = this.getNumPrimaryKeys();
        this.updatePrimaryKeyGauge(numPrimaryKeys);
        this._logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", new Object[]{segmentName, System.currentTimeMillis() - startTimeMs, numPrimaryKeys});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
        if (this._partialUpsertHandler == null) {
            return record;
        }
        if (!this.startOperation()) {
            this._logger.debug("Skip updating record because metadata manager is already stopped");
            return record;
        }
        try {
            GenericRow genericRow = this.doUpdateRecord(record, recordInfo);
            return genericRow;
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract GenericRow doUpdateRecord(GenericRow var1, RecordInfo var2);

    protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) {
        boolean isPartialUpsertTable = this._partialUpsertHandler != null;
        this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)(isPartialUpsertTable ? ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER : ServerMeter.UPSERT_OUT_OF_ORDER), 1L);
        ++this._numOutOfOrderEvents;
        long currentTimeNs = System.nanoTime();
        if (currentTimeNs - this._lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
            this._logger.warn("Skipped {} out-of-order events for {} upsert table {} (the last event has current comparison value: {}, record comparison value: {})", new Object[]{this._numOutOfOrderEvents, isPartialUpsertTable ? "partial" : "full", this._tableNameWithType, currentComparisonValue, recordComparisonValue});
            this._lastOutOfOrderEventReportTimeNs = currentTimeNs;
            this._numOutOfOrderEvents = 0;
        }
    }

    protected static Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> recordInfoIterator, HashFunction hashFunction) {
        HashMap<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<Object, RecordInfo>();
        while (recordInfoIterator.hasNext()) {
            RecordInfo recordInfo = recordInfoIterator.next();
            Comparable newComparisonValue = recordInfo.getComparisonValue();
            deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), hashFunction), (key, maxComparisonValueRecordInfo) -> {
                if (maxComparisonValueRecordInfo == null) {
                    return recordInfo;
                }
                int comparisonResult = newComparisonValue.compareTo(maxComparisonValueRecordInfo.getComparisonValue());
                if (comparisonResult >= 0) {
                    return recordInfo;
                }
                return maxComparisonValueRecordInfo;
            });
        }
        return deDuplicatedRecordInfo.values().iterator();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void takeSnapshot() {
        if (!this._enableSnapshot) {
            return;
        }
        if (this._partialUpsertHandler == null && !this._gotFirstConsumingSegment) {
            this._logger.info("Skip taking snapshot before getting the first consuming segment for full-upsert table");
            return;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip taking snapshot because metadata manager is already stopped");
            return;
        }
        this._snapshotLock.writeLock().lock();
        try {
            long startTime = System.currentTimeMillis();
            this.doTakeSnapshot();
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.UPSERT_SNAPSHOT_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this._logger.warn("Caught exception while taking snapshot", (Throwable)e);
        }
        finally {
            this._snapshotLock.writeLock().unlock();
            this.finishOperation();
        }
    }

    protected void doTakeSnapshot() {
        int numTrackedSegments = this._trackedSegments.size();
        long numPrimaryKeysInSnapshot = 0L;
        this._logger.info("Taking snapshot for {} segments", (Object)numTrackedSegments);
        long startTimeMs = System.currentTimeMillis();
        int numImmutableSegments = 0;
        int numConsumingSegments = 0;
        HashSet<ImmutableSegmentImpl> segmentsWithoutSnapshot = new HashSet<ImmutableSegmentImpl>();
        for (IndexSegment indexSegment : this._trackedSegments) {
            if (!(indexSegment instanceof ImmutableSegmentImpl)) {
                ++numConsumingSegments;
                continue;
            }
            if (!this._updatedSegmentsSinceLastSnapshot.contains(indexSegment)) continue;
            try {
                ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl)indexSegment;
                if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
                    segmentsWithoutSnapshot.add(immutableSegment);
                    continue;
                }
                immutableSegment.persistValidDocIdsSnapshot();
                ++numImmutableSegments;
                numPrimaryKeysInSnapshot += (long)immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
            }
            catch (Exception e) {
                this._logger.warn("Caught exception while taking snapshot for segment: {}, skipping", (Object)indexSegment.getSegmentName(), (Object)e);
                Utils.rethrowException((Throwable)e);
            }
        }
        for (ImmutableSegmentImpl immutableSegmentImpl : segmentsWithoutSnapshot) {
            try {
                immutableSegmentImpl.persistValidDocIdsSnapshot();
                ++numImmutableSegments;
                numPrimaryKeysInSnapshot += (long)immutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap().getCardinality();
            }
            catch (Exception e) {
                this._logger.warn("Caught exception while taking snapshot for segment: {}, skipping", (Object)immutableSegmentImpl.getSegmentName(), (Object)e);
                Utils.rethrowException((Throwable)e);
            }
        }
        this._updatedSegmentsSinceLastSnapshot.clear();
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, (long)numImmutableSegments);
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, numPrimaryKeysInSnapshot);
        int numMissedSegments = numTrackedSegments - numImmutableSegments - numConsumingSegments;
        if (numMissedSegments > 0) {
            this._serverMetrics.addMeteredTableValue(this._tableNameWithType, String.valueOf(this._partitionId), (AbstractMetrics.Meter)ServerMeter.UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT, (long)numMissedSegments);
            this._logger.warn("Missed taking snapshot for {} immutable segments", (Object)numMissedSegments);
        }
        this._logger.info("Finished taking snapshot for {} immutable segments with {} primary keys (out of {} total segments, {} are consuming segments) in {} ms", new Object[]{numImmutableSegments, numPrimaryKeysInSnapshot, numTrackedSegments, numConsumingSegments, System.currentTimeMillis() - startTimeMs});
    }

    protected double loadWatermark() {
        File watermarkFile = this.getWatermarkFile();
        if (watermarkFile.exists()) {
            try {
                byte[] bytes = FileUtils.readFileToByteArray((File)watermarkFile);
                double watermark = ByteBuffer.wrap(bytes).getDouble();
                this._logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", new Object[]{watermark, this._tableNameWithType, this._partitionId});
                return watermark;
            }
            catch (Exception e) {
                this._logger.warn("Caught exception while loading watermark file: {}, skipping", (Object)watermarkFile);
            }
        }
        return Double.MIN_VALUE;
    }

    protected void persistWatermark(double watermark) {
        File watermarkFile = this.getWatermarkFile();
        try {
            if (watermarkFile.exists() && !FileUtils.deleteQuietly((File)watermarkFile)) {
                this._logger.warn("Cannot delete watermark file: {}, skipping", (Object)watermarkFile);
                return;
            }
            try (FileOutputStream outputStream = new FileOutputStream(watermarkFile, false);
                 DataOutputStream dataOutputStream = new DataOutputStream(outputStream);){
                dataOutputStream.writeDouble(watermark);
            }
            this._logger.info("Persisted watermark: {} to file: {}", (Object)watermark, (Object)watermarkFile);
        }
        catch (Exception e) {
            this._logger.warn("Caught exception while persisting watermark file: {}, skipping", (Object)watermarkFile);
        }
    }

    protected void deleteWatermark() {
        File watermarkFile = this.getWatermarkFile();
        if (watermarkFile.exists() && !FileUtils.deleteQuietly((File)watermarkFile)) {
            this._logger.warn("Cannot delete watermark file: {}, skipping", (Object)watermarkFile);
        }
    }

    protected File getWatermarkFile() {
        return new File(this._tableIndexDir, "ttl.watermark.partition." + this._partitionId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeExpiredPrimaryKeys() {
        if (this._metadataTTL <= 0.0 && this._deletedKeysTTL <= 0.0) {
            return;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip removing expired primary keys because metadata manager is already stopped");
            return;
        }
        try {
            long startTime = System.currentTimeMillis();
            this.doRemoveExpiredPrimaryKeys();
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract void doRemoveExpiredPrimaryKeys();

    protected synchronized boolean startOperation() {
        if (this._stopped || this._numPendingOperations == 0) {
            return false;
        }
        ++this._numPendingOperations;
        return true;
    }

    protected synchronized void finishOperation() {
        --this._numPendingOperations;
        if (this._numPendingOperations == 0) {
            this.notifyAll();
        }
    }

    @Override
    public synchronized void stop() {
        if (this._stopped) {
            this._logger.warn("Metadata manager is already stopped");
            return;
        }
        this._stopped = true;
        --this._numPendingOperations;
        this._logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", (Object)this._numPendingOperations, (Object)this.getNumPrimaryKeys());
    }

    @Override
    public synchronized void close() throws IOException {
        Preconditions.checkState((boolean)this._stopped, (Object)"Must stop the metadata manager before closing it");
        if (this._closed) {
            this._logger.warn("Metadata manager is already closed");
            return;
        }
        this._closed = true;
        this._logger.info("Closing the metadata manager");
        while (this._numPendingOperations != 0) {
            this._logger.info("Waiting for {} pending operations to finish", (Object)this._numPendingOperations);
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(String.format("Interrupted while waiting for %d pending operations to finish", this._numPendingOperations), e);
            }
        }
        this.doClose();
        this.updatePrimaryKeyGauge(0L);
        this._logger.info("Closed the metadata manager");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId, RecordInfo recordInfo) {
        if (this._consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
            this._upsertViewLock.writeLock().lock();
            try {
                this.doRemoveDocId(oldSegment, oldDocId);
                this.doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
            }
            finally {
                this._upsertViewLock.writeLock().unlock();
            }
        } else if (this._consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
            this._upsertViewLock.readLock().lock();
            try {
                this.doRemoveDocId(oldSegment, oldDocId);
                this.doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                this._updatedSegmentsSinceLastRefresh.add(newSegment);
                this._updatedSegmentsSinceLastRefresh.add(oldSegment);
            }
            finally {
                this._upsertViewLock.readLock().unlock();
                this.doBatchRefreshUpsertView(this._upsertViewRefreshIntervalMs);
            }
        } else {
            this.doRemoveDocId(oldSegment, oldDocId);
            this.doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
        }
        this.trackUpdatedSegmentsSinceLastSnapshot(oldSegment);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
        if (this._consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
            this.doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, recordInfo);
        } else {
            this._upsertViewLock.readLock().lock();
            try {
                this.doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, recordInfo);
                this._updatedSegmentsSinceLastRefresh.add(segment);
            }
            finally {
                this._upsertViewLock.readLock().unlock();
                this.doBatchRefreshUpsertView(this._upsertViewRefreshIntervalMs);
            }
        }
    }

    private void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
        validDocIds.replace(oldDocId, newDocId);
        if (queryableDocIds != null) {
            if (recordInfo.isDeleteRecord()) {
                queryableDocIds.remove(oldDocId);
            } else {
                queryableDocIds.replace(oldDocId, newDocId);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
        if (this._consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
            this.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
        } else {
            this._upsertViewLock.readLock().lock();
            try {
                this.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
                this._updatedSegmentsSinceLastRefresh.add(segment);
            }
            finally {
                this._upsertViewLock.readLock().unlock();
                this.doBatchRefreshUpsertView(this._upsertViewRefreshIntervalMs);
            }
        }
    }

    private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
        validDocIds.add(docId);
        if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
            queryableDocIds.add(docId);
        }
    }

    protected void removeDocId(IndexSegment segment, int docId) {
        if (this._consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
            this.doRemoveDocId(segment, docId);
        } else {
            this._upsertViewLock.readLock().lock();
            try {
                this.doRemoveDocId(segment, docId);
                this._updatedSegmentsSinceLastRefresh.add(segment);
            }
            finally {
                this._upsertViewLock.readLock().unlock();
                this.doBatchRefreshUpsertView(this._upsertViewRefreshIntervalMs);
            }
        }
        this.trackUpdatedSegmentsSinceLastSnapshot(segment);
    }

    private void trackUpdatedSegmentsSinceLastSnapshot(IndexSegment segment) {
        if (this._enableSnapshot && segment instanceof ImmutableSegment) {
            this._snapshotLock.readLock().lock();
            try {
                this._updatedSegmentsSinceLastSnapshot.add((ImmutableSegment)segment);
            }
            finally {
                this._snapshotLock.readLock().unlock();
            }
        }
    }

    private void doRemoveDocId(IndexSegment segment, int docId) {
        Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
        ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds();
        if (currentQueryableDocIds != null) {
            currentQueryableDocIds.remove(docId);
        }
    }

    public void setSegmentContexts(List<SegmentContext> segmentContexts, Map<String, String> queryOptions) {
        if (this._consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
            this.setSegmentContexts(segmentContexts);
            return;
        }
        if (this._consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
            this._upsertViewLock.readLock().lock();
            try {
                this.setSegmentContexts(segmentContexts);
                return;
            }
            finally {
                this._upsertViewLock.readLock().unlock();
            }
        }
        long upsertViewFreshnessMs = Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions), this._upsertViewRefreshIntervalMs);
        if (upsertViewFreshnessMs < 0L) {
            upsertViewFreshnessMs = this._upsertViewRefreshIntervalMs;
        }
        this.doBatchRefreshUpsertView(upsertViewFreshnessMs);
        Map<IndexSegment, MutableRoaringBitmap> currentUpsertView = this._segmentQueryableDocIdsMap;
        for (SegmentContext segmentContext : segmentContexts) {
            IndexSegment segment = segmentContext.getIndexSegment();
            MutableRoaringBitmap segmentView = currentUpsertView.get(segment);
            if (segmentView == null) continue;
            segmentContext.setQueryableDocIdsSnapshot(segmentView);
        }
    }

    private void setSegmentContexts(List<SegmentContext> segmentContexts) {
        for (SegmentContext segmentContext : segmentContexts) {
            IndexSegment segment = segmentContext.getIndexSegment();
            if (!this._trackedSegments.contains(segment)) continue;
            segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
        }
    }

    private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) {
        if (upsertViewFreshnessMs < 0L) {
            return true;
        }
        return this._lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > System.currentTimeMillis();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) {
        if (this.skipUpsertViewRefresh(upsertViewFreshnessMs) && this._segmentQueryableDocIdsMap != null) {
            return;
        }
        this._upsertViewLock.writeLock().lock();
        try {
            Map<IndexSegment, MutableRoaringBitmap> current = this._segmentQueryableDocIdsMap;
            if (this.skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) {
                return;
            }
            HashMap<IndexSegment, MutableRoaringBitmap> updated = new HashMap<IndexSegment, MutableRoaringBitmap>();
            for (IndexSegment segment : this._trackedSegments) {
                if (current == null || current.get(segment) == null || this._updatedSegmentsSinceLastRefresh.contains(segment)) {
                    updated.put(segment, UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
                    continue;
                }
                updated.put(segment, current.get(segment));
            }
            this._segmentQueryableDocIdsMap = updated;
            this._updatedSegmentsSinceLastRefresh.clear();
            this._lastUpsertViewRefreshTimeMs = System.currentTimeMillis();
        }
        finally {
            this._upsertViewLock.writeLock().unlock();
        }
    }

    @VisibleForTesting
    Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
        return this._segmentQueryableDocIdsMap;
    }

    @VisibleForTesting
    Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
        return this._updatedSegmentsSinceLastRefresh;
    }

    protected void doClose() throws IOException {
    }
}

