/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.db2.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.db2.Db2TestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
 * Integration tests for the DB2 CDC table source with incremental snapshot enabled.
 *
 * <p>Every test runs the same scenario against the {@code CUSTOMERS} table: read the initial
 * snapshot, apply two batches of change statements, and assert the emitted change log rows. The
 * tests differ in source parallelism, in whether (and when) a TaskManager or JobManager failover is
 * injected, and in whether the snapshot backfill is skipped.
 */
public class Db2SourceITCase extends Db2TestBase {

    /** Parallelism of the multi-parallelism tests; also the slot count of the single TaskManager. */
    protected static final int DEFAULT_PARALLELISM = 4;

    /** Fully qualified name of the table captured by every test in this class. */
    private static final String CUSTOMERS_TABLE = "DB2INST1.CUSTOMERS";

    /** Rows expected from the snapshot phase for one captured table. */
    private static final String[] SNAPSHOT_FOR_SINGLE_TABLE = {
        "+I[101, user_1, Shanghai, 123567891234]",
        "+I[102, user_2, Shanghai, 123567891234]",
        "+I[103, user_3, Shanghai, 123567891234]",
        "+I[109, user_4, Shanghai, 123567891234]",
        "+I[110, user_5, Shanghai, 123567891234]",
        "+I[111, user_6, Shanghai, 123567891234]",
        "+I[118, user_7, Shanghai, 123567891234]",
        "+I[121, user_8, Shanghai, 123567891234]",
        "+I[123, user_9, Shanghai, 123567891234]",
        "+I[1009, user_10, Shanghai, 123567891234]",
        "+I[1010, user_11, Shanghai, 123567891234]",
        "+I[1011, user_12, Shanghai, 123567891234]",
        "+I[1012, user_13, Shanghai, 123567891234]",
        "+I[1013, user_14, Shanghai, 123567891234]",
        "+I[1014, user_15, Shanghai, 123567891234]",
        "+I[1015, user_16, Shanghai, 123567891234]",
        "+I[1016, user_17, Shanghai, 123567891234]",
        "+I[1017, user_18, Shanghai, 123567891234]",
        "+I[1018, user_19, Shanghai, 123567891234]",
        "+I[1019, user_20, Shanghai, 123567891234]",
        "+I[2000, user_21, Shanghai, 123567891234]"
    };

    /**
     * Change log rows expected for one captured table after {@link
     * #makeFirstPartChangeStreamEvents(String)} and {@link #makeSecondPartRedoLogsEvents(String)}
     * have both been applied.
     */
    private static final String[] REDO_LOGS_FOR_SINGLE_TABLE = {
        "-U[103, user_3, Shanghai, 123567891234]",
        "+U[103, user_3, Hangzhou, 123567891234]",
        "-D[102, user_2, Shanghai, 123567891234]",
        "+I[102, user_2, Shanghai, 123567891234]",
        "-U[103, user_3, Hangzhou, 123567891234]",
        "+U[103, user_3, Shanghai, 123567891234]",
        "-U[1010, user_11, Shanghai, 123567891234]",
        "+U[1010, user_11, Hangzhou, 123567891234]",
        "+I[2001, user_22, Shanghai, 123567891234]",
        "+I[2002, user_23, Shanghai, 123567891234]",
        "+I[2003, user_24, Shanghai, 123567891234]"
    };

    /** Fails any single test that runs for longer than 300 seconds. */
    @Rule public final Timeout timeoutPerTest = Timeout.seconds(300L);

