/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table.stream;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.FileSystemTableSink;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.StreamingFileWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class StreamingFileWriterTest {
    // Part-file naming config built with no overrides; isPartitionFileCommitted reads its
    // prefix and suffix to compute the expected part-file names.
    private final OutputFileConfig outputFileConfig = OutputFileConfig.builder().build();
    // Formats dates as "yyyy-MM-dd" partition values for the partition-time test.
    // NOTE(review): no time zone is set here, so the JVM default applies, whereas
    // getPartitionCommitTriggerConf sets the watermark time zone to "UTC" - confirm the two
    // cannot disagree about the calendar date.
    private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    // Temporary directory supplied through JUnit's @TempDir.
    @TempDir
    private java.nio.file.Path tmpDir;
    // Flink Path used as the sink's base directory; assigned in before().
    private Path path;

    // Explicit package-private no-arg constructor with an empty body.
    StreamingFileWriterTest() {
    }

    /** Points {@code path} at the "tmp" entry below the temporary directory before each test. */
    @BeforeEach
    void before() throws IOException {
        java.nio.file.Path sinkDir = this.tmpDir.resolve("tmp");
        this.path = new Path(sinkDir.toUri());
    }

    /**
     * Drives the writer through four consecutive harness instances, each later one restored
     * from a snapshot taken by its predecessor, and checks which partitions are emitted once a
     * checkpoint is reported as complete.
     *
     * <p>Every harness is managed by try-with-resources so it is closed even when an assertion
     * fails.
     */
    @Test
    void testFailover() throws Exception {
        OperatorSubtaskState state;

        // Initial run: rows for partitions 1 and 2 precede snapshot 1, rows for 3 and 4 follow it.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = this.create()) {
            harness.setup();
            harness.initializeEmptyState();
            harness.open();
            harness.processElement(StreamingFileWriterTest.row("1"), 0L);
            harness.processElement(StreamingFileWriterTest.row("2"), 0L);
            harness.processElement(StreamingFileWriterTest.row("2"), 0L);
            state = harness.snapshot(1L, 1L);
            harness.processElement(StreamingFileWriterTest.row("3"), 0L);
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            harness.notifyOfCompletedCheckpoint(1L);
            List<String> partitions = StreamingFileWriterTest.collect(harness);
            Assertions.assertThat(partitions).containsExactly("1", "2");
        }

        // Restored from snapshot 1: partitions 1 and 2 are expected again alongside 3 and 4.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = this.create()) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row("3"), 0L);
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            state = harness.snapshot(2L, 2L);
            harness.notifyOfCompletedCheckpoint(2L);
            List<String> partitions = StreamingFileWriterTest.collect(harness);
            Assertions.assertThat(partitions).containsExactly("1", "2", "3", "4");
        }

        // Restored from snapshot 2: partition 4 receives another row, partition 5 is new.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = this.create()) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            harness.processElement(StreamingFileWriterTest.row("5"), 0L);
            state = harness.snapshot(3L, 3L);
            harness.notifyOfCompletedCheckpoint(3L);
            List<String> partitions = StreamingFileWriterTest.collect(harness);
            Assertions.assertThat(partitions).containsExactly("3", "4", "5");
        }

        // Restored from snapshot 3: snapshots 4, 5 and 6 are taken but only checkpoint 5 is
        // reported complete, so partition 9 (written after snapshot 5) is not expected.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = this.create()) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row("6"), 0L);
            harness.processElement(StreamingFileWriterTest.row("7"), 0L);
            harness.snapshot(4L, 4L);
            harness.processElement(StreamingFileWriterTest.row("8"), 0L);
            harness.snapshot(5L, 5L);
            harness.processElement(StreamingFileWriterTest.row("9"), 0L);
            harness.snapshot(6L, 6L);
            harness.notifyOfCompletedCheckpoint(5L);
            List<String> partitions = StreamingFileWriterTest.collect(harness);
            Assertions.assertThat(partitions).containsExactly("4", "5", "6", "7", "8");
        }
    }

    /**
     * Checks that, once checkpoint 1 is reported complete, only the partitions written before
     * snapshot 1 (1 and 2) are emitted, even though further rows for partitions 1, 3 and 4
     * arrived between the snapshot and the notification.
     */
    @Test
    void testCommitImmediately() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = this.create()) {
            harness.setup();
            harness.initializeEmptyState();
            harness.open();

            // Rows written before the snapshot.
            harness.processElement(StreamingFileWriterTest.row("1"), 0L);
            harness.processElement(StreamingFileWriterTest.row("2"), 0L);
            harness.processElement(StreamingFileWriterTest.row("2"), 0L);
            harness.snapshot(1L, 1L);

            // Rows written after the snapshot but before the checkpoint-complete notification.
            harness.processElement(StreamingFileWriterTest.row("1"), 0L);
            harness.processElement(StreamingFileWriterTest.row("3"), 0L);
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            harness.notifyOfCompletedCheckpoint(1L);

            List<String> partitions = StreamingFileWriterTest.collect(harness);
            Assertions.assertThat(partitions).containsExactly("1", "2");
        }
    }

    /**
     * Exercises the "process-time" commit trigger with a one-second commit delay across three
     * harness instances, checking through {@code isPartitionFileCommitted} which part files
     * exist after processing time is advanced and checkpoints are reported complete.
     *
     * <p>Every harness is managed by try-with-resources so it is closed even when an assertion
     * fails.
     */
    @Test
    void testCommitFileWhenPartitionIsCommittableByProcessTime() throws Exception {
        // NOTE(review): the meaning of the four TableRollingPolicy arguments is not visible in
        // this file - confirm against FileSystemTableSink.TableRollingPolicy.
        FileSystemTableSink.TableRollingPolicy tableRollingPolicy = new FileSystemTableSink.TableRollingPolicy(
                false, Long.MAX_VALUE, Duration.ofDays(1L).toMillis(), Duration.ofDays(1L).toMillis());
        List<String> partitionKeys = Collections.singletonList("d");
        Configuration conf = this.getProcTimeCommitTriggerConf(Duration.ofSeconds(1L).toMillis());
        long currentTimeMillis = System.currentTimeMillis();
        OperatorSubtaskState state;

        // Initial run: processing time is not advanced, and neither partition's file is
        // expected to exist after checkpoint 1 completes.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                this.create((RollingPolicy<RowData, String>) tableRollingPolicy, partitionKeys, conf)) {
            harness.setup();
            harness.initializeEmptyState();
            harness.open();
            harness.setProcessingTime(currentTimeMillis);
            harness.processElement(StreamingFileWriterTest.row("1"), 0L);
            harness.processElement(StreamingFileWriterTest.row("2"), 0L);
            state = harness.snapshot(1L, 1L);
            harness.processElement(StreamingFileWriterTest.row("3"), 0L);
            harness.notifyOfCompletedCheckpoint(1L);
            Assertions.assertThat(this.isPartitionFileCommitted("1", 0, 0)).isFalse();
            Assertions.assertThat(this.isPartitionFileCommitted("2", 0, 1)).isFalse();
        }

        // Restored from snapshot 1.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                this.create((RollingPolicy<RowData, String>) tableRollingPolicy, partitionKeys, conf)) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row("3"), 0L);

            // Advance processing time by two seconds (beyond the one-second delay) before
            // partition 4 receives its row.
            currentTimeMillis += Duration.ofSeconds(2L).toMillis();
            harness.setProcessingTime(currentTimeMillis);
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            harness.snapshot(2L, 2L);
            harness.notifyOfCompletedCheckpoint(2L);
            Assertions.assertThat(this.isPartitionFileCommitted("3", 0, 2)).isTrue();
            Assertions.assertThat(this.isPartitionFileCommitted("4", 0, 3)).isFalse();

            // Advance another two seconds; partition 4's file is now expected after checkpoint 3.
            currentTimeMillis += Duration.ofSeconds(2L).toMillis();
            harness.setProcessingTime(currentTimeMillis);
            state = harness.snapshot(3L, 3L);
            harness.notifyOfCompletedCheckpoint(3L);
            Assertions.assertThat(this.isPartitionFileCommitted("4", 0, 3)).isTrue();
        }

        // Restored from snapshot 3: after endInput() the files of both partitions are expected
        // to exist although no checkpoint completion was reported.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                this.create((RollingPolicy<RowData, String>) tableRollingPolicy, partitionKeys, conf)) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            harness.processElement(StreamingFileWriterTest.row("4"), 0L);
            harness.snapshot(4L, 4L);
            harness.processElement(StreamingFileWriterTest.row("5"), 5L);
            harness.endInput();
            Assertions.assertThat(this.isPartitionFileCommitted("4", 0, 4)).isTrue();
            Assertions.assertThat(this.isPartitionFileCommitted("5", 0, 5)).isTrue();
        }
    }

    /**
     * Exercises the "partition-time" commit trigger with a one-day commit delay across three
     * harness instances, using date-named partitions ({@code d=yyyy-MM-dd}) and watermarks, and
     * checking through {@code isPartitionFileCommitted} which part files exist.
     *
     * <p>Every harness is managed by try-with-resources so it is closed even when an assertion
     * fails.
     */
    @Test
    void testCommitFileWhenPartitionIsCommittableByPartitionTime() throws Exception {
        // NOTE(review): the meaning of the four TableRollingPolicy arguments is not visible in
        // this file - confirm against FileSystemTableSink.TableRollingPolicy.
        FileSystemTableSink.TableRollingPolicy tableRollingPolicy = new FileSystemTableSink.TableRollingPolicy(
                false, Long.MAX_VALUE, Duration.ofDays(1L).toMillis(), Duration.ofDays(1L).toMillis());
        List<String> partitionKeys = Collections.singletonList("d");
        Configuration conf = this.getPartitionCommitTriggerConf(Duration.ofDays(1L).toMillis());
        long currentTimeMillis = System.currentTimeMillis();

        // Partition names derived from the current wall-clock time.
        Date nextYear = new Date(currentTimeMillis + Duration.ofDays(365L).toMillis());
        String nextYearPartition = "d=" + this.dateFormat.format(nextYear);
        Date yesterday = new Date(currentTimeMillis - Duration.ofDays(1L).toMillis());
        String yesterdayPartition = "d=" + this.dateFormat.format(yesterday);
        Date today = new Date(currentTimeMillis);
        String todayPartition = "d=" + this.dateFormat.format(today);
        Date tomorrow = new Date(currentTimeMillis + Duration.ofDays(1L).toMillis());
        String tomorrowPartition = "d=" + this.dateFormat.format(tomorrow);

        OperatorSubtaskState state;

        // Initial run: with the watermark at "now", yesterday's file is expected after
        // checkpoint 1 completes.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                this.create((RollingPolicy<RowData, String>) tableRollingPolicy, partitionKeys, conf)) {
            harness.setup();
            harness.initializeEmptyState();
            harness.open();
            harness.processElement(StreamingFileWriterTest.row(yesterdayPartition), 0L);
            harness.processWatermark(currentTimeMillis);
            state = harness.snapshot(1L, 1L);
            harness.notifyOfCompletedCheckpoint(1L);
            Assertions.assertThat(this.isPartitionFileCommitted(yesterdayPartition, 0, 0)).isTrue();
        }

        // Restored from snapshot 1.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                this.create((RollingPolicy<RowData, String>) tableRollingPolicy, partitionKeys, conf)) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row(tomorrowPartition), 0L);
            harness.processElement(StreamingFileWriterTest.row(todayPartition), 0L);

            // Advance the watermark by one day: today's file is expected, tomorrow's is not.
            currentTimeMillis += Duration.ofDays(1L).toMillis();
            harness.processWatermark(currentTimeMillis);
            harness.snapshot(2L, 2L);
            harness.notifyOfCompletedCheckpoint(2L);
            Assertions.assertThat(this.isPartitionFileCommitted(todayPartition, 0, 2)).isTrue();
            Assertions.assertThat(this.isPartitionFileCommitted(tomorrowPartition, 0, 1)).isFalse();

            // Advance the watermark by another day: tomorrow's file is now expected.
            currentTimeMillis += Duration.ofDays(1L).toMillis();
            harness.processWatermark(currentTimeMillis);
            state = harness.snapshot(3L, 3L);
            harness.notifyOfCompletedCheckpoint(3L);
            Assertions.assertThat(this.isPartitionFileCommitted(tomorrowPartition, 0, 1)).isTrue();

            // Written after snapshot 3, so it is not part of the state handed to the next harness.
            harness.processElement(StreamingFileWriterTest.row(nextYearPartition), 0L);
        }

        // Restored from snapshot 3: after endInput() both files are expected to exist, including
        // the one for the next-year partition.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                this.create((RollingPolicy<RowData, String>) tableRollingPolicy, partitionKeys, conf)) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(StreamingFileWriterTest.row(nextYearPartition), 0L);
            harness.processElement(StreamingFileWriterTest.row(tomorrowPartition), 0L);
            harness.endInput();
            Assertions.assertThat(this.isPartitionFileCommitted(tomorrowPartition, 0, 4)).isTrue();
            Assertions.assertThat(this.isPartitionFileCommitted(nextYearPartition, 0, 3)).isTrue();
        }
    }

    /** Builds a single-field row whose only column holds {@code s} as {@link StringData}. */
    private static RowData row(String s) {
        StringData value = StringData.fromString(s);
        return GenericRowData.of(value);
    }

    /**
     * Flattens the partitions of every {@link PartitionCommitInfo} the harness has emitted so
     * far into one list, in emission order.
     */
    private static List<String> collect(OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness) {
        List<String> collected = new ArrayList<>();
        for (PartitionCommitInfo info : harness.extractOutputValues()) {
            collected.addAll(info.getPartitions());
        }
        return collected;
    }

    /**
     * Creates a harness using an {@link OnCheckpointRollingPolicy}, no partition keys and a
     * configuration whose commit trigger is set to "process-time".
     */
    private OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> create() throws Exception {
        Configuration conf = new Configuration();
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER.key(), "process-time");
        List<String> noPartitionKeys = new ArrayList<>();
        return this.create(OnCheckpointRollingPolicy.build(), noPartitionKeys, conf);
    }

    /**
     * Creates a test harness around a {@link StreamingFileWriter} that writes below
     * {@code path}.
     *
     * <p>Each row is encoded as the string form of its first field followed by a newline
     * (UTF-8), and that same first field is used as the bucket id.
     *
     * @param rollingPolicy rolling policy handed to the row-format builder
     * @param partitionKeys partition keys handed to the writer
     * @param conf configuration handed to the writer
     * @return a harness whose stream config uses processing time; it is not yet set up or opened
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> create(RollingPolicy<RowData, String> rollingPolicy, List<String> partitionKeys, Configuration conf) throws Exception {
        // The explicit Encoder<RowData> target type gives the lambda's "element" its RowData type.
        StreamingFileWriter<RowData> writer = new StreamingFileWriter<>(
                1000L,
                StreamingFileSink.forRowFormat(
                                this.path,
                                (Encoder<RowData>) (element, stream) ->
                                        stream.write((element.getString(0) + "\n").getBytes(StandardCharsets.UTF_8)))
                        .withBucketAssigner(new BucketAssigner<RowData, String>() {

                            @Override
                            public String getBucketId(RowData element, BucketAssigner.Context context) {
                                return element.getString(0).toString();
                            }

                            @Override
                            public SimpleVersionedSerializer<String> getSerializer() {
                                return SimpleVersionedStringSerializer.INSTANCE;
                            }
                        })
                        .withRollingPolicy(rollingPolicy),
                partitionKeys,
                conf);
        // NOTE(review): the trailing 1, 1, 0 presumably mean max parallelism, parallelism and
        // subtask index - confirm against the harness constructor.
        OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                new OneInputStreamOperatorTestHarness<>(writer, 1, 1, 0);
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return harness;
    }

    /**
     * Builds a configuration with the "partition-time" commit trigger, the "success-file"
     * commit policy kind, a "yyyy-MM-dd" timestamp formatter and the "UTC" watermark time zone.
     *
     * @param commitDelay value stored under the partition commit delay key
     * @return the populated configuration
     */
    private Configuration getPartitionCommitTriggerConf(long commitDelay) {
        Configuration conf = new Configuration();
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time");
        conf.setLong(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
        conf.setString(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd");
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
        return conf;
    }

    /**
     * Builds a configuration with the "process-time" commit trigger, the "success-file" commit
     * policy kind and the "UTC" watermark time zone.
     *
     * @param commitDelay value stored under the partition commit delay key
     * @return the populated configuration
     */
    private Configuration getProcTimeCommitTriggerConf(long commitDelay) {
        Configuration conf = new Configuration();
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER.key(), "process-time");
        conf.setLong(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
        conf.setString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
        return conf;
    }

    /**
     * Reports whether the file {@code <prefix>-<subtaskIndex>-<partCounter><suffix>} exists in
     * the {@code partition} directory below the sink base path, where prefix and suffix come
     * from {@code outputFileConfig}.
     *
     * @param partition bucket directory name below the base path
     * @param subtaskIndex subtask index embedded in the file name
     * @param partCounter part counter embedded in the file name
     * @return {@code true} if the file exists
     */
    private boolean isPartitionFileCommitted(String partition, int subtaskIndex, int partCounter) {
        StringBuilder expectedName = new StringBuilder();
        expectedName.append(this.outputFileConfig.getPartPrefix())
                .append('-')
                .append(subtaskIndex)
                .append('-')
                .append(partCounter)
                .append(this.outputFileConfig.getPartSuffix());
        java.nio.file.Path partitionDir = Paths.get(this.path.getPath(), partition);
        return partitionDir.resolve(expectedName.toString()).toFile().exists();
    }
}

