/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kinesis.source.proxy;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Shard;

@Internal
public class KinesisStreamProxy
implements StreamProxy {
    private final KinesisClient kinesisClient;
    private final SdkHttpClient httpClient;
    private final Map<String, String> shardIdToIteratorStore;

    public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient httpClient) {
        this.kinesisClient = kinesisClient;
        this.httpClient = httpClient;
        this.shardIdToIteratorStore = new ConcurrentHashMap<String, String>();
    }

    @Override
    public List<Shard> listShards(String streamArn, @Nullable String lastSeenShardId) {
        ListShardsResponse listShardsResponse;
        ArrayList<Shard> shards = new ArrayList<Shard>();
        String nextToken = null;
        do {
            listShardsResponse = this.kinesisClient.listShards((ListShardsRequest)ListShardsRequest.builder().streamARN(streamArn).exclusiveStartShardId(nextToken == null ? lastSeenShardId : null).nextToken(nextToken).build());
            shards.addAll(listShardsResponse.shards());
        } while ((nextToken = listShardsResponse.nextToken()) != null);
        return shards;
    }

    @Override
    public GetRecordsResponse getRecords(String streamArn, String shardId, StartingPosition startingPosition) {
        String shardIterator = this.shardIdToIteratorStore.computeIfAbsent(shardId, s -> this.getShardIterator(streamArn, (String)s, startingPosition));
        try {
            GetRecordsResponse getRecordsResponse = this.getRecords(streamArn, shardIterator);
            if (getRecordsResponse.nextShardIterator() != null) {
                this.shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator());
            }
            return getRecordsResponse;
        }
        catch (ExpiredIteratorException e) {
            shardIterator = this.getShardIterator(streamArn, shardId, startingPosition);
            GetRecordsResponse getRecordsResponse = this.getRecords(streamArn, shardIterator);
            if (getRecordsResponse.nextShardIterator() != null) {
                this.shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator());
            }
            return getRecordsResponse;
        }
    }

    private String getShardIterator(String streamArn, String shardId, StartingPosition startingPosition) {
        GetShardIteratorRequest.Builder requestBuilder = GetShardIteratorRequest.builder().streamARN(streamArn).shardId(shardId).shardIteratorType(startingPosition.getShardIteratorType());
        switch (startingPosition.getShardIteratorType()) {
            case TRIM_HORIZON: 
            case LATEST: {
                break;
            }
            case AT_TIMESTAMP: {
                if (startingPosition.getStartingMarker() instanceof Instant) {
                    requestBuilder = requestBuilder.timestamp((Instant)startingPosition.getStartingMarker());
                    break;
                }
                throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
            }
            case AT_SEQUENCE_NUMBER: 
            case AFTER_SEQUENCE_NUMBER: {
                if (startingPosition.getStartingMarker() instanceof String) {
                    requestBuilder = requestBuilder.startingSequenceNumber((String)startingPosition.getStartingMarker());
                    break;
                }
                throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
            }
        }
        return this.kinesisClient.getShardIterator((GetShardIteratorRequest)requestBuilder.build()).shardIterator();
    }

    private GetRecordsResponse getRecords(String streamArn, String shardIterator) {
        return this.kinesisClient.getRecords((GetRecordsRequest)GetRecordsRequest.builder().streamARN(streamArn).shardIterator(shardIterator).build());
    }

    @Override
    public void close() throws IOException {
        this.kinesisClient.close();
        this.httpClient.close();
    }
}

