package com.huaweicloud.dws.client.binlog.reader;

import com.huaweicloud.dws.client.DwsConfig;
import com.huaweicloud.dws.client.TableConfig;
import com.huaweicloud.dws.client.binlog.collector.BinlogApi;
import com.huaweicloud.dws.client.binlog.collector.BinlogCollector;
import com.huaweicloud.dws.client.binlog.collector.BinlogFullSyncCollector;
import com.huaweicloud.dws.client.binlog.model.BinlogRecord;
import com.huaweicloud.dws.client.binlog.model.FullSyncSlot;
import com.huaweicloud.dws.client.binlog.model.Slot;
import com.huaweicloud.dws.client.exception.DwsBinlogException;
import com.huaweicloud.dws.client.exception.DwsClientException;
import com.huaweicloud.dws.client.exception.ExceptionCode;
import com.huaweicloud.dws.client.exception.InvalidException;
import com.huaweicloud.dws.client.util.AssertUtil;
import com.huaweicloud.dws.client.util.LogUtil;
import com.huaweicloud.dws.client.worker.DwsConnectionPool;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dws/client/binlog/reader/BinlogReader.class */
public class BinlogReader {
    private static final Logger log = LoggerFactory.getLogger(BinlogReader.class);
    private static final long AWAIT_TIME = 1000;
    private static final int SLEEP_RANDOM_MS = 100;
    private final SecureRandom sleepRandom;
    private final DwsConfig dwsConfig;
    private final TableConfig tableConfig;
    private final ExecutorService binlogCollectExecutorService;
    private final ExecutorService binlogGetRecordExecutorService;
    private final AtomicBoolean started;
    private final AtomicBoolean needFullSync;
    private final AtomicLong totalRecord;
    private final List<CompletableFuture<Void>> completableFutureList;
    private final int fullSyncBinlogBatchReadSize;
    private final AtomicBoolean running;
    private final List<BinlogCollector> binlogCollectors;
    private final List<BinlogFullSyncCollector> binlogFullSyncCollectors;
    private final int binlogParallelNum;
    private final BlockingQueue<BinlogRecord> queue;
    private final List<Slot> specifySlots;
    private Exception exception;
    private final List<String> columnNames;
    private List<Slot> currentSlots;
    private final String tableName;
    private final String slotName;
    private final DwsConnectionPool dwsConnectionPool;
    private List<Integer> nodeIds;
    private final boolean isNeedRedistribution;
    private final String errorMessage;
    private int noDataTimes;
    private String binlogWorkerThreadPrefix;
    private String binlogGetRecordThreadPrefix;

