/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;

/**
 * A {@link PollingRecordPublisher} that re-tunes the maximum number of records requested per
 * fetch after every call to {@link #run(RecordPublisher.RecordBatchConsumer)}.
 *
 * <p>After each run the publisher looks at how long the last loop iteration took and at the
 * average record size of the last consumed batch, and derives how many records can be requested
 * next time without exceeding {@link #KINESIS_SHARD_BYTES_PER_SECOND_LIMIT} bytes per second.
 */
@Internal
public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
    /** Read throughput budget used for the adaptation: 2 MiB per second (0x200000 bytes). */
    private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;

    /** Number of nanoseconds in one second, used to turn a loop duration into a frequency. */
    private static final double NANOS_PER_SECOND = 1.0E9;

    /** Lower bound applied to the adapted number of records per fetch. */
    private static final int MIN_RECORDS_PER_FETCH = 1;

    /**
     * Upper bound applied to the adapted number of records per fetch.
     * NOTE(review): presumably mirrors the Kinesis GetRecords per-call record limit - confirm.
     */
    private static final int MAX_RECORDS_PER_FETCH = 10000;

    /** De-aggregated record count of the most recent batch handed to the consumer. */
    private int lastRecordBatchSize = 0;

    /** Total size in bytes of the most recent batch handed to the consumer. */
    private long lastRecordBatchSizeInBytes = 0L;

    /**
     * Start of the current measurement window: set when the instance is created and then reset
     * to the end time of every run, so a window also covers the time spent between runs.
     */
    private long processingStartTimeNanos = System.nanoTime();

    /** Current (adapted) maximum number of records passed to the next {@code super.run}. */
    private int maxNumberOfRecordsPerFetch;

    private final PollingRecordPublisherMetricsReporter metricsReporter;

    /**
     * Creates the publisher; all arguments are forwarded unchanged to the parent constructor.
     *
     * @param startingPosition position forwarded to the parent publisher
     * @param subscribedShard shard forwarded to the parent publisher
     * @param metricsReporter reporter that also receives the loop frequency and bytes-per-read
     *     values computed by this class
     * @param kinesisProxy proxy forwarded to the parent publisher
     * @param maxNumberOfRecordsPerFetch initial maximum number of records per fetch; adapted
     *     after each run
     * @param fetchIntervalMillis fetch interval forwarded to the parent publisher
     * @throws InterruptedException if the parent constructor is interrupted
     */
    AdaptivePollingRecordPublisher(
            StartingPosition startingPosition,
            StreamShardHandle subscribedShard,
            PollingRecordPublisherMetricsReporter metricsReporter,
            KinesisProxyInterface kinesisProxy,
            int maxNumberOfRecordsPerFetch,
            long fetchIntervalMillis) throws InterruptedException {
        super(startingPosition, subscribedShard, metricsReporter, kinesisProxy, maxNumberOfRecordsPerFetch, fetchIntervalMillis);
        this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
        this.metricsReporter = metricsReporter;
    }

    /**
     * Runs the parent publisher once with the current adapted fetch size, then recomputes the
     * fetch size for the next run.
     *
     * @param consumer receives each batch; its returned sequence number is passed through
     * @return the result of the parent {@code run}
     * @throws InterruptedException if the parent {@code run} is interrupted
     */
    @Override
    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer consumer) throws InterruptedException {
        RecordPublisher.RecordPublisherRunResult result = super.run(batch -> {
            // Delegate first: if the consumer throws, the previous batch statistics are kept.
            SequenceNumber latestSequenceNumber = consumer.accept(batch);
            this.lastRecordBatchSize = batch.getDeaggregatedRecordSize();
            this.lastRecordBatchSizeInBytes = batch.getTotalSizeInBytes();
            return latestSequenceNumber;
        }, this.maxNumberOfRecordsPerFetch);

        long endTimeNanos = System.nanoTime();
        long runLoopTimeNanos = endTimeNanos - this.processingStartTimeNanos;
        this.maxNumberOfRecordsPerFetch = this.adaptRecordsToRead(
                runLoopTimeNanos,
                this.lastRecordBatchSize,
                this.lastRecordBatchSizeInBytes,
                this.maxNumberOfRecordsPerFetch);
        // The next measurement window starts where this one ended.
        this.processingStartTimeNanos = endTimeNanos;
        return result;
    }

    /**
     * Computes the number of records to request on the next fetch.
     *
     * <p>If no records were read or no time elapsed, the current value is returned unchanged
     * and no metrics are reported. Otherwise the byte budget for one loop iteration is divided
     * by the (integer-truncated) average record size and the result is clamped to
     * [{@link #MIN_RECORDS_PER_FETCH}, {@link #MAX_RECORDS_PER_FETCH}].
     *
     * @param runLoopTimeNanos duration of the last loop iteration in nanoseconds
     * @param numRecords number of records in the last batch
     * @param recordBatchSizeBytes total size of the last batch in bytes
     * @param maxNumberOfRecordsPerFetch current value, returned when no adaptation is possible
     * @return the adapted maximum number of records per fetch
     */
    private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes, int maxNumberOfRecordsPerFetch) {
        if (numRecords == 0 || runLoopTimeNanos == 0L) {
            // Nothing to base an estimate on; keep the current setting.
            return maxNumberOfRecordsPerFetch;
        }

        long averageRecordSizeBytes = recordBatchSizeBytes / (long) numRecords;
        double loopFrequencyHz = NANOS_PER_SECOND / (double) runLoopTimeNanos;
        // Bytes that may be read per loop iteration while staying within the per-second budget.
        double bytesPerRead = (double) KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;

        int adaptedRecordsPerFetch;
        if (averageRecordSizeBytes == 0L) {
            // Records smaller than one byte on average: the size imposes no limit, so use the
            // cap directly instead of relying on division-by-zero and int-cast saturation.
            adaptedRecordsPerFetch = MAX_RECORDS_PER_FETCH;
        } else {
            int unclampedRecords = (int) (bytesPerRead / (double) averageRecordSizeBytes);
            adaptedRecordsPerFetch =
                    Math.max(MIN_RECORDS_PER_FETCH, Math.min(unclampedRecords, MAX_RECORDS_PER_FETCH));
        }

        this.metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
        this.metricsReporter.setBytesPerRead(bytesPerRead);
        return adaptedRecordsPerFetch;
    }
}

