package com.alibaba.ververica.connectors.common.dim.cache;

import java.io.IOException;
import org.apache.flink.streaming.natives.VectorizedDataChunk;
import org.apache.flink.table.runtime.natives.NativeTableTypeUtils;
import org.apache.flink.table.runtime.natives.PlainRowDataEncoder;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/dim/cache/NativeAllCacheBase.class */
/**
 * Base class for an {@link AllCache} whose content is kept in a lookup table that is created,
 * populated and switched through {@link NativeCacheUtils} instead of a Java collection.
 *
 * <p>Usage as visible from this class: configure the instance through the fluent setters, call
 * {@link #open(long)} to create the native lookup table, call {@link #initialize()} before each
 * load, and call {@link #switchCache()} to flush pending rows and switch the native table.
 * {@link #removeLookupTable()} marks the cache as removed; afterwards {@link #switchCache()},
 * {@link #size()} and {@link #flush()} no longer touch the native table.
 *
 * <p>Point lookups through {@link #get(Object)} are not supported.
 *
 * @param <V> value type declared towards {@link AllCache}
 */
public abstract class NativeAllCacheBase<V> extends AllCache<Object, V> {
    private static final Logger LOG = LoggerFactory.getLogger(NativeAllCacheBase.class);

    /** Name of the table; used for logging and passed to the native create/remove calls. */
    private final String tableName;

    private int batchSize;
    private long offHeapMemorySize;
    private RowDataSerializer rowDataSerializer;
    private VectorizedDataChunk reusedLookupVector;

    /** Value returned by the native create call in {@link #open(long)}; 0 until then. */
    private transient volatile long nativeLookupTableAddr;

    /** Row encoder created on the first {@link #initialize()}; {@code null} before that. */
    protected transient PlainRowDataEncoder encoder;

    private RowType rowType;
    private volatile boolean isInitialized;
    private volatile boolean isRemoved;

    /**
     * Creates a cache bound to the given table name.
     *
     * @param tableName name used in log messages and in the native create/remove calls
     */
    public NativeAllCacheBase(String tableName) {
        this.tableName = tableName;
    }

    /**
     * Tells whether the cached table has a primary key; the result is forwarded to the native
     * create call in {@link #createLookupTable(long)}.
     *
     * @return {@code true} if the table has a primary key
     */
    protected abstract boolean hasPrimaryKey();

    /**
     * Sets the batch size handed to the {@link PlainRowDataEncoder} created in
     * {@link #initialize()}.
     *
     * @param batchSize batch size for the encoder
     * @return this instance, for chaining
     */
    public NativeAllCacheBase<V> setBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    /**
     * Sets the value passed to {@code NativeAllCacheSharedResource.setQuota} when the lookup
     * table is created.
     *
     * @param offHeapMemorySize quota value (presumably bytes -- confirm against
     *     {@code NativeAllCacheSharedResource})
     * @return this instance, for chaining
     */
    public NativeAllCacheBase<V> setOffHeapMemorySize(long offHeapMemorySize) {
        this.offHeapMemorySize = offHeapMemorySize;
        return this;
    }

    /**
     * Sets the serializer handed to the {@link PlainRowDataEncoder} created in
     * {@link #initialize()}.
     *
     * @param rowDataSerializer serializer for the cached rows
     * @return this instance, for chaining
     */
    public NativeAllCacheBase<V> setRowDataSerializer(RowDataSerializer rowDataSerializer) {
        this.rowDataSerializer = rowDataSerializer;
        return this;
    }

    /**
     * Sets the row type that is converted with {@code NativeTableTypeUtils.toNativeType} when the
     * encoder is created in {@link #initialize()}.
     *
     * @param rowType logical type of the cached rows
     * @return this instance, for chaining
     */
    public NativeAllCacheBase<V> setRowType(RowType rowType) {
        this.rowType = rowType;
        return this;
    }

