package com.starrocks.data.load.stream.v2;

import com.starrocks.data.load.stream.DefaultStreamLoader;
import com.starrocks.data.load.stream.EnvUtils;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.LoadMetrics;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoadUtils;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.TransactionStreamLoader;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.v2.FlushAndCommitStrategy;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/data/load/stream/v2/StreamLoadManagerV2.class */
public class StreamLoadManagerV2 implements StreamLoadManager, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamLoadManagerV2.class);
    private static final long serialVersionUID = 1;
    private final StreamLoadProperties properties;
    private final boolean enableAutoCommit;
    private final StreamLoader streamLoader;
    private final int maxRetries;
    private final int retryIntervalInMs;
    private final long maxCacheBytes;
    private final long maxWriteBlockCacheBytes;
    private final FlushAndCommitStrategy flushAndCommitStrategy;
    private final long scanningFrequency;
    private Thread current;
    private Thread manager;
    private volatile boolean allRegionsCommitted;
    private volatile Throwable e;
    private transient AtomicBoolean writeTriggerFlush;
    private transient LoadMetrics loadMetrics;
    private transient StreamLoadListener streamLoadListener;
    private transient LabelGeneratorFactory labelGeneratorFactory;
    private final Map<String, TableRegion> regions = new ConcurrentHashMap();
    private final AtomicLong currentCacheBytes = new AtomicLong(0);
    private final AtomicLong totalFlushRows = new AtomicLong(0);
    private final AtomicLong numberTotalRows = new AtomicLong(0);
    private final AtomicLong numberLoadRows = new AtomicLong(0);
    private volatile boolean savepoint = false;
    private final Lock lock = new ReentrantLock();
    private final Condition writable = this.lock.newCondition();
    private final Condition flushable = this.lock.newCondition();
    private final AtomicReference<State> state = new AtomicReference<>(State.INACTIVE);
    private final Queue<TransactionTableRegion> flushQ = new ConcurrentLinkedQueue();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/starrocks/data/load/stream/v2/StreamLoadManagerV2$State.class */
    public enum State {
        ACTIVE,
        INACTIVE
    }

    public StreamLoadManagerV2(StreamLoadProperties streamLoadProperties, boolean z) {
        this.properties = streamLoadProperties;
        if (!z && !streamLoadProperties.isEnableTransaction()) {
            throw new IllegalArgumentException("You must use transaction stream load if not enable auto-commit");
        }
        this.enableAutoCommit = z;
        if (z) {
            this.streamLoader = (streamLoadProperties.getMaxRetries() > 0 || !streamLoadProperties.isEnableTransaction()) ? new DefaultStreamLoader() : new TransactionStreamLoader();
            this.maxRetries = streamLoadProperties.getMaxRetries();
            this.retryIntervalInMs = streamLoadProperties.getRetryIntervalInMs();
        } else {
            this.streamLoader = new TransactionStreamLoader();
            this.maxRetries = 0;
            this.retryIntervalInMs = 0;
        }
        this.maxCacheBytes = streamLoadProperties.getMaxCacheBytes();
        this.maxWriteBlockCacheBytes = 2 * this.maxCacheBytes;
        this.scanningFrequency = streamLoadProperties.getScanningFrequency();
        this.flushAndCommitStrategy = new FlushAndCommitStrategy(streamLoadProperties, z);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void init() {
        if (this.labelGeneratorFactory == null) {
            this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(this.properties.getLabelPrefix());
        }
        this.writeTriggerFlush = new AtomicBoolean(false);
        this.loadMetrics = new LoadMetrics();
        if (this.state.compareAndSet(State.INACTIVE, State.ACTIVE)) {
            this.manager = new Thread(() -> {
                long j = -1;
                LOG.info("manager running, scanningFrequency : {}", Long.valueOf(this.scanningFrequency));
                while (true) {
                    this.lock.lock();
                    try {
                        try {
                            this.flushable.await(this.scanningFrequency, TimeUnit.MILLISECONDS);
                            this.lock.unlock();
                            if (j == -1 || System.currentTimeMillis() - j > 10000) {
                                j = System.currentTimeMillis();
                                LOG.debug("Audit information: {}, {}", this.loadMetrics, this.flushAndCommitStrategy);
                            }
                            if (this.savepoint) {
                                for (TransactionTableRegion transactionTableRegion : this.flushQ) {
                                    LOG.debug("Trigger flush table region {} because of savepoint, region cache bytes: {}, flush: {}", new Object[]{transactionTableRegion.getUniqueKey(), Long.valueOf(transactionTableRegion.getCacheBytes()), Boolean.valueOf(transactionTableRegion.flush(FlushReason.FORCE))});
                                }
                                if (this.enableAutoCommit) {
                                    int i = 0;
                                    for (TransactionTableRegion transactionTableRegion2 : this.flushQ) {
                                        boolean commit = transactionTableRegion2.commit();
                                        if (commit && transactionTableRegion2.getCacheBytes() == 0) {
                                            i++;
                                            transactionTableRegion2.resetAge();
                                        }
                                        LOG.debug("Commit region {} for savepoint, success: {}", transactionTableRegion2.getUniqueKey(), Boolean.valueOf(commit));
                                    }
                                    if (i == this.flushQ.size()) {
                                        this.allRegionsCommitted = true;
                                        LOG.info("All regions committed for savepoint, number of regions: {}", Integer.valueOf(i));
                                    } else {
                                        LOG.debug("Some regions not committed for savepoint, expected num: {}, actual num: {}", Integer.valueOf(this.flushQ.size()), Integer.valueOf(i));
                                    }
                                }
                                LockSupport.unpark(this.current);
                            } else {
                                for (TransactionTableRegion transactionTableRegion3 : this.flushQ) {
                                    transactionTableRegion3.getAndIncrementAge();
                                    if (this.flushAndCommitStrategy.shouldCommit(transactionTableRegion3)) {
                                        boolean commit2 = transactionTableRegion3.commit();
                                        if (commit2) {
                                            transactionTableRegion3.resetAge();
                                        }
                                        LOG.debug("Commit region {} for normal, success: {}", transactionTableRegion3.getUniqueKey(), Boolean.valueOf(commit2));
                                    }
                                }
                                for (FlushAndCommitStrategy.SelectFlushResult selectFlushResult : this.flushAndCommitStrategy.selectFlushRegions(this.flushQ, this.currentCacheBytes.get())) {
                                    TransactionTableRegion region = selectFlushResult.getRegion();
                                    LOG.debug("Trigger flush table region {} because of selection, region cache bytes: {}, flush: {}", new Object[]{region.getUniqueKey(), Long.valueOf(region.getCacheBytes()), Boolean.valueOf(region.flush(selectFlushResult.getReason()))});
                                }
                            }
                        } catch (InterruptedException e) {
                            if (this.savepoint) {
                                this.savepoint = false;
                                LockSupport.unpark(this.current);
                            }
                            this.lock.unlock();
                            return;
                        }
                    } catch (Throwable th) {
                        this.lock.unlock();
                        throw th;
                    }
                }
            }, "StarRocks-Sink-Manager");
            this.manager.setDaemon(true);
            this.manager.start();
            this.manager.setUncaughtExceptionHandler((thread, th) -> {
                LOG.error("StarRocks-Sink-Manager error", th);
                this.e = th;
            });
            LOG.info("StarRocks-Sink-Manager start, enableAutoCommit: {}, streamLoader: {}, {}", new Object[]{Boolean.valueOf(this.enableAutoCommit), this.streamLoader.getClass().getName(), EnvUtils.getGitInformation()});
            this.streamLoader.start(this.properties, this);
        }
    }

    public void setStreamLoadListener(StreamLoadListener streamLoadListener) {
        this.streamLoadListener = streamLoadListener;
    }

    public void setLabelGeneratorFactory(LabelGeneratorFactory labelGeneratorFactory) {
        this.labelGeneratorFactory = labelGeneratorFactory;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void write(String str, String str2, String str3, String... strArr) {
        TableRegion cacheRegion = getCacheRegion(str, str2, str3);
        for (String str4 : strArr) {
            AssertNotException();
            if (LOG.isTraceEnabled()) {
                Logger logger = LOG;
                Object[] objArr = new Object[4];
                objArr[0] = str == null ? "null" : str;
                objArr[1] = str2;
                objArr[2] = str3;
                objArr[3] = str4;
                logger.trace("Write uniqueKey {}, database {}, table {}, row {}", objArr);
            }
            long addAndGet = this.currentCacheBytes.addAndGet(cacheRegion.write(str4.getBytes(StandardCharsets.UTF_8)));
            if (addAndGet >= this.maxWriteBlockCacheBytes) {
                long nanoTime = System.nanoTime();
                this.lock.lock();
                int i = 0;
                while (this.currentCacheBytes.get() >= this.maxWriteBlockCacheBytes) {
                    try {
                        try {
                            AssertNotException();
                            LOG.info("Cache full, wait flush, currentBytes: {}, maxWriteBlockCacheBytes: {}", Long.valueOf(this.currentCacheBytes.get()), Long.valueOf(this.maxWriteBlockCacheBytes));
                            this.flushable.signal();
                            i++;
                            this.writable.await(Math.min(i, 5), TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            this.e = e;
                            throw new RuntimeException(e);
                        }
                    } finally {
                        this.lock.unlock();
                    }
                }
                this.loadMetrics.updateWriteBlock(1, System.nanoTime() - nanoTime);
            } else if (addAndGet >= this.maxCacheBytes && this.writeTriggerFlush.compareAndSet(false, true)) {
                this.lock.lock();
                try {
                    this.flushable.signal();
                    this.lock.unlock();
                    this.loadMetrics.updateWriteTriggerFlush(1);
                    LOG.info("Trigger flush, currentBytes: {}, maxCacheBytes: {}", Long.valueOf(addAndGet), Long.valueOf(this.maxCacheBytes));
                } finally {
                    this.lock.unlock();
                }
            }
        }
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void callback(StreamLoadResponse streamLoadResponse) {
        long andAdd = streamLoadResponse.getFlushBytes() != null ? this.currentCacheBytes.getAndAdd(-streamLoadResponse.getFlushBytes().longValue()) : this.currentCacheBytes.get();
        if (streamLoadResponse.getFlushRows() != null) {
            this.totalFlushRows.addAndGet(streamLoadResponse.getFlushRows().longValue());
        }
        this.writeTriggerFlush.set(false);
        LOG.info("Receive load response, cacheByteBeforeFlush: {}, currentCacheBytes: {}, totalFlushRows : {}", new Object[]{Long.valueOf(andAdd), Long.valueOf(this.currentCacheBytes.get()), Long.valueOf(this.totalFlushRows.get())});
        this.lock.lock();
        try {
            this.writable.signal();
            this.lock.unlock();
            if (streamLoadResponse.getException() != null) {
                LOG.error("Stream load failed", streamLoadResponse.getException());
                this.e = streamLoadResponse.getException();
            }
            if (streamLoadResponse.getBody() != null) {
                if (streamLoadResponse.getBody().getNumberTotalRows() != null) {
                    this.numberTotalRows.addAndGet(streamLoadResponse.getBody().getNumberTotalRows().longValue());
                }
                if (streamLoadResponse.getBody().getNumberLoadedRows() != null) {
                    this.numberLoadRows.addAndGet(streamLoadResponse.getBody().getNumberLoadedRows().longValue());
                }
            }
            if (streamLoadResponse.getException() != null) {
                this.loadMetrics.updateFailedLoad();
            } else {
                this.loadMetrics.updateSuccessLoad(streamLoadResponse);
            }
            if (this.streamLoadListener != null) {
                this.streamLoadListener.onResponse(streamLoadResponse);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}", this.loadMetrics);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void callback(Throwable th) {
        LOG.error("Stream load failed", th);
        this.e = th;
    }

    public Throwable getException() {
        return this.e;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void flush() {
        LOG.info("Stream load manager flush");
        this.savepoint = true;
        this.allRegionsCommitted = false;
        this.current = Thread.currentThread();
        while (!isSavepointFinished()) {
            AssertNotException();
            this.lock.lock();
            try {
                this.flushable.signal();
                LockSupport.park(this.current);
                if (!this.savepoint) {
                    break;
                }
                try {
                    Iterator<TableRegion> it = this.regions.values().iterator();
                    while (it.hasNext()) {
                        Future<?> result = it.next().getResult();
                        if (result != null) {
                            result.get();
                        }
                    }
                } catch (InterruptedException | ExecutionException e) {
                    LOG.warn("Flush get result failed", e);
                }
            } finally {
                this.lock.unlock();
            }
        }
        this.savepoint = false;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public StreamLoadSnapshot snapshot() {
        StreamLoadSnapshot snapshot = StreamLoadSnapshot.snapshot(this.regions.values());
        Iterator<TableRegion> it = this.regions.values().iterator();
        while (it.hasNext()) {
            it.next().setLabel(null);
        }
        return snapshot;
    }

    public StreamLoader getStreamLoader() {
        return this.streamLoader;
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public boolean prepare(StreamLoadSnapshot streamLoadSnapshot) {
        return this.streamLoader.prepare(streamLoadSnapshot);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public boolean commit(StreamLoadSnapshot streamLoadSnapshot) {
        return this.streamLoader.commit(streamLoadSnapshot);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public boolean abort(StreamLoadSnapshot streamLoadSnapshot) {
        return this.streamLoader.rollback(streamLoadSnapshot);
    }

    @Override // com.starrocks.data.load.stream.StreamLoadManager
    public void close() {
        if (this.state.compareAndSet(State.ACTIVE, State.INACTIVE)) {
            LOG.info("StreamLoadManagerV2 close, loadMetrics: {}, flushAndCommit: {}", this.loadMetrics, this.flushAndCommitStrategy);
            this.manager.interrupt();
            this.streamLoader.close();
        }
    }

    private boolean isSavepointFinished() {
        return this.currentCacheBytes.compareAndSet(0L, 0L) && (!this.enableAutoCommit || this.allRegionsCommitted);
    }

    private void AssertNotException() {
        if (this.e != null) {
            LOG.error("catch exception, wait rollback ", this.e);
            this.streamLoader.rollback(snapshot());
            close();
            throw new RuntimeException(this.e);
        }
    }

    protected TableRegion getCacheRegion(String str, String str2, String str3) {
        if (str == null) {
            str = StreamLoadUtils.getTableUniqueKey(str2, str3);
        }
        TableRegion tableRegion = this.regions.get(str);
        if (tableRegion == null) {
            synchronized (this.regions) {
                tableRegion = this.regions.get(str);
                if (tableRegion == null) {
                    tableRegion = new TransactionTableRegion(str, str2, str3, this, this.properties.getTableProperties(str, str2, str3), this.streamLoader, this.labelGeneratorFactory.create(str2, str3), this.maxRetries, this.retryIntervalInMs);
                    this.regions.put(str, tableRegion);
                    this.flushQ.offer((TransactionTableRegion) tableRegion);
                }
            }
        }
        return tableRegion;
    }
}
