/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
import org.apache.flink.changelog.fs.DuplicatingOutputStreamWithPos;
import org.apache.flink.changelog.fs.OutputStreamWithPos;
import org.apache.flink.changelog.fs.StateChangeFormat;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.shaded.guava32.com.google.common.io.Closer;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

public abstract class AbstractStateChangeFsUploader
implements StateChangeUploader {
    private final StateChangeFormat format = new StateChangeFormat();
    private final Clock clock;
    private final TaskChangelogRegistry changelogRegistry;
    private final BiFunction<Path, Long, StreamStateHandle> handleFactory;
    protected final ChangelogStorageMetricGroup metrics;
    protected final boolean compression;
    protected final int bufferSize;

    public AbstractStateChangeFsUploader(boolean compression, int bufferSize, ChangelogStorageMetricGroup metrics, TaskChangelogRegistry changelogRegistry, BiFunction<Path, Long, StreamStateHandle> handleFactory) {
        this.compression = compression;
        this.bufferSize = bufferSize;
        this.metrics = metrics;
        this.clock = SystemClock.getInstance();
        this.changelogRegistry = changelogRegistry;
        this.handleFactory = handleFactory;
    }

    abstract OutputStreamWithPos prepareStream() throws IOException;

    @Override
    public StateChangeUploader.UploadTasksResult upload(Collection<StateChangeUploadScheduler.UploadTask> tasks) throws IOException {
        this.metrics.getUploadsCounter().inc();
        long start = this.clock.relativeTimeNanos();
        StateChangeUploader.UploadTasksResult result = this.uploadInternal(tasks);
        this.metrics.getUploadLatenciesNanos().update(this.clock.relativeTimeNanos() - start);
        this.metrics.getUploadSizes().update(result.getStateSize());
        return result;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private StateChangeUploader.UploadTasksResult uploadInternal(Collection<StateChangeUploadScheduler.UploadTask> tasks) throws IOException {
        try (OutputStreamWithPos stream = this.prepareStream();){
            HashMap<StateChangeUploadScheduler.UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets222 = new HashMap<StateChangeUploadScheduler.UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>>();
            for (StateChangeUploadScheduler.UploadTask task : tasks) {
                tasksOffsets222.put(task, this.format.write(stream, task.changeSets));
            }
            long numOfChangeSets = tasks.stream().flatMap(t -> t.getChangeSets().stream()).count();
            StreamStateHandle handle = stream.getHandle(this.handleFactory);
            this.changelogRegistry.startTracking(handle, numOfChangeSets);
            if (stream instanceof DuplicatingOutputStreamWithPos) {
                StreamStateHandle localHandle = ((DuplicatingOutputStreamWithPos)stream).getSecondaryHandle(this.handleFactory);
                this.changelogRegistry.startTracking(localHandle, numOfChangeSets);
                StateChangeUploader.UploadTasksResult uploadTasksResult = new StateChangeUploader.UploadTasksResult(tasksOffsets222, handle, localHandle);
                return uploadTasksResult;
            }
            StateChangeUploader.UploadTasksResult uploadTasksResult = new StateChangeUploader.UploadTasksResult(tasksOffsets222, handle);
            return uploadTasksResult;
        }
        catch (IOException e) {
            this.metrics.getUploadFailuresCounter().inc();
            try (Closer closer = Closer.create();){
                closer.register(() -> {
                    throw e;
                });
                tasks.forEach(cs -> closer.register(() -> cs.fail(e)));
                return null;
            }
        }
    }

    protected String generateFileName() {
        return UUID.randomUUID().toString();
    }
}

