/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.hive;

import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveBucketFunction;
import com.facebook.presto.hive.HiveBucketProperty;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveErrorCode;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.HiveWriter;
import com.facebook.presto.hive.HiveWriterFactory;
import com.facebook.presto.hive.PartitionUpdate;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageIndexer;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.IntArrayBlockBuilder;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * {@link ConnectorPageSink} that distributes the rows of every appended page across a set of
 * {@link HiveWriter}s, one writer per distinct combination of partition-column values (plus the
 * computed bucket number when a {@link HiveBucketProperty} is present).
 *
 * <p>All writer interaction ({@code appendPage}, {@code finish}, {@code abort}) is executed through
 * {@link HdfsEnvironment#doAs} using the session user.
 */
public class HivePageSink
        implements ConnectorPageSink {
    private static final Logger log = Logger.get(HivePageSink.class);

    /** Upper bound on the number of positions handed to {@link #writePage(Page)} at once. */
    private static final int MAX_PAGE_POSITIONS = 4096;

    private final HiveWriterFactory writerFactory;

    /** Input channel of every non-partition column, in input order. */
    private final int[] dataColumnInputIndex;
    /** Input channel of every partition-key column, in input order. */
    private final int[] partitionColumnsInputIndex;
    /** Input channels of the bucketing columns, or {@code null} when the sink is not bucketed. */
    private final int[] bucketColumns;
    /** Bucket function, or {@code null} when the sink is not bucketed. */
    private final HiveBucketFunction bucketFunction;

    private final HiveWriterPagePartitioner pagePartitioner;
    private final HdfsEnvironment hdfsEnvironment;
    private final int maxOpenWriters;
    private final ListeningExecutorService writeVerificationExecutor;
    private final JsonCodec<PartitionUpdate> partitionUpdateCodec;

    /** Writers indexed by the writer index produced by {@link #pagePartitioner}. */
    private final List<HiveWriter> writers = new ArrayList<>();
    /** Scratch position buffers, parallel to {@link #writers}. */
    private final List<WriterPositions> writerPositions = new ArrayList<>();

    private final ConnectorSession session;
    private long systemMemoryUsage;

    /**
     * Creates a sink for the given input columns.
     *
     * @param writerFactory factory used to create a writer the first time a writer index is seen
     * @param inputColumns columns of the pages that will be appended, in channel order
     * @param bucketProperty bucketing description; empty when the target is not bucketed
     * @param pageIndexerFactory factory for the indexer that maps rows to writer indexes
     * @param typeManager used to resolve the types of the partition columns
     * @param hdfsEnvironment environment used to run writer operations as the session user
     * @param maxOpenWriters maximum number of distinct writers this sink may open
     * @param writeVerificationExecutor executor on which writer verification tasks are run
     * @param partitionUpdateCodec codec used to serialize each writer's {@link PartitionUpdate}
     * @param session session whose user the writer operations run as
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if a bucketing column is not one of the data columns
     */
    public HivePageSink(HiveWriterFactory writerFactory, List<HiveColumnHandle> inputColumns, Optional<HiveBucketProperty> bucketProperty, PageIndexerFactory pageIndexerFactory, TypeManager typeManager, HdfsEnvironment hdfsEnvironment, int maxOpenWriters, ListeningExecutorService writeVerificationExecutor, JsonCodec<PartitionUpdate> partitionUpdateCodec, ConnectorSession session) {
        this.writerFactory = Objects.requireNonNull(writerFactory, "writerFactory is null");
        Objects.requireNonNull(inputColumns, "inputColumns is null");
        Objects.requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
        this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.maxOpenWriters = maxOpenWriters;
        this.writeVerificationExecutor = Objects.requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null");
        this.partitionUpdateCodec = Objects.requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
        Objects.requireNonNull(bucketProperty, "bucketProperty is null");
        this.pagePartitioner = new HiveWriterPagePartitioner(inputColumns, bucketProperty.isPresent(), pageIndexerFactory, typeManager);

        // Split the input channels into partition-key channels and data channels, remembering
        // name -> channel and name -> type for the data columns so bucketing columns can be resolved.
        ImmutableList.Builder<Integer> partitionColumns = ImmutableList.builder();
        ImmutableList.Builder<Integer> dataColumnsInputIndex = ImmutableList.builder();
        Object2IntMap<String> dataColumnNameToIdMap = new Object2IntOpenHashMap<>();
        Map<String, HiveType> dataColumnNameToTypeMap = new HashMap<>();
        for (int inputIndex = 0; inputIndex < inputColumns.size(); inputIndex++) {
            HiveColumnHandle column = inputColumns.get(inputIndex);
            if (column.isPartitionKey()) {
                partitionColumns.add(inputIndex);
                continue;
            }
            dataColumnsInputIndex.add(inputIndex);
            dataColumnNameToIdMap.put(column.getName(), inputIndex);
            dataColumnNameToTypeMap.put(column.getName(), column.getHiveType());
        }
        this.partitionColumnsInputIndex = Ints.toArray(partitionColumns.build());
        this.dataColumnInputIndex = Ints.toArray(dataColumnsInputIndex.build());

        if (bucketProperty.isPresent()) {
            int bucketCount = bucketProperty.get().getBucketCount();
            List<String> bucketedBy = bucketProperty.get().getBucketedBy();
            // Fail fast with a clear message instead of resolving an unknown name to a bogus
            // channel / null type further down.
            for (String bucketColumnName : bucketedBy) {
                Preconditions.checkArgument(dataColumnNameToIdMap.containsKey(bucketColumnName), "Bucketing column %s is not one of the data columns", bucketColumnName);
            }
            this.bucketColumns = bucketedBy.stream().mapToInt(dataColumnNameToIdMap::getInt).toArray();
            List<HiveType> bucketColumnTypes = bucketedBy.stream().map(dataColumnNameToTypeMap::get).collect(Collectors.toList());
            this.bucketFunction = new HiveBucketFunction(bucketCount, bucketColumnTypes);
        } else {
            this.bucketColumns = null;
            this.bucketFunction = null;
        }
        this.session = Objects.requireNonNull(session, "session is null");
    }

    /**
     * Returns the memory tracked by this sink: the position buffers allocated per writer plus the
     * growth each writer reported across its {@code append} calls.
     */
    @Override
    public long getSystemMemoryUsage() {
        return systemMemoryUsage;
    }

    /**
     * Commits all writers as the session user.
     *
     * @return a future yielding one JSON-serialized {@link PartitionUpdate} per writer
     */
    @Override
    public CompletableFuture<Collection<Slice>> finish() {
        ListenableFuture<Collection<Slice>> result = hdfsEnvironment.doAs(session.getUser(), this::doFinish);
        return MoreFutures.toCompletableFuture(result);
    }

    /**
     * Commits every writer, serializes its partition update, and runs any verification tasks the
     * writers expose on {@link #writeVerificationExecutor}.
     */
    private ListenableFuture<Collection<Slice>> doFinish() {
        ImmutableList.Builder<Slice> partitionUpdates = ImmutableList.builder();
        List<Callable<Object>> verificationTasks = new ArrayList<>();
        for (HiveWriter writer : writers) {
            writer.commit();
            PartitionUpdate partitionUpdate = writer.getPartitionUpdate();
            partitionUpdates.add(Slices.wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdate)));
            writer.getVerificationTask().map(Executors::callable).ifPresent(verificationTasks::add);
        }
        Collection<Slice> result = partitionUpdates.build();
        if (verificationTasks.isEmpty()) {
            return Futures.immediateFuture(result);
        }
        try {
            // invokeAll on a ListeningExecutorService returns ListenableFuture instances typed as Future.
            List<ListenableFuture<?>> futures = writeVerificationExecutor.invokeAll(verificationTasks).stream()
                    .map(future -> (ListenableFuture<?>) future)
                    .collect(Collectors.toList());
            // The verification results themselves are irrelevant; only their success gates the result.
            return Futures.transform(Futures.allAsList(futures), input -> result);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Rolls back all writers as the session user.
     *
     * @throws PrestoException with {@code HIVE_WRITER_CLOSE_ERROR} if any rollback failed
     */
    @Override
    public void abort() {
        hdfsEnvironment.doAs(session.getUser(), this::doAbort);
    }

    /**
     * Attempts to roll back every writer, continuing past individual failures. The first failure
     * becomes the cause of the thrown exception and later failures are attached as suppressed
     * exceptions so none of them is lost.
     */
    private void doAbort() {
        Exception rollbackFailure = null;
        for (HiveWriter writer : writers) {
            // The list can contain null placeholders if writer creation failed in getWriterIndexes.
            if (writer == null) {
                continue;
            }
            try {
                writer.rollback();
            }
            catch (Exception e) {
                log.warn("exception '%s' while rollback on %s", e, writer);
                if (rollbackFailure == null) {
                    rollbackFailure = e;
                }
                else if (rollbackFailure != e) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    rollbackFailure.addSuppressed(e);
                }
            }
        }
        if (rollbackFailure != null) {
            throw new PrestoException(HiveErrorCode.HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", rollbackFailure);
        }
    }

    /**
     * Writes the page to the appropriate writers as the session user. Empty pages are ignored.
     *
     * @return always {@code NOT_BLOCKED}; the write has completed when this method returns
     */
    @Override
    public CompletableFuture<?> appendPage(Page page) {
        if (page.getPositionCount() > 0) {
            hdfsEnvironment.doAs(session.getUser(), () -> doAppend(page));
        }
        return NOT_BLOCKED;
    }

    /**
     * Writes the page in chunks of at most {@link #MAX_PAGE_POSITIONS} positions, which is the
     * capacity of each {@link WriterPositions} buffer.
     */
    private void doAppend(Page page) {
        Page remaining = page;
        while (remaining.getPositionCount() > MAX_PAGE_POSITIONS) {
            Page chunk = remaining.getRegion(0, MAX_PAGE_POSITIONS);
            remaining = remaining.getRegion(MAX_PAGE_POSITIONS, remaining.getPositionCount() - MAX_PAGE_POSITIONS);
            writePage(chunk);
        }
        writePage(remaining);
    }

    /**
     * Groups the page's positions by writer index and appends each group to its writer, updating
     * {@link #systemMemoryUsage} with the change each writer reports.
     */
    private void writePage(Page page) {
        int[] writerIndexes = getWriterIndexes(page);

        // Record which positions belong to which writer.
        for (int position = 0; position < page.getPositionCount(); position++) {
            writerPositions.get(writerIndexes[position]).add(position);
        }

        Page dataPage = getDataPage(page);
        // writerIndexes has one entry per position, so an index may repeat; a writer's buffer is
        // cleared once flushed, which makes every repeat hit the isEmpty() guard and be skipped.
        for (int writerIndex : writerIndexes) {
            WriterPositions currentWriterPositions = writerPositions.get(writerIndex);
            if (currentWriterPositions.isEmpty()) {
                continue;
            }

            Page pageForWriter = dataPage;
            // Only build a projection when the writer does not receive every position of the page.
            if (currentWriterPositions.size() != dataPage.getPositionCount()) {
                Block[] blocks = new Block[dataPage.getChannelCount()];
                for (int channel = 0; channel < dataPage.getChannelCount(); channel++) {
                    blocks[channel] = new DictionaryBlock(currentWriterPositions.size(), dataPage.getBlock(channel), currentWriterPositions.getPositionsArray());
                }
                pageForWriter = new Page(currentWriterPositions.size(), blocks);
            }

            HiveWriter writer = writers.get(writerIndex);
            long currentMemory = writer.getSystemMemoryUsage();
            writer.append(pageForWriter);
            systemMemoryUsage += writer.getSystemMemoryUsage() - currentMemory;
            currentWriterPositions.clear();
        }
    }

    /**
     * Computes the writer index of every position in the page and makes sure a writer exists for
     * each index in use.
     *
     * @return an array with one writer index per page position
     * @throws PrestoException with {@code HIVE_TOO_MANY_OPEN_PARTITIONS} if the number of distinct
     *         writer indexes would reach beyond {@link #maxOpenWriters}
     */
    private int[] getWriterIndexes(Page page) {
        Page partitionColumns = extractColumns(page, partitionColumnsInputIndex);
        Block bucketBlock = buildBucketBlock(page);
        int[] writerIndexes = pagePartitioner.partitionPage(partitionColumns, bucketBlock);
        int maxIndex = pagePartitioner.getMaxIndex();
        if (maxIndex >= maxOpenWriters) {
            throw new PrestoException(HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS, "Too many open partitions");
        }

        // Grow both parallel lists with placeholders up to the highest index seen so far.
        while (writers.size() <= maxIndex) {
            writers.add(null);
            WriterPositions newWriterPositions = new WriterPositions();
            systemMemoryUsage += SizeOf.sizeOf(newWriterPositions.getPositionsArray());
            writerPositions.add(newWriterPositions);
        }

        // Create the missing writers, using the first position that maps to each new index.
        for (int position = 0; position < page.getPositionCount(); position++) {
            int writerIndex = writerIndexes[position];
            if (writers.get(writerIndex) != null) {
                continue;
            }
            OptionalInt bucketNumber = OptionalInt.empty();
            if (bucketBlock != null) {
                bucketNumber = OptionalInt.of(bucketBlock.getInt(position, 0));
            }
            HiveWriter writer = writerFactory.createWriter(partitionColumns, position, bucketNumber);
            writers.set(writerIndex, writer);
        }
        Verify.verify(writers.size() == maxIndex + 1);
        Verify.verify(!writers.contains(null));
        return writerIndexes;
    }

    /** Returns a page holding only the data (non-partition) columns of {@code page}. */
    private Page getDataPage(Page page) {
        return extractColumns(page, dataColumnInputIndex);
    }

    /**
     * Builds a block holding the bucket number of every position of {@code page}.
     *
     * @return the bucket block, or {@code null} when the sink is not bucketed
     */
    private Block buildBucketBlock(Page page) {
        if (bucketFunction == null) {
            return null;
        }
        IntArrayBlockBuilder bucketColumnBuilder = new IntArrayBlockBuilder(new BlockBuilderStatus(), page.getPositionCount());
        Page bucketColumnsPage = extractColumns(page, bucketColumns);
        for (int position = 0; position < page.getPositionCount(); position++) {
            int bucket = bucketFunction.getBucket(bucketColumnsPage, position);
            bucketColumnBuilder.writeInt(bucket);
        }
        return bucketColumnBuilder.build();
    }

    /** Returns a page made of the given channels of {@code page}, in the given order. */
    private static Page extractColumns(Page page, int[] columns) {
        Block[] blocks = new Block[columns.length];
        for (int i = 0; i < columns.length; i++) {
            blocks[i] = page.getBlock(columns[i]);
        }
        return new Page(page.getPositionCount(), blocks);
    }

    /**
     * Fixed-capacity, reusable buffer of page positions destined for a single writer. The capacity
     * equals {@link #MAX_PAGE_POSITIONS}, the largest page {@code writePage} is given.
     */
    private static final class WriterPositions {
        private final int[] positions = new int[MAX_PAGE_POSITIONS];
        private int size;

        private WriterPositions() {
        }

        public boolean isEmpty() {
            return size == 0;
        }

        public int size() {
            return size;
        }

        /** Returns the backing array itself; only the first {@link #size()} entries are meaningful. */
        public int[] getPositionsArray() {
            return positions;
        }

        /**
         * Appends a position.
         *
         * @throws IllegalArgumentException if the buffer is already full
         */
        public void add(int position) {
            Preconditions.checkArgument(size < positions.length, "Too many page positions");
            positions[size] = position;
            size++;
        }

        /** Marks the buffer empty; the backing array is retained for reuse. */
        public void clear() {
            size = 0;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("size", size).toString();
        }
    }

    /**
     * Maps each row to a writer index by indexing its partition-column values and, for bucketed
     * sinks, its bucket number with a {@link PageIndexer}.
     */
    private static class HiveWriterPagePartitioner {
        private final PageIndexer pageIndexer;

        /**
         * @param inputColumns all input columns; only the partition keys contribute to the index
         * @param bucketed whether an extra integer channel with the bucket number is indexed as well
         * @param pageIndexerFactory factory used to create the underlying indexer
         * @param typeManager used to resolve the type of each partition column
         */
        public HiveWriterPagePartitioner(List<HiveColumnHandle> inputColumns, boolean bucketed, PageIndexerFactory pageIndexerFactory, TypeManager typeManager) {
            Objects.requireNonNull(inputColumns, "inputColumns is null");
            Objects.requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
            // Collect into an explicit ArrayList: the list may be appended to below, and
            // Collectors.toList() makes no guarantee about the mutability of its result.
            List<Type> partitionColumnTypes = inputColumns.stream()
                    .filter(HiveColumnHandle::isPartitionKey)
                    .map(column -> typeManager.getType(column.getTypeSignature()))
                    .collect(Collectors.toCollection(ArrayList::new));
            if (bucketed) {
                partitionColumnTypes.add(IntegerType.INTEGER);
            }
            this.pageIndexer = pageIndexerFactory.createPageIndexer(partitionColumnTypes);
        }

        /**
         * Indexes the partition columns, with {@code bucketBlock} appended as the last channel when
         * it is non-null.
         *
         * @return the index assigned by the page indexer to each position
         */
        public int[] partitionPage(Page partitionColumns, Block bucketBlock) {
            Page pageToIndex = partitionColumns;
            if (bucketBlock != null) {
                Block[] blocks = new Block[partitionColumns.getChannelCount() + 1];
                for (int i = 0; i < partitionColumns.getChannelCount(); i++) {
                    blocks[i] = partitionColumns.getBlock(i);
                }
                blocks[blocks.length - 1] = bucketBlock;
                pageToIndex = new Page(partitionColumns.getPositionCount(), blocks);
            }
            return pageIndexer.indexPage(pageToIndex);
        }

        /** Returns the underlying indexer's current maximum index. */
        public int getMaxIndex() {
            return pageIndexer.getMaxIndex();
        }
    }
}

