/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.partitioner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assigns records to file buckets on behalf of a single task.
 *
 * <p>Updates are mapped to the bucket of the file they already belong to. Inserts are
 * first packed into the small files owned by this task, then into the new file most
 * recently opened for the partition (while it still has capacity), and finally into a
 * freshly created file id.
 *
 * <p>A file id is considered owned by this task when Flink's
 * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} maps it to {@code taskID}.
 */
public class BucketAssigner implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(BucketAssigner.class);

    /** Index of the task this assigner works for. */
    private final int taskID;
    /** Max parallelism passed to the key-group assignment. */
    private final int maxParallelism;
    /** Number of parallel tasks passed to the key-group assignment. */
    private final int numTasks;
    /** Buckets registered since the last {@link #reset()}, keyed by the generated bucket key. */
    private final Map<String, BucketInfo> bucketInfoMap;
    protected final HoodieWriteConfig config;
    private final WriteProfile writeProfile;
    /**
     * Per-partition small file assignment. A {@code null} value is stored on purpose to
     * remember that the partition has no small files owned by this task.
     */
    private final Map<String, SmallFileAssign> smallFileAssignMap;
    /** Per-partition state of the new file that was opened last. */
    private final Map<String, NewFileAssignState> newFileAssignStates;
    /** Number of {@link #reload(long)} calls since the new file states were last cleared. */
    private int accCkp = 0;

    /**
     * Creates the assigner.
     *
     * @param taskID         index of the task this assigner works for
     * @param maxParallelism max parallelism used for the key-group assignment
     * @param numTasks       number of parallel tasks
     * @param profile        write profile supplying small files, average record size and
     *                       records per bucket
     * @param config         write config supplying the parquet max file size and base path
     */
    public BucketAssigner(int taskID, int maxParallelism, int numTasks, WriteProfile profile, HoodieWriteConfig config) {
        this.taskID = taskID;
        this.maxParallelism = maxParallelism;
        this.numTasks = numTasks;
        this.config = config;
        this.writeProfile = profile;
        this.bucketInfoMap = new HashMap<>();
        this.smallFileAssignMap = new HashMap<>();
        this.newFileAssignStates = new HashMap<>();
    }

    /** Forgets all registered buckets. */
    public void reset() {
        this.bucketInfoMap.clear();
    }

    /**
     * Returns the UPDATE bucket for the given partition and file id, registering it on
     * first use.
     *
     * @param partitionPath partition of the record
     * @param fileIdHint    file id the record belongs to
     * @return the registered bucket for that file
     */
    public BucketInfo addUpdate(String partitionPath, String fileIdHint) {
        String key = StreamerUtil.generateBucketKey(partitionPath, fileIdHint);
        // Values of bucketInfoMap are never null, so computeIfAbsent matches containsKey + put + get.
        return this.bucketInfoMap.computeIfAbsent(
            key, ignored -> new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath));
    }

    /**
     * Picks a bucket for a record inserted into the given partition.
     *
     * @param partitionPath partition of the record
     * @return an UPDATE bucket when the record fits into a small file or into the new file
     *         opened last for the partition, otherwise a newly registered INSERT bucket
     */
    public BucketInfo addInsert(String partitionPath) {
        // 1. Try to pack the record into one of the small files of this task.
        SmallFileAssign smallFileAssign = this.getSmallFileAssign(partitionPath);
        if (smallFileAssign != null && smallFileAssign.assign()) {
            return new BucketInfo(BucketType.UPDATE, smallFileAssign.getFileId(), partitionPath);
        }

        // 2. Try to keep filling the new file that was opened last for this partition.
        NewFileAssignState openState = this.newFileAssignStates.get(partitionPath);
        if (openState != null && openState.canAssign()) {
            openState.assign();
            String openKey = StreamerUtil.generateBucketKey(partitionPath, openState.fileId);
            // Reuse the bucket instance while it is still registered (i.e. not reset yet);
            // otherwise address the same file id through a fresh UPDATE bucket.
            BucketInfo registered = this.bucketInfoMap.get(openKey);
            if (registered != null) {
                return registered;
            }
            return new BucketInfo(BucketType.UPDATE, openState.fileId, partitionPath);
        }

        // 3. Open a brand new file owned by this task.
        BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, this.createFileIdOfThisTask(), partitionPath);
        String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
        this.bucketInfoMap.put(key, bucketInfo);
        NewFileAssignState freshState =
            new NewFileAssignState(bucketInfo.getFileIdPrefix(), this.writeProfile.getRecordsPerBucket());
        freshState.assign();
        this.newFileAssignStates.put(partitionPath, freshState);
        return bucketInfo;
    }

    /**
     * Returns the cached small file assignment of the partition, computing it from the
     * write profile on first access.
     *
     * @param partitionPath partition to look up
     * @return the assignment, or {@code null} when this task owns no small file there
     */
    private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
        // containsKey (rather than a null check) because null is a valid cached answer.
        if (this.smallFileAssignMap.containsKey(partitionPath)) {
            return this.smallFileAssignMap.get(partitionPath);
        }
        List<SmallFile> smallFiles = this.smallFilesOfThisTask(this.writeProfile.getSmallFiles(partitionPath));
        if (smallFiles.isEmpty()) {
            this.smallFileAssignMap.put(partitionPath, null);
            return null;
        }
        LOG.info("For partitionPath : {} Small Files => {}", partitionPath, smallFiles);
        // Both values are the same for every small file of the partition; read them once.
        final long maxFileSize = this.config.getParquetMaxFileSize();
        final long avgRecordSize = this.writeProfile.getAvgSize();
        SmallFileAssignState[] states = smallFiles.stream()
            .map(smallFile -> new SmallFileAssignState(maxFileSize, smallFile, avgRecordSize))
            .toArray(SmallFileAssignState[]::new);
        SmallFileAssign assign = new SmallFileAssign(states);
        this.smallFileAssignMap.put(partitionPath, assign);
        return assign;
    }

    /**
     * Drops the cached small file assignments and reloads the write profile.
     *
     * <p>The new file states are cleared only on every second call, so a new file keeps
     * being targeted across one reload.
     *
     * @param checkpointId checkpoint id forwarded to the write profile
     */
    public synchronized void reload(long checkpointId) {
        ++this.accCkp;
        if (this.accCkp > 1) {
            this.newFileAssignStates.clear();
            this.accCkp = 0;
        }
        this.smallFileAssignMap.clear();
        this.writeProfile.reload(checkpointId);
    }

    /** Returns whether the key-group assignment maps the file id to this task. */
    private boolean fileIdOfThisTask(String fileId) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, this.maxParallelism, this.numTasks)
            == this.taskID;
    }

    /**
     * Generates new file id prefixes until one is owned by this task.
     *
     * @return a file id prefix that the key-group assignment maps to this task
     */
    @VisibleForTesting
    public String createFileIdOfThisTask() {
        String newFileIdPfx = FSUtils.createNewFileIdPfx();
        while (!this.fileIdOfThisTask(newFileIdPfx)) {
            newFileIdPfx = FSUtils.createNewFileIdPfx();
        }
        return newFileIdPfx;
    }

    /**
     * Keeps only the small files whose file id is owned by this task.
     *
     * @param smallFiles candidate small files
     * @return a new list with the small files of this task
     */
    @VisibleForTesting
    public List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
        return smallFiles.stream()
            .filter(smallFile -> this.fileIdOfThisTask(smallFile.location.getFileId()))
            .collect(Collectors.toList());
    }

    /** Clears the registered buckets and cleans the write profile of the table base path. */
    @Override
    public void close() {
        this.reset();
        WriteProfiles.clean(this.config.getBasePath());
    }

    /** Tracks how many records were assigned to a newly created file. */
    private static class NewFileAssignState {
        long assigned;
        final long totalUnassigned;
        final String fileId;

        NewFileAssignState(String fileId, long insertRecordsPerBucket) {
            this.fileId = fileId;
            this.assigned = 0L;
            this.totalUnassigned = insertRecordsPerBucket;
        }

        /** Returns whether the file can take at least one more record. */
        public boolean canAssign() {
            return this.totalUnassigned > 0L && this.totalUnassigned > this.assigned;
        }

        /** Counts one more record as assigned to the file. */
        public void assign() {
            ++this.assigned;
        }
    }

    /** Tracks how many records were assigned to one small file. */
    private static class SmallFileAssignState {
        long assigned = 0L;
        final long total;
        final String fileId;

        SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) {
            long remainingBytes = parquetMaxFileSize - smallFile.sizeBytes;
            // A non-positive record size cannot yield a meaningful capacity: zero would
            // throw ArithmeticException and a negative one could flip the sign of the
            // result. Treat the file as having no room in that case.
            this.total = averageRecordSize > 0L ? remainingBytes / averageRecordSize : 0L;
            this.fileId = smallFile.location.getFileId();
        }

        /** Returns whether the file can take at least one more record. */
        public boolean canAssign() {
            return this.total > 0L && this.total > this.assigned;
        }

        /** Counts one more record as assigned to the file. */
        public void assign() {
            ++this.assigned;
        }
    }

    /** Fills the small files of one partition one after another. */
    private static class SmallFileAssign {
        final SmallFileAssignState[] states;
        /** Index of the small file currently being filled. */
        int assignIdx = 0;
        /** Set once every small file has been exhausted. */
        boolean noSpace = false;

        SmallFileAssign(SmallFileAssignState[] states) {
            this.states = states;
        }

        /**
         * Assigns one record to the first small file that still has room, starting from
         * the file currently being filled.
         *
         * @return {@code true} when a small file took the record; {@code false} when all
         *         of them are full
         */
        public boolean assign() {
            if (this.noSpace) {
                return false;
            }
            while (this.assignIdx < this.states.length) {
                SmallFileAssignState state = this.states[this.assignIdx];
                if (state.canAssign()) {
                    state.assign();
                    return true;
                }
                ++this.assignIdx;
            }
            this.noSpace = true;
            return false;
        }

        /**
         * Returns the id of the small file currently being filled. Only meaningful right
         * after {@link #assign()} returned {@code true}.
         */
        public String getFileId() {
            return this.states[this.assignIdx].fileId;
        }
    }
}

