/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.sqs.table;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
import org.apache.flink.connector.sqs.table.SqsConnectorOptions;
import org.apache.flink.connector.sqs.table.SqsDynamicSink;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

@Internal
public class SqsDynamicTableFactory
extends AsyncDynamicTableSinkFactory {
    private static final String IDENTIFIER = "sqs";

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        AsyncDynamicTableSinkFactory.AsyncDynamicSinkContext factoryContext = new AsyncDynamicTableSinkFactory.AsyncDynamicSinkContext((AsyncDynamicTableSinkFactory)this, (AsyncDynamicTableSinkFactory)this, context);
        factoryContext.getFactoryHelper().validate();
        ReadableConfig config = factoryContext.getTableOptions();
        Properties clientProperties = this.getSqsClientProperties(config);
        AWSGeneralUtil.validateAwsConfiguration((Properties)clientProperties);
        SqsDynamicSink.SqsDynamicSinkBuilder builder = SqsDynamicSink.builder().setSqsQueueUrl((String)config.get(SqsConnectorOptions.QUEUE_URL)).setEncodingFormat((EncodingFormat<SerializationSchema<RowData>>)factoryContext.getEncodingFormat()).setSqsClientProperties(clientProperties).setConsumedDataType(factoryContext.getPhysicalDataType()).setFailOnError((Boolean)config.get(SqsConnectorOptions.FAIL_ON_ERROR));
        this.addAsyncOptionsToBuilder(this.getAsyncSinkOptions(config), builder);
        return builder.build();
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(SqsConnectorOptions.QUEUE_URL);
        options.add(SqsConnectorOptions.AWS_REGION);
        options.add(FactoryUtil.FORMAT);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        Set options = super.optionalOptions();
        options.add(SqsConnectorOptions.FAIL_ON_ERROR);
        options.add(SqsConnectorOptions.AWS_CONFIG_PROPERTIES);
        return options;
    }

    public Set<ConfigOption<?>> forwardOptions() {
        HashSet options = new HashSet();
        options.add(SqsConnectorOptions.QUEUE_URL);
        options.add(SqsConnectorOptions.AWS_REGION);
        options.add(SqsConnectorOptions.AWS_CONFIG_PROPERTIES);
        return options;
    }

    private Properties getSqsClientProperties(ReadableConfig config) {
        Properties properties = new Properties();
        properties.putAll(this.appendAwsPrefixToOptions((Map)config.get(SqsConnectorOptions.AWS_CONFIG_PROPERTIES)));
        return properties;
    }

    private Map<String, String> appendAwsPrefixToOptions(Map<String, String> options) {
        HashMap<String, String> prefixedProperties = new HashMap<String, String>();
        options.forEach((key, value) -> prefixedProperties.put("aws." + key, (String)value));
        return prefixedProperties;
    }

    private Properties getAsyncSinkOptions(ReadableConfig config) {
        Properties properties = new Properties();
        Optional.ofNullable((Long)config.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE)).ifPresent(flushBufferSize -> properties.put(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(), flushBufferSize));
        Optional.ofNullable((Integer)config.get(AsyncSinkConnectorOptions.MAX_BATCH_SIZE)).ifPresent(maxBatchSize -> properties.put(AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(), maxBatchSize));
        Optional.ofNullable((Integer)config.get(AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS)).ifPresent(maxInflightRequests -> properties.put(AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS.key(), maxInflightRequests));
        Optional.ofNullable((Integer)config.get(AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS)).ifPresent(maxBufferedRequests -> properties.put(AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS.key(), maxBufferedRequests));
        Optional.ofNullable((Long)config.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT)).ifPresent(timeout -> properties.put(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(), timeout));
        return properties;
    }
}

