/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.assigners;

import io.debezium.relational.TableId;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.assigners.AssignerStatus;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link MySqlHybridSplitAssigner}.
 *
 * <p>The tests run against the {@code customer} database, which is created and initialized once
 * for the whole class on the MySQL container provided by {@link MySqlSourceTestBase}.
 */
public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {

    private static final UniqueDatabase customerDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    /** Creates and populates the test database before any test runs. */
    @BeforeClass
    public static void init() {
        customerDatabase.createAndInitialize();
    }

    /**
     * Restores an assigner from a checkpoint in which five snapshot splits are already assigned
     * and finished, and verifies that the next split handed out is the expected binlog split.
     */
    @Test
    public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
        final String captureTable = "customers";
        MySqlSourceConfig configuration =
                getConfig(new String[] {captureTable}, StartupOptions.initial());

        // Step 1: build a checkpoint state whose snapshot splits have all finished.
        TableId tableId = new TableId(null, customerDatabase.getDatabaseName(), captureTable);
        RowType splitKeyType =
                (RowType)
                        DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();

        List<TableId> alreadyProcessedTables = Lists.newArrayList(tableId);
        List<MySqlSchemalessSnapshotSplit> remainingSplits = new ArrayList<>();
        HashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = new HashMap<>();
        HashMap<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();

        for (int i = 0; i < 5; i++) {
            String splitId = customerDatabase.getDatabaseName() + "." + captureTable + ":" + i;
            // The first chunk has no lower bound; every other chunk starts at i * 2.
            Object[] splitStart = i == 0 ? null : new Object[] {i * 2};
            Object[] splitEnd = new Object[] {i * 2 + 2};
            BinlogOffset highWatermark =
                    BinlogOffset.ofBinlogFilePosition("mysql-bin.00001", i + 1);
            MySqlSchemalessSnapshotSplit snapshotSplit =
                    new MySqlSchemalessSnapshotSplit(
                            tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark);
            assignedSplits.put(splitId, snapshotSplit);
            splitFinishedOffsets.put(splitId, highWatermark);
        }

        SnapshotPendingSplitsState snapshotPendingSplitsState =
                new SnapshotPendingSplitsState(
                        alreadyProcessedTables,
                        remainingSplits,
                        assignedSplits,
                        new HashMap<>(),
                        splitFinishedOffsets,
                        AssignerStatus.INITIAL_ASSIGNING_FINISHED,
                        new ArrayList<>(),
                        false,
                        true,
                        ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
        HybridPendingSplitsState checkpoint =
                new HybridPendingSplitsState(snapshotPendingSplitsState, false);
        MySqlHybridSplitAssigner assigner =
                new MySqlHybridSplitAssigner(configuration, 4, checkpoint);

        // Close the assigner even when an assertion below fails.
        try {
            // Step 2: the next split must be the binlog split.
            Optional<MySqlSplit> binlogSplit = assigner.getNext();
            Assert.assertTrue(binlogSplit.isPresent());
            MySqlBinlogSplit mySqlBinlogSplit = binlogSplit.get().asBinlogSplit();

            // The expected split lists the finished snapshot splits ordered by split id.
            List<MySqlSchemalessSnapshotSplit> sortedSnapshotSplits =
                    assignedSplits.values().stream()
                            .sorted(Comparator.comparing(MySqlSplit::splitId))
                            .collect(Collectors.toList());
            List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
            for (MySqlSchemalessSnapshotSplit split : sortedSnapshotSplits) {
                finishedSnapshotSplitInfos.add(
                        new FinishedSnapshotSplitInfo(
                                split.getTableId(),
                                split.splitId(),
                                split.getSplitStart(),
                                split.getSplitEnd(),
                                split.getHighWatermark()));
            }

            // Starting offset is the high watermark of the first split (position 1).
            MySqlBinlogSplit expected =
                    new MySqlBinlogSplit(
                            "binlog-split",
                            BinlogOffset.ofBinlogFilePosition("mysql-bin.00001", 1L),
                            BinlogOffset.ofNonStopping(),
                            finishedSnapshotSplitInfos,
                            new HashMap<>(),
                            finishedSnapshotSplitInfos.size());
            Assert.assertEquals(expected, mySqlBinlogSplit);
        } finally {
            assigner.close();
        }
    }

    /**
     * Drains all snapshot splits in snapshot startup mode, reports them as finished with
     * increasing binlog positions, and verifies that the binlog split assigned afterwards ends
     * at the largest reported position.
     */
    @Test
    public void testAssigningInSnapshotOnlyMode() {
        final String captureTable = "customers";
        MySqlSourceConfig sourceConfig =
                getConfig(new String[] {captureTable}, StartupOptions.snapshot());

        MySqlHybridSplitAssigner assigner =
                new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
        assigner.open();

        // Close the assigner even when an assertion below fails.
        try {
            List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);

            // Report every snapshot split as finished at position 0, 1, 2, ... of file "foo".
            HashMap<String, BinlogOffset> finishedOffsets = new HashMap<>();
            int position = 0;
            for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) {
                BinlogOffset binlogOffset =
                        BinlogOffset.builder().setBinlogFilePosition("foo", position++).build();
                finishedOffsets.put(snapshotSplit.splitId(), binlogOffset);
            }
            assigner.onFinishedSplits(finishedOffsets);

            Optional<MySqlSplit> split = assigner.getNext();
            Assert.assertTrue(split.isPresent());
            Assert.assertTrue(split.get() instanceof MySqlBinlogSplit);
            MySqlBinlogSplit binlogSplit = split.get().asBinlogSplit();

            // The ending offset must equal the highest position reported above.
            Assert.assertEquals(
                    BinlogOffset.builder()
                            .setBinlogFilePosition("foo", snapshotSplits.size() - 1)
                            .build(),
                    binlogSplit.getEndingOffset());
        } finally {
            assigner.close();
        }
    }

    /**
     * Builds a source configuration for the test database.
     *
     * @param captureTables unqualified table names; each one is prefixed with the database name
     * @param startupOptions startup mode to configure
     * @return the configuration created for subtask id 0
     */
    private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions startupOptions) {
        String[] captureTableIds =
                Arrays.stream(captureTables)
                        .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
                        .toArray(String[]::new);

        return new MySqlSourceConfigFactory()
                .startupOptions(startupOptions)
                .databaseList(customerDatabase.getDatabaseName())
                .tableList(captureTableIds)
                .hostname(MYSQL_CONTAINER.getHost())
                .port(MYSQL_CONTAINER.getDatabasePort())
                .username(customerDatabase.getUsername())
                .password(customerDatabase.getPassword())
                .serverTimeZone(ZoneId.of("UTC").toString())
                .createConfig(0);
    }

    /**
     * Pulls splits from the assigner until it returns an empty result, asserting that each one
     * is a snapshot split.
     *
     * @param assigner the assigner to drain
     * @return the snapshot splits in the order they were assigned
     */
    private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
        List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
        Optional<MySqlSplit> split = assigner.getNext();
        while (split.isPresent()) {
            Assert.assertTrue(split.get() instanceof MySqlSnapshotSplit);
            snapshotSplits.add(split.get().asSnapshotSplit());
            split = assigner.getNext();
        }
        return snapshotSplits;
    }
}

