package org.apache.kafka.image.loader;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/image/loader/MetadataLoader.class */
public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
    /** Tag under which the deferred "initialize new publishers" event is scheduled on the event queue. */
    private static final String INITIALIZE_NEW_PUBLISHERS = "InitializeNewPublishers";
    /** Logger obtained from the LogContext passed to the constructor. */
    private final Logger log;
    /** Clock used to measure how long snapshot generation and snapshot loading take. */
    private final Time time;
    /** Receives every unexpected error caught by this loader. */
    private final FaultHandler faultHandler;
    /** Metrics object updated as batches, snapshots, images and leader changes are processed. */
    private final MetadataLoaderMetrics metrics;
    /** Supplies the current high water mark; an empty value means it is not yet known. */
    private final Supplier<OptionalLong> highWaterMarkAccessor;
    /** Installed publishers, keyed by name, that have not yet been handed their initial image. */
    private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers;
    /** Initialized publishers, keyed by name, that receive metadata updates and leader changes. */
    private final LinkedHashMap<String, MetadataPublisher> publishers;
    /** Starts out true; cleared by stillNeedToCatchUp once the loader has reached the high water mark. */
    private boolean catchingUp;
    /** Most recent value passed to handleLeaderChange; starts as LeaderAndEpoch.UNKNOWN. */
    private LeaderAndEpoch currentLeaderAndEpoch;
    /** Most recent image handed to maybePublishMetadata; starts as MetadataImage.EMPTY. */
    private MetadataImage image;
    /** Loads committed batches; constructed with maybePublishMetadata as its callback. */
    private final MetadataBatchLoader batchLoader;
    /** Queue onto which the handlers in this class append their work as events. */
    private final KafkaEventQueue eventQueue;

    /* loaded from: input_file:org/apache/kafka/image/loader/MetadataLoader$Builder.class */
    /**
     * Fluent builder for {@link MetadataLoader}.
     *
     * <p>Only the high water mark accessor is mandatory. The log context and the
     * metrics object are created with defaults in {@link #build()} when they have
     * not been supplied.
     */
    public static class Builder {
        private int nodeId = -1;
        private String threadNamePrefix = "";
        private Time time = Time.SYSTEM;
        private LogContext logContext = null;
        // Default handler: wrap the fault in an exception and hand it back to the caller.
        private FaultHandler faultHandler = (message, cause) -> new FaultHandlerException(message, cause);
        private MetadataLoaderMetrics metrics = null;
        private Supplier<OptionalLong> highWaterMarkAccessor = null;

        /** Sets the node id that is included in the default log prefix. */
        public Builder setNodeId(int i) {
            nodeId = i;
            return this;
        }

        /** Sets the prefix prepended to the event queue thread name. */
        public Builder setThreadNamePrefix(String str) {
            threadNamePrefix = str;
            return this;
        }

        /** Sets the clock the loader uses for its elapsed-time measurements. */
        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the handler that receives unexpected errors. */
        public Builder setFaultHandler(FaultHandler faultHandler) {
            this.faultHandler = faultHandler;
            return this;
        }

        /** Sets the supplier of the current high water mark (required). */
        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> supplier) {
            highWaterMarkAccessor = supplier;
            return this;
        }

        /** Sets the metrics object the loader updates. */
        public Builder setMetrics(MetadataLoaderMetrics metadataLoaderMetrics) {
            metrics = metadataLoaderMetrics;
            return this;
        }

        /**
         * Creates the loader.
         *
         * @return a new {@link MetadataLoader} configured from this builder
         * @throws RuntimeException if no high water mark accessor was set
         */
        public MetadataLoader build() {
            if (logContext == null) {
                logContext = new LogContext("[MetadataLoader id=" + nodeId + "] ");
            }
            if (highWaterMarkAccessor == null) {
                throw new RuntimeException("You must set the high water mark accessor.");
            }
            if (metrics == null) {
                // No metrics supplied: use an instance whose two callbacks do nothing.
                metrics = new MetadataLoaderMetrics(
                    Optional.empty(),
                    ignoredLong -> {
                    },
                    ignoredInt -> {
                    },
                    new AtomicReference<>(MetadataProvenance.EMPTY));
            }
            return new MetadataLoader(
                time,
                logContext,
                nodeId,
                threadNamePrefix,
                faultHandler,
                metrics,
                highWaterMarkAccessor);
        }
    }

    /* loaded from: input_file:org/apache/kafka/image/loader/MetadataLoader$ShutdownEvent.class */
    /**
     * Event passed to the event queue at construction time. When run, it closes
     * and forgets every publisher, first the uninitialized ones and then the
     * initialized ones.
     */
    class ShutdownEvent implements EventQueue.Event {
        ShutdownEvent() {
        }

        public void run() throws Exception {
            for (Iterator<MetadataPublisher> pending = uninitializedPublishers.values().iterator(); pending.hasNext(); ) {
                closePublisher(pending.next());
                pending.remove();
            }
            for (Iterator<MetadataPublisher> active = publishers.values().iterator(); active.hasNext(); ) {
                closePublisher(active.next());
                active.remove();
            }
        }
    }

    private MetadataLoader(Time time, LogContext logContext, int i, String str, FaultHandler faultHandler, MetadataLoaderMetrics metadataLoaderMetrics, Supplier<OptionalLong> supplier) {
        this.catchingUp = true;
        this.currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;
        this.log = logContext.logger(MetadataLoader.class);
        this.time = time;
        this.faultHandler = faultHandler;
        this.metrics = metadataLoaderMetrics;
        this.highWaterMarkAccessor = supplier;
        this.uninitializedPublishers = new LinkedHashMap<>();
        this.publishers = new LinkedHashMap<>();
        this.image = MetadataImage.EMPTY;
        this.batchLoader = new MetadataBatchLoader(logContext, time, faultHandler, (v1, v2, v3) -> {
            maybePublishMetadata(v1, v2, v3);
        });
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, str + "metadata-loader-", new ShutdownEvent());
    }

    /** Returns the metrics object this loader updates. */
    MetadataLoaderMetrics metrics() {
        return metrics;
    }

    /**
     * Decides whether the loader is still in its initial catch-up phase.
     *
     * <p>The loader counts as caught up once the high water mark is known, the
     * given offset has reached {@code highWaterMark - 1}, and the batch loader
     * reports having seen a record. At that point {@code catchingUp} is cleared,
     * and later calls return {@code false} immediately.
     *
     * @param str label identifying the caller, used as a prefix in log messages
     * @param j   the highest offset loaded so far
     * @return {@code true} if publishing must still be held back
     */
    private boolean stillNeedToCatchUp(String str, long j) {
        if (!catchingUp) {
            log.trace("{}: we are not in the initial catching up state.", str);
            return false;
        }
        final boolean stillBehind;
        OptionalLong maybeHighWaterMark = highWaterMarkAccessor.get();
        if (!maybeHighWaterMark.isPresent()) {
            log.info("{}: the loader is still catching up because we still don't know the high water mark yet.", str);
            stillBehind = true;
        } else {
            long highWaterMark = maybeHighWaterMark.getAsLong();
            if (j < highWaterMark - 1) {
                log.info("{}: The loader is still catching up because we have loaded up to offset " + j + ", but the high water mark is {}", str, highWaterMark);
                stillBehind = true;
            } else if (!batchLoader.hasSeenRecord()) {
                log.info("{}: The loader is still catching up because we have not loaded a controller record as of offset " + j + " and high water mark is {}", str, highWaterMark);
                stillBehind = true;
            } else {
                log.info("{}: The loader finished catching up to the current high water mark of {}", str, highWaterMark);
                catchingUp = false;
                stillBehind = false;
            }
        }
        return stillBehind;
    }

    /**
     * Schedules a deferred event, tagged {@code InitializeNewPublishers}, that
     * runs {@link #initializeNewPublishers()}. Anything thrown by that call is
     * passed to the fault handler.
     *
     * @param j delay in nanoseconds, added to the event queue clock's current time
     */
    void scheduleInitializeNewPublishers(long j) {
        long deadlineNs = eventQueue.time().nanoseconds() + j;
        EventQueue.EarliestDeadlineFunction deadline = new EventQueue.EarliestDeadlineFunction(deadlineNs);
        eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS, deadline, () -> {
            try {
                initializeNewPublishers();
            } catch (Throwable failure) {
                faultHandler.handleFault("Unhandled error initializing new publishers", failure);
            }
        });
    }

    /**
     * Hands the current image to every publisher that has not been initialized yet.
     *
     * <p>If the loader is still catching up, the attempt is rescheduled 100 ms
     * later. Otherwise a delta is built by re-writing the current image on top of
     * the empty image, and each waiting publisher receives that delta, the image,
     * and the current leader before being moved to the active set. A publisher
     * whose initialization throws is reported to the fault handler; it has
     * already been removed from the waiting set and is not added to the active set.
     */
    void initializeNewPublishers() {
        if (uninitializedPublishers.isEmpty()) {
            log.debug("InitializeNewPublishers: nothing to do.");
            return;
        }
        long loadedOffset = image.highestOffsetAndEpoch().offset();
        if (stillNeedToCatchUp("initializeNewPublishers", loadedOffset)) {
            log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} because we are still catching up with quorum metadata. Rescheduling.", uninitializedPublisherNames());
            scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100L));
            return;
        }
        log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}", uninitializedPublisherNames());
        long startNs = time.nanoseconds();
        // The delta starts from the empty image and is filled by re-writing the current image into it.
        MetadataDelta fullDelta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build();
        ImageReWriter reWriter = new ImageReWriter(fullDelta);
        ImageWriterOptions writerOptions = new ImageWriterOptions.Builder()
            .setMetadataVersion(image.features().metadataVersion())
            .build();
        image.write(reWriter, writerOptions);
        SnapshotManifest manifest = new SnapshotManifest(image.provenance(), time.nanoseconds() - startNs);
        for (Iterator<MetadataPublisher> pending = uninitializedPublishers.values().iterator(); pending.hasNext(); ) {
            MetadataPublisher publisher = pending.next();
            // Remove first so that a failing publisher is not retried.
            pending.remove();
            try {
                log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}", publisher.name(), image.highestOffsetAndEpoch().offset());
                publisher.onMetadataUpdate(fullDelta, image, manifest);
                publisher.onControllerChange(currentLeaderAndEpoch);
                publishers.put(publisher.name(), publisher);
            } catch (Throwable failure) {
                faultHandler.handleFault("Unhandled error initializing " + publisher.name() + " with a snapshot at offset " + image.highestOffsetAndEpoch().offset(), failure);
            }
        }
    }

    /** Returns the names of the not-yet-initialized publishers, joined with ", ". */
    private String uninitializedPublisherNames() {
        Iterable<String> pendingNames = uninitializedPublishers.keySet();
        return String.join(", ", pendingNames);
    }

    /**
     * Records the new image and, unless the loader is still catching up, delivers
     * it to every initialized publisher.
     *
     * <p>The image is stored even when publication is skipped. A publisher that
     * throws is reported to the fault handler and the remaining publishers are
     * still notified. After publishing, the metrics are updated, and if any
     * publishers are waiting for initialization an initialization event is
     * scheduled with zero delay.
     *
     * @param metadataDelta  the changes leading to {@code metadataImage}
     * @param metadataImage  the new image
     * @param loaderManifest describes how the image was produced
     */
    private void maybePublishMetadata(MetadataDelta metadataDelta, MetadataImage metadataImage, LoaderManifest loaderManifest) {
        image = metadataImage;
        String where = "maybePublishMetadata(" + loaderManifest.type().toString() + ")";
        if (stillNeedToCatchUp(where, loaderManifest.provenance().lastContainedOffset())) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("handleCommit: publishing new image with provenance {}.", metadataImage.provenance());
        }
        for (MetadataPublisher publisher : publishers.values()) {
            try {
                publisher.onMetadataUpdate(metadataDelta, metadataImage, loaderManifest);
            } catch (Throwable failure) {
                faultHandler.handleFault("Unhandled error publishing the new metadata image ending at " + loaderManifest.provenance().lastContainedOffset() + " with publisher " + publisher.name(), failure);
            }
        }
        metrics.updateLastAppliedImageProvenance(metadataImage.provenance());
        metrics.setCurrentMetadataVersion(metadataImage.features().metadataVersion());
        if (!uninitializedPublishers.isEmpty()) {
            scheduleInitializeNewPublishers(0L);
        }
    }

    /**
     * Queues an event that feeds every batch from the reader into the batch
     * loader, records batch size and processing time in the metrics, and then
     * asks the batch loader to flush.
     *
     * <p>The whole sequence, including {@code hasNext()} and the final flush, is
     * guarded. Any failure is reported to the fault handler, processing of this
     * reader stops, and the reader is closed exactly once in every case.
     *
     * @param batchReader reader over the committed batches; closed by this method
     */
    public void handleCommit(BatchReader<ApiMessageAndVersion> batchReader) {
        this.eventQueue.append(() -> {
            try {
                while (batchReader.hasNext()) {
                    Batch<ApiMessageAndVersion> batch = batchReader.next();
                    long elapsedNs = this.batchLoader.loadBatch(batch, this.currentLeaderAndEpoch);
                    this.metrics.updateBatchSize(batch.records().size());
                    this.metrics.updateBatchProcessingTimeNs(elapsedNs);
                }
                this.batchLoader.maybeFlushBatches(this.currentLeaderAndEpoch);
            } catch (Throwable th) {
                // Catch-all: a failure in iteration, loading or flushing ends up here
                // instead of escaping to the event queue with the reader still open.
                this.faultHandler.handleFault("Unhandled fault in MetadataLoader#handleCommit. Last image offset was " + this.image.offset(), th);
            } finally {
                batchReader.close();
            }
        });
    }

    /**
     * Queues an event that loads the given snapshot on top of the current image,
     * resets the batch loader to the resulting image, and publishes it.
     *
     * <p>Any failure while loading or publishing is reported to the fault
     * handler. The reader is closed exactly once, in a {@code finally} clause, so
     * a failing {@code close()} is neither reported as a snapshot-load fault nor
     * retried.
     *
     * @param snapshotReader reader over the snapshot; closed by this method
     */
    public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> snapshotReader) {
        this.eventQueue.append(() -> {
            try {
                long loadCount = this.metrics.incrementHandleLoadSnapshotCount();
                String snapshotName = Snapshots.filenameFromSnapshotId(snapshotReader.snapshotId());
                this.log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.", snapshotName, loadCount);
                // The delta is based on the current image, so it describes the difference to the snapshot.
                MetadataDelta delta = new MetadataDelta.Builder().setImage(this.image).build();
                SnapshotManifest manifest = loadSnapshot(delta, snapshotReader);
                this.log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} and this snapshot in {} us.", snapshotName, this.image.provenance().lastContainedOffset(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs()));
                MetadataImage newImage = delta.apply(manifest.provenance());
                this.batchLoader.resetToImage(newImage);
                maybePublishMetadata(delta, newImage, manifest);
            } catch (Throwable th) {
                this.faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. Snapshot offset was " + snapshotReader.lastContainedLogOffset(), th);
            } finally {
                snapshotReader.close();
            }
        });
    }

    /**
     * Replays every record of the snapshot into the given delta and finishes the
     * snapshot on the delta.
     *
     * <p>A record that fails to replay is reported to the fault handler together
     * with its running index; replay then continues with the next record.
     *
     * @param metadataDelta  the delta that receives the records
     * @param snapshotReader the snapshot to read; not closed by this method
     * @return a manifest carrying the snapshot's last contained offset, epoch and
     *         timestamp plus the elapsed load time in nanoseconds
     */
    SnapshotManifest loadSnapshot(MetadataDelta metadataDelta, SnapshotReader<ApiMessageAndVersion> snapshotReader) {
        final long startNs = time.nanoseconds();
        int recordIndex = 0;
        while (snapshotReader.hasNext()) {
            Batch<ApiMessageAndVersion> batch = snapshotReader.next();
            Iterator<ApiMessageAndVersion> records = batch.records().iterator();
            for (; records.hasNext(); recordIndex++) {
                try {
                    metadataDelta.replay(records.next().message());
                } catch (Throwable failure) {
                    faultHandler.handleFault("Error loading metadata log record " + recordIndex + " in snapshot at offset " + snapshotReader.lastContainedLogOffset(), failure);
                }
            }
        }
        metadataDelta.finishSnapshot();
        MetadataProvenance provenance = new MetadataProvenance(
            snapshotReader.lastContainedLogOffset(),
            snapshotReader.lastContainedLogEpoch(),
            snapshotReader.lastContainedLogTimestamp());
        return new SnapshotManifest(provenance, time.nanoseconds() - startNs);
    }

    /**
     * Queues an event that stores the new leader and epoch, notifies every
     * initialized publisher, and updates the controller id metric (-1 when the
     * leader id is absent). A publisher that throws is reported to the fault
     * handler and the remaining publishers are still notified.
     *
     * @param leaderAndEpoch the new leader and epoch
     */
    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
        eventQueue.append(() -> {
            currentLeaderAndEpoch = leaderAndEpoch;
            for (MetadataPublisher publisher : publishers.values()) {
                try {
                    publisher.onControllerChange(currentLeaderAndEpoch);
                } catch (Throwable failure) {
                    faultHandler.handleFault("Unhandled error publishing the new leader change to " + currentLeaderAndEpoch + " with publisher " + publisher.name(), failure);
                }
            }
            int controllerId = leaderAndEpoch.leaderId().orElse(-1);
            metrics.setCurrentControllerId(controllerId);
        });
    }

    /**
     * Registers new publishers. They are placed in the uninitialized set, and an
     * initialization event is scheduled with zero delay.
     *
     * <p>If any publisher's name is already registered, whether initialized or
     * not, nothing is installed and the returned future completes exceptionally.
     *
     * @param list the publishers to install; an empty list completes immediately
     * @return a future completed once the publishers are registered, or completed
     *         exceptionally with the exception returned by the fault handler
     */
    public CompletableFuture<Void> installPublishers(List<? extends MetadataPublisher> list) {
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> installed = new CompletableFuture<>();
        eventQueue.append(() -> {
            try {
                // Validate the whole list before registering anything.
                for (MetadataPublisher candidate : list) {
                    MetadataPublisher existing = publishers.get(candidate.name());
                    if (existing == null) {
                        existing = uninitializedPublishers.get(candidate.name());
                    }
                    if (existing == null) {
                        continue;
                    }
                    if (existing == candidate) {
                        throw faultHandler.handleFault("Attempted to install publisher " + candidate.name() + ", which is already installed.");
                    }
                    throw faultHandler.handleFault("Attempted to install a new publisher named " + candidate.name() + ", but there is already a publisher with that name.");
                }
                for (MetadataPublisher candidate : list) {
                    uninitializedPublishers.put(candidate.name(), candidate);
                }
                scheduleInitializeNewPublishers(0L);
                installed.complete(null);
            } catch (Throwable failure) {
                installed.completeExceptionally(faultHandler.handleFault("Unhandled fault in MetadataLoader#installPublishers", failure));
            }
        });
        return installed;
    }

    /**
     * Appends a marker event to the queue and blocks until that event has run.
     *
     * @throws Exception if waiting on the marker future fails or is interrupted
     */
    void waitForAllEventsToBeHandled() throws Exception {
        CompletableFuture<Void> marker = new CompletableFuture<>();
        eventQueue.append(() -> marker.complete(null));
        marker.get();
    }

    /**
     * Queues an event that unregisters the given publisher and closes it. The
     * initialized set is tried first, then the uninitialized set.
     *
     * @param metadataPublisher the publisher to remove; it must be the registered
     *                          instance under its name
     * @return a future completed after the publisher is closed, or completed
     *         exceptionally if the publisher was not installed
     */
    public CompletableFuture<Void> removeAndClosePublisher(MetadataPublisher metadataPublisher) {
        CompletableFuture<Void> removed = new CompletableFuture<>();
        eventQueue.append(() -> {
            try {
                boolean found = publishers.remove(metadataPublisher.name(), metadataPublisher);
                if (!found) {
                    // Only consult the uninitialized set when the active set had no match.
                    found = uninitializedPublishers.remove(metadataPublisher.name(), metadataPublisher);
                }
                if (!found) {
                    throw faultHandler.handleFault("Attempted to remove publisher " + metadataPublisher.name() + ", which is not installed.");
                }
                closePublisher(metadataPublisher);
                removed.complete(null);
            } catch (Throwable failure) {
                removed.completeExceptionally(failure);
            }
        });
        return removed;
    }

    /** Returns the last applied offset as reported by the metrics object. */
    public long lastAppliedOffset() {
        return metrics.lastAppliedOffset();
    }

    /** Asks the event queue to begin shutting down, using "beginShutdown" as the source label. */
    public void beginShutdown() {
        eventQueue.beginShutdown("beginShutdown");
    }

    /** Returns the clock this loader was constructed with. */
    Time time() {
        return time;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the given publisher, passing anything it throws to the fault handler.
     *
     * @param metadataPublisher the publisher to close
     */
    public void closePublisher(MetadataPublisher metadataPublisher) {
        try {
            metadataPublisher.close();
        } catch (Throwable failure) {
            String description = "Got unexpected exception while closing publisher " + metadataPublisher.name();
            faultHandler.handleFault(description, failure);
        }
    }

    /**
     * Begins shutdown and then closes the event queue.
     *
     * @throws Exception if closing the event queue fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        beginShutdown();
        eventQueue.close();
    }
}
