/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class DataGenerators {
    public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env, KafkaTestEnvironment testServer, String topic, int numPartitions, final int numElements, final boolean randomizeOrder) throws Exception {
        env.setParallelism(numPartitions);
        env.setRestartStrategy(RestartStrategies.noRestart());
        DataStreamSource stream = env.addSource((SourceFunction)new RichParallelSourceFunction<Integer>(){
            private volatile boolean running = true;

            public void run(SourceFunction.SourceContext<Integer> ctx) {
                int[] elements = new int[numElements];
                int i = 0;
                int val = this.getRuntimeContext().getIndexOfThisSubtask();
                while (i < numElements) {
                    elements[i] = val;
                    ++i;
                    val += this.getRuntimeContext().getNumberOfParallelSubtasks();
                }
                if (randomizeOrder) {
                    Random rnd = new Random();
                    for (int i2 = 0; i2 < elements.length; ++i2) {
                        int otherPos = rnd.nextInt(elements.length);
                        int tmp = elements[i2];
                        elements[i2] = elements[otherPos];
                        elements[otherPos] = tmp;
                    }
                }
                int pos = 0;
                while (this.running && pos < elements.length) {
                    ctx.collect((Object)elements[pos++]);
                }
            }

            public void cancel() {
                this.running = false;
            }
        });
        Properties props = new Properties();
        props.putAll((Map<?, ?>)FlinkKafkaProducerBase.getPropertiesFromBrokerList((String)testServer.getBrokerConnectionString()));
        Properties secureProps = testServer.getSecureProperties();
        if (secureProps != null) {
            props.putAll((Map<?, ?>)testServer.getSecureProperties());
        }
        props.putAll((Map<?, ?>)testServer.getIdempotentProducerConfig());
        stream = stream.rebalance();
        testServer.produceIntoKafka(stream, topic, new TypeInformationSerializationSchema((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, env.getConfig()), props, new FlinkKafkaPartitioner<Integer>(){

            public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
                return next % partitions.length;
            }
        });
        env.execute("Scrambles int sequence generator");
    }

    public static class InfiniteStringsGenerator
    extends Thread {
        private final KafkaTestEnvironment server;
        private final String topic;
        private volatile Throwable error;
        private volatile boolean running = true;

        public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
            this.server = server;
            this.topic = topic;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            OneInputStreamOperatorTestHarness testHarness = null;
            try {
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList((String)this.server.getBrokerConnectionString());
                producerProperties.setProperty("retries", "3");
                StreamSink sink = this.server.getProducerSink(this.topic, new SimpleStringSchema(), producerProperties, new FlinkFixedPartitioner());
                testHarness = new OneInputStreamOperatorTestHarness(sink);
                testHarness.open();
                StringBuilder bld = new StringBuilder();
                Random rnd = new Random();
                while (this.running) {
                    bld.setLength(0);
                    int len = rnd.nextInt(100) + 1;
                    for (int i = 0; i < len; ++i) {
                        bld.append((char)(rnd.nextInt(20) + 97));
                    }
                    String next = bld.toString();
                    testHarness.processElement(new StreamRecord((Object)next));
                }
            }
            catch (Throwable t) {
                this.error = t;
            }
            finally {
                if (testHarness != null) {
                    try {
                        testHarness.close();
                    }
                    catch (Throwable producerProperties) {}
                }
            }
        }

        public void shutdown() {
            this.running = false;
            this.interrupt();
        }

        public Throwable getError() {
            return this.error;
        }
    }
}

