/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import javax.inject.Inject;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.BackgroundHiveSplitLoader;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.CoercionPolicy;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.DirectoryLister;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.ForHiveClient;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HdfsEnvironment;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveBucketHandle;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveBucketProperty;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveBucketing;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveClientConfig;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveConnectorId;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveErrorCode;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HivePartition;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HivePartitionMetadata;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveSplitSource;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveTableLayoutHandle;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveTransactionHandle;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.HiveType;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.NamenodeStats;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PartitionOfflineException;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.metastore.Column;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.metastore.MetastoreUtil;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.metastore.Partition;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.metastore.Table;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.ConnectorSession;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.ConnectorSplitSource;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.ConnectorTableLayoutHandle;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.ErrorCodeSupplier;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.FixedSplitSource;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.PrestoException;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.SchemaTableName;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.StandardErrorCode;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.TableNotFoundException;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.connector.ConnectorSplitManager;
import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions;
import org.apache.flink.fs.s3presto.shaded.com.google.common.base.Strings;
import org.apache.flink.fs.s3presto.shaded.com.google.common.collect.AbstractIterator;
import org.apache.flink.fs.s3presto.shaded.com.google.common.collect.ImmutableList;
import org.apache.flink.fs.s3presto.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.fs.s3presto.shaded.com.google.common.collect.Iterables;
import org.apache.flink.fs.s3presto.shaded.com.google.common.collect.Lists;
import org.apache.flink.fs.s3presto.shaded.com.google.common.collect.Ordering;
import org.apache.flink.fs.s3presto.shaded.io.airlift.concurrent.BoundedExecutor;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.hive.metastore.ProtectMode;

