/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.AbstractByteArrayAssert;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.Assert;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={ParameterizedTestExtension.class})
public class FsCheckpointStateOutputStreamTest {
    @Parameter
    public boolean relativePaths;
    @TempDir
    private java.nio.file.Path tempDir;

    /**
     * Returns the values supplied for the {@code relativePaths} parameter: {@code true} first,
     * then {@code false}.
     */
    @Parameters(name="relativePaths = {0}")
    public static List<Boolean> parameters() {
        final Boolean[] modes = {Boolean.TRUE, Boolean.FALSE};
        return Arrays.asList(modes);
    }

    /**
     * Constructing the stream with a buffer size of 4000 and a threshold of 5000 must be rejected
     * with an {@link IllegalArgumentException}.
     */
    @TestTemplate
    void testWrongParameters() throws Exception {
        Assertions.assertThatThrownBy(() -> {
            // Folder creation happens inside the callable, together with the constructor call.
            File folder = TempDirUtils.newFolder(tempDir);
            Path basePath = Path.fromLocalFile(folder);
            new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                    basePath, FileSystem.getLocalFileSystem(), 4000, 5000, relativePaths);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    /** A stream that is closed without any bytes written must yield a {@code null} handle. */
    @TestTemplate
    void testEmptyState() throws Exception {
        Path basePath = Path.fromLocalFile(TempDirUtils.newFolder(tempDir));
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream emptyStream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        basePath, FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);

        // Nothing was written, so no state handle is expected.
        StreamStateHandle result = emptyStream.closeAndGetHandle();
        Assertions.assertThat(result).isNull();
    }

    /** Writes 999 bytes against a threshold of 1000 and expects an in-memory (non-file) handle. */
    @TestTemplate
    void testStateBelowMemThreshold() throws Exception {
        final int numBytes = 999;
        final int bufferSize = 1024;
        final int threshold = 1000;
        runTest(numBytes, bufferSize, threshold, false);
    }

    /**
     * Writes 896 bytes (less than the 1024-byte buffer) against a threshold of 15 and expects a
     * file handle.
     */
    @TestTemplate
    void testStateOneBufferAboveThreshold() throws Exception {
        final int numBytes = 896;
        final int bufferSize = 1024;
        final int threshold = 15;
        runTest(numBytes, bufferSize, threshold, true);
    }

    /**
     * Writes 576446 bytes through a 259-byte buffer against a threshold of 17 and expects a file
     * handle.
     */
    @TestTemplate
    void testStateAboveMemThreshold() throws Exception {
        final int numBytes = 576446;
        final int bufferSize = 259;
        final int threshold = 17;
        runTest(numBytes, bufferSize, threshold, true);
    }

    /** Writes 16678 bytes with a threshold of zero and expects a file handle. */
    @TestTemplate
    void testZeroThreshold() throws Exception {
        final int numBytes = 16678;
        final int bufferSize = 4096;
        final int threshold = 0;
        runTest(numBytes, bufferSize, threshold, true);
    }

    /**
     * Checks that {@code getPos()} reports the number of bytes written so far, first for
     * single-byte writes and then for a mix of single-byte and array writes.
     */
    @TestTemplate
    void testGetPos() throws Exception {
        // Phase 1: 64 single-byte writes; the position must advance by one per write.
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream out =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        Path.fromLocalFile(TempDirUtils.newFolder(tempDir)),
                        FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
        long expectedPos = 0L;
        while (expectedPos < 64L) {
            Assertions.assertThat(out.getPos()).isEqualTo(expectedPos);
            out.write(66);
            expectedPos++;
        }
        out.closeAndGetHandle();

        // Phase 2: each round writes one byte plus the payload array.
        out = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                Path.fromLocalFile(TempDirUtils.newFolder(tempDir)),
                FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
        byte[] payload = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
        long bytesPerRound = 1L + (long) payload.length;
        for (int round = 0; round < 7; round++) {
            Assertions.assertThat(out.getPos()).isEqualTo((long) round * bytesPerRound);
            out.write(66);
            out.write(payload);
        }
        out.closeAndGetHandle();
    }

    /**
     * After data has been flushed to a (mocked) file system, closing the stream must delete the
     * path that was passed to {@code create}.
     */
    @TestTemplate
    void testCleanupWhenClosingStream() throws IOException {
        FileSystem mockFs = Mockito.mock(FileSystem.class);
        FSDataOutputStream mockOut = Mockito.mock(FSDataOutputStream.class);

        // Capture the path handed to create() so the delete() call can be matched against it.
        ArgumentCaptor<Path> createdPath = ArgumentCaptor.forClass(Path.class);
        Mockito.when(mockFs.create(createdPath.capture(), Matchers.any(FileSystem.WriteMode.class)))
                .thenReturn(mockOut);

        Path basePath = Path.fromLocalFile(TempDirUtils.newFolder(tempDir));
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        basePath, mockFs, 4, 0, relativePaths);

        // Five bytes exceed the 4-byte buffer; the test then verifies that create() was called.
        byte[] fiveBytes = {1, 2, 3, 4, 5};
        stream.write(fiveBytes);
        Mockito.verify(mockFs)
                .create(Matchers.any(Path.class), Matchers.any(FileSystem.WriteMode.class));

        stream.close();
        Mockito.verify(mockFs).delete(Matchers.eq(createdPath.getValue()), Matchers.anyBoolean());
    }

