package com.github.jcustenborder.kafka.connect.utils.data;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/jcustenborder/kafka/connect/utils/data/SourceRecordDequeImpl.class */
/**
 * A {@link ConcurrentLinkedDeque} of {@link SourceRecord}s whose {@code add*} methods block
 * (by sleeping) while the deque is at or above a configured maximum capacity, and which can
 * hand out its contents in batches of at most {@code batchSize} records.
 *
 * <p>Only {@link #add(SourceRecord)}, {@link #addAll(Collection)},
 * {@link #addFirst(SourceRecord)} and {@link #addLast(SourceRecord)} are overridden here to
 * apply the rate limit and capacity wait; other inherited insertion methods are not overridden
 * in this class.
 */
class SourceRecordDequeImpl extends ConcurrentLinkedDeque<SourceRecord> implements SourceRecordDeque {
    private static final Logger log = LoggerFactory.getLogger(SourceRecordDequeImpl.class);
    private final Time time;
    private final int maximumCapacity;
    private final int batchSize;
    private final int emptyWaitMs;
    private final int maximumCapacityWaitMs;
    private final int maximumCapacityTimeoutMs;
    private final RateLimiter writeRateLimit;

    /**
     * Creates a new deque.
     *
     * @param time                     clock used for sleeping and for measuring elapsed wait time
     * @param maximumCapacity          size at or above which writers wait before adding
     * @param batchSize                maximum number of records returned by a single drain/batch call
     * @param emptyWaitMs              default time to sleep when a drain/batch call finds no records
     * @param maximumCapacityWaitMs    time to sleep between capacity checks while waiting to write
     * @param maximumCapacityTimeoutMs total wait after which a writer gives up with a
     *                                 {@link TimeoutException}
     * @param writeRateLimit           optional limiter applied to writes; may be {@code null}
     */
    public SourceRecordDequeImpl(Time time, int maximumCapacity, int batchSize, int emptyWaitMs,
                                 int maximumCapacityWaitMs, int maximumCapacityTimeoutMs,
                                 RateLimiter writeRateLimit) {
        this.time = time;
        this.batchSize = batchSize;
        this.maximumCapacity = maximumCapacity;
        this.emptyWaitMs = emptyWaitMs;
        this.maximumCapacityWaitMs = maximumCapacityWaitMs;
        this.maximumCapacityTimeoutMs = maximumCapacityTimeoutMs;
        this.writeRateLimit = writeRateLimit;
    }

    /** Waits until a single record may be written. */
    private void waitForCapacity() {
        waitForCapacity(1);
    }

    /**
     * Applies the write rate limit (if configured) for {@code permits} records, then sleeps in
     * {@code maximumCapacityWaitMs} increments until the deque size drops below
     * {@code maximumCapacity}.
     *
     * <p>Note that the capacity check only looks at the current size; it does not account for
     * the number of records about to be added.
     *
     * @param permits number of records about to be written
     * @throws TimeoutException if the deque is still full after more than
     *                          {@code maximumCapacityTimeoutMs} ms of waiting
     */
    private void waitForCapacity(int permits) {
        // RateLimiter.acquire(int) rejects zero or negative permit counts, which would make
        // addAll() of an empty collection fail; there is nothing to throttle in that case.
        if (null != this.writeRateLimit && permits > 0) {
            this.writeRateLimit.acquire(permits);
        }
        if (size() < this.maximumCapacity) {
            // Fast path: room is available, so the clock is never consulted.
            return;
        }
        final long start = this.time.milliseconds();
        long elapsedMs = 0L;
        while (size() >= this.maximumCapacity) {
            if (elapsedMs > this.maximumCapacityTimeoutMs) {
                throw new TimeoutException(String.format(
                    "Timeout of %s ms exceeded while waiting for Deque to be drained below %s",
                    this.maximumCapacityTimeoutMs, this.maximumCapacity));
            }
            this.time.sleep(this.maximumCapacityWaitMs);
            elapsedMs = this.time.milliseconds() - start;
        }
    }

