/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.dict;

import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.dict.NGlobalDictMetaInfo;
import org.apache.spark.dict.NGlobalDictStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NGlobalDictHDFSStore
implements NGlobalDictStore {
    protected static final String VERSION_PREFIX = "version_";
    protected static final String DICT_METADATA_NAME = "meta";
    protected static final String DICT_CURR_PREFIX = "CURR_";
    protected static final String DICT_PREV_PREFIX = "PREV_";
    protected static final String WORKING_DIR = "working";
    static final Logger logger = LoggerFactory.getLogger(NGlobalDictHDFSStore.class);
    protected final Path basePath;
    protected final FileSystem fileSystem;
    protected final String baseDir;

    public NGlobalDictHDFSStore(String baseDir) throws IOException {
        this.baseDir = baseDir;
        this.basePath = new Path(baseDir);
        this.fileSystem = this.basePath.getFileSystem(new Configuration());
    }

    @Override
    public void prepareForWrite(String workingDir) throws IOException {
        if (!this.fileSystem.exists(this.basePath)) {
            logger.info("Global dict store at {} doesn't exist, create a new one", (Object)this.basePath);
            this.fileSystem.mkdirs(this.basePath);
        }
        logger.trace("Prepare to write Global dict store at {}", (Object)workingDir);
        Path working = new Path(workingDir);
        if (this.fileSystem.exists(working)) {
            this.fileSystem.delete(working, true);
            logger.trace("Working directory {} exits, delete it first", (Object)working);
        }
        this.fileSystem.mkdirs(working);
    }

    @Override
    public Long[] listAllVersions() throws IOException {
        if (!this.fileSystem.exists(this.basePath)) {
            return new Long[0];
        }
        FileStatus[] versionDirs = this.fileSystem.listStatus(this.basePath, path -> path.getName().startsWith(VERSION_PREFIX));
        TreeSet<Long> versions = new TreeSet<Long>();
        for (FileStatus versionDir : versionDirs) {
            Path path2 = versionDir.getPath();
            versions.add(Long.parseLong(path2.getName().substring(VERSION_PREFIX.length())));
        }
        return versions.toArray(new Long[versions.size()]);
    }

    @Override
    public Path getVersionDir(long version) {
        return new Path(this.basePath, VERSION_PREFIX + version);
    }

    @Override
    public NGlobalDictMetaInfo getMetaInfo(long version) throws IOException {
        NGlobalDictMetaInfo metaInfo;
        Path versionDir = this.getVersionDir(version);
        FileStatus[] metaFiles = this.fileSystem.listStatus(versionDir, path -> path.getName().startsWith(DICT_METADATA_NAME));
        if (metaFiles.length == 0) {
            logger.info("because metaFiles.length is 0, metaInfo is null");
            return null;
        }
        String metaFile = metaFiles[0].getPath().getName();
        Path metaPath = new Path(versionDir, metaFile);
        if (!this.fileSystem.exists(metaPath)) {
            logger.info("because metaPath[{}] is not exists, metaInfo is null", (Object)metaPath);
            return null;
        }
        try (FSDataInputStream is = this.fileSystem.open(metaPath);){
            int i;
            int bucketSize = is.readInt();
            long[] bucketOffsets = new long[bucketSize];
            long[] bucketCount = new long[bucketSize];
            long dictCount = is.readLong();
            for (i = 0; i < bucketSize; ++i) {
                bucketOffsets[i] = is.readLong();
            }
            for (i = 0; i < bucketSize; ++i) {
                bucketCount[i] = is.readLong();
            }
            metaInfo = new NGlobalDictMetaInfo(bucketSize, bucketOffsets, dictCount, bucketCount);
        }
        return metaInfo;
    }

    @Override
    public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId) throws IOException {
        FileStatus[] bucketFiles;
        Object2LongOpenHashMap object2IntMap = new Object2LongOpenHashMap();
        Path versionDir = this.getVersionDir(version);
        for (FileStatus file : bucketFiles = this.fileSystem.listStatus(versionDir, path -> path.getName().endsWith("_" + bucketId))) {
            if (file.getPath().getName().startsWith(DICT_CURR_PREFIX)) {
                object2IntMap.putAll(this.getBucketDict(file.getPath(), metaInfo.getOffset(bucketId)));
            }
            if (!file.getPath().getName().startsWith(DICT_PREV_PREFIX)) continue;
            object2IntMap.putAll(this.getBucketDict(file.getPath(), 0L));
        }
        return object2IntMap;
    }

    private Object2LongMap<String> getBucketDict(Path dictPath, long offset) throws IOException {
        Object2LongOpenHashMap object2IntMap = new Object2LongOpenHashMap();
        try (FSDataInputStream is = this.fileSystem.open(dictPath);){
            int elementCnt = is.readInt();
            for (int i = 0; i < elementCnt; ++i) {
                long value = is.readLong();
                int bytesLength = is.readInt();
                byte[] bytes = new byte[bytesLength];
                IOUtils.readFully((InputStream)is, (byte[])bytes, (int)0, (int)bytes.length);
                object2IntMap.put((Object)new String(bytes, Charset.defaultCharset()), value + offset);
            }
        }
        return object2IntMap;
    }

    @Override
    public void writeBucketCurrDict(String workingPath, int bucketId, Object2LongMap<String> openHashMap) throws IOException {
        Path dictPath = new Path(workingPath, DICT_CURR_PREFIX + bucketId);
        this.writeBucketDict(dictPath, openHashMap);
    }

    @Override
    public void writeBucketPrevDict(String workingPath, int bucketId, Object2LongMap<String> openHashMap) throws IOException {
        Path dictPath = new Path(workingPath, DICT_PREV_PREFIX + bucketId);
        this.writeBucketDict(dictPath, openHashMap);
    }

    private void writeBucketDict(Path dictPath, Object2LongMap<String> openHashMap) throws IOException {
        if (this.fileSystem.exists(dictPath)) {
            this.fileSystem.delete(dictPath, true);
        }
        logger.info("Write dict path: {}", (Object)dictPath);
        try (FSDataOutputStream dos = this.fileSystem.create(dictPath);){
            dos.writeInt(openHashMap.size());
            for (Object2LongMap.Entry entry : openHashMap.object2LongEntrySet()) {
                dos.writeLong(entry.getLongValue());
                byte[] bytes = ((String)entry.getKey()).getBytes(Charset.defaultCharset());
                dos.writeInt(bytes.length);
                dos.write(bytes);
            }
            dos.flush();
        }
        logger.info("Write dict path: {} , dict num: {} success", (Object)dictPath, (Object)openHashMap.size());
    }

    @Override
    public void writeMetaInfo(int bucketSize, String workingPath) throws IOException {
        Path metaPath = new Path(workingPath, DICT_METADATA_NAME);
        if (this.fileSystem.exists(metaPath)) {
            this.fileSystem.delete(metaPath, true);
        }
        logger.info("Write dict meta path: {}", (Object)metaPath);
        Path workPath = new Path(workingPath);
        FileStatus[] dictPrevFiles = this.fileSystem.listStatus(workPath, path -> StringUtils.contains((String)path.getName(), (String)DICT_PREV_PREFIX));
        FileStatus[] dictCurrFiles = this.fileSystem.listStatus(workPath, path -> StringUtils.contains((String)path.getName(), (String)DICT_CURR_PREFIX));
        long prevDictCount = 0L;
        long[] bucketCnts = new long[bucketSize];
        long[] bucketOffsets = new long[bucketSize];
        for (FileStatus fileStatus : dictPrevFiles) {
            try (FSDataInputStream is = this.fileSystem.open(fileStatus.getPath());){
                String bucketId = fileStatus.getPath().getName().replaceAll(DICT_PREV_PREFIX, "");
                int cnt = is.readInt();
                prevDictCount += (long)cnt;
                bucketCnts[Integer.parseInt((String)bucketId)] = cnt;
            }
        }
        try (FSDataOutputStream dos = this.fileSystem.create(metaPath);){
            dos.writeInt(bucketSize);
            int currDictCnt = 0;
            for (FileStatus fileStatus : dictCurrFiles) {
                try (FSDataInputStream is = this.fileSystem.open(fileStatus.getPath());){
                    String bucketId = fileStatus.getPath().getName().replaceAll(DICT_CURR_PREFIX, "");
                    int cnt = is.readInt();
                    int bucket = Integer.parseInt(bucketId);
                    bucketCnts[bucket] = bucketCnts[bucket] + (long)cnt;
                    bucketOffsets[bucket] = cnt;
                    currDictCnt += cnt;
                }
            }
            dos.writeLong(prevDictCount + (long)currDictCnt);
            for (long offset : bucketOffsets) {
                dos.writeLong(prevDictCount);
                prevDictCount += offset;
            }
            for (long cnt : bucketCnts) {
                dos.writeLong(cnt);
            }
            dos.flush();
        }
    }

    @Override
    public void commit(String workingDir, int maxVersions, long versionTTL) throws IOException {
        Path workingPath = new Path(workingDir);
        Path newVersionPath = new Path(this.basePath, VERSION_PREFIX + System.currentTimeMillis());
        this.fileSystem.rename(workingPath, newVersionPath);
        logger.info("Commit from {} to {}", (Object)workingPath, (Object)newVersionPath);
        this.cleanUp(maxVersions, versionTTL);
    }

    @Override
    public String getWorkingDir() {
        return this.baseDir + WORKING_DIR;
    }

    protected void cleanUp(int maxVersions, long versionTTL) throws IOException {
        long timestamp = System.currentTimeMillis();
        Long[] versions = this.listAllVersions();
        for (int i = 0; i < versions.length - maxVersions; ++i) {
            if (versions[i] + versionTTL >= timestamp) continue;
            this.fileSystem.delete(this.getVersionDir(versions[i]), true);
        }
    }
}

