/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.connectors.flink.util;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import software.amazon.kinesis.connectors.flink.config.AWSConfigConstants;
import software.amazon.kinesis.connectors.flink.config.ConsumerConfigConstants;
import software.amazon.kinesis.connectors.flink.util.AWSUtil;
import software.amazon.kinesis.shaded.com.amazonaws.regions.Regions;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

/**
 * Static helpers that validate and normalise the {@link Properties} used to configure the
 * Kinesis consumer and producer.
 *
 * <p>All validation failures are reported as {@link IllegalArgumentException}.
 */
@Internal
public class KinesisConfigUtil {
    protected static final String COLLECTION_MAX_COUNT = "CollectionMaxCount";
    protected static final String AGGREGATION_MAX_COUNT = "AggregationMaxCount";
    protected static final String RATE_LIMIT = "RateLimit";
    protected static final String THREADING_MODEL = "ThreadingModel";
    protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
    protected static final long DEFAULT_RATE_LIMIT = 100L;
    protected static final KinesisProducerConfiguration.ThreadingModel DEFAULT_THREADING_MODEL = KinesisProducerConfiguration.ThreadingModel.POOLED;
    protected static final int DEFAULT_THREAD_POOL_SIZE = 10;

    // Property keys that are referenced more than once in this class.
    // NOTE(review): these presumably mirror constants declared in AWSConfigConstants /
    // ConsumerConfigConstants that were inlined at compile time - confirm and reuse them.
    private static final String AWS_REGION_KEY = "aws.region";
    private static final String AWS_ENDPOINT_KEY = "aws.endpoint";
    private static final String INITIAL_POSITION_KEY = "flink.stream.initpos";
    private static final String INITIAL_TIMESTAMP_KEY = "flink.stream.initpos.timestamp";
    private static final String INITIAL_TIMESTAMP_FORMAT_KEY = "flink.stream.initpos.timestamp.format";
    private static final String DEFAULT_INITIAL_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
    private static final String GETRECORDS_INTERVAL_KEY = "flink.shard.getrecords.intervalmillis";

    /** Exclusive upper bound, in milliseconds, for the getRecords sleep interval. */
    private static final long MAX_GETRECORDS_INTERVAL_MILLIS = 300000L;

    /**
     * Validates a consumer configuration without any stream names.
     *
     * @param config the consumer properties to validate; must not be null
     * @throws IllegalArgumentException if any property is missing or invalid
     */
    public static void validateConsumerConfiguration(Properties config) {
        validateConsumerConfiguration(config, Collections.emptyList());
    }

