package com.mulesoft.mq.restclient.internal;

import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.api.exception.RestException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:com/mulesoft/mq/restclient/internal/ScheduledPrefetcher.class */
public class ScheduledPrefetcher implements Prefetcher {
    private static final Logger logger = LoggerFactory.getLogger(ScheduledPrefetcher.class);
    private static final int DEFAULT_RETRIEVE_PERIOD = 5000;
    private final BlockingQueue<Subscriber<? super AnypointMqMessage>> waitingSubscribers;
    private final ScheduledExecutorService retriever;
    private final Destination destination;
    private final int batchSize;
    private final long poolingTime;
    private final long lockTimeToLive;
    private BufferedQueue bufferedQueue;
    private MessagePreserver preserver;
    private long retrievePeriod;
    private RestException invalidPrefetcherException;

    public ScheduledPrefetcher(Destination destination, int i, long j, long j2) {
        this(destination, i, j, j2, 5000L, null);
    }

    public ScheduledPrefetcher(Destination destination, int i, long j, long j2, long j3, MessagePreserver messagePreserver) {
        this.waitingSubscribers = new LinkedBlockingQueue();
        this.destination = destination;
        this.batchSize = i;
        this.poolingTime = j;
        this.lockTimeToLive = j2;
        this.bufferedQueue = new SimpleBufferedQueue();
        this.retriever = createExecutorService();
        this.retrievePeriod = j3;
        this.preserver = messagePreserver;
        this.retriever.scheduleAtFixedRate(new Runnable() { // from class: com.mulesoft.mq.restclient.internal.ScheduledPrefetcher.1
            @Override // java.lang.Runnable
            public void run() {
                ScheduledPrefetcher.this.retrieveMessages();
            }
        }, 0L, j3, TimeUnit.MILLISECONDS);
    }

    protected ScheduledExecutorService createExecutorService() {
        return Executors.newSingleThreadScheduledExecutor();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void retrieveMessages() {
        if (this.bufferedQueue.size() < 3 * this.batchSize) {
            logger.debug("Retrieving messages...");
            this.destination.receive(this.batchSize, this.poolingTime, this.lockTimeToLive + this.retrievePeriod).subscribe(new CourierObserver<List<AnypointMqMessage>>() { // from class: com.mulesoft.mq.restclient.internal.ScheduledPrefetcher.2
                @Override // com.mulesoft.mq.restclient.api.CourierObserver
                public void onSuccess(List<AnypointMqMessage> list) {
                    if (list == null || list.size() <= 0) {
                        return;
                    }
                    Subscriber subscriber = null;
                    for (AnypointMqMessage anypointMqMessage : list) {
                        subscriber = (Subscriber) ScheduledPrefetcher.this.waitingSubscribers.poll();
                        if (subscriber != null) {
                            ScheduledPrefetcher.this.dispatchMessageToSubscriber(subscriber, anypointMqMessage);
                        } else {
                            ScheduledPrefetcher.this.bufferedQueue.add(anypointMqMessage);
                            if (ScheduledPrefetcher.this.preserver != null) {
                                ScheduledPrefetcher.this.preserver.add(anypointMqMessage, ScheduledPrefetcher.this.lockTimeToLive);
                            }
                        }
                    }
                    if (subscriber != null) {
                        ScheduledPrefetcher.this.retrieveMessages();
                    }
                }

                @Override // com.mulesoft.mq.restclient.api.CourierObserver
                public void onError(Throwable th) {
                    if (AbstractCourierRestClient.isTimeout(th)) {
                        ScheduledPrefetcher.logger.debug("Timeout while retrieving messages.");
                        return;
                    }
                    if (!(th instanceof ResourceNotFoundException)) {
                        ScheduledPrefetcher.logger.error("Can not retrieve messages: {}.", MessageUtils.getCompleteMessage(th));
                        ScheduledPrefetcher.logger.debug("Can not retrieve messages.", th);
                    } else {
                        ScheduledPrefetcher.logger.error("Destination not found: {}. Shutting down scheduler prefetcher...", ScheduledPrefetcher.this.destination.getName());
                        ScheduledPrefetcher.this.stop();
                        ScheduledPrefetcher.this.raiseErrorOnSubscribers((ResourceNotFoundException) th);
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void raiseErrorOnSubscribers(ResourceNotFoundException resourceNotFoundException) {
        this.invalidPrefetcherException = resourceNotFoundException;
        while (true) {
            Subscriber<? super AnypointMqMessage> poll = this.waitingSubscribers.poll();
            if (poll == null) {
                return;
            } else {
                poll.onError(resourceNotFoundException);
            }
        }
    }

    @Override // com.mulesoft.mq.restclient.internal.Prefetcher
    public Observable<AnypointMqMessage> get() {
        return Observable.create(new Observable.OnSubscribe<AnypointMqMessage>() { // from class: com.mulesoft.mq.restclient.internal.ScheduledPrefetcher.3
            public void call(Subscriber<? super AnypointMqMessage> subscriber) {
                AnypointMqMessage take;
                if (ScheduledPrefetcher.this.invalidPrefetcherException != null) {
                    subscriber.onError(ScheduledPrefetcher.this.invalidPrefetcherException);
                    return;
                }
                do {
                    take = ScheduledPrefetcher.this.bufferedQueue.take();
                    if (take == null) {
                        break;
                    }
                } while (ScheduledPrefetcher.this.isExpired(take));
                if (take != null) {
                    ScheduledPrefetcher.this.dispatchMessageToSubscriber(subscriber, take);
                } else {
                    ScheduledPrefetcher.this.waitingSubscribers.offer(subscriber);
                    ScheduledPrefetcher.this.retrieveMessages();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispatchMessageToSubscriber(Subscriber<? super AnypointMqMessage> subscriber, AnypointMqMessage anypointMqMessage) {
        try {
            subscriber.onStart();
            subscriber.onNext(anypointMqMessage);
            if (this.preserver != null) {
                this.preserver.remove(anypointMqMessage.getId());
            }
        } finally {
            subscriber.onCompleted();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isExpired(AnypointMqMessage anypointMqMessage) {
        return false;
    }

    public void stop() {
        if (this.preserver != null) {
            while (true) {
                AnypointMqMessage take = this.bufferedQueue.take();
                if (take == null) {
                    break;
                } else {
                    this.preserver.remove(take.getId());
                }
            }
        }
        this.bufferedQueue.clear();
        ExecutorUtils.stopExecutorService(this.retriever);
    }
}
