/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.statemachine;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorFunction;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;

/**
 * Job that produces {@link Event}s with a rate-limited {@link DataGeneratorSource} and writes
 * them to a Kafka topic through a {@link KafkaSink}.
 *
 * <p>Recognized parameters (parsed with {@link ParameterTool#fromArgs(String[])}):
 *
 * <ul>
 *   <li>{@code error-rate} - value handed to {@link EventsGeneratorFunction}; defaults to
 *       {@code 0.0}.
 *   <li>{@code sleep} - only used to compute the default of {@code rps}; defaults to {@code 1}.
 *   <li>{@code rps} - records per second for the source's rate limiter; defaults to the value
 *       derived from {@code sleep} and the environment's parallelism.
 *   <li>{@code kafka-topic} - target topic; required.
 *   <li>{@code brokers} - Kafka bootstrap servers; defaults to {@code localhost:9092}.
 * </ul>
 */
public class KafkaEventsGeneratorJob {

    /**
     * Parses the parameters, wires the generator source to the Kafka sink and runs the job.
     *
     * @param args command-line arguments, see the class documentation for the recognized keys
     * @throws IllegalArgumentException if the effective records-per-second value is not a
     *     positive number
     * @throws Exception if a required parameter is missing or job execution fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final double errorRate = params.getDouble("error-rate", 0.0);
        final int sleep = params.getInt("sleep", 1);
        final double recordsPerSecond =
                params.getDouble("rps", rpsFromSleep(sleep, env.getParallelism()));

        // Fail fast with a message naming the key instead of passing a null topic to the sink.
        final String kafkaTopic = params.getRequired("kafka-topic");
        final String brokers = params.get("brokers", "localhost:9092");

        // Reject zero, negative and NaN rates before the job is submitted; the negated
        // comparison is what catches NaN. An infinite rate (e.g. from sleep=0) is still allowed.
        if (!(recordsPerSecond > 0.0)) {
            throw new IllegalArgumentException(
                    "Records per second must be a positive number but was "
                            + recordsPerSecond
                            + "; check the 'rps' and 'sleep' parameters.");
        }

        System.out.printf("Generating events to Kafka with standalone source with error rate %f and %.1f records per second\n", errorRate, recordsPerSecond);
        System.out.println();

        final GeneratorFunction<Long, Event> generatorFunction =
                new EventsGeneratorFunction(errorRate);
        // Long.MAX_VALUE as the record count keeps the source emitting for the job's lifetime
        // in practice.
        final DataGeneratorSource<Event> eventGeneratorSource =
                new DataGeneratorSource<>(
                        generatorFunction,
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(recordsPerSecond),
                        TypeInformation.of(Event.class));

        final KafkaRecordSerializationSchema<Event> recordSerializer =
                KafkaRecordSerializationSchema.<Event>builder()
                        .setValueSerializationSchema(new EventDeSerializationSchema())
                        .setTopic(kafkaTopic)
                        .build();
        final KafkaSink<Event> kafkaSink =
                KafkaSink.<Event>builder()
                        .setBootstrapServers(brokers)
                        .setRecordSerializer(recordSerializer)
                        .build();

        env.fromSource(
                        eventGeneratorSource,
                        WatermarkStrategy.noWatermarks(),
                        "Events Generator Source")
                .sinkTo(kafkaSink);

        // Trigger program execution.
        env.execute("State machine example Kafka events generator job");
    }

    /**
     * Converts the {@code sleep} parameter into a total records-per-second rate.
     *
     * <p>Computes {@code 1000 / sleep * parallelism} in floating point, so {@code sleep == 0}
     * yields positive infinity rather than an {@link ArithmeticException}. NOTE(review):
     * {@code sleep} is presumably a per-record delay in milliseconds per parallel instance -
     * confirm against the callers' expectations.
     *
     * @param sleep value of the {@code sleep} parameter
     * @param parallelism parallelism reported by the execution environment
     * @return the derived records-per-second rate
     */
    private static double rpsFromSleep(int sleep, int parallelism) {
        return 1000.0 / (double) sleep * (double) parallelism;
    }
}

