/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.util.Random;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@Ignore
public class ManualWindowSpeedITCase
extends AbstractTestBase {
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackend() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);
        String checkpoints = this.tempFolder.newFolder().toURI().toString();
        env.setStateBackend((AbstractStateBackend)new FsStateBackend(checkpoints));
        env.addSource((SourceFunction)new InfiniteTupleSource(1000)).keyBy(new int[]{0}).timeWindow(Time.seconds((long)3L)).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).filter((FilterFunction)new FilterFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return ((String)value.f0).startsWith("Tuple 0");
            }
        }).print();
        env.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackendWithLateness() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);
        String checkpoints = this.tempFolder.newFolder().toURI().toString();
        env.setStateBackend((AbstractStateBackend)new FsStateBackend(checkpoints));
        env.addSource((SourceFunction)new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds((long)3L)).allowedLateness(Time.seconds((long)1L)).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).filter((FilterFunction)new FilterFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return ((String)value.f0).startsWith("Tuple 0");
            }
        }).print();
        env.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackend() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);
        env.setStateBackend((AbstractStateBackend)new RocksDBStateBackend((AbstractStateBackend)new MemoryStateBackend()));
        env.addSource((SourceFunction)new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds((long)3L)).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).filter((FilterFunction)new FilterFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return ((String)value.f0).startsWith("Tuple 0");
            }
        }).print();
        env.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackendWithLateness() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);
        env.setStateBackend((AbstractStateBackend)new RocksDBStateBackend((AbstractStateBackend)new MemoryStateBackend()));
        env.addSource((SourceFunction)new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds((long)3L)).allowedLateness(Time.seconds((long)1L)).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).filter((FilterFunction)new FilterFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return ((String)value.f0).startsWith("Tuple 0");
            }
        }).print();
        env.execute();
    }

    @Test
    public void testAlignedProcessingTimeWindows() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(1);
        env.setStateBackend((AbstractStateBackend)new RocksDBStateBackend((AbstractStateBackend)new MemoryStateBackend()));
        env.addSource((SourceFunction)new InfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds((long)3L)).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).filter((FilterFunction)new FilterFunction<Tuple2<String, Integer>>(){
            private static final long serialVersionUID = 1L;

            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return ((String)value.f0).startsWith("Tuple 0");
            }
        }).print();
        env.execute();
    }

    public static class InfiniteTupleSource
    implements ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private int numKeys;
        private volatile boolean running = true;

        public InfiniteTupleSource(int numKeys) {
            this.numKeys = numKeys;
        }

        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> out) throws Exception {
            Random random = new Random(42L);
            while (this.running) {
                Tuple2 tuple = new Tuple2((Object)("Tuple " + random.nextInt(this.numKeys)), (Object)1);
                out.collect((Object)tuple);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

