package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.class */
public class PrefetchGetRecordsCache implements GetRecordsCache {
    private static final Log log = LogFactory.getLog(PrefetchGetRecordsCache.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;
    private int maxPendingProcessRecordsInput;
    private int maxByteSize;
    private int maxRecordsCount;
    private final int maxRecordsPerCall;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    private final ExecutorService executorService;
    private final IMetricsFactory metricsFactory;
    private final long idleMillisBetweenCalls;
    private Instant lastSuccessfulCall;
    private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
    private PrefetchCounters prefetchCounters;
    private boolean started = false;
    private final String operation;
    private final IDataFetcher dataFetcher;
    private final String shardId;

    /* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache$DefaultGetRecordsCacheDaemon.class */
    private class DefaultGetRecordsCacheDaemon implements Runnable {
        volatile boolean isShutdown;

        private DefaultGetRecordsCacheDaemon() {
            this.isShutdown = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                if (this.isShutdown) {
                    break;
                }
                if (Thread.currentThread().isInterrupted()) {
                    PrefetchGetRecordsCache.log.warn("Prefetch thread was interrupted.");
                    break;
                }
                MetricsHelper.startScope(PrefetchGetRecordsCache.this.metricsFactory, PrefetchGetRecordsCache.this.operation);
                if (PrefetchGetRecordsCache.this.prefetchCounters.shouldGetNewRecords()) {
                    try {
                        try {
                            try {
                                try {
                                    try {
                                        sleepBeforeNextCall();
                                        GetRecordsResult records = PrefetchGetRecordsCache.this.getRecordsRetrievalStrategy.getRecords(PrefetchGetRecordsCache.this.maxRecordsPerCall);
                                        PrefetchGetRecordsCache.this.lastSuccessfulCall = Instant.now();
                                        ProcessRecordsInput withCacheEntryTime = new ProcessRecordsInput().withRecords(records.getRecords()).withMillisBehindLatest(records.getMillisBehindLatest()).withCacheEntryTime(PrefetchGetRecordsCache.this.lastSuccessfulCall);
                                        PrefetchGetRecordsCache.this.getRecordsResultQueue.put(withCacheEntryTime);
                                        PrefetchGetRecordsCache.this.prefetchCounters.added(withCacheEntryTime);
                                        MetricsHelper.endScope();
                                    } catch (ExpiredIteratorException e) {
                                        PrefetchGetRecordsCache.log.info(String.format("ShardId %s: getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", PrefetchGetRecordsCache.this.shardId), e);
                                        MetricsHelper.getMetricsScope().addData(PrefetchGetRecordsCache.EXPIRED_ITERATOR_METRIC, 1.0d, StandardUnit.Count, MetricsLevel.SUMMARY);
                                        PrefetchGetRecordsCache.this.dataFetcher.restartIterator();
                                        MetricsHelper.endScope();
                                    }
                                } catch (Throwable th) {
                                    PrefetchGetRecordsCache.log.error("Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.", th);
                                    MetricsHelper.endScope();
                                }
                            } catch (SdkClientException e2) {
                                PrefetchGetRecordsCache.log.error("Exception thrown while fetching records from Kinesis", e2);
                                MetricsHelper.endScope();
                            }
                        } catch (InterruptedException e3) {
                            PrefetchGetRecordsCache.log.info("Thread was interrupted, indicating shutdown was called on the cache.");
                            MetricsHelper.endScope();
                        }
                    } catch (Throwable th2) {
                        MetricsHelper.endScope();
                        throw th2;
                    }
                } else {
                    try {
                        PrefetchGetRecordsCache.this.prefetchCounters.waitForConsumer();
                    } catch (InterruptedException e4) {
                        PrefetchGetRecordsCache.log.info("Thread was interrupted while waiting for the consumer.  Shutdown has probably been started");
                    }
                }
            }
            callShutdownOnStrategy();
        }

        private void callShutdownOnStrategy() {
            if (PrefetchGetRecordsCache.this.getRecordsRetrievalStrategy.isShutdown()) {
                return;
            }
            PrefetchGetRecordsCache.this.getRecordsRetrievalStrategy.shutdown();
        }

        private void sleepBeforeNextCall() throws InterruptedException {
            if (PrefetchGetRecordsCache.this.lastSuccessfulCall == null) {
                return;
            }
            long millis = Duration.between(PrefetchGetRecordsCache.this.lastSuccessfulCall, Instant.now()).abs().toMillis();
            if (millis < PrefetchGetRecordsCache.this.idleMillisBetweenCalls) {
                Thread.sleep(PrefetchGetRecordsCache.this.idleMillisBetweenCalls - millis);
            }
        }
    }

