/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucket;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class FileWriterTest {
    private MetricListener metricListener;

    FileWriterTest() {
    }

    @BeforeEach
    void setUp() {
        this.metricListener = new MetricListener();
    }

    @Test
    void testPreCommit(@TempDir java.nio.file.Path tempDir) throws Exception {
        Path path = new Path(tempDir.toUri());
        FileWriter<String> fileWriter = this.createWriter(path, (RollingPolicy<String, String>)OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
        fileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        fileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        fileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        fileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        fileWriter.write((Object)"test3", (SinkWriter.Context)new ContextImpl());
        Collection committables = fileWriter.prepareCommit();
        Assertions.assertThat((int)committables.size()).isEqualTo(3);
    }

    @Test
    void testSnapshotAndRestore(@TempDir java.nio.file.Path tempDir) throws Exception {
        Path path = new Path(tempDir.toUri());
        FileWriter<String> fileWriter = this.createWriter(path, (RollingPolicy<String, String>)DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        fileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        fileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        fileWriter.write((Object)"test3", (SinkWriter.Context)new ContextImpl());
        Assertions.assertThat((int)fileWriter.getActiveBuckets().size()).isEqualTo(3);
        fileWriter.prepareCommit();
        List states = fileWriter.snapshotState(1L);
        Assertions.assertThat((int)states.size()).isEqualTo(3);
        fileWriter = this.restoreWriter(states, path, (RollingPolicy<String, String>)OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
        Assertions.assertThat(fileWriter.getActiveBuckets().keySet()).isEqualTo(new HashSet<String>(Arrays.asList("test1", "test2", "test3")));
        for (FileWriterBucket bucket : fileWriter.getActiveBuckets().values()) {
            ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The in-progress file should be recovered", new Object[0])).isNotNull();
        }
    }

    @Test
    void testMergingForRescaling(@TempDir java.nio.file.Path tempDir) throws Exception {
        FileWriterBucket bucket;
        Path path = new Path(tempDir.toUri());
        FileWriter<String> firstFileWriter = this.createWriter(path, (RollingPolicy<String, String>)DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        firstFileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        firstFileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        firstFileWriter.write((Object)"test3", (SinkWriter.Context)new ContextImpl());
        firstFileWriter.prepareCommit();
        List firstState = firstFileWriter.snapshotState(1L);
        FileWriter<String> secondFileWriter = this.createWriter(path, (RollingPolicy<String, String>)DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        secondFileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        secondFileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        secondFileWriter.prepareCommit();
        List secondState = secondFileWriter.snapshotState(1L);
        ArrayList<FileWriterBucketState> mergedState = new ArrayList<FileWriterBucketState>();
        mergedState.addAll(firstState);
        mergedState.addAll(secondState);
        FileWriter<String> restoredWriter = this.restoreWriter(mergedState, path, (RollingPolicy<String, String>)DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        Assertions.assertThat((int)restoredWriter.getActiveBuckets().size()).isEqualTo(3);
        for (String bucketId : Arrays.asList("test1", "test2")) {
            bucket = (FileWriterBucket)restoredWriter.getActiveBuckets().get(bucketId);
            ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The in-progress file should be recovered", new Object[0])).isNotNull();
            Assertions.assertThat((int)bucket.getPendingFiles().size()).isEqualTo(1);
        }
        for (String bucketId : Collections.singletonList("test3")) {
            bucket = (FileWriterBucket)restoredWriter.getActiveBuckets().get(bucketId);
            ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The in-progress file should be recovered", new Object[0])).isNotNull();
            Assertions.assertThat((int)bucket.getPendingFiles().size()).isEqualTo(0);
        }
    }

    @Test
    void testBucketIsRemovedWhenNotActive(@TempDir java.nio.file.Path tempDir) throws Exception {
        Path path = new Path(tempDir.toUri());
        FileWriter<String> fileWriter = this.createWriter(path, (RollingPolicy<String, String>)OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
        fileWriter.write((Object)"test", (SinkWriter.Context)new ContextImpl());
        fileWriter.prepareCommit();
        fileWriter.snapshotState(1L);
        fileWriter.prepareCommit();
        Assertions.assertThat((boolean)fileWriter.getActiveBuckets().isEmpty()).isTrue();
    }

    @Test
    void testOnProcessingTime(@TempDir java.nio.file.Path tempDir) throws Exception {
        Path path = new Path(tempDir.toUri());
        ManuallyTriggeredProcessingTimeService processingTimeService = new ManuallyTriggeredProcessingTimeService();
        processingTimeService.advanceTo(10L);
        FileWriter<String> fileWriter = this.createWriter(path, new FileSinkTestUtils.StringIdentityBucketAssigner(), (RollingPolicy<String, String>)DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMillis(10L)).build(), new OutputFileConfig("part-", ""), processingTimeService, 5L);
        fileWriter.initializeState(Collections.emptyList());
        fileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        processingTimeService.advanceTo(15L);
        fileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        processingTimeService.advanceTo(20L);
        FileWriterBucket test1Bucket = (FileWriterBucket)fileWriter.getActiveBuckets().get("test1");
        ((ObjectAssert)Assertions.assertThat((Object)test1Bucket.getInProgressPart()).as("The in-progress part of test1 should be rolled", new Object[0])).isNull();
        Assertions.assertThat((int)test1Bucket.getPendingFiles().size()).isEqualTo(1);
        FileWriterBucket test2Bucket = (FileWriterBucket)fileWriter.getActiveBuckets().get("test2");
        ((ObjectAssert)Assertions.assertThat((Object)test2Bucket.getInProgressPart()).as("The in-progress part of test2 should not be rolled", new Object[0])).isNotNull();
        Assertions.assertThat((int)test2Bucket.getPendingFiles().size()).isEqualTo(0);
        processingTimeService.advanceTo(30L);
        fileWriter.prepareCommit();
        fileWriter.write((Object)"test1", (SinkWriter.Context)new ContextImpl());
        processingTimeService.advanceTo(35L);
        fileWriter.write((Object)"test2", (SinkWriter.Context)new ContextImpl());
        processingTimeService.advanceTo(40L);
        test1Bucket = (FileWriterBucket)fileWriter.getActiveBuckets().get("test1");
        ((ObjectAssert)Assertions.assertThat((Object)test1Bucket.getInProgressPart()).as("The in-progress part of test1 should be rolled", new Object[0])).isNull();
        Assertions.assertThat((int)test1Bucket.getPendingFiles().size()).isEqualTo(1);
        test2Bucket = (FileWriterBucket)fileWriter.getActiveBuckets().get("test2");
        ((ObjectAssert)Assertions.assertThat((Object)test2Bucket.getInProgressPart()).as("The in-progress part of test2 should not be rolled", new Object[0])).isNotNull();
        Assertions.assertThat((int)test2Bucket.getPendingFiles().size()).isEqualTo(0);
    }

    @Test
    void testContextPassingNormalExecution(@TempDir java.nio.file.Path tempDir) throws Exception {
        this.testCorrectTimestampPassingInContext(1L, 2L, 3L, tempDir);
    }

    @Test
    void testContextPassingNullTimestamp(@TempDir java.nio.file.Path tempDir) throws Exception {
        this.testCorrectTimestampPassingInContext(null, 4L, 5L, tempDir);
    }

    @Test
    void testNumberRecordsOutCounter(@TempDir java.nio.file.Path tempDir) throws IOException, InterruptedException {
        Path path = new Path(tempDir.toUri());
        InternalOperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
        InternalSinkWriterMetricGroup sinkWriterMetricGroup = InternalSinkWriterMetricGroup.mock((MetricGroup)this.metricListener.getMetricGroup(), (OperatorIOMetricGroup)operatorIOMetricGroup);
        Counter recordsCounter = sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
        ContextImpl context = new ContextImpl();
        FileWriter<String> fileWriter = this.createWriter(path, (RollingPolicy<String, String>)DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""), (SinkWriterMetricGroup)sinkWriterMetricGroup);
        Assertions.assertThat((long)recordsCounter.getCount()).isEqualTo(0L);
        fileWriter.write((Object)"1", (SinkWriter.Context)context);
        Assertions.assertThat((long)recordsCounter.getCount()).isEqualTo(1L);
        fileWriter.write((Object)"2", (SinkWriter.Context)context);
        fileWriter.write((Object)"3", (SinkWriter.Context)context);
        Assertions.assertThat((long)recordsCounter.getCount()).isEqualTo(3L);
    }

    private void testCorrectTimestampPassingInContext(Long timestamp, long watermark, long processingTime, java.nio.file.Path tempDir) throws Exception {
        Path path = new Path(tempDir.toUri());
        ManuallyTriggeredProcessingTimeService processingTimeService = new ManuallyTriggeredProcessingTimeService();
        processingTimeService.advanceTo(processingTime);
        FileWriter<String> fileWriter = this.createWriter(path, new VerifyingBucketAssigner(timestamp, watermark, processingTime), (RollingPolicy<String, String>)DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMillis(10L)).build(), new OutputFileConfig("part-", ""), processingTimeService, 5L);
        fileWriter.initializeState(Collections.emptyList());
        fileWriter.write((Object)"test", (SinkWriter.Context)new ContextImpl(watermark, timestamp));
    }

    private FileWriter<String> createWriter(Path basePath, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig, SinkWriterMetricGroup sinkWriterMetricGroup) throws IOException {
        return new FileWriter(basePath, sinkWriterMetricGroup, (BucketAssigner)new FileSinkTestUtils.StringIdentityBucketAssigner(), (FileWriterBucketFactory)new DefaultFileWriterBucketFactory(), (BucketWriter)new RowWiseBucketWriter(FileSystem.get((URI)basePath.toUri()).createRecoverableWriter(), (Encoder)new SimpleStringEncoder()), rollingPolicy, outputFileConfig, (ProcessingTimeService)new ManuallyTriggeredProcessingTimeService(), 10L);
    }

    private FileWriter<String> createWriter(Path basePath, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) throws IOException {
        return this.createWriter(basePath, rollingPolicy, outputFileConfig, (SinkWriterMetricGroup)InternalSinkWriterMetricGroup.mock((MetricGroup)this.metricListener.getMetricGroup()));
    }

    private FileWriter<String> createWriter(Path basePath, BucketAssigner<String, String> bucketAssigner, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig, ProcessingTimeService processingTimeService, long bucketCheckInterval) throws IOException {
        return new FileWriter(basePath, (SinkWriterMetricGroup)InternalSinkWriterMetricGroup.mock((MetricGroup)this.metricListener.getMetricGroup()), bucketAssigner, (FileWriterBucketFactory)new DefaultFileWriterBucketFactory(), (BucketWriter)new RowWiseBucketWriter(FileSystem.get((URI)basePath.toUri()).createRecoverableWriter(), (Encoder)new SimpleStringEncoder()), rollingPolicy, outputFileConfig, processingTimeService, bucketCheckInterval);
    }

    private FileWriter<String> restoreWriter(List<FileWriterBucketState> states, Path basePath, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) throws IOException {
        FileWriter<String> writer = this.createWriter(basePath, rollingPolicy, outputFileConfig);
        writer.initializeState(states);
        return writer;
    }

    private static class VerifyingBucketAssigner
    implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 7729086510972377578L;
        private final Long expectedTimestamp;
        private final long expectedWatermark;
        private final long expectedProcessingTime;

        VerifyingBucketAssigner(Long expectedTimestamp, long expectedWatermark, long expectedProcessingTime) {
            this.expectedTimestamp = expectedTimestamp;
            this.expectedWatermark = expectedWatermark;
            this.expectedProcessingTime = expectedProcessingTime;
        }

        public String getBucketId(String element, BucketAssigner.Context context) {
            Long elementTimestamp = context.timestamp();
            long watermark = context.currentWatermark();
            long processingTime = context.currentProcessingTime();
            Assertions.assertThat((Long)elementTimestamp).isEqualTo((Object)this.expectedTimestamp);
            Assertions.assertThat((long)processingTime).isEqualTo(this.expectedProcessingTime);
            Assertions.assertThat((long)watermark).isEqualTo(this.expectedWatermark);
            return element;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    private static class ManuallyTriggeredProcessingTimeService
    implements ProcessingTimeService {
        private long now;
        private final Queue<Tuple2<Long, ProcessingTimeService.ProcessingTimeCallback>> timers = new PriorityQueue<Tuple2>(Comparator.comparingLong(o -> (Long)o.f0));

        private ManuallyTriggeredProcessingTimeService() {
        }

        public long getCurrentProcessingTime() {
            return this.now;
        }

        public ScheduledFuture<?> registerTimer(long time, ProcessingTimeService.ProcessingTimeCallback processingTimeCallback) {
            if (time <= this.now) {
                try {
                    processingTimeCallback.onProcessingTime(this.now);
                }
                catch (Exception e) {
                    ExceptionUtils.rethrow((Throwable)e);
                }
            } else {
                this.timers.add((Tuple2<Long, ProcessingTimeService.ProcessingTimeCallback>)new Tuple2((Object)time, (Object)processingTimeCallback));
            }
            return null;
        }

        public void advanceTo(long time) throws Exception {
            if (time > this.now) {
                Tuple2<Long, ProcessingTimeService.ProcessingTimeCallback> timer;
                this.now = time;
                while ((timer = this.timers.peek()) != null && (Long)timer.f0 <= this.now) {
                    ((ProcessingTimeService.ProcessingTimeCallback)timer.f1).onProcessingTime(this.now);
                    this.timers.poll();
                }
            }
        }
    }

    private static class ContextImpl
    implements SinkWriter.Context {
        private final long watermark;
        private final Long timestamp;

        public ContextImpl() {
            this(0L, 0L);
        }

        private ContextImpl(long watermark, Long timestamp) {
            this.watermark = watermark;
            this.timestamp = timestamp;
        }

        public long currentWatermark() {
            return this.watermark;
        }

        public Long timestamp() {
            return this.timestamp;
        }
    }
}