    /**
     * When closing the underlying output stream throws, {@code closeAndGetHandle()} must fail
     * with an {@link IOException} and the created path must be deleted.
     */
    @TestTemplate
    void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
        FileSystem mockFs = Mockito.mock(FileSystem.class);
        FSDataOutputStream mockOut = Mockito.mock(FSDataOutputStream.class);

        // Capture the path handed to create() so the delete() call can be matched against it.
        ArgumentCaptor<Path> createdPath = ArgumentCaptor.forClass(Path.class);
        Mockito.when(mockFs.create(createdPath.capture(), Matchers.any(FileSystem.WriteMode.class)))
                .thenReturn(mockOut);

        // The mocked output stream fails on close().
        Mockito.doThrow(new IOException("Test IOException.")).when(mockOut).close();

        Path basePath = Path.fromLocalFile(TempDirUtils.newFolder(tempDir));
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        basePath, mockFs, 4, 0, relativePaths);

        byte[] fiveBytes = {1, 2, 3, 4, 5};
        stream.write(fiveBytes);
        Mockito.verify(mockFs)
                .create(Matchers.any(Path.class), Matchers.any(FileSystem.WriteMode.class));

        Assertions.assertThatThrownBy(stream::closeAndGetHandle).isInstanceOf(IOException.class);
        Mockito.verify(mockFs).delete(Matchers.eq(createdPath.getValue()), Matchers.anyBoolean());
    }

    /**
     * Writes {@code numBytes} random bytes to a fresh stream using a random mix of single-byte
     * and bulk writes, then checks the kind of handle produced and that the handle returns the
     * same bytes.
     *
     * <p>The random seed is drawn explicitly and attached to every assertion so that a failing
     * write pattern can be replayed.
     *
     * @param numBytes number of bytes to write
     * @param bufferSize buffer size passed to the stream constructor
     * @param threshold threshold passed to the stream constructor
     * @param expectFile {@code true} if a {@link FileStateHandle} is expected, {@code false} if a
     *     {@link ByteStreamStateHandle} is expected
     * @throws Exception if stream creation, writing, reading or discarding fails
     */
    private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        Path.fromLocalFile(TempDirUtils.newFolder(tempDir)),
                        FileSystem.getLocalFileSystem(), bufferSize, threshold, relativePaths);

        // Explicit seed: without it a failure of this randomized test cannot be reproduced.
        final long seed = new Random().nextLong();
        final Random rnd = new Random(seed);
        final String seedNote = "random seed " + seed;

        byte[] original = new byte[numBytes];
        rnd.nextBytes(original);
        // The copy is written; 'original' is kept to detect modification of the input buffer.
        byte[] bytes = original.clone();

        int pos = 0;
        while (pos < bytes.length) {
            if (rnd.nextBoolean()) {
                // Single-byte write.
                stream.write((int) bytes[pos++]);
            } else {
                // Bulk write of either everything that is left or a random part (possibly zero).
                int remaining = bytes.length - pos;
                int num = rnd.nextBoolean() ? remaining : rnd.nextInt(remaining);
                stream.write(bytes, pos, num);
                pos += num;
            }
        }

        StreamStateHandle handle = stream.closeAndGetHandle();
        if (expectFile) {
            Assertions.assertThat(handle).as(seedNote).isInstanceOf(FileStateHandle.class);
        } else {
            Assertions.assertThat(handle).as(seedNote).isInstanceOf(ByteStreamStateHandle.class);
        }

        // Writing must not have altered the caller's buffer.
        Assertions.assertThat(bytes).as(seedNote).isEqualTo(original);

        try (FSDataInputStream inStream = handle.openInputStream()) {
            byte[] validation = new byte[bytes.length];
            new DataInputStream(inStream).readFully(validation);
            Assertions.assertThat(validation).as(seedNote).isEqualTo(bytes);
        }
        handle.discardState();
    }

    /**
     * After {@code close()}, the stream must report itself closed and both write variants must
     * throw an {@link IOException}.
     */
    @TestTemplate
    void testWriteFailsFastWhenClosed() throws Exception {
        Path basePath = Path.fromLocalFile(TempDirUtils.newFolder(tempDir));
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        basePath, FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);

        Assertions.assertThat(stream.isClosed()).isFalse();
        stream.close();
        Assertions.assertThat(stream.isClosed()).isTrue();

        // Single-byte write on the closed stream.
        Assertions.assertThatThrownBy(() -> stream.write(1))
                .isInstanceOf(IOException.class);
        // Ranged array write on the closed stream.
        Assertions.assertThatThrownBy(() -> stream.write(new byte[4], 1, 2))
                .isInstanceOf(IOException.class);
    }

    /**
     * Runs several streams against the same base directory with a threshold of 15: a large
     * state, a one-byte state, an empty state, a 177-byte state written inside
     * try-with-resources, and a stream that is closed before its handle is requested. Verifies
     * handle contents and that the directory is empty only after the last handle is discarded.
     */
    @TestTemplate
    void testMixedBelowAndAboveThreshold() throws Exception {
        byte[] state1 = new byte[1274673];
        byte[] state2 = new byte[1];
        byte[] state3 = new byte[0];
        byte[] state4 = new byte[177];

        Random rnd = new Random();
        for (byte[] state : new byte[][] {state1, state2, state3, state4}) {
            rnd.nextBytes(state);
        }

        File directory = TempDirUtils.newFolder(tempDir);
        Path basePath = Path.fromLocalFile(directory);
        Supplier<CheckpointStateOutputStream> factory =
                () -> new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        basePath, FileSystem.getLocalFileSystem(), 1024, 15, relativePaths);

        CheckpointStateOutputStream stream1 = factory.get();
        CheckpointStateOutputStream stream2 = factory.get();
        CheckpointStateOutputStream stream3 = factory.get();

        stream1.write(state1);
        stream2.write(state2);
        stream3.write(state3);

        // The casts double as type checks on the kind of handle each stream produced.
        FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle();
        ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle();
        ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle();

        // close() runs after closeAndGetHandle() here, via try-with-resources.
        StreamStateHandle handle4;
        try (CheckpointStateOutputStream stream4 = factory.get()) {
            stream4.write(state4);
            handle4 = stream4.closeAndGetHandle();
        }

        // A stream that was closed first can no longer produce a handle.
        CheckpointStateOutputStream stream5 = factory.get();
        stream5.write(state4);
        stream5.close();
        Assertions.assertThatThrownBy(stream5::closeAndGetHandle).isInstanceOf(IOException.class);

        validateBytesInStream(handle1.openInputStream(), state1);
        handle1.discardState();
        Assertions.assertThat(isDirectoryEmpty(directory)).isFalse();
        ensureLocalFileDeleted(handle1.getFilePath());

        validateBytesInStream(handle2.openInputStream(), state2);
        handle2.discardState();
        Assertions.assertThat(isDirectoryEmpty(directory)).isFalse();

        // The empty state produced no handle at all.
        Assertions.assertThat(handle3).isNull();
        Assertions.assertThat(isDirectoryEmpty(directory)).isFalse();

        validateBytesInStream(handle4.openInputStream(), state4);
        handle4.discardState();
        Assertions.assertThat(isDirectoryEmpty(directory)).isTrue();
    }

    /**
     * With a non-writable base directory, a failing {@code closeAndGetHandle()} and a plain
     * {@code close()} must not issue any {@code delete} call on the file system, and the
     * directory must still exist afterwards.
     */
    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @TestTemplate
    void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
        File directory = TempDirUtils.newFolder(tempDir);

        // Skip (rather than fail) when the directory cannot be made read-only.
        boolean madeReadOnly = directory.setWritable(false, true);
        Assumptions.assumeThat(madeReadOnly).isTrue();
        checkDirectoryNotWritable(directory);

        // Spy on the local file system so delete() calls can be counted.
        FileSystem spiedFs = Mockito.spy(FileSystem.getLocalFileSystem());
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream failingStream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        Path.fromLocalFile(directory), spiedFs, 1024, 1, relativePaths);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream closedStream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        Path.fromLocalFile(directory), spiedFs, 1024, 1, relativePaths);

        failingStream.write(new byte[61]);
        closedStream.write(new byte[61]);

        Assertions.assertThatThrownBy(failingStream::closeAndGetHandle)
                .isInstanceOf(IOException.class);
        closedStream.close();

        Mockito.verify(spiedFs, Mockito.times(0))
                .delete(Matchers.any(Path.class), Matchers.anyBoolean());
        Assertions.assertThat(directory).exists();
        Assertions.assertThat(directory).isDirectory();
    }

    /**
     * Closes the {@link CloseableRegistry} holding the stream while a concurrent
     * {@code flushToFile()} is still inside the (mocked) {@code FileSystem.create} call, and
     * verifies that the stream ends up closed and the created output stream is closed as well.
     *
     * <p>The flush runs on a dedicated single-thread executor, which is shut down in a
     * {@code finally} block so that no worker thread outlives the test, even when an assertion
     * fails while the worker is still blocked on a latch.
     */
    @TestTemplate
    public void testCleanupWhenCloseableRegistryClosedBeforeCreatingStream() throws Exception {
        OneShotLatch streamCreationLatch = new OneShotLatch();
        OneShotLatch startCloseLatch = new OneShotLatch();
        OneShotLatch endCloseLatch = new OneShotLatch();
        FileSystem fs = Mockito.mock(FileSystem.class);
        FSDataOutputStream fsDataOutputStream = Mockito.mock(FSDataOutputStream.class);

        Mockito.doAnswer(invocation -> {
            // Signal that create() has been entered, then wait until the test starts closing.
            streamCreationLatch.trigger();
            startCloseLatch.await();
            // The registry close must not have finished within one second of this point.
            Assert.assertThrows(TimeoutException.class, () -> endCloseLatch.await(1L, TimeUnit.SECONDS));
            return fsDataOutputStream;
        }).when(fs).create(Matchers.any(Path.class), Matchers.any(FileSystem.WriteMode.class));

        FsCheckpointStreamFactory.FsCheckpointStateOutputStream outputStream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        Path.fromLocalFile(TempDirUtils.newFolder(tempDir)), fs, 1024, 1, relativePaths);
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        closeableRegistry.registerCloseable(outputStream);

        ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> flushFuture = CompletableFuture.runAsync(() -> {
                try {
                    outputStream.flushToFile();
                } catch (IOException ignored) {
                    // Deliberately ignored: the assertions below check the resulting stream
                    // state rather than the outcome of the flush itself.
                }
            }, flushExecutor);

            streamCreationLatch.await();
            Assert.assertFalse(outputStream.isClosed());
            Mockito.verify(fsDataOutputStream, Mockito.never()).close();

            startCloseLatch.trigger();
            closeableRegistry.close();
            endCloseLatch.trigger();
            flushFuture.get();

            Assert.assertTrue(outputStream.isClosed());
            Mockito.verify(fsDataOutputStream).close();
        } finally {
            // Interrupts the worker if it is still blocked and releases its thread.
            flushExecutor.shutdownNow();
        }
    }

    /**
     * Asserts that the local file denoted by {@code path} does not exist.
     *
     * @param path a path whose URI scheme must be {@code file}
     * @throws IllegalArgumentException if the URI scheme of {@code path} is not {@code file}
     */
    private static void ensureLocalFileDeleted(Path path) {
        URI location = path.toUri();
        if ("file".equals(location.getScheme())) {
            File target = new File(location.getPath());
            Assertions.assertThat(target).withFailMessage("file not properly deleted").doesNotExist();
            return;
        }
        throw new IllegalArgumentException("not a local path");
    }

    /**
     * Tells whether {@code directory} has no entries.
     *
     * @param directory the location to inspect
     * @return {@code true} if the location does not exist, if {@link File#list()} returns
     *     {@code null}, or if it returns an empty array; {@code false} otherwise
     */
    private static boolean isDirectoryEmpty(File directory) {
        if (directory.exists()) {
            String[] entries = directory.list();
            return entries == null || entries.length == 0;
        }
        // A missing directory counts as empty.
        return true;
    }

    /**
     * Reads from {@code is} and asserts that it yields exactly the bytes in {@code data}: not
     * fewer, not more, and with identical content. The stream is closed before returning.
     *
     * <p>try-with-resources is used so that, if an assertion fails and {@code close()} throws as
     * well, the assertion failure stays the primary error and the close failure is attached as
     * a suppressed exception instead of replacing it.
     *
     * @param is the stream to read and close
     * @param data the expected content
     * @throws IOException if reading or closing the stream fails
     */
    private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
        try (InputStream in = is) {
            byte[] holder = new byte[data.length];
            int pos = 0;
            int read;
            // Keep reading until the buffer is full or the stream reports end-of-data.
            while (pos < holder.length && (read = in.read(holder, pos, holder.length - pos)) != -1) {
                pos += read;
            }
            Assertions.assertThat(pos).withFailMessage("not enough data").isEqualTo(holder.length);
            // One more read must hit end-of-stream, otherwise the stream holds extra bytes.
            Assertions.assertThat(in.read()).withFailMessage("too much data").isEqualTo(-1);
            Assertions.assertThat(holder).withFailMessage("wrong data").isEqualTo(data);
        }
    }

    /**
     * Asserts that creating and writing a file named {@code temp} inside {@code directory} fails
     * with an {@link IOException}.
     *
     * @param directory the directory that is expected to reject writes
     */
    private static void checkDirectoryNotWritable(File directory) {
        Assertions.assertThatThrownBy(() -> {
            File probe = new File(directory, "temp");
            try (FileOutputStream out = new FileOutputStream(probe)) {
                out.write(42);
                out.flush();
            }
        }).withFailMessage("this should fail when writing is properly prevented").isInstanceOf(IOException.class);
    }
}

