/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.Record;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

@ManagedResource
@IntegrationManagedResource
public class KclMessageDrivenChannelAdapter
extends MessageProducerSupport {
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal();
    private final String stream;
    private final AmazonKinesis kinesisClient;
    private final AWSCredentialsProvider kinesisProxyCredentialsProvider;
    private final AmazonCloudWatch cloudWatchClient;
    private final AmazonDynamoDB dynamoDBClient;
    private TaskExecutor executor = new SimpleAsyncTaskExecutor();
    private String consumerGroup = "SpringIntegration";
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;
    private Worker scheduler;
    private InitialPositionInStream streamInitialSequence = InitialPositionInStream.LATEST;
    private int idleBetweenPolls;
    private int consumerBackoff;
    private long checkpointsInterval = 5000L;
    private CheckpointMode checkpointMode = CheckpointMode.batch;
    private String workerId = UUID.randomUUID().toString();

    public KclMessageDrivenChannelAdapter(String streams) {
        this(streams, AmazonKinesisClientBuilder.defaultClient(), AmazonCloudWatchClientBuilder.defaultClient(), AmazonDynamoDBClientBuilder.defaultClient(), (AWSCredentialsProvider)new DefaultAWSCredentialsProviderChain());
    }

    public KclMessageDrivenChannelAdapter(String streams, Regions region) {
        this(streams, (AmazonKinesis)((AmazonKinesisClientBuilder)AmazonKinesisClient.builder().withRegion(region)).build(), (AmazonCloudWatch)((AmazonCloudWatchClientBuilder)AmazonCloudWatchClient.builder().withRegion(region)).build(), (AmazonDynamoDB)((AmazonDynamoDBClientBuilder)AmazonDynamoDBClient.builder().withRegion(region)).build(), (AWSCredentialsProvider)new DefaultAWSCredentialsProviderChain());
    }

    public KclMessageDrivenChannelAdapter(String stream, AmazonKinesis kinesisClient, AmazonCloudWatch cloudWatchClient, AmazonDynamoDB dynamoDBClient, AWSCredentialsProvider kinesisProxyCredentialsProvider) {
        Assert.notNull((Object)stream, (String)"'stream' must not be null.");
        Assert.notNull((Object)kinesisClient, (String)"'kinesisClient' must not be null.");
        Assert.notNull((Object)cloudWatchClient, (String)"'cloudWatchClient' must not be null.");
        Assert.notNull((Object)dynamoDBClient, (String)"'dynamoDBClient' must not be null.");
        Assert.notNull((Object)kinesisProxyCredentialsProvider, (String)"'kinesisProxyCredentialsProvider' must not be null.");
        this.stream = stream;
        this.kinesisClient = kinesisClient;
        this.cloudWatchClient = cloudWatchClient;
        this.dynamoDBClient = dynamoDBClient;
        this.kinesisProxyCredentialsProvider = kinesisProxyCredentialsProvider;
    }

    public void setExecutor(TaskExecutor executor) {
        Assert.notNull((Object)executor, (String)"'executor' must not be null.");
        this.executor = executor;
    }

    public void setConsumerGroup(String consumerGroup) {
        Assert.hasText((String)consumerGroup, (String)"'consumerGroup' must not be empty");
        this.consumerGroup = consumerGroup;
    }

    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> embeddedHeadersMapper) {
        this.embeddedHeadersMapper = embeddedHeadersMapper;
    }

    public void setStreamInitialSequence(InitialPositionInStream streamInitialSequence) {
        Assert.notNull((Object)streamInitialSequence, (String)"'streamInitialSequence' must not be null");
        this.streamInitialSequence = streamInitialSequence;
    }

    public void setIdleBetweenPolls(int idleBetweenPolls) {
        this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
    }

    public void setConsumerBackoff(int consumerBackoff) {
        this.consumerBackoff = Math.max(1000, consumerBackoff);
    }

    public void setCheckpointsInterval(long checkpointsInterval) {
        this.checkpointsInterval = checkpointsInterval;
    }

    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull((Object)((Object)checkpointMode), (String)"'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    public void setWorkerId(String workerId) {
        Assert.hasText((String)workerId, (String)"'workerId' must not be null or empty");
        this.workerId = workerId;
    }

    protected void onInit() {
        super.onInit();
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(this.consumerGroup, this.stream, null, this.streamInitialSequence, this.kinesisProxyCredentialsProvider, null, null, 10000L, this.workerId, 10000, (long)this.idleBetweenPolls, false, 10000L, 60000L, true, new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), (long)this.consumerBackoff, 10000L, 10000, true, null, 5000L);
        this.scheduler = new Worker.Builder().kinesisClient(this.kinesisClient).dynamoDBClient(this.dynamoDBClient).cloudWatchClient(this.cloudWatchClient).recordProcessorFactory((IRecordProcessorFactory)new RecordProcessorFactory()).execService((ExecutorService)new ExecutorServiceAdapter(this.executor)).config(config).build();
    }

    protected void doStart() {
        super.doStart();
        this.executor.execute((Runnable)this.scheduler);
    }

    protected void doStop() {
        super.doStop();
        this.scheduler.shutdown();
    }

    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributes = attributesHolder.get();
        if (attributes == null) {
            return super.getErrorMessageAttributes(message);
        }
        return attributes;
    }

    public String toString() {
        return "KclMessageDrivenChannelAdapter{consumerGroup='" + this.consumerGroup + '\'' + ", stream='" + this.stream + "'}";
    }

    private class RecordProcessor
    implements IRecordProcessor {
        private String shardId;
        private long nextCheckpointTimeInMillis;

        private RecordProcessor() {
        }

        public void initialize(String shardId) {
            this.shardId = shardId;
            if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.info((Object)("Initializing record processor for shard: " + this.shardId));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            if (KclMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.debug((Object)("Processing " + records.size() + " records from " + this.shardId));
            }
            for (Record record : records) {
                try {
                    this.processSingleRecord(record, checkpointer);
                }
                catch (Throwable t) {
                    KclMessageDrivenChannelAdapter.this.logger.warn((Object)("Caught throwable while processing record " + record), t);
                }
                finally {
                    attributesHolder.remove();
                    if (System.currentTimeMillis() <= this.nextCheckpointTimeInMillis) continue;
                    this.checkpoint(checkpointer);
                    this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KclMessageDrivenChannelAdapter.this.checkpointsInterval;
                }
            }
            if (CheckpointMode.batch.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                this.checkpoint(checkpointer);
            } else if (CheckpointMode.periodic.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode) && System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
                this.checkpoint(checkpointer);
                this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KclMessageDrivenChannelAdapter.this.checkpointsInterval;
            }
        }

        private void processSingleRecord(Record record, IRecordProcessorCheckpointer checkpointer) {
            this.performSend(this.prepareMessageForRecord(record, checkpointer), record);
            if (CheckpointMode.record.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                this.checkpoint(checkpointer);
            }
        }

        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record, IRecordProcessorCheckpointer checkpointer) {
            Object payload = record.getData().array();
            Message messageToUse = null;
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    messageToUse = KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage(payload);
                    if (messageToUse == null) {
                        throw new IllegalStateException("The 'embeddedHeadersMapper' returned null for payload: " + Arrays.toString(payload));
                    }
                    payload = messageToUse.getPayload();
                }
                catch (Exception e) {
                    KclMessageDrivenChannelAdapter.this.logger.warn((Object)"Could not parse embedded headers. Remain payload untouched.", (Throwable)e);
                }
            }
            AbstractIntegrationMessageBuilder messageBuilder = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).setHeader("aws_receivedPartitionKey", (Object)record.getPartitionKey()).setHeader("aws_receivedSequenceNumber", (Object)record.getSequenceNumber()).setHeader("aws_receivedStream", (Object)KclMessageDrivenChannelAdapter.this.stream).setHeader("aws_shard", (Object)this.shardId);
            if (messageToUse != null) {
                messageBuilder.copyHeadersIfAbsent((Map)messageToUse.getHeaders());
            }
            if (CheckpointMode.manual.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                messageBuilder.setHeader("aws_checkpointer", (Object)checkpointer);
            }
            return messageBuilder;
        }

        private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord) {
            Message messageToSend = messageBuilder.build();
            this.setAttributesIfNecessary(rawRecord, messageToSend);
            try {
                KclMessageDrivenChannelAdapter.this.sendMessage(messageToSend);
            }
            catch (Exception e) {
                KclMessageDrivenChannelAdapter.this.logger.error((Object)("Got an exception during sending a '" + messageToSend + "'\nfor the '" + rawRecord + "'.\nConsider to use 'errorChannel' flow for the compensation logic."), (Throwable)e);
            }
        }

        private void setAttributesIfNecessary(Object record, Message<?> message) {
            if (KclMessageDrivenChannelAdapter.this.getErrorChannel() != null) {
                AttributeAccessor attributes = ErrorMessageUtils.getAttributeAccessor(message, null);
                attributesHolder.set(attributes);
                attributes.setAttribute("aws_rawRecord", record);
            }
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.info((Object)("Checkpointing shard " + this.shardId));
            }
            try {
                checkpointer.checkpoint();
            }
            catch (ShutdownException se) {
                KclMessageDrivenChannelAdapter.this.logger.info((Object)"Caught shutdown exception, skipping checkpoint.", (Throwable)se);
            }
            catch (ThrottlingException e) {
                if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                    KclMessageDrivenChannelAdapter.this.logger.info((Object)"Transient issue when checkpointing", (Throwable)e);
                }
            }
            catch (InvalidStateException e) {
                KclMessageDrivenChannelAdapter.this.logger.error((Object)"Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", (Throwable)e);
            }
        }

        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.info((Object)("Scheduler is shutting down for reason '" + reason + "'; checkpointing..."));
            }
            try {
                checkpointer.checkpoint();
            }
            catch (InvalidStateException | ShutdownException e) {
                KclMessageDrivenChannelAdapter.this.logger.error((Object)"Exception while checkpointing at requested shutdown. Giving up", e);
            }
        }
    }

    private class RecordProcessorFactory
    implements IRecordProcessorFactory {
        private RecordProcessorFactory() {
        }

        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }
}

