package org.apache.druid.testing.utils;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.collect.Iterables;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.druid.java.util.common.ISE;

/* loaded from: input_file:org/apache/druid/testing/utils/KinesisAdminClient.class */
public class KinesisAdminClient implements StreamAdminClient {
    private final AmazonKinesis amazonKinesis;

    public KinesisAdminClient(String str) throws Exception {
        String property = System.getProperty("override.config.path");
        Properties properties = new Properties();
        properties.load(new FileInputStream(property));
        this.amazonKinesis = (AmazonKinesis) AmazonKinesisClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(properties.getProperty("druid_kinesis_accessKey"), properties.getProperty("druid_kinesis_secretKey")))).withClientConfiguration(new ClientConfiguration()).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(str, AwsHostNameUtils.parseRegion(str, (String) null))).build();
    }

    @Override // org.apache.druid.testing.utils.StreamAdminClient
    public void createStream(String str, int i, Map<String, String> map) {
        if (this.amazonKinesis.createStream(str, Integer.valueOf(i)).getSdkHttpMetadata().getHttpStatusCode() != 200) {
            throw new ISE("Cannot create stream for integration test", new Object[0]);
        }
        if (map == null || map.isEmpty()) {
            return;
        }
        AddTagsToStreamRequest addTagsToStreamRequest = new AddTagsToStreamRequest();
        addTagsToStreamRequest.setStreamName(str);
        addTagsToStreamRequest.setTags(map);
        if (this.amazonKinesis.addTagsToStream(addTagsToStreamRequest).getSdkHttpMetadata().getHttpStatusCode() != 200) {
            throw new ISE("Cannot tag stream for integration test", new Object[0]);
        }
    }

    @Override // org.apache.druid.testing.utils.StreamAdminClient
    public void deleteStream(String str) {
        if (this.amazonKinesis.deleteStream(str).getSdkHttpMetadata().getHttpStatusCode() != 200) {
            throw new ISE("Cannot delete stream for integration test", new Object[0]);
        }
    }

    @Override // org.apache.druid.testing.utils.StreamAdminClient
    public void updatePartitionCount(String str, int i, boolean z) {
        int streamPartitionCount = getStreamPartitionCount(str);
        if (streamPartitionCount == i) {
            return;
        }
        UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest();
        updateShardCountRequest.setStreamName(str);
        updateShardCountRequest.setTargetShardCount(Integer.valueOf(i));
        updateShardCountRequest.setScalingType(ScalingType.UNIFORM_SCALING);
        if (this.amazonKinesis.updateShardCount(updateShardCountRequest).getSdkHttpMetadata().getHttpStatusCode() != 200) {
            throw new ISE("Cannot update stream's shard count for integration test", new Object[0]);
        }
        if (z) {
            ITRetryUtil.retryUntil(() -> {
                return Boolean.valueOf(verifyStreamStatus(str, StreamStatus.ACTIVE, StreamStatus.UPDATING) && getStreamPartitionCount(str) != streamPartitionCount);
            }, true, 300L, 100, "Kinesis stream resharding to start (or finished)");
        }
    }

    @Override // org.apache.druid.testing.utils.StreamAdminClient
    public boolean isStreamActive(String str) {
        return verifyStreamStatus(str, StreamStatus.ACTIVE);
    }

    @Override // org.apache.druid.testing.utils.StreamAdminClient
    public int getStreamPartitionCount(String str) {
        HashSet hashSet = new HashSet();
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(str);
        while (describeStreamRequest != null) {
            StreamDescription streamDescription = this.amazonKinesis.describeStream(describeStreamRequest).getStreamDescription();
            List list = (List) streamDescription.getShards().stream().map((v0) -> {
                return v0.getShardId();
            }).collect(Collectors.toList());
            hashSet.addAll(list);
            if (streamDescription.isHasMoreShards().booleanValue()) {
                describeStreamRequest.setExclusiveStartShardId((String) Iterables.getLast(list));
            } else {
                describeStreamRequest = null;
            }
        }
        return hashSet.size();
    }

    @Override // org.apache.druid.testing.utils.StreamAdminClient
    public boolean verfiyPartitionCountUpdated(String str, int i, int i2) {
        return getStreamPartitionCount(str) == i + i2;
    }

    private boolean verifyStreamStatus(String str, StreamStatus... streamStatusArr) {
        Stream map = Arrays.stream(streamStatusArr).map((v0) -> {
            return v0.toString();
        });
        String streamStatus = getStreamStatus(str);
        Objects.requireNonNull(streamStatus);
        return map.anyMatch((v1) -> {
            return r1.equals(v1);
        });
    }

    private String getStreamStatus(String str) {
        return getStreamDescription(str).getStreamStatus();
    }

    private StreamDescription getStreamDescription(String str) {
        DescribeStreamResult describeStream = this.amazonKinesis.describeStream(str);
        if (describeStream.getSdkHttpMetadata().getHttpStatusCode() != 200) {
            throw new ISE("Cannot get stream description for integration test", new Object[0]);
        }
        return describeStream.getStreamDescription();
    }
}
