package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.class */
public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> {
    private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingKafkaIndexTaskRunner.class);
    private final KafkaIndexTask task;

    /* JADX INFO: Access modifiers changed from: package-private */
    public IncrementalPublishingKafkaIndexTaskRunner(KafkaIndexTask kafkaIndexTask, @Nullable InputRowParser<ByteBuffer> inputRowParser, AuthorizerMapper authorizerMapper, LockGranularity lockGranularity) {
        super(kafkaIndexTask, inputRowParser, authorizerMapper, lockGranularity);
        this.task = kafkaIndexTask;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Long getNextStartOffset(@NotNull Long l) {
        return Long.valueOf(l.longValue() + 1);
    }

    @Nonnull
    protected List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> getRecords(RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier, TaskToolbox taskToolbox) throws Exception {
        try {
            return recordSupplier.poll(this.task.m2getIOConfig().getPollTimeout());
        } catch (OffsetOutOfRangeException e) {
            log.warn("OffsetOutOfRangeException with message [%s]", new Object[]{e.getMessage()});
            possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, taskToolbox);
            return Collections.emptyList();
        }
    }

    protected SeekableStreamEndSequenceNumbers<Integer, Long> deserializePartitionsFromMetadata(ObjectMapper objectMapper, Object obj) {
        return (SeekableStreamEndSequenceNumbers) objectMapper.convertValue(obj, objectMapper.getTypeFactory().constructParametrizedType(SeekableStreamEndSequenceNumbers.class, SeekableStreamEndSequenceNumbers.class, new Class[]{Integer.class, Long.class}));
    }

    private void possiblyResetOffsetsOrWait(Map<TopicPartition, Long> map, RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier, TaskToolbox taskToolbox) throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        boolean z = false;
        if (this.task.m3getTuningConfig().isResetOffsetAutomatically()) {
            for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                TopicPartition key = entry.getKey();
                long longValue = entry.getValue().longValue();
                StreamPartition of = StreamPartition.of(key.topic(), Integer.valueOf(key.partition()));
                Long l = (Long) recordSupplier.getEarliestSequenceNumber(of);
                if (l == null) {
                    throw new ISE("got null sequence number for partition[%s] when fetching from kafka!", new Object[]{Integer.valueOf(key.partition())});
                }
                recordSupplier.seek(of, Long.valueOf(longValue));
                if (l.longValue() > longValue) {
                    z = true;
                    hashMap.put(key, Long.valueOf(longValue));
                }
            }
        }
        if (z) {
            sendResetRequestAndWait(CollectionUtils.mapKeys(hashMap, topicPartition -> {
                return StreamPartition.of(topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
            }), taskToolbox);
            return;
        }
        log.warn("Retrying in %dms", new Object[]{Long.valueOf(this.task.getPollRetryMs())});
        this.pollRetryLock.lockInterruptibly();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(this.task.getPollRetryMs());
            while (nanos > 0) {
                if (this.pauseRequested || this.stopRequested.get()) {
                    break;
                } else {
                    nanos = this.isAwaitingRetry.awaitNanos(nanos);
                }
            }
        } finally {
            this.pollRetryLock.unlock();
        }
    }

    protected SeekableStreamDataSourceMetadata<Integer, Long> createDataSourceMetadata(SeekableStreamSequenceNumbers<Integer, Long> seekableStreamSequenceNumbers) {
        return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OrderedSequenceNumber<Long> createSequenceNumber(Long l) {
        return KafkaSequenceNumber.of(l);
    }

    protected void possiblyResetDataSourceMetadata(TaskToolbox taskToolbox, RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier, Set<StreamPartition<Integer>> set) {
    }

    protected boolean isEndOffsetExclusive() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isEndOfShard(Long l) {
        return false;
    }

    public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference() {
        return new TypeReference<List<SequenceMetadata<Integer, Long>>>() { // from class: org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.1
        };
    }

    @Nullable
    protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(TaskToolbox taskToolbox, String str) throws IOException {
        if (str == null) {
            return null;
        }
        log.debug("Got checkpoints from task context[%s].", new Object[]{str});
        return (TreeMap) taskToolbox.getJsonMapper().readValue(str, new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() { // from class: org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.2
        });
    }
}
