/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.contrib.streaming.state.RocksDBStateDataTransfer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * Copies the files referenced by an {@link IncrementalRemoteKeyedStateHandle} into a local
 * directory. Each file is copied by its own task, and the tasks are submitted to the
 * {@code executorService} inherited from {@link RocksDBStateDataTransfer}.
 */
public class RocksDBStateDownloader extends RocksDBStateDataTransfer {

    /**
     * Creates a downloader.
     *
     * @param restoringThreadNum forwarded unchanged to the {@link RocksDBStateDataTransfer}
     *     constructor (presumably the number of download threads; confirm against the
     *     superclass)
     */
    public RocksDBStateDownloader(int restoringThreadNum) {
        super(restoringThreadNum);
    }

    /**
     * Downloads every file of the given handle into {@code dest}: first all entries of the
     * handle's shared state, then all entries of its private state.
     *
     * @param restoreStateHandle handle whose shared and private state files are downloaded
     * @param dest directory the files are written into
     * @param closeableRegistry registry in which the streams opened during the download are
     *     registered while they are in use
     * @throws Exception if any download fails or waiting for the downloads fails
     */
    public void transferAllStateDataToDirectory(
            IncrementalRemoteKeyedStateHandle restoreStateHandle,
            Path dest,
            CloseableRegistry closeableRegistry)
            throws Exception {
        // Both maps are fetched before the first download starts, as in the original code.
        Map<StateHandleID, StreamStateHandle> sstFiles = restoreStateHandle.getSharedState();
        Map<StateHandleID, StreamStateHandle> miscFiles = restoreStateHandle.getPrivateState();

        downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);
        downloadDataForAllStateHandles(miscFiles, dest, closeableRegistry);
    }

    /**
     * Submits one download task per map entry to the executor and blocks until all of them
     * have finished.
     *
     * @param stateHandleMap files to download, keyed by the id used as local file name
     * @param restoreInstancePath directory the files are written into
     * @param closeableRegistry registry for the streams opened by the download tasks
     * @throws IOException if a task failed with an {@link IOException} (possibly wrapped in
     *     runtime exceptions, which are stripped)
     * @throws FlinkRuntimeException if a task failed with anything else; the cause is the
     *     original {@link ExecutionException}
     * @throws Exception if waiting for the tasks fails otherwise (e.g. interruption)
     */
    private void downloadDataForAllStateHandles(
            Map<StateHandleID, StreamStateHandle> stateHandleMap,
            Path restoreInstancePath,
            CloseableRegistry closeableRegistry)
            throws Exception {
        try {
            List<Runnable> runnables =
                    createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
            List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size());
            for (Runnable runnable : runnables) {
                futures.add(CompletableFuture.runAsync(runnable, this.executorService));
            }
            FutureUtils.waitForAll(futures).get();
        } catch (ExecutionException e) {
            // Unwrap the future's wrapper and any RuntimeException layers (the tasks are built
            // with ThrowingRunnable.unchecked) to surface an underlying IOException as-is.
            Throwable throwable = ExceptionUtils.stripExecutionException(e);
            throwable = ExceptionUtils.stripException(throwable, RuntimeException.class);
            if (throwable instanceof IOException) {
                throw (IOException) throwable;
            }
            throw new FlinkRuntimeException("Failed to download data for state handles.", e);
        }
    }

    /**
     * Builds one download task per entry of {@code stateHandleMap}. The local target of each
     * task is {@code restoreInstancePath} resolved against the entry key's string form.
     *
     * @param stateHandleMap files to download, keyed by the id used as local file name
     * @param restoreInstancePath directory the files are written into
     * @param closeableRegistry registry handed to every task for stream registration
     * @return the tasks, in the iteration order of the map's entry set
     */
    private List<Runnable> createDownloadRunnables(
            Map<StateHandleID, StreamStateHandle> stateHandleMap,
            Path restoreInstancePath,
            CloseableRegistry closeableRegistry) {
        List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
        for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
            StateHandleID stateHandleID = entry.getKey();
            StreamStateHandle remoteFileHandle = entry.getValue();
            Path path = restoreInstancePath.resolve(stateHandleID.toString());
            runnables.add(
                    ThrowingRunnable.unchecked(
                            () ->
                                    downloadDataForStateHandle(
                                            path, remoteFileHandle, closeableRegistry)));
        }
        return runnables;
    }

    /**
     * Copies the content of one remote state handle into a local file, creating the file's
     * parent directories first.
     *
     * @param restoreFilePath local file to write
     * @param remoteFileHandle handle whose input stream supplies the bytes
     * @param closeableRegistry registry both streams are registered in while the copy runs
     * @throws IOException if opening, registering, copying or closing a stream fails
     */
    private void downloadDataForStateHandle(
            Path restoreFilePath,
            StreamStateHandle remoteFileHandle,
            CloseableRegistry closeableRegistry)
            throws IOException {
        FSDataInputStream inputStream = null;
        OutputStream outputStream = null;
        try {
            inputStream = remoteFileHandle.openInputStream();
            closeableRegistry.registerCloseable(inputStream);

            Files.createDirectories(restoreFilePath.getParent());
            outputStream = Files.newOutputStream(restoreFilePath);
            closeableRegistry.registerCloseable(outputStream);

            byte[] buffer = new byte[8 * 1024];
            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
        } finally {
            // A stream is closed here only if unregistering it succeeds. This relies on
            // unregisterCloseable returning false for a stream that was never opened (null).
            // NOTE(review): confirm against CloseableRegistry.
            try {
                if (closeableRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
            } finally {
                // Nested so the output stream is still unregistered and closed when closing
                // the input stream throws; otherwise the local file handle would stay open.
                if (closeableRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }
    }
}

