/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class CacheITCase
extends AbstractTestBaseJUnit4 {
    private StreamExecutionEnvironment env;
    private MiniClusterWithClientResource miniClusterWithClientResource;

    @BeforeEach
    void setUp() throws Exception {
        Configuration configuration = new Configuration();
        this.miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(8).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
        this.miniClusterWithClientResource.before();
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, (Object)false);
        this.env = new TestStreamEnvironment(this.miniClusterWithClientResource.getMiniCluster(), configuration, 8, Collections.emptyList(), Collections.emptyList());
        this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }

    @AfterEach
    void tearDown() {
        this.miniClusterWithClientResource.after();
    }

    @Test
    void testCacheProduceAndConsume(@TempDir java.nio.file.Path tmpDir) throws Exception {
        File file = this.prepareTestData(tmpDir);
        FileSource source = FileSource.forRecordStreamFormat((StreamFormat)new TextLineInputFormat(), (Path[])new Path[]{new Path(file.getPath())}).build();
        CachedDataStream cachedDataStream = this.env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "source").map((MapFunction & Serializable)i -> Integer.parseInt(i) + 1).cache();
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedDataStream, "2", "3", "4");
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedDataStream, "2", "3", "4");
    }

    @Test
    void testInvalidateCache(@TempDir java.nio.file.Path tmpDir) throws Exception {
        File file = this.prepareTestData(tmpDir);
        FileSource source = FileSource.forRecordStreamFormat((StreamFormat)new TextLineInputFormat(), (Path[])new Path[]{new Path(file.getPath())}).build();
        CachedDataStream cachedDataStream = this.env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "source").map((MapFunction & Serializable)i -> Integer.parseInt(i) + 1).cache();
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedDataStream, "2", "3", "4");
        cachedDataStream.invalidate();
        Assertions.assertThat((boolean)file.delete()).isTrue();
        try (FileWriter writer = new FileWriter(file);){
            writer.write("4\n5\n6\n");
        }
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedDataStream, "5", "6", "7");
    }

    @Test
    void testBatchProduceCacheStreamConsume(@TempDir java.nio.file.Path tmpDir) throws Exception {
        File file = this.prepareTestData(tmpDir);
        FileSource source = FileSource.forRecordStreamFormat((StreamFormat)new TextLineInputFormat(), (Path[])new Path[]{new Path(file.getPath())}).build();
        CachedDataStream cachedDataStream = this.env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "source").map(Integer::parseInt).map((MapFunction & Serializable)i -> i + 1).cache();
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedDataStream, "2", "3", "4");
        this.env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        SingleOutputStreamOperator dataStream = cachedDataStream.map((MapFunction & Serializable)i -> i + 1);
        this.executeAndVerifyResult(tmpDir, (DataStream)dataStream, "3", "4", "5");
    }

    @Test
    void testCacheProduceAndConsumeWithDifferentPartitioner(@TempDir java.nio.file.Path tmpDir) throws Exception {
        DataStreamSource ds = this.env.fromData((Object[])new Tuple2[]{new Tuple2((Object)1, (Object)1), new Tuple2((Object)2, (Object)1), new Tuple2((Object)2, (Object)1)});
        CachedDataStream cacheSource = ds.cache();
        SingleOutputStreamOperator result = cacheSource.keyBy((KeySelector & Serializable)v -> (Integer)v.f0).reduce((ReduceFunction & Serializable)(v1, v2) -> new Tuple2(v1.f0, (Object)((Integer)v1.f1 + (Integer)v2.f1)));
        this.executeAndVerifyResult(tmpDir, (DataStream)result, "(1,1)", "(2,2)");
        result = cacheSource.keyBy((KeySelector & Serializable)t -> (Integer)t.f1).reduce((ReduceFunction & Serializable)(v1, v2) -> new Tuple2((Object)((Integer)v1.f0 + (Integer)v2.f0), v1.f1));
        this.executeAndVerifyResult(tmpDir, (DataStream)result, "(5,1)");
    }

    @Test
    void testCacheSideOutput(@TempDir java.nio.file.Path tmpDir) throws Exception {
        OutputTag<Integer> tag = new OutputTag<Integer>("2"){};
        DataStreamSource ds = this.env.fromData((Object[])new Tuple2[]{new Tuple2((Object)1, (Object)1), new Tuple2((Object)2, (Object)1), new Tuple2((Object)2, (Object)2)});
        SingleOutputStreamOperator processed = ds.process((ProcessFunction)new ProcessFunction<Tuple2<Integer, Integer>, Integer>((OutputTag)tag){
            final /* synthetic */ OutputTag val$tag;
            {
                this.val$tag = outputTag;
            }

            public void processElement(Tuple2<Integer, Integer> v, ProcessFunction.Context ctx, Collector<Integer> out) {
                if ((Integer)v.f0 == 2) {
                    ctx.output(this.val$tag, v.f1);
                    return;
                }
                out.collect(v.f1);
            }
        });
        CachedDataStream cachedSideOutput = processed.getSideOutput((OutputTag)tag).cache();
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedSideOutput, "1", "2");
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedSideOutput, "1", "2");
    }

    @Test
    void testRetryOnCorruptedClusterDataset(@TempDir java.nio.file.Path tmpDir) throws Exception {
        File file = this.prepareTestData(tmpDir);
        FileSource source = FileSource.forRecordStreamFormat((StreamFormat)new TextLineInputFormat(), (Path[])new Path[]{new Path(file.getPath())}).build();
        CachedDataStream cachedDataStream = this.env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "source").map((MapFunction & Serializable)i -> Integer.parseInt(i) + 1).cache();
        this.executeAndVerifyResult(tmpDir, (DataStream)cachedDataStream, "2", "3", "4");
        AbstractID datasetId = ((CacheTransformation)cachedDataStream.getTransformation()).getDatasetId();
        Assertions.assertThat((boolean)file.delete()).isTrue();
        try (FileWriter writer = new FileWriter(file);){
            writer.write("4\n5\n6\n");
        }
        SingleOutputStreamOperator dataStream = cachedDataStream.flatMap((FlatMapFunction & Serializable)(value, out) -> {
            if (value < 5) {
                throw new ClusterDatasetCorruptedException(null, Collections.singletonList(new IntermediateDataSetID(datasetId)));
            }
            out.collect(value);
        }).returns(Integer.class);
        this.executeAndVerifyResult(tmpDir, (DataStream)dataStream, "5", "6", "7");
    }

    private <T> void executeAndVerifyResult(java.nio.file.Path tmpDir, DataStream<T> dataStream, String ... expectedResult) throws Exception {
        File outputFile = new File(tmpDir.toFile(), UUID.randomUUID().toString());
        dataStream.sinkTo(this.getFileSink(outputFile));
        this.env.execute();
        Assertions.assertThat(CacheITCase.getFileContent(outputFile)).containsExactlyInAnyOrder((Object[])expectedResult);
    }

    private <T> FileSink<T> getFileSink(File outputFile) {
        return FileSink.forRowFormat((Path)new Path(outputFile.getPath()), (Encoder)new SimpleStringEncoder()).build();
    }

    private static List<String> getFileContent(File directory) throws IOException {
        ArrayList<String> res = new ArrayList<String>();
        Collection filesInBucket = FileUtils.listFiles((File)directory, null, (boolean)true);
        for (File file : filesInBucket) {
            res.addAll(Arrays.asList(FileUtils.readFileToString((File)file).split("\n")));
        }
        return res;
    }

    private File prepareTestData(java.nio.file.Path tmpDir) throws IOException {
        File datafile = new File(tmpDir.toFile(), UUID.randomUUID().toString());
        try (FileWriter writer = new FileWriter(datafile);){
            writer.write("1\n2\n3\n");
        }
        return datafile;
    }
}

