/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.NonNull;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.DataFetcherResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.Log;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.LogFactory;

/**
 * {@link GetRecordsRetrievalStrategy} that issues {@code getRecords} calls on an
 * {@link ExecutorService} instead of the calling thread.
 *
 * <p>Each call to {@link #getRecords(int)} submits a fetch task and waits up to
 * {@code retryGetRecordsInSeconds} for any submitted task to complete. If none completes in
 * that window, another fetch task is submitted and the wait is repeated. The first task that
 * completes successfully supplies the result; every future submitted during the call is
 * cancelled before the method returns.
 */
public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
    private static final Log log = LogFactory.getLog(AsynchronousGetRecordsRetrievalStrategy.class);
    /** Keep-alive time, in seconds, for idle threads of the executor built by this class. */
    private static final int TIME_TO_KEEP_ALIVE = 5;
    /** Core pool size of the executor built by this class. */
    private static final int CORE_THREAD_POOL_COUNT = 1;
    private final KinesisDataFetcher dataFetcher;
    private final ExecutorService executorService;
    private final int retryGetRecordsInSeconds;
    private final String shardId;
    final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;

    /**
     * Creates a strategy backed by an internally built thread pool.
     *
     * @param dataFetcher fetcher used to retrieve records; must not be {@code null}
     * @param retryGetRecordsInSeconds how long to wait for a result before submitting another fetch
     * @param maxGetRecordsThreadPool maximum number of threads in the internal pool
     * @param shardId shard identifier; used in the names of the worker threads
     * @throws NullPointerException if {@code dataFetcher} is {@code null}
     */
    public AsynchronousGetRecordsRetrievalStrategy(@NonNull KinesisDataFetcher dataFetcher, int retryGetRecordsInSeconds, int maxGetRecordsThreadPool, String shardId) {
        this(dataFetcher, AsynchronousGetRecordsRetrievalStrategy.buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId);
        // The language requires the delegating constructor call to come first, so the
        // null check can only run afterwards.
        if (dataFetcher == null) {
            throw new NullPointerException("dataFetcher is marked non-null but is null");
        }
    }

    /**
     * Creates a strategy that runs fetches on the supplied executor. A new
     * {@link ExecutorCompletionService} wrapping that executor is created for every
     * {@link #getRecords(int)} call.
     *
     * @param dataFetcher fetcher used to retrieve records
     * @param executorService executor on which fetch tasks are run
     * @param retryGetRecordsInSeconds how long to wait for a result before submitting another fetch
     * @param shardId shard identifier
     */
    public AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, String shardId) {
        this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService), shardId);
    }

    /**
     * Package-private constructor that additionally lets the caller control how the
     * {@link CompletionService} used by each {@link #getRecords(int)} call is created.
     *
     * @param dataFetcher fetcher used to retrieve records
     * @param executorService executor whose shutdown state gates this strategy
     * @param retryGetRecordsInSeconds how long to wait for a result before submitting another fetch
     * @param completionServiceSupplier factory invoked once per {@link #getRecords(int)} call
     * @param shardId shard identifier
     */
    AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier, String shardId) {
        this.dataFetcher = dataFetcher;
        this.executorService = executorService;
        this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
        this.completionServiceSupplier = completionServiceSupplier;
        this.shardId = shardId;
    }

    /**
     * Retrieves records by racing one or more asynchronous fetch tasks.
     *
     * <p>Every loop iteration submits a new fetch task (a rejected submission is logged and
     * otherwise ignored) and then waits up to {@code retryGetRecordsInSeconds} for a completed
     * task. A task that failed with anything other than {@link ExpiredIteratorException} is
     * logged and the loop continues.
     *
     * @param maxRecords maximum number of records passed to the data fetcher
     * @return the accepted result of the first task that completed successfully, or
     *         {@code null} if the calling thread was interrupted while waiting
     * @throws IllegalStateException if the executor has already been shut down
     * @throws ExpiredIteratorException if a fetch task failed with that exception
     */
    @Override
    public GetRecordsResult getRecords(int maxRecords) {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Strategy has been shutdown");
        }
        GetRecordsResult result = null;
        CompletionService<DataFetcherResult> completionService = this.completionServiceSupplier.get();
        HashSet<Future<DataFetcherResult>> futures = new HashSet<>();
        Callable<DataFetcherResult> retrieverCall = this.createRetrieverCallable(maxRecords);
        try {
            while (true) {
                try {
                    futures.add(completionService.submit(retrieverCall));
                } catch (RejectedExecutionException e) {
                    log.warn("Out of resources, unable to start additional requests.");
                }
                try {
                    Future<DataFetcherResult> resultFuture = completionService.poll(this.retryGetRecordsInSeconds, TimeUnit.SECONDS);
                    if (resultFuture != null) {
                        result = resultFuture.get().accept();
                        break;
                    }
                    // Timed out without a completed task: loop around and submit another fetch.
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof ExpiredIteratorException) {
                        throw (ExpiredIteratorException) e.getCause();
                    }
                    log.error("ExecutionException thrown while trying to get records", e);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    log.error("Thread was interrupted", e);
                    break;
                }
            }
        } finally {
            // Cancel everything submitted during this call, including tasks still running.
            futures.forEach(f -> f.cancel(true));
        }
        return result;
    }

    /**
     * Builds the task submitted to the completion service. The metrics scope is read on the
     * calling thread when this method runs, wrapped in a
     * {@link ThreadSafeMetricsDelegatingScope}, set on the executing thread for the duration
     * of the fetch, and unset afterwards.
     *
     * @param maxRecords maximum number of records passed to the data fetcher
     * @return a callable that performs a single fetch
     */
    private Callable<DataFetcherResult> createRetrieverCallable(int maxRecords) {
        ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
        return () -> {
            try {
                MetricsHelper.setMetricsScope(metricsScope);
                return this.dataFetcher.getRecords(maxRecords);
            } finally {
                MetricsHelper.unsetMetricsScope();
            }
        };
    }

    /**
     * Shuts the executor down via {@link ExecutorService#shutdownNow()}. Subsequent calls to
     * {@link #getRecords(int)} throw {@link IllegalStateException}.
     */
    @Override
    public void shutdown() {
        this.executorService.shutdownNow();
    }

    /**
     * @return {@code true} if the underlying executor has been shut down
     */
    @Override
    public boolean isShutdown() {
        return this.executorService.isShutdown();
    }

    /**
     * Builds the thread pool used by the public convenience constructor: daemon threads named
     * {@code get-records-worker-<shardId>-<n>}, a work queue of capacity one, and an
     * {@link ThreadPoolExecutor.AbortPolicy} rejection handler.
     *
     * @param maxGetRecordsThreadPool maximum pool size
     * @param shardId shard identifier embedded in the thread names
     * @return the configured executor
     */
    private static ExecutorService buildExector(int maxGetRecordsThreadPool, String shardId) {
        String threadNameFormat = "get-records-worker-" + shardId + "-%d";
        return new ThreadPoolExecutor(
                CORE_THREAD_POOL_COUNT,
                maxGetRecordsThreadPool,
                TIME_TO_KEEP_ALIVE,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build(),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * @return the data fetcher this strategy delegates to
     */
    @Override
    public KinesisDataFetcher getDataFetcher() {
        return this.dataFetcher;
    }
}

