package com.alibaba.ververica.connectors.hologres.bhclient;

import com.alibaba.hologres.client.BlackholeClientConfigPB;
import com.alibaba.hologres.client.BlackholeClientException;
import com.alibaba.hologres.client.BlackholeTableBinlogReader;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.binlog.source.converter.BhclientBinlogRecordConverter;
import com.alibaba.ververica.connectors.hologres.binlog.source.reader.AbstractHologresBinlogShardSplitReader;
import com.alibaba.ververica.connectors.hologres.binlog.source.record.BinlogSourceRecord;
import com.alibaba.ververica.connectors.hologres.binlog.source.record.SourceRecord;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import com.alibaba.ververica.connectors.hologres.utils.HoloBinlogUtil;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.blackholeclientjni.com.alibaba.blink.dataformat.BinaryRow;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/bhclient/HologresBhclientBinlogShardSplitReader.class */
public class HologresBhclientBinlogShardSplitReader<T> extends AbstractHologresBinlogShardSplitReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HologresBhclientBinlogShardSplitReader.class);
    private static final int CURSOR_TYPE_SYSTEM_TIME = 4;
    private final BhclientBinlogRecordConverter binlogRecordConverter;
    private final String[] blinkTableFields;
    private final String sqlTableName;
    private transient HologresBhclientClientProvider clientProvider;
    private BlackholeTableBinlogReader shardGroupReader;

    public HologresBhclientBinlogShardSplitReader(BhclientBinlogRecordConverter bhclientBinlogRecordConverter, HologresConnectionParam hologresConnectionParam, JDBCOptions jDBCOptions, String[] strArr, DataType[] dataTypeArr, String str, String[] strArr2, SourceReaderContext sourceReaderContext, String str2, long j) {
        super(hologresConnectionParam, jDBCOptions, strArr, dataTypeArr, sourceReaderContext, str2, j);
        this.binlogRecordConverter = bhclientBinlogRecordConverter;
        this.blinkTableFields = strArr2;
        this.sqlTableName = str;
    }

    public AbstractHologresBinlogShardSplitReader.HologresShardSplitRecords<SourceRecord<T>> binlogFetch() throws IOException {
        AbstractHologresBinlogShardSplitReader.HologresShardSplitRecords<SourceRecord<T>> hologresShardSplitRecords = new AbstractHologresBinlogShardSplitReader.HologresShardSplitRecords<>();
        int i = 0;
        boolean z = false;
        while (i < this.binlogBatchReadSize && !this.wakeup) {
            long nanoTime = z ? 0L : System.nanoTime();
            BlackholeTableBinlogReader.GetNextResult next = this.shardGroupReader.getNext();
            long nanoTime2 = z ? 0L : System.nanoTime() - nanoTime;
            if (next.rows.size() == 0) {
                try {
                    this.readNullCounter.inc();
                    if (z) {
                        break;
                    }
                } catch (Exception e) {
                    this.exceptionCounter.inc();
                    throw new IOException("Failed to read hologres binlog, failed reason: ", e);
                }
            } else {
                if (nanoTime2 > 0) {
                    this.latencyGauge.report(nanoTime2 / 1000);
                }
                if (next.rows.size() > 0) {
                    String valueOf = String.valueOf(next.shardId);
                    if (!this.shardIdToMaxLsn.containsKey(valueOf)) {
                        this.shardIdToMaxLsn.put(valueOf, -1L);
                    }
                    for (int i2 = 0; i2 < next.rows.size(); i2++) {
                        BinaryRow GetRow = next.rows.GetRow(i2);
                        RowData convertTo = this.binlogRecordConverter.convertTo(GetRow);
                        long j = GetRow.getLong(this.blinkTableFields.length + 0);
                        long j2 = GetRow.getLong(this.blinkTableFields.length + 2) / 1000;
                        this.readCounter.inc();
                        this.shardIdToMaxLsn.put(valueOf, Long.valueOf(j));
                        this.currentFetchEventTimeLag = System.currentTimeMillis() - j2;
                        z = true;
                        i++;
                        Collection recordsForSplit = hologresShardSplitRecords.recordsForSplit(valueOf);
                        if (!this.upsertSource || convertTo.getRowKind() != RowKind.UPDATE_BEFORE) {
                            recordsForSplit.add(new BinlogSourceRecord(convertTo, j, j2));
                        }
                    }
                }
            }
        }
        this.wakeup = false;
        hologresShardSplitRecords.prepareForRead();
        return hologresShardSplitRecords;
    }

    public void openBinlogReader() {
        try {
            TableSchema tableSchema = HologresTableSchema.get(this.connectionParam).get();
            this.clientProvider = new HologresBhclientClientProvider(this.connectionParam, this.sqlTableName);
            BlackholeClientConfigPB.TableBinlogReaderCreateOptions.Builder rpcTimeoutMs = BlackholeClientConfigPB.TableBinlogReaderCreateOptions.newBuilder().setTableDesc(this.clientProvider.getClient().toTableDesc(this.connectionParam.getDatabase(), tableSchema.getSchemaName(), tableSchema.getTableName())).setProjections(this.clientProvider.getClient().fieldsToColumnDescs(ArrayUtils.concat(this.blinkTableFields, HoloBinlogUtil.BINLOG_METADATA_COLUMNS_FOR_BHCLIENT))).setPreReadQueueSize(this.connectionParam.getBhclientReaderQueue()).setPreReadBatchSize(this.connectionParam.getBhclientReaderBatch()).setRetry(true).setRpcTimeoutMs(0);
            Iterator it = this.binlogOffsetMap.entrySet().iterator();
            while (it.hasNext()) {
                rpcTimeoutMs.addShardIds(Integer.parseInt((String) ((Map.Entry) it.next()).getKey()));
            }
            this.shardGroupReader = this.clientProvider.getClient().createBlackholeTableBinlogReader(rpcTimeoutMs.build());
            for (Map.Entry entry : this.binlogOffsetMap.entrySet()) {
                if (((Long) ((Tuple2) entry.getValue()).f0).longValue() > 0) {
                    this.shardGroupReader.seek(Integer.parseInt((String) entry.getKey()), ((Long) ((Tuple2) entry.getValue()).f0).longValue());
                } else {
                    this.shardGroupReader.seek(Integer.parseInt((String) entry.getKey()), 4, ((Long) ((Tuple2) entry.getValue()).f1).longValue() * 1000);
                }
            }
        } catch (BlackholeClientException e) {
            throw new RuntimeException("create bhclient client failed, because:", e);
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.shardGroupReader != null) {
            this.shardGroupReader.setWaitReqsBeforeClose(false);
            this.shardGroupReader.close();
        }
        if (this.clientProvider != null) {
            this.clientProvider.closeClient();
        }
    }
}
