/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import org.joda.time.Period;

/**
 * {@link AppenderatorsManager} that tracks the appenderators of many tasks inside one process.
 *
 * <p>State is grouped per datasource in a {@link DatasourceBundle}: each bundle owns one
 * {@link SinkQuerySegmentWalker} and a map from task id to the appenderators created for that task.
 * Every appenderator created here receives a config whose in-memory limits are rewritten and an
 * {@link IndexMerger} that runs its work on a single shared merge executor.
 */
public class UnifiedIndexerAppenderatorsManager
implements AppenderatorsManager {
    private final Logger LOG = new Logger(UnifiedIndexerAppenderatorsManager.class);
    // Datasource name -> bundle. The create*/remove/getBundle methods touch this map only while holding
    // the monitor of this manager; getDatasourceBundles() hands the same map out without locking.
    private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<String, DatasourceBundle>();
    // Collaborators injected through the constructor; DatasourceBundle passes most of them on to the
    // SinkQuerySegmentWalker it builds.
    private final QueryProcessingPool queryProcessingPool;
    private final JoinableFactory joinableFactory;
    // Supplies the merge pool size, the global ingestion heap limit and the worker capacity.
    private final WorkerConfig workerConfig;
    private final Cache cache;
    private final CacheConfig cacheConfig;
    private final CachePopulatorStats cachePopulatorStats;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter serviceEmitter;
    // Resolved with get() each time a new DatasourceBundle is constructed, not at injection time.
    private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
    // Shared executor handed to every LimitedPoolIndexMerger; set to null by shutdown().
    private ListeningExecutorService mergeExecutor;

    @Inject
    public UnifiedIndexerAppenderatorsManager(QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, WorkerConfig workerConfig, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper, ServiceEmitter serviceEmitter, Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider) {
        this.queryProcessingPool = queryProcessingPool;
        this.joinableFactory = joinableFactory;
        this.workerConfig = workerConfig;
        this.cache = cache;
        this.cacheConfig = cacheConfig;
        this.cachePopulatorStats = cachePopulatorStats;
        this.objectMapper = objectMapper;
        this.serviceEmitter = serviceEmitter;
        this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
        this.mergeExecutor = MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)workerConfig.getNumConcurrentMerges(), (String)"unified-indexer-merge-pool-%d"));
    }

    /**
     * Creates a {@link StreamAppenderator} for a task and records it under the task's datasource.
     *
     * <p>While holding this manager's monitor, the bundle for {@code schema.getDataSource()} is looked up
     * (or created), the appenderator is built with a memory-limit-rewritten config, the bundle's shared
     * walker and a pool-limited merger, and it is then registered under {@code taskId}.
     *
     * <p>The {@code conglomerate}, {@code emitter}, {@code queryProcessingPool}, {@code joinableFactory},
     * {@code cacheConfig} and {@code cachePopulatorStats} arguments are not read by this method.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createRealtimeAppenderatorForTask(String taskId, DataSchema schema, AppenderatorConfig config, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler) {
        synchronized (this) {
            final DatasourceBundle bundle = this.datasourceBundles.computeIfAbsent(
                schema.getDataSource(),
                DatasourceBundle::new
            );
            final AppenderatorConfig limitedConfig = this.rewriteAppenderatorConfigMemoryLimits(config);
            final SinkQuerySegmentWalker sharedWalker = bundle.getWalker();
            final IndexMerger pooledMerger = this.wrapIndexMerger(indexMerger);

            final Appenderator created = new StreamAppenderator(
                taskId,
                schema,
                limitedConfig,
                metrics,
                dataSegmentPusher,
                objectMapper,
                segmentAnnouncer,
                sharedWalker,
                indexIO,
                pooledMerger,
                cache,
                rowIngestionMeters,
                parseExceptionHandler
            );
            bundle.addAppenderator(taskId, created);
            return created;
        }
    }

    /**
     * Creates an offline appenderator through {@code Appenderators.createOffline} and records it under
     * the task's datasource.
     *
     * <p>Runs entirely while holding this manager's monitor. The config passed on has its in-memory limits
     * rewritten and the merger is wrapped so that it runs on the shared merge executor.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createOfflineAppenderatorForTask(String taskId, DataSchema schema, AppenderatorConfig config, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler) {
        synchronized (this) {
            final DatasourceBundle bundle = this.datasourceBundles.computeIfAbsent(
                schema.getDataSource(),
                DatasourceBundle::new
            );
            final AppenderatorConfig limitedConfig = this.rewriteAppenderatorConfigMemoryLimits(config);
            final IndexMerger pooledMerger = this.wrapIndexMerger(indexMerger);

            final Appenderator created = Appenderators.createOffline(
                taskId,
                schema,
                limitedConfig,
                metrics,
                dataSegmentPusher,
                objectMapper,
                indexIO,
                pooledMerger,
                rowIngestionMeters,
                parseExceptionHandler
            );
            bundle.addAppenderator(taskId, created);
            return created;
        }
    }

    /**
     * Creates an appenderator through {@code Appenderators.createOpenSegmentsOffline} and records it
     * under the task's datasource.
     *
     * <p>Runs entirely while holding this manager's monitor. The config passed on has its in-memory limits
     * rewritten and the merger is wrapped so that it runs on the shared merge executor.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createOpenSegmentsOfflineAppenderatorForTask(String taskId, DataSchema schema, AppenderatorConfig config, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler) {
        synchronized (this) {
            final DatasourceBundle bundle = this.datasourceBundles.computeIfAbsent(
                schema.getDataSource(),
                DatasourceBundle::new
            );
            final AppenderatorConfig limitedConfig = this.rewriteAppenderatorConfigMemoryLimits(config);
            final IndexMerger pooledMerger = this.wrapIndexMerger(indexMerger);

            final Appenderator created = Appenderators.createOpenSegmentsOffline(
                taskId,
                schema,
                limitedConfig,
                metrics,
                dataSegmentPusher,
                objectMapper,
                indexIO,
                pooledMerger,
                rowIngestionMeters,
                parseExceptionHandler
            );
            bundle.addAppenderator(taskId, created);
            return created;
        }
    }

    /**
     * Creates an appenderator through {@code Appenderators.createClosedSegmentsOffline} and records it
     * under the task's datasource.
     *
     * <p>Runs entirely while holding this manager's monitor. The config passed on has its in-memory limits
     * rewritten and the merger is wrapped so that it runs on the shared merge executor.
     *
     * @return the newly created appenderator
     */
    @Override
    public Appenderator createClosedSegmentsOfflineAppenderatorForTask(String taskId, DataSchema schema, AppenderatorConfig config, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler) {
        synchronized (this) {
            final DatasourceBundle bundle = this.datasourceBundles.computeIfAbsent(
                schema.getDataSource(),
                DatasourceBundle::new
            );
            final AppenderatorConfig limitedConfig = this.rewriteAppenderatorConfigMemoryLimits(config);
            final IndexMerger pooledMerger = this.wrapIndexMerger(indexMerger);

            final Appenderator created = Appenderators.createClosedSegmentsOffline(
                taskId,
                schema,
                limitedConfig,
                metrics,
                dataSegmentPusher,
                objectMapper,
                indexIO,
                pooledMerger,
                rowIngestionMeters,
                parseExceptionHandler
            );
            bundle.addAppenderator(taskId, created);
            return created;
        }
    }

    /**
     * Forgets every appenderator registered for a task and drops the datasource bundle once no task
     * is left in it.
     *
     * <p>Missing state is only logged: an unknown datasource logs an error and returns, and an unknown
     * task logs an error but still lets the empty-bundle check run.
     *
     * @param taskId id of the task whose appenderators are removed
     * @param dataSource datasource the task was registered under
     */
    @Override
    public void removeAppenderatorsForTask(String taskId, String dataSource) {
        synchronized (this) {
            final DatasourceBundle bundle = this.datasourceBundles.get(dataSource);
            if (bundle == null) {
                this.LOG.error("Could not find datasource bundle for [%s], task [%s]", dataSource, taskId);
                return;
            }

            final List<Appenderator> removed = bundle.taskAppenderatorMap.remove(taskId);
            if (removed == null) {
                this.LOG.error("Tried to remove appenderators for task [%s] but none were found.", taskId);
            }

            // The last task of a datasource takes the whole bundle (and its walker) with it.
            if (bundle.taskAppenderatorMap.isEmpty()) {
                this.datasourceBundles.remove(dataSource);
            }
        }
    }

    /**
     * Resolves the bundle for the query's datasource and asks its walker for an interval-based runner.
     *
     * @throws ISE if {@link #getBundle} cannot derive a base table from the query's datasource
     * @throws IAE if no bundle is registered for that table
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        final SinkQuerySegmentWalker walker = this.getBundle(query).getWalker();
        return walker.getQueryRunnerForIntervals(query, intervals);
    }

    /**
     * Resolves the bundle for the query's datasource and asks its walker for a segment-based runner.
     *
     * @throws ISE if {@link #getBundle} cannot derive a base table from the query's datasource
     * @throws IAE if no bundle is registered for that table
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        final SinkQuerySegmentWalker walker = this.getBundle(query).getWalker();
        return walker.getQueryRunnerForSegments(query, specs);
    }

    /**
     * Finds the datasource bundle that serves a query.
     *
     * <p>The query's datasource is analysed to obtain its base table; the bundle is then looked up by
     * that table's name while holding this manager's monitor.
     *
     * @param query query whose datasource selects the bundle
     * @return the registered bundle, never {@code null}
     * @throws ISE if the analysis yields no base table datasource
     * @throws IAE if no bundle is registered under the base table's name
     */
    @VisibleForTesting
    <T> DatasourceBundle getBundle(Query<T> query) {
        final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
        final TableDataSource baseTable = analysis
            .getBaseTableDataSource()
            .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
        final String tableName = baseTable.getName();

        final DatasourceBundle found;
        synchronized (this) {
            found = this.datasourceBundles.get(tableName);
        }

        if (found != null) {
            return found;
        }
        throw new IAE("Could not find segment walker for datasource [%s]", tableName);
    }

    /**
     * Reports whether tasks served by this manager should make node announcements themselves.
     *
     * @return always {@code false}
     */
    @Override
    public boolean shouldTaskMakeNodeAnnouncements() {
        return false;
    }

    /**
     * Stops the shared merge executor if it is still present.
     *
     * <p>The executor is stopped with {@code shutdownNow()} and the field is then cleared, so calling
     * this method again does nothing. Mergers created earlier keep their own reference to the stopped
     * executor.
     */
    @Override
    public void shutdown() {
        if (this.mergeExecutor == null) {
            return;
        }
        this.mergeExecutor.shutdownNow();
        this.mergeExecutor = null;
    }

    /**
     * Exposes the datasource-to-bundle map for tests.
     *
     * <p>The returned object is the internal map itself, not a copy, and this accessor takes no lock.
     *
     * @return the live map keyed by datasource name
     */
    @VisibleForTesting
    public Map<String, DatasourceBundle> getDatasourceBundles() {
        return this.datasourceBundles;
    }

    /**
     * Wraps a task's config so that its byte limit becomes an equal share of the global ingestion heap.
     *
     * <p>The share is {@code getGlobalIngestionHeapLimitBytes() / getCapacity()} from the worker config,
     * using integer division.
     *
     * @param baseConfig config supplied by the task
     * @return a config delegating to {@code baseConfig} with the overridden memory limits
     */
    private AppenderatorConfig rewriteAppenderatorConfigMemoryLimits(AppenderatorConfig baseConfig) {
        final long globalHeapLimitBytes = this.workerConfig.getGlobalIngestionHeapLimitBytes();
        final long workerCapacity = this.workerConfig.getCapacity();
        return new MemoryParameterOverridingAppenderatorConfig(baseConfig, globalHeapLimitBytes / workerCapacity);
    }

    /**
     * Wraps a merger in a {@link LimitedPoolIndexMerger} bound to the current merge executor.
     *
     * <p>The executor field is read at call time; after {@link #shutdown()} it is {@code null} and the
     * wrapper is constructed with that {@code null} value.
     *
     * @param baseMerger merger that performs the actual work
     * @return the wrapping merger
     */
    private IndexMerger wrapIndexMerger(IndexMerger baseMerger) {
        return new LimitedPoolIndexMerger(baseMerger, this.mergeExecutor);
    }

    /**
     * {@link IndexMerger} decorator that runs merge and persist work on a supplied executor and blocks
     * the caller until that work finishes.
     *
     * <p>Only the two {@code mergeQueryableIndex} overloads without a {@link ProgressIndicator} and the
     * five-argument {@code persist} are supported. Every other operation throws {@link UOE}.
     */
    public static class LimitedPoolIndexMerger implements IndexMerger {
        private static final String ERROR_MSG = "Shouldn't be called";
        private final IndexMerger baseMerger;
        private final ListeningExecutorService mergeExecutor;

        /**
         * @param baseMerger merger that performs the actual work
         * @param mergeExecutor executor on which the work is scheduled
         */
        public LimitedPoolIndexMerger(IndexMerger baseMerger, ListeningExecutorService mergeExecutor) {
            this.baseMerger = baseMerger;
            this.mergeExecutor = mergeExecutor;
        }

        /**
         * Merges without a dimensions spec by delegating to the overload below with a {@code null} spec.
         */
        public File mergeQueryableIndex(List<QueryableIndex> indexes, boolean rollup, AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge) {
            return this.mergeQueryableIndex(indexes, rollup, metricAggs, null, outDir, indexSpec, segmentWriteOutMediumFactory, maxColumnsToMerge);
        }

        /**
         * Runs the base merger's merge on the merge executor and waits for its result.
         *
         * @return the file returned by the base merger
         * @throws RuntimeException wrapping any failure of the merge or of the wait
         */
        public File mergeQueryableIndex(List<QueryableIndex> indexes, boolean rollup, AggregatorFactory[] metricAggs, @Nullable DimensionsSpec dimensionsSpec, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge) {
            return this.runOnMergePool(() -> {
                try {
                    return this.baseMerger.mergeQueryableIndex(indexes, rollup, metricAggs, dimensionsSpec, outDir, indexSpec, segmentWriteOutMediumFactory, maxColumnsToMerge);
                }
                catch (IOException ioe) {
                    // Same wrapping as before, so the cause chain seen by callers is unchanged.
                    throw new RuntimeException(ioe);
                }
            });
        }

        /**
         * Runs the base merger's persist on the merge executor and waits for its result.
         *
         * @return the file returned by the base merger
         * @throws RuntimeException wrapping any failure of the persist or of the wait
         */
        public File persist(IncrementalIndex index, Interval dataInterval, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) {
            return this.runOnMergePool(() -> {
                try {
                    return this.baseMerger.persist(index, dataInterval, outDir, indexSpec, segmentWriteOutMediumFactory);
                }
                catch (IOException ioe) {
                    // Same wrapping as before, so the cause chain seen by callers is unchanged.
                    throw new RuntimeException(ioe);
                }
            });
        }

        /**
         * Submits {@code work} to the merge executor and blocks until it completes.
         *
         * <p>If the waiting thread is interrupted, the submitted work is cancelled and the interrupt
         * flag is restored before the exception is rethrown, so the caller can still observe the
         * interruption and the abandoned work does not keep running on the executor.
         */
        private File runOnMergePool(Callable<File> work) {
            final ListenableFuture<File> pending = this.mergeExecutor.submit(work);
            try {
                return pending.get();
            }
            catch (InterruptedException e) {
                pending.cancel(true);
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Unsupported; always throws {@link UOE}. */
        public File merge(List<IndexableAdapter> indexes, boolean rollup, AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, int maxColumnsToMerge) {
            throw new UOE(ERROR_MSG);
        }

        /** Unsupported; always throws {@link UOE}. */
        public File convert(File inDir, File outDir, IndexSpec indexSpec) {
            throw new UOE(ERROR_MSG);
        }

        /** Unsupported; always throws {@link UOE}. */
        public File append(List<IndexableAdapter> indexes, AggregatorFactory[] aggregators, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) {
            throw new UOE(ERROR_MSG);
        }

        /** Unsupported; always throws {@link UOE}. */
        public File persist(IncrementalIndex index, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) {
            throw new UOE(ERROR_MSG);
        }

        /** Unsupported; always throws {@link UOE}. */
        public File persist(IncrementalIndex index, Interval dataInterval, File outDir, IndexSpec indexSpec, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) {
            throw new UOE(ERROR_MSG);
        }

        /** Unsupported; always throws {@link UOE}. */
        public File mergeQueryableIndex(List<QueryableIndex> indexes, boolean rollup, AggregatorFactory[] metricAggs, @Nullable DimensionsSpec dimensionsSpec, File outDir, IndexSpec indexSpec, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge) {
            throw new UOE(ERROR_MSG);
        }
    }

    /**
     * {@link AppenderatorConfig} view over another config that replaces its in-memory limits.
     *
     * <p>{@link #getMaxBytesInMemory()} returns the value given at construction and
     * {@link #getMaxRowsInMemory()} returns {@link Integer#MAX_VALUE}; every other getter delegates to
     * the wrapped config.
     */
    private static class MemoryParameterOverridingAppenderatorConfig implements AppenderatorConfig {
        private final AppenderatorConfig baseConfig;
        private final long newMaxBytesInMemory;

        /**
         * @param baseConfig config that answers every non-overridden setting
         * @param newMaxBytesInMemory byte limit reported instead of the base config's own
         */
        public MemoryParameterOverridingAppenderatorConfig(AppenderatorConfig baseConfig, long newMaxBytesInMemory) {
            this.baseConfig = baseConfig;
            this.newMaxBytesInMemory = newMaxBytesInMemory;
        }

        @Override
        public boolean isReportParseExceptions() {
            return this.baseConfig.isReportParseExceptions();
        }

        @Override
        public AppendableIndexSpec getAppendableIndexSpec() {
            return this.baseConfig.getAppendableIndexSpec();
        }

        /** Overridden: the row count never limits memory here, only the byte limit does. */
        @Override
        public int getMaxRowsInMemory() {
            return Integer.MAX_VALUE;
        }

        /** Overridden: returns the limit supplied at construction. */
        @Override
        public long getMaxBytesInMemory() {
            return this.newMaxBytesInMemory;
        }

        @Override
        public boolean isSkipBytesInMemoryOverheadCheck() {
            return this.baseConfig.isSkipBytesInMemoryOverheadCheck();
        }

        @Override
        public int getMaxPendingPersists() {
            return this.baseConfig.getMaxPendingPersists();
        }

        @Override
        @Nullable
        public Integer getMaxRowsPerSegment() {
            return this.baseConfig.getMaxRowsPerSegment();
        }

        @Override
        @Nullable
        public Long getMaxTotalRows() {
            return this.baseConfig.getMaxTotalRows();
        }

        @Override
        public PartitionsSpec getPartitionsSpec() {
            return this.baseConfig.getPartitionsSpec();
        }

        @Override
        public Period getIntermediatePersistPeriod() {
            return this.baseConfig.getIntermediatePersistPeriod();
        }

        @Override
        public IndexSpec getIndexSpec() {
            return this.baseConfig.getIndexSpec();
        }

        @Override
        public IndexSpec getIndexSpecForIntermediatePersists() {
            return this.baseConfig.getIndexSpecForIntermediatePersists();
        }

        @Override
        public File getBasePersistDirectory() {
            return this.baseConfig.getBasePersistDirectory();
        }

        /**
         * Returns a config that uses the given persist directory and keeps this wrapper's memory limits.
         *
         * <p>The directory change is applied to the wrapped config and the result is wrapped again.
         * Previously this method returned {@code this} and silently ignored the requested directory.
         *
         * @param basePersistDirectory directory the returned config should report
         * @return a new wrapper around the re-based config
         */
        @Override
        public AppenderatorConfig withBasePersistDirectory(File basePersistDirectory) {
            return new MemoryParameterOverridingAppenderatorConfig(
                this.baseConfig.withBasePersistDirectory(basePersistDirectory),
                this.newMaxBytesInMemory
            );
        }

        @Override
        @Nullable
        public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
            return this.baseConfig.getSegmentWriteOutMediumFactory();
        }
    }

    /**
     * Per-datasource state: one shared {@link SinkQuerySegmentWalker} plus the appenderators of every
     * task currently registered for that datasource.
     *
     * <p>This is an inner class because the walker is built from the enclosing manager's collaborators.
     */
    @VisibleForTesting
    public class DatasourceBundle {
        private final SinkQuerySegmentWalker walker;
        // Task id -> appenderators created for that task, in creation order.
        private final Map<String, List<Appenderator>> taskAppenderatorMap = new HashMap<>();

        /**
         * Builds the walker for a datasource over a fresh, empty sink timeline.
         *
         * @param dataSource name of the datasource this bundle serves
         * @throws NullPointerException if the enclosing manager's cache is {@code null}
         */
        public DatasourceBundle(String dataSource) {
            final UnifiedIndexerAppenderatorsManager manager = UnifiedIndexerAppenderatorsManager.this;
            final VersionedIntervalTimeline<String, Sink> sinkTimeline =
                new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER);

            this.walker = new SinkQuerySegmentWalker(
                dataSource,
                sinkTimeline,
                manager.objectMapper,
                manager.serviceEmitter,
                manager.queryRunnerFactoryConglomerateProvider.get(),
                manager.queryProcessingPool,
                manager.joinableFactory,
                Preconditions.checkNotNull(manager.cache, "cache"),
                manager.cacheConfig,
                manager.cachePopulatorStats
            );
        }

        /** @return the walker shared by every appenderator of this datasource */
        public SinkQuerySegmentWalker getWalker() {
            return this.walker;
        }

        /**
         * Registers an appenderator under a task id, creating the task's list on first use.
         *
         * @param taskId id of the owning task
         * @param appenderator appenderator to remember for that task
         */
        public void addAppenderator(String taskId, Appenderator appenderator) {
            final List<Appenderator> forTask =
                this.taskAppenderatorMap.computeIfAbsent(taskId, ignored -> new ArrayList<>());
            forTask.add(appenderator);
        }
    }
}

