/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerEntry;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecutorPartitionTrackerImpl
extends AbstractPartitionTracker<JobID, TaskExecutorPartitionInfo>
implements TaskExecutorPartitionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorPartitionTrackerImpl.class);
    private final Map<IntermediateDataSetID, DataSetEntry> clusterPartitions = new HashMap<IntermediateDataSetID, DataSetEntry>();
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;

    public TaskExecutorPartitionTrackerImpl(ShuffleEnvironment<?, ?> shuffleEnvironment) {
        this.shuffleEnvironment = shuffleEnvironment;
    }

    @Override
    public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) {
        Preconditions.checkNotNull(producingJobId);
        Preconditions.checkNotNull(partitionInfo);
        this.startTrackingPartition(producingJobId, partitionInfo.getResultPartitionId(), partitionInfo);
    }

    @Override
    public void stopTrackingAndReleaseJobPartitions(Collection<ResultPartitionID> partitionsToRelease) {
        LOG.debug("Releasing Job Partitions {}", partitionsToRelease);
        if (partitionsToRelease.isEmpty()) {
            return;
        }
        this.stopTrackingPartitions(partitionsToRelease);
        this.shuffleEnvironment.releasePartitionsLocally(partitionsToRelease);
    }

    @Override
    public void stopTrackingAndReleaseJobPartitionsFor(JobID producingJobId) {
        Collection<ResultPartitionID> partitionsForJob = CollectionUtil.project(this.stopTrackingPartitionsFor(producingJobId), PartitionTrackerEntry::getResultPartitionId);
        LOG.debug("Releasing Job Partitions {} for job {}", partitionsForJob, (Object)producingJobId);
        this.shuffleEnvironment.releasePartitionsLocally(partitionsForJob);
    }

    @Override
    public void promoteJobPartitions(Collection<ResultPartitionID> partitionsToPromote) {
        LOG.debug("Promoting Job Partitions {}", partitionsToPromote);
        if (partitionsToPromote.isEmpty()) {
            return;
        }
        Collection partitionTrackerEntries = this.stopTrackingPartitions(partitionsToPromote);
        for (PartitionTrackerEntry partitionTrackerEntry : partitionTrackerEntries) {
            TaskExecutorPartitionInfo dataSetMetaInfo = (TaskExecutorPartitionInfo)partitionTrackerEntry.getMetaInfo();
            DataSetEntry dataSetEntry = this.clusterPartitions.computeIfAbsent(dataSetMetaInfo.getIntermediateDataSetId(), ignored -> new DataSetEntry(dataSetMetaInfo.getNumberOfPartitions()));
            dataSetEntry.addPartition(((TaskExecutorPartitionInfo)partitionTrackerEntry.getMetaInfo()).getShuffleDescriptor());
        }
    }

    @Override
    public void stopTrackingAndReleaseClusterPartitions(Collection<IntermediateDataSetID> dataSetsToRelease) {
        for (IntermediateDataSetID dataSetID : dataSetsToRelease) {
            DataSetEntry dataSetEntry = this.clusterPartitions.remove(dataSetID);
            Set<ResultPartitionID> partitionIds = dataSetEntry.getPartitionIds();
            this.shuffleEnvironment.releasePartitionsLocally(partitionIds);
        }
    }

    @Override
    public void stopTrackingAndReleaseAllClusterPartitions() {
        this.clusterPartitions.values().stream().map(DataSetEntry::getPartitionIds).forEach(this.shuffleEnvironment::releasePartitionsLocally);
        this.clusterPartitions.clear();
    }

    @Override
    public ClusterPartitionReport createClusterPartitionReport() {
        List<ClusterPartitionReport.ClusterPartitionReportEntry> reportEntries = this.clusterPartitions.entrySet().stream().map(entry -> new ClusterPartitionReport.ClusterPartitionReportEntry((IntermediateDataSetID)entry.getKey(), ((DataSetEntry)entry.getValue()).getTotalNumberOfPartitions(), ((DataSetEntry)entry.getValue()).getShuffleDescriptors())).collect(Collectors.toList());
        return new ClusterPartitionReport(reportEntries);
    }

    private static class DataSetEntry {
        private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors = new HashMap<ResultPartitionID, ShuffleDescriptor>();
        private final int totalNumberOfPartitions;

        private DataSetEntry(int totalNumberOfPartitions) {
            this.totalNumberOfPartitions = totalNumberOfPartitions;
        }

        void addPartition(ShuffleDescriptor shuffleDescriptor) {
            this.shuffleDescriptors.put(shuffleDescriptor.getResultPartitionID(), shuffleDescriptor);
        }

        public Set<ResultPartitionID> getPartitionIds() {
            return this.shuffleDescriptors.keySet();
        }

        public int getTotalNumberOfPartitions() {
            return this.totalNumberOfPartitions;
        }

        public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
            return this.shuffleDescriptors;
        }
    }
}

