package com.huaweicloud.dws.client.binlog.reader;

import com.huaweicloud.dws.client.DwsConfig;
import com.huaweicloud.dws.client.binlog.collector.BinlogApi;
import com.huaweicloud.dws.client.binlog.model.BinlogRecord;
import com.huaweicloud.dws.client.binlog.model.Slot;
import com.huaweicloud.dws.client.worker.DwsConnectionPool;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dws/client/binlog/reader/ParallelBinlogReader.class */
public class ParallelBinlogReader extends BinlogReader {
    private static final Logger log = LoggerFactory.getLogger(ParallelBinlogReader.class);
    private final List<Slot> currentSlots;
    private final int taskId;
    private final int parallelTaskNum;
    private final List<Integer> pendingNodeIds;

    public ParallelBinlogReader(DwsConfig dwsConfig, BlockingQueue<BinlogRecord> blockingQueue, List<Slot> list, List<String> list2, DwsConnectionPool dwsConnectionPool, int i, int i2) {
        super(dwsConfig, blockingQueue, list, list2, dwsConnectionPool);
        this.currentSlots = new ArrayList();
        this.pendingNodeIds = new ArrayList();
        this.taskId = i;
        this.parallelTaskNum = i2;
        setCurrentSlots(this.currentSlots);
        setBinlogWorkerThreadPrefix("[taskId=" + i + "]:binlog-worker");
        setBinlogGetRecordThreadPrefix("[taskId=" + i + "]:binlog-get-record");
        log.info("init ParallelBinlogReader: taskId: {}, parallelTaskNum: {}", Integer.valueOf(i), Integer.valueOf(i2));
    }

    public ParallelBinlogReader(DwsConfig dwsConfig, BlockingQueue<BinlogRecord> blockingQueue, List<String> list, DwsConnectionPool dwsConnectionPool, int i, int i2) {
        this(dwsConfig, blockingQueue, null, list, dwsConnectionPool, i, i2);
    }

    @Override // com.huaweicloud.dws.client.binlog.reader.BinlogReader
    public void initCurrentSlots() throws SQLException {
        initOrUpdateCurrentSlots();
    }

    private void initOrUpdateCurrentSlots() throws SQLException {
        initPendingNodeIds();
        List<Slot> slotsInDb = getSlotsInDb();
        if (this.currentSlots.size() == 0) {
            this.currentSlots.addAll(slotsInDb);
            log.info("init currentSlots: slotsInDb: {}, currentSlots: {}", slotsInDb, this.currentSlots);
        } else {
            Map map = (Map) slotsInDb.stream().collect(Collectors.toMap((v0) -> {
                return v0.getDnNodeId();
            }, Function.identity()));
            this.currentSlots.forEach(slot -> {
                int dnNodeId = slot.getDnNodeId();
                if (!map.containsKey(Integer.valueOf(dnNodeId))) {
                    log.warn("dnNodeId: {} not in currentSlots: [{}]", Integer.valueOf(dnNodeId), this.currentSlots);
                    return;
                }
                Slot slot = (Slot) map.get(Integer.valueOf(dnNodeId));
                slot.setStartCsn(slot.getStartCsn());
                slot.setCurrentStartCsn(slot.getCurrentStartCsn());
                slot.setEndCsn(slot.getEndCsn());
            });
        }
    }

    public void initPendingNodeIds() throws SQLException {
        if (getNodeIds().size() == 0) {
            Connection connection = getDwsConnectionPool().getConnection();
            Throwable th = null;
            try {
                if (isNeedRedistribution()) {
                    getNodeIds().addAll(BinlogApi.getNodeIds(connection, getTableName()));
                } else {
                    getNodeIds().addAll(BinlogApi.getNodeIds(connection, null));
                }
            } finally {
                if (connection != null) {
                    if (0 != 0) {
                        try {
                            connection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        connection.close();
                    }
                }
            }
        }
        int size = getNodeIds().size();
        if (this.taskId + 1 > size) {
            log.warn("task number is larger than dn number, stop...");
            stop();
            return;
        }
        if (isNeedRedistribution()) {
            BinlogApi.nodeCount.set(Integer.valueOf(size));
        }
        if (this.pendingNodeIds.size() == 0) {
            for (int i = 0; i < size; i++) {
                if (i % this.parallelTaskNum == this.taskId) {
                    this.pendingNodeIds.add(getNodeIds().get(i));
                }
            }
            log.info("taskId: {}, init pendingNodeIds: {}, nodeIds: {}", new Object[]{Integer.valueOf(this.taskId), this.pendingNodeIds, getNodeIds()});
        }
    }

    @Override // com.huaweicloud.dws.client.binlog.reader.BinlogReader
    public List<Slot> getSlotsInDb() throws SQLException {
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = this.pendingNodeIds.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Connection connection = getDwsConnectionPool().getConnection();
            Throwable th = null;
            try {
                try {
                    arrayList.addAll(BinlogApi.getSyncPoint(connection, getTableName(), getSlotName(), intValue, false, isNeedRedistribution()));
                    if (connection != null) {
                        if (0 != 0) {
                            try {
                                connection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            connection.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (connection != null) {
                        if (th != null) {
                            try {
                                connection.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            connection.close();
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        }
        return (List) arrayList.stream().sorted(Comparator.comparing((v0) -> {
            return v0.getDnNodeId();
        })).collect(Collectors.toList());
    }

    public List<Integer> getPendingNodeIds() {
        return this.pendingNodeIds;
    }
}
