/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Migration tests for {@link FileSinkCommittableSerializer}.
 *
 * <p>Each scenario consists of two methods:
 *
 * <ul>
 *   <li>a {@code prepare...} method, annotated with {@link Disabled}, that serializes a
 *       {@link FileSinkCommittable} with the serializer and writes the resulting bytes to a
 *       {@code committable} file below {@link #BASE_PATH};
 *   <li>a parameterized {@code test...} method that reads that {@code committable} file for every
 *       version returned by {@link #previousVersions()} and checks that it can still be
 *       deserialized into the expected kind of committable.
 * </ul>
 *
 * <p>NOTE(review): the {@code prepare...} methods are presumably enabled by hand to generate the
 * fixture files for a new serializer version - confirm against the project's contribution guide.
 */
class FileSinkCommittableSerializerMigrationTest {
    /** Serializer version for which the {@code prepare...} methods generate fixture files. */
    private static final int CURRENT_VERSION = 1;
    /** Payload written to the part file of the "in-progress" scenario. */
    private static final String IN_PROGRESS_CONTENT = "writing";
    /** Payload written to the part file of the "pending" scenario. */
    private static final String PENDING_CONTENT = "wrote";
    /** Directory holding one {@code <scenario>-v<version>} sub-directory per fixture. */
    private static final java.nio.file.Path BASE_PATH =
            Paths.get("src/test/resources/").resolve("committable-serializer-migration");

    /**
     * Supplies the serializer versions whose stored fixtures the parameterized tests read.
     *
     * @return a stream of the versions to test
     */
    static Stream<Integer> previousVersions() {
        return Stream.of(CURRENT_VERSION);
    }

    /**
     * Writes the fixture files of the "in-progress" scenario for {@link #CURRENT_VERSION}: a
     * {@code content} part file that was persisted but not committed, and a {@code committable}
     * file containing the serialized {@link FileSinkCommittable} that references it.
     *
     * @throws IOException if writing the part file or the committable file fails
     */
    @Test
    @Disabled
    void prepareDeserializationInProgressToCleanup() throws IOException {
        String scenario = "in-progress";
        java.nio.file.Path path = this.resolveVersionPath(CURRENT_VERSION, scenario);
        RecoverableWriter writer = FileSystem.getLocalFileSystem().createRecoverableWriter();
        FileSinkCommittableSerializer serializer = createSerializer();

        final RecoverableWriter.ResumeRecoverable resumeRecoverable;
        // Close the stream once the resume point has been captured so the handle is not leaked.
        try (RecoverableFsDataOutputStream outputStream =
                writer.open(new Path(path.resolve("content").toString()))) {
            outputStream.write(IN_PROGRESS_CONTENT.getBytes(StandardCharsets.UTF_8));
            resumeRecoverable = outputStream.persist();
        }

        // Typed as the interface so the in-progress overload of the constructor is selected.
        InProgressFileWriter.InProgressFileRecoverable recoverable =
                new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
                        resumeRecoverable);
        FileSinkCommittable committable = new FileSinkCommittable("0", recoverable);

        byte[] bytes = serializer.serialize(committable);
        Files.write(path.resolve("committable"), bytes);
    }

    /**
     * Reads the stored "in-progress" committable of the given version and verifies that it
     * deserializes into a committable carrying an in-progress file to clean up and no pending
     * file.
     *
     * @param previousVersion serializer version of the stored fixture
     * @throws IOException if the fixture cannot be read or deserialized
     */
    @ParameterizedTest(name = "Previous Version = {0}")
    @MethodSource("previousVersions")
    void testSerializationInProgressToCleanup(Integer previousVersion) throws IOException {
        String scenario = "in-progress";
        java.nio.file.Path path = this.resolveVersionPath(previousVersion, scenario);
        FileSinkCommittableSerializer serializer = createSerializer();

        FileSinkCommittable committable =
                serializer.deserialize(
                        previousVersion, Files.readAllBytes(path.resolve("committable")));

        Assertions.assertThat(committable.hasInProgressFileToCleanup()).isTrue();
        Assertions.assertThat(committable.hasPendingFile()).isFalse();
    }

    /**
     * Writes the fixture files of the "pending" scenario for {@link #CURRENT_VERSION}: a
     * {@code content} part file that was closed for commit, and a {@code committable} file
     * containing the serialized {@link FileSinkCommittable} that references it.
     *
     * @throws IOException if writing the part file or the committable file fails
     */
    @Test
    @Disabled
    void prepareDeserializationPending() throws IOException {
        String scenario = "pending";
        java.nio.file.Path path = this.resolveVersionPath(CURRENT_VERSION, scenario);
        RecoverableWriter writer = FileSystem.getLocalFileSystem().createRecoverableWriter();
        FileSinkCommittableSerializer serializer = createSerializer();

        RecoverableFsDataOutputStream outputStream =
                writer.open(new Path(path.resolve("content").toString()));
        outputStream.write(PENDING_CONTENT.getBytes(StandardCharsets.UTF_8));
        // closeForCommit() ends writing; only the commit recoverable is kept for the fixture.
        RecoverableWriter.CommitRecoverable commitRecoverable =
                outputStream.closeForCommit().getRecoverable();

        // Typed as the interface so the pending overload of the constructor is selected.
        InProgressFileWriter.PendingFileRecoverable recoverable =
                new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(
                        commitRecoverable);
        FileSinkCommittable committable = new FileSinkCommittable("0", recoverable);

        byte[] bytes = serializer.serialize(committable);
        Files.write(path.resolve("committable"), bytes);
    }

    /**
     * Reads the stored "pending" committable of the given version and verifies that it
     * deserializes into a committable carrying a pending file and no in-progress file to clean
     * up.
     *
     * @param previousVersion serializer version of the stored fixture
     * @throws IOException if the fixture cannot be read or deserialized
     */
    @ParameterizedTest(name = "Previous Version = {0}")
    @MethodSource("previousVersions")
    void testSerializationPending(Integer previousVersion) throws IOException {
        String scenario = "pending";
        java.nio.file.Path path = this.resolveVersionPath(previousVersion, scenario);
        FileSinkCommittableSerializer serializer = createSerializer();

        FileSinkCommittable committable =
                serializer.deserialize(
                        previousVersion, Files.readAllBytes(path.resolve("committable")));

        Assertions.assertThat(committable.hasPendingFile()).isTrue();
        Assertions.assertThat(committable.hasInProgressFileToCleanup()).isFalse();
    }

    /**
     * Resolves the fixture directory of a scenario and version, i.e.
     * {@code BASE_PATH/<scenario>-v<version>}.
     *
     * @param version serializer version of the fixture
     * @param scenario scenario name used as directory prefix
     * @return the directory that holds the fixture files
     */
    private java.nio.file.Path resolveVersionPath(long version, String scenario) {
        return BASE_PATH.resolve(scenario + "-v" + version);
    }

    /**
     * Builds the serializer under test from the pending-file and in-progress-file recoverable
     * serializers exposed by a freshly created bucket writer.
     *
     * @return a new committable serializer
     * @throws IOException if the bucket writer cannot be created
     */
    private static FileSinkCommittableSerializer createSerializer() throws IOException {
        RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter();
        return new FileSinkCommittableSerializer(
                bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
                bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
    }

    /**
     * Creates a row-wise bucket writer on the local file system that encodes elements with a
     * {@link SimpleStringEncoder}.
     *
     * @return a new bucket writer
     * @throws IOException if the recoverable writer of the local file system cannot be created
     */
    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter<>(
                FileSystem.getLocalFileSystem().createRecoverableWriter(),
                new SimpleStringEncoder<>());
    }
}

