/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.AbstractFlinkTableFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
 * Tests for {@link KafkaLogStoreFactory}.
 *
 * <p>Covers two things: resolving the factory through
 * {@code AbstractFlinkTableFactory.createOptionalLogStoreFactory} for every
 * {@link CoreOptions.StartupMode}, and the changelog modes reported by a table source and sink
 * when the table uses the {@code INPUT} changelog producer.
 */
public class KafkaLogStoreFactoryTest {
    /**
     * Resolves the log store factory with {@code log.system = kafka} for each startup mode.
     *
     * <p>For {@code FROM_SNAPSHOT} and {@code FROM_SNAPSHOT_FULL} the lookup is expected to be
     * rejected with a {@link ValidationException}; for every other mode it must yield a
     * {@link KafkaLogStoreFactory}.
     *
     * @param startupMode the scan mode under test, supplied by {@link EnumSource}
     */
    @ParameterizedTest
    @EnumSource(CoreOptions.StartupMode.class)
    public void testCreateKafkaLogStoreFactory(CoreOptions.StartupMode startupMode) {
        boolean startsFromSnapshot = startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT
                || startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;

        HashMap<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
        dynamicOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
        // Supply the companion option that the chosen scan mode refers to.
        if (startsFromSnapshot) {
            dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
        } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
            dynamicOptions.put(
                    CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
                    String.valueOf(System.currentTimeMillis()));
        }

        DynamicTableFactory.Context context = KafkaLogTestUtils.testContext(
                "table",
                "",
                CoreOptions.LogChangelogMode.AUTO,
                CoreOptions.LogConsistency.TRANSACTIONAL,
                org.apache.flink.table.types.logical.RowType.of(
                        new org.apache.flink.table.types.logical.IntType(),
                        new org.apache.flink.table.types.logical.IntType()),
                new int[] {0},
                dynamicOptions);

        if (startsFromSnapshot) {
            // Snapshot-based startup modes must be rejected when the Kafka log system is enabled.
            Assertions.assertThatThrownBy(
                            () -> AbstractFlinkTableFactory.createOptionalLogStoreFactory(context))
                    .isInstanceOf(ValidationException.class);
            return;
        }

        // Any exception here propagates and fails the test with its full stack trace.
        Optional<LogStoreTableFactory> optional =
                AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
        Assertions.assertThat(optional.isPresent()).isTrue();
        Assertions.assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
    }

    /**
     * Checks the changelog modes of a source and a sink built on a table whose changelog producer
     * is {@code INPUT}.
     *
     * <p>The source, created with a {@link KafkaLogStoreFactory}, must report
     * {@link ChangelogMode#upsert()}; the sink must return {@link ChangelogMode#all()} when asked
     * with {@link ChangelogMode#all()}.
     *
     * @param temp temporary directory that holds the table files
     * @throws Exception if creating the table schema fails
     */
    @Test
    public void testInputChangelogProducerWithKafkaLog(@TempDir java.nio.file.Path temp) throws Exception {
        Options options = new Options();
        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);

        Path path = new Path(temp.toUri().toString());
        // Two int columns; "f0" is passed as the third Schema argument (presumably the primary
        // key list, with the empty list before it being partition keys — confirm against Schema).
        Schema schema = new Schema(
                RowType.of(new IntType(), new IntType()).getFields(),
                Collections.emptyList(),
                Collections.singletonList("f0"),
                options.toMap(),
                "");
        new SchemaManager(LocalFileIO.create(), path).createTable(schema);
        FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);

        ObjectIdentifier identifier = ObjectIdentifier.of("c", "d", "t");

        // NOTE(review): the boolean argument is presumably the streaming flag — confirm against
        // the DataTableSource constructor.
        DataTableSource source =
                new DataTableSource(identifier, table, true, null, new KafkaLogStoreFactory());
        Assertions.assertThat(source.getChangelogMode()).isEqualTo(ChangelogMode.upsert());

        FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null);
        Assertions.assertThat(sink.getChangelogMode(ChangelogMode.all()))
                .isEqualTo(ChangelogMode.all());
    }
}