    /* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache$PrefetchCounters.class */
    private class PrefetchCounters {
        private long size;
        private long byteSize;

        private PrefetchCounters() {
            this.size = 0L;
            this.byteSize = 0L;
        }

        public synchronized void added(ProcessRecordsInput processRecordsInput) {
            this.size += getSize(processRecordsInput);
            this.byteSize += getByteSize(processRecordsInput);
        }

        public synchronized void removed(ProcessRecordsInput processRecordsInput) {
            this.size -= getSize(processRecordsInput);
            this.byteSize -= getByteSize(processRecordsInput);
            notifyAll();
        }

        private long getSize(ProcessRecordsInput processRecordsInput) {
            return processRecordsInput.getRecords().size();
        }

        private long getByteSize(ProcessRecordsInput processRecordsInput) {
            return processRecordsInput.getRecords().stream().mapToLong(record -> {
                return record.getData().array().length;
            }).sum();
        }

        public synchronized void waitForConsumer() throws InterruptedException {
            if (shouldGetNewRecords()) {
                return;
            }
            PrefetchGetRecordsCache.log.debug("Queue is full waiting for consumer for " + PrefetchGetRecordsCache.this.idleMillisBetweenCalls + " ms");
            wait(PrefetchGetRecordsCache.this.idleMillisBetweenCalls);
        }

        public synchronized boolean shouldGetNewRecords() {
            if (PrefetchGetRecordsCache.log.isDebugEnabled()) {
                PrefetchGetRecordsCache.log.debug("Current Prefetch Counter States: " + toString());
            }
            return this.size < ((long) PrefetchGetRecordsCache.this.maxRecordsCount) && this.byteSize < ((long) PrefetchGetRecordsCache.this.maxByteSize);
        }

        public String toString() {
            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", Integer.valueOf(PrefetchGetRecordsCache.this.getRecordsResultQueue.size()), Long.valueOf(this.size), Long.valueOf(this.byteSize));
        }
    }

    public PrefetchGetRecordsCache(int i, int i2, int i3, int i4, @NonNull GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull ExecutorService executorService, long j, @NonNull IMetricsFactory iMetricsFactory, @NonNull String str, @NonNull String str2) {
        if (getRecordsRetrievalStrategy == null) {
            throw new NullPointerException("getRecordsRetrievalStrategy is marked non-null but is null");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService is marked non-null but is null");
        }
        if (iMetricsFactory == null) {
            throw new NullPointerException("metricsFactory is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("operation is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("shardId is marked non-null but is null");
        }
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
        this.maxRecordsPerCall = i4;
        this.maxPendingProcessRecordsInput = i;
        this.maxByteSize = i2;
        this.maxRecordsCount = i3;
        this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
        this.prefetchCounters = new PrefetchCounters();
        this.executorService = executorService;
        this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(iMetricsFactory);
        this.idleMillisBetweenCalls = j;
        this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
        Validate.notEmpty(str, "Operation cannot be empty", new Object[0]);
        this.operation = str;
        this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
        this.shardId = str2;
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache
    public void start() {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("ExecutorService has been shutdown.");
        }
        if (!this.started) {
            log.info("Starting prefetching thread.");
            this.executorService.execute(this.defaultGetRecordsCacheDaemon);
        }
        this.started = true;
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache
    public ProcessRecordsInput getNextResult() {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!this.started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
        ProcessRecordsInput processRecordsInput = null;
        try {
            processRecordsInput = this.getRecordsResultQueue.take().withCacheExitTime(Instant.now());
            this.prefetchCounters.removed(processRecordsInput);
            log.info("Shard " + this.shardId + ": Number of records remaining in queue is " + this.getRecordsResultQueue.size());
        } catch (InterruptedException e) {
            log.error("Interrupted while getting records from the cache", e);
        }
        return processRecordsInput;
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache
    public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
        return this.getRecordsRetrievalStrategy;
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache
    public void shutdown() {
        this.defaultGetRecordsCacheDaemon.isShutdown = true;
        this.executorService.shutdownNow();
        this.started = false;
    }
}
