/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.table;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MongoDBTimeZoneITCase
extends MongoDBSourceTestBase {
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
    private final String localTimeZone;
    private final boolean parallelismSnapshot;

    public MongoDBTimeZoneITCase(String localTimeZone, boolean parallelismSnapshot) {
        this.localTimeZone = localTimeZone;
        this.parallelismSnapshot = parallelismSnapshot;
    }

    @Parameterized.Parameters(name="localTimeZone: {0}, parallelismSnapshot: {1}")
    public static Object[] parameters() {
        return new Object[][]{{"Asia/Shanghai", false}, {"Europe/Berlin", false}, {"UTC", false}, {"Asia/Shanghai", true}, {"Europe/Berlin", true}, {"UTC", true}};
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        if (this.parallelismSnapshot) {
            this.env.setParallelism(4);
            this.env.enableCheckpointing(200L);
        } else {
            this.env.setParallelism(1);
        }
    }

    @Test
    public void testTemporalTypesWithTimeZone() throws Exception {
        this.tEnv.getConfig().setLocalTimeZone(ZoneId.of(this.localTimeZone));
        String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
        String sourceDDL = String.format("CREATE TABLE full_types (\n    _id STRING,\n    timeField TIME,\n    dateField DATE,\n    dateToTimestampField TIMESTAMP(3),\n    dateToLocalTimestampField TIMESTAMP_LTZ(3),\n    timestampField TIMESTAMP(0),\n    timestampToLocalTimestampField TIMESTAMP_LTZ(0),\n    PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", database, "full_types");
        this.tEnv.executeSql(sourceDDL);
        TableResult result = this.tEnv.executeSql("SELECT dateField,\ntimeField,\ndateToTimestampField,\ndateToLocalTimestampField,\ntimestampField,\ntimestampToLocalTimestampField\nFROM full_types");
        CloseableIterator iterator = result.collect();
        Object[] expectedSnapshot = null;
        switch (this.localTimeZone) {
            case "Asia/Shanghai": {
                expectedSnapshot = new String[]{"+I[2019-08-12, 01:54:14, 2019-08-12T01:54:14.692, 2019-08-11T17:54:14.692Z, 2019-08-12T01:47:44, 2019-08-11T17:47:44Z]"};
                break;
            }
            case "Europe/Berlin": {
                expectedSnapshot = new String[]{"+I[2019-08-11, 19:54:14, 2019-08-11T19:54:14.692, 2019-08-11T17:54:14.692Z, 2019-08-11T19:47:44, 2019-08-11T17:47:44Z]"};
                break;
            }
            default: {
                expectedSnapshot = new String[]{"+I[2019-08-11, 17:54:14, 2019-08-11T17:54:14.692, 2019-08-11T17:54:14.692Z, 2019-08-11T17:47:44, 2019-08-11T17:47:44Z]"};
            }
        }
        List<String> actualSnapshot = MongoDBTimeZoneITCase.fetchRows((Iterator<Row>)iterator, expectedSnapshot.length);
        Assert.assertThat(actualSnapshot, (Matcher)Matchers.containsInAnyOrder((Object[])expectedSnapshot));
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    @Test
    public void testDateAndTimestampToStringWithTimeZone() throws Exception {
        Object[] expectedSnapshot;
        this.tEnv.getConfig().setLocalTimeZone(ZoneId.of(this.localTimeZone));
        String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
        String sourceDDL = String.format("CREATE TABLE full_types_1 (\n    _id STRING,\n    dateToLocalTimestampField STRING,\n    timestampToLocalTimestampField STRING,\n    PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", database, "full_types");
        this.tEnv.executeSql(sourceDDL);
        TableResult result = this.tEnv.executeSql("SELECT dateToLocalTimestampField,\ntimestampToLocalTimestampField\nFROM full_types_1");
        CloseableIterator iterator = result.collect();
        switch (this.localTimeZone) {
            case "Asia/Shanghai": {
                expectedSnapshot = new String[]{"+I[2019-08-12T01:54:14.692+08:00, 2019-08-12T01:47:44+08:00]"};
                break;
            }
            case "Europe/Berlin": {
                expectedSnapshot = new String[]{"+I[2019-08-11T19:54:14.692+02:00, 2019-08-11T19:47:44+02:00]"};
                break;
            }
            default: {
                expectedSnapshot = new String[]{"+I[2019-08-11T17:54:14.692Z, 2019-08-11T17:47:44Z]"};
            }
        }
        List<String> actualSnapshot = MongoDBTimeZoneITCase.fetchRows((Iterator<Row>)iterator, expectedSnapshot.length);
        Assert.assertThat(actualSnapshot, (Matcher)Matchers.containsInAnyOrder((Object[])expectedSnapshot));
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        ArrayList<String> rows = new ArrayList<String>(size);
        while (size > 0 && iter.hasNext()) {
            Row row = iter.next();
            rows.add(row.toString());
            --size;
        }
        return rows;
    }
}

