/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

/**
 * Coordinator duty that submits automatic compaction tasks.
 *
 * <p>On each run it asks the indexing service for the active compaction tasks, estimates how
 * many task slots they occupy, and then submits new compaction tasks for the segments handed out
 * by the {@link CompactionSegmentSearchPolicy} until the slots available for compaction are
 * used up. The number of submitted tasks and the per-datasource remaining segment sizes reported
 * by the iterator are returned as {@link CoordinatorStats}.
 */
public class CompactSegments
implements CoordinatorDuty {
    static final String COMPACTION_TASK_COUNT = "compactTaskCount";
    static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION = "segmentSizeWaitCompact";
    /** Task type string used to recognize compaction tasks. */
    public static final String COMPACTION_TASK_TYPE = "compact";
    /** Context key that is always set to {@code true} on tasks submitted by this duty. */
    public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
    private static final Logger LOG = new Logger(CompactSegments.class);
    private final CompactionSegmentSearchPolicy policy;
    private final IndexingServiceClient indexingServiceClient;
    /**
     * Remaining segment sizes per datasource as reported by the last call to
     * {@link #makeStats}. Stays {@code null} until that method has run at least once.
     */
    private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;

    /**
     * @param objectMapper          passed to the {@link NewestSegmentFirstPolicy} created here
     * @param indexingServiceClient client used to list active tasks, fetch task payloads, read
     *                              the total worker capacity and submit compaction tasks
     */
    @Inject
    public CompactSegments(ObjectMapper objectMapper, IndexingServiceClient indexingServiceClient) {
        this.policy = new NewestSegmentFirstPolicy(objectMapper);
        this.indexingServiceClient = indexingServiceClient;
    }

    /**
     * Runs one round of automatic compaction.
     *
     * <p>Nothing is submitted when {@code maxCompactionTaskSlots} is not positive or when no
     * compaction config exists; in those cases the returned params carry empty stats.
     *
     * @param params runtime params providing the compaction config and the used-segment timelines
     * @return a copy of {@code params} carrying the stats collected by this run
     * @throws ISE if the payload of an active task is null or is not a compaction task
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        LOG.info("Compact segments");
        final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
        final CoordinatorStats stats = new CoordinatorStats();
        if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
            final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
            final List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
            if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
                final Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
                    .stream()
                    .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
                final List<TaskStatusPlus> compactionTasks = filterNonCompactionTasks(this.indexingServiceClient.getActiveTasks());
                // dataSource -> intervals of the compaction tasks that are already active
                final Map<String, List<Interval>> compactionTaskIntervals = Maps.newHashMapWithExpectedSize(compactionConfigList.size());
                int numEstimatedNonCompleteCompactionTasks = 0;
                for (TaskStatusPlus status : compactionTasks) {
                    final TaskPayloadResponse response = this.indexingServiceClient.getTaskPayload(status.getId());
                    if (response == null) {
                        throw new ISE("WTH? got a null paylord from overlord for task[%s]", status.getId());
                    }
                    if (!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
                        throw new ISE("WTH? task[%s] is not a compactionTask?", status.getId());
                    }
                    final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload();
                    final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
                    compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
                    // Count the compaction task itself plus its configured concurrent sub tasks.
                    numEstimatedNonCompleteCompactionTasks += this.findNumMaxConcurrentSubTasks(compactionTaskQuery.getTuningConfig()) + 1;
                }

                final CompactionSegmentIterator iterator = this.policy.reset(compactionConfigs, dataSources, compactionTaskIntervals);
                final int compactionTaskCapacity = (int) Math.min(
                    (double) this.indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
                    (double) dynamicConfig.getMaxCompactionTaskSlots()
                );
                final int numAvailableCompactionTaskSlots;
                if (numEstimatedNonCompleteCompactionTasks > 0) {
                    numAvailableCompactionTaskSlots = Math.max(0, compactionTaskCapacity - numEstimatedNonCompleteCompactionTasks);
                } else {
                    // With no compaction task counted as running, at least one slot is always
                    // granted, even when the computed capacity rounds down to 0.
                    numAvailableCompactionTaskSlots = Math.max(1, compactionTaskCapacity);
                }
                LOG.info(
                    "Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
                    numAvailableCompactionTaskSlots,
                    compactionTaskCapacity
                );
                if (numAvailableCompactionTaskSlots > 0) {
                    stats.accumulate(this.doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator));
                } else {
                    stats.accumulate(this.makeStats(0, iterator));
                }
            } else {
                LOG.info("compactionConfig is empty. Skip.");
            }
        } else {
            LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
        }
        return params.buildFromExisting().withCoordinatorStats(stats).build();
    }

    /**
     * Returns the {@code maxNumConcurrentSubTasks} of the given tuning config, or 0 when the
     * tuning config or that setting is null.
     */
    private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) {
        if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) {
            return tuningConfig.getMaxNumConcurrentSubTasks();
        }
        return 0;
    }

    /**
     * Keeps only the statuses whose task type is {@code "compact"} or null. Statuses with a null
     * type are kept as well; {@link #run} verifies their type against the task payload.
     */
    private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses) {
        return taskStatuses.stream().filter(status -> {
            final String taskType = status.getType();
            return taskType == null || COMPACTION_TASK_TYPE.equals(taskType);
        }).collect(Collectors.toList());
    }

    /**
     * Submits compaction tasks for the segment lists returned by {@code iterator} until the
     * iterator is exhausted or the estimated number of used slots reaches
     * {@code numAvailableCompactionTaskSlots}.
     *
     * @param compactionConfigs               compaction config per datasource name
     * @param numAvailableCompactionTaskSlots upper bound on the slots to fill in this run
     * @param iterator                        source of the segment lists to compact
     * @return stats with the number of slots counted as submitted and the remaining segment sizes
     * @throws ISE if the iterator returns an empty segment list
     */
    private CoordinatorStats doRun(Map<String, DataSourceCompactionConfig> compactionConfigs, int numAvailableCompactionTaskSlots, CompactionSegmentIterator iterator) {
        int numSubmittedTasks = 0;
        while (iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots) {
            final List<DataSegment> segmentsToCompact = iterator.next();
            if (segmentsToCompact.isEmpty()) {
                throw new ISE("segmentsToCompact is empty?");
            }
            // The datasource is read from the first segment of the list.
            final String dataSourceName = segmentsToCompact.get(0).getDataSource();
            final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
            final String taskId = this.indexingServiceClient.compactSegments(
                segmentsToCompact,
                config.getTaskPriority(),
                ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
                this.newAutoCompactionContext(config.getTaskContext())
            );
            LOG.info("Submitted a compactionTask[%s] for %s segments", taskId, segmentsToCompact.size());
            LOG.infoSegments(segmentsToCompact, "Compacting segments");
            // Count the compaction task itself plus its configured concurrent sub tasks.
            numSubmittedTasks += this.findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1;
        }
        return this.makeStats(numSubmittedTasks, iterator);
    }

    /**
     * Builds the task context for a submitted compaction task: a copy of the configured context
     * (or an empty map when it is null) with {@link #STORE_COMPACTION_STATE_KEY} set to
     * {@code true}. The configured map itself is never modified.
     */
    private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext) {
        final Map<String, Object> newContext = configuredContext == null
                                               ? new HashMap<>()
                                               : new HashMap<>(configuredContext);
        newContext.put(STORE_COMPACTION_STATE_KEY, true);
        return newContext;
    }

    /**
     * Creates the stats for one run and remembers the iterator's remaining segment sizes so that
     * {@link #getTotalSizeOfSegmentsAwaitingCompaction} can serve them afterwards.
     *
     * @param numCompactionTasks value recorded under {@link #COMPACTION_TASK_COUNT}
     * @param iterator           iterator whose remaining segment sizes are recorded per datasource
     */
    private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator) {
        final CoordinatorStats stats = new CoordinatorStats();
        stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
        this.totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
        this.totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(entry -> {
            final String dataSource = entry.getKey();
            final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
            stats.addToDataSourceStat(TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION, dataSource, totalSizeOfSegmentsAwaitingCompaction);
        });
        return stats;
    }

    /**
     * Returns the remaining segment size recorded for {@code dataSource} by the most recent run
     * that produced stats.
     *
     * @return the recorded size, or {@code null} when no sizes have been recorded yet or the map
     *         lookup for {@code dataSource} yields null
     */
    @Nullable
    public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) {
        // Read the field once: it is null until makeStats() has run and is replaced on each run.
        final Object2LongOpenHashMap<String> sizes = this.totalSizesOfSegmentsAwaitingCompactionPerDataSource;
        if (sizes == null) {
            return null;
        }
        return sizes.get((Object) dataSource);
    }
}

