/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import software.amazon.kinesis.shaded.com.amazonaws.ClientConfiguration;
import software.amazon.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
import software.amazon.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
import software.amazon.kinesis.shaded.com.amazonaws.regions.RegionUtils;
import software.amazon.kinesis.shaded.com.amazonaws.regions.Regions;
import software.amazon.kinesis.shaded.com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import software.amazon.kinesis.shaded.com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import software.amazon.kinesis.shaded.com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.DeterministicShuffleShardSyncLeaderDecider;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownContext;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisLeaseCleanupValidator;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.LeaderDecider;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.LeaseCleanupValidator;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.NoOpShardPrioritization;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.NoOpWorkerStateChangeListener;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncStrategy;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumerShutdownNotification;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardEndShardSyncStrategy;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardPrioritization;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncStrategy;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncStrategyType;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTaskManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorFactoryAdapter;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.LeaseTaker;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseRenewer;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import software.amazon.kinesis.shaded.com.google.common.annotations.VisibleForTesting;
import software.amazon.kinesis.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import software.amazon.kinesis.shaded.org.apache.commons.lang3.StringUtils;
import software.amazon.kinesis.shaded.org.apache.commons.logging.Log;
import software.amazon.kinesis.shaded.org.apache.commons.logging.LogFactory;

