/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.shuffle;

import java.util.Properties;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleConsumer;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer;
import org.apache.flink.streaming.connectors.kafka.shuffle.StreamKafkaShuffleSink;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;

@Deprecated
@Experimental
public class FlinkKafkaShuffle {
    static final String PRODUCER_PARALLELISM = "producer parallelism";
    static final String PARTITION_NUMBER = "partition number";

    public static <T, K> KeyedStream<T, K> persistentKeyBy(DataStream<T> dataStream, String topic, int producerParallelism, int numberOfPartitions, Properties properties, KeySelector<T, K> keySelector) {
        Properties kafkaProperties = PropertiesUtil.flatten((Properties)properties);
        kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism));
        kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions));
        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        FlinkKafkaShuffle.writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
        return FlinkKafkaShuffle.readKeyBy(topic, env, dataStream.getType(), kafkaProperties, keySelector);
    }

    public static <T> KeyedStream<T, Tuple> persistentKeyBy(DataStream<T> dataStream, String topic, int producerParallelism, int numberOfPartitions, Properties properties, int ... fields) {
        return FlinkKafkaShuffle.persistentKeyBy(dataStream, topic, producerParallelism, numberOfPartitions, properties, FlinkKafkaShuffle.keySelector(dataStream, fields));
    }

    public static <T, K> void writeKeyBy(DataStream<T> dataStream, String topic, Properties kafkaProperties, KeySelector<T, K> keySelector) {
        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        TypeSerializer typeSerializer = dataStream.getType().createSerializer(env.getConfig());
        FlinkKafkaShuffleProducer kafkaProducer = new FlinkKafkaShuffleProducer(topic, typeSerializer, kafkaProperties, (KeySelector)env.clean(keySelector), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5);
        Preconditions.checkArgument((kafkaProperties.getProperty(PRODUCER_PARALLELISM) != null ? 1 : 0) != 0, (Object)"Missing producer parallelism for Kafka Shuffle");
        int producerParallelism = PropertiesUtil.getInt((Properties)kafkaProperties, (String)PRODUCER_PARALLELISM, (int)Integer.MIN_VALUE);
        FlinkKafkaShuffle.addKafkaShuffle(dataStream, kafkaProducer, producerParallelism);
    }

    public static <T> void writeKeyBy(DataStream<T> dataStream, String topic, Properties kafkaProperties, int ... fields) {
        FlinkKafkaShuffle.writeKeyBy(dataStream, topic, kafkaProperties, FlinkKafkaShuffle.keySelector(dataStream, fields));
    }

    public static <T, K> KeyedStream<T, K> readKeyBy(String topic, StreamExecutionEnvironment env, TypeInformation<T> typeInformation, Properties kafkaProperties, KeySelector<T, K> keySelector) {
        TypeSerializer typeSerializer = typeInformation.createSerializer(env.getConfig());
        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(typeInformation, typeSerializer);
        FlinkKafkaShuffleConsumer kafkaConsumer = new FlinkKafkaShuffleConsumer(topic, schema, typeSerializer, kafkaProperties);
        Preconditions.checkArgument((kafkaProperties.getProperty(PARTITION_NUMBER) != null ? 1 : 0) != 0, (Object)"Missing partition number for Kafka Shuffle");
        int numberOfPartitions = PropertiesUtil.getInt((Properties)kafkaProperties, (String)PARTITION_NUMBER, (int)Integer.MIN_VALUE);
        SingleOutputStreamOperator outputDataStream = env.addSource(kafkaConsumer).setParallelism(numberOfPartitions).setMaxParallelism(numberOfPartitions);
        return DataStreamUtils.reinterpretAsKeyedStream((DataStream)outputDataStream, keySelector);
    }

    private static <T, K> void addKafkaShuffle(DataStream<T> inputStream, FlinkKafkaShuffleProducer<T, K> kafkaShuffleProducer, int producerParallelism) {
        inputStream.getTransformation().getOutputType();
        StreamKafkaShuffleSink shuffleSinkOperator = new StreamKafkaShuffleSink(kafkaShuffleProducer);
        LegacySinkTransformation transformation = new LegacySinkTransformation(inputStream.getTransformation(), "kafka_shuffle", shuffleSinkOperator, inputStream.getExecutionEnvironment().getParallelism(), false);
        inputStream.getExecutionEnvironment().addOperator((Transformation)transformation);
        transformation.setParallelism(producerParallelism);
    }

    private static <T> KeySelector<T, Tuple> keySelector(DataStream<T> source, int ... fields) {
        KeySelectorUtil.ArrayKeySelector keySelector;
        if (source.getType() instanceof BasicArrayTypeInfo || source.getType() instanceof PrimitiveArrayTypeInfo) {
            keySelector = KeySelectorUtil.getSelectorForArray((int[])fields, (TypeInformation)source.getType());
        } else {
            Keys.ExpressionKeys keys = new Keys.ExpressionKeys(fields, source.getType());
            keySelector = KeySelectorUtil.getSelectorForKeys((Keys)keys, (TypeInformation)source.getType(), (ExecutionConfig)source.getExecutionEnvironment().getConfig());
        }
        return keySelector;
    }
}