    /**
     * Moves up to {@code batchSize} records from the head of this deque into {@code target}.
     *
     * @param target list receiving the polled records
     * @return the number of records moved
     */
    private int pollInto(List<SourceRecord> target) {
        log.trace("drain() - Attempting to draining {} record(s).", this.batchSize);
        int count = 0;
        SourceRecord record;
        // Strictly less-than: never hand out more than batchSize records per call.
        while (count < this.batchSize && null != (record = poll())) {
            target.add(record);
            count++;
        }
        return count;
    }

    /**
     * Waits for capacity, then appends the record.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public boolean add(SourceRecord sourceRecord) {
        waitForCapacity();
        return super.add(sourceRecord);
    }

    /**
     * Waits for capacity (rate limited by the size of {@code collection}), then appends all
     * of its elements.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public boolean addAll(Collection<? extends SourceRecord> collection) {
        waitForCapacity(collection.size());
        return super.addAll(collection);
    }

    /**
     * Waits for capacity, then inserts the record at the front.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public void addFirst(SourceRecord sourceRecord) {
        waitForCapacity();
        super.addFirst(sourceRecord);
    }

    /**
     * Waits for capacity, then appends the record at the end.
     *
     * @throws TimeoutException if capacity does not become available in time
     */
    @Override
    public void addLast(SourceRecord sourceRecord) {
        waitForCapacity();
        super.addLast(sourceRecord);
    }

    /** Returns a new, empty list pre-sized to {@code batchSize}. */
    @Override
    public List<SourceRecord> newList() {
        return new ArrayList<>(this.batchSize);
    }

    /**
     * Drains up to {@code batchSize} records into a freshly created list, sleeping for the
     * default {@code emptyWaitMs} if none were available.
     *
     * @return the list of drained records; empty if none were available
     */
    @Override
    public List<SourceRecord> drain() {
        List<SourceRecord> records = newList();
        drain(records);
        return records;
    }

    /** Same as {@link #getBatch(int)} using the default {@code emptyWaitMs}. */
    @Override
    public List<SourceRecord> getBatch() {
        return getBatch(this.emptyWaitMs);
    }

    /**
     * Polls up to {@code batchSize} records from the head of the deque.
     *
     * @param emptyWaitMs time to sleep when no records are available; must be {@code >= 0}
     * @return the polled records, or {@code null} if the deque was empty
     * @throws IllegalArgumentException if {@code emptyWaitMs} is negative
     */
    @Override
    public List<SourceRecord> getBatch(int emptyWaitMs) {
        Preconditions.checkArgument(emptyWaitMs >= 0, "emptyWaitMs should be greater than or equal to 0.");
        List<SourceRecord> records = newList();
        if (pollInto(records) > 0) {
            return records;
        }
        if (emptyWaitMs > 0) {
            log.trace("drain() - Found no records, sleeping {} ms.", emptyWaitMs);
            this.time.sleep(emptyWaitMs);
        } else {
            log.trace("drain() - Found no records.");
        }
        // null (rather than an empty list) signals "no data" to the caller.
        return null;
    }

    /** Same as {@link #drain(List, int)} using the default {@code emptyWaitMs}. */
    @Override
    public boolean drain(List<SourceRecord> list) {
        return drain(list, this.emptyWaitMs);
    }

    /**
     * Polls up to {@code batchSize} records from the head of the deque into {@code list}.
     *
     * @param list        destination for the polled records; must not be {@code null}
     * @param emptyWaitMs time to sleep when no records are available; must be {@code >= 0}
     * @return {@code true} if at least one record was added to {@code list}
     * @throws NullPointerException     if {@code list} is {@code null}
     * @throws IllegalArgumentException if {@code emptyWaitMs} is negative
     */
    @Override
    public boolean drain(List<SourceRecord> list, int emptyWaitMs) {
        Preconditions.checkNotNull(list, "records cannot be null");
        Preconditions.checkArgument(emptyWaitMs >= 0, "emptyWaitMs should be greater than or equal to 0.");
        final int count = pollInto(list);
        if (count == 0 && emptyWaitMs > 0) {
            log.trace("drain() - Found no records, sleeping {} ms.", emptyWaitMs);
            this.time.sleep(emptyWaitMs);
        }
        return count > 0;
    }
}
