/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.function.BiConsumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ErrorHandlerAdapter;
import org.springframework.kafka.listener.ErrorHandlingUtils;
import org.springframework.kafka.listener.KafkaExceptionLogLevelAware;
import org.springframework.kafka.listener.ListenerInvokingBatchErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
 * Batch error handler that hands a failed batch to
 * {@link ErrorHandlingUtils#retryBatch} together with a {@link BackOff}, a
 * seek-to-current delegate and a recovery callback.
 *
 * <p>While {@link #handle} is running on a thread, a per-thread flag is raised so
 * that {@link #onPartitionsAssigned} (when called on that same thread) pauses the
 * consumer's current assignment.
 */
class FallbackBatchErrorHandler
        extends KafkaExceptionLogLevelAware
        implements ListenerInvokingBatchErrorHandler {

    private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));

    private final BackOff backOff;

    /** Invoked with the whole batch and the exception; built in the constructor. */
    private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;

    private final CommonErrorHandler seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler());

    private boolean ackAfterHandle = true;

    /** Per-thread flag: {@code true} only while {@link #handle} is executing on that thread. */
    private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);

    /**
     * Create an instance with a default-constructed {@link FixedBackOff} and no
     * record recoverer (the batch is logged at error level instead).
     */
    FallbackBatchErrorHandler() {
        this(new FixedBackOff(), null);
    }

    /**
     * Create an instance with the given back-off and optional recoverer.
     *
     * @param backOff the back-off passed through to {@link ErrorHandlingUtils#retryBatch}.
     * @param recoverer called once per record of the batch when the recovery callback
     * is invoked; when {@code null}, the batch is logged as discarded instead.
     */
    FallbackBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
        this.backOff = backOff;
        // The recoverer cannot change after construction, so pick the callback once
        // here instead of re-testing for null on every invocation.
        if (recoverer == null) {
            this.recoverer = (crs, ex) -> this.logger.error(ex,
                    () -> "Records discarded: " + ErrorHandlingUtils.recordsToString(crs));
        } else {
            this.recoverer = (crs, ex) -> crs.forEach(rec -> recoverer.accept(rec, ex));
        }
    }

    @Override
    public boolean isAckAfterHandle() {
        return this.ackAfterHandle;
    }

    @Override
    public void setAckAfterHandle(boolean ackAfterHandle) {
        this.ackAfterHandle = ackAfterHandle;
    }

    /**
     * Handle a failed batch by delegating to {@link ErrorHandlingUtils#retryBatch}.
     *
     * <p>When {@code records} is {@code null} or empty, the exception is logged at
     * error level and nothing else happens. Otherwise the per-thread retrying flag is
     * raised for the duration of the delegate call and always cleared afterwards,
     * including when the delegate throws.
     *
     * @param thrownException the exception raised by the listener.
     * @param records the failed batch; may be {@code null}.
     * @param consumer the consumer, passed through to the delegate.
     * @param container the container, passed through to the delegate.
     * @param invokeListener callback passed through to the delegate
     * (presumably re-invokes the listener; see {@code ErrorHandlingUtils}).
     */
    @Override
    public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        if (records == null || records.count() == 0) {
            this.logger.error(thrownException, "Called with no records; consumer exception");
            return;
        }
        this.retrying.set(true);
        try {
            ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff, this.seeker, this.recoverer, this.logger, this.getLogLevel());
        }
        finally {
            // remove() instead of set(false): the initial-value supplier yields false
            // again on the next get(), and the thread does not keep a stale entry.
            this.retrying.remove();
        }
    }

    /**
     * Pause the consumer's whole current assignment and run {@code publishPause} when
     * the calling thread is inside {@link #handle}; otherwise do nothing.
     *
     * @param consumer the consumer whose assignment is paused.
     * @param partitions the newly assigned partitions (not used by this implementation).
     * @param publishPause callback run after the consumer has been paused.
     */
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions, Runnable publishPause) {
        if (this.retrying.get()) {
            consumer.pause(consumer.assignment());
            publishPause.run();
        }
    }
}

