/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public class FileSinkCompactionSwitchITCase {
    // Declared parallelism constant; the visible code uses literal values instead of this name.
    private static final int PARALLELISM = 4;
    // JUnit 5 extension configured with one task manager offering four slots.
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    // Sizing constants for the test topology. NOTE(review): the methods below use the literals
    // 4, 3 and 10000 rather than these names (presumably inlined by the compiler) - confirm.
    protected static final int NUM_SOURCES = 4;
    protected static final int NUM_SINKS = 3;
    protected static final int NUM_RECORDS = 10000;
    protected static final int NUM_BUCKETS = 4;
    // Class-level temp directory; testSwitchingCompaction uses it for the checkpoint storage
    // path and as the savepoint target directory.
    @TempDir
    private static java.nio.file.Path tmpDir;
    // Per-instance extension used by testSwitchingCompaction to create the shared send-count map.
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    // Latches keyed by a per-test id. Static so that CountingTestSource instances, which only
    // carry the id string, can look their latch up.
    private static final Map<String, CountDownLatch> LATCH_MAP;
    // Key of the current test's latch in LATCH_MAP; assigned in setup().
    private String latchId;

    /**
     * Generates a fresh random latch id for the upcoming test and registers a
     * latch with an initial count of 12 under that id.
     */
    @BeforeEach
    void setup() {
        String freshId = UUID.randomUUID().toString();
        CountDownLatch initialLatch = new CountDownLatch(12);
        latchId = freshId;
        LATCH_MAP.put(freshId, initialLatch);
    }

    /** Drops this test's entry from the latch registry once the test has finished. */
    @AfterEach
    void teardown() {
        String registeredId = latchId;
        LATCH_MAP.remove(registeredId);
    }

    /**
     * Switch scenario: the first job's sink is built without calling either compaction
     * option; the restored job's sink has compaction enabled.
     */
    @Test
    void testSwitchNeverEnabledToEnabled(@TempDir java.nio.file.Path tmpPath) throws Exception {
        final String outputDir = tmpPath.toFile().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint = createFileSink(outputDir, null, false);
        final FileSink<Integer> sinkAfterRestore =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
    }

    /**
     * Switch scenario: the first job's sink explicitly calls {@code disableCompact()};
     * the restored job's sink has compaction enabled.
     */
    @Test
    void testSwitchDisabledToEnabled(@TempDir java.nio.file.Path tmpPath) throws Exception {
        final String outputDir = tmpPath.toFile().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint = createFileSink(outputDir, null, true);
        final FileSink<Integer> sinkAfterRestore =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
    }

    /**
     * Switch scenario: the first job's sink has compaction enabled; the restored job's
     * sink explicitly calls {@code disableCompact()}.
     */
    @Test
    void testSwitchEnabledToDisabled(@TempDir java.nio.file.Path tmpPath) throws Exception {
        final String outputDir = tmpPath.toFile().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        final FileSink<Integer> sinkAfterRestore = createFileSink(outputDir, null, true);
        testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
    }

    /**
     * Switch scenario: the first job's sink has compaction enabled; the restored job's
     * sink is built without calling either compaction option. The test passes only if
     * running the scenario throws a {@link JobExecutionException}.
     */
    @Test
    void testSwitchEnabledToDisabledImproperly(@TempDir java.nio.file.Path tmpPath) throws Exception {
        final String outputDir = tmpPath.toFile().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        final FileSink<Integer> sinkAfterRestore = createFileSink(outputDir, null, false);

        boolean jobFailed;
        try {
            testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
            jobFailed = false;
        } catch (JobExecutionException expected) {
            // This is the outcome the test is looking for.
            jobFailed = true;
        }
        if (!jobFailed) {
            Assertions.fail("Job is not failing when compaction is disabled improperly");
        }
    }

    /**
     * Runs one job with {@code originFileSink}, takes a savepoint from it, then runs a second
     * job with {@code restoredFileSink} restored from that savepoint, and finally verifies the
     * files written under {@code path}.
     *
     * @param path output directory both sinks write to
     * @param originFileSink sink used by the first (non-finite source) job
     * @param restoredFileSink sink used by the job restored from the savepoint
     * @throws Exception if either job, the savepoint, or the output verification fails
     */
    private void testSwitchingCompaction(String path, FileSink<Integer> originFileSink, FileSink<Integer> restoredFileSink) throws Exception {
        String cpPath = "file://" + tmpDir.toFile().getAbsolutePath();
        // Shared between both jobs so the sources can report how many records each subtask sent.
        SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap =
                this.sharedObjects.add(new ConcurrentHashMap<Integer, Integer>());
        JobGraph jobGraph = this.createJobGraph(cpPath, originFileSink, false, sendCountMap);
        JobGraph restoringJobGraph = this.createJobGraph(cpPath, restoredFileSink, true, sendCountMap);
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
                .withRandomPorts()
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(4)
                .build();
        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();
            // Wait for the submission result: if submission fails, the sources never run and the
            // latch below would otherwise block forever instead of failing the test.
            miniCluster.submitJob(jobGraph).get();
            // Released by the sources' notifyCheckpointComplete callbacks.
            LATCH_MAP.get(this.latchId).await();
            // NOTE(review): the boolean argument is presumably "cancelJob" - confirm against the
            // MiniCluster API.
            String savepointPath = miniCluster.triggerSavepoint(
                    jobGraph.getJobID(),
                    tmpDir.toFile().getAbsolutePath(),
                    true,
                    SavepointFormatType.CANONICAL).get();
            // Fresh latch for the restored job's sources.
            LATCH_MAP.put(this.latchId, new CountDownLatch(8));
            restoringJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
            miniCluster.executeJobBlocking(restoringJobGraph);
        }
        FileSinkCompactionSwitchITCase.checkIntegerSequenceSinkOutput(path, sendCountMap.get(), 4, 4);
    }

    /**
     * Builds the job graph of a streaming pipeline consisting of a {@link CountingTestSource}
     * (parallelism 4) feeding {@code fileSink} (uid {@code "sink"}, parallelism 3).
     *
     * @param cpPath location handed to the file-system checkpoint storage
     * @param fileSink sink under test
     * @param isFinite forwarded to the source
     * @param sendCountMap shared map forwarded to the source
     * @return the job graph derived from the environment's stream graph
     */
    private JobGraph createJobGraph(String cpPath, FileSink<Integer> fileSink, boolean isFinite, SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
        Configuration settings = new Configuration();
        settings.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
        settings.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.configure(settings, getClass().getClassLoader());
        env.enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(cpPath));
        env.setStateBackend(new HashMapStateBackend());

        CountingTestSource source = new CountingTestSource(latchId, 10000, isFinite, sendCountMap);
        env.addSource(source)
                .setParallelism(4)
                .sinkTo(fileSink)
                .uid("sink")
                .setParallelism(3);

        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Creates a row-format {@link FileSink} writing to {@code path} with a modulo-4 bucket
     * assigner and a {@link PartSizeAndCheckpointRollingPolicy} constructed with
     * {@code (1024, false)}.
     *
     * @param path base output directory
     * @param compactStrategy when non-null, compaction is enabled with this strategy
     * @param disableCompact only consulted when {@code compactStrategy} is null; if true,
     *     {@code disableCompact()} is called on the builder
     * @return the built sink
     */
    private FileSink<Integer> createFileSink(String path, FileCompactStrategy compactStrategy, boolean disableCompact) {
        FileSink.DefaultRowFormatBuilder<Integer> builder =
                FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
                        .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(4))
                        .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy<>(1024L, false));

        if (compactStrategy == null) {
            if (disableCompact) {
                builder = builder.disableCompact();
            }
        } else {
            builder = builder.enableCompact(compactStrategy, createFileCompactor());
        }
        return builder.build();
    }

    /**
     * Creates a {@link RecordWiseFileCompactor} whose reader factory decodes records with
     * {@code IntegerFileSinkTestDataUtils.IntDecoder}.
     */
    private static FileCompactor createFileCompactor() {
        DecoderBasedReader.Factory<Integer> readerFactory =
                new DecoderBasedReader.Factory<>(IntegerFileSinkTestDataUtils.IntDecoder::new);
        return new RecordWiseFileCompactor<>(readerFactory);
    }

    /** Creates a compaction strategy built with {@code enableCompactionOnCheckpoint(2)}. */
    private static FileCompactStrategy createFileCompactStrategy() {
        FileCompactStrategy.Builder strategyBuilder = FileCompactStrategy.Builder.newBuilder();
        strategyBuilder.enableCompactionOnCheckpoint(2);
        return strategyBuilder.build();
    }

    /**
     * Verifies the files written under {@code path}.
     *
     * <p>The directory must contain exactly {@code numBuckets} sub-directories named
     * {@code 0 .. numBuckets-1}. For each bucket, every non-hidden file is read as a sequence
     * of big-endian ints, and the number of occurrences of each value is checked against the
     * per-source send counts in {@code countMap}.
     *
     * @param path base output directory of the sink
     * @param countMap number of records sent, keyed by source subtask index
     * @param numBuckets expected number of bucket directories
     * @param numSources expected number of entries in {@code countMap}
     * @throws Exception if reading an output file fails
     */
    private static void checkIntegerSequenceSinkOutput(String path, Map<Integer, Integer> countMap, int numBuckets, int numSources) throws Exception {
        Assertions.assertThat(countMap).hasSize(numSources);

        File dir = new File(path);
        String[] subDirNames = dir.list();
        Assertions.assertThat(subDirNames).isNotNull();
        Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
        Assertions.assertThat(subDirNames).hasSize(numBuckets);

        // Ascending send counts; independent of the bucket, so computed once.
        ArrayList<Integer> countList = new ArrayList<Integer>(countMap.values());
        Collections.sort(countList);

        for (int i = 0; i < numBuckets; ++i) {
            final int bucketId = i;
            Assertions.assertThat(subDirNames[i]).isEqualTo(Integer.toString(i));
            File bucketDir = new File(path, subDirNames[i]);
            // The description must be set before the assertion, otherwise it is not applied.
            Assertions.assertThat(bucketDir)
                    .as(bucketDir.getAbsolutePath() + " Should be a existing directory")
                    .isDirectory();

            Map<Integer, Integer> counts = countRecordsInBucket(bucketDir);

            // Number of distinct values in this bucket is determined by the source that sent most.
            int expectedCount = countMap.values().stream()
                    .mapToInt(numRecords -> numRecords / numBuckets + (bucketId < numRecords % numBuckets ? 1 : 0))
                    .max()
                    .getAsInt();
            Assertions.assertThat(counts).hasSize(expectedCount);

            for (int j = 0; j < countList.size(); ++j) {
                int previousCount = j == 0 ? 0 : countList.get(j - 1);
                // First value checked: previousCount rounded up to a multiple of numBuckets,
                // offset by the bucket id.
                int roundedUp = previousCount % numBuckets == 0
                        ? previousCount
                        : previousCount + numBuckets - previousCount % numBuckets;
                int rangeFrom = bucketId + roundedUp;
                int rangeTo = countList.get(j);
                // Values in this range were sent by every source whose count is at position j or
                // later in the sorted list, i.e. by (numSources - j) sources. The bucket count
                // plays no role here.
                int expectedOccurrences = numSources - j;
                for (int k = rangeFrom; k < rangeTo; k += numBuckets) {
                    int occurrences = counts.getOrDefault(k, 0);
                    Assertions.assertThat(occurrences)
                            .as("The record " + k + " should occur " + expectedOccurrences + " times,  but only occurs " + occurrences + "time")
                            .isEqualTo(expectedOccurrences);
                }
            }
        }
    }

    /** Reads all non-hidden files of a bucket directory and counts how often each int occurs. */
    private static Map<Integer, Integer> countRecordsInBucket(File bucketDir) throws Exception {
        Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
        File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
        Assertions.assertThat(files).isNotNull();
        for (File file : files) {
            Assertions.assertThat(file).isFile();
            try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
                while (true) {
                    counts.merge(in.readInt(), 1, Integer::sum);
                }
            } catch (EOFException ignored) {
                // Expected: readInt() signals the end of the file this way.
            }
        }
        return counts;
    }

    // Initializes the latch registry; a concurrent map is used because it is accessed both by
    // the test methods and by CountingTestSource instances.
    static {
        LATCH_MAP = new ConcurrentHashMap<String, CountDownLatch>();
    }

    /**
     * Parallel source that emits consecutive integers starting from its restored position.
     *
     * <p>In finite mode it emits {@code numberOfRecords} values beyond the restored position;
     * otherwise it keeps emitting until cancelled (bounded by {@code Integer.MAX_VALUE}). After
     * emitting, it blocks on the latch registered under {@code latchId}. The latch is counted
     * down from {@link #notifyCheckpointComplete(long)}.
     */
    private static class CountingTestSource
            extends RichParallelSourceFunction<Integer>
            implements CheckpointListener, CheckpointedFunction {
        private final String latchId;
        private final int numberOfRecords;
        private final boolean isFinite;
        private final SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap;
        private ListState<Integer> nextValueState;
        private int nextValue;
        private volatile boolean isCanceled;
        private volatile boolean snapshottedAfterAllRecordsOutput;
        private volatile boolean isWaitingCheckpointComplete;

        public CountingTestSource(String latchId, int numberOfRecords, boolean isFinite, SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
            this.latchId = latchId;
            this.numberOfRecords = numberOfRecords;
            this.isFinite = isFinite;
            this.sendCountMap = sendCountMap;
        }

        /** Obtains the "nextValue" list state and, if it holds an element, resumes from it. */
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Integer> descriptor =
                    new ListStateDescriptor<>("nextValue", Integer.class);
            nextValueState = context.getOperatorStateStore().getListState(descriptor);

            Iterable<Integer> restored = nextValueState.get();
            if (restored != null) {
                // Only the first element, if any, is used.
                for (Integer value : restored) {
                    nextValue = value;
                    break;
                }
            }
        }

        /** Emits the records, flags that emission is done, then waits on the test's latch. */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            int target;
            if (isFinite) {
                target = nextValue + numberOfRecords;
            } else {
                target = Integer.MAX_VALUE;
            }
            sendRecordsUntil(target, ctx);
            isWaitingCheckpointComplete = true;
            LATCH_MAP.get(latchId).await();
        }

        /**
         * Emits values one at a time under the checkpoint lock until {@code targetNumber} is
         * reached or the source is cancelled. In non-finite mode it sleeps 1 ms after every
         * 100th value, while still holding the lock.
         */
        private void sendRecordsUntil(int targetNumber, SourceFunction.SourceContext<Integer> ctx) throws InterruptedException {
            while (!(isCanceled || nextValue >= targetNumber)) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(nextValue++);
                    boolean throttle = !isFinite && nextValue % 100 == 0;
                    if (throttle) {
                        Thread.sleep(1L);
                    }
                }
            }
        }

        /**
         * Stores the current position in state, publishes it in the shared map under this
         * subtask's index, and records whether this snapshot happened after emission finished.
         */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            nextValueState.update(Collections.singletonList(nextValue));
            sendCountMap.consumeSync(
                    counts -> counts.put(getRuntimeContext().getIndexOfThisSubtask(), nextValue));
            if (isWaitingCheckpointComplete) {
                snapshottedAfterAllRecordsOutput = true;
            }
        }

        /**
         * Counts the latch down on every completed checkpoint in non-finite mode; in finite
         * mode only once emission has finished and a snapshot was taken afterwards.
         */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (isFinite && !(isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput)) {
                return;
            }
            LATCH_MAP.get(latchId).countDown();
        }

        /** Requests the emit loop to stop. */
        public void cancel() {
            isCanceled = true;
        }
    }
}