public class HiveSplitManager
implements ConnectorSplitManager {
    public static final String PRESTO_OFFLINE = "presto_offline";
    private final String connectorId;
    private final Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider;
    private final NamenodeStats namenodeStats;
    private final HdfsEnvironment hdfsEnvironment;
    private final DirectoryLister directoryLister;
    private final Executor executor;
    private final CoercionPolicy coercionPolicy;
    private final int maxOutstandingSplits;
    private final int minPartitionBatchSize;
    private final int maxPartitionBatchSize;
    private final int maxInitialSplits;
    private final boolean recursiveDfsWalkerEnabled;

    @Inject
    public HiveSplitManager(HiveConnectorId connectorId, HiveClientConfig hiveClientConfig, Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider, NamenodeStats namenodeStats, HdfsEnvironment hdfsEnvironment, DirectoryLister directoryLister, @ForHiveClient ExecutorService executorService, CoercionPolicy coercionPolicy) {
        this(connectorId, metastoreProvider, namenodeStats, hdfsEnvironment, directoryLister, (Executor)new BoundedExecutor((Executor)executorService, hiveClientConfig.getMaxSplitIteratorThreads()), coercionPolicy, hiveClientConfig.getMaxOutstandingSplits(), hiveClientConfig.getMinPartitionBatchSize(), hiveClientConfig.getMaxPartitionBatchSize(), hiveClientConfig.getMaxInitialSplits(), hiveClientConfig.getRecursiveDirWalkerEnabled());
    }

    public HiveSplitManager(HiveConnectorId connectorId, Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider, NamenodeStats namenodeStats, HdfsEnvironment hdfsEnvironment, DirectoryLister directoryLister, Executor executor, CoercionPolicy coercionPolicy, int maxOutstandingSplits, int minPartitionBatchSize, int maxPartitionBatchSize, int maxInitialSplits, boolean recursiveDfsWalkerEnabled) {
        this.connectorId = Objects.requireNonNull(connectorId, "connectorId is null").toString();
        this.metastoreProvider = Objects.requireNonNull(metastoreProvider, "metastore is null");
        this.namenodeStats = Objects.requireNonNull(namenodeStats, "namenodeStats is null");
        this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.directoryLister = Objects.requireNonNull(directoryLister, "directoryLister is null");
        this.executor = new ErrorCodedExecutor(executor);
        this.coercionPolicy = Objects.requireNonNull(coercionPolicy, "coercionPolicy is null");
        Preconditions.checkArgument(maxOutstandingSplits >= 1, "maxOutstandingSplits must be at least 1");
        this.maxOutstandingSplits = maxOutstandingSplits;
        this.minPartitionBatchSize = minPartitionBatchSize;
        this.maxPartitionBatchSize = maxPartitionBatchSize;
        this.maxInitialSplits = maxInitialSplits;
        this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled;
    }

    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle) {
        HiveTableLayoutHandle layout = (HiveTableLayoutHandle)layoutHandle;
        List<HivePartition> partitions = layout.getPartitions().get();
        HivePartition partition = Iterables.getFirst(partitions, null);
        if (partition == null) {
            return new FixedSplitSource(ImmutableList.of());
        }
        SchemaTableName tableName = partition.getTableName();
        List<HiveBucketing.HiveBucket> buckets = partition.getBuckets();
        Optional<HiveBucketHandle> bucketHandle = layout.getBucketHandle();
        partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions);
        SemiTransactionalHiveMetastore metastore = this.metastoreProvider.apply((HiveTransactionHandle)transaction);
        Optional<Table> table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
        if (!table.isPresent()) {
            throw new TableNotFoundException(tableName);
        }
        Iterable<HivePartitionMetadata> hivePartitions = this.getPartitionMetadata(metastore, table.get(), tableName, partitions, bucketHandle.map(HiveBucketHandle::toBucketProperty));
        BackgroundHiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(this.connectorId, table.get(), hivePartitions, bucketHandle, buckets, session, this.hdfsEnvironment, this.namenodeStats, this.directoryLister, this.executor, this.maxPartitionBatchSize, this.maxInitialSplits, this.recursiveDfsWalkerEnabled);
        HiveSplitSource splitSource = new HiveSplitSource(this.maxOutstandingSplits, hiveSplitLoader, this.executor);
        hiveSplitLoader.start(splitSource);
        return splitSource;
    }

    private Iterable<HivePartitionMetadata> getPartitionMetadata(SemiTransactionalHiveMetastore metastore, Table table, SchemaTableName tableName, List<HivePartition> hivePartitions, Optional<HiveBucketProperty> bucketProperty) {
        HivePartition firstPartition;
        if (hivePartitions.isEmpty()) {
            return ImmutableList.of();
        }
        if (hivePartitions.size() == 1 && (firstPartition = Iterables.getOnlyElement(hivePartitions)).getPartitionId().equals("<UNPARTITIONED>")) {
            return ImmutableList.of(new HivePartitionMetadata(firstPartition, Optional.empty(), ImmutableMap.of()));
        }
        Iterable<List<HivePartition>> partitionNameBatches = HiveSplitManager.partitionExponentially(hivePartitions, this.minPartitionBatchSize, this.maxPartitionBatchSize);
        Iterable partitionBatches = Iterables.transform(partitionNameBatches, partitionBatch -> {
            Map<String, Optional<Partition>> batch = metastore.getPartitionsByNames(tableName.getSchemaName(), tableName.getTableName(), Lists.transform(partitionBatch, HivePartition::getPartitionId));
            ImmutableMap.Builder<String, Partition> partitionBuilder = ImmutableMap.builder();
            for (Map.Entry<String, Optional<Partition>> entry : batch.entrySet()) {
                if (!entry.getValue().isPresent()) {
                    throw new PrestoException((ErrorCodeSupplier)HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY, "Partition no longer exists: " + entry.getKey());
                }
                partitionBuilder.put(entry.getKey(), entry.getValue().get());
            }
            ImmutableMap partitions = partitionBuilder.build();
            if (partitionBatch.size() != partitions.size()) {
                throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Expected %s partitions but found %s", partitionBatch.size(), partitions.size()));
            }
            ImmutableList.Builder results = ImmutableList.builder();
            for (HivePartition hivePartition : partitionBatch) {
                Partition partition = (Partition)partitions.get(hivePartition.getPartitionId());
                if (partition == null) {
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Partition not loaded: " + hivePartition);
                }
                String protectMode = partition.getParameters().get(ProtectMode.PARAMETER_NAME);
                String partName = MetastoreUtil.makePartName(table.getPartitionColumns(), partition.getValues());
                if (protectMode != null && ProtectMode.getProtectModeFromString((String)protectMode).offline) {
                    throw new PartitionOfflineException(tableName, partName, false, null);
                }
                String prestoOffline = partition.getParameters().get(PRESTO_OFFLINE);
                if (!Strings.isNullOrEmpty(prestoOffline)) {
                    throw new PartitionOfflineException(tableName, partName, true, prestoOffline);
                }
                List<Column> tableColumns = table.getDataColumns();
                List<Column> partitionColumns = partition.getColumns();
                if (tableColumns == null || partitionColumns == null) {
                    throw new PrestoException((ErrorCodeSupplier)HiveErrorCode.HIVE_INVALID_METADATA, String.format("Table '%s' or partition '%s' has null columns", tableName, partName));
                }
                ImmutableMap.Builder<Integer, HiveType> columnCoercions = ImmutableMap.builder();
                for (int i = 0; i < Math.min(partitionColumns.size(), tableColumns.size()); ++i) {
                    HiveType partitionType;
                    HiveType tableType = tableColumns.get(i).getType();
                    if (tableType.equals(partitionType = partitionColumns.get(i).getType())) continue;
                    if (!this.coercionPolicy.canCoerce(partitionType, tableType)) {
                        throw new PrestoException((ErrorCodeSupplier)HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH, String.format("There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column '%s' in table '%s' is declared as type '%s', but partition '%s' declared column '%s' as type '%s'.", tableColumns.get(i).getName(), tableName, tableType, partName, partitionColumns.get(i).getName(), partitionType));
                    }
                    columnCoercions.put(i, partitionType);
                }
                if (bucketProperty.isPresent()) {
                    Optional<HiveBucketProperty> partitionBucketProperty = partition.getStorage().getBucketProperty();
                    if (!partitionBucketProperty.isPresent()) {
                        throw new PrestoException((ErrorCodeSupplier)HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH, String.format("Hive table (%s) is bucketed but partition (%s) is not bucketed", hivePartition.getTableName(), hivePartition.getPartitionId()));
                    }
                    if (!bucketProperty.equals(partitionBucketProperty)) {
                        throw new PrestoException((ErrorCodeSupplier)HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH, String.format("Hive table (%s) bucketing (columns=%s, buckets=%s) does not match partition (%s) bucketing (columns=%s, buckets=%s)", hivePartition.getTableName(), ((HiveBucketProperty)bucketProperty.get()).getBucketedBy(), ((HiveBucketProperty)bucketProperty.get()).getBucketCount(), hivePartition.getPartitionId(), partitionBucketProperty.get().getBucketedBy(), partitionBucketProperty.get().getBucketCount()));
                    }
                }
                results.add(new HivePartitionMetadata(hivePartition, Optional.of(partition), columnCoercions.build()));
            }
            return results.build();
        });
        return Iterables.concat(partitionBatches);
    }

    private static <T> Iterable<List<T>> partitionExponentially(final List<T> values, final int minBatchSize, final int maxBatchSize) {
        return () -> new AbstractIterator<List<T>>(){
            private int currentSize;
            private final Iterator iterator;
            {
                this.currentSize = minBatchSize;
                this.iterator = values.iterator();
            }

            @Override
            protected List<T> computeNext() {
                if (!this.iterator.hasNext()) {
                    return (List)this.endOfData();
                }
                ImmutableList.Builder builder = ImmutableList.builder();
                for (int count = 0; this.iterator.hasNext() && count < this.currentSize; ++count) {
                    builder.add(this.iterator.next());
                }
                this.currentSize = Math.min(maxBatchSize, this.currentSize * 2);
                return builder.build();
            }
        };
    }

    private static class ErrorCodedExecutor
    implements Executor {
        private final Executor delegate;

        private ErrorCodedExecutor(Executor delegate) {
            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
        }

        @Override
        public void execute(Runnable command) {
            try {
                this.delegate.execute(command);
            }
            catch (RejectedExecutionException e) {
                throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.SERVER_SHUTTING_DOWN, "Server is shutting down", (Throwable)e);
            }
        }
    }
}

