/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.assigners;

import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.assigners.AssignerStatus;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link MySqlSnapshotSplitAssigner}.
 *
 * <p>Each test builds a {@link MySqlSourceConfig} for tables of the shared {@code customer} test
 * database, drains the assigner and compares the produced splits, rendered as
 * {@code "<table> <splitStart> <splitEnd>"}, against the expected chunk boundaries.
 */
public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
    private static final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    /** Default value of the upper bound of the chunk-key even-distribution factor option. */
    private static final double DEFAULT_DISTRIBUTION_FACTOR_UPPER =
            (Double) MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();

    /** Default value of the lower bound of the chunk-key even-distribution factor option. */
    private static final double DEFAULT_DISTRIBUTION_FACTOR_LOWER =
            (Double) MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();

    /** Creates and populates the shared test database once for all tests in this class. */
    @BeforeClass
    public static void init() {
        customerDatabase.createAndInitialize();
    }

    @Test
    public void testAssignSingleTableSplits() {
        List<String> expected = Arrays.asList(
                "customers_even_dist null [105]",
                "customers_even_dist [105] [109]",
                "customers_even_dist [109] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "customers_even_dist"));
    }

    @Test
    public void testAssignTableWhoseRowCntLessSplitSize() {
        List<String> expected = Arrays.asList("customers null null");
        Assert.assertEquals(expected, assignWithDefaultFactors(2000, "customers"));
    }

    @Test
    public void testAssignMultipleTableSplits() {
        List<String> expected = Arrays.asList(
                "customers_even_dist null [105]",
                "customers_even_dist [105] [109]",
                "customers_even_dist [109] null",
                "customers_sparse_dist null [10]",
                "customers_sparse_dist [10] [18]",
                "customers_sparse_dist [18] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "customers_even_dist", "customers_sparse_dist"));
    }

    @Test
    public void testAssignCompositePkTableSplitsUnevenlyWithChunkKeyColumn() {
        List<String> expected = Arrays.asList(
                "shopping_cart null [KIND_007]",
                "shopping_cart [KIND_007] [KIND_008]",
                "shopping_cart [KIND_008] [KIND_009]",
                "shopping_cart [KIND_009] [KIND_100]",
                "shopping_cart [KIND_100] null");
        List<String> splits = getTestAssignSnapshotSplits(
                customerDatabase,
                4,
                DEFAULT_DISTRIBUTION_FACTOR_UPPER,
                DEFAULT_DISTRIBUTION_FACTOR_LOWER,
                new String[] {"shopping_cart"},
                "product_kind");
        Assert.assertEquals(expected, splits);
    }

    @Test
    public void testAssignCompositePkTableSplitsEvenlyWithChunkKeyColumn() {
        List<String> expected = Arrays.asList(
                "evenly_shopping_cart null [105]",
                "evenly_shopping_cart [105] [109]",
                "evenly_shopping_cart [109] null");
        List<String> splits = getTestAssignSnapshotSplits(
                customerDatabase,
                4,
                DEFAULT_DISTRIBUTION_FACTOR_UPPER,
                DEFAULT_DISTRIBUTION_FACTOR_LOWER,
                new String[] {"evenly_shopping_cart"},
                "product_no");
        Assert.assertEquals(expected, splits);
    }

    @Test
    public void testAssignCompositePkTableWithWrongChunkKeyColumn() {
        // The "exception expected" check lives outside any catch-all block (see
        // assertFailsWithMessage) so that a missing exception is reported as such.
        assertFailsWithMessage(
                () -> getTestAssignSnapshotSplits(
                        customerDatabase,
                        4,
                        DEFAULT_DISTRIBUTION_FACTOR_UPPER,
                        DEFAULT_DISTRIBUTION_FACTOR_LOWER,
                        new String[] {"customer_card"},
                        "errorCol"),
                "Chunk key column 'errorCol' doesn't exist in the primary keys [card_no,level] of the table");
    }

    @Test
    public void testEnableAutoIncrementedKeyOptimization() {
        List<String> expected = Arrays.asList(
                "shopping_cart_big null [3]",
                "shopping_cart_big [3] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(2, "shopping_cart_big"));
    }

    @Test
    public void testAssignSnapshotSplitsWithRandomPrimaryKey() {
        List<String> expected = Arrays.asList(
                "address null [417111867899200427]",
                "address [417111867899200427] [417420106184475563]",
                "address [417420106184475563] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "address"));
    }

    @Test
    public void testAssignSnapshotSplitsWithDecimalKey() {
        List<String> expected = Arrays.asList(
                "shopping_cart_dec null [123458.1230]",
                "shopping_cart_dec [123458.1230] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(2, "shopping_cart_dec"));
    }

    @Test
    public void testAssignTableWithMultipleKey() {
        List<String> expected = Arrays.asList(
                "customer_card null [20004]",
                "customer_card [20004] [30006]",
                "customer_card [30006] [30009]",
                "customer_card [30009] [40001]",
                "customer_card [40001] [50001]",
                "customer_card [50001] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "customer_card"));
    }

    @Test
    public void testAssignTableWithSparseDistributionSplitKey() {
        String[] tables = {"customers_sparse_dist"};

        // Split size 4, upper distribution factor 2000.0.
        List<String> expected = Arrays.asList(
                "customers_sparse_dist null [10]",
                "customers_sparse_dist [10] [18]",
                "customers_sparse_dist [18] null");
        Assert.assertEquals(
                expected,
                getTestAssignSnapshotSplits(4, 2000.0, DEFAULT_DISTRIBUTION_FACTOR_LOWER, tables));

        // Split size 4, upper distribution factor 2.0.
        List<String> expected1 = Arrays.asList(
                "customers_sparse_dist null [8]",
                "customers_sparse_dist [8] [17]",
                "customers_sparse_dist [17] null");
        Assert.assertEquals(
                expected1,
                getTestAssignSnapshotSplits(4, 2.0, DEFAULT_DISTRIBUTION_FACTOR_LOWER, tables));

        // Split size 8, upper distribution factor 10.0.
        List<String> expected2 = Arrays.asList(
                "customers_sparse_dist null [18]",
                "customers_sparse_dist [18] null");
        Assert.assertEquals(
                expected2,
                getTestAssignSnapshotSplits(8, 10.0, DEFAULT_DISTRIBUTION_FACTOR_LOWER, tables));
    }

    @Test
    public void testAssignTableWithDenseDistributionSplitKey() {
        // Default lower distribution factor.
        List<String> expected = Arrays.asList(
                "customers_dense_dist null [2]",
                "customers_dense_dist [2] [3]",
                "customers_dense_dist [3] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(2, "customers_dense_dist"));

        // Lower distribution factor 0.9.
        List<String> expected1 = Arrays.asList(
                "customers_dense_dist null [2]",
                "customers_dense_dist [2] null");
        Assert.assertEquals(
                expected1,
                getTestAssignSnapshotSplits(
                        2, DEFAULT_DISTRIBUTION_FACTOR_UPPER, 0.9, new String[] {"customers_dense_dist"}));
    }

    @Test
    public void testAssignTableWithSingleLine() {
        List<String> expected = Collections.singletonList("customer_card_single_line null null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "customer_card_single_line"));
    }

    @Test
    public void testAssignTableWithCombinedIntSplitKey() {
        List<String> expected = Arrays.asList(
                "shopping_cart null [user_2]",
                "shopping_cart [user_2] [user_4]",
                "shopping_cart [user_4] [user_5]",
                "shopping_cart [user_5] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "shopping_cart"));
    }

    @Test
    public void testAssignTableWithConfiguredStringSplitKey() {
        // NOTE(review): this test passes no chunk key column and is identical to
        // testAssignTableWithCombinedIntSplitKey; confirm whether a configured key was intended.
        List<String> expected = Arrays.asList(
                "shopping_cart null [user_2]",
                "shopping_cart [user_2] [user_4]",
                "shopping_cart [user_4] [user_5]",
                "shopping_cart [user_5] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(4, "shopping_cart"));
    }

    @Test
    public void testAssignMinSplitSize() {
        List<String> expected = Arrays.asList(
                "customers_even_dist null [103]",
                "customers_even_dist [103] [105]",
                "customers_even_dist [105] [107]",
                "customers_even_dist [107] [109]",
                "customers_even_dist [109] null");
        Assert.assertEquals(expected, assignWithDefaultFactors(2, "customers_even_dist"));
    }

    @Test
    public void testAssignMaxSplitSize() {
        List<String> expected = Collections.singletonList("customers_even_dist null null");
        Assert.assertEquals(expected, assignWithDefaultFactors(8096, "customers_even_dist"));
    }

    @Test
    public void testUnMatchedPrimaryKey() {
        // NOTE(review): this issues exactly the call that testAssignTableWithMultipleKey expects to
        // succeed, so the catch branch only runs if that call fails. The test is intentionally kept
        // lenient (no failure when nothing is thrown); TODO confirm how the mismatching primary key
        // was meant to be configured.
        try {
            assignWithDefaultFactors(4, "customer_card");
        } catch (Throwable t) {
            Assert.assertTrue(
                    ExceptionUtils.findThrowableWithMessage(
                                    t,
                                    "The defined primary key [card_no] in Flink is not matched with actual primary key [card_no, level] in MySQL")
                            .isPresent());
        }
    }

    @Test
    public void testTableWithoutPrimaryKey() {
        String tableWithoutPrimaryKey = "customers_no_pk";
        // Require the failure: previously the test also passed when no exception was thrown.
        assertFailsWithMessage(
                () -> assignWithDefaultFactors(4, tableWithoutPrimaryKey),
                "'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
    }

    @Test
    public void testEnumerateTablesLazily() {
        MySqlSourceConfig configuration = getConfig(
                customerDatabase,
                4,
                DEFAULT_DISTRIBUTION_FACTOR_UPPER,
                DEFAULT_DISTRIBUTION_FACTOR_LOWER,
                new String[] {"customers_even_dist"},
                "id",
                false);
        // The assigner starts with an empty table list.
        MySqlSnapshotSplitAssigner assigner =
                new MySqlSnapshotSplitAssigner(configuration, 4, new ArrayList<>(), false);

        Assert.assertTrue(assigner.needToDiscoveryTables());
        assigner.open();
        Assert.assertTrue(assigner.getNext().isPresent());
        Assert.assertFalse(assigner.needToDiscoveryTables());
    }

    @Test
    public void testScanNewlyAddedTableStartFromCheckpoint() {
        List<String> expected = Arrays.asList(
                "customers_sparse_dist [109] null",
                "customers_even_dist null [10]",
                "customers_even_dist [10] [18]",
                "customers_even_dist [18] null",
                "customer_card_single_line null null");
        Assert.assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint());
    }

    /**
     * Assigns splits for the given tables of the shared database using the default upper and lower
     * distribution factors and no chunk key column.
     *
     * @param splitSize split size passed to the source configuration
     * @param captureTables unqualified names of the tables to capture
     * @return the rendered splits, in the order the assigner returned them
     */
    private List<String> assignWithDefaultFactors(int splitSize, String... captureTables) {
        return getTestAssignSnapshotSplits(
                splitSize, DEFAULT_DISTRIBUTION_FACTOR_UPPER, DEFAULT_DISTRIBUTION_FACTOR_LOWER, captureTables);
    }

    /**
     * Runs {@code action} and asserts that it fails with a throwable whose chain contains
     * {@code expectedMessage}, as determined by {@link ExceptionUtils#findThrowableWithMessage}.
     *
     * @param action the code expected to throw
     * @param expectedMessage message text to look for in the thrown throwable chain
     */
    private static void assertFailsWithMessage(Runnable action, String expectedMessage) {
        Throwable thrown = null;
        try {
            action.run();
        } catch (Throwable t) {
            thrown = t;
        }
        // Checked outside the try block so this assertion cannot be swallowed by the catch above.
        Assert.assertNotNull("exception expected", thrown);
        Assert.assertTrue(
                "expected a failure whose message contains: " + expectedMessage,
                ExceptionUtils.findThrowableWithMessage(thrown, expectedMessage).isPresent());
    }

    /**
     * Assigns splits for tables of the shared database without a chunk key column.
     *
     * @param splitSize split size passed to the source configuration
     * @param distributionFactorUpper upper distribution factor passed to the source configuration
     * @param distributionFactorLower lower distribution factor passed to the source configuration
     * @param captureTables unqualified names of the tables to capture
     * @return the rendered splits, in the order the assigner returned them
     */
    private List<String> getTestAssignSnapshotSplits(int splitSize, double distributionFactorUpper, double distributionFactorLower, String[] captureTables) {
        return getTestAssignSnapshotSplits(
                customerDatabase, splitSize, distributionFactorUpper, distributionFactorLower, captureTables, null);
    }

    /**
     * Builds a configuration for the given tables, creates an assigner whose remaining tables are
     * exactly those tables, and drains it.
     *
     * @param database database that owns the captured tables
     * @param splitSize split size passed to the source configuration
     * @param distributionFactorUpper upper distribution factor passed to the source configuration
     * @param distributionFactorLower lower distribution factor passed to the source configuration
     * @param captureTables unqualified names of the tables to capture
     * @param chunkKeyColumn chunk key column registered for every captured table, may be {@code null}
     * @return the rendered splits, in the order the assigner returned them
     */
    private List<String> getTestAssignSnapshotSplits(UniqueDatabase database, int splitSize, double distributionFactorUpper, double distributionFactorLower, String[] captureTables, String chunkKeyColumn) {
        MySqlSourceConfig configuration = getConfig(
                database, splitSize, distributionFactorUpper, distributionFactorLower, captureTables, chunkKeyColumn, false);
        List<TableId> remainingTables = Arrays.stream(captureTables)
                .map(table -> database.getDatabaseName() + "." + table)
                .map(TableId::parse)
                .collect(Collectors.toList());
        MySqlSnapshotSplitAssigner assigner =
                new MySqlSnapshotSplitAssigner(configuration, 4, remainingTables, false);
        return getSplitsFromAssigner(assigner);
    }

    /**
     * Restores an assigner from a hand-built checkpoint and drains it.
     *
     * <p>The checkpoint lists {@code customers_sparse_dist} as already processed, with two assigned
     * splits and one remaining split, and carries three remaining splits for
     * {@code customers_even_dist}. {@code customer_card_single_line} is captured by the
     * configuration but does not appear anywhere in the checkpoint.
     *
     * @return the rendered splits, in the order the restored assigner returned them
     */
    private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
        String databaseName = customerDatabase.getDatabaseName();
        TableId newTable = TableId.parse(databaseName + ".customer_card_single_line");
        TableId processedTable = TableId.parse(databaseName + ".customers_sparse_dist");
        TableId splitTable = TableId.parse(databaseName + ".customers_even_dist");
        String[] captureTables = {newTable.table(), processedTable.table(), splitTable.table()};

        MySqlSourceConfig configuration = getConfig(
                customerDatabase,
                4,
                DEFAULT_DISTRIBUTION_FACTOR_UPPER,
                DEFAULT_DISTRIBUTION_FACTOR_LOWER,
                captureTables,
                null,
                true);

        List<TableId> remainingTables = new ArrayList<>();
        List<TableId> alreadyProcessedTables = new ArrayList<>();
        alreadyProcessedTables.add(processedTable);

        // JDBC type code 4 is java.sql.Types.INTEGER.
        RowType splitKeyType = ChunkUtils.getChunkKeyColumnType(
                Column.editor().name("id").type("INT").jdbcType(4).create());

        List<MySqlSchemalessSnapshotSplit> remainingSplits = Arrays.asList(
                new MySqlSchemalessSnapshotSplit(processedTable, processedTable + ":2", splitKeyType, new Object[] {109}, null, null),
                new MySqlSchemalessSnapshotSplit(splitTable, splitTable + ":0", splitKeyType, null, new Object[] {10}, null),
                new MySqlSchemalessSnapshotSplit(splitTable, splitTable + ":1", splitKeyType, new Object[] {10}, new Object[] {18}, null),
                new MySqlSchemalessSnapshotSplit(splitTable, splitTable + ":2", splitKeyType, new Object[] {18}, null, null));

        HashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = new HashMap<>();
        assignedSplits.put(
                processedTable + ":0",
                new MySqlSchemalessSnapshotSplit(processedTable, processedTable + ":0", splitKeyType, null, new Object[] {105}, null));
        assignedSplits.put(
                processedTable + ":1",
                new MySqlSchemalessSnapshotSplit(processedTable, processedTable + ":1", splitKeyType, new Object[] {105}, new Object[] {109}, null));

        // Only the first assigned split has a finished offset recorded.
        HashMap<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();
        splitFinishedOffsets.put(processedTable + ":0", BinlogOffset.ofEarliest());

        SnapshotPendingSplitsState checkpoint = new SnapshotPendingSplitsState(
                alreadyProcessedTables,
                remainingSplits,
                assignedSplits,
                new HashMap<>(),
                splitFinishedOffsets,
                AssignerStatus.INITIAL_ASSIGNING,
                remainingTables,
                false,
                true,
                ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
        MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration, 4, checkpoint);
        return getSplitsFromAssigner(assigner);
    }

    /**
     * Opens the assigner, pulls splits until {@code getNext()} returns an empty {@link Optional},
     * closes the assigner and returns one rendered line per split.
     *
     * <p>Snapshot splits are rendered as {@code "<table> <splitStart> <splitEnd>"}; any other split
     * is rendered with its {@code toString()}.
     *
     * @param assigner the assigner to drain; it is opened and closed by this method
     * @return the rendered splits, in the order the assigner returned them
     */
    private List<String> getSplitsFromAssigner(MySqlSnapshotSplitAssigner assigner) {
        assigner.open();
        List<String> renderedSplits = new ArrayList<>();
        while (true) {
            // Mapping inside the Optional keeps the split type inferred from getNext(), so no raw
            // collection (which would erase the element type to Object) is needed.
            Optional<String> rendered = assigner.getNext().map(split -> {
                if (split.isSnapshotSplit()) {
                    return split.asSnapshotSplit().getTableId().table()
                            + " "
                            + Arrays.toString(split.asSnapshotSplit().getSplitStart())
                            + " "
                            + Arrays.toString(split.asSnapshotSplit().getSplitEnd());
                }
                return split.toString();
            });
            if (!rendered.isPresent()) {
                break;
            }
            renderedSplits.add(rendered.get());
        }
        assigner.close();
        return renderedSplits;
    }

    /**
     * Creates the source configuration used by the tests in this class.
     *
     * @param database database that owns the captured tables and supplies the credentials
     * @param splitSize split size of the configuration
     * @param distributionFactorUpper upper distribution factor of the configuration
     * @param distributionLower lower distribution factor of the configuration
     * @param captureTables unqualified table names; each is qualified with the database name
     * @param chunkKeyColumn chunk key column registered for every captured table, may be {@code null}
     * @param scanNewlyAddedTableEnabled value of the scan-newly-added-table switch
     * @return the configuration created by the factory
     */
    private MySqlSourceConfig getConfig(UniqueDatabase database, int splitSize, double distributionFactorUpper, double distributionLower, String[] captureTables, String chunkKeyColumn, boolean scanNewlyAddedTableEnabled) {
        String databaseName = database.getDatabaseName();
        HashMap<ObjectPath, String> chunkKeys = new HashMap<>();
        String[] fullNames = new String[captureTables.length];
        for (int i = 0; i < captureTables.length; ++i) {
            // Every table gets the same chunk key column (possibly null).
            chunkKeys.put(new ObjectPath(databaseName, captureTables[i]), chunkKeyColumn);
            fullNames[i] = databaseName + "." + captureTables[i];
        }
        return new MySqlSourceConfigFactory()
                .startupOptions(StartupOptions.initial())
                .databaseList(new String[] {databaseName})
                .tableList(fullNames)
                .hostname(MYSQL_CONTAINER.getHost())
                .port(MYSQL_CONTAINER.getDatabasePort())
                .splitSize(splitSize)
                .fetchSize(2)
                .distributionFactorUpper(distributionFactorUpper)
                .distributionFactorLower(distributionLower)
                .username(database.getUsername())
                .password(database.getPassword())
                .serverTimeZone(ZoneId.of("UTC").toString())
                .chunkKeyColumn(chunkKeys)
                .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                .createConfig(0);
    }
}

