/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.ConsumeBenchSpec;
import org.apache.kafka.trogdor.workload.Histogram;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumeBenchWorker
implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
    private static final int THROTTLE_PERIOD_MS = 100;
    private final String id;
    private final ConsumeBenchSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ScheduledExecutorService executor;
    private WorkerStatusTracker workerStatus;
    private StatusUpdater statusUpdater;
    private Future<?> statusUpdaterFuture;
    private KafkaFutureImpl<String> doneFuture;
    private ThreadSafeConsumer consumer;

    public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("ConsumeBenchWorker is already running.");
        }
        log.info("{}: Activating ConsumeBenchWorker with {}", (Object)this.id, (Object)this.spec);
        this.statusUpdater = new StatusUpdater();
        this.executor = Executors.newScheduledThreadPool(this.spec.threadsPerWorker() + 2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
        this.statusUpdaterFuture = this.executor.scheduleAtFixedRate(this.statusUpdater, 1L, 1L, TimeUnit.MINUTES);
        this.workerStatus = status;
        this.doneFuture = doneFuture;
        this.executor.submit(new Prepare());
    }

    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ConsumeBenchWorker is not running.");
        }
        log.info("{}: Deactivating ConsumeBenchWorker.", (Object)this.id);
        this.doneFuture.complete((Object)"");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        this.consumer.close();
        this.consumer = null;
        this.executor = null;
        this.statusUpdater = null;
        this.statusUpdaterFuture = null;
        this.workerStatus = null;
        this.doneFuture = null;
    }

    private static class ThreadSafeConsumer {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String clientId;
        private final ReentrantLock consumerLock;
        private boolean closed = false;

        ThreadSafeConsumer(KafkaConsumer<byte[], byte[]> consumer, String clientId) {
            this.consumer = consumer;
            this.clientId = clientId;
            this.consumerLock = new ReentrantLock();
        }

        ConsumerRecords<byte[], byte[]> poll() {
            this.consumerLock.lock();
            try {
                ConsumerRecords consumerRecords = this.consumer.poll(Duration.ofMillis(50L));
                return consumerRecords;
            }
            finally {
                this.consumerLock.unlock();
            }
        }

        void close() {
            if (this.closed) {
                return;
            }
            this.consumerLock.lock();
            try {
                this.consumer.unsubscribe();
                Utils.closeQuietly(this.consumer, (String)"consumer");
                this.closed = true;
            }
            finally {
                this.consumerLock.unlock();
            }
        }

        void subscribe(Set<String> topics) {
            this.consumerLock.lock();
            try {
                this.consumer.subscribe(topics);
            }
            finally {
                this.consumerLock.unlock();
            }
        }

        void assign(Collection<TopicPartition> partitions) {
            this.consumerLock.lock();
            try {
                this.consumer.assign(partitions);
            }
            finally {
                this.consumerLock.unlock();
            }
        }

        List<String> assignedPartitions() {
            this.consumerLock.lock();
            try {
                List<String> list = this.consumer.assignment().stream().map(TopicPartition::toString).collect(Collectors.toList());
                return list;
            }
            finally {
                this.consumerLock.unlock();
            }
        }

        String clientId() {
            return this.clientId;
        }

        KafkaConsumer<byte[], byte[]> consumer() {
            return this.consumer;
        }
    }

    public static class StatusData {
        private final long totalMessagesReceived;
        private final List<String> assignedPartitions;
        private final long totalBytesReceived;
        private final long averageMessageSizeBytes;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;
        static final float[] PERCENTILES = new float[]{0.5f, 0.95f, 0.99f};

        @JsonCreator
        StatusData(@JsonProperty(value="assignedPartitions") List<String> assignedPartitions, @JsonProperty(value="totalMessagesReceived") long totalMessagesReceived, @JsonProperty(value="totalBytesReceived") long totalBytesReceived, @JsonProperty(value="averageMessageSizeBytes") long averageMessageSizeBytes, @JsonProperty(value="averageLatencyMs") float averageLatencyMs, @JsonProperty(value="p50LatencyMs") int p50latencyMs, @JsonProperty(value="p95LatencyMs") int p95latencyMs, @JsonProperty(value="p99LatencyMs") int p99latencyMs) {
            this.assignedPartitions = assignedPartitions;
            this.totalMessagesReceived = totalMessagesReceived;
            this.totalBytesReceived = totalBytesReceived;
            this.averageMessageSizeBytes = averageMessageSizeBytes;
            this.averageLatencyMs = averageLatencyMs;
            this.p50LatencyMs = p50latencyMs;
            this.p95LatencyMs = p95latencyMs;
            this.p99LatencyMs = p99latencyMs;
        }

        @JsonProperty
        public List<String> assignedPartitions() {
            return this.assignedPartitions;
        }

        @JsonProperty
        public long totalMessagesReceived() {
            return this.totalMessagesReceived;
        }

        @JsonProperty
        public long totalBytesReceived() {
            return this.totalBytesReceived;
        }

        @JsonProperty
        public long averageMessageSizeBytes() {
            return this.averageMessageSizeBytes;
        }

        @JsonProperty
        public float averageLatencyMs() {
            return this.averageLatencyMs;
        }

        @JsonProperty
        public int p50LatencyMs() {
            return this.p50LatencyMs;
        }

        @JsonProperty
        public int p95LatencyMs() {
            return this.p95LatencyMs;
        }

        @JsonProperty
        public int p99LatencyMs() {
            return this.p99LatencyMs;
        }
    }

    public class ConsumeStatusUpdater
    implements Runnable {
        private final Histogram latencyHistogram;
        private final Histogram messageSizeHistogram;
        private final ThreadSafeConsumer consumer;

        ConsumeStatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram, ThreadSafeConsumer consumer) {
            this.latencyHistogram = latencyHistogram;
            this.messageSizeHistogram = messageSizeHistogram;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                this.update();
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
            }
        }

        StatusData update() {
            Histogram.Summary latSummary = this.latencyHistogram.summarize(StatusData.PERCENTILES);
            Histogram.Summary msgSummary = this.messageSizeHistogram.summarize(StatusData.PERCENTILES);
            StatusData statusData = new StatusData(this.consumer.assignedPartitions(), latSummary.numSamples(), (long)((float)msgSummary.numSamples() * msgSummary.average()), (long)msgSummary.average(), latSummary.average(), latSummary.percentiles().get(0).value(), latSummary.percentiles().get(1).value(), latSummary.percentiles().get(2).value());
            ConsumeBenchWorker.this.statusUpdater.updateConsumeStatus(this.consumer.clientId(), statusData);
            log.info("Status={}", (Object)JsonUtil.toJsonString(statusData));
            return statusData;
        }
    }

    class StatusUpdater
    implements Runnable {
        final Map<String, JsonNode> statuses = new HashMap<String, JsonNode>();

        StatusUpdater() {
        }

        @Override
        public void run() {
            try {
                this.update();
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
            }
        }

        synchronized void update() {
            ConsumeBenchWorker.this.workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(this.statuses));
        }

        synchronized void updateConsumeStatus(String clientId, StatusData status) {
            this.statuses.put(clientId, JsonUtil.JSON_SERDE.valueToTree((Object)status));
        }
    }

    public class CloseStatusUpdater
    implements Runnable {
        private final List<Future<Void>> consumeTasks;

        CloseStatusUpdater(List<Future<Void>> consumeTasks) {
            this.consumeTasks = consumeTasks;
        }

        @Override
        public void run() {
            while (!this.consumeTasks.stream().allMatch(Future::isDone)) {
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException e) {
                    log.debug("{} was interrupted. Closing...", (Object)this.getClass().getName());
                    break;
                }
            }
            ConsumeBenchWorker.this.statusUpdaterFuture.cancel(false);
            ConsumeBenchWorker.this.statusUpdater.update();
        }
    }

    public class ConsumeMessages
    implements Callable<Void> {
        private final Histogram latencyHistogram = new Histogram(5000);
        private final Histogram messageSizeHistogram = new Histogram(0x200000);
        private final Future<?> statusUpdaterFuture;
        private final Throttle throttle;
        private final String clientId;
        private final ThreadSafeConsumer consumer;

        private ConsumeMessages(ThreadSafeConsumer consumer) {
            this.clientId = consumer.clientId();
            this.statusUpdaterFuture = ConsumeBenchWorker.this.executor.scheduleAtFixedRate(new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, consumer), 1L, 1L, TimeUnit.MINUTES);
            int perPeriod = ConsumeBenchWorker.this.spec.targetMessagesPerSec() <= 0 ? Integer.MAX_VALUE : WorkerUtils.perSecToPerPeriod(ConsumeBenchWorker.this.spec.targetMessagesPerSec(), 100L);
            this.throttle = new Throttle(perPeriod, 100);
            this.consumer = consumer;
        }

        ConsumeMessages(ThreadSafeConsumer consumer, Set<String> topics) {
            this(consumer);
            log.info("Will consume from topics {} via dynamic group assignment.", topics);
            this.consumer.subscribe(topics);
        }

        ConsumeMessages(ThreadSafeConsumer consumer, List<TopicPartition> partitions) {
            this(consumer);
            log.info("Will consume from topic partitions {} via manual assignment.", partitions);
            this.consumer.assign(partitions);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            long startTimeMs;
            long messagesConsumed = 0L;
            long bytesConsumed = 0L;
            long startBatchMs = startTimeMs = Time.SYSTEM.milliseconds();
            long maxMessages = ConsumeBenchWorker.this.spec.maxMessages();
            try {
                while (messagesConsumed < maxMessages) {
                    ConsumerRecords<byte[], byte[]> records = this.consumer.poll();
                    if (records.isEmpty()) continue;
                    long endBatchMs = Time.SYSTEM.milliseconds();
                    long elapsedBatchMs = endBatchMs - startBatchMs;
                    for (ConsumerRecord record : records) {
                        ++messagesConsumed;
                        long messageBytes = 0L;
                        if (record.key() != null) {
                            messageBytes += (long)record.serializedKeySize();
                        }
                        if (record.value() != null) {
                            messageBytes += (long)record.serializedValueSize();
                        }
                        this.latencyHistogram.add(elapsedBatchMs);
                        this.messageSizeHistogram.add(messageBytes);
                        bytesConsumed += messageBytes;
                        if (messagesConsumed >= maxMessages) break;
                        this.throttle.increment();
                    }
                    startBatchMs = Time.SYSTEM.milliseconds();
                }
                this.statusUpdaterFuture.cancel(false);
            }
            catch (Exception e) {
                try {
                    WorkerUtils.abort(log, "ConsumeRecords", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
                    this.statusUpdaterFuture.cancel(false);
                }
                catch (Throwable throwable) {
                    this.statusUpdaterFuture.cancel(false);
                    StatusData statusData = new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, this.consumer).update();
                    long curTimeMs = Time.SYSTEM.milliseconds();
                    log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{this.clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData});
                    throw throwable;
                }
                StatusData statusData = new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, this.consumer).update();
                long curTimeMs = Time.SYSTEM.milliseconds();
                log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{this.clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData});
            }
            StatusData statusData = new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, this.consumer).update();
            long curTimeMs = Time.SYSTEM.milliseconds();
            log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{this.clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData});
            ConsumeBenchWorker.this.doneFuture.complete((Object)"");
            this.consumer.close();
            return null;
        }
    }

    public class Prepare
    implements Runnable {
        @Override
        public void run() {
            try {
                ArrayList<Future<Void>> consumeTasks = new ArrayList<Future<Void>>();
                for (ConsumeMessages task : this.consumeTasks()) {
                    consumeTasks.add(ConsumeBenchWorker.this.executor.submit(task));
                }
                ConsumeBenchWorker.this.executor.submit(new CloseStatusUpdater(consumeTasks));
            }
            catch (Throwable e) {
                WorkerUtils.abort(log, "Prepare", e, (KafkaFutureImpl<String>)ConsumeBenchWorker.this.doneFuture);
            }
        }

        private List<ConsumeMessages> consumeTasks() {
            ArrayList<ConsumeMessages> tasks = new ArrayList<ConsumeMessages>();
            String consumerGroup = this.consumerGroup();
            int consumerCount = ConsumeBenchWorker.this.spec.threadsPerWorker();
            Map<String, List<TopicPartition>> partitionsByTopic = ConsumeBenchWorker.this.spec.materializeTopics();
            boolean toUseGroupPartitionAssignment = partitionsByTopic.values().stream().allMatch(List::isEmpty);
            if (!toUseGroupPartitionAssignment && !this.toUseRandomConsumeGroup() && consumerCount > 1) {
                throw new ConfigException("You may not specify an explicit partition assignment when using multiple consumers in the same group.Please leave the consumer group unset, specify topics instead of partitions or use a single consumer.");
            }
            ConsumeBenchWorker.this.consumer = this.consumer(consumerGroup, this.clientId(0));
            if (toUseGroupPartitionAssignment) {
                Set<String> topics = partitionsByTopic.keySet();
                tasks.add(new ConsumeMessages(ConsumeBenchWorker.this.consumer, topics));
                for (int i = 0; i < consumerCount - 1; ++i) {
                    tasks.add(new ConsumeMessages(this.consumer(this.consumerGroup(), this.clientId(i + 1)), topics));
                }
            } else {
                List<TopicPartition> partitions = this.populatePartitionsByTopic(ConsumeBenchWorker.this.consumer.consumer(), partitionsByTopic).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
                tasks.add(new ConsumeMessages(ConsumeBenchWorker.this.consumer, partitions));
                for (int i = 0; i < consumerCount - 1; ++i) {
                    tasks.add(new ConsumeMessages(this.consumer(this.consumerGroup(), this.clientId(i + 1)), partitions));
                }
            }
            return tasks;
        }

        private String clientId(int idx) {
            return String.format("consumer.%s-%d", ConsumeBenchWorker.this.id, idx);
        }

        private ThreadSafeConsumer consumer(String consumerGroup, String clientId) {
            Properties props = new Properties();
            props.put("bootstrap.servers", ConsumeBenchWorker.this.spec.bootstrapServers());
            props.put("client.id", clientId);
            props.put("group.id", consumerGroup);
            props.put("auto.offset.reset", "earliest");
            props.put("max.poll.interval.ms", (Object)100000);
            WorkerUtils.addConfigsToProperties(props, ConsumeBenchWorker.this.spec.commonClientConf(), ConsumeBenchWorker.this.spec.consumerConf());
            return new ThreadSafeConsumer((KafkaConsumer<byte[], byte[]>)new KafkaConsumer(props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()), clientId);
        }

        private String consumerGroup() {
            return this.toUseRandomConsumeGroup() ? "consume-bench-" + UUID.randomUUID().toString() : ConsumeBenchWorker.this.spec.consumerGroup();
        }

        private boolean toUseRandomConsumeGroup() {
            return ConsumeBenchWorker.this.spec.consumerGroup().isEmpty();
        }

        private Map<String, List<TopicPartition>> populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer, Map<String, List<TopicPartition>> materializedTopics) {
            for (Map.Entry<String, List<TopicPartition>> entry : materializedTopics.entrySet()) {
                String topicName = entry.getKey();
                List<TopicPartition> partitions = entry.getValue();
                if (partitions.isEmpty()) {
                    List fetchedPartitions = consumer.partitionsFor(topicName).stream().map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())).collect(Collectors.toList());
                    partitions.addAll(fetchedPartitions);
                }
                materializedTopics.put(topicName, partitions);
            }
            return materializedTopics;
        }
    }
}