    /**
     * Mini cluster with one TaskManager offering {@link #DEFAULT_PARALLELISM} slots. HA leadership
     * control is enabled because {@link #triggerJobManagerFailover} revokes and re-grants the job
     * master leadership through it.
     */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .build());

    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        testDb2ParallelSource(
                1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {CUSTOMERS_TABLE});
    }

    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        testDb2ParallelSource(
                DEFAULT_PARALLELISM,
                FailoverType.NONE,
                FailoverPhase.NEVER,
                new String[] {CUSTOMERS_TABLE});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {CUSTOMERS_TABLE});
    }

    @Test
    public void testTaskManagerFailoverInRedoLogsPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.TM, FailoverPhase.STREAM, new String[] {CUSTOMERS_TABLE});
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {CUSTOMERS_TABLE});
    }

    @Test
    public void testJobManagerFailoverInRedoLogsPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.JM, FailoverPhase.STREAM, new String[] {CUSTOMERS_TABLE});
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        testDb2ParallelSource(
                1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {CUSTOMERS_TABLE});
    }

    // NOTE(review): despite "SingleParallelism" in the name, this test runs with
    // DEFAULT_PARALLELISM and injects a TaskManager failover during the snapshot phase.
    // The arguments are kept as they were; confirm whether the name or the arguments are intended.
    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
        testDb2ParallelSource(
                DEFAULT_PARALLELISM,
                FailoverType.TM,
                FailoverPhase.SNAPSHOT,
                new String[] {CUSTOMERS_TABLE},
                true);
    }

    /** Runs the scenario with {@link #DEFAULT_PARALLELISM} and without skipping the backfill. */
    private void testDb2ParallelSource(
            FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
            throws Exception {
        testDb2ParallelSource(
                DEFAULT_PARALLELISM, failoverType, failoverPhase, captureCustomerTables);
    }

    /** Runs the scenario with the given parallelism and without skipping the backfill. */
    private void testDb2ParallelSource(
            int parallelism,
            FailoverType failoverType,
            FailoverPhase failoverPhase,
            String[] captureCustomerTables)
            throws Exception {
        testDb2ParallelSource(
                parallelism, failoverType, failoverPhase, captureCustomerTables, false);
    }

    /**
     * Runs the full read scenario: snapshot rows first, then two batches of change events.
     *
     * @param parallelism parallelism set on the execution environment
     * @param failoverType which component is failed over when the chosen phase is reached
     * @param failoverPhase phase in which the failover is injected
     * @param captureCustomerTables fully qualified names of the tables to capture; the expected
     *     rows are repeated once per entry
     * @param skipSnapshotBackfill value of the {@code scan.incremental.snapshot.backfill.skip}
     *     connector option
     * @throws Exception if job submission, failover injection or job cancellation fails
     */
    private void testDb2ParallelSource(
            int parallelism,
            FailoverType failoverType,
            FailoverPhase failoverPhase,
            String[] captureCustomerTables,
            boolean skipSnapshotBackfill)
            throws Exception {
        initializeDb2Table("customers", "CUSTOMERS");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(parallelism);
        env.enableCheckpointing(1000L);
        // One immediate restart attempt so that the job can recover from the injected failover.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

        tEnv.executeSql(buildSourceDdl(captureCustomerTables, skipSnapshotBackfill));
        TableResult tableResult = tEnv.executeSql("select * from CUSTOMERS");

        // Closing the iterator releases the collect job even when an assertion below fails.
        try (CloseableIterator<Row> iterator = tableResult.collect()) {
            JobClient jobClient =
                    tableResult
                            .getJobClient()
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    "No JobClient available for the submitted query"));
            JobID jobId = jobClient.getJobID();

            List<String> expectedSnapshotData =
                    repeatPerTable(SNAPSHOT_FOR_SINGLE_TABLE, captureCustomerTables.length);

            // Inject the snapshot-phase failover only once the first row is available.
            if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
                triggerFailover(
                        failoverType,
                        jobId,
                        miniClusterResource.getMiniCluster(),
                        () -> sleepMs(100L));
            }
            assertEqualsInAnyOrder(
                    expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));

            for (String tableId : captureCustomerTables) {
                makeFirstPartChangeStreamEvents(tableId);
            }
            // The stream-phase failover happens between the two batches of change statements.
            if (failoverPhase == FailoverPhase.STREAM) {
                triggerFailover(
                        failoverType,
                        jobId,
                        miniClusterResource.getMiniCluster(),
                        () -> sleepMs(200L));
            }
            for (String tableId : captureCustomerTables) {
                makeSecondPartRedoLogsEvents(tableId);
            }

            List<String> expectedRedoLogsData =
                    repeatPerTable(REDO_LOGS_FOR_SINGLE_TABLE, captureCustomerTables.length);
            assertEqualsInAnyOrder(
                    expectedRedoLogsData, fetchRows(iterator, expectedRedoLogsData.size()));

            // Wait for the cancellation to complete before the test returns.
            jobClient.cancel().get();
        }
    }

    /**
     * Builds the {@code CREATE TABLE} statement for the {@code db2-cdc} source table.
     *
     * @param captureCustomerTables tables to capture, turned into the {@code table-name} option
     *     via {@code getTableNameRegex}
     * @param skipSnapshotBackfill value of {@code scan.incremental.snapshot.backfill.skip}
     * @return the DDL with connection settings taken from {@code DB2_CONTAINER}
     */
    private String buildSourceDdl(String[] captureCustomerTables, boolean skipSnapshotBackfill) {
        return String.format(
                "CREATE TABLE CUSTOMERS ("
                        + " ID INT NOT NULL,"
                        + " NAME STRING,"
                        + " ADDRESS STRING,"
                        + " PHONE_NUMBER STRING,"
                        + " primary key (ID) not enforced"
                        + ") WITH ("
                        + " 'connector' = 'db2-cdc',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'scan.incremental.snapshot.chunk.size' = '4',"
                        + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
                        + ")",
                DB2_CONTAINER.getHost(),
                // Host port mapped to container port 50000.
                DB2_CONTAINER.getMappedPort(50000),
                DB2_CONTAINER.getUsername(),
                DB2_CONTAINER.getPassword(),
                DB2_CONTAINER.getDatabaseName(),
                getTableNameRegex(captureCustomerTables),
                skipSnapshotBackfill);
    }

    /** Returns {@code rowsForSingleTable} repeated {@code tableCount} times, in order. */
    private static List<String> repeatPerTable(String[] rowsForSingleTable, int tableCount) {
        List<String> rows = new ArrayList<>(rowsForSingleTable.length * tableCount);
        for (int i = 0; i < tableCount; i++) {
            rows.addAll(Arrays.asList(rowsForSingleTable));
        }
        return rows;
    }

    /** Applies the first batch of changes: update 103, delete and re-insert 102, revert 103. */
    private void makeFirstPartChangeStreamEvents(String tableId) {
        executeSql("UPDATE " + tableId + " SET ADDRESS = 'Hangzhou' where ID = 103");
        executeSql("DELETE FROM " + tableId + " where ID = 102");
        executeSql("INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')");
        executeSql("UPDATE " + tableId + " SET ADDRESS = 'Shanghai' where ID = 103");
    }

    /** Applies the second batch of changes: update 1010 and insert rows 2001 to 2003. */
    private void makeSecondPartRedoLogsEvents(String tableId) {
        executeSql("UPDATE " + tableId + " SET ADDRESS = 'Hangzhou' where ID = 1010");
        executeSql("INSERT INTO " + tableId + " VALUES(2001, 'user_22','Shanghai','123567891234')");
        executeSql("INSERT INTO " + tableId + " VALUES(2002, 'user_23','Shanghai','123567891234')");
        executeSql("INSERT INTO " + tableId + " VALUES(2003, 'user_24','Shanghai','123567891234')");
    }

    /**
     * Sleeps for the given number of milliseconds. If the thread is interrupted, the sleep ends
     * early and the interrupt flag is restored so that callers further up can still observe it.
     */
    private void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads up to {@code size} rows from the iterator and returns their string form.
     *
     * @param iter source of rows; consumed only as far as needed
     * @param size maximum number of rows to read
     * @return the rows read, in iteration order; shorter than {@code size} if the iterator ends
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            rows.add(iter.next().toString());
        }
        return rows;
    }

    /**
     * Injects the requested kind of failover into the running job.
     *
     * @param type component to fail over; {@link FailoverType#NONE} does nothing
     * @param jobId job whose job master leadership is revoked for {@link FailoverType#JM}
     * @param miniCluster cluster the job runs on
     * @param afterFailAction action run after the failure is triggered and before recovery
     * @throws Exception if the failover operations fail
     */
    protected static void triggerFailover(
            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
            throws Exception {
        switch (type) {
            case TM:
                restartTaskManager(miniCluster, afterFailAction);
                break;
            case JM:
                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + type);
        }
    }

    /**
     * Revokes the job master leadership of the job, runs {@code afterFailAction}, then grants the
     * leadership again. Each leadership operation is awaited before the next step.
     */
    protected static void triggerJobManagerFailover(
            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
        HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    /**
     * Terminates TaskManager 0 and waits for that to complete, runs {@code afterFailAction}, then
     * starts a new TaskManager.
     */
    protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
            throws Exception {
        miniCluster.terminateTaskManager(0).get();
        afterFailAction.run();
        miniCluster.startTaskManager();
    }

    /** The point in the scenario at which a failover is injected. */
    protected enum FailoverPhase {
        /** Before the snapshot rows are asserted, once the first row is available. */
        SNAPSHOT,
        /** Between the first and the second batch of change statements. */
        STREAM,
        /** No failover is injected. */
        NEVER
    }

    /** The component that is failed over. */
    protected enum FailoverType {
        /** Restart the TaskManager. */
        TM,
        /** Revoke and re-grant the job master leadership. */
        JM,
        /** Do nothing. */
        NONE
    }
}

