/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AsyncSinkWriterThrottlingTest {
    AsyncSinkWriterThrottlingTest() {
    }

    @Test
    void testSinkThroughputShouldThrottleToHalfBatchSize() throws Exception {
        int maxBatchSize = 32;
        int maxInFlightRequest = 10;
        int numberOfBatchesToSend = 1000;
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        ThrottlingWriter writer = new ThrottlingWriter((ElementConverter<String, Long>)(ElementConverter & Serializable)(elem, ctx) -> Long.valueOf(elem), context, maxBatchSize, maxInFlightRequest);
        for (int i = 0; i < numberOfBatchesToSend * maxBatchSize; ++i) {
            writer.write(String.valueOf(i));
        }
        Assertions.assertThat((int)writer.getInflightMessagesLimit()).isGreaterThanOrEqualTo(maxBatchSize / 4);
        Assertions.assertThat((int)writer.getInflightMessagesLimit()).isLessThanOrEqualTo(maxBatchSize / 2 + 10);
    }

    private static class ThrottlingWriter
    extends AsyncSinkWriter<String, Long> {
        private final ProcessingTimeService timeService;
        private final int maxBatchSize;
        private final Queue<Tuple2<Long, Integer>> requestsData;
        private long sizeOfLast100ms;
        private int inflightMessagesLimit;

        public ThrottlingWriter(ElementConverter<String, Long> elementConverter, WriterInitContext context, int maxBatchSize, int maxInFlightRequests) {
            super(elementConverter, context, AsyncSinkWriterConfiguration.builder().setMaxBatchSize(maxBatchSize).setMaxBatchSizeInBytes(10000L).setMaxInFlightRequests(maxInFlightRequests).setMaxBufferedRequests(10000).setMaxTimeInBufferMS(100L).setMaxRecordSizeInBytes(1000L).build(), Collections.emptyList());
            this.maxBatchSize = maxBatchSize;
            this.timeService = context.getProcessingTimeService();
            this.requestsData = new ArrayDeque<Tuple2<Long, Integer>>();
            this.inflightMessagesLimit = maxBatchSize;
            this.sizeOfLast100ms = 0L;
        }

        public void write(String element) throws IOException, InterruptedException {
            super.write((Object)element, null);
        }

        public int getInflightMessagesLimit() {
            return this.inflightMessagesLimit;
        }

        protected void submitRequestEntries(List<Long> requestEntries, ResultHandler<Long> resultHandler) {
            long currentProcessingTime = this.timeService.getCurrentProcessingTime();
            this.inflightMessagesLimit = requestEntries.size();
            this.addRequestDataToQueue(requestEntries.size(), currentProcessingTime);
            if (this.sizeOfLast100ms > (long)this.maxBatchSize && requestEntries.size() > 1) {
                resultHandler.retryForEntries(requestEntries);
            } else {
                resultHandler.complete();
            }
        }

        protected long getSizeInBytes(Long requestEntry) {
            return 8L;
        }

        private void addRequestDataToQueue(int size, long time) {
            this.requestsData.add((Tuple2<Long, Integer>)Tuple2.of((Object)time, (Object)size));
            this.sizeOfLast100ms += (long)size;
            while (!this.requestsData.isEmpty() && (Long)this.requestsData.peek().f0 < time - 100L) {
                this.sizeOfLast100ms -= (long)((Integer)this.requestsData.remove().f1).intValue();
            }
        }
    }
}