    public BinlogReader(DwsConfig dwsConfig, BlockingQueue<BinlogRecord> blockingQueue, List<Slot> list, List<String> list2, DwsConnectionPool dwsConnectionPool) {
        this.started = new AtomicBoolean(false);
        this.needFullSync = new AtomicBoolean(false);
        this.totalRecord = new AtomicLong(0L);
        this.completableFutureList = new ArrayList();
        this.running = new AtomicBoolean(false);
        this.binlogCollectors = new ArrayList();
        this.binlogFullSyncCollectors = new ArrayList();
        this.currentSlots = new ArrayList();
        this.nodeIds = new CopyOnWriteArrayList();
        this.noDataTimes = 0;
        this.binlogWorkerThreadPrefix = "binlog-worker";
        this.binlogGetRecordThreadPrefix = "binlog-get-record";
        checkParams(dwsConfig);
        this.dwsConfig = dwsConfig;
        this.queue = blockingQueue;
        this.specifySlots = list;
        this.columnNames = list2;
        this.tableName = dwsConfig.getBinlogTableName();
        this.dwsConnectionPool = dwsConnectionPool;
        this.tableConfig = dwsConfig.getTableConfig(this.tableName);
        this.isNeedRedistribution = this.tableConfig.isNeedRedistribution();
        this.errorMessage = this.tableConfig.getErrorMessage();
        this.slotName = this.tableConfig.getBinlogSlotName();
        this.fullSyncBinlogBatchReadSize = this.tableConfig.getFullSyncBinlogBatchReadSize();
        this.binlogParallelNum = this.tableConfig.getBinlogParallelNum();
        this.binlogCollectExecutorService = new ThreadPoolExecutor(this.binlogParallelNum, this.binlogParallelNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10), new ThreadFactory() { // from class: com.huaweicloud.dws.client.binlog.reader.BinlogReader.1
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName(BinlogReader.this.binlogWorkerThreadPrefix + "-" + BinlogReader.this.tableName + "-" + this.threadNumber.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        }, new ThreadPoolExecutor.AbortPolicy());
        this.binlogGetRecordExecutorService = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(this.binlogGetRecordThreadPrefix + "-" + this.tableName);
            thread.setDaemon(true);
            return thread;
        });
        try {
            this.sleepRandom = SecureRandom.getInstanceStrong();
        } catch (NoSuchAlgorithmException e) {
            throw new InvalidException(e);
        }
    }

    public BinlogReader(DwsConfig dwsConfig, BlockingQueue<BinlogRecord> blockingQueue, List<String> list, DwsConnectionPool dwsConnectionPool) {
        this(dwsConfig, blockingQueue, null, list, dwsConnectionPool);
    }

    public synchronized void getRecords() throws Exception {
        if (!this.started.get()) {
            log.error("binlogReader not started...");
            throw new DwsClientException(ExceptionCode.BINLOG_READER_NOT_STARTED, "binlogReader not started...");
        }
        if (Objects.nonNull(this.exception)) {
            throw new DwsBinlogException(this.exception.getMessage(), this.exception);
        }
        if (this.running.get()) {
            return;
        }
        this.binlogGetRecordExecutorService.execute(() -> {
            try {
                if (this.running.compareAndSet(false, true)) {
                    while (this.started.get()) {
                        if (Objects.nonNull(this.exception)) {
                            throw DwsClientException.fromException(this.exception);
                        }
                        doCollect();
                    }
                }
            } catch (Exception e) {
                log.error("get binlog records has error", e);
                this.exception = e;
            }
        });
    }

    public void checkException() throws DwsBinlogException {
        if (Objects.nonNull(this.exception)) {
            throw new DwsBinlogException(this.exception.getMessage(), this.exception);
        }
    }

    public boolean hasException() {
        return Objects.nonNull(this.exception);
    }

    public synchronized void doCollect() throws Exception {
        int binlogMaxRetryTimes = this.tableConfig.getBinlogMaxRetryTimes();
        for (int i = 1; i <= binlogMaxRetryTimes; i++) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                prepareToCollect();
                collectBinlog();
                registerSlot();
                reset();
                if (this.totalRecord.get() != 0) {
                    this.noDataTimes = 0;
                    log.info("read binlog info cost time: {}, slots: {},", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), this.currentSlots);
                    return;
                }
                long binlogSleepTime = this.tableConfig.getBinlogSleepTime();
                int i2 = this.noDataTimes + 1;
                this.noDataTimes = i2;
                long min = Math.min(binlogSleepTime * i2, 10000L);
                LogUtil.withLogSwitch(this.dwsConfig, () -> {
                    log.warn("there is no new data in currentSlots, sleepTime: {}", Long.valueOf(min));
                });
                TimeUnit.MILLISECONDS.sleep(min);
                return;
            } catch (Exception e) {
                log.error("[retry]collectBinlog has error: ", e);
                stopCollectors();
                if (handleRedistribution(e)) {
                    log.info("no need retry, begin re-consume...");
                    throw DwsClientException.fromException(e);
                }
                if (i >= binlogMaxRetryTimes) {
                    log.info("reach to MaxRetryTimes: " + i);
                    throw DwsClientException.fromException(e);
                }
                long binlogRetryInterval = (this.tableConfig.getBinlogRetryInterval() * i) + this.sleepRandom.nextInt(SLEEP_RANDOM_MS);
                log.info("reader binlog fail, start retry, retryCount: {}, sleepTime: {}", Integer.valueOf(i), Long.valueOf(binlogRetryInterval));
                try {
                    TimeUnit.MILLISECONDS.sleep(binlogRetryInterval);
                } catch (InterruptedException e2) {
                    log.error("sleep has error: ", e2);
                    throw DwsClientException.fromException(e2);
                }
            }
        }
    }

    public boolean handleRedistribution(Exception exc) {
        if (!exc.getMessage().contains(this.errorMessage)) {
            return false;
        }
        log.info("start handle redistribution...");
        return true;
    }

    private void prepareToCollect() throws Exception {
        if (nextCycle()) {
            initCurrentSlots();
        }
        mergeCurrentSlotList();
        if (this.currentSlots.stream().anyMatch(slot -> {
            return slot.getCurrentStartCsn() == 0;
        })) {
            this.needFullSync.set(true);
        }
        initBinlogCollectors();
    }

    protected void initCurrentSlots() throws SQLException {
        Connection connection = this.dwsConnectionPool.getConnection();
        Throwable th = null;
        try {
            this.currentSlots = BinlogApi.getSyncPoint(connection, this.tableName, this.slotName, 0, false, this.isNeedRedistribution);
            LogUtil.withLogSwitch(this.dwsConfig, () -> {
                log.info("get slot info from db currentSlots: {}", this.currentSlots);
            });
            if (connection != null) {
                if (0 == 0) {
                    connection.close();
                    return;
                }
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    connection.close();
                }
            }
            throw th3;
        }
    }

    private boolean hasNewData() throws Exception {
        if (this.currentSlots.isEmpty()) {
            return true;
        }
        List<Slot> slotsInDb = getSlotsInDb();
        Map map = (Map) slotsInDb.stream().collect(Collectors.toMap((v0) -> {
            return v0.getDnNodeId();
        }, Function.identity()));
        for (Slot slot : this.currentSlots) {
            int dnNodeId = slot.getDnNodeId();
            if (map.containsKey(Integer.valueOf(dnNodeId)) && ((Slot) map.get(Integer.valueOf(dnNodeId))).getEndCsn() > slot.getCurrentStartCsn()) {
                return true;
            }
        }
        log.warn("there is no new data, currentSlots: {}, slotsInDB: {}", this.currentSlots, slotsInDb);
        return false;
    }

    protected List<Slot> getSlotsInDb() throws SQLException {
        Connection connection = this.dwsConnectionPool.getConnection();
        Throwable th = null;
        try {
            List<Slot> syncPoint = BinlogApi.getSyncPoint(connection, this.tableName, this.slotName, 0, false, this.isNeedRedistribution);
            LogUtil.withLogSwitch(this.dwsConfig, () -> {
                log.info("get slot info from db slotsInDB: {}", syncPoint);
            });
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    connection.close();
                }
            }
            return syncPoint;
        } catch (Throwable th3) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    connection.close();
                }
            }
            throw th3;
        }
    }

    public List<Integer> getNodesFromDB() throws SQLException {
        Connection connection = getDwsConnectionPool().getConnection();
        Throwable th = null;
        try {
            List<Integer> nodeIdsWithTableName = BinlogApi.getNodeIdsWithTableName(connection, getTableName());
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    connection.close();
                }
            }
            return nodeIdsWithTableName;
        } catch (Throwable th3) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    connection.close();
                }
            }
            throw th3;
        }
    }

    private boolean nextCycle() {
        return this.currentSlots.stream().allMatch((v0) -> {
            return v0.isEnd();
        });
    }

    private void mergeCurrentSlotList() {
        if (Objects.isNull(this.specifySlots) || this.specifySlots.size() == 0) {
            return;
        }
        if (this.specifySlots.size() == 1 && (this.specifySlots.get(0) instanceof FullSyncSlot)) {
            this.currentSlots.forEach(slot -> {
                slot.setStartCsn(0L);
                slot.setCurrentStartCsn(0L);
            });
        } else {
            Map map = (Map) this.specifySlots.stream().collect(Collectors.toMap((v0) -> {
                return v0.getDnNodeId();
            }, Function.identity()));
            for (Slot slot2 : this.currentSlots) {
                int dnNodeId = slot2.getDnNodeId();
                if (map.containsKey(Integer.valueOf(dnNodeId))) {
                    Slot slot3 = (Slot) map.get(Integer.valueOf(dnNodeId));
                    if (slot3.getCurrentStartCsn() >= slot2.getCurrentStartCsn()) {
                        log.warn("specify slot is large than current slot, ignore...");
                    } else {
                        slot2.setCurrentStartCsn(slot3.getCurrentStartCsn());
                    }
                }
            }
        }
        log.info("use specify slot to replace current slot. specifySlotList: {}, currentSlots: {}", this.specifySlots, this.currentSlots);
        this.specifySlots.clear();
    }

    private void collectBinlog() {
        if (this.needFullSync.get()) {
            this.binlogFullSyncCollectors.forEach(binlogFullSyncCollector -> {
                this.completableFutureList.add(CompletableFuture.runAsync(binlogFullSyncCollector, this.binlogCollectExecutorService));
            });
        } else {
            this.binlogCollectors.forEach(binlogCollector -> {
                this.completableFutureList.add(CompletableFuture.runAsync(binlogCollector, this.binlogCollectExecutorService));
            });
        }
    }

    private void initBinlogCollectors() {
        this.binlogCollectors.clear();
        this.binlogFullSyncCollectors.clear();
        this.totalRecord.set(0L);
        int size = this.currentSlots.size();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < size; i++) {
            ((List) hashMap.computeIfAbsent(Integer.valueOf(i % this.binlogParallelNum), num -> {
                return new ArrayList();
            })).add(this.currentSlots.get(i));
        }
        hashMap.forEach((num2, list) -> {
            if (list.stream().allMatch((v0) -> {
                return v0.isEnd();
            })) {
                return;
            }
            if (this.needFullSync.get()) {
                this.binlogFullSyncCollectors.add(new BinlogFullSyncCollector(this.queue, list, this.started, this.fullSyncBinlogBatchReadSize, this.dwsConnectionPool, this.columnNames, this.tableName, this.totalRecord));
            } else {
                this.binlogCollectors.add(new BinlogCollector(this.queue, list, this.started, this.tableConfig, this.dwsConnectionPool, this.columnNames, this.totalRecord));
            }
        });
    }

    private void registerSlot() throws Exception {
        CompletableFuture.allOf((CompletableFuture[]) this.completableFutureList.toArray(new CompletableFuture[0])).whenComplete((r3, th) -> {
            this.completableFutureList.clear();
        }).get(this.needFullSync.get() ? this.tableConfig.getFullSyncBinlogReadTimeout() : this.tableConfig.getBinlogReadTimeout(), TimeUnit.MILLISECONDS);
        if (this.needFullSync.get() || nextCycle()) {
            for (Slot slot : this.currentSlots) {
                Connection connection = this.dwsConnectionPool.getConnection();
                Throwable th2 = null;
                try {
                    try {
                        BinlogApi.updateSyncPoint(connection, this.tableName, this.slotName, slot.getEndCsn(), slot.getDnNodeId(), slot.getXmin(), false);
                        if (connection != null) {
                            if (0 != 0) {
                                try {
                                    connection.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                connection.close();
                            }
                        }
                    } catch (Throwable th4) {
                        if (connection != null) {
                            if (th2 != null) {
                                try {
                                    connection.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                connection.close();
                            }
                        }
                        throw th4;
                    }
                } catch (Throwable th6) {
                    th2 = th6;
                    throw th6;
                }
            }
        }
    }

    private void reset() {
        this.needFullSync.set(false);
    }

    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            log.warn("BinlogReader has started...");
        } else {
            log.info("start BinlogReader...");
            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
        }
    }

    private void checkParams(DwsConfig dwsConfig) {
        String binlogTableName = dwsConfig.getBinlogTableName();
        AssertUtil.nonNull(binlogTableName, new InvalidException("tableName not set"));
        TableConfig tableConfig = dwsConfig.getTableConfig(binlogTableName);
        AssertUtil.nonNull(tableConfig, new InvalidException("tableConfig not set"));
        AssertUtil.isTrue(tableConfig.isBinlog(), new InvalidException("binlog is false"));
        String binlogSlotName = tableConfig.getBinlogSlotName();
        if (Objects.isNull(binlogSlotName) || "".equals(binlogSlotName)) {
            tableConfig.withBinlogSlotName(binlogTableName);
        }
    }

    public void stop() {
        this.started.set(false);
    }

    public boolean isStart() {
        return this.started.get();
    }

    private void stopCollectors() {
        log.info("stop collectors...");
        this.binlogFullSyncCollectors.forEach((v0) -> {
            v0.stop();
        });
        this.binlogCollectors.forEach((v0) -> {
            v0.stop();
        });
    }

    public void close() {
        try {
            if (!this.started.compareAndSet(true, false)) {
                log.warn("BinlogReader has closed...");
                return;
            }
            log.info("BinlogReader close start");
            BinlogApi.nodeCount.remove();
            stopCollectors();
            this.binlogCollectExecutorService.shutdownNow();
            while (!this.binlogCollectExecutorService.awaitTermination(AWAIT_TIME, TimeUnit.MILLISECONDS)) {
                log.info("wait binlog collect executorService termination");
            }
            this.binlogGetRecordExecutorService.shutdownNow();
            while (!this.binlogGetRecordExecutorService.awaitTermination(AWAIT_TIME, TimeUnit.MILLISECONDS)) {
                log.info("wait binlog get record executorService termination");
            }
            this.dwsConnectionPool.close();
            log.info("BinlogReader close end");
        } catch (Exception e) {
            log.error("shutdown binlog executorService error.", e);
        }
    }

    public TableConfig getTableConfig() {
        return this.tableConfig;
    }

    public Exception getException() {
        return this.exception;
    }

    public void setCurrentSlots(List<Slot> list) {
        this.currentSlots = list;
    }

    public String getTableName() {
        return this.tableName;
    }

    public String getSlotName() {
        return this.slotName;
    }

    public DwsConnectionPool getDwsConnectionPool() {
        return this.dwsConnectionPool;
    }

    public List<Integer> getNodeIds() {
        return this.nodeIds;
    }

    public void setNodeIds(List<Integer> list) {
        this.nodeIds = list;
    }

    public boolean isNeedRedistribution() {
        return this.isNeedRedistribution;
    }

    public String getErrorMessage() {
        return this.errorMessage;
    }

    public void setBinlogWorkerThreadPrefix(String str) {
        this.binlogWorkerThreadPrefix = str;
    }

    public void setBinlogGetRecordThreadPrefix(String str) {
        this.binlogGetRecordThreadPrefix = str;
    }
}