public class Worker
implements Runnable {
    // Class-wide commons-logging logger.
    private static final Log LOG = LogFactory.getLog(Worker.class);
    // NOTE(review): not referenced in the visible part of this file; presumably consumed when the
    // periodic shard sync manager is created — confirm.
    private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
    // NOTE(review): not referenced in the visible part of this file — confirm usage.
    private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
    // Pause between consecutive "is the lease table empty?" checks in shouldInitiateLeaseSync().
    // Non-final and package-visible; presumably so tests can shorten it — confirm.
    static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3000L;
    // Lower bound (inclusive) of the random wait picked in shouldInitiateLeaseSync().
    static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
    // Upper bound (exclusive) of the random wait picked in shouldInitiateLeaseSync().
    static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30000L;
    // Defaults handed to the main constructors by the overloads that do not accept these collaborators.
    private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
    private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
    // Lease selector passed to the lease coordinator built by the static getLeaseCoordinator(...) helper.
    private static final LeaseSelector<KinesisClientLease> DEFAULT_LEASE_SELECTOR = new GenericLeaseSelector<KinesisClientLease>();
    // Logger wrapper used for the per-iteration messages of the process loop; runProcessLoop() calls
    // resetInfoLogging() on it at the end of every iteration. (WorkerLog is declared outside this view.)
    private WorkerLog wlog = new WorkerLog();
    // Returned by getApplicationName().
    private final String applicationName;
    // v2 record processor factory handed to createOrGetShardConsumer(...) for each assigned shard.
    private final software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory recordProcessorFactory;
    private final KinesisClientLibConfiguration config;
    private final StreamConfig streamConfig;
    private final InitialPositionInStreamExtended initialPosition;
    // The explicitly supplied checkpoint, or the lease coordinator when none was supplied (see constructor).
    private final ICheckpoint checkpointTracker;
    // Sleep at the end of each runProcessLoop() iteration; read from streamConfig in the constructor.
    private final long idleTimeInMilliseconds;
    // Also used as the pause between initialization attempts in initialize().
    private final long parentShardPollIntervalMillis;
    private final ExecutorService executorService;
    private final IMetricsFactory metricsFactory;
    private final long taskBackoffTimeMillis;
    private final long failoverTimeMillis;
    private final Optional<Integer> retryGetRecordsInSeconds;
    private final Optional<Integer> maxGetRecordsThreadPool;
    // Source of the current shard assignments and of the lease manager.
    private final KinesisClientLibLeaseCoordinator leaseCoordinator;
    private final ShardSyncTaskManager shardSyncTaskManager;
    // Orders the assigned shards before they are processed (see getShardInfoForAssignments()).
    private final ShardPrioritization shardPrioritization;
    // Checked at the very top of run(); a worker that is already shut down never starts its loop.
    private volatile boolean shutdown;
    private volatile long shutdownStartTimeMillis;
    private volatile boolean shutdownComplete = false;
    private final ShardSyncer shardSyncer;
    // Consumers currently tracked by this worker, keyed by shard; cleanupShardConsumers(...) removes
    // entries for shards that are no longer assigned once their shutdown has begun successfully.
    private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
    private final boolean cleanupLeasesUponShardCompletion;
    // When true and the lease table is non-empty, shouldInitiateLeaseSync() skips the initial shard sync.
    private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
    // Created lazily, at most once, by startGracefulShutdown() while holding this instance's monitor.
    private Future<Boolean> gracefulShutdownFuture;
    // NOTE(review): the name is misspelled ("gracefule"), but the field is protected, so renaming it
    // would break subclasses and tests that reference it.
    @VisibleForTesting
    protected boolean gracefuleShutdownStarted = false;
    @VisibleForTesting
    protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
    // Notified of CREATED in the constructor and of INITIALIZING / STARTED in initialize().
    private WorkerStateChangeListener workerStateChangeListener;
    // The three fields below are assigned together by createShardSyncStrategy(...).
    private LeaderDecider leaderDecider;
    private ShardSyncStrategy shardSyncStrategy;
    private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
    // Started in initialize() once the (optional) initial lease sync has succeeded.
    private final LeaseCleanupManager leaseCleanupManager;

    /**
     * Creates a worker that uses the executor service returned by
     * {@code Worker.getExecutorService()}.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config) {
        this(
                recordProcessorFactory,
                config,
                Worker.getExecutorService());
    }

    /**
     * Creates a worker whose Kinesis, DynamoDB and CloudWatch clients are built from the
     * credentials providers and client configurations held by {@code config}.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param execService            executor service passed on to the delegated constructor
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, ExecutorService execService) {
        this(
                recordProcessorFactory,
                config,
                new AmazonKinesisClient(
                        config.getKinesisCredentialsProvider(),
                        config.getKinesisClientConfiguration()),
                new AmazonDynamoDBClient(
                        config.getDynamoDBCredentialsProvider(),
                        config.getDynamoDBClientConfiguration()),
                new AmazonCloudWatchClient(
                        config.getCloudWatchCredentialsProvider(),
                        config.getCloudWatchClientConfiguration()),
                execService);
    }

    /**
     * Creates a worker with a caller-supplied metrics factory and the executor service returned
     * by {@code Worker.getExecutorService()}.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param metricsFactory         metrics factory passed on to the delegated constructor
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) {
        this(
                recordProcessorFactory,
                config,
                metricsFactory,
                Worker.getExecutorService());
    }

    /**
     * Creates a worker with a caller-supplied metrics factory and executor service; the Kinesis
     * and DynamoDB clients are built from the credentials providers and client configurations
     * held by {@code config}.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param metricsFactory         metrics factory passed on to the delegated constructor
     * @param execService            executor service passed on to the delegated constructor
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) {
        this(
                recordProcessorFactory,
                config,
                new AmazonKinesisClient(
                        config.getKinesisCredentialsProvider(),
                        config.getKinesisClientConfiguration()),
                new AmazonDynamoDBClient(
                        config.getDynamoDBCredentialsProvider(),
                        config.getDynamoDBClientConfiguration()),
                metricsFactory,
                execService);
    }

    /**
     * Creates a worker from caller-supplied service clients, using the executor service returned
     * by {@code Worker.getExecutorService()}.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param kinesisClient          Kinesis client
     * @param dynamoDBClient         DynamoDB client
     * @param cloudWatchClient       CloudWatch client
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient) {
        this(
                recordProcessorFactory,
                config,
                kinesisClient,
                dynamoDBClient,
                cloudWatchClient,
                Worker.getExecutorService());
    }

    /**
     * Creates a worker from caller-supplied service clients and executor service. The metrics
     * factory is obtained from {@code Worker.getMetricsFactory(cloudWatchClient, config)}.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param kinesisClient          Kinesis client
     * @param dynamoDBClient         DynamoDB client
     * @param cloudWatchClient       CloudWatch client used to obtain the metrics factory
     * @param execService            executor service passed on to the delegated constructor
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient, ExecutorService execService) {
        this(
                recordProcessorFactory,
                config,
                kinesisClient,
                dynamoDBClient,
                Worker.getMetricsFactory(cloudWatchClient, config),
                execService);
    }

    /**
     * Variant taking the concrete client classes; forwards to the overload that accepts the
     * client interfaces.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param kinesisClient          Kinesis client
     * @param dynamoDBClient         DynamoDB client
     * @param cloudWatchClient       CloudWatch client
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
        // The upcasts select the interface-typed overload instead of this constructor itself.
        this(
                recordProcessorFactory,
                config,
                (AmazonKinesis) kinesisClient,
                (AmazonDynamoDB) dynamoDBClient,
                (AmazonCloudWatch) cloudWatchClient);
    }

    /**
     * Variant taking the concrete client classes plus an executor service; forwards to the
     * overload that accepts the client interfaces.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param kinesisClient          Kinesis client
     * @param dynamoDBClient         DynamoDB client
     * @param cloudWatchClient       CloudWatch client
     * @param execService            executor service passed on to the delegated constructor
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
        // The upcasts select the interface-typed overload instead of this constructor itself.
        this(
                recordProcessorFactory,
                config,
                (AmazonKinesis) kinesisClient,
                (AmazonDynamoDB) dynamoDBClient,
                (AmazonCloudWatch) cloudWatchClient,
                execService);
    }

    /**
     * Variant taking the concrete Kinesis and DynamoDB client classes together with a metrics
     * factory; forwards to the overload that accepts the client interfaces.
     *
     * @param recordProcessorFactory factory used to create record processors
     * @param config                 client library configuration
     * @param kinesisClient          Kinesis client
     * @param dynamoDBClient         DynamoDB client
     * @param metricsFactory         metrics factory passed on to the delegated constructor
     * @param execService            executor service passed on to the delegated constructor
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
        // The upcasts select the interface-typed overload instead of this constructor itself.
        this(
                recordProcessorFactory,
                config,
                (AmazonKinesis) kinesisClient,
                (AmazonDynamoDB) dynamoDBClient,
                metricsFactory,
                execService);
    }

    /**
     * Creates a worker from a v1 record processor factory and caller-supplied clients.
     *
     * <p>The v1 factory is wrapped in a {@link V1ToV2RecordProcessorFactoryAdapter}; the stream
     * configuration and lease coordinator are derived from {@code config}; no explicit checkpoint,
     * leader decider or periodic shard sync manager is supplied. After delegation, the region and
     * endpoints named in {@code config} (when non-null) are applied to the clients through
     * {@code Worker.setField(...)}.
     *
     * @param recordProcessorFactory v1 factory used to create record processors
     * @param config                 client library configuration
     * @param kinesisClient          Kinesis client; may have its region/endpoint overridden
     * @param dynamoDBClient         DynamoDB client; may have its region/endpoint overridden
     * @param metricsFactory         metrics factory
     * @param execService            executor service
     */
    @Deprecated
    public Worker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
        this(
                config.getApplicationName(),
                (software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory) new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
                config,
                Worker.getStreamConfig(config, kinesisClient),
                config.getInitialPositionInStreamExtended(),
                config.getParentShardPollIntervalMillis(),
                config.getShardSyncIntervalMillis(),
                config.shouldCleanupLeasesUponShardCompletion(),
                null, // checkpoint: the lease coordinator is used instead
                Worker.getLeaseCoordinator(config, dynamoDBClient, metricsFactory)
                        .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
                        .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
                execService,
                metricsFactory,
                config.getTaskBackoffTimeMillis(),
                config.getFailoverTimeMillis(),
                config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
                config.getShardPrioritizationStrategy(),
                config.getRetryGetRecordsInSeconds(),
                config.getMaxGetRecordsThreadPool(),
                DEFAULT_WORKER_STATE_CHANGE_LISTENER,
                DEFAULT_LEASE_CLEANUP_VALIDATOR,
                null, // leaderDecider
                null); // periodicShardSyncManager

        // An explicitly configured region is applied to both clients.
        if (config.getRegionName() != null) {
            Worker.setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
            Worker.setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
        }

        // Explicitly configured endpoints are applied per service.
        if (config.getDynamoDBEndpoint() != null) {
            Worker.setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
        }
        if (config.getKinesisEndpoint() != null) {
            Worker.setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
        }
    }

    /**
     * Package-private constructor without get-records tuning or custom collaborators: delegates
     * with empty {@code Optional}s for the get-records settings, the default state change listener
     * and lease cleanup validator, and no leader decider or periodic shard sync manager.
     */
    Worker(String applicationName, software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
        this(
                applicationName,
                recordProcessorFactory,
                config,
                streamConfig,
                initialPositionInStream,
                parentShardPollIntervalMillis,
                shardSyncIdleTimeMillis,
                cleanupLeasesUponShardCompletion,
                checkpoint,
                leaseCoordinator,
                execService,
                metricsFactory,
                taskBackoffTimeMillis,
                failoverTimeMillis,
                skipShardSyncAtWorkerInitializationIfLeasesExist,
                shardPrioritization,
                Optional.empty(), // retryGetRecordsInSeconds
                Optional.empty(), // maxGetRecordsThreadPool
                DEFAULT_WORKER_STATE_CHANGE_LISTENER,
                DEFAULT_LEASE_CLEANUP_VALIDATOR,
                null, // leaderDecider
                null); // periodicShardSyncManager
    }

    /**
     * Package-private constructor taking a {@link LeaseCleanupValidator}: wraps the validator in a
     * new {@link KinesisShardSyncer} and delegates to the constructor that accepts a
     * {@link ShardSyncer}; every other argument is forwarded unchanged.
     */
    Worker(String applicationName, software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) {
        this(
                applicationName,
                recordProcessorFactory,
                config,
                streamConfig,
                initialPositionInStream,
                parentShardPollIntervalMillis,
                shardSyncIdleTimeMillis,
                cleanupLeasesUponShardCompletion,
                checkpoint,
                leaseCoordinator,
                execService,
                metricsFactory,
                taskBackoffTimeMillis,
                failoverTimeMillis,
                skipShardSyncAtWorkerInitializationIfLeasesExist,
                shardPrioritization,
                retryGetRecordsInSeconds,
                maxGetRecordsThreadPool,
                workerStateChangeListener,
                new KinesisShardSyncer(leaseCleanupValidator),
                leaderDecider,
                periodicShardSyncManager);
    }

    /**
     * Main constructor: every other constructor visible in this file ends up here.
     *
     * <p>Stores the supplied collaborators, builds the {@link ShardSyncTaskManager}, notifies the
     * state change listener of {@code CREATED}, selects the shard sync strategy and finally
     * creates the {@link LeaseCleanupManager} on a new single-thread scheduled executor.
     *
     * @param checkpoint checkpoint implementation; when {@code null} the lease coordinator is
     *                   used as the checkpoint tracker
     */
    Worker(String applicationName, software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) {
        // Identity and configuration.
        this.applicationName = applicationName;
        this.recordProcessorFactory = recordProcessorFactory;
        this.config = config;
        this.streamConfig = streamConfig;
        this.initialPosition = initialPositionInStream;

        // Timing and behavioural switches.
        this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
        this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
        this.taskBackoffTimeMillis = taskBackoffTimeMillis;
        this.failoverTimeMillis = failoverTimeMillis;
        this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
        this.shardPrioritization = shardPrioritization;
        this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
        this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;

        // Collaborators.
        if (checkpoint != null) {
            this.checkpointTracker = checkpoint;
        } else {
            this.checkpointTracker = leaseCoordinator;
        }
        this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
        this.executorService = execService;
        this.leaseCoordinator = leaseCoordinator;
        this.metricsFactory = metricsFactory;
        this.shardSyncer = shardSyncer;
        this.shardSyncTaskManager = new ShardSyncTaskManager(
                streamConfig.getStreamProxy(),
                leaseCoordinator.getLeaseManager(),
                initialPositionInStream,
                cleanupLeasesUponShardCompletion,
                config.shouldIgnoreUnexpectedChildShards(),
                shardSyncIdleTimeMillis,
                metricsFactory,
                execService,
                shardSyncer);

        // The listener hears about CREATED before the shard sync strategy is chosen.
        this.workerStateChangeListener = workerStateChangeListener;
        workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
        createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);

        this.leaseCleanupManager = LeaseCleanupManager.newInstance(
                streamConfig.getStreamProxy(),
                leaseCoordinator.getLeaseManager(),
                Executors.newSingleThreadScheduledExecutor(),
                metricsFactory,
                cleanupLeasesUponShardCompletion,
                config.leaseCleanupIntervalMillis(),
                config.completedLeaseCleanupThresholdMillis(),
                config.garbageLeaseCleanupThresholdMillis(),
                config.getMaxRecords());
    }

    /**
     * Assigns {@code leaderDecider}, {@code leaderElectedPeriodicShardSyncManager} and
     * {@code shardSyncStrategy} according to the configured strategy type.
     *
     * <p>For {@code PERIODIC} the supplied leader decider is honoured and a periodic strategy is
     * created. For every other type a supplied leader decider is ignored (with a warning) and a
     * shard-end strategy is created.
     *
     * @param strategyType             configured strategy type; {@code null} makes the switch throw
     *                                 a {@link NullPointerException}
     * @param leaderDecider            optional custom leader decider, may be {@code null}
     * @param periodicShardSyncManager optional custom periodic shard sync manager, may be {@code null}
     */
    private void createShardSyncStrategy(ShardSyncStrategyType strategyType, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) {
        switch (strategyType) {
            case PERIODIC:
                this.leaderDecider = getOrCreateLeaderDecider(leaderDecider);
                this.leaderElectedPeriodicShardSyncManager = getOrCreatePeriodicShardSyncManager(periodicShardSyncManager, false);
                this.shardSyncStrategy = createPeriodicShardSyncStrategy();
                break;
            default:
                if (leaderDecider != null) {
                    LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using default LeaderDecider.");
                }
                // Passing null discards any custom decider supplied by the caller.
                this.leaderDecider = getOrCreateLeaderDecider(null);
                this.leaderElectedPeriodicShardSyncManager = getOrCreatePeriodicShardSyncManager(periodicShardSyncManager, true);
                this.shardSyncStrategy = createShardEndShardSyncStrategy();
                break;
        }
        LOG.info("Shard sync strategy determined as " + this.shardSyncStrategy.getStrategyType().toString());
    }

    /**
     * Builds a lease coordinator backed by a new {@link KinesisClientLeaseManager} on the
     * configured lease table, with all remaining settings read from {@code config}.
     *
     * @param config         client library configuration
     * @param dynamoDBClient DynamoDB client handed to the lease manager
     * @param metricsFactory metrics factory handed to the coordinator
     * @return the new lease coordinator
     */
    private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) {
        KinesisClientLeaseManager leaseManager = new KinesisClientLeaseManager(
                config.getTableName(),
                dynamoDBClient,
                config.getBillingMode());
        return new KinesisClientLibLeaseCoordinator(
                leaseManager,
                DEFAULT_LEASE_SELECTOR,
                config.getWorkerIdentifier(),
                config.getFailoverTimeMillis(),
                config.getEpsilonMillis(),
                config.getMaxLeasesForWorker(),
                config.getMaxLeasesToStealAtOneTime(),
                config.getMaxLeaseRenewalThreads(),
                metricsFactory);
    }

    /**
     * Builds the stream configuration from {@code config}, using a new {@link KinesisProxy}
     * around the given client as the stream proxy.
     *
     * @param config        client library configuration
     * @param kinesisClient Kinesis client wrapped by the proxy
     * @return the new stream configuration
     */
    private static StreamConfig getStreamConfig(KinesisClientLibConfiguration config, AmazonKinesis kinesisClient) {
        KinesisProxy streamProxy = new KinesisProxy(config, kinesisClient);
        return new StreamConfig(
                streamProxy,
                config.getMaxRecords(),
                config.getIdleTimeBetweenReadsInMillis(),
                config.shouldCallProcessRecordsEvenForEmptyRecordList(),
                config.shouldValidateSequenceNumberBeforeCheckpointing(),
                config.getInitialPositionInStreamExtended());
    }

    /**
     * @return the application name this worker was constructed with
     */
    public String getApplicationName() {
        return applicationName;
    }

    /**
     * @return the lease coordinator this worker was constructed with
     */
    KinesisClientLibLeaseCoordinator getLeaseCoordinator() {
        return leaseCoordinator;
    }

    /**
     * @return the leader decider assigned by {@code createShardSyncStrategy(...)}
     */
    LeaderDecider getLeaderDecider() {
        return leaderDecider;
    }

    /**
     * @return the periodic shard sync manager assigned by {@code createShardSyncStrategy(...)}
     */
    PeriodicShardSyncManager getPeriodicShardSyncManager() {
        return leaderElectedPeriodicShardSyncManager;
    }

    /**
     * Worker entry point: initializes the worker, then repeats {@code runProcessLoop()} until
     * {@code shouldShutdown()} reports true, and finally performs the final shutdown.
     *
     * <p>Returns immediately if the worker has already been shut down. If initialization fails,
     * the failure is logged and {@code shutdown()} is requested before the loop condition is
     * evaluated.
     */
    @Override
    public void run() {
        if (shutdown) {
            return;
        }

        try {
            initialize();
            LOG.info("Initialization complete. Starting worker loop.");
        } catch (RuntimeException initFailure) {
            String message = "Unable to initialize after " + config.getMaxInitializationAttempts() + " attempts. Shutting down.";
            LOG.error(message, initFailure);
            shutdown();
        }

        boolean keepRunning = !shouldShutdown();
        while (keepRunning) {
            runProcessLoop();
            keepRunning = !shouldShutdown();
        }

        finalShutdown();
        LOG.info("Worker loop is complete. Exiting from worker.");
    }

    /**
     * Runs one iteration of the worker loop.
     *
     * <p>For each currently assigned shard the matching consumer is created or fetched and asked
     * to consume, unless it has already shut down with reason {@code TERMINATE}. Consumers of
     * shards that are no longer assigned are then cleaned up, and the thread sleeps for the idle
     * time. Any exception is logged and followed by the same sleep, so one bad iteration does not
     * end the loop.
     */
    @VisibleForTesting
    void runProcessLoop() {
        try {
            Set<ShardInfo> currentlyAssigned = new HashSet<ShardInfo>();
            for (ShardInfo shardInfo : getShardInfoForAssignments()) {
                ShardConsumer consumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
                boolean completed = consumer.isShutdown()
                        && consumer.getShutdownReason().equals(ShutdownReason.TERMINATE);
                if (!completed) {
                    consumer.consumeShard();
                }
                currentlyAssigned.add(shardInfo);
            }

            // Consumers of shards that are no longer assigned are shut down and dropped.
            cleanupShardConsumers(currentlyAssigned);

            wlog.info("Sleeping ...");
            Thread.sleep(idleTimeInMilliseconds);
        } catch (Exception loopFailure) {
            LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", String.valueOf(idleTimeInMilliseconds)), loopFailure);
            try {
                Thread.sleep(idleTimeInMilliseconds);
            } catch (InterruptedException interrupted) {
                LOG.info("Worker: sleep interrupted after catching exception ", interrupted);
            }
        }
        wlog.resetInfoLogging();
    }

    /**
     * Initializes the worker, making up to {@code config.getMaxInitializationAttempts()} attempts.
     *
     * <p>Each attempt initializes the lease coordinator, runs one shard sync when
     * {@link #shouldInitiateLeaseSync()} asks for it, starts the lease cleanup manager, starts the
     * lease coordinator if it is not already running, and notifies the shard sync strategy. The
     * state change listener is told {@code INITIALIZING} on entry and {@code STARTED} on success.
     *
     * <p>Every failed attempt is logged with its cause; previously failures other than
     * {@link LeasingException} were recorded silently and only the last one surfaced.
     *
     * @throws RuntimeException wrapping the last failure when no attempt succeeded; the periodic
     *                          shard sync manager is stopped before it is thrown
     */
    private void initialize() {
        this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
        boolean isDone = false;
        Exception lastException = null;
        for (int i = 0; !isDone && i < this.config.getMaxInitializationAttempts(); ++i) {
            try {
                LOG.info("Initialization attempt " + (i + 1));
                LOG.info("Initializing LeaseCoordinator");
                this.leaseCoordinator.initialize();
                if (this.shouldInitiateLeaseSync()) {
                    LOG.info(this.config.getWorkerIdentifier() + " worker is beginning initial lease sync.");
                    TaskResult result = this.leaderElectedPeriodicShardSyncManager.syncShardsOnce();
                    if (result.getException() != null) {
                        throw result.getException();
                    }
                }
                this.leaseCleanupManager.start();
                if (!this.leaseCoordinator.isRunning()) {
                    LOG.info("Starting LeaseCoordinator");
                    this.leaseCoordinator.start();
                } else {
                    LOG.info("LeaseCoordinator is already running. No need to start it.");
                }
                this.shardSyncStrategy.onWorkerInitialization();
                isDone = true;
            } catch (LeasingException e) {
                LOG.error("Caught exception when initializing LeaseCoordinator", e);
                lastException = e;
            } catch (Exception e) {
                // Log every failed attempt; otherwise only the last cause is ever visible.
                LOG.error("Caught exception during worker initialization attempt " + (i + 1), e);
                lastException = e;
            }

            // NOTE(review): this pause also runs after a successful attempt and after the last
            // failed one; kept unchanged to preserve the existing start-up timing.
            try {
                Thread.sleep(this.parentShardPollIntervalMillis);
            } catch (InterruptedException e) {
                LOG.debug("Sleep interrupted while initializing worker.");
            }
        }
        if (!isDone) {
            this.leaderElectedPeriodicShardSyncManager.stop();
            throw new RuntimeException(lastException);
        }
        this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
    }

    /**
     * Decides whether this worker should run the initial shard sync.
     *
     * <p>Returns {@code false} straight away when skipping is configured and the lease table
     * already has entries. Otherwise a random wait between
     * {@code MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS} (inclusive) and
     * {@code MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS} (exclusive) is chosen, and the lease table
     * is polled every {@code LEASE_TABLE_CHECK_FREQUENCY_MILLIS} until it is non-empty or the wait
     * has elapsed.
     *
     * @return {@code false} if the lease table was seen non-empty; {@code true} if it stayed
     *         empty for the whole wait (or was never polled)
     */
    @VisibleForTesting
    boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, InvalidStateException, ProvisionedThroughputException {
        ILeaseManager<KinesisClientLease> leaseManager = leaseCoordinator.getLeaseManager();
        if (skipShardSyncAtWorkerInitializationIfLeasesExist && !leaseManager.isLeaseTableEmpty()) {
            LOG.info("Skipping shard sync because getSkipShardSyncAtWorkerInitializationIfLeasesExist config is set to TRUE and lease table is not empty.");
            return false;
        }

        long randomWaitMillis = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
        long deadline = System.currentTimeMillis() + randomWaitMillis;

        boolean leaseTableEmpty = true;
        while (System.currentTimeMillis() < deadline) {
            leaseTableEmpty = leaseManager.isLeaseTableEmpty();
            if (!leaseTableEmpty) {
                break;
            }
            LOG.info("Lease table is still empty. Checking again in " + LEASE_TABLE_CHECK_FREQUENCY_MILLIS + " ms.");
            Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
        }
        return leaseTableEmpty;
    }

    /**
     * Begins shutdown of every tracked consumer whose shard is not in {@code assignedShards} and
     * removes it from the consumer map once {@code beginShutdown()} returns {@code true}.
     *
     * @param assignedShards shards currently assigned to this worker; their consumers are kept
     */
    void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
        for (ShardInfo trackedShard : shardInfoShardConsumerMap.keySet()) {
            if (assignedShards.contains(trackedShard)) {
                continue;
            }
            ShardConsumer consumer = shardInfoShardConsumerMap.get(trackedShard);
            if (consumer.beginShutdown()) {
                shardInfoShardConsumerMap.remove(trackedShard);
            }
        }
    }

    /**
     * Fetches the current assignments from the lease coordinator, runs them through the shard
     * prioritization, and logs the resulting shard ids (or that nothing is assigned).
     *
     * @return the list produced by the shard prioritization, returned as-is
     */
    private List<ShardInfo> getShardInfoForAssignments() {
        List<ShardInfo> currentAssignments = leaseCoordinator.getCurrentAssignments();
        List<ShardInfo> prioritizedShards = shardPrioritization.prioritize(currentAssignments);

        if (prioritizedShards == null || prioritizedShards.isEmpty()) {
            wlog.info("No activities assigned");
            return prioritizedShards;
        }

        if (wlog.isInfoEnabled()) {
            StringBuilder shardIds = new StringBuilder();
            String separator = "";
            for (ShardInfo shardInfo : prioritizedShards) {
                shardIds.append(separator);
                shardIds.append(shardInfo.getShardId());
                separator = ", ";
            }
            wlog.info("Current stream shard assignments: " + shardIds.toString());
        }
        return prioritizedShards;
    }

    /**
     * Calls {@link #startGracefulShutdown()} and adapts its {@code Future<Boolean>} to a
     * {@code Future<Void>}: state queries and cancellation are forwarded, and both
     * {@code get} variants wait on the underlying future and discard its result.
     *
     * @return a future that completes when the underlying graceful shutdown future completes
     */
    @Deprecated
    public Future<Void> requestShutdown() {
        final Future<Boolean> delegate = this.startGracefulShutdown();
        return new Future<Void>(){

            @Override
            public Void get() throws InterruptedException, ExecutionException {
                // The Boolean outcome is intentionally dropped; callers only learn about completion.
                delegate.get();
                return null;
            }

            @Override
            public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                delegate.get(timeout, unit);
                return null;
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return delegate.cancel(mayInterruptIfRunning);
            }
        };
    }

    /**
     * Starts a graceful shutdown at most once and returns the future tracking it.
     * While holding this worker's monitor, the first call hands the callable from
     * {@link #createGracefulShutdownCallable()} to the graceful shutdown coordinator and
     * stores the resulting future; subsequent calls find the stored future and reuse it.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @return the future held in {@code gracefulShutdownFuture}
     */
    public Future<Boolean> startGracefulShutdown() {
        synchronized (this) {
            boolean notYetStarted = this.gracefulShutdownFuture == null;
            if (notYetStarted) {
                Callable<Boolean> shutdownCallable = this.createGracefulShutdownCallable();
                this.gracefulShutdownFuture = this.gracefulShutdownCoordinator.startGracefulShutdown(shutdownCallable);
            }
        }
        return this.gracefulShutdownFuture;
    }

    /**
     * Builds the callable that performs a graceful shutdown.
     *
     * @return a callable that immediately yields {@code true} when shutdown has already
     *         completed; otherwise the coordinator's callable wrapping
     *         {@link #createWorkerShutdownCallable()}
     */
    public Callable<Boolean> createGracefulShutdownCallable() {
        if (!this.isShutdownComplete()) {
            return this.gracefulShutdownCoordinator.createGracefulShutdownCallable(this.createWorkerShutdownCallable());
        }
        return () -> true;
    }

    /**
     * @return the flag that the worker shutdown callable sets once it begins running
     */
    public boolean hasGracefulShutdownStarted() {
        return gracefuleShutdownStarted;
    }

    /**
     * Creates the callable that kicks off a graceful shutdown of this worker.
     *
     * <p>When invoked, the callable:
     * <ol>
     *   <li>marks graceful shutdown as started (under this worker's monitor);</li>
     *   <li>stops the lease taker and reads the current lease assignments;</li>
     *   <li>if there are no leases, calls {@link #shutdown()} and returns
     *       {@code GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED};</li>
     *   <li>otherwise sends a shutdown notification to the consumer of each lease, counting
     *       down both latches immediately for leases that have no consumer or whose consumer
     *       is already in {@code SHUTDOWN_COMPLETE}.</li>
     * </ol>
     *
     * @return a callable producing the context (latches plus this worker) to wait on
     * @throws IllegalStateException from the callable if it is invoked more than once
     */
    @VisibleForTesting
    Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
        return () -> {
            synchronized (this) {
                if (this.gracefuleShutdownStarted) {
                    throw new IllegalStateException("Requested shutdown has already been started");
                }
                this.gracefuleShutdownStarted = true;
            }
            this.leaseCoordinator.stopLeaseTaker();
            // Parameterised instead of a raw Collection so the loop below is type-correct.
            Collection<KinesisClientLease> leases = this.leaseCoordinator.getAssignments();
            if (leases == null || leases.isEmpty()) {
                this.shutdown();
                return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED;
            }
            // One count per lease on each latch.
            CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
            CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
            for (KinesisClientLease lease : leases) {
                ShardConsumerShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(this.leaseCoordinator, lease, notificationCompleteLatch, shutdownCompleteLatch);
                ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
                ShardConsumer consumer = this.shardInfoShardConsumerMap.get(shardInfo);
                if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) {
                    // Nothing left to notify for this lease; release its share of both latches now.
                    notificationCompleteLatch.countDown();
                    shutdownCompleteLatch.countDown();
                    continue;
                }
                consumer.notifyShutdownRequested(shutdownNotification);
            }
            return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);
        };
    }

    /**
     * @return the {@code shutdownComplete} flag, which {@code finalShutdown()} sets to true
     */
    boolean isShutdownComplete() {
        return shutdownComplete;
    }

    /**
     * @return the live map of shard to consumer maintained by this worker (not a copy)
     */
    ConcurrentMap<ShardInfo, ShardConsumer> getShardInfoShardConsumerMap() {
        return shardInfoShardConsumerMap;
    }

    /**
     * @return the listener this worker notifies about state changes
     */
    WorkerStateChangeListener getWorkerStateChangeListener() {
        return workerStateChangeListener;
    }

    /**
     * Requests shutdown of this worker. The first call sets the shutdown flag, records the
     * start time, stops the lease coordinator, shuts down the lease cleanup manager, informs
     * the shard sync strategy (when one is set) and reports {@code SHUT_DOWN} to the
     * state-change listener. Any later call only logs a warning.
     */
    public void shutdown() {
        if (!this.shutdown) {
            LOG.info("Worker shutdown requested.");
            this.shutdown = true;
            this.shutdownStartTimeMillis = System.currentTimeMillis();
            this.leaseCoordinator.stop();
            this.leaseCleanupManager.shutdown();
            if (this.shardSyncStrategy != null) {
                this.shardSyncStrategy.onWorkerShutDown();
            }
            this.workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
        } else {
            LOG.warn("Shutdown requested a second time.");
        }
    }

    /**
     * Releases resources and marks shutdown as complete. The executor service is only shut
     * down when it is a {@code WorkerThreadPoolExecutor}, and the metrics factory only when
     * it is a {@code WorkerCWMetricsFactory}; other instances are left alone.
     */
    private void finalShutdown() {
        LOG.info("Starting worker's final shutdown.");
        boolean isWorkerPool = this.executorService instanceof WorkerThreadPoolExecutor;
        if (isWorkerPool) {
            this.executorService.shutdownNow();
        }
        boolean isWorkerMetrics = this.metricsFactory instanceof WorkerCWMetricsFactory;
        if (isWorkerMetrics) {
            CWMetricsFactory cloudWatchMetrics = (CWMetricsFactory)this.metricsFactory;
            cloudWatchMetrics.shutdown();
        }
        this.shutdownComplete = true;
    }

    /**
     * Decides whether the worker can stop now.
     *
     * @return {@code true} when the executor service is already shut down, or when shutdown
     *         was requested and either no shard consumers remain or at least
     *         {@code failoverTimeMillis} has elapsed since the request; {@code false} otherwise
     */
    @VisibleForTesting
    boolean shouldShutdown() {
        if (this.executorService.isShutdown()) {
            LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
            return true;
        }
        if (!this.shutdown) {
            return false;
        }
        if (this.shardInfoShardConsumerMap.isEmpty()) {
            LOG.info("All record processors have been shutdown successfully.");
            return true;
        }
        long elapsedSinceRequest = System.currentTimeMillis() - this.shutdownStartTimeMillis;
        if (elapsedSinceRequest < this.failoverTimeMillis) {
            return false;
        }
        LOG.info("Lease failover time is reached, so forcing shutdown.");
        return true;
    }

    /**
     * Returns the consumer registered for {@code shardInfo}, building and registering a new
     * one when none exists or when the existing one is shut down with reason {@code ZOMBIE}.
     *
     * @param shardInfo shard to look up
     * @param processorFactory factory passed to {@link #buildConsumer} when a consumer is created
     * @return the existing or newly created consumer
     */
    ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory processorFactory) {
        ShardConsumer existing = (ShardConsumer)this.shardInfoShardConsumerMap.get(shardInfo);
        boolean reusable = existing != null
                && !(existing.isShutdown() && existing.getShutdownReason().equals((Object)ShutdownReason.ZOMBIE));
        if (reusable) {
            return existing;
        }
        ShardConsumer created = this.buildConsumer(shardInfo, processorFactory);
        this.shardInfoShardConsumerMap.put(shardInfo, created);
        this.wlog.infoForce("Created new shardConsumer for : " + shardInfo);
        return created;
    }

    /**
     * Creates a new {@code ShardConsumer} for the given shard, together with a fresh record
     * processor, checkpointer (with its sequence number validator) and data fetcher.
     *
     * @param shardInfo shard the consumer will be created for
     * @param processorFactory factory used to create the record processor
     * @return the newly constructed consumer (not registered in the consumer map here)
     */
    protected ShardConsumer buildConsumer(ShardInfo shardInfo, software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory processorFactory) {
        IRecordProcessor recordProcessor = processorFactory.createProcessor();
        SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator(
                this.streamConfig.getStreamProxy(),
                shardInfo.getShardId(),
                this.streamConfig.shouldValidateSequenceNumberBeforeCheckpointing());
        RecordProcessorCheckpointer checkpointer = new RecordProcessorCheckpointer(
                shardInfo,
                this.checkpointTracker,
                sequenceNumberValidator,
                this.metricsFactory);
        KinesisDataFetcher dataFetcher = new KinesisDataFetcher(this.streamConfig.getStreamProxy(), shardInfo);
        return new ShardConsumer(
                shardInfo,
                this.streamConfig,
                this.checkpointTracker,
                recordProcessor,
                checkpointer,
                this.leaseCoordinator,
                this.parentShardPollIntervalMillis,
                this.cleanupLeasesUponShardCompletion,
                this.executorService,
                this.metricsFactory,
                this.taskBackoffTimeMillis,
                this.skipShardSyncAtWorkerInitializationIfLeasesExist,
                dataFetcher,
                this.retryGetRecordsInSeconds,
                this.maxGetRecordsThreadPool,
                this.config,
                this.shardSyncer,
                this.shardSyncStrategy,
                this.leaseCleanupManager);
    }

    /**
     * @return the stream configuration this worker was constructed with
     */
    @VisibleForTesting
    StreamConfig getStreamConfig() {
        return streamConfig;
    }

    /**
     * Chooses the metrics factory for the given configuration.
     *
     * @param cloudWatchClient client handed to the CloudWatch-backed factory; its region is
     *        set first when the configuration names one
     * @param config source of the metrics level, region and CloudWatch factory settings
     * @return a {@code NullMetricsFactory} when the metrics level is {@code NONE}, otherwise
     *         a {@code WorkerCWMetricsFactory}
     */
    private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) {
        if (config.getMetricsLevel() == MetricsLevel.NONE) {
            return new NullMetricsFactory();
        }
        String regionName = config.getRegionName();
        if (regionName != null) {
            Worker.setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(regionName));
        }
        return new WorkerCWMetricsFactory(
                cloudWatchClient,
                config.getApplicationName(),
                config.getMetricsBufferTimeMillis(),
                config.getMetricsMaxQueueSize(),
                config.getMetricsLevel(),
                config.getMetricsEnabledDimensions());
    }

    /**
     * @return a new {@code WorkerThreadPoolExecutor} whose threads are named with the
     *         format {@code RecordProcessor-%04d}
     */
    private static ExecutorService getExecutorService() {
        return new WorkerThreadPoolExecutor(
                new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build());
    }

    /**
     * Applies {@code value} through the given setter, tolerating targets that reject mutation.
     * An {@link UnsupportedOperationException} from the setter is logged at debug level and
     * otherwise ignored; any other exception propagates.
     *
     * @param source object being configured; used only for the log message
     * @param field human-readable name of the property being set; used only for the log message
     * @param t setter to invoke with {@code value}
     * @param value value to pass to the setter
     */
    private static <S, T> void setField(S source, String field, Consumer<T> t, T value) {
        try {
            t.accept(value);
        }
        catch (UnsupportedOperationException e) {
            // Fixed: a space was missing before "is immutable.", gluing it to the class name.
            LOG.debug("Exception thrown while trying to set " + field + ", indicating that " + source.getClass().getSimpleName() + " is immutable.", e);
        }
    }

    /**
     * @return a new periodic shard sync strategy backed by this worker's
     *         {@code leaderElectedPeriodicShardSyncManager}
     */
    private PeriodicShardSyncStrategy createPeriodicShardSyncStrategy() {
        PeriodicShardSyncStrategy strategy = new PeriodicShardSyncStrategy(this.leaderElectedPeriodicShardSyncManager);
        return strategy;
    }

    /**
     * @return a new shard-end shard sync strategy built from this worker's
     *         {@code shardSyncTaskManager} and {@code leaderElectedPeriodicShardSyncManager}
     */
    private ShardEndShardSyncStrategy createShardEndShardSyncStrategy() {
        ShardEndShardSyncStrategy strategy = new ShardEndShardSyncStrategy(
                this.shardSyncTaskManager,
                this.leaderElectedPeriodicShardSyncManager);
        return strategy;
    }

    /**
     * @param leaderDecider caller-supplied decider, or null to request the default
     * @return {@code leaderDecider} when non-null; otherwise a new
     *         {@code DeterministicShuffleShardSyncLeaderDecider} using the lease coordinator's
     *         lease manager and a new single-thread scheduled executor
     */
    private LeaderDecider getOrCreateLeaderDecider(LeaderDecider leaderDecider) {
        if (leaderDecider == null) {
            return new DeterministicShuffleShardSyncLeaderDecider(
                    this.leaseCoordinator.getLeaseManager(),
                    Executors.newSingleThreadScheduledExecutor(),
                    1);
        }
        return leaderDecider;
    }

    /**
     * @param periodicShardSyncManager caller-supplied manager, or null to request the default
     * @param isAuditorMode forwarded unchanged to the default manager's constructor
     * @return {@code periodicShardSyncManager} when non-null; otherwise a new manager wired
     *         from this worker's configuration, leader decider, lease manager, stream proxy
     *         and a fresh {@code ShardSyncTask}
     */
    private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager, boolean isAuditorMode) {
        if (periodicShardSyncManager == null) {
            return new PeriodicShardSyncManager(
                    this.config.getWorkerIdentifier(),
                    this.leaderDecider,
                    new ShardSyncTask(
                            this.streamConfig.getStreamProxy(),
                            this.leaseCoordinator.getLeaseManager(),
                            this.config.getInitialPositionInStreamExtended(),
                            this.config.shouldCleanupLeasesUponShardCompletion(),
                            this.config.shouldIgnoreUnexpectedChildShards(),
                            0L,
                            this.shardSyncer,
                            null),
                    this.metricsFactory,
                    this.leaseCoordinator.getLeaseManager(),
                    this.streamConfig.getStreamProxy(),
                    isAuditorMode,
                    this.config.getLeasesRecoveryAuditorExecutionFrequencyMillis(),
                    this.config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold());
        }
        return periodicShardSyncManager;
    }

    /**
     * Fluent builder for {@link Worker}. A configuration and a record processor factory are
     * mandatory; every other collaborator that is left unset is created with a default when
     * {@link #build()} runs.
     */
    public static class Builder {
        private software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory recordProcessorFactory;
        private KinesisClientLibConfiguration config;
        private AmazonKinesis kinesisClient;
        private AmazonDynamoDB dynamoDBClient;
        private AmazonCloudWatch cloudWatchClient;
        private IMetricsFactory metricsFactory;
        private ILeaseManager<KinesisClientLease> leaseManager;
        private ExecutorService execService;
        private ShardPrioritization shardPrioritization;
        private IKinesisProxy kinesisProxy;
        private WorkerStateChangeListener workerStateChangeListener;
        private LeaseCleanupValidator leaseCleanupValidator;
        private LeaseSelector<KinesisClientLease> leaseSelector;
        private LeaderDecider leaderDecider;
        private ILeaseTaker<KinesisClientLease> leaseTaker;
        private ILeaseRenewer<KinesisClientLease> leaseRenewer;
        private ShardSyncer shardSyncer;

        /** @return the Kinesis client currently held by this builder (may be null before build) */
        @VisibleForTesting
        AmazonKinesis getKinesisClient() {
            return kinesisClient;
        }

        /** @return the DynamoDB client currently held by this builder (may be null before build) */
        @VisibleForTesting
        AmazonDynamoDB getDynamoDBClient() {
            return dynamoDBClient;
        }

        /** @return the CloudWatch client currently held by this builder (may be null before build) */
        @VisibleForTesting
        AmazonCloudWatch getCloudWatchClient() {
            return cloudWatchClient;
        }

        /**
         * Sets a v1 record processor factory, wrapping it in a v1-to-v2 adapter.
         *
         * @param recordProcessorFactory v1 factory to adapt
         * @return this builder
         */
        public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) {
            this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory);
            return this;
        }

        /**
         * Sets the v2 record processor factory.
         *
         * @param recordProcessorFactory v2 factory to use as-is
         * @return this builder
         */
        public Builder recordProcessorFactory(software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory recordProcessorFactory) {
            this.recordProcessorFactory = recordProcessorFactory;
            return this;
        }

        /**
         * Validates the mandatory settings, fills in defaults for everything left unset
         * (storing them on this builder) and constructs the worker.
         *
         * @return the configured worker
         * @throws IllegalArgumentException if the configuration or the record processor
         *         factory has not been provided
         */
        public Worker build() {
            this.checkRequiredSettings();
            if (this.execService == null) {
                this.execService = Worker.getExecutorService();
            }
            this.createMissingClients();
            this.applyRegionAndEndpointOverrides();
            this.createMissingCollaborators();
            return new Worker(
                    this.config.getApplicationName(),
                    this.recordProcessorFactory,
                    this.config,
                    new StreamConfig(
                            this.kinesisProxy,
                            this.config.getMaxRecords(),
                            this.config.getIdleTimeBetweenReadsInMillis(),
                            this.config.shouldCallProcessRecordsEvenForEmptyRecordList(),
                            this.config.shouldValidateSequenceNumberBeforeCheckpointing(),
                            this.config.getInitialPositionInStreamExtended()),
                    this.config.getInitialPositionInStreamExtended(),
                    this.config.getParentShardPollIntervalMillis(),
                    this.config.getShardSyncIntervalMillis(),
                    this.config.shouldCleanupLeasesUponShardCompletion(),
                    null,
                    new KinesisClientLibLeaseCoordinator(
                            this.leaseManager,
                            this.leaseTaker,
                            this.leaseRenewer,
                            this.config.getFailoverTimeMillis(),
                            this.config.getEpsilonMillis(),
                            this.config.getMaxLeasesForWorker(),
                            this.config.getMaxLeasesToStealAtOneTime(),
                            this.metricsFactory)
                            .withInitialLeaseTableReadCapacity(this.config.getInitialLeaseTableReadCapacity())
                            .withInitialLeaseTableWriteCapacity(this.config.getInitialLeaseTableWriteCapacity()),
                    this.execService,
                    this.metricsFactory,
                    this.config.getTaskBackoffTimeMillis(),
                    this.config.getFailoverTimeMillis(),
                    this.config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
                    this.shardPrioritization,
                    this.config.getRetryGetRecordsInSeconds(),
                    this.config.getMaxGetRecordsThreadPool(),
                    this.workerStateChangeListener,
                    this.shardSyncer,
                    this.leaderDecider,
                    null);
        }

        /** Rejects a build that lacks the configuration or the record processor factory. */
        private void checkRequiredSettings() {
            if (this.config == null) {
                throw new IllegalArgumentException("Kinesis Client Library configuration needs to be provided to build Worker");
            }
            if (this.recordProcessorFactory == null) {
                throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker");
            }
        }

        /** Creates the Kinesis, DynamoDB and CloudWatch clients that were not supplied. */
        private void createMissingClients() {
            String regionName = this.config.getRegionName();
            if (this.kinesisClient == null) {
                this.kinesisClient = (AmazonKinesis)this.createClient(AmazonKinesisClientBuilder.standard(), this.config.getKinesisCredentialsProvider(), this.config.getKinesisClientConfiguration(), this.config.getKinesisEndpoint(), regionName);
            }
            if (this.dynamoDBClient == null) {
                this.dynamoDBClient = (AmazonDynamoDB)this.createClient(AmazonDynamoDBClientBuilder.standard(), this.config.getDynamoDBCredentialsProvider(), this.config.getDynamoDBClientConfiguration(), this.config.getDynamoDBEndpoint(), regionName);
            }
            if (this.cloudWatchClient == null) {
                // No endpoint override is passed for CloudWatch.
                this.cloudWatchClient = (AmazonCloudWatch)this.createClient(AmazonCloudWatchClientBuilder.standard(), this.config.getCloudWatchCredentialsProvider(), this.config.getCloudWatchClientConfiguration(), null, regionName);
            }
        }

        /**
         * Pushes the configured region and endpoints onto the clients. Each call goes through
         * {@code Worker.setField}, which tolerates clients that reject the setter.
         */
        private void applyRegionAndEndpointOverrides() {
            String regionName = this.config.getRegionName();
            if (regionName != null) {
                Worker.setField(this.cloudWatchClient, "region", this.cloudWatchClient::setRegion, RegionUtils.getRegion(regionName));
                Worker.setField(this.kinesisClient, "region", this.kinesisClient::setRegion, RegionUtils.getRegion(regionName));
                Worker.setField(this.dynamoDBClient, "region", this.dynamoDBClient::setRegion, RegionUtils.getRegion(regionName));
            }
            String dynamoDbEndpoint = this.config.getDynamoDBEndpoint();
            if (dynamoDbEndpoint != null) {
                Worker.setField(this.dynamoDBClient, "endpoint", this.dynamoDBClient::setEndpoint, dynamoDbEndpoint);
            }
            String kinesisEndpoint = this.config.getKinesisEndpoint();
            if (kinesisEndpoint != null) {
                Worker.setField(this.kinesisClient, "endpoint", this.kinesisClient::setEndpoint, kinesisEndpoint);
            }
        }

        /**
         * Fills in every non-client collaborator that was not supplied. The order matters:
         * later defaults are built from earlier ones (for example the lease taker and renewer
         * use the lease manager, and the shard syncer uses the lease cleanup validator).
         */
        private void createMissingCollaborators() {
            if (this.metricsFactory == null) {
                this.metricsFactory = Worker.getMetricsFactory(this.cloudWatchClient, this.config);
            }
            if (this.leaseManager == null) {
                this.leaseManager = new KinesisClientLeaseManager(this.config.getTableName(), this.dynamoDBClient, this.config.getBillingMode());
            }
            if (this.shardPrioritization == null) {
                this.shardPrioritization = new NoOpShardPrioritization();
            }
            if (this.kinesisProxy == null) {
                this.kinesisProxy = new KinesisProxy(this.config, this.kinesisClient);
            }
            if (this.workerStateChangeListener == null) {
                this.workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
            }
            if (this.leaseCleanupValidator == null) {
                this.leaseCleanupValidator = DEFAULT_LEASE_CLEANUP_VALIDATOR;
            }
            if (this.shardSyncer == null) {
                this.shardSyncer = new KinesisShardSyncer(this.leaseCleanupValidator);
            }
            if (this.leaseSelector == null) {
                this.leaseSelector = DEFAULT_LEASE_SELECTOR;
            }
            if (this.leaseTaker == null) {
                this.leaseTaker = new LeaseTaker<KinesisClientLease>(this.leaseManager, this.leaseSelector, this.config.getWorkerIdentifier(), this.config.getFailoverTimeMillis())
                        .withMaxLeasesForWorker(this.config.getMaxLeasesForWorker())
                        .withMaxLeasesToStealAtOneTime(this.config.getMaxLeasesToStealAtOneTime());
            }
            if (this.leaseRenewer == null) {
                ExecutorService renewalPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(this.config.getMaxLeaseRenewalThreads());
                this.leaseRenewer = new LeaseRenewer<KinesisClientLease>(this.leaseManager, this.config.getWorkerIdentifier(), this.config.getFailoverTimeMillis(), renewalPool);
            }
            if (this.leaderDecider == null) {
                this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(this.leaseManager, Executors.newSingleThreadScheduledExecutor(), 1);
            }
        }

        /**
         * Configures and builds an AWS client from the given client builder.
         * A non-empty endpoint takes precedence (paired with {@code region} in the endpoint
         * configuration); otherwise a non-empty region is used; otherwise the region defaults
         * to {@code us-east-1}. The chosen path is logged at warn level.
         *
         * @param builder AWS client builder to configure
         * @param credentialsProvider credentials to apply, skipped when null
         * @param clientConfiguration client configuration to apply, skipped when null
         * @param endpointUrl endpoint override, may be null or empty
         * @param region region name, may be null or empty
         * @return the client produced by {@code builder.build()}
         */
        <R, T extends AwsClientBuilder<T, R>> R createClient(T builder, AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String endpointUrl, String region) {
            if (credentialsProvider != null) {
                builder.withCredentials(credentialsProvider);
            }
            if (clientConfiguration != null) {
                builder.withClientConfiguration(clientConfiguration);
            }
            if (StringUtils.isNotEmpty(endpointUrl)) {
                LOG.warn("Received configuration for endpoint as " + endpointUrl + ", and region as " + region + ".");
                builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, region));
                return builder.build();
            }
            if (StringUtils.isNotEmpty(region)) {
                LOG.warn("Received configuration for region as " + region + ".");
                builder.withRegion(region);
            } else {
                LOG.warn("No configuration received for endpoint and region, will default region to us-east-1");
                builder.withRegion(Regions.US_EAST_1);
            }
            return builder.build();
        }

        /**
         * @param config Kinesis Client Library configuration (required by {@link #build()})
         * @return this builder
         */
        public Builder config(KinesisClientLibConfiguration config) {
            this.config = config;
            return this;
        }

        /**
         * @param kinesisClient Kinesis client to use instead of a default-built one
         * @return this builder
         */
        public Builder kinesisClient(AmazonKinesis kinesisClient) {
            this.kinesisClient = kinesisClient;
            return this;
        }

        /**
         * @param dynamoDBClient DynamoDB client to use instead of a default-built one
         * @return this builder
         */
        public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
            this.dynamoDBClient = dynamoDBClient;
            return this;
        }

        /**
         * @param cloudWatchClient CloudWatch client to use instead of a default-built one
         * @return this builder
         */
        public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) {
            this.cloudWatchClient = cloudWatchClient;
            return this;
        }

        /**
         * @param metricsFactory metrics factory to use instead of the default
         * @return this builder
         */
        public Builder metricsFactory(IMetricsFactory metricsFactory) {
            this.metricsFactory = metricsFactory;
            return this;
        }

        /**
         * @param leaseManager lease manager to use instead of the default
         * @return this builder
         */
        public Builder leaseManager(ILeaseManager<KinesisClientLease> leaseManager) {
            this.leaseManager = leaseManager;
            return this;
        }

        /**
         * @param execService executor service to use instead of the default worker pool
         * @return this builder
         */
        public Builder execService(ExecutorService execService) {
            this.execService = execService;
            return this;
        }

        /**
         * @param shardPrioritization prioritization to use instead of the no-op default
         * @return this builder
         */
        public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
            this.shardPrioritization = shardPrioritization;
            return this;
        }

        /**
         * @param kinesisProxy Kinesis proxy to use instead of the default
         * @return this builder
         */
        public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
            this.kinesisProxy = kinesisProxy;
            return this;
        }

        /**
         * @param workerStateChangeListener listener to use instead of the default
         * @return this builder
         */
        public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) {
            this.workerStateChangeListener = workerStateChangeListener;
            return this;
        }

        /**
         * @param leaseCleanupValidator validator to use instead of the default
         * @return this builder
         */
        public Builder leaseCleanupValidator(LeaseCleanupValidator leaseCleanupValidator) {
            this.leaseCleanupValidator = leaseCleanupValidator;
            return this;
        }

        /**
         * @param leaseSelector lease selector to use instead of the default
         * @return this builder
         */
        public Builder leaseSelector(LeaseSelector<KinesisClientLease> leaseSelector) {
            this.leaseSelector = leaseSelector;
            return this;
        }

        /**
         * @param leaderDecider leader decider to use instead of the default
         * @return this builder
         */
        public Builder leaderDecider(LeaderDecider leaderDecider) {
            this.leaderDecider = leaderDecider;
            return this;
        }

        /**
         * @param leaseTaker lease taker to use instead of the default
         * @return this builder
         */
        public Builder leaseTaker(ILeaseTaker<KinesisClientLease> leaseTaker) {
            this.leaseTaker = leaseTaker;
            return this;
        }

        /**
         * @param leaseRenewer lease renewer to use instead of the default
         * @return this builder
         */
        public Builder leaseRenewer(ILeaseRenewer<KinesisClientLease> leaseRenewer) {
            this.leaseRenewer = leaseRenewer;
            return this;
        }

        /**
         * @param shardSyncer shard syncer to use instead of the default
         * @return this builder
         */
        public Builder shardSyncer(ShardSyncer shardSyncer) {
            this.shardSyncer = shardSyncer;
            return this;
        }
    }

    /**
     * Thread pool used as the worker's default executor: no core threads, an unbounded
     * maximum, a 60-second idle keep-alive and a {@link SynchronousQueue} hand-off.
     * {@code finalShutdown()} only shuts down executors of this type.
     */
    static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
        /** Idle time, in seconds, after which an unused thread is reclaimed. */
        private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;

        /**
         * @param threadFactory factory used to create the pool's threads
         */
        WorkerThreadPoolExecutor(ThreadFactory threadFactory) {
            super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);
        }
    }

    /**
     * {@code CWMetricsFactory} subclass that adds no behavior of its own. It is the type
     * {@code getMetricsFactory} creates and the type {@code finalShutdown()} checks for
     * before shutting the metrics factory down.
     */
    static class WorkerCWMetricsFactory extends CWMetricsFactory {
        /** Passes every argument straight through to the {@code CWMetricsFactory} constructor. */
        WorkerCWMetricsFactory(
                AmazonCloudWatch cloudWatchClient,
                String namespace,
                long bufferTimeMillis,
                int maxQueueSize,
                MetricsLevel metricsLevel,
                Set<String> metricsEnabledDimensions) {
            super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions);
        }
    }

    /**
     * Thin wrapper around {@code LOG} that throttles {@link #info(Object)}: info messages are
     * only emitted while the {@code infoReporting} flag is on, and {@code resetInfoLogging()}
     * turns that flag on once the report interval (one minute) has passed and off again
     * afterwards. All other levels, and {@link #infoForce(Object)}, are passed straight through.
     */
    private static class WorkerLog {
        private long reportIntervalMillis = TimeUnit.MINUTES.toMillis(1L);
        private long nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
        private boolean infoReporting;

        private WorkerLog() {
        }

        /** Logs at debug level without throttling. */
        public void debug(Object message, Throwable t) {
            LOG.debug(message, t);
        }

        /** Logs at info level, but only while info reporting is switched on. */
        public void info(Object message) {
            if (!this.isInfoEnabled()) {
                return;
            }
            LOG.info(message);
        }

        /** Logs at info level regardless of the throttling flag. */
        public void infoForce(Object message) {
            LOG.info(message);
        }

        /** Logs at warn level without throttling. */
        public void warn(Object message) {
            LOG.warn(message);
        }

        /** Logs at error level without throttling. */
        public void error(Object message, Throwable t) {
            LOG.error(message, t);
        }

        /** @return whether throttled info messages are currently being emitted */
        private boolean isInfoEnabled() {
            return this.infoReporting;
        }

        /**
         * Advances the throttling state. While reporting is off, it is switched on once
         * {@code nextReportTime} has been reached. While reporting is on, it is switched off
         * and the next report time is pushed out by one interval, but only if {@code LOG}
         * has info enabled; otherwise reporting stays on.
         */
        private void resetInfoLogging() {
            if (!this.infoReporting) {
                if (this.nextReportTime <= System.currentTimeMillis()) {
                    this.infoReporting = true;
                }
                return;
            }
            if (LOG.isInfoEnabled()) {
                this.infoReporting = false;
                this.nextReportTime = System.currentTimeMillis() + this.reportIntervalMillis;
            }
        }
    }
}

