/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.segment.local.dedup;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.dedup.DedupContext;
import org.apache.pinot.segment.local.dedup.DedupRecordInfo;
import org.apache.pinot.segment.local.dedup.DedupUtils;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BasePartitionDedupMetadataManager
implements PartitionDedupMetadataManager {
    protected static final double TTL_WATERMARK_NOT_SET = 0.0;
    protected final String _tableNameWithType;
    protected final List<String> _primaryKeyColumns;
    protected final int _partitionId;
    protected final DedupContext _context;
    protected final ServerMetrics _serverMetrics;
    protected final HashFunction _hashFunction;
    protected final double _metadataTTL;
    protected final String _dedupTimeColumn;
    protected final AtomicDouble _largestSeenTime;
    protected final File _tableIndexDir;
    protected final Logger _logger;
    private boolean _stopped;
    private int _numPendingOperations = 1;
    private boolean _closed;
    private final Lock _preloadLock = new ReentrantLock();
    private volatile boolean _isPreloading;

    protected BasePartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext dedupContext) {
        this._tableNameWithType = tableNameWithType;
        this._partitionId = partitionId;
        this._context = dedupContext;
        this._primaryKeyColumns = dedupContext.getPrimaryKeyColumns();
        this._hashFunction = dedupContext.getHashFunction();
        this._isPreloading = dedupContext.isPreloadEnabled();
        this._metadataTTL = dedupContext.getMetadataTTL() >= 0.0 ? dedupContext.getMetadataTTL() : 0.0;
        this._dedupTimeColumn = dedupContext.getDedupTimeColumn();
        this._tableIndexDir = dedupContext.getTableIndexDir();
        this._serverMetrics = ServerMetrics.get();
        this._logger = LoggerFactory.getLogger((String)(tableNameWithType + "-" + partitionId + "-" + this.getClass().getSimpleName()));
        if (this._metadataTTL > 0.0) {
            Preconditions.checkArgument((this._dedupTimeColumn != null ? 1 : 0) != 0, (String)"When metadataTTL is configured, metadata time column must be configured for dedup enabled table: %s", (Object)tableNameWithType);
            this._largestSeenTime = new AtomicDouble(WatermarkUtils.loadWatermark(this.getWatermarkFile(), 0.0));
        } else {
            this._largestSeenTime = new AtomicDouble(0.0);
            WatermarkUtils.deleteWatermark(this.getWatermarkFile());
        }
    }

    @Override
    public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
        throw new UnsupportedOperationException("checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) is deprecated!");
    }

    @Override
    public boolean isPreloading() {
        return this._isPreloading;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
        if (!this._isPreloading) {
            return;
        }
        TableDataManager tableDataManager = this._context.getTableDataManager();
        Preconditions.checkNotNull((Object)tableDataManager, (Object)"Preloading segments requires tableDataManager");
        HelixManager helixManager = tableDataManager.getHelixManager();
        ExecutorService segmentPreloadExecutor = tableDataManager.getSegmentPreloadExecutor();
        this._preloadLock.lock();
        try {
            if (!this._isPreloading) {
                return;
            }
            long startTime = System.currentTimeMillis();
            this.doPreloadSegments(tableDataManager, indexLoadingConfig, helixManager, segmentPreloadExecutor);
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.DEDUP_PRELOAD_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this._logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", new Object[]{this._partitionId, this._tableNameWithType, e});
            this._serverMetrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.DEDUP_PRELOAD_FAILURE, 1L);
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        finally {
            this._isPreloading = false;
            this._preloadLock.unlock();
        }
    }

    protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig, HelixManager helixManager, ExecutorService segmentPreloadExecutor) throws Exception {
        SegmentPreloadUtils.preloadSegments(tableDataManager, this._partitionId, indexLoadingConfig, helixManager, segmentPreloadExecutor, null);
    }

    @Override
    public void preloadSegment(ImmutableSegment segment) {
        String segmentName = segment.getSegmentName();
        if (segment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", (Object)segmentName);
            return;
        }
        Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
        if (!this.startOperation()) {
            this._logger.info("Skip preloading segment: {} because dedup metadata manager is already stopped", (Object)segmentName);
            return;
        }
        try {
            if (this.skipSegmentOutOfTTL((IndexSegment)segment, true)) {
                return;
            }
            try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader((IndexSegment)segment, this._primaryKeyColumns, this._dedupTimeColumn);){
                Iterator<DedupRecordInfo> dedupRecordInfoIterator = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
                this.doPreloadSegment(segment, dedupRecordInfoIterator);
                this.updatePrimaryKeyGauge();
            }
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while preloading segment: %s of table: %s in %s", segmentName, this._tableNameWithType, this.getClass().getSimpleName()), e);
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract void doPreloadSegment(ImmutableSegment var1, Iterator<DedupRecordInfo> var2);

    @Override
    public void addSegment(IndexSegment segment) {
        String segmentName = segment.getSegmentName();
        if (segment instanceof EmptyIndexSegment) {
            this._logger.info("Skip adding empty segment: {}", (Object)segmentName);
            return;
        }
        Preconditions.checkArgument((boolean)(segment instanceof ImmutableSegmentImpl), (String)"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), (Object)segmentName, (Object)this._tableNameWithType);
        if (!this.startOperation()) {
            this._logger.info("Skip adding segment: {} because dedup metadata manager is already stopped", (Object)segmentName);
            return;
        }
        try {
            if (!this.skipSegmentOutOfTTL(segment, true)) {
                this.addOrReplaceSegment(null, segment);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while adding segment: %s of table: %s to %s", segmentName, this._tableNameWithType, this.getClass().getSimpleName()), e);
        }
        finally {
            this.finishOperation();
        }
    }

    @Override
    public void replaceSegment(IndexSegment oldSegment, IndexSegment newSegment) {
        if (!this.startOperation()) {
            this._logger.info("Skip replacing segment: {} with segment: {} because dedup metadata manager is already stopped", (Object)oldSegment.getSegmentName(), (Object)newSegment.getSegmentName());
            return;
        }
        try {
            if (!this.skipSegmentOutOfTTL(newSegment, true)) {
                this.addOrReplaceSegment(oldSegment, newSegment);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while replacing segment: %s with segment: %s of table: %s in %s", oldSegment.getSegmentName(), newSegment.getSegmentName(), this._tableNameWithType, this.getClass().getSimpleName()), e);
        }
        finally {
            this.finishOperation();
        }
    }

    protected boolean skipSegmentOutOfTTL(IndexSegment segment, boolean updateWatermark) {
        if (this._metadataTTL <= 0.0) {
            return false;
        }
        double maxDedupTime = this.getMaxDedupTime(segment);
        if (updateWatermark) {
            this._largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
        }
        if (!this.isOutOfMetadataTTL(maxDedupTime)) {
            return false;
        }
        this._logger.info("Skip segment: {} as max dedupTime: {} is out of TTL: {}", new Object[]{segment.getSegmentName(), maxDedupTime, this._metadataTTL});
        return true;
    }

    private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment) throws IOException {
        try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(newSegment, this._primaryKeyColumns, this._dedupTimeColumn);){
            Iterator<DedupRecordInfo> dedupRecordInfoIterator = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, newSegment.getSegmentMetadata().getTotalDocs());
            this.doAddOrReplaceSegment(oldSegment, newSegment, dedupRecordInfoIterator);
            this.updatePrimaryKeyGauge();
        }
    }

    protected abstract void doAddOrReplaceSegment(@Nullable IndexSegment var1, IndexSegment var2, Iterator<DedupRecordInfo> var3);

    @Override
    public void removeSegment(IndexSegment segment) {
        if (!this.startOperation()) {
            this._logger.info("Skip removing segment: {} because metadata manager is already stopped", (Object)segment.getSegmentName());
            return;
        }
        try {
            if (this.skipSegmentOutOfTTL(segment, false)) {
                return;
            }
            try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment, this._primaryKeyColumns, this._dedupTimeColumn);){
                Iterator<DedupRecordInfo> dedupRecordInfoIterator = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
                this.doRemoveSegment(segment, dedupRecordInfoIterator);
                this.updatePrimaryKeyGauge();
            }
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while removing segment: %s of table: %s from %s", segment.getSegmentName(), this._tableNameWithType, this.getClass().getSimpleName()), e);
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract void doRemoveSegment(IndexSegment var1, Iterator<DedupRecordInfo> var2);

    protected boolean isOutOfMetadataTTL(double dedupTime) {
        return this._metadataTTL > 0.0 && dedupTime < this._largestSeenTime.get() - this._metadataTTL;
    }

    protected double getMaxDedupTime(IndexSegment segment) {
        return ((Number)((Object)((ColumnMetadata)segment.getSegmentMetadata().getColumnMetadataMap().get(this._dedupTimeColumn)).getMaxValue())).doubleValue();
    }

    protected File getWatermarkFile() {
        return new File(this._tableIndexDir, "ttl.watermark.partition." + this._partitionId + ".dedup");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeExpiredPrimaryKeys() {
        if (this._metadataTTL <= 0.0) {
            return;
        }
        if (!this.startOperation()) {
            this._logger.info("Skip removing expired primary keys because metadata manager is already stopped");
            return;
        }
        try {
            long startTime = System.currentTimeMillis();
            this.doRemoveExpiredPrimaryKeys();
            WatermarkUtils.persistWatermark(this._largestSeenTime.get(), this.getWatermarkFile());
            long duration = System.currentTimeMillis() - startTime;
            this._serverMetrics.addTimedTableValue(this._tableNameWithType, (AbstractMetrics.Timer)ServerTimer.DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS, duration, TimeUnit.MILLISECONDS);
        }
        finally {
            this.finishOperation();
        }
    }

    protected abstract void doRemoveExpiredPrimaryKeys();

    protected synchronized boolean startOperation() {
        if (this._stopped || this._numPendingOperations == 0) {
            return false;
        }
        ++this._numPendingOperations;
        return true;
    }

    protected synchronized void finishOperation() {
        --this._numPendingOperations;
        if (this._numPendingOperations == 0) {
            this.notifyAll();
        }
    }

    @Override
    public synchronized void stop() {
        if (this._stopped) {
            this._logger.warn("Metadata manager is already stopped");
            return;
        }
        this._stopped = true;
        --this._numPendingOperations;
        this._logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", (Object)this._numPendingOperations, (Object)this.getNumPrimaryKeys());
    }

    @Override
    public synchronized void close() throws IOException {
        Preconditions.checkState((boolean)this._stopped, (Object)"Must stop the metadata manager before closing it");
        if (this._closed) {
            this._logger.warn("Metadata manager is already closed");
            return;
        }
        this._closed = true;
        this._logger.info("Closing the metadata manager");
        while (this._numPendingOperations != 0) {
            this._logger.info("Waiting for {} pending operations to finish", (Object)this._numPendingOperations);
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(String.format("Interrupted while waiting for %d pending operations to finish", this._numPendingOperations), e);
            }
        }
        this.doClose();
        this.updatePrimaryKeyGauge(0L);
        this._logger.info("Closed the metadata manager");
    }

    protected abstract long getNumPrimaryKeys();

    protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
        this._serverMetrics.setValueOfPartitionGauge(this._tableNameWithType, this._partitionId, (AbstractMetrics.Gauge)ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, numPrimaryKeys);
    }

    protected void updatePrimaryKeyGauge() {
        this.updatePrimaryKeyGauge(this.getNumPrimaryKeys());
    }

    protected void doClose() throws IOException {
    }
}

