package org.axonframework.eventsourcing.eventstore;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine;
import org.axonframework.eventsourcing.snapshotting.SnapshotFilter;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/BatchingEventStorageEngine.class */
public abstract class BatchingEventStorageEngine extends AbstractEventStorageEngine {
    private static final int DEFAULT_BATCH_SIZE = 100;
    private final int batchSize;
    private final Predicate<List<? extends DomainEventData<?>>> finalAggregateBatchPredicate;

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/BatchingEventStorageEngine$Builder.class */
    public static abstract class Builder extends AbstractEventStorageEngine.Builder {
        private int batchSize = BatchingEventStorageEngine.DEFAULT_BATCH_SIZE;
        private Predicate<List<? extends DomainEventData<?>>> finalAggregateBatchPredicate;

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        public Builder snapshotSerializer(Serializer serializer) {
            super.snapshotSerializer(serializer);
            return this;
        }

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        public Builder upcasterChain(EventUpcaster eventUpcaster) {
            super.upcasterChain(eventUpcaster);
            return this;
        }

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        public Builder persistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
            super.persistenceExceptionResolver(persistenceExceptionResolver);
            return this;
        }

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        public Builder eventSerializer(Serializer serializer) {
            super.eventSerializer(serializer);
            return this;
        }

        public Builder finalAggregateBatchPredicate(Predicate<List<? extends DomainEventData<?>>> predicate) {
            BuilderUtils.assertNonNull(predicate, "The finalAggregateBatchPredicate must not be null");
            this.finalAggregateBatchPredicate = predicate;
            return this;
        }

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        @Deprecated
        public Builder snapshotFilter(Predicate<? super DomainEventData<?>> predicate) {
            super.snapshotFilter(predicate);
            return this;
        }

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        public Builder snapshotFilter(SnapshotFilter snapshotFilter) {
            super.snapshotFilter(snapshotFilter);
            return this;
        }

        public Builder batchSize(int i) {
            BuilderUtils.assertThat(Integer.valueOf(i), num -> {
                return num.intValue() > 0;
            }, "The batchSize must be a positive number");
            this.batchSize = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        public void validate() throws AxonConfigurationException {
            super.validate();
        }

        @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.Builder
        @Deprecated
        public /* bridge */ /* synthetic */ AbstractEventStorageEngine.Builder snapshotFilter(Predicate predicate) {
            return snapshotFilter((Predicate<? super DomainEventData<?>>) predicate);
        }
    }

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/BatchingEventStorageEngine$EventStreamSpliterator.class */
    private static class EventStreamSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
        private final Function<T, List<? extends T>> fetchFunction;
        private final Predicate<List<? extends T>> finalBatchPredicate;
        private Iterator<? extends T> iterator;
        private T lastItem;
        private boolean lastBatchFound;

        private EventStreamSpliterator(Function<T, List<? extends T>> function, Predicate<List<? extends T>> predicate) {
            super(Long.MAX_VALUE, 4369);
            this.fetchFunction = function;
            this.finalBatchPredicate = predicate;
        }

        /* JADX WARN: Type inference failed for: r2v3, types: [T, java.lang.Object] */
        @Override // java.util.Spliterator
        public boolean tryAdvance(Consumer<? super T> consumer) {
            Objects.requireNonNull(consumer);
            if (this.iterator == null || !this.iterator.hasNext()) {
                if (this.lastBatchFound) {
                    return false;
                }
                List<? extends T> apply = this.fetchFunction.apply(this.lastItem);
                this.lastBatchFound = this.finalBatchPredicate.test(apply);
                this.iterator = apply.iterator();
            }
            if (!this.iterator.hasNext()) {
                return false;
            }
            T next = this.iterator.next();
            this.lastItem = next;
            consumer.accept(next);
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BatchingEventStorageEngine(Builder builder) {
        super(builder);
        this.batchSize = builder.batchSize;
        this.finalAggregateBatchPredicate = (Predicate) ObjectUtils.getOrDefault(builder.finalAggregateBatchPredicate, this::defaultFinalAggregateBatchPredicate);
    }

    protected abstract List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken trackingToken, int i);

    protected abstract List<? extends DomainEventData<?>> fetchDomainEvents(String str, long j, int i);

    protected boolean fetchForAggregateUntilEmpty() {
        return false;
    }

    private boolean defaultFinalAggregateBatchPredicate(List<? extends DomainEventData<?>> list) {
        return fetchForAggregateUntilEmpty() ? list.isEmpty() : list.size() < this.batchSize;
    }

    @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine
    protected Stream<? extends DomainEventData<?>> readEventData(String str, long j) {
        return StreamSupport.stream(new EventStreamSpliterator(domainEventData -> {
            return fetchDomainEvents(str, domainEventData == null ? j : domainEventData.getSequenceNumber() + 1, this.batchSize);
        }, this.finalAggregateBatchPredicate), false);
    }

    @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine
    protected Stream<? extends TrackedEventData<?>> readEventData(TrackingToken trackingToken, boolean z) {
        return StreamSupport.stream(new EventStreamSpliterator(trackedEventData -> {
            return fetchTrackedEvents(trackedEventData == null ? trackingToken : trackedEventData.trackingToken(), this.batchSize);
        }, (v0) -> {
            return v0.isEmpty();
        }), false);
    }

    public int batchSize() {
        return this.batchSize;
    }
}