    /**
     * Validates a consumer configuration.
     *
     * <p>Checks, in order: the AWS settings, the record publisher type (and the EFO settings when
     * the publisher is EFO), that a region and/or endpoint is present, the initial position, and
     * finally every optional numeric tuning property.
     *
     * @param config the consumer properties to validate; must not be null
     * @param streams stream names used to look up per-stream EFO consumer ARN keys
     * @throws NullPointerException if {@code config} is null
     * @throws IllegalArgumentException if any property is missing or invalid
     */
    public static void validateConsumerConfiguration(Properties config, List<String> streams) {
        Preconditions.checkNotNull(config, "config can not be null");
        validateAwsConfiguration(config);

        ConsumerConfigConstants.RecordPublisherType recordPublisherType = validateRecordPublisherType(config);
        if (recordPublisherType == ConsumerConfigConstants.RecordPublisherType.EFO) {
            validateEfoConfiguration(config, streams);
        }

        if (!config.containsKey(AWS_REGION_KEY) && !config.containsKey(AWS_ENDPOINT_KEY)) {
            throw new IllegalArgumentException(String.format("For FlinkKinesisConsumer AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.", AWS_REGION_KEY, AWS_ENDPOINT_KEY));
        }

        validateInitialPosition(config);

        // The order of the checks below determines which error is reported first when several
        // properties are invalid, so it is kept stable.
        validateOptionalPositiveIntProperty(config, "flink.shard.getrecords.maxrecordcount", "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");
        validateOptionalPositiveIntProperty(config, "flink.shard.getrecords.maxretries", "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.getrecords.backoff.base", "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.getrecords.backoff.max", "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.shard.getrecords.backoff.expconst", "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");
        validateOptionalPositiveLongProperty(config, GETRECORDS_INTERVAL_KEY, "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveIntProperty(config, "flink.shard.getiterator.maxretries", "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.getiterator.backoff.base", "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.getiterator.backoff.max", "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.shard.getiterator.backoff.expconst", "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.discovery.intervalmillis", "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.list.shards.backoff.base", "Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.list.shards.backoff.max", "Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.list.shards.backoff.expconst", "Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");
        validateOptionalPositiveIntProperty(config, "flink.stream.describestreamconsumer.maxretries", "Invalid value given for maximum retry attempts for describe stream consumer operation. Must be a valid non-negative int value.");
        validateOptionalPositiveLongProperty(config, "flink.stream.describestreamconsumer.backoff.max", "Invalid value given for describe stream consumer operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.stream.describestreamconsumer.backoff.expconst", "Invalid value given for describe stream consumer operation backoff exponential constant. Must be a valid non-negative double value.");
        validateOptionalPositiveLongProperty(config, "flink.stream.describestreamconsumer.backoff.base", "Invalid value given for describe stream consumer operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveIntProperty(config, "flink.stream.registerstreamconsumer.maxretries", "Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");
        validateOptionalPositiveIntProperty(config, "flink.stream.registerstreamconsumer.timeout", "Invalid value given for maximum timeout for register stream consumer. Must be a valid non-negative integer value.");
        validateOptionalPositiveLongProperty(config, "flink.stream.registerstreamconsumer.backoff.max", "Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.stream.registerstreamconsumer.backoff.base", "Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.stream.registerstreamconsumer.backoff.expconst", "Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");
        validateOptionalPositiveIntProperty(config, "flink.stream.deregisterstreamconsumer.maxretries", "Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");
        validateOptionalPositiveIntProperty(config, "flink.stream.deregisterstreamconsumer.timeout", "Invalid value given for maximum timeout for deregister stream consumer. Must be a valid non-negative integer value.");
        validateOptionalPositiveLongProperty(config, "flink.stream.deregisterstreamconsumer.backoff.base", "Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.stream.deregisterstreamconsumer.backoff.max", "Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.stream.deregisterstreamconsumer.backoff.expconst", "Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");
        validateOptionalPositiveIntProperty(config, "flink.shard.subscribetoshard.maxretries", "Invalid value given for maximum retry attempts for subscribe to shard operation. Must be a valid non-negative integer value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.subscribetoshard.backoff.base", "Invalid value given for subscribe to shard operation base backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveLongProperty(config, "flink.shard.subscribetoshard.backoff.max", "Invalid value given for subscribe to shard operation max backoff milliseconds. Must be a valid non-negative long value.");
        validateOptionalPositiveDoubleProperty(config, "flink.shard.subscribetoshard.backoff.expconst", "Invalid value given for subscribe to shard operation backoff exponential constant. Must be a valid non-negative double value.");

        if (config.containsKey(GETRECORDS_INTERVAL_KEY)) {
            // Already known to parse as a non-negative long (validated above).
            long intervalMillis = Long.parseLong(config.getProperty(GETRECORDS_INTERVAL_KEY));
            Preconditions.checkArgument(intervalMillis < MAX_GETRECORDS_INTERVAL_MILLIS, "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than 300000 milliseconds.");
        }

        validateOptionalPositiveIntProperty(config, "flink.stream.efo.http-client.max-concurrency", "Invalid value given for EFO HTTP client max concurrency. Must be positive.");
    }

    /**
     * Reads and validates the record publisher type.
     *
     * @param config the consumer properties
     * @return the configured type, or {@code POLLING} when the property is absent
     * @throws IllegalArgumentException if the configured value is not a valid type name
     */
    public static ConsumerConfigConstants.RecordPublisherType validateRecordPublisherType(Properties config) {
        if (!config.containsKey("flink.stream.recordpublisher")) {
            return ConsumerConfigConstants.RecordPublisherType.POLLING;
        }
        String recordPublisherType = config.getProperty("flink.stream.recordpublisher");
        try {
            return ConsumerConfigConstants.RecordPublisherType.valueOf(recordPublisherType);
        }
        catch (IllegalArgumentException e) {
            String validValues = Arrays.stream(ConsumerConfigConstants.RecordPublisherType.values()).map(Enum::name).collect(Collectors.joining(", "));
            throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + validValues, e);
        }
    }

    /**
     * Validates the enhanced fan-out (EFO) settings.
     *
     * <p>With registration type {@code NONE} every stream in {@code streams} must have a
     * {@code flink.stream.efo.consumerarn.<stream>} entry; with any other registration type
     * (the default is {@code LAZY}) {@code flink.stream.efo.consumername} must be present.
     *
     * @param config the consumer properties
     * @param streams stream names whose consumer ARN keys are checked for registration type NONE
     * @throws IllegalArgumentException if the registration type is invalid or a required key is missing
     */
    public static void validateEfoConfiguration(Properties config, List<String> streams) {
        ConsumerConfigConstants.EFORegistrationType efoRegistrationType = ConsumerConfigConstants.EFORegistrationType.LAZY;
        if (config.containsKey("flink.stream.efo.registration")) {
            String typeInString = config.getProperty("flink.stream.efo.registration");
            try {
                efoRegistrationType = ConsumerConfigConstants.EFORegistrationType.valueOf(typeInString);
            }
            catch (IllegalArgumentException e) {
                String validValues = Arrays.stream(ConsumerConfigConstants.EFORegistrationType.values()).map(Enum::name).collect(Collectors.joining(", "));
                throw new IllegalArgumentException("Invalid efo registration type in stream set in config. Valid values are: " + validValues, e);
            }
        }

        if (efoRegistrationType != ConsumerConfigConstants.EFORegistrationType.NONE) {
            if (!config.containsKey("flink.stream.efo.consumername")) {
                throw new IllegalArgumentException("No valid enhanced fan-out consumer name is set through flink.stream.efo.consumername");
            }
            return;
        }

        // Collect every missing key so the user sees all of them in one error.
        List<String> missingConsumerArnKeys = new ArrayList<>();
        for (String stream : streams) {
            String efoConsumerARNKey = "flink.stream.efo.consumerarn." + stream;
            if (!config.containsKey(efoConsumerARNKey)) {
                missingConsumerArnKeys.add(efoConsumerARNKey);
            }
        }
        if (!missingConsumerArnKeys.isEmpty()) {
            throw new IllegalArgumentException("Invalid efo consumer arn settings for not providing consumer arns: " + String.join(", ", missingConsumerArnKeys));
        }
    }

    /**
     * Moves the values of the deprecated producer keys {@code aws.producer.collectionMaxCount}
     * and {@code aws.producer.aggregationMaxCount} to {@link #COLLECTION_MAX_COUNT} and
     * {@link #AGGREGATION_MAX_COUNT}, removing the deprecated keys.
     *
     * @param configProps the producer properties; modified in place
     * @return the same {@code configProps} instance
     */
    public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
        if (configProps.containsKey("aws.producer.collectionMaxCount")) {
            configProps.setProperty(COLLECTION_MAX_COUNT, configProps.getProperty("aws.producer.collectionMaxCount"));
            configProps.remove("aws.producer.collectionMaxCount");
        }
        if (configProps.containsKey("aws.producer.aggregationMaxCount")) {
            configProps.setProperty(AGGREGATION_MAX_COUNT, configProps.getProperty("aws.producer.aggregationMaxCount"));
            configProps.remove("aws.producer.aggregationMaxCount");
        }
        return configProps;
    }

    /**
     * Copies the values of the {@code flink.stream.describe.backoff.*} keys to the corresponding
     * {@code flink.list.shards.backoff.*} keys. The old keys are left in place and an existing
     * value under a new key is overwritten.
     *
     * @param configProps the consumer properties; modified in place
     * @return the same {@code configProps} instance
     */
    public static Properties backfillConsumerKeys(Properties configProps) {
        Map<String, String> oldKeyToNewKeys = new HashMap<>();
        oldKeyToNewKeys.put("flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");
        oldKeyToNewKeys.put("flink.stream.describe.backoff.max", "flink.list.shards.backoff.max");
        oldKeyToNewKeys.put("flink.stream.describe.backoff.expconst", "flink.list.shards.backoff.expconst");
        for (Map.Entry<String, String> entry : oldKeyToNewKeys.entrySet()) {
            String oldKey = entry.getKey();
            if (configProps.containsKey(oldKey)) {
                configProps.setProperty(entry.getValue(), configProps.getProperty(oldKey));
            }
        }
        return configProps;
    }

    /**
     * Validates the producer properties and builds a {@link KinesisProducerConfiguration} from them.
     *
     * <p>The region and credentials provider are always taken from {@code config}. The rate
     * limit, threading model and thread pool size fall back to {@link #DEFAULT_RATE_LIMIT},
     * {@link #DEFAULT_THREADING_MODEL} and {@link #DEFAULT_THREAD_POOL_SIZE} when their keys
     * are absent.
     *
     * @param config the producer properties; must not be null
     * @return the populated producer configuration
     * @throws NullPointerException if {@code config} is null
     * @throws IllegalArgumentException if the AWS settings are invalid or the region is missing
     */
    public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
        Preconditions.checkNotNull(config, "config can not be null");
        validateAwsConfiguration(config);
        if (!config.containsKey(AWS_REGION_KEY)) {
            throw new IllegalArgumentException(String.format("For FlinkKinesisProducer AWS region ('%s') must be set in the config.", AWS_REGION_KEY));
        }
        KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
        kpc.setRegion(config.getProperty(AWS_REGION_KEY));
        kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
        kpc.setCredentialsRefreshDelay(100L);
        if (!config.containsKey(RATE_LIMIT)) {
            kpc.setRateLimit(DEFAULT_RATE_LIMIT);
        }
        if (!config.containsKey(THREADING_MODEL)) {
            kpc.setThreadingModel(DEFAULT_THREADING_MODEL);
        }
        if (!config.containsKey(THREAD_POOL_SIZE)) {
            kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
        }
        return kpc;
    }

