/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.UploadResult;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.SequenceNumberRange;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
class FsStateChangelogWriter
implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of((long)0L);
    private final UUID logId;
    private final KeyGroupRange keyGroupRange;
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private final List<UploadCompletionListener> uploadCompletionListeners = new ArrayList<UploadCompletionListener>();
    private SequenceNumber activeSequenceNumber = INITIAL_SQN;
    private SequenceNumber lowestSequenceNumber = INITIAL_SQN;
    private List<StateChange> activeChangeSet = new ArrayList<StateChange>();
    private long activeChangeSetSize;
    private final NavigableMap<SequenceNumber, StateChangeSet> notUploaded = new TreeMap<SequenceNumber, StateChangeSet>();
    private final NavigableMap<SequenceNumber, UploadResult> uploaded = new TreeMap<SequenceNumber, UploadResult>();
    @Nullable
    private Tuple2<SequenceNumber, Throwable> highestFailed;
    private boolean closed;
    private final MailboxExecutor mailboxExecutor;

    FsStateChangelogWriter(UUID logId, KeyGroupRange keyGroupRange, StateChangeUploadScheduler uploader, long preEmptivePersistThresholdInBytes, MailboxExecutor mailboxExecutor) {
        this.logId = logId;
        this.keyGroupRange = keyGroupRange;
        this.uploader = uploader;
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
        this.mailboxExecutor = mailboxExecutor;
    }

    public void append(int keyGroup, byte[] value) throws IOException {
        LOG.trace("append to {}: keyGroup={} {} bytes", new Object[]{this.logId, keyGroup, value.length});
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (String)"%s is closed", (Object[])new Object[]{this.logId});
        this.activeChangeSet.add(new StateChange(keyGroup, value));
        this.activeChangeSetSize += (long)value.length;
        if (this.activeChangeSetSize >= this.preEmptivePersistThresholdInBytes) {
            LOG.debug("pre-emptively flush {}Mb of appended changes to the common store", (Object)(this.activeChangeSetSize / 1024L / 1024L));
            this.persistInternal(this.notUploaded.isEmpty() ? this.activeSequenceNumber : (SequenceNumber)this.notUploaded.firstKey());
        }
    }

    public SequenceNumber initialSequenceNumber() {
        return INITIAL_SQN;
    }

    public SequenceNumber nextSequenceNumber() {
        this.rollover();
        LOG.trace("query {} sqn: {}", (Object)this.logId, (Object)this.activeSequenceNumber);
        return this.activeSequenceNumber;
    }

    public CompletableFuture<ChangelogStateHandleStreamImpl> persist(SequenceNumber from) throws IOException {
        LOG.debug("persist {} starting from sqn {} (incl.), active sqn: {}", new Object[]{this.logId, from, this.activeSequenceNumber});
        return this.persistInternal(from);
    }

    private CompletableFuture<ChangelogStateHandleStreamImpl> persistInternal(SequenceNumber from) throws IOException {
        this.ensureCanPersist(from);
        this.rollover();
        Map<SequenceNumber, StateChangeSet> toUpload = FsStateChangelogWriter.drainTailMap(this.notUploaded, from);
        NavigableMap<SequenceNumber, UploadResult> readyToReturn = this.uploaded.tailMap(from, true);
        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, toUpload);
        SequenceNumberRange range = SequenceNumberRange.generic((SequenceNumber)from, (SequenceNumber)this.activeSequenceNumber);
        if (range.size() == (long)readyToReturn.size()) {
            Preconditions.checkState((boolean)toUpload.isEmpty());
            return CompletableFuture.completedFuture(FsStateChangelogWriter.buildHandle(this.keyGroupRange, readyToReturn, 0L));
        }
        CompletableFuture<ChangelogStateHandleStreamImpl> future = new CompletableFuture<ChangelogStateHandleStreamImpl>();
        this.uploadCompletionListeners.add(new UploadCompletionListener(this.keyGroupRange, range, readyToReturn, future));
        if (!toUpload.isEmpty()) {
            StateChangeUploadScheduler.UploadTask uploadTask = new StateChangeUploadScheduler.UploadTask(toUpload.values(), this::handleUploadSuccess, this::handleUploadFailure);
            this.uploader.upload(uploadTask);
        }
        return future;
    }

    private void handleUploadFailure(List<SequenceNumber> failedSqn, Throwable throwable) {
        this.mailboxExecutor.execute(() -> {
            if (this.closed) {
                return;
            }
            this.uploadCompletionListeners.removeIf(listener -> listener.onFailure(failedSqn, throwable));
            failedSqn.stream().max(Comparator.naturalOrder()).filter(sqn -> sqn.compareTo((Object)this.lowestSequenceNumber) >= 0).filter(sqn -> this.highestFailed == null || sqn.compareTo(this.highestFailed.f0) > 0).ifPresent(sqn -> {
                this.highestFailed = Tuple2.of((Object)sqn, (Object)throwable);
            });
        }, "handleUploadFailure");
    }

    private void handleUploadSuccess(List<UploadResult> results) {
        this.mailboxExecutor.execute(() -> {
            if (this.closed) {
                results.forEach(r -> IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{() -> r.getStreamStateHandle().discardState()}));
            } else {
                this.uploadCompletionListeners.removeIf(listener -> listener.onSuccess(results));
                for (UploadResult result : results) {
                    if (result.sequenceNumber.compareTo((Object)this.lowestSequenceNumber) < 0) continue;
                    this.uploaded.put(result.sequenceNumber, result);
                }
            }
        }, "handleUploadSuccess");
    }

    public void close() {
        LOG.debug("close {}", (Object)this.logId);
        Preconditions.checkState((!this.closed ? 1 : 0) != 0);
        this.closed = true;
        this.activeChangeSet.clear();
        this.activeChangeSetSize = 0L;
        this.notUploaded.clear();
        this.uploaded.clear();
    }

    public void truncate(SequenceNumber to) {
        LOG.debug("truncate {} to sqn {} (excl.)", (Object)this.logId, (Object)to);
        Preconditions.checkArgument((to.compareTo((Object)this.activeSequenceNumber) <= 0 ? 1 : 0) != 0);
        this.lowestSequenceNumber = to;
        this.notUploaded.headMap(this.lowestSequenceNumber, false).clear();
        this.uploaded.headMap(this.lowestSequenceNumber, false).clear();
    }

    private void rollover() {
        if (this.activeChangeSet.isEmpty()) {
            return;
        }
        this.notUploaded.put(this.activeSequenceNumber, new StateChangeSet(this.logId, this.activeSequenceNumber, this.activeChangeSet));
        this.activeSequenceNumber = this.activeSequenceNumber.next();
        LOG.debug("bump active sqn to {}", (Object)this.activeSequenceNumber);
        this.activeChangeSet = new ArrayList<StateChange>();
        this.activeChangeSetSize = 0L;
    }

    public void confirm(SequenceNumber from, SequenceNumber to) {
    }

    public void reset(SequenceNumber from, SequenceNumber to) {
    }

    private static ChangelogStateHandleStreamImpl buildHandle(KeyGroupRange keyGroupRange, NavigableMap<SequenceNumber, UploadResult> results, long incrementalSize) {
        ArrayList<Tuple2> tuples = new ArrayList<Tuple2>();
        long size = 0L;
        for (UploadResult uploadResult : results.values()) {
            tuples.add(Tuple2.of((Object)uploadResult.getStreamStateHandle(), (Object)uploadResult.getOffset()));
            size += uploadResult.getSize();
        }
        return new ChangelogStateHandleStreamImpl(tuples, keyGroupRange, size, incrementalSize);
    }

    @VisibleForTesting
    SequenceNumber lastAppendedSqnUnsafe() {
        return this.activeSequenceNumber;
    }

    @VisibleForTesting
    public SequenceNumber getLowestSequenceNumber() {
        return this.lowestSequenceNumber;
    }

    private void ensureCanPersist(SequenceNumber from) throws IOException {
        Preconditions.checkNotNull((Object)from);
        if (this.highestFailed != null && ((SequenceNumber)this.highestFailed.f0).compareTo((Object)from) >= 0) {
            throw new IOException("The upload for " + this.highestFailed.f0 + " has already failed previously", (Throwable)this.highestFailed.f1);
        }
        if (this.lowestSequenceNumber.compareTo((Object)from) > 0) {
            throw new IllegalArgumentException(String.format("Requested changes were truncated (requested: %s, truncated: %s)", from, this.lowestSequenceNumber));
        }
        if (this.activeSequenceNumber.compareTo((Object)from) < 0) {
            throw new IllegalArgumentException(String.format("Requested changes were not yet appended (requested: %s, appended: %s)", from, this.activeSequenceNumber));
        }
    }

    private static Map<SequenceNumber, StateChangeSet> drainTailMap(NavigableMap<SequenceNumber, StateChangeSet> src, SequenceNumber fromInclusive) {
        NavigableMap<SequenceNumber, StateChangeSet> tailMap = src.tailMap(fromInclusive, true);
        HashMap<SequenceNumber, StateChangeSet> toUpload = new HashMap<SequenceNumber, StateChangeSet>(tailMap);
        tailMap.clear();
        return toUpload;
    }

    private static final class UploadCompletionListener {
        private final NavigableMap<SequenceNumber, UploadResult> uploaded;
        private final CompletableFuture<ChangelogStateHandleStreamImpl> completionFuture;
        private final KeyGroupRange keyGroupRange;
        private final SequenceNumberRange changeRange;

        private UploadCompletionListener(KeyGroupRange keyGroupRange, SequenceNumberRange changeRange, Map<SequenceNumber, UploadResult> uploaded, CompletableFuture<ChangelogStateHandleStreamImpl> completionFuture) {
            Preconditions.checkArgument((!changeRange.isEmpty() ? 1 : 0) != 0, (String)"Empty change range not allowed: %s", (Object[])new Object[]{changeRange});
            this.uploaded = new TreeMap<SequenceNumber, UploadResult>(uploaded);
            this.completionFuture = completionFuture;
            this.keyGroupRange = keyGroupRange;
            this.changeRange = changeRange;
        }

        public boolean onSuccess(List<UploadResult> uploadResults) {
            long incrementalSize = 0L;
            for (UploadResult uploadResult : uploadResults) {
                if (!this.changeRange.contains(uploadResult.sequenceNumber)) continue;
                this.uploaded.put(uploadResult.sequenceNumber, uploadResult);
                incrementalSize += uploadResult.getSize();
                if ((long)this.uploaded.size() != this.changeRange.size()) continue;
                this.completionFuture.complete(FsStateChangelogWriter.buildHandle(this.keyGroupRange, this.uploaded, incrementalSize));
                return true;
            }
            return false;
        }

        public boolean onFailure(List<SequenceNumber> sequenceNumbers, Throwable throwable) {
            IOException ioException = throwable instanceof IOException ? (IOException)throwable : new IOException(throwable);
            for (SequenceNumber sequenceNumber : sequenceNumbers) {
                if (!this.changeRange.contains(sequenceNumber)) continue;
                this.completionFuture.completeExceptionally(ioException);
                return true;
            }
            return false;
        }
    }
}

