/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;

/**
 * Consumer-side entry point that reads buffers through a list of {@link TierConsumerAgent}s, one
 * agent being created per configured {@link TierFactory}.
 *
 * <p>For every (partition, subpartition) pair the client remembers which agent served the current
 * segment and the id of that segment. When no agent is remembered, the agents are asked in list
 * order and the first one that returns a buffer is remembered for the rest of the segment.
 *
 * <p>NOTE(review): the bookkeeping map is a plain {@link HashMap} and no synchronization is done
 * here; this presumably relies on single-threaded access by the caller - confirm.
 */
public class TieredStorageConsumerClient {
    private final List<TierFactory> tierFactories;
    private final TieredStorageNettyService nettyService;
    private final List<TierConsumerAgent> tierConsumerAgents;

    /**
     * Per partition and subpartition: the agent currently serving the subpartition (or {@code
     * null} if it has to be looked up again) together with the id of the segment being read.
     */
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Tuple2<TierConsumerAgent, Integer>>> currentConsumerAgentAndSegmentIds = new HashMap<>();

    /**
     * Creates the client and immediately creates one consumer agent per tier factory.
     *
     * @param tierFactories factories used to create the consumer agents, in lookup order
     * @param tieredStorageConsumerSpecs specs handed to every factory when creating its agent
     * @param nettyService netty service handed to every factory when creating its agent
     */
    public TieredStorageConsumerClient(List<TierFactory> tierFactories, List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, TieredStorageNettyService nettyService) {
        // tierFactories and nettyService must be assigned before the agents are created,
        // because createTierConsumerAgents reads both fields.
        this.tierFactories = tierFactories;
        this.nettyService = nettyService;
        this.tierConsumerAgents = this.createTierConsumerAgents(tieredStorageConsumerSpecs);
    }

    /** Starts every consumer agent, in list order. */
    public void start() {
        for (TierConsumerAgent tierConsumerAgent : this.tierConsumerAgents) {
            tierConsumerAgent.start();
        }
    }

    /**
     * Returns the next data buffer of the given subpartition, if any agent can provide one.
     *
     * <p>Buffers of type {@link Buffer.DataType#END_OF_SEGMENT} are never returned: they are
     * recycled, the segment id is advanced by one, the remembered agent is cleared, and reading
     * continues with the next segment.
     *
     * @param partitionId partition to read from
     * @param subpartitionId subpartition to read from
     * @return the next buffer, or {@link Optional#empty()} if no agent returned one
     */
    public Optional<Buffer> getNextBuffer(TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
        Map<TieredStorageSubpartitionId, Tuple2<TierConsumerAgent, Integer>> subpartitionStates =
                this.currentConsumerAgentAndSegmentIds.computeIfAbsent(partitionId, ignore -> new HashMap<>());

        // Loop instead of recursing so that a run of END_OF_SEGMENT markers cannot grow the stack.
        while (true) {
            Tuple2<TierConsumerAgent, Integer> state = subpartitionStates.get(subpartitionId);
            // A subpartition that was never read starts at segment 0 with no agent selected.
            TierConsumerAgent currentAgent = state == null ? null : state.f0;
            int segmentId = state == null ? 0 : state.f1;

            Optional<Buffer> buffer = Optional.empty();
            if (currentAgent == null) {
                // No agent owns this segment yet: ask each one in order and keep the first hit.
                for (TierConsumerAgent candidate : this.tierConsumerAgents) {
                    buffer = candidate.getNextBuffer(partitionId, subpartitionId, segmentId);
                    if (buffer.isPresent()) {
                        subpartitionStates.put(subpartitionId, Tuple2.of(candidate, segmentId));
                        break;
                    }
                }
            } else {
                buffer = currentAgent.getNextBuffer(partitionId, subpartitionId, segmentId);
            }

            if (!buffer.isPresent()) {
                return Optional.empty();
            }
            Buffer bufferData = buffer.get();
            if (bufferData.getDataType() != Buffer.DataType.END_OF_SEGMENT) {
                return Optional.of(bufferData);
            }

            // Segment finished: forget the agent, move to the next segment, drop the marker
            // buffer and try again.
            subpartitionStates.put(subpartitionId, Tuple2.of(null, segmentId + 1));
            bufferData.recycleBuffer();
        }
    }

    /**
     * Registers the given notifier with every consumer agent.
     *
     * @param notifier notifier passed on to each agent
     */
    public void registerAvailabilityNotifier(AvailabilityNotifier notifier) {
        for (TierConsumerAgent tierConsumerAgent : this.tierConsumerAgents) {
            tierConsumerAgent.registerAvailabilityNotifier(notifier);
        }
    }

    /**
     * Closes every consumer agent. An {@link IOException} from one agent does not prevent the
     * remaining agents from being closed.
     *
     * @throws IOException the first failure raised while closing; later failures are attached to
     *     it as suppressed exceptions
     */
    public void close() throws IOException {
        IOException firstFailure = null;
        for (TierConsumerAgent tierConsumerAgent : this.tierConsumerAgents) {
            try {
                tierConsumerAgent.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates one consumer agent per tier factory, preserving the order of the factories.
     *
     * @param tieredStorageConsumerSpecs specs handed to every factory
     * @return the created agents, in factory order
     */
    private List<TierConsumerAgent> createTierConsumerAgents(List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
        List<TierConsumerAgent> agents = new ArrayList<>();
        for (TierFactory tierFactory : this.tierFactories) {
            agents.add(tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, this.nettyService));
        }
        return agents;
    }
}

