package org.springframework.cloud.stream.binder.kinesis.provisioning;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.Shard;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kinesis/provisioning/KinesisStreamProvisioner.class */
/**
 * {@link ProvisioningProvider} for the Kinesis binder.
 *
 * <p>For every producer or consumer destination it looks up the shards of the
 * named stream and, depending on the binder configuration, creates the stream
 * when it is missing and/or raises its shard count to the required minimum.
 * All calls to the asynchronous Kinesis client are joined, so the provisioning
 * methods block until the remote operations have completed.
 */
public class KinesisStreamProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>> {
    private static final Log logger = LogFactory.getLog(KinesisStreamProvisioner.class);
    private final KinesisAsyncClient amazonKinesis;
    private final KinesisBinderConfigurationProperties configurationProperties;

    /**
     * Creates a provisioner backed by the given client and binder configuration.
     *
     * @param amazonKinesis the asynchronous Kinesis client; must not be {@code null}
     * @param kinesisBinderConfigurationProperties the binder-level settings; must not be {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public KinesisStreamProvisioner(KinesisAsyncClient amazonKinesis, KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
        Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null");
        Assert.notNull(kinesisBinderConfigurationProperties, "'kinesisBinderConfigurationProperties' must not be null");
        this.amazonKinesis = amazonKinesis;
        this.configurationProperties = kinesisBinderConfigurationProperties;
    }

    /**
     * Provisions the stream used for outbound messages.
     *
     * <p>Side effect on {@code properties}: the extension's embed-headers flag is
     * set to {@code true} when the header mode is unset or
     * {@link HeaderMode#embeddedHeaders}, and the header mode itself is then
     * forced to {@link HeaderMode#none}.
     *
     * @param name the Kinesis stream name
     * @param properties the producer properties; its partition count is the requested shard count
     * @return the destination describing the stream and its shards
     * @throws ProvisioningException if the stream cannot be described, created or resharded
     */
    @Override
    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<KinesisProducerProperties> properties) throws ProvisioningException {
        if (logger.isInfoEnabled()) {
            logger.info("Using Kinesis stream for outbound: " + name);
        }
        // The embed-headers decision must be taken before the header mode is overwritten below.
        HeaderMode headerMode = properties.getHeaderMode();
        properties.getExtension().setEmbedHeaders(headerMode == null || HeaderMode.embeddedHeaders.equals(headerMode));
        properties.setHeaderMode(HeaderMode.none);
        return new KinesisProducerDestination(name, createOrUpdate(name, properties.getPartitionCount()));
    }

    /**
     * Provisions the stream used for inbound messages.
     *
     * <p>Applies the same header-mode handling to {@code properties} as
     * {@link #provisionProducerDestination}. The requested shard count is
     * {@code instanceCount * concurrency}.
     *
     * @param name the Kinesis stream name
     * @param group the consumer group; not used by this method
     * @param properties the consumer properties
     * @return the destination describing the stream and its shards
     * @throws ProvisioningException if the stream cannot be described, created or resharded
     */
    @Override
    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
        // The embed-headers decision must be taken before the header mode is overwritten below.
        HeaderMode headerMode = properties.getHeaderMode();
        properties.getExtension().setEmbedHeaders(headerMode == null || HeaderMode.embeddedHeaders.equals(headerMode));
        properties.setHeaderMode(HeaderMode.none);
        if (logger.isInfoEnabled()) {
            logger.info("Using Kinesis stream for inbound: " + name);
        }
        int requestedShards = properties.getInstanceCount() * properties.getConcurrency();
        return new KinesisConsumerDestination(name, createOrUpdate(name, requestedShards));
    }

    /**
     * Returns the shards of the stream, creating the stream and/or adding shards
     * when the binder configuration allows it.
     *
     * @param stream the stream name
     * @param shards the shard count requested by the binding
     * @return the shards of the stream after any creation or resharding
     * @throws ProvisioningException if the stream is missing and auto-creation is
     *     disabled, or if describing, creating or resharding the stream fails
     */
    private List<Shard> createOrUpdate(String stream, int shards) {
        List<Shard> shardList;
        try {
            shardList = getShardList(stream).join();
        } catch (CompletionException ex) {
            Throwable cause = ex.getCause();
            if (!(cause instanceof ResourceNotFoundException)) {
                throw new ProvisioningException("Cannot retrieve shards information for stream [" + stream + "].", cause);
            }
            if (!this.configurationProperties.isAutoCreateStream()) {
                throw new ProvisioningException("The stream [" + stream + "] was not found and auto creation is disabled.", cause);
            }
            if (logger.isInfoEnabled()) {
                logger.info("Stream '" + stream + "' not found. Create one...");
            }
            shardList = createStream(stream, shards);
        }

        // The configured minimum wins over a smaller per-binding request.
        int effectiveShardCount = Math.max(this.configurationProperties.getMinShardCount(), shards);
        if (shardList.size() < effectiveShardCount && this.configurationProperties.isAutoAddShards()) {
            return updateShardCount(stream, shardList.size(), effectiveShardCount);
        }
        return shardList;
    }

    /**
     * Asynchronously fetches the shards of the stream.
     *
     * <p>The stream summary is requested first and its result discarded; only
     * when that call succeeds are the shards listed.
     *
     * @param stream the stream name
     * @return a future completing with the shards reported by the list-shards call
     */
    private CompletableFuture<List<Shard>> getShardList(String stream) {
        return this.amazonKinesis.describeStreamSummary(request -> request.streamName(stream))
                .thenCompose(summary -> this.amazonKinesis.listShards(request -> request.streamName(stream)))
                .thenApply(response -> response.shards());
    }

    /**
     * Creates the stream, waits for the waiter to report it, and returns its shards.
     *
     * @param stream the stream name
     * @param shards the shard count requested by the binding; the configured
     *     minimum shard count is used instead when it is larger
     * @return the shards of the newly created stream
     * @throws ProvisioningException if any step of the creation fails
     */
    private List<Shard> createStream(String stream, int shards) {
        try {
            return this.amazonKinesis.createStream(request ->
                            request.streamName(stream)
                                    .shardCount(Math.max(this.configurationProperties.getMinShardCount(), shards)))
                    .thenCompose(created -> waitForStreamToBecomeActive(stream))
                    .thenCompose(waited -> getShardList(stream))
                    .join();
        } catch (Exception ex) {
            throw new ProvisioningException("Cannot create stream [" + stream + "].", ex);
        }
    }

    /**
     * Starts the client's "stream exists" waiter for the stream.
     *
     * <p>The waiter is limited to the configured number of describe-stream
     * retries, with a fixed delay of the configured backoff (milliseconds)
     * between attempts.
     *
     * @param stream the stream name
     * @return a future completing with the waiter's response
     */
    private CompletableFuture<WaiterResponse<DescribeStreamResponse>> waitForStreamToBecomeActive(String stream) {
        return this.amazonKinesis.waiter().waitUntilStreamExists(
                request -> request.streamName(stream),
                waiterConfig -> waiterConfig
                        .maxAttempts(this.configurationProperties.getDescribeStreamRetries())
                        .backoffStrategy(FixedDelayBackoffStrategy.create(
                                Duration.ofMillis(this.configurationProperties.getDescribeStreamBackoff()))));
    }

    /**
     * Raises the shard count of the stream with uniform scaling, waits for the
     * waiter to report the stream, and returns the resulting shards.
     *
     * @param stream the stream name
     * @param shardCount the current shard count; used for logging only
     * @param targetCount the shard count to scale to
     * @return the shards of the stream after resharding
     * @throws ProvisioningException if any step of the resharding fails
     */
    private List<Shard> updateShardCount(String stream, int shardCount, int targetCount) {
        if (logger.isInfoEnabled()) {
            logger.info("Stream [" + stream + "] has [" + shardCount + "] shards compared to a target configuration of [" + targetCount + "], creating shards...");
        }
        try {
            return this.amazonKinesis.updateShardCount(request ->
                            request.streamName(stream)
                                    .targetShardCount(targetCount)
                                    .scalingType(ScalingType.UNIFORM_SCALING))
                    .thenCompose(updated -> waitForStreamToBecomeActive(stream))
                    .thenCompose(waited -> getShardList(stream))
                    .join();
        } catch (Exception ex) {
            // Mirror createStream: surface failures as the exception type the public API declares.
            throw new ProvisioningException("Cannot update shard count for stream [" + stream + "].", ex);
        }
    }
}
