/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution.bulkinsert;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RDDBucketIndexPartitioner;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;

/**
 * Bulk-insert partitioner for tables using the consistent hashing bucket index.
 *
 * <p>Each record is routed to the Spark partition that corresponds to the
 * consistent-hashing node (bucket) its key falls into. The mapping from
 * (table partition, file-id prefix) to Spark partition index is built once per
 * {@link #repartitionRecords} call and recorded in the inherited
 * {@code fileIdPfxList} / {@code doAppend} lists.
 *
 * @param <T> payload type of the {@link HoodieRecord}s being partitioned
 */
public class RDDConsistentBucketBulkInsertPartitioner<T>
extends RDDBucketIndexPartitioner<T> {
    /**
     * Children nodes per table partition, or {@code null} when none were supplied.
     * When non-null, every partition handled by this partitioner must have an entry
     * (see {@link #getBucketIdentifier}).
     */
    private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;

    /**
     * Creates a partitioner with no sort columns, no metadata preservation and no
     * children nodes.
     *
     * @param table table to write to; its index must be a {@link HoodieSparkConsistentBucketIndex}
     * @throws IllegalArgumentException presumably, via {@link ValidationUtils#checkArgument},
     *         when the index type or table type is unsupported
     */
    public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table) {
        // The constructor delegation has to come first, so the index check runs afterwards.
        this(table, Collections.emptyMap(), false);
        ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSparkConsistentBucketIndex, "RDDConsistentBucketPartitioner can only be used together with consistent hashing bucket index");
    }

    /**
     * Creates a partitioner without children nodes.
     *
     * @param table                  table to write to
     * @param strategyParams         strategy parameters; the sort columns are looked up under
     *                               {@link HoodieClusteringConfig#PLAN_STRATEGY_SORT_COLUMNS}
     * @param preserveHoodieMetadata forwarded unchanged to the parent partitioner
     */
    public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table, Map<String, String> strategyParams, boolean preserveHoodieMetadata) {
        this(table, strategyParams, preserveHoodieMetadata, null);
    }

    /**
     * Creates a partitioner, optionally with children nodes to attach to each partition's
     * hashing metadata.
     *
     * @param table                  table to write to; must be a MERGE_ON_READ table
     * @param strategyParams         strategy parameters; the sort columns are looked up under
     *                               {@link HoodieClusteringConfig#PLAN_STRATEGY_SORT_COLUMNS}
     *                               (absent key yields {@code null})
     * @param preserveHoodieMetadata forwarded unchanged to the parent partitioner
     * @param hashingChildrenNodes   children nodes keyed by table partition, or {@code null};
     *                               none of the nodes may be tagged {@code NORMAL}
     */
    public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table, Map<String, String> strategyParams, boolean preserveHoodieMetadata, Map<String, List<ConsistentHashingNode>> hashingChildrenNodes) {
        super(table, strategyParams.getOrDefault(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), null), preserveHoodieMetadata);
        ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ), "Consistent hash bucket index doesn't support CoW table");
        if (hashingChildrenNodes != null) {
            ValidationUtils.checkArgument(hashingChildrenNodes.values().stream().flatMap(Collection::stream).noneMatch(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL), "children nodes should not be tagged as NORMAL");
        }
        this.hashingChildrenNodes = hashingChildrenNodes;
    }

    /**
     * Repartitions the records so that each Spark partition holds the records of exactly
     * one consistent-hashing node.
     *
     * @param records               records to repartition
     * @param outputSparkPartitions not read by this implementation; the number of output
     *                              partitions equals the number of collected file-id prefixes
     * @return the records as returned by the inherited {@code doPartition}
     */
    @Override
    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
        final Map<String, ConsistentBucketIdentifier> partitionToIdentifier = this.initializeBucketIdentifier(records);
        final Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap = this.generateFileIdPfx(partitionToIdentifier);
        return this.doPartition(records, new Partitioner(){

            @Override
            public int numPartitions() {
                return RDDConsistentBucketBulkInsertPartitioner.this.fileIdPfxList.size();
            }

            @Override
            public int getPartition(Object key) {
                HoodieKey hoodieKey = (HoodieKey)key;
                String partition = hoodieKey.getPartitionPath();
                ConsistentBucketIdentifier identifier = partitionToIdentifier.get(partition);
                if (identifier == null) {
                    // Fail with context instead of a bare NullPointerException on the lookup below.
                    throw new IllegalStateException("No consistent bucket identifier initialized for partition " + partition);
                }
                ConsistentHashingNode node = identifier.getBucket(hoodieKey, RDDConsistentBucketBulkInsertPartitioner.this.indexKeyFields);
                Map<String, Integer> fileIdPfxToIdx = partitionToFileIdPfxIdxMap.get(partition);
                Integer sparkPartition = fileIdPfxToIdx == null ? null : fileIdPfxToIdx.get(node.getFileIdPrefix());
                if (sparkPartition == null) {
                    // Unboxing a missing entry would otherwise surface as an opaque NullPointerException.
                    throw new IllegalStateException("No spark partition index registered for file id prefix " + node.getFileIdPrefix() + " in partition " + partition);
                }
                return sparkPartition;
            }
        });
    }

    /**
     * Builds the bucket identifier for one table partition from its hashing metadata,
     * which is loaded or created through {@link ConsistentBucketIndexUtils#loadOrCreateMetadata}
     * using the index's bucket count.
     *
     * @param partition table partition path
     * @return identifier wrapping the partition's hashing metadata
     */
    private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
        HoodieSparkConsistentBucketIndex index = (HoodieSparkConsistentBucketIndex)this.table.getIndex();
        HoodieConsistentHashingMetadata metadata = ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition, index.getNumBuckets());
        if (this.hashingChildrenNodes != null) {
            // Children nodes were supplied: every partition we touch must have an entry.
            ValidationUtils.checkState(this.hashingChildrenNodes.containsKey(partition), "children nodes should be provided for clustering");
            metadata.setChildrenNodes(this.hashingChildrenNodes.get(partition));
        }
        return new ConsistentBucketIdentifier(metadata);
    }

    /**
     * Creates one bucket identifier per table partition.
     *
     * <p>When children nodes were supplied, their key set defines the partitions. Otherwise
     * the distinct partition paths are collected from {@code records}
     * ({@code distinct().collect()} on the RDD).
     *
     * @param records records about to be repartitioned
     * @return bucket identifier keyed by table partition path
     */
    private Map<String, ConsistentBucketIdentifier> initializeBucketIdentifier(JavaRDD<HoodieRecord<T>> records) {
        if (this.hashingChildrenNodes != null) {
            return this.hashingChildrenNodes.keySet().stream().collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
        }
        return records.map(HoodieRecord::getPartitionPath).distinct().collect().stream().collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
    }

    /**
     * Assigns a running Spark partition index to every node of every identifier and records,
     * in the same order, each node's file-id prefix in {@code fileIdPfxList} and its append
     * flag in {@code doAppend}.
     *
     * <p>NOTE(review): entries are appended to the inherited lists without clearing them,
     * so this looks like it is meant to run once per partitioner instance — confirm against
     * the parent class.
     *
     * @param partitionToIdentifier bucket identifier keyed by table partition path
     * @return for each table partition, the mapping from file-id prefix to Spark partition index
     */
    private Map<String, Map<String, Integer>> generateFileIdPfx(Map<String, ConsistentBucketIdentifier> partitionToIdentifier) {
        Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap = ConsistentBucketIndexUtils.generatePartitionToFileIdPfxIdxMap(partitionToIdentifier);
        int count = 0;
        for (ConsistentBucketIdentifier identifier : partitionToIdentifier.values()) {
            Collection<ConsistentHashingNode> nodes = identifier.getNodes();
            // A first-created metadata means no node gets the append flag; otherwise only
            // NORMAL-tagged nodes are flagged for append.
            boolean firstCreated = identifier.getMetadata().isFirstCreated();
            Map<String, Integer> fileIdPfxToIdx = new HashMap<String, Integer>();
            for (ConsistentHashingNode node : nodes) {
                // Index "count" is the position of this prefix in fileIdPfxList and doAppend.
                this.fileIdPfxList.add(node.getFileIdPrefix());
                fileIdPfxToIdx.put(node.getFileIdPrefix(), count++);
                this.doAppend.add(!firstCreated && node.getTag() == ConsistentHashingNode.NodeTag.NORMAL);
            }
            partitionToFileIdPfxIdxMap.put(identifier.getMetadata().getPartitionPath(), fileIdPfxToIdx);
        }
        ValidationUtils.checkState(this.fileIdPfxList.size() == partitionToIdentifier.values().stream().mapToInt(ConsistentBucketIdentifier::getNumBuckets).sum(), "Error state after constructing fileId & idx mapping");
        return partitionToFileIdPfxIdxMap;
    }

    /**
     * Wraps the parent's write handle factory (when present) with a guard that rejects
     * Spark partitions flagged for append, then delegates handle creation unchanged.
     *
     * @param idx Spark partition index, used to look up the append flag in {@code doAppend}
     * @return the guarded factory, or an empty option when the parent provides none
     */
    @Override
    public Option<WriteHandleFactory> getWriteHandleFactory(final int idx) {
        return super.getWriteHandleFactory(idx).map(writeHandleFactory -> new WriteHandleFactory(){

            @Override
            public HoodieWriteHandle create(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
                ValidationUtils.checkArgument(!((Boolean)RDDConsistentBucketBulkInsertPartitioner.this.doAppend.get(idx)), "Consistent Hashing bulk_insert only support write to new file group");
                return writeHandleFactory.create(config, commitTime, hoodieTable, partitionPath, fileIdPrefix, taskContextSupplier);
            }
        });
    }
}

