/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration test for the stream partitioning strategies exercised below:
 * hash ({@code keyBy}), custom, broadcast, rebalance, forward and global.
 *
 * <p>Every record is tagged with the index of the subtask that processed it
 * (see {@link SubtaskIndexAssigner}); the {@code verify*} helpers then check
 * the resulting (subtask index, value) pairs against the expected routing.
 */
public class PartitionerITCase
extends AbstractTestBase {
    /** Parallelism used by {@link #partitionerTest()} and by the expected-result computations. */
    private static final int PARALLELISM = 3;

    /** Input values; already in natural (sorted) order, which the rebalance verification relies on. */
    private static final List<String> INPUT = Arrays.asList("a", "b", "c", "d", "e", "f", "g");

    /**
     * Expects job execution to be rejected when {@code forward()} connects the
     * element source directly to a map operator.
     *
     * <p>NOTE(review): presumably the source runs with parallelism 1 while the map
     * uses the environment's default parallelism, so the forward edge would have to
     * go from low to high parallelism - confirm against the test environment.
     */
    @Test(expected=UnsupportedOperationException.class)
    public void testForwardFailsLowToHighParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> src = env.fromElements(1, 2, 3);
        src.forward().map(new NoOpIntMap());
        env.execute();
    }

    /**
     * Expects job execution to be rejected when {@code forward()} connects a map
     * operator to a downstream map whose parallelism is explicitly set to 1.
     *
     * <p>NOTE(review): presumably the upstream map runs with a default parallelism
     * greater than 1, making this a high-to-low forward edge - confirm.
     */
    @Test(expected=UnsupportedOperationException.class)
    public void testForwardFailsHightToLowParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
        src.forward().map(new NoOpIntMap()).setParallelism(1);
        env.execute();
    }

    /**
     * Runs one job that routes {@link #INPUT} through each partitioning strategy
     * into its own sink, then verifies the subtask assignment per strategy.
     */
    @Test
    public void partitionerTest() {
        TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> customPartitionResultSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> broadcastPartitionResultSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> forwardPartitionResultSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> rebalancePartitionResultSink = new TestListResultSink<>();
        TestListResultSink<Tuple2<Integer, String>> globalPartitionResultSink = new TestListResultSink<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        DataStreamSource<Tuple1<String>> src =
                env.fromCollection(INPUT.stream().map(Tuple1::of).collect(Collectors.toList()));

        // Hash partitioning on tuple field 0.
        src.keyBy(0).map(new SubtaskIndexAssigner()).addSink(hashPartitionResultSink);

        // Custom partitioning: "c" goes to partition 2, every other key to partition 0.
        SingleOutputStreamOperator<Tuple2<Integer, String>> partitionCustom =
                src.partitionCustom(new Partitioner<String>() {

                    @Override
                    public int partition(String key, int numPartitions) {
                        return "c".equals(key) ? 2 : 0;
                    }
                }, 0).map(new SubtaskIndexAssigner());
        partitionCustom.addSink(customPartitionResultSink);

        // Broadcast partitioning.
        src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);

        // Rebalance partitioning.
        src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);

        // Forward partitioning, placed behind an identity map.
        src.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple1<String> map(Tuple1<String> value) throws Exception {
                return value;
            }
        }).forward().map(new SubtaskIndexAssigner()).addSink(forwardPartitionResultSink);

        // Global partitioning.
        src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);

        try {
            env.execute();
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack trace.
            throw new AssertionError("Job execution failed: " + e.getMessage(), e);
        }

        verifyHashPartitioning(hashPartitionResultSink.getResult());
        verifyCustomPartitioning(customPartitionResultSink.getResult());
        verifyBroadcastPartitioning(broadcastPartitionResultSink.getResult());
        // NOTE(review): the forward result is intentionally checked with the rebalance
        // verifier; presumably the identity map in front of forward() receives its input
        // round-robin from the source and forward() keeps that assignment - confirm.
        verifyRebalancePartitioning(forwardPartitionResultSink.getResult());
        verifyRebalancePartitioning(rebalancePartitionResultSink.getResult());
        verifyGlobalPartitioning(globalPartitionResultSink.getResult());
    }

    /**
     * Asserts that every occurrence of the same value was processed by the same subtask.
     *
     * @param hashPartitionResult (subtask index, value) pairs collected from the hash-partitioned sink
     */
    private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult) {
        HashMap<String, Integer> subtaskByValue = new HashMap<>();
        for (Tuple2<Integer, String> elem : hashPartitionResult) {
            // putIfAbsent returns the subtask recorded earlier for this value, or null on first sight.
            Integer firstSeenSubtask = subtaskByValue.putIfAbsent(elem.f1, elem.f0);
            if (firstSeenSubtask != null) {
                Assert.assertEquals(
                        "Value '" + elem.f1 + "' was routed to more than one subtask",
                        firstSeenSubtask,
                        elem.f0);
            }
        }
    }

    /**
     * Asserts the routing of the custom partitioner defined in {@link #partitionerTest()}:
     * "c" must be on subtask 2, every other value on subtask 0.
     *
     * @param customPartitionResult (subtask index, value) pairs collected from the custom-partitioned sink
     */
    private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult) {
        for (Tuple2<Integer, String> stringWithSubtask : customPartitionResult) {
            int expectedSubtask = "c".equals(stringWithSubtask.f1) ? 2 : 0;
            Assert.assertEquals(
                    "Unexpected subtask for value '" + stringWithSubtask.f1 + "'",
                    Integer.valueOf(expectedSubtask),
                    stringWithSubtask.f0);
        }
    }

    /**
     * Asserts that every input value was seen exactly once by each of the
     * {@link #PARALLELISM} subtasks.
     *
     * @param broadcastPartitionResult (subtask index, value) pairs collected from the broadcast sink
     */
    private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>> broadcastPartitionResult) {
        Set<Tuple2<Integer, String>> expectedResult = INPUT.stream()
                .flatMap(input -> IntStream.range(0, PARALLELISM).mapToObj(i -> Tuple2.of(i, input)))
                .collect(Collectors.toSet());
        // The set comparison below cannot detect duplicated records, so check the count as well.
        Assert.assertEquals(
                "Unexpected number of broadcast records",
                expectedResult.size(),
                broadcastPartitionResult.size());
        Assert.assertEquals(expectedResult, new HashSet<>(broadcastPartitionResult));
    }

    /**
     * Asserts a round-robin assignment: after ordering the records by value, the
     * subtask index must advance by one (modulo {@link #PARALLELISM}) per record,
     * starting from whichever subtask received the first value.
     *
     * <p>The argument is not modified; a sorted copy is used for the comparison.
     *
     * @param rebalancePartitionResult (subtask index, value) pairs collected from the sink under test
     */
    private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>> rebalancePartitionResult) {
        // Without this check a truncated result would still match the (equally truncated)
        // expectation built below, and an empty result would fail with an index error.
        Assert.assertEquals(
                "Unexpected number of records",
                INPUT.size(),
                rebalancePartitionResult.size());

        List<Tuple2<Integer, String>> sortedByValue = rebalancePartitionResult.stream()
                .sorted(Comparator.comparing((Tuple2<Integer, String> t) -> t.f1))
                .collect(Collectors.toList());

        int offset = sortedByValue.get(0).f0;
        List<Tuple2<Integer, String>> expected = IntStream.range(0, sortedByValue.size())
                .mapToObj(index -> Tuple2.of((offset + index) % PARALLELISM, INPUT.get(index)))
                .collect(Collectors.toList());
        Assert.assertEquals(expected, sortedByValue);
    }

    /**
     * Asserts that all input values arrived, in input order, on subtask 0.
     *
     * @param globalPartitionResult (subtask index, value) pairs collected from the global-partitioned sink
     */
    private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult) {
        List<Tuple2<Integer, String>> expected = INPUT.stream()
                .map(value -> Tuple2.of(0, value))
                .collect(Collectors.toList());
        Assert.assertEquals(expected, globalPartitionResult);
    }

    /**
     * Map function that pairs each incoming value with the index of the subtask
     * executing this function instance.
     */
    private static class SubtaskIndexAssigner
    extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        /** Subtask index read from the runtime context in {@link #open(Configuration)}. */
        private int indexOfSubtask;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            RuntimeContext runtimeContext = getRuntimeContext();
            indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
            return new Tuple2<Integer, String>(indexOfSubtask, value.f0);
        }
    }
}