    /**
     * Validates the AWS credential provider type and region, when present.
     *
     * <p>For the {@code BASIC} provider both the access key id and the secret access key must be
     * set. A configured region must be accepted by {@link AWSUtil#isValidRegion(String)}.
     *
     * @param config the properties to validate
     * @throws IllegalArgumentException if the provider type, its credentials or the region are invalid
     */
    public static void validateAwsConfiguration(Properties config) {
        if (config.containsKey("aws.credentials.provider")) {
            String credentialsProviderType = config.getProperty("aws.credentials.provider");
            AWSConfigConstants.CredentialProvider providerType;
            try {
                providerType = AWSConfigConstants.CredentialProvider.valueOf(credentialsProviderType);
            }
            catch (IllegalArgumentException e) {
                String validValues = Arrays.stream(AWSConfigConstants.CredentialProvider.values()).map(Enum::toString).collect(Collectors.joining(", "));
                throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. Valid values are: " + validValues, e);
            }
            boolean hasBasicCredentials = config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) && config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
            if (providerType == AWSConfigConstants.CredentialProvider.BASIC && !hasBasicCredentials) {
                throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
            }
        }
        if (config.containsKey(AWS_REGION_KEY) && !AWSUtil.isValidRegion(config.getProperty(AWS_REGION_KEY))) {
            String validRegions = Arrays.stream(Regions.values()).map(Regions::getName).collect(Collectors.joining(", "));
            throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + validRegions);
        }
    }

    /**
     * Parses the {@code flink.stream.initpos.timestamp} property into a {@link Date}.
     *
     * <p>The value is first parsed with the pattern given by
     * {@code flink.stream.initpos.timestamp.format} (default {@code yyyy-MM-dd'T'HH:mm:ss.SSSXXX}).
     * If that fails, the value is interpreted as a decimal number of seconds since the epoch.
     * This method does not range-check the number; callers are presumably expected to have run
     * {@link #validateConsumerConfiguration(Properties)} first.
     *
     * @param consumerConfig the consumer properties
     * @return the parsed starting timestamp
     * @throws IllegalArgumentException if the timestamp is missing, the pattern is invalid, or
     *     the value is neither a date in that pattern nor a number
     */
    public static Date parseStreamTimestampStartingPosition(Properties consumerConfig) {
        String timestamp = consumerConfig.getProperty(INITIAL_TIMESTAMP_KEY);
        try {
            String format = consumerConfig.getProperty(INITIAL_TIMESTAMP_FORMAT_KEY, DEFAULT_INITIAL_TIMESTAMP_FORMAT);
            return new SimpleDateFormat(format).parse(timestamp);
        }
        catch (IllegalArgumentException | NullPointerException exception) {
            throw new IllegalArgumentException(exception);
        }
        catch (ParseException exception) {
            // Not a formatted date: fall back to epoch seconds. A NumberFormatException raised
            // here is itself an IllegalArgumentException and propagates to the caller.
            return new Date((long)(Double.parseDouble(timestamp) * 1000.0));
        }
    }

    /**
     * Validates {@code flink.stream.initpos} and, for {@code AT_TIMESTAMP}, the accompanying
     * timestamp property. Does nothing when no initial position is configured.
     */
    private static void validateInitialPosition(Properties config) {
        if (!config.containsKey(INITIAL_POSITION_KEY)) {
            return;
        }
        String initPosType = config.getProperty(INITIAL_POSITION_KEY);
        ConsumerConfigConstants.InitialPosition initialPosition;
        try {
            initialPosition = ConsumerConfigConstants.InitialPosition.valueOf(initPosType);
        }
        catch (IllegalArgumentException e) {
            String validValues = Arrays.stream(ConsumerConfigConstants.InitialPosition.values()).map(Enum::name).collect(Collectors.joining(", "));
            throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + validValues, e);
        }
        if (initialPosition != ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP) {
            return;
        }
        if (!config.containsKey(INITIAL_TIMESTAMP_KEY)) {
            throw new IllegalArgumentException("Please set value for initial timestamp ('flink.stream.initpos.timestamp') when using AT_TIMESTAMP initial position.");
        }
        validateOptionalDateProperty(config, INITIAL_TIMESTAMP_KEY, config.getProperty(INITIAL_TIMESTAMP_FORMAT_KEY, DEFAULT_INITIAL_TIMESTAMP_FORMAT), "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
    }

    /**
     * Checks that {@code key}, if present, holds a {@code long} that is zero or greater.
     * Despite the method name, zero is accepted.
     *
     * @throws IllegalArgumentException with {@code message} if the value is unparseable or negative
     */
    private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
        if (!config.containsKey(key)) {
            return;
        }
        long value;
        try {
            value = Long.parseLong(config.getProperty(key));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException(message, e);
        }
        if (value < 0L) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Checks that {@code key}, if present, holds an {@code int} that is zero or greater.
     * Despite the method name, zero is accepted.
     *
     * @throws IllegalArgumentException with {@code message} if the value is unparseable or negative
     */
    private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) {
        if (!config.containsKey(key)) {
            return;
        }
        int value;
        try {
            value = Integer.parseInt(config.getProperty(key));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException(message, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Checks that {@code key}, if present, holds a {@code double} that is zero or greater.
     * Despite the method name, zero is accepted. {@code NaN} is rejected.
     *
     * @throws IllegalArgumentException with {@code message} if the value is unparseable,
     *     {@code NaN} or negative
     */
    private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) {
        if (!config.containsKey(key)) {
            return;
        }
        double value;
        try {
            value = Double.parseDouble(config.getProperty(key));
        }
        catch (NumberFormatException | NullPointerException e) {
            // getProperty returns null when the key maps to a non-String value, and unlike
            // Long.parseLong/Integer.parseInt, Double.parseDouble(null) throws NullPointerException.
            throw new IllegalArgumentException(message, e);
        }
        // NaN compares false against everything, so "value < 0.0" alone would let it through.
        if (Double.isNaN(value) || value < 0.0) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Checks that {@code timestampKey}, if present, is either a date matching {@code format} or a
     * non-negative decimal number. {@code NaN} is rejected.
     *
     * @throws IllegalArgumentException with {@code message} if the pattern is invalid or the value
     *     is neither a matching date nor a non-negative number
     */
    private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
        if (!config.containsKey(timestampKey)) {
            return;
        }
        String rawTimestamp = config.getProperty(timestampKey);
        try {
            new SimpleDateFormat(format).parse(rawTimestamp);
        }
        catch (IllegalArgumentException | NullPointerException exception) {
            // Invalid pattern, or a null pattern/value.
            throw new IllegalArgumentException(message, exception);
        }
        catch (ParseException exception) {
            // Not a formatted date: it must then be a non-negative number.
            double value;
            try {
                value = Double.parseDouble(rawTimestamp);
            }
            catch (NumberFormatException numberFormatException) {
                throw new IllegalArgumentException(message, numberFormatException);
            }
            if (Double.isNaN(value) || value < 0.0) {
                throw new IllegalArgumentException(message);
            }
        }
    }
}

