/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SenderTest {
    // Constants describing the producer/sender configuration used by the tests.
    private static final int MAX_REQUEST_SIZE = 1024 * 1024;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final String CLIENT_ID = "clientId";
    private static final String METRIC_GROUP = "producer-metrics";
    // Presumably the tolerance for floating-point metric comparisons.
    private static final double EPS = 0.0001;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;

    // Two partitions of the single topic "test".
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);

    // The mock client is driven by the mock clock, so `time` must be initialised first.
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(time);

    private int batchSize = 16 * 1024;
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
    private ApiVersions apiVersions = new ApiVersions();
    private Cluster cluster = TestUtils.singletonCluster("test", 2);

    // Left unset here; tearDown() closes `metrics`, and the tests read `accumulator` and `sender`.
    private Metrics metrics;
    private RecordAccumulator accumulator;
    private Sender sender;

    /**
     * Per-test fixture: points the mock client at the first node of the test cluster and then
     * delegates to {@code setupWithTransactionState(null)}, i.e. without a transaction manager.
     */
    @Before
    public void setup() {
        Node firstNode = cluster.nodes().get(0);
        client.setNode(firstNode);
        setupWithTransactionState(null);
    }

    /**
     * Closes the metrics registry held in {@code metrics}.
     *
     * <p>The call is guarded against {@code null}: JUnit still invokes {@code @After} methods when
     * a {@code @Before} method throws, in which case the field may never have been assigned and
     * an unguarded call would add a {@code NullPointerException} on top of the real failure.
     */
    @After
    public void tearDown() {
        if (metrics != null) {
            metrics.close();
        }
    }

    /**
     * Appends one record for {@code tp0} and walks it through the sender: after two sender passes
     * exactly one produce request must be in flight; once the mock client answers it with
     * {@code Errors.NONE} nothing is in flight any more and the record's future completes with
     * the offset carried by the response.
     */
    @Test
    public void testSimple() throws Exception {
        final long expectedOffset = 0L;
        FutureRecordMetadata sendFuture =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Run the sender twice; afterwards a single produce request is expected in flight.
        for (int pass = 0; pass < 2; pass++) {
            sender.run(time.milliseconds());
        }
        Assert.assertEquals("We should have a single produce request in flight.", 1L, client.inFlightRequestCount());
        Assert.assertTrue(client.hasInFlightRequests());

        // Answer the request successfully and let the sender process the response.
        ProduceResponse success = produceResponse(tp0, expectedOffset, Errors.NONE, 0);
        client.respond(success);
        sender.run(time.milliseconds());
        Assert.assertEquals("All requests completed.", 0L, client.inFlightRequestCount());
        Assert.assertFalse(client.hasInFlightRequests());

        sender.run(time.milliseconds());
        Assert.assertTrue("Request should be completed", sendFuture.isDone());
        RecordMetadata result = sendFuture.get();
        Assert.assertEquals(expectedOffset, result.offset());
    }

    /**
     * Appends a record while node "0" is registered with {@code NodeApiVersions.create()}, then
     * re-registers the node with a single PRODUCE entry ({@code 0, 2}) before sending. The
     * prepared response is only matched by a version-2 produce request whose records for
     * {@code tp0} are non-empty and have magic value 1; the future must then complete with the
     * response offset.
     */
    @Test
    public void testMessageFormatDownConversion() throws Exception {
        final long expectedOffset = 0L;

        apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata sendFuture =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Change the node's advertised produce versions after the append but before the send.
        apiVersions.update("0", NodeApiVersions.create(Collections.singleton(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, 0, 2))));

        MockClient.RequestMatcher v2RequestWithMagic1 = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest produce = (ProduceRequest) body;
                if (produce.version() == 2) {
                    MemoryRecords sent = (MemoryRecords) produce.partitionRecordsOrFail().get(tp0);
                    return sent != null && sent.sizeInBytes() > 0 && sent.hasMatchingMagic((byte) 1);
                }
                return false;
            }
        };
        client.prepareResponse(v2RequestWithMagic1, produceResponse(tp0, expectedOffset, Errors.NONE, 0));

        for (int pass = 0; pass < 2; pass++) {
            sender.run(time.milliseconds());
        }

        Assert.assertTrue("Request should be completed", sendFuture.isDone());
        RecordMetadata result = sendFuture.get();
        Assert.assertEquals(expectedOffset, result.offset());
    }

    /**
     * Appends one record to {@code tp0} and one to {@code tp1} while the API versions registered
     * for node "0" differ between the two appends, then resets them again before sending. The
     * prepared response is only matched by a single version-2 produce request that carries
     * non-empty records with magic value 1 for exactly two partitions; both futures must
     * complete.
     *
     * <p>The record map inside the matcher is parameterised (it was a raw {@code Map}) so that
     * iterating its values as {@code MemoryRecords} is type-correct.
     */
    @Test
    public void testDownConversionForMismatchedMagicValues() throws Exception {
        final long offset = 0L;

        // First append happens under NodeApiVersions.create().
        apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata future1 =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Second append happens after the node was re-registered with a single PRODUCE entry (0, 2).
        apiVersions.update("0", NodeApiVersions.create(Collections.singleton(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, 0, 2))));
        FutureRecordMetadata future2 =
                accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Restore the initial registration before anything is sent.
        apiVersions.update("0", NodeApiVersions.create());

        // The same successful partition response is returned for both partitions.
        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L);
        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
        partResp.put(tp0, resp);
        partResp.put(tp1, resp);
        ProduceResponse produceResponse = new ProduceResponse(partResp, 0);

        client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest request = (ProduceRequest) body;
                if (request.version() != 2) {
                    return false;
                }
                Map<TopicPartition, MemoryRecords> recordsByPartition = request.partitionRecordsOrFail();
                if (recordsByPartition.size() != 2) {
                    return false;
                }
                // Every partition must carry non-empty records with magic value 1.
                for (MemoryRecords records : recordsByPartition.values()) {
                    if (records == null || records.sizeInBytes() == 0 || !records.hasMatchingMagic((byte) 1)) {
                        return false;
                    }
                }
                return true;
            }
        }, produceResponse);

        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        Assert.assertTrue("Request should be completed", future1.isDone());
        Assert.assertTrue("Request should be completed", future2.isDone());
    }

    /**
     * Drives a real {@code NetworkClient} over a {@code MockSelector}, feeds it three produce
     * responses with throttle times 100, 200 and 300 and checks the
     * {@code produce-throttle-time-avg} / {@code produce-throttle-time-max} metrics recorded
     * through the sender's throttle-time sensor.
     *
     * <p>The network client is closed in a {@code finally} block so that a failing assertion does
     * not leave it open, and the locals no longer shadow the {@code cluster} and {@code client}
     * fields.
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        MockSelector selector = new MockSelector(time);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
        Cluster singleNodeCluster = TestUtils.singletonCluster("test", 1);
        Node node = singleNodeCluster.nodes().get(0);
        NetworkClient networkClient = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, time, true, new ApiVersions(), throttleTimeSensor);
        try {
            // Queue an ApiVersions response for the node and poll until the client reports it ready.
            short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
            ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, (byte) 2).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
            while (!networkClient.ready(node, time.milliseconds())) {
                networkClient.poll(1L, time.milliseconds());
            }
            selector.clear();

            // Send three produce requests and answer each with an increasing throttle time.
            for (int i = 1; i <= 3; ++i) {
                int throttleTimeMs = 100 * i;
                ProduceRequest.Builder builder = new ProduceRequest.Builder(2, 1, 1000, Collections.emptyMap());
                ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
                networkClient.send(request, time.milliseconds());
                networkClient.poll(1L, time.milliseconds());
                ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
                buffer = response.serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(request.correlationId()));
                selector.completeReceive(new NetworkReceive(node.idString(), buffer));
                networkClient.poll(1L, time.milliseconds());
                selector.clear();
            }

            // NOTE(review): the expected avg 250 / max 400 presumably include the value 400 passed to
            // createApiVersionsResponse above in addition to 100, 200 and 300 -- confirm.
            KafkaMetric avgMetric = metrics.metrics().get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
            KafkaMetric maxMetric = metrics.metrics().get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
            Assert.assertEquals(250.0, avgMetric.value(), EPS);
            Assert.assertEquals(400.0, maxMetric.value(), EPS);
        } finally {
            networkClient.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises retry handling with a dedicated sender configured for one retry.
     *
     * <p>Part one: a record is sent, its destination is disconnected (dropping the in-flight
     * request), and after further sender passes a request is in flight again; answering it
     * successfully completes the future with the response offset.
     *
     * <p>Part two: a second record is sent and its destination is disconnected
     * {@code maxRetries + 1} times; the future must then fail with
     * {@code Errors.NETWORK_EXCEPTION} (checked through {@code completedWithError}).
     *
     * <p>The sender is built from the class constants rather than repeated literals;
     * {@code ACKS_ALL} is declared {@code short}, which presumably matches the constructor's acks
     * parameter.
     */
    @Test
    public void testRetries() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics()) {
            // Local sender (distinct from the fixture's) with guaranteeMessageOrder-style flag false.
            Sender retryingSender = new Sender(client, metadata, accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50L, null, apiVersions);

            // --- first record: one disconnect, then a successful retry ---
            FutureRecordMetadata future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
            retryingSender.run(time.milliseconds());
            retryingSender.run(time.milliseconds());
            String id = client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
            Assert.assertTrue("Client ready status should be true", client.isReady(node, 0L));

            client.disconnect(id);
            Assert.assertEquals(0L, client.inFlightRequestCount());
            Assert.assertFalse(client.hasInFlightRequests());
            Assert.assertFalse("Client ready status should be false", client.isReady(node, 0L));

            // Three more passes, after which the request is expected to be in flight again.
            retryingSender.run(time.milliseconds());
            retryingSender.run(time.milliseconds());
            retryingSender.run(time.milliseconds());
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());

            long offset = 0L;
            client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
            retryingSender.run(time.milliseconds());
            Assert.assertTrue("Request should have retried and completed", future.isDone());
            Assert.assertEquals(offset, future.get().offset());

            // --- second record: disconnect more often than retries are allowed ---
            future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
            retryingSender.run(time.milliseconds());
            for (int i = 0; i < maxRetries + 1; ++i) {
                client.disconnect(client.requests().peek().destination());
                retryingSender.run(time.milliseconds());
                retryingSender.run(time.milliseconds());
                retryingSender.run(time.milliseconds());
            }
            retryingSender.run(time.milliseconds());
            completedWithError(future, Errors.NETWORK_EXCEPTION);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Uses a dedicated sender whose fourth constructor argument is {@code true} (presumably
     * "guarantee message order") and checks that, while the first produce request for a
     * partition is still in flight, a second record appended to the same partition does not
     * produce a second in-flight request -- even after the clock advanced and the cluster
     * metadata was replaced.
     *
     * <p>The sender is built from the class constants rather than repeated literals;
     * {@code ACKS_ALL} is declared {@code short}, which presumably matches the constructor's acks
     * parameter.
     */
    @Test
    public void testSendInOrder() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics()) {
            Sender orderedSender = new Sender(client, metadata, accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50L, null, apiVersions);

            // Start from the cluster built by TestUtils.clusterWith(2, "test", 2).
            Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
            metadata.update(cluster1, Collections.emptySet(), time.milliseconds());

            // First record for partition test-1: expect exactly one PRODUCE request in flight.
            TopicPartition tp2 = new TopicPartition("test", 1);
            accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, 1000L);
            orderedSender.run(time.milliseconds());
            orderedSender.run(time.milliseconds());
            String id = client.requests().peek().destination();
            Assert.assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
            Assert.assertTrue("Client ready status should be true", client.isReady(node, 0L));

            // Advance the clock, append a second record and swap in a different cluster view.
            time.sleep(900L);
            accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, 1000L);
            Cluster cluster2 = TestUtils.singletonCluster("test", 2);
            metadata.update(cluster2, Collections.emptySet(), time.milliseconds());

            // Still only the original request may be in flight.
            orderedSender.run(time.milliseconds());
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
        }
    }

    /**
     * Verifies that a completion callback may itself append to the accumulator when it is invoked
     * with a {@code TimeoutException}.
     *
     * <p>{@code messagesPerBatch} records are appended to {@code tp1} with a callback that, on a
     * {@code TimeoutException}, counts the invocation and appends a replacement record to
     * {@code tp1}; any other exception is remembered as unexpected. After the clock is advanced
     * and the node is disconnected and blacked out, one sender pass must have triggered the
     * callback once per record, and the accumulator must hold a single batch for {@code tp1}
     * containing {@code messagesPerBatch} records.
     */
    @Test
    public void testAppendInExpiryCallback() throws InterruptedException {
        int messagesPerBatch = 10;
        final AtomicInteger expiryCallbackCount = new AtomicInteger(0);
        final AtomicReference<Exception> unexpectedException = new AtomicReference<Exception>();
        final byte[] key = "key".getBytes();
        final byte[] value = "value".getBytes();
        final long maxBlockTimeMs = 1000L;

        Callback callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof TimeoutException) {
                    expiryCallbackCount.incrementAndGet();
                    try {
                        // Re-append from inside the callback; this is the behaviour under test.
                        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Unexpected interruption", e);
                    }
                } else if (exception != null) {
                    // Only the first unexpected exception is kept.
                    unexpectedException.compareAndSet(null, exception);
                }
            }
        };

        for (int i = 0; i < messagesPerBatch; ++i) {
            accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs);
        }

        // Advance the clock and make the node unreachable before running the sender once.
        time.sleep(10000L);
        Node clusterNode = cluster.nodes().get(0);
        client.disconnect(clusterNode.idString());
        client.blackout(clusterNode, 100L);
        sender.run(time.milliseconds());

        Assert.assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
        Assert.assertNull("Unexpected exception", unexpectedException.get());

        // The records appended by the callbacks must have ended up in one batch for tp1.
        Assert.assertTrue(accumulator.batches().containsKey(tp1));
        Deque<ProducerBatch> tp1Batches = accumulator.batches().get(tp1);
        Assert.assertEquals(1L, tp1Batches.size());
        Assert.assertEquals(messagesPerBatch, tp1Batches.peekFirst().recordCount);
    }

    /**
     * Checks that the topic of an appended record is tracked by the metadata, stays tracked after
     * the record was sent, is no longer contained after the clock advanced by 300000 ms and the
     * metadata was updated again, and is re-added by the next append and sender pass.
     */
    @Test
    public void testMetadataTopicExpiry() throws Exception {
        long offset = 0L;
        final String topic = tp0.topic();

        // Round 1: start from an empty cluster; the sender pass must register the topic.
        metadata.update(Cluster.empty(), Collections.emptySet(), time.milliseconds());
        FutureRecordMetadata sendFuture =
                accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.run(time.milliseconds());
        Assert.assertTrue("Topic not added to metadata", metadata.containsTopic(topic));

        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
        sender.run(time.milliseconds());
        client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
        sender.run(time.milliseconds());
        Assert.assertEquals("Request completed.", 0L, client.inFlightRequestCount());
        Assert.assertFalse(client.hasInFlightRequests());
        sender.run(time.milliseconds());
        Assert.assertTrue("Request should be completed", sendFuture.isDone());
        Assert.assertTrue("Topic not retained in metadata list", metadata.containsTopic(topic));

        // Let time pass and refresh the metadata: the topic must be gone afterwards.
        time.sleep(300000L);
        metadata.update(Cluster.empty(), Collections.emptySet(), time.milliseconds());
        Assert.assertFalse("Unused topic has not been expired", metadata.containsTopic(topic));

        // Round 2: a new append brings the topic back and the send completes as before.
        sendFuture =
                accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.run(time.milliseconds());
        Assert.assertTrue("Topic not added to metadata", metadata.containsTopic(topic));

        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
        sender.run(time.milliseconds());
        client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
        sender.run(time.milliseconds());
        Assert.assertEquals("Request completed.", 0L, client.inFlightRequestCount());
        Assert.assertFalse(client.hasInFlightRequests());
        sender.run(time.milliseconds());
        Assert.assertTrue("Request should be completed", sendFuture.isDone());
    }

    /**
     * Re-initialises the fixture with a fresh {@code TransactionManager}, runs the
     * {@code prepareAndReceiveInitProducerId} helper with {@code Errors.NONE} and checks that the
     * manager then reports the expected producer id and epoch 0.
     */
    @Test
    public void testInitProducerIdRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        client.setNode(new Node(1, "localhost", 33343));

        prepareAndReceiveInitProducerId(producerId, Errors.NONE);

        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(producerId, txnManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(0L, txnManager.producerIdAndEpoch().epoch);
    }

    /**
     * Runs the {@code prepareAndReceiveInitProducerId} helper with
     * {@code Errors.CLUSTER_AUTHORIZATION_FAILED} and checks that the transaction manager has no
     * producer id, reports a {@code ClusterAuthorizationException} as its last error, and that
     * {@code assertSendFailure} passes for that exception type.
     */
    @Test
    public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        client.setNode(new Node(1, "localhost", 33343));

        prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);

        Assert.assertFalse(txnManager.hasProducerId());
        Assert.assertTrue(txnManager.hasError());
        Assert.assertTrue(txnManager.lastError() instanceof ClusterAuthorizationException);
        assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * Obtains a producer id successfully, then answers the (idempotent) produce request with
     * {@code Errors.CLUSTER_AUTHORIZATION_FAILED}. The record's future must be done and fail with
     * a {@code ClusterAuthorizationException} as cause, and {@code assertSendFailure} must pass
     * for that exception type.
     */
    @Test
    public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata sendFuture =
                accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // The failure response is only handed out for an idempotent produce request.
        MockClient.RequestMatcher idempotentProduce = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                if (!(body instanceof ProduceRequest)) {
                    return false;
                }
                return ((ProduceRequest) body).isIdempotent();
            }
        };
        client.prepareResponse(idempotentProduce, produceResponse(tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));

        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        try {
            sendFuture.get();
            Assert.fail("Future should have raised ClusterAuthorizationException");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof ClusterAuthorizationException);
        }

        assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * With a transaction manager that already has a producer id, sends one record and checks that
     * the produce request carries a single batch with base sequence 0, the configured producer id
     * and epoch 0, and that the sequence number tracked for {@code tp0} is 1 afterwards.
     *
     * <p>The locally created {@code Metrics} is closed via try-with-resources, the final
     * {@code assertEquals} passes the expected value first, and the sender is built from the
     * class constants rather than repeated literals.
     */
    @Test
    public void testSequenceNumberIncrement() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, 0));
        setupWithTransactionState(transactionManager);
        client.setNode(new Node(1, "localhost", 33343));

        int maxRetries = 10;
        try (Metrics m = new Metrics()) {
            Sender idempotentSender = new Sender(client, metadata, accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50L, transactionManager, apiVersions);
            FutureRecordMetadata responseFuture =
                    accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

            client.prepareResponse(new MockClient.RequestMatcher() {
                @Override
                public boolean matches(AbstractRequest body) {
                    if (!(body instanceof ProduceRequest)) {
                        return false;
                    }
                    ProduceRequest request = (ProduceRequest) body;
                    MemoryRecords records = (MemoryRecords) request.partitionRecordsOrFail().get(tp0);

                    // Exactly one batch is expected for tp0.
                    Iterator<? extends RecordBatch> batchIterator = records.batches().iterator();
                    Assert.assertTrue(batchIterator.hasNext());
                    RecordBatch batch = batchIterator.next();
                    Assert.assertFalse(batchIterator.hasNext());

                    Assert.assertEquals(0L, batch.baseSequence());
                    Assert.assertEquals(producerId, batch.producerId());
                    Assert.assertEquals(0L, batch.producerEpoch());
                    return true;
                }
            }, produceResponse(tp0, 0L, Errors.NONE, 0));

            idempotentSender.run(time.milliseconds());
            idempotentSender.run(time.milliseconds());
            idempotentSender.run(time.milliseconds());

            Assert.assertTrue(responseFuture.isDone());
            Assert.assertEquals(1L, transactionManager.sequenceNumber(tp0).intValue());
        }
    }

    /**
     * Sends a record, disconnects the destination while the request is in flight and then changes
     * the producer id on the transaction manager. After further sender passes no request may be
     * in flight, the {@code record-error-rate} metric must be positive, the future must be done
     * and the sequence number tracked for {@code tp0} must be 0.
     *
     * <p>The locally created {@code Metrics} is closed via try-with-resources, the final
     * {@code assertEquals} passes the expected value first, and the sender is built from the
     * class constants rather than repeated literals.
     */
    @Test
    public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, 0));
        setupWithTransactionState(transactionManager);
        client.setNode(new Node(1, "localhost", 33343));

        int maxRetries = 10;
        try (Metrics m = new Metrics()) {
            Sender idempotentSender = new Sender(client, metadata, accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50L, transactionManager, apiVersions);
            FutureRecordMetadata responseFuture =
                    accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
            idempotentSender.run(time.milliseconds());
            idempotentSender.run(time.milliseconds());

            String id = client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", client.isReady(node, 0L));

            // Drop the in-flight request, then switch to a different producer id.
            client.disconnect(id);
            Assert.assertEquals(0L, client.inFlightRequestCount());
            Assert.assertFalse("Client ready status should be false", client.isReady(node, 0L));
            transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, 0));

            idempotentSender.run(time.milliseconds());
            idempotentSender.run(time.milliseconds());
            idempotentSender.run(time.milliseconds());
            Assert.assertEquals("Expected requests to be aborted after pid change", 0L, client.inFlightRequestCount());

            KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
            Assert.assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0.0);
            Assert.assertTrue(responseFuture.isDone());
            Assert.assertEquals(0L, transactionManager.sequenceNumber(tp0).intValue());
        }
    }

    /**
     * Answers the produce request of an idempotent sender with
     * {@code Errors.OUT_OF_ORDER_SEQUENCE_NUMBER} and checks that the future is done and that the
     * transaction manager no longer has a producer id.
     *
     * <p>The locally created {@code Metrics} is closed via try-with-resources and the sender is
     * built from the class constants rather than repeated literals.
     */
    @Test
    public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, 0));
        setupWithTransactionState(transactionManager);
        client.setNode(new Node(1, "localhost", 33343));

        int maxRetries = 10;
        try (Metrics m = new Metrics()) {
            Sender idempotentSender = new Sender(client, metadata, accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50L, transactionManager, apiVersions);
            FutureRecordMetadata responseFuture =
                    accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
            idempotentSender.run(time.milliseconds());
            idempotentSender.run(time.milliseconds());
            Assert.assertEquals(1L, client.inFlightRequestCount());

            client.respond(produceResponse(tp0, 0L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
            idempotentSender.run(time.milliseconds());

            Assert.assertTrue(responseFuture.isDone());
            Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
        }
    }

    /**
     * Runs the shared {@code testSplitBatchAndSend} scenario with a default-constructed
     * {@code TransactionManager} that has been given a producer id and epoch up front.
     */
    @Test
    public void testIdempotentSplitBatchAndSend() throws Exception {
        TopicPartition partition = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager idempotentManager = new TransactionManager();
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(123456L, 0);
        idempotentManager.setProducerIdAndEpoch(idAndEpoch);

        testSplitBatchAndSend(idempotentManager, idAndEpoch, partition);
    }

    /**
     * Runs the shared {@code testSplitBatchAndSend} scenario with a {@code TransactionManager}
     * built from a transactional id: the fixture is re-initialised with it,
     * {@code doInitTransactions} is invoked, a transaction is begun and the partition is added to
     * it (with a successful {@code AddPartitionsToTxnResponse} prepared) before the scenario
     * starts.
     */
    @Test
    public void testTransactionalSplitBatchAndSend() throws Exception {
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(123456L, 0);
        TopicPartition partition = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager transactionalManager = new TransactionManager("testSplitBatchAndSend", 60000, 100L);

        setupWithTransactionState(transactionalManager);
        doInitTransactions(transactionalManager, idAndEpoch);

        // Open a transaction and register the partition; one sender pass consumes the response.
        transactionalManager.beginTransaction();
        transactionalManager.maybeAddPartitionToTransaction(partition);
        AddPartitionsToTxnResponse partitionAdded =
                new AddPartitionsToTxnResponse(0, Collections.singletonMap(partition, Errors.NONE));
        client.prepareResponse(partitionAdded);
        sender.run(time.milliseconds());

        testSplitBatchAndSend(transactionalManager, idAndEpoch, partition);
    }

    /**
     * Shared scenario behind the idempotent and transactional split-batch tests.
     *
     * <p>Two records are appended to {@code tp}, the first produce request is answered with
     * {@code MESSAGE_TOO_LARGE}, and the test then expects two follow-up produce requests (base
     * sequences 0 and 1) that complete the two record futures with offsets 0 and 1. It finally
     * checks that the accumulator holds no batch for {@code tp} and that the
     * {@code batch-split-rate} metric is positive.
     *
     * @param txnManager transaction manager handed to the accumulator and sender under test
     * @param producerIdAndEpoch producer id and epoch the re-sent batches are expected to carry
     * @param tp partition the records are appended to
     * @throws Exception if reading a record future fails
     */
    private void testSplitBatchAndSend(TransactionManager txnManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition tp) throws Exception {
        int maxRetries = 1;
        String topic = tp.topic();
        // Seed the GZIP ratio estimate for this topic; it is re-checked after the oversized response.
        CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
        Metrics m = new Metrics();
        try {
            // Built inside the try so the metrics registry is closed even if construction fails.
            this.accumulator = new RecordAccumulator(this.batchSize, 0x100000L, CompressionType.GZIP, 0L, 0L, m, this.time, new ApiVersions(), txnManager);
            // acks is cast explicitly: an int literal is not narrowed implicitly in a constructor call.
            Sender sender = new Sender(this.client, this.metadata, this.accumulator, true, 0x100000, (short) -1, maxRetries, m, this.time, 1000, 1000L, txnManager, new ApiVersions());
            Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
            this.metadata.update(cluster1, Collections.emptySet(), this.time.milliseconds());

            // Two records whose values are each half a batch in size.
            FutureRecordMetadata f1 = this.accumulator.append(tp, 0L, "key1".getBytes(), new byte[this.batchSize / 2], null, null, 1000L).future;
            FutureRecordMetadata f2 = this.accumulator.append(tp, 0L, "key2".getBytes(), new byte[this.batchSize / 2], null, null, 1000L).future;

            // Presumably the first pass connects and the second sends the request - confirm against Sender.
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals("The sequence number should be 0", 0L, txnManager.sequenceNumber(tp).longValue());
            this.assertSingleProduceRequestInFlight();

            // Reject the request as too large.
            Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
            this.client.respond(new ProduceResponse(responseMap));
            sender.run(this.time.milliseconds());
            // The estimate is expected to sit within 0.01 of (GZIP.rate - 0.005) after the rejection.
            Assert.assertEquals(CompressionType.GZIP.rate - 0.005f, CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);

            // Next pass: a new produce request goes out, nothing has been acknowledged yet.
            sender.run(this.time.milliseconds());
            Assert.assertEquals("The sequence number should be 0", 0L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertFalse("The future shouldn't have been done.", f1.isDone());
            Assert.assertFalse("The future shouldn't have been done.", f2.isDone());
            this.assertSingleProduceRequestInFlight();

            // Acknowledge the batch with base sequence 0 at offset 0: only the first record completes.
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
            this.client.respond(this.produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()), new ProduceResponse(responseMap));
            sender.run(this.time.milliseconds());
            Assert.assertTrue("The future should have been done.", f1.isDone());
            Assert.assertEquals("The sequence number should be 1", 1L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertFalse("The future shouldn't have been done.", f2.isDone());
            Assert.assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());

            // Next pass sends the remaining batch.
            sender.run(this.time.milliseconds());
            this.assertSingleProduceRequestInFlight();

            // Acknowledge the batch with base sequence 1 at offset 1: the second record completes.
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
            this.client.respond(this.produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()), new ProduceResponse(responseMap));
            sender.run(this.time.milliseconds());
            Assert.assertTrue("The future should have been done.", f2.isDone());
            Assert.assertEquals("The sequence number should be 2", 2L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertEquals("Offset of the second message should be 1", 1L, f2.get().offset());
            Assert.assertTrue("There should be no batch in the accumulator", this.accumulator.batches().get(tp).isEmpty());
            Assert.assertTrue("There should be a split", m.metrics().get(m.metricName("batch-split-rate", METRIC_GROUP)).value() > 0.0);
        }
        finally {
            m.close();
        }
    }

    /**
     * Asserts that the mock client has exactly one request in flight, that the request at the head
     * of its queue is a {@code PRODUCE} request, and that the client reports the request's
     * destination node as ready.
     */
    private void assertSingleProduceRequestInFlight() {
        ClientRequest head = this.client.requests().peek();
        // Fail with a readable message instead of a NullPointerException when nothing was sent.
        Assert.assertNotNull("Expected a produce request to be in flight", head);
        String id = head.destination();
        Assert.assertEquals(ApiKeys.PRODUCE, head.requestBuilder().apiKey());
        Node node = new Node(Integer.parseInt(id), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
    }

    /**
     * Creates a matcher for the single produce batch expected on {@code tp}.
     *
     * <p>A request matches only when it is a {@code ProduceRequest}, carries records for
     * {@code tp}, those records hold exactly one batch, and that batch has base offset 0 together
     * with the given base sequence, producer id, producer epoch and transactional flag.
     *
     * @param tp partition whose records are inspected
     * @param producerIdAndEpoch producer id and epoch the batch must carry
     * @param sequence base sequence the batch must carry
     * @param isTransactional transactional flag the batch must carry
     * @return the matcher described above
     */
    private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp, final ProducerIdAndEpoch producerIdAndEpoch, final int sequence, final boolean isTransactional) {
        return new MockClient.RequestMatcher() {

            @Override
            public boolean matches(AbstractRequest body) {
                if (!(body instanceof ProduceRequest)) {
                    return false;
                }
                MemoryRecords partitionRecords = (MemoryRecords) ((ProduceRequest) body).partitionRecordsOrFail().get(tp);
                if (partitionRecords == null) {
                    return false;
                }
                List<?> batchList = TestUtils.toList(partitionRecords.batches());
                if (batchList.size() != 1) {
                    return false;
                }
                MutableRecordBatch onlyBatch = (MutableRecordBatch) batchList.get(0);
                // Checked one at a time, in the same order as a short-circuiting conjunction.
                if (onlyBatch.baseOffset() != 0L) {
                    return false;
                }
                if (onlyBatch.baseSequence() != sequence) {
                    return false;
                }
                if (onlyBatch.producerId() != producerIdAndEpoch.producerId) {
                    return false;
                }
                if (onlyBatch.producerEpoch() != producerIdAndEpoch.epoch) {
                    return false;
                }
                return onlyBatch.isTransactional() == isTransactional;
            }
        };
    }

    /**
     * Asserts that {@code future} is already done and that reading it fails with an
     * {@link ExecutionException} whose cause has exactly the class of {@code error.exception()}.
     *
     * @param future the send result to inspect
     * @param error the error whose exception class the cause must have
     * @throws Exception if reading the future fails in any way other than an ExecutionException
     */
    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
        Assert.assertTrue("Request should be completed", future.isDone());
        ExecutionException failure = null;
        try {
            future.get();
        }
        catch (ExecutionException e) {
            failure = e;
        }
        if (failure == null) {
            // get() returned normally, which this helper treats as a test failure.
            Assert.fail("Should have thrown an exception.");
        }
        Class<?> expectedType = error.exception().getClass();
        Class<?> actualType = failure.getCause().getClass();
        Assert.assertEquals(expectedType, actualType);
    }

    /**
     * Builds a {@code ProduceResponse} containing a single partition entry.
     *
     * @param tp the only partition in the response
     * @param offset second argument of the partition response (the offset reported for {@code tp})
     * @param error error reported for {@code tp}
     * @param throttleTimeMs throttle time passed to the response
     * @return the assembled response; the partition entry's third argument is fixed at -1
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
        return new ProduceResponse(
                Collections.singletonMap(tp, new ProduceResponse.PartitionResponse(error, offset, -1L)),
                throttleTimeMs);
    }

    /**
     * Replaces the {@code metrics}, {@code accumulator} and {@code sender} fixtures with instances
     * wired to the given transaction manager, then refreshes the metadata with the test cluster.
     *
     * <p>The metrics registry is tagged with {@code client-id}; the accumulator uses no compression.
     *
     * @param transactionManager transaction manager handed to both the accumulator and the sender
     */
    private void setupWithTransactionState(TransactionManager transactionManager) {
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", CLIENT_ID);
        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
        this.metrics = new Metrics(metricConfig, this.time);
        this.accumulator = new RecordAccumulator(this.batchSize, 0x100000L, CompressionType.NONE, 0L, 0L, this.metrics, this.time, this.apiVersions, transactionManager);
        // acks is cast explicitly: an int literal is not narrowed implicitly in a constructor call.
        // NOTE(review): -1 presumably means "all replicas" - confirm against Sender's constructor.
        this.sender = new Sender(this.client, this.metadata, this.accumulator, true, 0x100000, (short) -1, 0, this.metrics, this.time, 1000, 50L, transactionManager, this.apiVersions);
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
    }

    /**
     * Appends one record to {@code tp0}, runs the sender once, and asserts that the record's future
     * is done and fails with an {@link ExecutionException} whose cause is an instance of
     * {@code expectedError}.
     *
     * @param expectedError type the failure cause must be assignable to
     * @throws Exception if appending or reading the future fails in any other way
     */
    private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
        FutureRecordMetadata sendResult = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(sendResult.isDone());

        ExecutionException raised = null;
        try {
            sendResult.get();
        }
        catch (ExecutionException e) {
            raised = e;
        }
        if (raised == null) {
            // get() returned normally, which this helper treats as a test failure.
            Assert.fail("Future should have raised " + expectedError.getSimpleName());
        }
        Class<?> causeType = raised.getCause().getClass();
        Assert.assertTrue(expectedError.isAssignableFrom(causeType));
    }

    /**
     * Queues an {@code InitProducerIdResponse} on the mock client and runs the sender once.
     *
     * <p>The queued response is matched only against an {@code InitProducerIdRequest} whose
     * transactional id is {@code null}. The response carries epoch 0 when {@code error} is
     * {@code Errors.NONE} and epoch -1 otherwise.
     *
     * @param producerId producer id placed in the response
     * @param error error placed in the response
     */
    private void prepareAndReceiveInitProducerId(long producerId, Errors error) {
        final short producerEpoch = (short) (error == Errors.NONE ? 0 : -1);
        MockClient.RequestMatcher initWithoutTransactionalId = new MockClient.RequestMatcher() {

            @Override
            public boolean matches(AbstractRequest body) {
                if (!(body instanceof InitProducerIdRequest)) {
                    return false;
                }
                return ((InitProducerIdRequest) body).transactionalId() == null;
            }
        };
        AbstractResponse response = new InitProducerIdResponse(0, error, producerId, producerEpoch);
        this.client.prepareResponse(initWithoutTransactionalId, response);
        this.sender.run(this.time.milliseconds());
    }

    /**
     * Drives the transaction manager through initialization using canned responses.
     *
     * <p>Sequence: start initialization, queue a successful find-coordinator response, run the
     * sender twice, queue a successful init-producer-id response carrying the given id and epoch,
     * run the sender once more, then assert the manager reports a producer id.
     *
     * @param transactionManager manager being initialized
     * @param producerIdAndEpoch producer id and epoch returned by the queued response
     */
    private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE);
        // Two passes; presumably one to look up the coordinator and one to connect - confirm against Sender.
        for (int pass = 0; pass < 2; pass++) {
            this.sender.run(this.time.milliseconds());
        }
        final long producerId = producerIdAndEpoch.producerId;
        final short epoch = producerIdAndEpoch.epoch;
        this.prepareInitPidResponse(Errors.NONE, producerId, epoch);
        this.sender.run(this.time.milliseconds());
        final boolean producerIdAssigned = transactionManager.hasProducerId();
        Assert.assertTrue(producerIdAssigned);
    }

    /**
     * Queues a {@code FindCoordinatorResponse} on the mock client that carries {@code error} and
     * names the first node of the test cluster.
     *
     * @param error error placed in the response
     */
    private void prepareFindCoordinatorResponse(Errors error) {
        Node firstNode = (Node) this.cluster.nodes().get(0);
        AbstractResponse response = new FindCoordinatorResponse(error, firstNode);
        this.client.prepareResponse(response);
    }

    /**
     * Queues an {@code InitProducerIdResponse} on the mock client.
     *
     * @param error error placed in the response
     * @param pid producer id placed in the response
     * @param epoch producer epoch placed in the response
     */
    private void prepareInitPidResponse(Errors error, long pid, short epoch) {
        AbstractResponse response = new InitProducerIdResponse(0, error, pid, epoch);
        this.client.prepareResponse(response);
    }
}

