/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.util;

import java.util.List;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisException;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar;
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxySyncV2Interface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Factory;
import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;

@Internal
public class StreamConsumerRegistrarUtil {
    public static void eagerlyRegisterStreamConsumers(Properties configProps, List<String> streams) {
        if (!AwsV2Util.isUsingEfoRecordPublisher(configProps) || !AwsV2Util.isEagerEfoRegistrationType(configProps)) {
            return;
        }
        StreamConsumerRegistrarUtil.registerStreamConsumers(configProps, streams);
    }

    public static void lazilyRegisterStreamConsumers(Properties configProps, List<String> streams) {
        if (!AwsV2Util.isUsingEfoRecordPublisher(configProps) || !AwsV2Util.isLazyEfoRegistrationType(configProps)) {
            return;
        }
        StreamConsumerRegistrarUtil.registerStreamConsumers(configProps, streams);
    }

    public static void deregisterStreamConsumers(Properties configProps, List<String> streams) {
        if (StreamConsumerRegistrarUtil.isConsumerDeregistrationRequired(configProps)) {
            try (StreamConsumerRegistrar registrar = StreamConsumerRegistrarUtil.createStreamConsumerRegistrar(configProps, streams);){
                StreamConsumerRegistrarUtil.deregisterStreamConsumers(registrar, configProps, streams);
            }
        }
    }

    private static boolean isConsumerDeregistrationRequired(Properties configProps) {
        return AwsV2Util.isUsingEfoRecordPublisher(configProps) && AwsV2Util.isLazyEfoRegistrationType(configProps);
    }

    private static void registerStreamConsumers(Properties configProps, List<String> streams) {
        try (StreamConsumerRegistrar registrar = StreamConsumerRegistrarUtil.createStreamConsumerRegistrar(configProps, streams);){
            StreamConsumerRegistrarUtil.registerStreamConsumers(registrar, configProps, streams);
        }
    }

    @VisibleForTesting
    static void registerStreamConsumers(StreamConsumerRegistrar registrar, Properties configProps, List<String> streams) {
        String streamConsumerName = configProps.getProperty("flink.stream.efo.consumername");
        for (String stream : streams) {
            try {
                String streamConsumerArn = registrar.registerStreamConsumer(stream, streamConsumerName);
                configProps.setProperty(ConsumerConfigConstants.efoConsumerArn(stream), streamConsumerArn);
            }
            catch (Exception ex) {
                if (ex instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new FlinkKinesisStreamConsumerRegistrarException("Error registering stream: " + stream, ex);
            }
        }
    }

    @VisibleForTesting
    static void deregisterStreamConsumers(StreamConsumerRegistrar registrar, Properties configProps, List<String> streams) {
        if (StreamConsumerRegistrarUtil.isConsumerDeregistrationRequired(configProps)) {
            for (String stream : streams) {
                try {
                    registrar.deregisterStreamConsumer(stream);
                }
                catch (Exception ex) {
                    throw new FlinkKinesisStreamConsumerRegistrarException("Error deregistering stream: " + stream, ex);
                }
            }
        }
    }

    private static StreamConsumerRegistrar createStreamConsumerRegistrar(Properties configProps, List<String> streams) {
        FullJitterBackoff backoff = new FullJitterBackoff();
        FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(configProps, streams);
        KinesisProxySyncV2Interface kinesis = KinesisProxyV2Factory.createKinesisProxySyncV2(configProps);
        return new StreamConsumerRegistrar(kinesis, configuration, backoff);
    }

    @Internal
    public static class FlinkKinesisStreamConsumerRegistrarException
    extends FlinkKinesisException {
        public FlinkKinesisStreamConsumerRegistrarException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

