/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.connectors.flink.proxy;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import software.amazon.kinesis.connectors.flink.proxy.FullJitterBackoff;
import software.amazon.kinesis.connectors.flink.proxy.KinesisProxyV2Interface;
import software.amazon.kinesis.connectors.flink.util.AwsV2Util;
import software.amazon.kinesis.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

/**
 * {@link KinesisProxyV2Interface} implementation that delegates to a {@link KinesisAsyncClient}.
 *
 * <p>{@code subscribeToShard} is forwarded to the client as-is and returns the client's future.
 * The describe/register/deregister calls block on the client's future and are retried with a
 * full-jitter backoff whenever {@link AwsV2Util#isRecoverableException} reports the failure as
 * recoverable. Backoff bounds and retry limits are read from the supplied
 * {@link FanOutRecordPublisherConfiguration}.
 */
@Internal
public class KinesisProxyV2
implements KinesisProxyV2Interface {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);

    /** Client every Kinesis operation is delegated to; never null. */
    private final KinesisAsyncClient kinesisAsyncClient;

    /** HTTP client closed together with the Kinesis client; not validated, so it may be null. */
    private final SdkAsyncHttpClient httpClient;

    /** Source of the per-operation backoff and retry settings. */
    private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;

    /** Computes backoff durations and performs the actual sleep between attempts. */
    private final FullJitterBackoff backoff;

    /**
     * Creates a proxy around the given client.
     *
     * @param kinesisAsyncClient client used for all Kinesis calls; must not be null
     * @param httpClient HTTP client that is closed by {@link #close()}
     * @param fanOutRecordPublisherConfiguration configuration providing backoff and retry settings
     * @param backoff backoff calculator/sleeper used between retry attempts
     * @throws NullPointerException if {@code kinesisAsyncClient} is null
     */
    public KinesisProxyV2(KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient httpClient, FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration, FullJitterBackoff backoff) {
        this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient);
        this.httpClient = httpClient;
        this.fanOutRecordPublisherConfiguration = fanOutRecordPublisherConfiguration;
        this.backoff = backoff;
    }

    /**
     * Forwards the subscription request to the underlying client without any retry handling.
     *
     * @param request the subscribe-to-shard request
     * @param responseHandler handler passed through to the client
     * @return the future returned by the underlying client
     */
    @Override
    public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
        return kinesisAsyncClient.subscribeToShard(request, responseHandler);
    }

    /**
     * Closes the Kinesis client and then the HTTP client.
     *
     * <p>The HTTP client is closed in a {@code finally} block so that it is released even when
     * closing the Kinesis client throws.
     */
    @Override
    public void close() {
        try {
            kinesisAsyncClient.close();
        } finally {
            // The constructor does not reject a null HTTP client, so guard against it here.
            if (httpClient != null) {
                httpClient.close();
            }
        }
    }

    /**
     * Describes the summary of the named stream, retrying recoverable failures.
     *
     * @param stream name of the stream
     * @return the response produced by the client
     * @throws InterruptedException if interrupted while waiting for the response or backing off
     * @throws ExecutionException if the call fails with a non-recoverable error
     */
    @Override
    public DescribeStreamSummaryResponse describeStreamSummary(String stream) throws InterruptedException, ExecutionException {
        DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder()
                .streamName(stream)
                .build();
        return invokeWithRetryAndBackoff(
                () -> kinesisAsyncClient.describeStreamSummary(request).get(),
                fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(),
                fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(),
                fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(),
                fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries());
    }

    /**
     * Describes a stream consumer identified by stream ARN and consumer name.
     *
     * @param streamArn ARN of the stream
     * @param consumerName name of the consumer
     * @return the response produced by the client
     * @throws InterruptedException if interrupted while waiting for the response or backing off
     * @throws ExecutionException if the call fails with a non-recoverable error
     */
    @Override
    public DescribeStreamConsumerResponse describeStreamConsumer(String streamArn, String consumerName) throws InterruptedException, ExecutionException {
        DescribeStreamConsumerRequest request = DescribeStreamConsumerRequest.builder()
                .streamARN(streamArn)
                .consumerName(consumerName)
                .build();
        return describeStreamConsumer(request);
    }

    /**
     * Describes a stream consumer identified by its consumer ARN.
     *
     * @param streamConsumerArn ARN of the stream consumer
     * @return the response produced by the client
     * @throws InterruptedException if interrupted while waiting for the response or backing off
     * @throws ExecutionException if the call fails with a non-recoverable error
     */
    @Override
    public DescribeStreamConsumerResponse describeStreamConsumer(String streamConsumerArn) throws InterruptedException, ExecutionException {
        DescribeStreamConsumerRequest request = DescribeStreamConsumerRequest.builder()
                .consumerARN(streamConsumerArn)
                .build();
        return describeStreamConsumer(request);
    }

    /** Shared implementation of both public {@code describeStreamConsumer} overloads. */
    private DescribeStreamConsumerResponse describeStreamConsumer(DescribeStreamConsumerRequest request) throws InterruptedException, ExecutionException {
        return invokeWithRetryAndBackoff(
                () -> kinesisAsyncClient.describeStreamConsumer(request).get(),
                fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(),
                fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(),
                fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(),
                fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries());
    }

    /**
     * Registers a consumer with the given name on the given stream, retrying recoverable failures.
     *
     * @param streamArn ARN of the stream
     * @param consumerName name of the consumer to register
     * @return the response produced by the client
     * @throws InterruptedException if interrupted while waiting for the response or backing off
     * @throws ExecutionException if the call fails with a non-recoverable error
     */
    @Override
    public RegisterStreamConsumerResponse registerStreamConsumer(String streamArn, String consumerName) throws InterruptedException, ExecutionException {
        RegisterStreamConsumerRequest request = RegisterStreamConsumerRequest.builder()
                .streamARN(streamArn)
                .consumerName(consumerName)
                .build();
        return invokeWithRetryAndBackoff(
                () -> kinesisAsyncClient.registerStreamConsumer(request).get(),
                fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(),
                fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(),
                fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(),
                fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries());
    }

    /**
     * Deregisters the consumer with the given ARN, retrying recoverable failures.
     *
     * @param consumerArn ARN of the consumer to deregister
     * @return the response produced by the client
     * @throws InterruptedException if interrupted while waiting for the response or backing off
     * @throws ExecutionException if the call fails with a non-recoverable error
     */
    @Override
    public DeregisterStreamConsumerResponse deregisterStreamConsumer(String consumerArn) throws InterruptedException, ExecutionException {
        DeregisterStreamConsumerRequest request = DeregisterStreamConsumerRequest.builder()
                .consumerARN(consumerArn)
                .build();
        return invokeWithRetryAndBackoff(
                () -> kinesisAsyncClient.deregisterStreamConsumer(request).get(),
                fanOutRecordPublisherConfiguration.getDeregisterStreamBaseBackoffMillis(),
                fanOutRecordPublisherConfiguration.getDeregisterStreamMaxBackoffMillis(),
                fanOutRecordPublisherConfiguration.getDeregisterStreamExpConstant(),
                fanOutRecordPublisherConfiguration.getDeregisterStreamMaxRetries());
    }

    /**
     * Invokes the supplier until it yields a response, backing off after each recoverable failure.
     *
     * <p>The supplier is invoked at most {@code maximumNumberOfRetries} times. After every
     * recoverable failure (including the last one) the thread sleeps for the computed backoff.
     * Non-recoverable failures are rethrown immediately.
     *
     * @param responseSupplier the call to execute
     * @param jitterBase base backoff passed to the backoff calculator
     * @param jitterMax maximum backoff passed to the backoff calculator
     * @param jitterExponent exponent passed to the backoff calculator
     * @param maximumNumberOfRetries number of failed attempts tolerated before giving up
     * @param <T> response type
     * @return the first non-null response produced by the supplier
     * @throws InterruptedException if interrupted while waiting for the response or backing off
     * @throws ExecutionException if the supplier fails with a non-recoverable error
     * @throws RuntimeException if no response was obtained within the allowed attempts; the last
     *     recoverable failure, if any, is attached as the cause
     */
    private <T> T invokeWithRetryAndBackoff(ResponseSupplier<T> responseSupplier, long jitterBase, long jitterMax, double jitterExponent, int maximumNumberOfRetries) throws InterruptedException, ExecutionException {
        T response = null;
        Exception lastRecoverableFailure = null;
        int attempt = 0;

        // NOTE(review): a supplier that returns null without throwing does not advance
        // `attempt`, so this loop would spin; callers in this class always block on a future.
        while (attempt < maximumNumberOfRetries && response == null) {
            try {
                response = responseSupplier.get();
            } catch (Exception ex) {
                if (!AwsV2Util.isRecoverableException(ex)) {
                    throw ex;
                }
                // Remember the failure so it can be reported if all attempts are exhausted.
                lastRecoverableFailure = ex;
                long backoffMillis = backoff.calculateFullJitterBackoff(jitterBase, jitterMax, jitterExponent, ++attempt);
                LOG.warn("Encountered recoverable error: {}. Backing off for {} millis.", ex.getClass().getSimpleName(), backoffMillis, ex);
                backoff.sleep(backoffMillis);
            }
        }

        if (response == null) {
            throw new RuntimeException("Retries exceeded - all " + maximumNumberOfRetries + " retry attempts failed.", lastRecoverableFailure);
        }
        return response;
    }

    /**
     * Supplier of a response that may fail with the checked exceptions of a blocking future get.
     *
     * @param <T> response type
     */
    @FunctionalInterface
    private interface ResponseSupplier<T> {
        T get() throws ExecutionException, InterruptedException;
    }
}

