/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.gcp.pubsub.common;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger;

public class AcknowledgeOnCheckpoint<ACKID extends Serializable>
implements CheckpointListener,
ListCheckpointed<AcknowledgeIdsForCheckpoint<ACKID>> {
    private final Acknowledger<ACKID> acknowledger;
    private List<AcknowledgeIdsForCheckpoint<ACKID>> acknowledgeIdsPerCheckpoint;
    private List<ACKID> acknowledgeIdsForPendingCheckpoint;
    private AtomicInteger outstandingAcknowledgements;

    public AcknowledgeOnCheckpoint(Acknowledger<ACKID> acknowledger) {
        this.acknowledger = acknowledger;
        this.acknowledgeIdsPerCheckpoint = new ArrayList<AcknowledgeIdsForCheckpoint<ACKID>>();
        this.acknowledgeIdsForPendingCheckpoint = new ArrayList<ACKID>();
        this.outstandingAcknowledgements = new AtomicInteger(0);
    }

    public void addAcknowledgeId(ACKID id) {
        this.acknowledgeIdsForPendingCheckpoint.add(id);
        this.outstandingAcknowledgements.incrementAndGet();
    }

    public void notifyCheckpointComplete(long checkpointId) {
        List idsToAcknowledge = this.acknowledgeIdsPerCheckpoint.stream().filter(acknowledgeIdsForCheckpoint -> acknowledgeIdsForCheckpoint.getCheckpointId() <= checkpointId).flatMap(acknowledgeIdsForCheckpoint -> acknowledgeIdsForCheckpoint.getAcknowledgeIds().stream()).collect(Collectors.toList());
        this.acknowledger.acknowledge(idsToAcknowledge);
        this.acknowledgeIdsPerCheckpoint = this.acknowledgeIdsPerCheckpoint.stream().filter(acknowledgeIdsForCheckpoint -> acknowledgeIdsForCheckpoint.getCheckpointId() > checkpointId).collect(Collectors.toList());
        this.outstandingAcknowledgements = new AtomicInteger(this.numberOfAcknowledgementIds(this.acknowledgeIdsPerCheckpoint));
    }

    public void notifyCheckpointAborted(long checkpointId) {
    }

    public List<AcknowledgeIdsForCheckpoint<ACKID>> snapshotState(long checkpointId, long timestamp) {
        this.acknowledgeIdsPerCheckpoint.add(new AcknowledgeIdsForCheckpoint<ACKID>(checkpointId, this.acknowledgeIdsForPendingCheckpoint));
        this.acknowledgeIdsForPendingCheckpoint = new ArrayList<ACKID>();
        return this.acknowledgeIdsPerCheckpoint;
    }

    public void restoreState(List<AcknowledgeIdsForCheckpoint<ACKID>> state) {
        this.outstandingAcknowledgements = new AtomicInteger(this.numberOfAcknowledgementIds(state));
        this.acknowledgeIdsPerCheckpoint = state;
    }

    private int numberOfAcknowledgementIds(List<AcknowledgeIdsForCheckpoint<ACKID>> acknowledgeIdsForCheckpoints) {
        return acknowledgeIdsForCheckpoints.stream().map(AcknowledgeIdsForCheckpoint::getAcknowledgeIds).mapToInt(List::size).sum();
    }

    public int numberOfOutstandingAcknowledgements() {
        return this.outstandingAcknowledgements.get();
    }
}

