/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kinesis.source;

import java.util.Properties;
import java.util.function.Supplier;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kinesis.source.KinesisStreamsSourceBuilder;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.AttributeMap;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

/**
 * Flink {@link Source} implementation that reads records of type {@code T} from the Kinesis
 * stream identified by a stream ARN.
 *
 * <p>The source always reports {@link Boundedness#CONTINUOUS_UNBOUNDED}. Readers are built around
 * {@link PollingKinesisShardSplitReader}; enumerators are {@link KinesisStreamsSourceEnumerator}
 * instances. Each split reader and each enumerator gets its own {@link KinesisStreamProxy}.
 *
 * <p>Instances are created through {@link #builder()}; the constructor is package-private.
 *
 * @param <T> type of the elements produced by the configured deserialization schema
 */
@Experimental
public class KinesisStreamsSource<T>
        implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
    private final String streamArn;
    private final Configuration sourceConfig;
    private final KinesisDeserializationSchema<T> deserializationSchema;
    private final KinesisShardAssigner kinesisShardAssigner;

    /**
     * Creates the source after validating its arguments.
     *
     * @param streamArn ARN of the stream to read; must be non-null and non-empty
     * @param sourceConfig configuration handed to the reader, the enumerator and the Kinesis
     *     client; must be non-null
     * @param deserializationSchema schema used by the record emitter; must be non-null
     * @param kinesisShardAssigner assigner handed to the enumerator; must be non-null
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code streamArn} is empty
     */
    KinesisStreamsSource(
            String streamArn,
            Configuration sourceConfig,
            KinesisDeserializationSchema<T> deserializationSchema,
            KinesisShardAssigner kinesisShardAssigner) {
        Preconditions.checkNotNull(streamArn);
        Preconditions.checkArgument(!streamArn.isEmpty(), "stream ARN cannot be empty string");
        Preconditions.checkNotNull(sourceConfig);
        Preconditions.checkNotNull(deserializationSchema);
        Preconditions.checkNotNull(kinesisShardAssigner);
        this.streamArn = streamArn;
        this.sourceConfig = sourceConfig;
        this.deserializationSchema = deserializationSchema;
        this.kinesisShardAssigner = kinesisShardAssigner;
    }

    /**
     * Returns a new builder for a {@link KinesisStreamsSource}.
     *
     * @param <T> type of the elements the built source will produce
     * @return a fresh builder instance
     */
    public static <T> KinesisStreamsSourceBuilder<T> builder() {
        return new KinesisStreamsSourceBuilder<>();
    }

    /** Returns {@link Boundedness#CONTINUOUS_UNBOUNDED}; this source never reports itself bounded. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Opens the deserialization schema and builds a source reader backed by a single-thread
     * fetcher manager.
     *
     * @param readerContext context of the reader being created
     * @return a new {@link KinesisStreamsSourceReader}
     * @throws Exception if opening the deserialization schema fails
     */
    @Override
    public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        setUpDeserializationSchema(readerContext);

        FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> elementsQueue =
                new FutureCompletingBlockingQueue<>();
        // A new proxy (and therefore a new Kinesis client) is created per split reader.
        Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
                () -> new PollingKinesisShardSplitReader(createKinesisStreamProxy(sourceConfig));
        // The method reference adapts the supplier to the fetcher manager's expected supplier type.
        SingleThreadFetcherManager<Record, KinesisShardSplit> fetcherManager =
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get);
        KinesisStreamsRecordEmitter<T> recordEmitter =
                new KinesisStreamsRecordEmitter<>(deserializationSchema);

        return new KinesisStreamsSourceReader<>(
                elementsQueue, fetcherManager, recordEmitter, sourceConfig, readerContext);
    }

    /**
     * Creates an enumerator without prior state by delegating to {@link #restoreEnumerator} with a
     * {@code null} checkpoint.
     */
    @Override
    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> createEnumerator(
            SplitEnumeratorContext<KinesisShardSplit> enumContext) throws Exception {
        return restoreEnumerator(enumContext, null);
    }

    /**
     * Creates an enumerator for the configured stream.
     *
     * @param enumContext context of the enumerator being created
     * @param checkpoint state passed through to the enumerator; {@code null} when called from
     *     {@link #createEnumerator}
     * @return a new {@link KinesisStreamsSourceEnumerator}
     */
    @Override
    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<KinesisShardSplit> enumContext,
            KinesisStreamsSourceEnumeratorState checkpoint)
            throws Exception {
        return new KinesisStreamsSourceEnumerator(
                enumContext,
                streamArn,
                sourceConfig,
                createKinesisStreamProxy(sourceConfig),
                kinesisShardAssigner,
                checkpoint);
    }

    /** Returns a new {@link KinesisShardSplitSerializer}. */
    @Override
    public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
        return new KinesisShardSplitSerializer();
    }

    /**
     * Returns a new enumerator state serializer that wraps its own
     * {@link KinesisShardSplitSerializer}.
     */
    @Override
    public SimpleVersionedSerializer<KinesisStreamsSourceEnumeratorState>
            getEnumeratorCheckpointSerializer() {
        return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
    }

    /**
     * Builds a {@link KinesisStreamProxy} from the given configuration.
     *
     * <p>Credentials are validated before the HTTP client is created, and the HTTP client is closed
     * if building the Kinesis client or the proxy fails, so a failed call does not leave an open
     * HTTP client behind.
     *
     * @param consumerConfig configuration copied into the properties used to build the client
     * @return a proxy holding the newly created Kinesis client and HTTP client
     */
    private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
        Properties kinesisClientProperties = new Properties();
        consumerConfig.addAllToProperties(kinesisClientProperties);
        // Validate first: a rejected configuration must not allocate an HTTP client at all.
        AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);

        SdkHttpClient httpClient =
                AWSGeneralUtil.createSyncHttpClient(
                        AttributeMap.builder().build(), ApacheHttpClient.builder());
        try {
            KinesisClient kinesisClient =
                    AWSClientUtil.createAwsSyncClient(
                            kinesisClientProperties,
                            httpClient,
                            KinesisClient.builder(),
                            "Apache Flink %s (%s) Kinesis Connector",
                            "aws.kinesis.client.user-agent-prefix");
            // NOTE(review): the proxy presumably takes ownership of both clients and closes them;
            // confirm against KinesisStreamProxy.
            return new KinesisStreamProxy(kinesisClient, httpClient);
        } catch (RuntimeException e) {
            // Nothing else references the HTTP client at this point, so release it here.
            httpClient.close();
            throw e;
        }
    }

    /**
     * Opens the deserialization schema with an initialization context derived from the reader
     * context.
     *
     * @param sourceReaderContext reader context supplying the metric group and user code class
     *     loader
     * @throws Exception if the schema's {@code open} call fails
     */
    private void setUpDeserializationSchema(final SourceReaderContext sourceReaderContext)
            throws Exception {
        deserializationSchema.open(
                new DeserializationSchema.InitializationContext() {

                    /** Returns the reader's metric group extended with a "deserializer" sub-group. */
                    @Override
                    public MetricGroup getMetricGroup() {
                        return sourceReaderContext.metricGroup().addGroup("deserializer");
                    }

                    /** Returns the user code class loader of the reader context. */
                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return sourceReaderContext.getUserCodeClassLoader();
                    }
                });
    }
}