    /**
     * Not supported by this cache.
     *
     * @param obj ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public V get(Object obj) {
        throw new UnsupportedOperationException("We should not call get() method in Native Cache.");
    }

    /**
     * Prepares the cache for a load.
     *
     * <p>On the first call the native lookup table is initialized and the reusable
     * {@link VectorizedDataChunk} and the {@link #encoder} are created. On later calls only the
     * native write table is reset.
     *
     * <p>NOTE(review): unlike {@link #switchCache()}, {@link #size()} and {@link #flush()}, this
     * method does not check whether the table was removed -- confirm that this is intended.
     *
     * @throws IllegalArgumentException if the lookup table has not been created through
     *     {@link #open(long)} yet, or if the native initialization returns 0
     */
    @Override
    public synchronized void initialize() {
        super.initialize();
        if (this.isInitialized) {
            NativeCacheUtils.resetWriteLookupTable(this.nativeLookupTableAddr);
            LOG.info("Initialize Native All Cache: {} after resetting lookup table for insertion.", this.tableName);
            return;
        }
        LOG.info("Initialize Native All Cache: {} for first time and create data structures in native side.", this.tableName);
        // Fail with a readable message instead of handing a zero handle to native code.
        Preconditions.checkArgument(
                this.nativeLookupTableAddr != 0,
                "Native lookup table of %s has not been created; open() must be called before initialize().",
                this.tableName);
        long lookupVectorAddr = NativeCacheUtils.initializeLookupTable(this.nativeLookupTableAddr);
        Preconditions.checkArgument(
                lookupVectorAddr != 0,
                "Native initialization of the lookup table of %s returned 0.",
                this.tableName);
        this.reusedLookupVector = new VectorizedDataChunk(lookupVectorAddr);
        this.encoder = new PlainRowDataEncoder(
                this.batchSize,
                NativeTableTypeUtils.toNativeType(this.rowType),
                this.rowDataSerializer,
                NativeAllCacheSharedResource.getInstance().getMemoryPool());
        this.encoder.initReusedResult(this.reusedLookupVector);
        this.isInitialized = true;
    }

    /**
     * Creates the native lookup table and remembers the value returned for it.
     *
     * @param nativeHandle non-zero value forwarded to {@link #createLookupTable(long)}
     *     (presumably a native address -- confirm against the caller)
     * @throws IllegalArgumentException if {@code nativeHandle} is 0 or the native create call
     *     returns 0
     */
    public synchronized void open(long nativeHandle) {
        LOG.info("Open Native All Cache...");
        Preconditions.checkArgument(
                nativeHandle != 0,
                "Native handle passed to open() for table %s must not be 0.",
                this.tableName);
        this.nativeLookupTableAddr = createLookupTable(nativeHandle);
        Preconditions.checkArgument(
                this.nativeLookupTableAddr != 0,
                "Creating the native lookup table of %s returned 0.",
                this.tableName);
    }

    /**
     * Applies the configured off-heap quota and asks the native side to create the lookup table.
     *
     * @param nativeHandle value forwarded as first argument to the native create call
     * @return the value returned by {@code NativeCacheUtils.createLookupTable}
     */
    public long createLookupTable(long nativeHandle) {
        NativeAllCacheSharedResource.setQuota(this.offHeapMemorySize);
        return NativeCacheUtils.createLookupTable(
                nativeHandle,
                NativeAllCacheSharedResource.getInstance().getMemoryPool().getNativeMemoryPool(),
                this.tableName,
                hasPrimaryKey());
    }

    /**
     * Marks this cache as removed and asks the native side to remove the lookup table registered
     * under the table name.
     */
    public synchronized void removeLookupTable() {
        // Set the flag first so that the guarded methods stop using the native table.
        this.isRemoved = true;
        NativeCacheUtils.removeLookupTable(this.tableName);
    }

    /**
     * Flushes pending rows and switches the native lookup table while holding the write lock.
     * When the table was removed, the native calls are skipped. The last-updated timestamp is
     * refreshed in both cases.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised by {@link #flush()}
     */
    @Override
    public synchronized void switchCache() {
        this.lock.writeLock().lock();
        try {
            if (this.isRemoved) {
                LOG.info("The Native All Cache of {} is removed. Skip switch cache.", this.tableName);
            } else {
                LOG.info("Switch Native All Cache: {}", this.tableName);
                flush();
                NativeCacheUtils.switchLookupTableCache(this.nativeLookupTableAddr);
            }
            setLastUpdated(System.currentTimeMillis());
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to flush Native All Cache of " + this.tableName + " while switching cache.", e);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Returns the size reported by the native lookup table, read under the read lock.
     *
     * @return the native lookup table size, or 0 when the table was removed
     */
    @Override
    public synchronized long size() {
        this.lock.readLock().lock();
        try {
            if (this.isRemoved) {
                LOG.info("The Native All Cache of {} is removed. Skip reading size.", this.tableName);
                return 0L;
            }
            return NativeCacheUtils.getLookupTableSize(this.nativeLookupTableAddr);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Inserts the rows currently buffered in the {@link #encoder} into the native lookup table
     * and resets the encoder afterwards, even if the insertion fails. Does nothing when the
     * encoder has not been created yet or the table was removed.
     *
     * <p>Declared public here; the decompiler note on the original states that the access
     * modifier was changed from protected.
     *
     * @throws IOException if flushing the encoder or the insertion fails with an I/O error
     */
    public synchronized void flush() throws IOException {
        if (this.encoder == null || this.isRemoved) {
            return;
        }
        try {
            NativeCacheUtils.insertBatchToLookupTable(this.nativeLookupTableAddr, this.encoder.flush());
        } finally {
            this.encoder.reset();
        }
    }
}
