package com.alibaba.ververica.connectors.hologres.holohub;

import com.alibaba.ververica.connectors.hologres.binlog.source.converter.HoloHubBinlogRecordConverter;
import com.alibaba.ververica.connectors.hologres.binlog.source.reader.AbstractHologresBinlogShardSplitReader;
import com.alibaba.ververica.connectors.hologres.binlog.source.record.BinlogSourceRecord;
import com.alibaba.ververica.connectors.hologres.binlog.source.record.SourceRecord;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.hologres.com.aliyun.datahub.client.exception.DatahubClientException;
import shaded.hologres.com.aliyun.datahub.client.model.RecordEntry;
import shaded.hologres.com.aliyun.datahub.client.model.TupleRecordData;
import shaded.hologres.com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import shaded.hologres.com.aliyun.datahub.clientlibrary.consumer.ShardGroupReader;
import shaded.hologres.com.aliyun.datahub.clientlibrary.models.Offset;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/holohub/HologresHoloHubBinlogShardSplitReader.class */
public class HologresHoloHubBinlogShardSplitReader<T> extends AbstractHologresBinlogShardSplitReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HologresHoloHubBinlogShardSplitReader.class);
    public static final int DEFAULT_FETCH_LATEST_DELAY = 500;
    private final HoloHubBinlogRecordConverter binlogRecordConverter;
    private ShardGroupReader shardGroupReader;
    private final int requestTimeout;
    private final int retryInterval;
    private final int retryTimesLimit;
    int retryTimes;

    public HologresHoloHubBinlogShardSplitReader(HoloHubBinlogRecordConverter holoHubBinlogRecordConverter, HologresConnectionParam hologresConnectionParam, JDBCOptions jDBCOptions, String[] strArr, DataType[] dataTypeArr, SourceReaderContext sourceReaderContext, String str, long j) {
        super(hologresConnectionParam, jDBCOptions, strArr, dataTypeArr, sourceReaderContext, str, j);
        this.retryTimes = 0;
        this.binlogRecordConverter = holoHubBinlogRecordConverter;
        this.requestTimeout = hologresConnectionParam.getBinlogRequestTimeoutMs();
        this.retryTimesLimit = hologresConnectionParam.getBinlogMaxRetryTimes();
        this.retryInterval = hologresConnectionParam.getSqlRetryInterval();
    }

    public AbstractHologresBinlogShardSplitReader.HologresShardSplitRecords<SourceRecord<T>> binlogFetch() throws IOException {
        AbstractHologresBinlogShardSplitReader.HologresShardSplitRecords<SourceRecord<T>> hologresShardSplitRecords = new AbstractHologresBinlogShardSplitReader.HologresShardSplitRecords<>();
        int i = 0;
        boolean z = false;
        DatahubClientException datahubClientException = null;
        while (i < this.binlogBatchReadSize && !this.wakeup) {
            long nanoTime = z ? 0L : System.nanoTime();
            RecordEntry read = this.shardGroupReader.read();
            long nanoTime2 = z ? 0L : System.nanoTime() - nanoTime;
            this.readCounter.inc();
            if (read == null) {
                this.readNullCounter.inc();
                if (!z) {
                    Thread.sleep(200L);
                }
                break;
            }
            if (nanoTime2 > 0) {
                try {
                    this.latencyGauge.report(nanoTime2 / 1000);
                    this.currentFetchEventTimeLag = System.currentTimeMillis() - read.getSystemTime();
                } catch (DatahubClientException e) {
                    this.exceptionCounter.inc();
                    LOG.warn("Failed to read record, retry {} times", Integer.valueOf(this.retryTimes + 1), e);
                    datahubClientException = e;
                    try {
                        Thread.sleep(this.retryInterval);
                        this.retryTimes++;
                    } catch (InterruptedException e2) {
                    }
                    if ("TableVersionExpired".equals(e.getErrorCode())) {
                        if (this.shardGroupReader != null) {
                            this.shardGroupReader.close();
                            this.shardGroupReader = null;
                        }
                        for (Map.Entry entry : this.shardIdToMaxLsn.entrySet()) {
                            this.binlogOffsetMap.put(entry.getKey(), new Tuple2(Long.valueOf(((Long) entry.getValue()).longValue() + 1), -1L));
                        }
                        LOG.info("register binlog reader for table version mismatch, {}", this.binlogOffsetMap);
                        openBinlogReader();
                    }
                } catch (Exception e3) {
                    throw new RuntimeException(e3);
                }
            }
            TupleRecordData tupleRecordData = (TupleRecordData) read.getRecordData();
            Long l = (Long) tupleRecordData.getField(0);
            if (!this.shardIdToMaxLsn.containsKey(read.getShardId())) {
                this.shardIdToMaxLsn.put(read.getShardId(), l);
            } else if (l.longValue() > ((Long) this.shardIdToMaxLsn.get(read.getShardId())).longValue()) {
                this.shardIdToMaxLsn.put(read.getShardId(), l);
            }
            z = true;
            i++;
            Collection recordsForSplit = hologresShardSplitRecords.recordsForSplit(read.getShardId());
            RowData convertTo = this.binlogRecordConverter.convertTo(tupleRecordData);
            if (!this.upsertSource || convertTo.getRowKind() != RowKind.UPDATE_BEFORE) {
                recordsForSplit.add(new BinlogSourceRecord(convertTo, read.getSequence(), read.getSystemTime()));
                this.retryTimes = 0;
                if (datahubClientException != null && this.retryTimes > this.retryTimesLimit) {
                    throw new IOException("Failed to read hologres binlog after retries: ", datahubClientException);
                }
            }
        }
        this.wakeup = false;
        hologresShardSplitRecords.prepareForRead();
        return hologresShardSplitRecords;
    }

    public void openBinlogReader() {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : this.binlogOffsetMap.entrySet()) {
            hashMap.put(entry.getKey(), new Offset(((Long) ((Tuple2) entry.getValue()).f0).longValue(), ((Long) ((Tuple2) entry.getValue()).f1).longValue()));
        }
        if (this.shardGroupReader == null) {
            this.shardGroupReader = new ShardGroupReader(this.jdbcOptions.getDatabase(), this.jdbcOptions.getBinlogTableName(), getConsumerConfig(hashMap.size()));
        }
        this.shardGroupReader.createShardReader(hashMap);
    }

    public void close() throws Exception {
        super.close();
        if (this.shardGroupReader != null) {
            this.shardGroupReader.close();
        }
    }

    private ConsumerConfig getConsumerConfig(int i) {
        ConsumerConfig fetchLatestDelayMs = new ConsumerConfig(this.jdbcOptions.getEndpoint(), this.jdbcOptions.getUsername(), this.jdbcOptions.getPassword()).setClientProvider(HolohubClientProvider.newHolohubClientProvider(this.jdbcOptions)).setEnableBinary(false).setAutoCommit(false).setFetchSize(this.binlogBatchReadSize).setMaxShardReaderPoolSize(i).setMaxBufferSize(this.binlogBatchReadSize * 2).setFetchLatestDelayMs(500L);
        fetchLatestDelayMs.getHttpConfig().setCompressType(null).setConnTimeout(this.requestTimeout).setReadTimeout(this.requestTimeout);
        return fetchLatestDelayMs;
    }
}
