package org.apache.iceberg.flink.sink.shuffle;

import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.datasketches.sampling.ReservoirItemsUnion;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.class */
class AggregatedStatisticsTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AggregatedStatisticsTracker.class);
    private final String operatorName;
    private final int parallelism;
    private final TypeSerializer<DataStatistics> statisticsSerializer;
    private final int downstreamParallelism;
    private final StatisticsType statisticsType;
    private final int switchToSketchThreshold;
    private final Comparator<StructLike> comparator;
    private final NavigableMap<Long, Aggregation> aggregationsPerCheckpoint = Maps.newTreeMap();
    private AggregatedStatistics completedStatistics;

    /* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker$Aggregation.class */
    static class Aggregation {
        private static final Logger LOG = LoggerFactory.getLogger(Aggregation.class);
        private final Set<Integer> subtaskSet = Sets.newHashSet();
        private final int parallelism;
        private final int downstreamParallelism;
        private final int switchToSketchThreshold;
        private final Comparator<StructLike> comparator;
        private final StatisticsType configuredType;
        private StatisticsType currentType;
        private Map<SortKey, Long> mapStatistics;
        private ReservoirItemsUnion<SortKey> sketchStatistics;

        Aggregation(int i, int i2, int i3, Comparator<StructLike> comparator, StatisticsType statisticsType, StatisticsType statisticsType2) {
            this.parallelism = i;
            this.downstreamParallelism = i2;
            this.switchToSketchThreshold = i3;
            this.comparator = comparator;
            this.configuredType = statisticsType;
            this.currentType = statisticsType2;
            if (statisticsType2 == StatisticsType.Map) {
                this.mapStatistics = Maps.newHashMap();
                this.sketchStatistics = null;
            } else {
                this.mapStatistics = null;
                this.sketchStatistics = ReservoirItemsUnion.newInstance(SketchUtil.determineCoordinatorReservoirSize(i2));
            }
        }

        @VisibleForTesting
        Set<Integer> subtaskSet() {
            return this.subtaskSet;
        }

        @VisibleForTesting
        StatisticsType currentType() {
            return this.currentType;
        }

        @VisibleForTesting
        Map<SortKey, Long> mapStatistics() {
            return this.mapStatistics;
        }

        @VisibleForTesting
        ReservoirItemsUnion<SortKey> sketchStatistics() {
            return this.sketchStatistics;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isComplete() {
            return this.subtaskSet.size() == this.parallelism;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean merge(int i, DataStatistics dataStatistics) {
            if (this.subtaskSet.contains(Integer.valueOf(i))) {
                return false;
            }
            this.subtaskSet.add(Integer.valueOf(i));
            merge(dataStatistics);
            return true;
        }

        private void merge(DataStatistics dataStatistics) {
            if (dataStatistics.type() != StatisticsType.Map) {
                ReservoirItemsSketch reservoirItemsSketch = (ReservoirItemsSketch) dataStatistics.result();
                if (this.currentType == StatisticsType.Map) {
                    convertCoordinatorToSketch();
                }
                this.sketchStatistics.update(reservoirItemsSketch);
                return;
            }
            Map map = (Map) dataStatistics.result();
            if (this.currentType != StatisticsType.Map) {
                ReservoirItemsSketch newInstance = ReservoirItemsSketch.newInstance(SketchUtil.determineOperatorReservoirSize(this.parallelism, this.downstreamParallelism));
                Objects.requireNonNull(newInstance);
                SketchUtil.convertMapToSketch(map, (v1) -> {
                    r1.update(v1);
                });
                this.sketchStatistics.update(newInstance);
                return;
            }
            map.forEach((sortKey, l) -> {
                this.mapStatistics.merge(sortKey, l, (v0, v1) -> {
                    return Long.sum(v0, v1);
                });
            });
            if (this.configuredType != StatisticsType.Auto || this.mapStatistics.size() <= this.switchToSketchThreshold) {
                return;
            }
            convertCoordinatorToSketch();
        }

        private void convertCoordinatorToSketch() {
            this.sketchStatistics = ReservoirItemsUnion.newInstance(SketchUtil.determineCoordinatorReservoirSize(this.downstreamParallelism));
            Map<SortKey, Long> map = this.mapStatistics;
            ReservoirItemsUnion<SortKey> reservoirItemsUnion = this.sketchStatistics;
            Objects.requireNonNull(reservoirItemsUnion);
            SketchUtil.convertMapToSketch(map, (v1) -> {
                r1.update(v1);
            });
            this.currentType = StatisticsType.Sketch;
            this.mapStatistics = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public AggregatedStatistics completedStatistics(long j) {
            if (this.currentType == StatisticsType.Map) {
                LOG.info("Completed map statistics aggregation with {} keys", Integer.valueOf(this.mapStatistics.size()));
                return AggregatedStatistics.fromKeyFrequency(j, this.mapStatistics);
            }
            ReservoirItemsSketch result = this.sketchStatistics.getResult();
            LOG.info("Completed sketch statistics aggregation: reservoir size = {}, number of items seen = {}, number of samples = {}", new Object[]{Integer.valueOf(result.getK()), Long.valueOf(result.getN()), Integer.valueOf(result.getNumSamples())});
            return AggregatedStatistics.fromRangeBounds(j, SketchUtil.rangeBounds(this.downstreamParallelism, this.comparator, result));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AggregatedStatisticsTracker(String str, int i, Schema schema, SortOrder sortOrder, int i2, StatisticsType statisticsType, int i3, @Nullable AggregatedStatistics aggregatedStatistics) {
        this.operatorName = str;
        this.parallelism = i;
        this.statisticsSerializer = new DataStatisticsSerializer(new SortKeySerializer(schema, sortOrder));
        this.downstreamParallelism = i2;
        this.statisticsType = statisticsType;
        this.switchToSketchThreshold = i3;
        this.completedStatistics = aggregatedStatistics;
        this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AggregatedStatistics updateAndCheckCompletion(int i, StatisticsEvent statisticsEvent) {
        long checkpointId = statisticsEvent.checkpointId();
        LOG.debug("Handling statistics event from subtask {} of operator {} for checkpoint {}", new Object[]{Integer.valueOf(i), this.operatorName, Long.valueOf(checkpointId)});
        if (this.completedStatistics != null && this.completedStatistics.checkpointId() > checkpointId) {
            LOG.info("Ignore stale statistics event from operator {} subtask {} for older checkpoint {}. Was expecting data statistics from checkpoint higher than {}", new Object[]{this.operatorName, Integer.valueOf(i), Long.valueOf(checkpointId), Long.valueOf(this.completedStatistics.checkpointId())});
            return null;
        }
        Aggregation aggregation = (Aggregation) this.aggregationsPerCheckpoint.computeIfAbsent(Long.valueOf(checkpointId), l -> {
            return new Aggregation(this.parallelism, this.downstreamParallelism, this.switchToSketchThreshold, this.comparator, this.statisticsType, StatisticsUtil.collectType(this.statisticsType, this.completedStatistics));
        });
        if (!aggregation.merge(i, StatisticsUtil.deserializeDataStatistics(statisticsEvent.statisticsBytes(), this.statisticsSerializer))) {
            LOG.debug("Ignore duplicate data statistics from operator {} subtask {} for checkpoint {}.", new Object[]{this.operatorName, Integer.valueOf(i), Long.valueOf(checkpointId)});
        }
        if (!aggregation.isComplete()) {
            return null;
        }
        this.completedStatistics = aggregation.completedStatistics(checkpointId);
        this.aggregationsPerCheckpoint.headMap(Long.valueOf(checkpointId), true).clear();
        return this.completedStatistics;
    }

    @VisibleForTesting
    NavigableMap<Long, Aggregation> aggregationsPerCheckpoint() {
        return this.aggregationsPerCheckpoint;
    }
}
