/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class AsyncSinkWriterThrottlingTest {
    @Test
    public void testSinkThroughputShouldThrottleToHalfBatchSize() throws Exception {
        int maxBatchSize = 32;
        int maxInFlightRequest = 10;
        int numberOfBatchesToSend = 1000;
        Queue<String> testRequests = this.getTestRequestsBuffer();
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
        ThrottlingWriter writer = new ThrottlingWriter((ElementConverter<String, Long>)(ElementConverter & Serializable)(elem, ctx) -> Long.valueOf(elem), context, maxBatchSize, maxInFlightRequest);
        long currentTime = 0L;
        tpts.setCurrentTime(currentTime);
        for (int i = 0; i < numberOfBatchesToSend; ++i) {
            this.removeBatchAndSend(writer, testRequests, maxBatchSize);
            tpts.setCurrentTime(currentTime + 50L);
            currentTime += 50L;
        }
        Assertions.assertThat((int)writer.getInflightMessagesLimit()).isGreaterThanOrEqualTo(maxBatchSize / 4);
        Assertions.assertThat((int)writer.getInflightMessagesLimit()).isLessThanOrEqualTo(maxBatchSize / 2 + 10);
    }

    private Queue<String> getTestRequestsBuffer() {
        return LongStream.range(1L, 1000000L).mapToObj(Long::toString).collect(Collectors.toCollection(ArrayDeque::new));
    }

    private void removeBatchAndSend(ThrottlingWriter writer, Queue<String> buffer, int batchSize) throws IOException, InterruptedException {
        for (int i = 0; i < Math.min(batchSize, buffer.size()); ++i) {
            writer.write(buffer.remove());
        }
    }

    private static class ThrottlingWriter
    extends AsyncSinkWriter<String, Long> {
        private final ProcessingTimeService timeService;
        private final int maxBatchSize;
        private final Queue<Tuple2<Long, Integer>> requestsData;
        private long sizeOfLast100ms;
        private int inflightMessagesLimit;

        public ThrottlingWriter(ElementConverter<String, Long> elementConverter, WriterInitContext context, int maxBatchSize, int maxInFlightRequests) {
            this(elementConverter, (Sink.InitContext)new Sink.InitContextWrapper(context), maxBatchSize, maxInFlightRequests);
        }

        public ThrottlingWriter(ElementConverter<String, Long> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests) {
            super(elementConverter, context, maxBatchSize, maxInFlightRequests, 10000, 10000L, 100L, 1000L);
            this.maxBatchSize = maxBatchSize;
            this.timeService = context.getProcessingTimeService();
            this.requestsData = new ArrayDeque<Tuple2<Long, Integer>>();
            this.inflightMessagesLimit = maxBatchSize;
            this.sizeOfLast100ms = 0L;
        }

        public void write(String element) throws IOException, InterruptedException {
            super.write((Object)element, null);
        }

        public int getInflightMessagesLimit() {
            return this.inflightMessagesLimit;
        }

        protected void submitRequestEntries(List<Long> requestEntries, Consumer<List<Long>> requestResult) {
            long currentProcessingTime = this.timeService.getCurrentProcessingTime();
            this.inflightMessagesLimit = requestEntries.size();
            this.addRequestDataToQueue(requestEntries.size(), currentProcessingTime);
            if (this.sizeOfLast100ms > (long)this.maxBatchSize && requestEntries.size() > 1) {
                requestResult.accept(requestEntries);
            } else {
                requestResult.accept(new ArrayList());
            }
        }

        protected long getSizeInBytes(Long requestEntry) {
            return 8L;
        }

        private void addRequestDataToQueue(int size, long time) {
            this.requestsData.add((Tuple2<Long, Integer>)Tuple2.of((Object)time, (Object)size));
            this.sizeOfLast100ms += (long)size;
            while (!this.requestsData.isEmpty() && (Long)this.requestsData.peek().f0 < time - 100L) {
                this.sizeOfLast100ms -= (long)((Integer)this.requestsData.remove().f1).intValue();
            }
        }
    }
}

