package com.starrocks.data.load.stream.v2;

import com.starrocks.data.load.stream.Chunk;
import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.compress.CompressionCodec;
import com.starrocks.data.load.stream.compress.CompressionHttpEntity;
import com.starrocks.data.load.stream.exception.ErrorUtils;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import com.starrocks.streamload.shade.org.apache.http.HttpEntity;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/data/load/stream/v2/TransactionTableRegion.class */
public class TransactionTableRegion implements TableRegion {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionTableRegion.class);
    private final StreamLoadManager manager;
    private final StreamLoader streamLoader;
    private final LabelGenerator labelGenerator;
    private final String uniqueKey;
    private final String database;
    private final String table;
    private final StreamLoadTableProperties properties;
    private final Optional<CompressionCodec> compressionCodec;
    private volatile Chunk activeChunk;
    private volatile String label;
    private volatile Future<?> responseFuture;
    private final int maxRetries;
    private final int retryIntervalInMs;
    private volatile int numRetries;
    private volatile long lastFailTimeMs;
    private volatile Throwable firstException;
    private final AtomicLong age = new AtomicLong(0);
    private final AtomicLong cacheBytes = new AtomicLong();
    private final AtomicLong cacheRows = new AtomicLong();
    private final AtomicBoolean ctl = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<Chunk> inactiveChunks = new ConcurrentLinkedQueue<>();
    private final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);
    private volatile long lastCommitTimeMills = System.currentTimeMillis();

    /* loaded from: input_file:com/starrocks/data/load/stream/v2/TransactionTableRegion$State.class */
    enum State {
        ACTIVE,
        FLUSHING,
        COMMITTING
    }

    public TransactionTableRegion(String str, String str2, String str3, StreamLoadManager streamLoadManager, StreamLoadTableProperties streamLoadTableProperties, StreamLoader streamLoader, LabelGenerator labelGenerator, int i, int i2) {
        this.uniqueKey = str;
        this.database = str2;
        this.table = str3;
        this.manager = streamLoadManager;
        this.properties = streamLoadTableProperties;
        this.streamLoader = streamLoader;
        this.labelGenerator = labelGenerator;
        this.compressionCodec = CompressionCodec.createCompressionCodec(streamLoadTableProperties.getDataFormat(), streamLoadTableProperties.getProperty("compression"), streamLoadTableProperties.getTableProperties());
        this.activeChunk = new Chunk(streamLoadTableProperties.getDataFormat());
        this.maxRetries = i;
        this.retryIntervalInMs = i2;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public StreamLoadTableProperties getProperties() {
        return this.properties;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getUniqueKey() {
        return this.uniqueKey;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getDatabase() {
        return this.database;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getTable() {
        return this.table;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public LabelGenerator getLabelGenerator() {
        return this.labelGenerator;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void setLabel(String str) {
        if (this.numRetries <= 0 || str == null) {
            this.label = str;
        }
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public String getLabel() {
        return this.label;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getCacheBytes() {
        return this.cacheBytes.get();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void resetAge() {
        this.age.set(0L);
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getAndIncrementAge() {
        return this.age.getAndIncrement();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getAge() {
        return this.age.get();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public int write(byte[] bArr) {
        int write0;
        if (bArr == null) {
            return 0;
        }
        if (this.ctl.compareAndSet(false, true)) {
            write0 = write0(bArr);
            this.ctl.set(false);
            return write0;
        }
        do {
        } while (!this.ctl.compareAndSet(false, true));
        write0 = write0(bArr);
        this.ctl.set(false);
        return write0;
    }

    private void switchChunk() {
        if (this.activeChunk == null || this.activeChunk.numRows() == 0) {
            return;
        }
        this.inactiveChunks.add(this.activeChunk);
        this.activeChunk = new Chunk(this.properties.getDataFormat());
    }

    protected int write0(byte[] bArr) {
        if (this.activeChunk.estimateChunkSize(bArr) > this.properties.getChunkLimit().longValue() || this.activeChunk.numRows() >= this.properties.getMaxBufferRows()) {
            switchChunk();
        }
        this.activeChunk.addRow(bArr);
        this.cacheBytes.addAndGet(bArr.length);
        this.cacheRows.incrementAndGet();
        return bArr.length;
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean isFlushing() {
        return this.state.get() == State.FLUSHING;
    }

    public FlushReason shouldFlush() {
        if (this.state.get() == State.ACTIVE && this.cacheRows.get() >= this.properties.getMaxBufferRows()) {
            return FlushReason.BUFFER_ROWS_REACH_LIMIT;
        }
        return FlushReason.NONE;
    }

    public boolean flush(FlushReason flushReason) {
        LOG.debug("Try to flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes, this.cacheRows, flushReason});
        if (!this.state.compareAndSet(State.ACTIVE, State.FLUSHING)) {
            return false;
        }
        do {
        } while (!this.ctl.compareAndSet(false, true));
        if (flushReason != FlushReason.BUFFER_ROWS_REACH_LIMIT || this.activeChunk.numRows() >= this.properties.getMaxBufferRows()) {
            switchChunk();
        }
        this.ctl.set(false);
        if (this.inactiveChunks.isEmpty()) {
            this.state.compareAndSet(State.FLUSHING, State.ACTIVE);
            return false;
        }
        LOG.info("Flush db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}, reason: {}", new Object[]{this.database, this.table, this.label, Long.valueOf(this.cacheBytes.get()), Long.valueOf(this.cacheRows.get()), flushReason});
        streamLoad(0);
        return true;
    }

    public boolean commit() {
        LOG.debug("Try to commit, db: {}, table: {}, label: {}", new Object[]{this.database, this.table, this.label});
        boolean z = false;
        if (!this.state.compareAndSet(State.ACTIVE, State.COMMITTING)) {
            if (this.state.get() != State.COMMITTING) {
                return false;
            }
            z = true;
        }
        if (z) {
            if (this.label != null) {
                return false;
            }
            this.state.compareAndSet(State.COMMITTING, State.ACTIVE);
            LOG.debug("Success to commit, db: {}, table: {}", this.database, this.table);
            return true;
        }
        if (this.label == null) {
            boolean z2 = this.cacheBytes.get() == 0;
            this.state.compareAndSet(State.COMMITTING, State.ACTIVE);
            if (z2) {
                LOG.debug("Success to commit, db: {}, table: {}", this.database, this.table);
            }
            return z2;
        }
        try {
            this.streamLoader.getExecutorService().submit(this::doCommit);
            return false;
        } catch (Exception e) {
            LOG.error("Failed to submit commit task, db: {}, table: {}, label: {}", new Object[]{this.database, this.table, this.label, e});
            throw e;
        }
    }

    private void doCommit() {
        StreamLoadSnapshot.Transaction transaction = new StreamLoadSnapshot.Transaction(this.database, this.table, this.label);
        try {
        } catch (Throwable th) {
            LOG.error("TransactionTableRegion commit failed, db: {}, table: {}, label: {}", new Object[]{this.database, this.table, this.label, th});
            fail(th);
        }
        if (!this.streamLoader.prepare(transaction)) {
            throw new StreamLoadFailException("Failed to prepare transaction, please check taskmanager log for details, " + transaction);
        }
        if (!this.streamLoader.commit(transaction)) {
            throw new StreamLoadFailException("Failed to commit transaction, please check taskmanager log for details, " + transaction);
        }
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - this.lastCommitTimeMills;
        this.lastCommitTimeMills = currentTimeMillis;
        this.label = null;
        LOG.info("Success to commit transaction: {}, duration: {} ms", transaction, Long.valueOf(j));
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void fail(Throwable th) {
        if (this.firstException == null) {
            this.firstException = th;
        }
        if (this.numRetries >= this.maxRetries || !ErrorUtils.isRetryable(th)) {
            LOG.error("Failed to flush data for db: {}, table: {} after {} times retry, the last exception is", new Object[]{this.database, this.table, Integer.valueOf(this.numRetries), th});
            this.manager.callback(this.firstException);
            return;
        }
        this.responseFuture = null;
        this.numRetries++;
        this.lastFailTimeMs = System.currentTimeMillis();
        LOG.warn("Failed to flush data for db: {}, table: {}, and will retry for {} times after {} ms", new Object[]{this.database, this.table, Integer.valueOf(this.numRetries), Integer.valueOf(this.retryIntervalInMs), th});
        streamLoad(this.retryIntervalInMs);
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void complete(StreamLoadResponse streamLoadResponse) {
        Chunk remove = this.inactiveChunks.remove();
        this.cacheBytes.addAndGet(-remove.rowBytes());
        this.cacheRows.addAndGet(-remove.numRows());
        streamLoadResponse.setFlushBytes(remove.rowBytes());
        streamLoadResponse.setFlushRows(remove.numRows());
        this.manager.callback(streamLoadResponse);
        this.numRetries = 0;
        this.firstException = null;
        if (!this.inactiveChunks.isEmpty()) {
            LOG.info("Stream load continue, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes, this.cacheRows});
            streamLoad(0);
        } else if (this.state.compareAndSet(State.FLUSHING, State.ACTIVE)) {
            LOG.info("Stream load completed, db: {}, table: {}, label: {}, cacheBytes: {}, cacheRows: {}", new Object[]{this.database, this.table, this.label, this.cacheBytes, this.cacheRows});
        }
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public Future<?> getResult() {
        return this.responseFuture;
    }

    protected void streamLoad(int i) {
        try {
            Chunk peek = this.inactiveChunks.peek();
            LOG.info("Stream load chunk, db: {}, table: {}, numRows: {}, rowBytes: {}, chunkBytes: {}", new Object[]{this.database, this.table, Integer.valueOf(peek.numRows()), Long.valueOf(peek.rowBytes()), Long.valueOf(peek.chunkBytes())});
            this.responseFuture = this.streamLoader.send(this, i);
        } catch (Exception e) {
            fail(e);
        }
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public HttpEntity getHttpEntity() {
        ChunkHttpEntity chunkHttpEntity = new ChunkHttpEntity(this.uniqueKey, this.inactiveChunks.peek());
        return (HttpEntity) this.compressionCodec.map(compressionCodec -> {
            return new CompressionHttpEntity(chunkHttpEntity, compressionCodec);
        }).orElse(chunkHttpEntity);
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getLastWriteTimeMillis() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void setResult(Future<?> future) {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public void callback(StreamLoadResponse streamLoadResponse) {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public long getFlushBytes() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public byte[] read() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public StreamLoadEntityMeta getEntityMeta() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean testPrepare() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean prepare() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean cancel() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean isReadable() {
        throw new UnsupportedOperationException();
    }

    @Override // com.starrocks.data.load.stream.TableRegion
    public boolean flush() {
        throw new UnsupportedOperationException();
    }
}
