/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class MultipleInputITCase
extends AbstractTestBase {
    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        TestListResultSink resultSink = new TestListResultSink();
        DataStreamSource source1 = env.fromElements((Object[])new Integer[]{1, 10});
        DataStreamSource source2 = env.fromElements((Object[])new Long[]{2L, 11L});
        DataStreamSource source3 = env.fromElements((Object[])new String[]{"42", "44"});
        MultipleInputTransformation transform = new MultipleInputTransformation("My Operator", (StreamOperatorFactory)new SumAllInputOperatorFactory(), (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, 1);
        env.addOperator((Transformation)transform.addInput(source1.getTransformation()).addInput(source2.getTransformation()).addInput(source3.getTransformation()));
        new MultipleConnectedStreams(env).transform((AbstractMultipleInputTransformation)transform).addSink(resultSink);
        env.execute();
        List result = resultSink.getResult();
        Collections.sort(result);
        long actualSum = (Long)result.get(result.size() - 1);
        Assert.assertEquals((long)110L, (long)actualSum);
    }

    @Test
    public void testKeyedState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        TestListResultSink resultSink = new TestListResultSink();
        DataStreamSource source1 = env.fromElements((Object[])new Long[]{0L, 3L});
        DataStreamSource source2 = env.fromElements((Object[])new Long[]{13L, 16L});
        DataStreamSource source3 = env.fromElements((Object[])new Long[]{101L, 104L});
        KeyedMultipleInputTransformation transform = new KeyedMultipleInputTransformation("My Operator", (StreamOperatorFactory)new KeyedSumMultipleInputOperatorFactory(), (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, 1, (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> value % 3L;
        env.addOperator((Transformation)transform.addInput(source1.getTransformation(), (KeySelector)keySelector).addInput(source2.getTransformation(), (KeySelector)keySelector).addInput(source3.getTransformation(), (KeySelector)keySelector));
        new MultipleConnectedStreams(env).transform((AbstractMultipleInputTransformation)transform).addSink(resultSink);
        env.execute();
        List result = resultSink.getResult();
        Collections.sort(result);
        MatcherAssert.assertThat(result, (Matcher)Matchers.contains((Object[])new Long[]{0L, 3L, 13L, 29L, 101L, 205L}));
    }

    public static class SumAllInputOperatorFactory
    extends AbstractStreamOperatorFactory<Long> {
        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            return (T)((Object)new SumAllInputOperator(parameters));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SumAllInputOperator.class;
        }
    }

    public static class SumAllInputOperator
    extends AbstractStreamOperatorV2<Long>
    implements MultipleInputStreamOperator<Long> {
        private long sum;

        public SumAllInputOperator(StreamOperatorParameters<Long> parameters) {
            super(parameters, 3);
        }

        public List<Input> getInputs() {
            return Arrays.asList(new Input[]{new SumInput(this, 1), new SumInput(this, 2), new SumInput(this, 3)});
        }

        public class SumInput<T>
        extends AbstractInput<T, Long> {
            public SumInput(AbstractStreamOperatorV2<Long> owner, int inputId) {
                super(owner, inputId);
            }

            public void processElement(StreamRecord<T> element) throws Exception {
                SumAllInputOperator.this.sum = SumAllInputOperator.this.sum + Long.valueOf(element.getValue().toString());
                this.output.collect((Object)new StreamRecord((Object)SumAllInputOperator.this.sum));
            }
        }
    }

    private static class KeyedSumMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<Long> {
        private KeyedSumMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            return (T)((Object)new KeyedSumMultipleInputOperator(parameters));
        }

        public Class<? extends StreamOperator<Long>> getStreamOperatorClass(ClassLoader classLoader) {
            return KeyedSumMultipleInputOperator.class;
        }
    }

    private static class KeyedSumMultipleInputOperator
    extends AbstractStreamOperatorV2<Long>
    implements MultipleInputStreamOperator<Long> {
        private ValueState<Long> sumState;

        public KeyedSumMultipleInputOperator(StreamOperatorParameters<Long> parameters) {
            super(parameters, 3);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sumState = context.getKeyedStateStore().getState(new ValueStateDescriptor("sum-state", (TypeSerializer)LongSerializer.INSTANCE));
        }

        public List<Input> getInputs() {
            return Arrays.asList(new Input[]{new KeyedSumInput(this, 1), new KeyedSumInput(this, 2), new KeyedSumInput(this, 3)});
        }

        private class KeyedSumInput
        extends AbstractInput<Long, Long> {
            public KeyedSumInput(AbstractStreamOperatorV2<Long> owner, int inputId) {
                super(owner, inputId);
            }

            public void processElement(StreamRecord<Long> element) throws Exception {
                if (KeyedSumMultipleInputOperator.this.sumState.value() == null) {
                    KeyedSumMultipleInputOperator.this.sumState.update((Object)0L);
                }
                KeyedSumMultipleInputOperator.this.sumState.update((Object)((Long)KeyedSumMultipleInputOperator.this.sumState.value() + (Long)element.getValue()));
                this.output.collect((Object)element.replace(KeyedSumMultipleInputOperator.this.sumState.value()));
            }
        }
    }
}

