package org.apache.rocketmq.client.producer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;

/* loaded from: input_file:org/apache/rocketmq/client/producer/ProduceAccumulator.class */
public class ProduceAccumulator {
    // Background thread that periodically wakes producers blocked in a synchronous batch.
    private final GuardForSyncSendService guardThreadForSyncSend;
    // Background thread that periodically flushes asynchronous batches that are ready to send.
    private final GuardForAsyncSendService guardThreadForAsyncSend;
    // Used only to build the guard threads' service names.
    private final String instanceName;
    // Upper bound, in bytes of message body, on everything held across all batches (33554432 = 32 MiB).
    private long totalHoldSize = 33554432;
    // A single batch becomes ready to send once its accumulated body bytes exceed this (32768 = 32 KiB).
    private long holdSize = 32768;
    // A single batch becomes ready to send once it is at least this many milliseconds old.
    private int holdMs = 10;
    // NOTE(review): the logger is obtained for DefaultMQProducer.class, not for this class.
    private final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class);
    // Open batches used by the blocking send(...) overloads, one per aggregate key.
    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap();
    // Open batches used by the callback-based send(...) overloads, one per aggregate key.
    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap();
    // Body bytes currently reserved via tryAddMessage and not yet released by a completed send.
    private AtomicLong currentlyHoldSize = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/client/producer/ProduceAccumulator$AggregateKey.class */
    /**
     * Map key that decides which messages may share a batch: messages are accumulated
     * together only when topic, target queue, wait-for-store flag and tag are all equal.
     */
    public class AggregateKey {
        public String topic;
        public MessageQueue mq;
        public boolean waitStoreMsgOK;
        public String tag;

        /**
         * Builds a key for a message that is not bound to a specific queue ({@code mq} is null).
         *
         * @param produceAccumulator not used by this constructor
         * @param message            message whose topic, wait-store flag and tags form the key
         */
        public AggregateKey(ProduceAccumulator produceAccumulator, Message message) {
            this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
        }

        /**
         * Builds a key for a message bound to a specific queue.
         *
         * @param produceAccumulator not used by this constructor
         * @param message            message whose topic, wait-store flag and tags form the key
         * @param messageQueue       target queue, may be null
         */
        public AggregateKey(ProduceAccumulator produceAccumulator, Message message, MessageQueue messageQueue) {
            this(message.getTopic(), messageQueue, message.isWaitStoreMsgOK(), message.getTags());
        }

        /**
         * @param str          topic
         * @param messageQueue target queue, may be null
         * @param z            wait-for-store flag
         * @param str2         tag, may be null
         */
        public AggregateKey(String str, MessageQueue messageQueue, boolean z, String str2) {
            this.topic = str;
            this.mq = messageQueue;
            this.waitStoreMsgOK = z;
            this.tag = str2;
        }

        /**
         * Field-wise equality. Every reference field is compared null-safely so that
         * {@code equals} accepts exactly the same states as {@link #hashCode()} does
         * (a null topic no longer raises a NullPointerException here).
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AggregateKey other = (AggregateKey) obj;
            return this.waitStoreMsgOK == other.waitStoreMsgOK
                && Objects.equals(this.topic, other.topic)
                && Objects.equals(this.mq, other.mq)
                && Objects.equals(this.tag, other.tag);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.topic, this.mq, this.waitStoreMsgOK, this.tag);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/client/producer/ProduceAccumulator$GuardForAsyncSendService.class */
    /**
     * Service thread that repeatedly scans the asynchronous batches, sends those that are
     * ready, and retires batches that are still empty.
     */
    public class GuardForAsyncSendService extends ServiceThread {
        private final String serviceName;

        /**
         * @param str instance name embedded in the service name
         */
        public GuardForAsyncSendService(String str) {
            this.serviceName = String.format("Client_%s_GuardForAsyncSend", str);
        }

        public String getServiceName() {
            return serviceName;
        }

        /** Runs {@link #doWork()} until the service is stopped; failures are logged and the loop continues. */
        public void run() {
            final Logger logger = ProduceAccumulator.this.log;
            logger.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    doWork();
                } catch (Exception failure) {
                    logger.warn(getServiceName() + " service has exception. ", failure);
                }
            }
            logger.info(getServiceName() + " service end");
        }

        /**
         * One scan over the asynchronous batches followed by a pause of
         * {@code max(1, holdMs / 2)} milliseconds.
         *
         * <p>Only batches whose accumulated size is still zero are closed and removed from
         * the map here; a batch that was sent keeps its map entry after this scan.
         */
        private void doWork() throws Exception {
            final Map<AggregateKey, MessageAccumulation> batches = ProduceAccumulator.this.asyncSendBatchs;
            final int pauseMs = Math.max(1, ProduceAccumulator.this.holdMs / 2);
            for (MessageAccumulation batch : batches.values()) {
                if (batch.readyToSend()) {
                    batch.send(null);
                }
                synchronized (batch.closed) {
                    if (batch.messagesSize.get() != 0) {
                        continue;
                    }
                    // Nothing was ever added: close it so late adders are turned away, then drop it.
                    batch.closed.set(true);
                    batches.remove(batch.aggregateKey, batch);
                }
            }
            Thread.sleep(pauseMs);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/client/producer/ProduceAccumulator$GuardForSyncSendService.class */
    /**
     * Service thread that repeatedly scans the synchronous batches, wakes producers waiting
     * on them so they can re-check whether the batch is ready, and retires empty batches.
     */
    public class GuardForSyncSendService extends ServiceThread {
        private final String serviceName;

        /**
         * @param str instance name embedded in the service name
         */
        public GuardForSyncSendService(String str) {
            this.serviceName = String.format("Client_%s_GuardForSyncSend", str);
        }

        public String getServiceName() {
            return serviceName;
        }

        /** Runs {@link #doWork()} until the service is stopped; failures are logged and the loop continues. */
        public void run() {
            final Logger logger = ProduceAccumulator.this.log;
            logger.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    doWork();
                } catch (Exception failure) {
                    logger.warn(getServiceName() + " service has exception. ", failure);
                }
            }
            logger.info(getServiceName() + " service end");
        }

        /**
         * One scan over the synchronous batches followed by a pause of
         * {@code max(1, holdMs / 2)} milliseconds.
         *
         * <p>Monitors are taken in the order batch, then {@code batch.closed}.
         */
        private void doWork() throws InterruptedException {
            final Map<AggregateKey, MessageAccumulation> batches = ProduceAccumulator.this.syncSendBatchs;
            final int pauseMs = Math.max(1, ProduceAccumulator.this.holdMs / 2);
            for (MessageAccumulation batch : batches.values()) {
                batch.wakeup();
                synchronized (batch) {
                    synchronized (batch.closed) {
                        if (batch.messagesSize.get() != 0) {
                            // Messages are pending: wake one waiter so it can re-evaluate readiness.
                            batch.notify();
                        } else {
                            // Nothing was ever added: close it so late adders are turned away, then drop it.
                            batch.closed.set(true);
                            batches.remove(batch.aggregateKey, batch);
                        }
                    }
                }
            }
            Thread.sleep(pauseMs);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/client/producer/ProduceAccumulator$MessageAccumulation.class */
    /**
     * One batch under construction for a single {@link AggregateKey}.
     *
     * <p>Locking protocol used by this class:
     * <ul>
     *   <li>The {@code closed} monitor guards membership of the batch ({@code messages},
     *       {@code sendCallbacks}, {@code keys}, {@code count}) and the transition of
     *       {@code closed} to true. It is held only for those short updates, never while
     *       waiting or while a send is in progress.</li>
     *   <li>The monitor of this object is used for wait/notify between synchronous
     *       producers and the guard thread.</li>
     *   <li>When both are needed the order is always this object first, then {@code closed}.</li>
     * </ul>
     * Once {@code closed} is true no further message can be added, so the thread that
     * flipped it may read the batch contents without holding any lock.
     */
    public class MessageAccumulation {
        private final DefaultMQProducer defaultMQProducer;
        // Per-message results; assigned only after a complete, validated split, otherwise null.
        private SendResult[] sendResults;
        private AggregateKey aggregateKey;
        private LinkedList<Message> messages = new LinkedList<>();
        private LinkedList<SendCallback> sendCallbacks = new LinkedList<>();
        private Set<String> keys = new HashSet<>();
        private AtomicBoolean closed = new AtomicBoolean(false);
        private AtomicInteger messagesSize = new AtomicInteger(0);
        // Ensures the batch's bytes are subtracted from currentlyHoldSize exactly once.
        private final AtomicBoolean holdReleased = new AtomicBoolean(false);
        // Failure of the synchronous send, kept so that waiting producers can report it.
        private Throwable sendFailure;
        private int count = 0;
        private long createTime = System.currentTimeMillis();

        public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
            this.defaultMQProducer = defaultMQProducer;
            this.aggregateKey = aggregateKey;
        }

        /**
         * @return true when the accumulated body size exceeds {@code holdSize} or the batch
         *         is at least {@code holdMs} milliseconds old
         */
        public boolean readyToSend() {
            return ((long) this.messagesSize.get()) > ProduceAccumulator.this.holdSize
                || System.currentTimeMillis() >= this.createTime + ((long) ProduceAccumulator.this.holdMs);
        }

        /**
         * Adds a message for synchronous sending and blocks until the batch has been sent.
         *
         * <p>The {@code closed} monitor is released before waiting. Waiting while holding it
         * would block every other producer from joining the batch and would deadlock with
         * the guard thread, which locks this object and then {@code closed}.
         *
         * @return index of the message's result in {@code sendResults}, or -1 when the batch
         *         was already closed and the caller must retry on a new batch
         * @throws MQClientException if the batch was sent by another thread and that send
         *         failed, so no result exists for this message; exceptions raised by this
         *         thread's own send are propagated unchanged
         */
        public int add(Message message) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
            final int index;
            synchronized (this.closed) {
                if (this.closed.get()) {
                    return -1;
                }
                index = this.count++;
                this.messages.add(message);
                this.messagesSize.addAndGet(message.getBody().length);
                String messageKeys = message.getKeys();
                if (messageKeys != null) {
                    this.keys.addAll(Arrays.asList(messageKeys.split(" ")));
                }
            }
            synchronized (this) {
                while (!this.closed.get()) {
                    if (readyToSend()) {
                        send();
                        break;
                    }
                    wait();
                }
                if (this.sendResults == null) {
                    // Another producer performed the send and it failed; surface that instead of
                    // letting the caller index into a missing result array.
                    throw new MQClientException("batch send failed, no send result for accumulated message", this.sendFailure);
                }
            }
            return index;
        }

        /**
         * Adds a message for asynchronous sending and triggers the send if the batch is ready.
         *
         * @return false when the batch was already closed and the caller must retry on a new
         *         batch, true otherwise
         */
        public boolean add(Message message, SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
            synchronized (this.closed) {
                if (this.closed.get()) {
                    return false;
                }
                this.count++;
                this.messages.add(message);
                this.sendCallbacks.add(sendCallback);
                this.messagesSize.getAndAdd(message.getBody().length);
            }
            // Sending happens outside the lock so user callbacks never run under it.
            if (readyToSend()) {
                send(sendCallback);
            }
            return true;
        }

        /** Wakes one producer waiting on this batch, unless the batch is already closed. */
        public synchronized void wakeup() {
            if (this.closed.get()) {
                return;
            }
            notify();
        }

        /** Packs the accumulated messages into a single {@link MessageBatch} with an encoded body. */
        private MessageBatch batch() {
            MessageBatch messageBatch = new MessageBatch(this.messages);
            messageBatch.setTopic(this.aggregateKey.topic);
            messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
            messageBatch.setKeys(this.keys);
            messageBatch.setTags(this.aggregateKey.tag);
            MessageClientIDSetter.setUniqID(messageBatch);
            messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
            return messageBatch;
        }

        /**
         * Expands the batch-level result into one result per accumulated message.
         *
         * <p>If the message id contains no comma, every message shares {@code sendResult}.
         * Otherwise the comma-separated message ids and offset message ids are paired by
         * position and the queue offset is incremented per message. The array is stored in
         * {@code sendResults} only once it is complete, so a failure leaves it unset.
         *
         * @throws IllegalArgumentException if {@code sendResult} is null or its id lists do
         *         not match the number of accumulated messages
         */
        public void splitSendResults(SendResult sendResult) {
            if (sendResult == null) {
                throw new IllegalArgumentException("sendResult is null");
            }
            final SendResult[] results = new SendResult[this.count];
            if (!sendResult.getMsgId().contains(",")) {
                Arrays.fill(results, sendResult);
            } else {
                String[] msgIds = sendResult.getMsgId().split(",");
                String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
                if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
                    throw new IllegalArgumentException("sendResult is illegal");
                }
                for (int i = 0; i < this.count; i++) {
                    results[i] = new SendResult(sendResult.getSendStatus(), msgIds[i], sendResult.getMessageQueue(), sendResult.getQueueOffset() + i, sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
                }
            }
            this.sendResults = results;
        }

        /** Subtracts this batch's accumulated bytes from the global hold counter, at most once. */
        private void releaseHeldBytes() {
            if (this.holdReleased.compareAndSet(false, true)) {
                ProduceAccumulator.this.currentlyHoldSize.addAndGet(-this.messagesSize.get());
            }
        }

        /** Reports {@code cause} to every registered callback, then releases the held bytes. */
        private void failCallbacks(Throwable cause) {
            try {
                for (SendCallback callback : this.sendCallbacks) {
                    callback.onException(cause);
                }
            } finally {
                releaseHeldBytes();
            }
        }

        /**
         * Closes the batch and sends it synchronously. Does nothing if the batch was already
         * closed. In this class it is invoked only from {@link #add(Message)} while the
         * monitor of this object is held.
         *
         * <p>Whatever the outcome, the held bytes are released and all waiting producers
         * are notified; a failure is remembered for them before being rethrown.
         */
        private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
            synchronized (this.closed) {
                if (this.closed.getAndSet(true)) {
                    return;
                }
            }
            try {
                if (this.defaultMQProducer == null) {
                    throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
                }
                Message batch = batch();
                splitSendResults(this.defaultMQProducer.sendDirect(batch, this.aggregateKey.mq, null));
            } catch (Exception e) {
                this.sendFailure = e;
                throw e;
            } finally {
                releaseHeldBytes();
                synchronized (this) {
                    notifyAll();
                }
            }
        }

        /**
         * Closes the batch and sends it asynchronously, fanning the outcome out to every
         * callback registered through {@link #add(Message, SendCallback)}. Does nothing if
         * the batch was already closed.
         *
         * <p>A failure raised while building or submitting the batch is delivered to the
         * callbacks, and the held bytes are released on that path as well.
         *
         * @param sendCallback not used by this method; the callbacks stored in the batch are
         *                     notified instead
         */
        public void send(SendCallback sendCallback) {
            synchronized (this.closed) {
                if (this.closed.getAndSet(true)) {
                    return;
                }
            }
            try {
                if (this.defaultMQProducer == null) {
                    throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
                }
                Message batch = batch();
                this.defaultMQProducer.sendDirect(batch, this.aggregateKey.mq, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        try {
                            MessageAccumulation.this.splitSendResults(sendResult);
                            int delivered = 0;
                            for (SendCallback callback : MessageAccumulation.this.sendCallbacks) {
                                callback.onSuccess(MessageAccumulation.this.sendResults[delivered++]);
                            }
                            if (delivered != MessageAccumulation.this.count) {
                                throw new IllegalArgumentException("sendResult is illegal");
                            }
                            MessageAccumulation.this.releaseHeldBytes();
                        } catch (Exception e) {
                            onException(e);
                        }
                    }

                    @Override
                    public void onException(Throwable th) {
                        MessageAccumulation.this.failCallbacks(th);
                    }
                });
            } catch (Exception e) {
                failCallbacks(e);
            }
        }
    }

    /**
     * Creates the accumulator and its two guard threads; the threads are not started here.
     *
     * @param str instance name, used in the guard threads' service names
     */
    public ProduceAccumulator(String str) {
        instanceName = str;
        guardThreadForSyncSend = new GuardForSyncSendService(str);
        guardThreadForAsyncSend = new GuardForAsyncSendService(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Starts the synchronous guard thread, then the asynchronous one. */
    public void start() {
        guardThreadForSyncSend.start();
        guardThreadForAsyncSend.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Shuts down the synchronous guard thread, then the asynchronous one. */
    public void shutdown() {
        guardThreadForSyncSend.shutdown();
        guardThreadForAsyncSend.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the age, in milliseconds, at which a batch becomes ready to send
     */
    public int getBatchMaxDelayMs() {
        return holdMs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the age at which a batch becomes ready to send.
     *
     * @param i delay in milliseconds, from 1 to 30000 inclusive
     * @throws IllegalArgumentException if {@code i} is outside that range
     */
    public void batchMaxDelayMs(int i) {
        final boolean withinRange = i > 0 && i <= 30000;
        if (!withinRange) {
            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", i));
        }
        holdMs = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the accumulated body size, in bytes, above which a batch becomes ready to send
     */
    public long getBatchMaxBytes() {
        return holdSize;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the accumulated body size above which a batch becomes ready to send.
     *
     * @param j size in bytes, from 1 to 2097152 (2 MiB) inclusive
     * @throws IllegalArgumentException if {@code j} is outside that range
     */
    public void batchMaxBytes(long j) {
        final long upperBound = 2L * 1024 * 1024;
        final boolean withinRange = j > 0 && j <= upperBound;
        if (!withinRange) {
            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", j));
        }
        holdSize = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the limit configured through {@code totalBatchMaxBytes(long)}.
     *
     * <p>This reads {@code totalHoldSize}, the field that setter writes and that
     * {@code tryAddMessage} enforces. It previously returned the per-batch
     * {@code holdSize} by mistake.
     *
     * @return the maximum number of body bytes that may be held across all batches
     */
    public long getTotalBatchMaxBytes() {
        return this.totalHoldSize;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the maximum number of body bytes that may be held across all batches.
     *
     * @param j limit in bytes, must be positive
     * @throws IllegalArgumentException if {@code j} is zero or negative
     */
    public void totalBatchMaxBytes(long j) {
        final boolean positive = j > 0;
        if (!positive) {
            throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", j));
        }
        totalHoldSize = j;
    }

    /**
     * Returns the synchronous batch registered for {@code aggregateKey}, registering a new
     * one if none exists. If another thread registers one first, that thread's batch is
     * returned and the one created here is discarded.
     */
    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
        MessageAccumulation batch = this.syncSendBatchs.get(aggregateKey);
        if (batch == null) {
            final MessageAccumulation fresh = new MessageAccumulation(aggregateKey, defaultMQProducer);
            final MessageAccumulation raced = this.syncSendBatchs.putIfAbsent(aggregateKey, fresh);
            batch = (raced != null) ? raced : fresh;
        }
        return batch;
    }

    /**
     * Returns the asynchronous batch registered for {@code aggregateKey}, registering a new
     * one if none exists. If another thread registers one first, that thread's batch is
     * returned and the one created here is discarded.
     */
    private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
        MessageAccumulation batch = this.asyncSendBatchs.get(aggregateKey);
        if (batch == null) {
            final MessageAccumulation fresh = new MessageAccumulation(aggregateKey, defaultMQProducer);
            final MessageAccumulation raced = this.asyncSendBatchs.putIfAbsent(aggregateKey, fresh);
            batch = (raced != null) ? raced : fresh;
        }
        return batch;
    }

    /**
     * Sends {@code message} through a synchronous batch keyed without a target queue and
     * returns this message's own result.
     *
     * <p>If the chosen batch turns out to be closed already, it is removed from the map
     * and the attempt is repeated on the next batch.
     */
    SendResult send(Message message, DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        final AggregateKey key = new AggregateKey(this, message);
        for (;;) {
            final MessageAccumulation batch = getOrCreateSyncSendBatch(key, defaultMQProducer);
            final int index = batch.add(message);
            if (index == -1) {
                // The batch was closed before this message could join: discard it and retry.
                this.syncSendBatchs.remove(key, batch);
                continue;
            }
            return batch.sendResults[index];
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends {@code message} through a synchronous batch keyed by {@code messageQueue} and
     * returns this message's own result.
     *
     * <p>If the chosen batch turns out to be closed already, it is removed from the map
     * and the attempt is repeated on the next batch.
     */
    public SendResult send(Message message, MessageQueue messageQueue, DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        final AggregateKey key = new AggregateKey(this, message, messageQueue);
        for (;;) {
            final MessageAccumulation batch = getOrCreateSyncSendBatch(key, defaultMQProducer);
            final int index = batch.add(message);
            if (index == -1) {
                // The batch was closed before this message could join: discard it and retry.
                this.syncSendBatchs.remove(key, batch);
                continue;
            }
            return batch.sendResults[index];
        }
    }

    /**
     * Queues {@code message} in an asynchronous batch keyed without a target queue;
     * {@code sendCallback} is registered with the batch.
     *
     * <p>If the chosen batch turns out to be closed already, it is removed from the map
     * and the attempt is repeated on the next batch.
     */
    void send(Message message, SendCallback sendCallback, DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
        final AggregateKey key = new AggregateKey(this, message);
        MessageAccumulation batch = getOrCreateAsyncSendBatch(key, defaultMQProducer);
        while (!batch.add(message, sendCallback)) {
            // The batch was closed before this message could join: discard it and retry.
            this.asyncSendBatchs.remove(key, batch);
            batch = getOrCreateAsyncSendBatch(key, defaultMQProducer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues {@code message} in an asynchronous batch keyed by {@code messageQueue};
     * {@code sendCallback} is registered with the batch.
     *
     * <p>If the chosen batch turns out to be closed already, it is removed from the map
     * and the attempt is repeated on the next batch.
     */
    public void send(Message message, MessageQueue messageQueue, SendCallback sendCallback, DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
        final AggregateKey key = new AggregateKey(this, message, messageQueue);
        MessageAccumulation batch = getOrCreateAsyncSendBatch(key, defaultMQProducer);
        while (!batch.add(message, sendCallback)) {
            // The batch was closed before this message could join: discard it and retry.
            this.asyncSendBatchs.remove(key, batch);
            batch = getOrCreateAsyncSendBatch(key, defaultMQProducer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reserves room for {@code message} in the global hold budget.
     *
     * <p>The check and the reservation happen under the {@code currentlyHoldSize} monitor.
     * The reservation is refused only when the counter has already reached
     * {@code totalHoldSize}; an accepted message may push the counter past it.
     *
     * @return true if the message's body length was added to the counter, false if the
     *         budget was already exhausted
     */
    public boolean tryAddMessage(Message message) {
        synchronized (this.currentlyHoldSize) {
            final boolean hasRoom = this.currentlyHoldSize.get() < this.totalHoldSize;
            if (hasRoom) {
                this.currentlyHoldSize.addAndGet(message.getBody().length);
            }
            return hasRoom;
        }
    }
}
