package akka.kafka.internal;

import akka.kafka.ConsumerMessage;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.internal.DeferredProducer;
import akka.kafka.internal.TransactionalProducerStage;
import akka.stream.Attributes;
import akka.stream.stage.AsyncCallback;
import java.util.Map;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;

/* compiled from: TransactionalProducerStage.scala */
@ScalaSignature(bytes = "\u0006\u0001\t=b\u0001B\u0015+\rEB\u0001\u0002\u001b\u0001\u0003\u0002\u0003\u0006I!\u001b\u0005\tY\u0002\u0011\t\u0011)A\u0005[\"AQ\u000f\u0001B\u0001B\u0003%a\u000fC\u0003}\u0001\u0011\u0005Q\u0010C\u0005\u0002\u0006\u0001\u0011\r\u0011\"\u0003\u0002\b!A\u0011q\u0003\u0001!\u0002\u0013\tI\u0001C\u0005\u0002\u001a\u0001\u0011\r\u0011\"\u0003\u0002\u001c!A\u0011Q\u0006\u0001!\u0002\u0013\ti\u0002C\u0005\u00020\u0001\u0001\r\u0011\"\u0003\u00022!I\u0011\u0011\t\u0001A\u0002\u0013%\u00111\t\u0005\t\u0003\u001f\u0002\u0001\u0015)\u0003\u00024!I\u0011\u0011\u000b\u0001A\u0002\u0013%\u00111\u000b\u0005\n\u00037\u0002\u0001\u0019!C\u0005\u0003;B\u0001\"!\u0019\u0001A\u0003&\u0011Q\u000b\u0005\n\u0003G\u0002\u0001\u0019!C\u0005\u0003KB\u0011\"!\u001c\u0001\u0001\u0004%I!a\u001c\t\u0011\u0005M\u0004\u0001)Q\u0005\u0003OBq!!\u001e\u0001\t#\n9\bC\u0004\u0002\b\u0002!\t%!#\t\u000f\u0005-\u0005\u0001\"\u0015\u0002\n\"9\u0011Q\u0012\u0001\u0005\n\u0005%\u0005bBAH\u0001\u0011E\u0013\u0011\u0013\u0005\n\u0003/\u0003\u0011\u0013!C\t\u00033Cq!a,\u0001\t#\nI\tC\u0004\u00022\u0002!\t&!#\t\u000f\u0005M\u0006\u0001\"\u0015\u00026\"9\u00111\u0018\u0001\u0005\n\u0005u\u0006\"CAd\u0001E\u0005I\u0011BAM\u0011%\tI\rAI\u0001\n\u0013\tI\nC\u0004\u0002L\u0002!I!!4\t\u000f\u0005M\u0007\u0001\"\u0003\u0002V\"9\u0011\u0011\u001d\u0001\u0005R\u0005\r\bbBAt\u0001\u0011\u0005\u0013\u0011\u0012\u0005\b\u0003S\u0004A\u0011IAv\u0011\u001d\u0011\u0019\u0001\u0001C\u0005\u0005\u000bA\u0011Ba\u0005\u0001\u0005\u0004%IA!\u0006\t\u0011\t\u0005\u0002\u0001)A\u0005\u0005/AqAa\t\u0001\t\u0013\tI\tC\u0004\u0003&\u0001!I!!#\t\u000f\t\u001d\u0002\u0001\"\u0003\u0003*\tyBK]1og\u0006\u001cG/[8oC2\u0004&o\u001c3vG\u0016\u00148\u000b^1hK2{w-[2\u000b\u0005-b\u0013\u0001C5oi\u0016\u0014h.\u00197\u000b\u00055r\u0013!B6bM.\f'\"A\u0018\u0002\t\u0005\\7.Y\u0002\u0001+\u0011\u0011\u0014HR%\u0014\t\u0001\u0019Dl\u0018\t\biU:T\tS&Z\u001b\u0005Q\u0013B\u0001\u001c+\u0005e!UMZ1vYR\u0004&o\u001c3vG\u0016\u00148\u000b^1hK2{w-[2\u0011\u0005aJD\u0002\u0001\u0003\u0006u\u0001\u0011\ra\u000f\u0002\u0002\u0017F\u0011AH\u0011\t\u0003{\u0001k\u0011A\u0010\u0006\u0002\u007f\u0005)1oY1mC&\u0011\u0011I\u0010\u0002\b\u001d>$\b.\u001b8h!\ti4)\u0003\u0002E}\t\u0019\u0011I\\=\u0011\u0005a2E!B$\u0001\u0005\u0004Y$!\u0001,\u0011\u0005aJE!\u0002&\u0001\u0005\u0004Y$!\u0001)\u0011\u000b13v'\u0012%\u000f\u00055#fB\u0001(T\u001d\ty%+D\u0001Q\u0015\t\t\u0006'\u0001\u0004=e>|GOP\u0005\u0002_%\u0011QFL\u0005\u0003+2\nq\u0002\u0015:pIV\u001cWM]'fgN\fw-Z\u0005\u0003/b\u0013\u0001\"\u00128wK2|\u0007/\u001a\u0006\u0003+2\u0002R\u0001\u0014.8\u000b\"K!a\u0017-\u0003\u000fI+7/\u001e7ugB\u0011A'X\u0005\u0003=*\u0012ab\u0015;bO\u0016LE\rT8hO&tw\r\u0005\u0002aK:\u0011\u0011m\u0019\b\u0003\u001b\nL!a\u000b\u0017\n\u0005\u0011T\u0013!\u0004)s_\u0012,8-\u001a:Ti\u0006<W-\u0003\u0002gO\n9\u0002K]8ek\u000e,'oQ8na2,G/[8o'R\fG/\u001a\u0006\u0003I*\nQa\u001d;bO\u0016\u0004R\u0001\u000e68\u000b\"K!a\u001b\u0016\u00035Q\u0013\u0018M\\:bGRLwN\\1m!J|G-^2feN#\u0018mZ3\u0002\u001fQ\u0014\u0018M\\:bGRLwN\\1m\u0013\u0012\u0004\"A\u001c:\u000f\u0005=\u0004\bCA(?\u0013\t\th(\u0001\u0004Qe\u0016$WMZ\u0005\u0003gR\u0014aa\u0015;sS:<'BA9?\u0003MIg\u000e[3sSR,G-\u0011;ue&\u0014W\u000f^3t!\t9(0D\u0001y\u0015\tIh&\u0001\u0004tiJ,\u0017-\\\u0005\u0003wb\u0014!\"\u0011;ue&\u0014W\u000f^3t\u0003\u0019a\u0014N\\5u}Q1ap`A\u0001\u0003\u0007\u0001R\u0001\u000e\u00018\u000b\"CQ\u0001\u001b\u0003A\u0002%DQ\u0001\u001c\u0003A\u00025DQ!\u001e\u0003A\u0002Y\f!cY8n[&$8k\u00195fIVdWM]&fsV\u0011\u0011\u0011\u0002\t\u0005\u0003\u0017\t)\"\u0004\u0002\u0002\u000e)!\u0011qBA\t\u0003\u0011a\u0017M\\4\u000b\u0005\u0005M\u0011\u0001\u00026bm\u0006L1a]A\u0007\u0003M\u0019w.\\7jiN\u001b\u0007.\u001a3vY\u0016\u00148*Z=!\u0003QiWm]:bO\u0016$%/Y5o\u0013:$XM\u001d<bYV\u0011\u0011Q\u0004\t\u0005\u0003?\tI#\u0004\u0002\u0002\")!\u00111EA\u0013\u0003!!WO]1uS>t'bAA\u0014}\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\t\u0005-\u0012\u0011\u0005\u0002\u000f\r&t\u0017\u000e^3EkJ\fG/[8o\u0003UiWm]:bO\u0016$%/Y5o\u0013:$XM\u001d<bY\u0002\nABY1uG\"|eMZ:fiN,\"!a\r\u0011\t\u0005U\u00121\b\b\u0004i\u0005]\u0012bAA\u001dU\u0005QBK]1og\u0006\u001cG/[8oC2\u0004&o\u001c3vG\u0016\u00148\u000b^1hK&!\u0011QHA \u0005A!&/\u00198tC\u000e$\u0018n\u001c8CCR\u001c\u0007NC\u0002\u0002:)\n\u0001CY1uG\"|eMZ:fiN|F%Z9\u0015\t\u0005\u0015\u00131\n\t\u0004{\u0005\u001d\u0013bAA%}\t!QK\\5u\u0011%\tiECA\u0001\u0002\u0004\t\u0019$A\u0002yIE\nQBY1uG\"|eMZ:fiN\u0004\u0013a\u00043f[\u0006tGmU;ta\u0016tG-\u001a3\u0016\u0005\u0005U\u0003cA\u001f\u0002X%\u0019\u0011\u0011\f \u0003\u000f\t{w\u000e\\3b]\u0006\u0019B-Z7b]\u0012\u001cVo\u001d9f]\u0012,Gm\u0018\u0013fcR!\u0011QIA0\u0011%\ti%DA\u0001\u0002\u0004\t)&\u0001\teK6\fg\u000eZ*vgB,g\u000eZ3eA\u0005aa-\u001b:ti6+7o]1hKV\u0011\u0011q\r\t\u0005{\u0005%4*C\u0002\u0002ly\u0012aa\u00149uS>t\u0017\u0001\u00054jeN$X*Z:tC\u001e,w\fJ3r)\u0011\t)%!\u001d\t\u0013\u00055\u0003#!AA\u0002\u0005\u001d\u0014!\u00044jeN$X*Z:tC\u001e,\u0007%A\u0005m_\u001e\u001cv.\u001e:dKV\u0011\u0011\u0011\u0010\u0019\u0005\u0003w\n\u0019\tE\u0003o\u0003{\n\t)C\u0002\u0002��Q\u0014Qa\u00117bgN\u00042\u0001OAB\t)\t)IEA\u0001\u0002\u0003\u0015\ta\u000f\u0002\u0004?\u0012\n\u0014\u0001\u00039sKN#\u0018M\u001d;\u0015\u0005\u0005\u0015\u0013\u0001\u00059s_\u0012,8-\u001a:BgNLwM\\3e\u0003M\u0001(o\u001c3vG\u00164\u0015N]:u\u001b\u0016\u001c8/Y4f\u00031\u0011Xm];nK\u0012+W.\u00198e)\u0011\t)%a%\t\u0013\u0005Ue\u0003%AA\u0002\u0005U\u0013!\u0003;ssR{\u0007+\u001e7m\u0003Y\u0011Xm];nK\u0012+W.\u00198eI\u0011,g-Y;mi\u0012\nTCAANU\u0011\t)&!(,\u0005\u0005}\u0005\u0003BAQ\u0003Wk!!a)\u000b\t\u0005\u0015\u0016qU\u0001\nk:\u001c\u0007.Z2lK\u0012T1!!+?\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003[\u000b\u0019KA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fQb];ta\u0016tG\rR3nC:$\u0017\u0001E5oSRL\u0017\r\\%o\u0011\u0006tG\r\\3s\u0003\u001dyg\u000eV5nKJ$B!!\u0012\u00028\"1\u0011\u0011\u0018\u000eA\u0002\t\u000b\u0001\u0002^5nKJ\\U-_\u0001\u0017[\u0006L(-Z\"p[6LG\u000f\u0016:b]N\f7\r^5p]R1\u0011QIA`\u0003\u0007D\u0011\"!1\u001c!\u0003\u0005\r!!\u0016\u0002'\t,w-\u001b8OK^$&/\u00198tC\u000e$\u0018n\u001c8\t\u0013\u0005\u00157\u0004%AA\u0002\u0005U\u0013aH1c_J$X)\u001c9usR\u0013\u0018M\\:bGRLwN\\(o\u0007>l\u0007\u000f\\3uK\u0006\u0001S.Y=cK\u000e{W.\\5u)J\fgn]1di&|g\u000e\n3fM\u0006,H\u000e\u001e\u00132\u0003\u0001j\u0017-\u001f2f\u0007>lW.\u001b;Ue\u0006t7/Y2uS>tG\u0005Z3gCVdG\u000f\n\u001a\u0002#A\f'o]3GSJ\u001cH/T3tg\u0006<W\r\u0006\u0003\u0002V\u0005=\u0007BBAi=\u0001\u00071*A\u0002ng\u001e\fAdZ3oKJ\fG/\u001a3Ue\u0006t7/Y2uS>t\u0017\r\\\"p]\u001aLw\r\u0006\u0003\u0002X\u0006}\u0007CBAm\u00037<T)D\u0001-\u0013\r\ti\u000e\f\u0002\u0011!J|G-^2feN+G\u000f^5oONDa!!5 \u0001\u0004Y\u0015\u0001\u00039pgR\u001cVM\u001c3\u0015\t\u0005\u0015\u0013Q\u001d\u0005\u0007\u0003#\u0004\u0003\u0019A&\u0002'=t7i\\7qY\u0016$\u0018n\u001c8Tk\u000e\u001cWm]:\u0002'=t7i\\7qY\u0016$\u0018n\u001c8GC&dWO]3\u0015\t\u0005\u0015\u0013Q\u001e\u0005\b\u0003_\u0014\u0003\u0019AAy\u0003\t)\u0007\u0010\u0005\u0003\u0002t\u0006uh\u0002BA{\u0003st1aTA|\u0013\u0005y\u0014bAA~}\u00059\u0001/Y2lC\u001e,\u0017\u0002BA��\u0005\u0003\u0011\u0011\u0002\u00165s_^\f'\r\\3\u000b\u0007\u0005mh(A\td_6l\u0017\u000e\u001e+sC:\u001c\u0018m\u0019;j_:$b!!\u0012\u0003\b\tE\u0001b\u0002B\u0005G\u0001\u0007!1B\u0001\u0006E\u0006$8\r\u001b\t\u0005\u0003k\u0011i!\u0003\u0003\u0003\u0010\u0005}\"\u0001\u0007(p]\u0016l\u0007\u000f^=Ue\u0006t7/Y2uS>t')\u0019;dQ\"9\u0011\u0011Y\u0012A\u0002\u0005U\u0013!F8o\u0013:$XM\u001d8bY\u000e{W.\\5u\u0003\u000e\\7IY\u000b\u0003\u0005/\u0001bA!\u0007\u0003\u001e\u0005\u0015SB\u0001B\u000e\u0015\tA\u00070\u0003\u0003\u0003 \tm!!D!ts:\u001c7)\u00197mE\u0006\u001c7.\u0001\fp]&sG/\u001a:oC2\u001cu.\\7ji\u0006\u001b7n\u00112!\u0003AIg.\u001b;Ue\u0006t7/Y2uS>t7/\u0001\tcK\u001eLg\u000e\u0016:b]N\f7\r^5p]\u0006\u0001\u0012MY8siR\u0013\u0018M\\:bGRLwN\u001c\u000b\u0005\u0003\u000b\u0012Y\u0003\u0003\u0004\u0003.!\u0002\r!\\\u0001\u0007e\u0016\f7o\u001c8")
/* loaded from: input_file:akka/kafka/internal/TransactionalProducerStageLogic.class */
public final class TransactionalProducerStageLogic<K, V, P> extends DefaultProducerStageLogic<K, V, P, ProducerMessage.Envelope<K, V, P>, ProducerMessage.Results<K, V, P>> {
    public final TransactionalProducerStage<K, V, P> akka$kafka$internal$TransactionalProducerStageLogic$$stage;
    private final String transactionalId;
    private final String commitSchedulerKey;
    private final FiniteDuration messageDrainInterval;
    private TransactionalProducerStage.TransactionBatch batchOffsets;
    private boolean demandSuspended;
    private Option<ProducerMessage.Envelope<K, V, P>> firstMessage;
    private final AsyncCallback<BoxedUnit> onInternalCommitAckCb;

    private String commitSchedulerKey() {
        return this.commitSchedulerKey;
    }

    private FiniteDuration messageDrainInterval() {
        return this.messageDrainInterval;
    }

    private TransactionalProducerStage.TransactionBatch batchOffsets() {
        return this.batchOffsets;
    }

    private void batchOffsets_$eq(TransactionalProducerStage.TransactionBatch transactionBatch) {
        this.batchOffsets = transactionBatch;
    }

    private boolean demandSuspended() {
        return this.demandSuspended;
    }

    private void demandSuspended_$eq(boolean z) {
        this.demandSuspended = z;
    }

    private Option<ProducerMessage.Envelope<K, V, P>> firstMessage() {
        return this.firstMessage;
    }

    private void firstMessage_$eq(Option<ProducerMessage.Envelope<K, V, P>> option) {
        this.firstMessage = option;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public Class<?> logSource() {
        return TransactionalProducerStage.class;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public void preStart() {
        resumeDemand(resumeDemand$default$1());
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic, akka.kafka.internal.DeferredProducer
    public void producerAssigned() {
        producingInHandler();
        initTransactions();
        beginTransaction();
        produceFirstMessage();
        resumeDemand(resumeDemand$default$1());
        scheduleOnce(commitSchedulerKey(), producerSettings().eosCommitInterval());
    }

    private void produceFirstMessage() {
        Some firstMessage = firstMessage();
        if (!(firstMessage instanceof Some)) {
            throw new IllegalStateException("Should never attempt to produce first message if it does not exist.");
        }
        produce((ProducerMessage.Envelope) firstMessage.value());
        firstMessage_$eq(None$.MODULE$);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public void resumeDemand(boolean z) {
        super.resumeDemand(z);
        demandSuspended_$eq(false);
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public boolean resumeDemand$default$1() {
        return true;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public void suspendDemand() {
        if (!demandSuspended()) {
            super.suspendDemand();
        }
        demandSuspended_$eq(true);
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public void initialInHandler() {
        setHandler(this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.in(), new DefaultProducerStageLogic<K, V, P, ProducerMessage.Envelope<K, V, P>, ProducerMessage.Results<K, V, P>>.DefaultInHandler(this) { // from class: akka.kafka.internal.TransactionalProducerStageLogic$$anon$1
            private final /* synthetic */ TransactionalProducerStageLogic $outer;

            @Override // akka.kafka.internal.DefaultProducerStageLogic.DefaultInHandler
            public void onPush() {
                this.$outer.akka$kafka$internal$TransactionalProducerStageLogic$$parseFirstMessage((ProducerMessage.Envelope) this.$outer.grab(this.$outer.akka$kafka$internal$TransactionalProducerStageLogic$$stage.in()));
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(this);
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
    }

    public void onTimer(Object obj) {
        String commitSchedulerKey = commitSchedulerKey();
        if (obj == null) {
            if (commitSchedulerKey != null) {
                return;
            }
        } else if (!obj.equals(commitSchedulerKey)) {
            return;
        }
        maybeCommitTransaction(maybeCommitTransaction$default$1(), maybeCommitTransaction$default$2());
    }

    private void maybeCommitTransaction(boolean z, boolean z2) {
        int awaitingConfirmationValue = awaitingConfirmationValue();
        TransactionalProducerStage.TransactionBatch batchOffsets = batchOffsets();
        if (batchOffsets instanceof TransactionalProducerStage.NonemptyTransactionBatch) {
            TransactionalProducerStage.NonemptyTransactionBatch nonemptyTransactionBatch = (TransactionalProducerStage.NonemptyTransactionBatch) batchOffsets;
            if (awaitingConfirmationValue == 0) {
                commitTransaction(nonemptyTransactionBatch, z);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if ((batchOffsets instanceof TransactionalProducerStage.EmptyTransactionBatch) && awaitingConfirmationValue == 0 && z2) {
            abortTransaction("Transaction is empty and stage is completing");
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else if (awaitingConfirmationValue <= 0) {
            scheduleOnce(commitSchedulerKey(), producerSettings().eosCommitInterval());
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            suspendDemand();
            scheduleOnce(commitSchedulerKey(), messageDrainInterval());
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
    }

    private boolean maybeCommitTransaction$default$1() {
        return true;
    }

    private boolean maybeCommitTransaction$default$2() {
        return false;
    }

    public boolean akka$kafka$internal$TransactionalProducerStageLogic$$parseFirstMessage(ProducerMessage.Envelope<K, V, P> envelope) {
        boolean z;
        boolean z2 = false;
        DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle = producerAssignmentLifecycle();
        if (DeferredProducer$Assigned$.MODULE$.equals(producerAssignmentLifecycle)) {
            z = true;
        } else {
            if (DeferredProducer$Unassigned$.MODULE$.equals(producerAssignmentLifecycle)) {
                z2 = true;
                if (firstMessage().nonEmpty()) {
                    throw new IllegalStateException("Cannot reapply first message");
                }
            }
            if (!z2) {
                if (DeferredProducer$AsyncCreateRequestSent$.MODULE$.equals(producerAssignmentLifecycle)) {
                    throw new IllegalStateException(new StringBuilder(71).append("Should never receive new messages while in producer assignment state '").append(DeferredProducer$AsyncCreateRequestSent$.MODULE$).append("'").toString());
                }
                throw new MatchError(producerAssignmentLifecycle);
            }
            firstMessage_$eq(new Some(envelope));
            resolveProducer(generatedTransactionalConfig(envelope));
            suspendDemand();
            z = false;
        }
        return z;
    }

    private ProducerSettings<K, V> generatedTransactionalConfig(ProducerMessage.Envelope<K, V, P> envelope) {
        String str;
        P passThrough = envelope.passThrough();
        if (passThrough instanceof ConsumerMessage.PartitionOffsetCommittedMarker) {
            ConsumerMessage.PartitionOffsetCommittedMarker partitionOffsetCommittedMarker = (ConsumerMessage.PartitionOffsetCommittedMarker) passThrough;
            if (partitionOffsetCommittedMarker.fromPartitionedSource()) {
                ConsumerMessage.GroupTopicPartition key = partitionOffsetCommittedMarker.key();
                String sb = new StringBuilder(3).append(this.transactionalId).append("-").append(key.groupId()).append("-").append(key.topic()).append("-").append(key.partition()).toString();
                log().debug("Generated transactional id from partitioned source '{}'", sb);
                str = sb;
                return this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.settings().withProperties((Seq<Tuple2<String, String>>) Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.idempotence"), BoxesRunTime.boxToBoolean(true).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("transactional.id"), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("max.in.flight.requests.per.connection"), BoxesRunTime.boxToInteger(1).toString())}));
            }
        }
        str = this.transactionalId;
        return this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.settings().withProperties((Seq<Tuple2<String, String>>) Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.idempotence"), BoxesRunTime.boxToBoolean(true).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("transactional.id"), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("max.in.flight.requests.per.connection"), BoxesRunTime.boxToInteger(1).toString())}));
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public void postSend(ProducerMessage.Envelope<K, V, P> envelope) {
        P passThrough = envelope.passThrough();
        if (!(passThrough instanceof ConsumerMessage.PartitionOffsetCommittedMarker)) {
            throw new MatchError(passThrough);
        }
        batchOffsets_$eq(batchOffsets().updated((ConsumerMessage.PartitionOffsetCommittedMarker) passThrough));
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic, akka.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionSuccess() {
        log().debug("Committing final transaction before shutdown");
        cancelTimer(commitSchedulerKey());
        maybeCommitTransaction(false, true);
        super.onCompletionSuccess();
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic, akka.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionFailure(Throwable th) {
        abortTransaction("Stage failure");
        batchOffsets().committingFailed();
        super.onCompletionFailure(th);
    }

    private void commitTransaction(TransactionalProducerStage.NonemptyTransactionBatch nonemptyTransactionBatch, boolean z) {
        String group = nonemptyTransactionBatch.group();
        log().debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}", this.transactionalId, group, nonemptyTransactionBatch.offsets());
        producer().sendOffsetsToTransaction((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(nonemptyTransactionBatch.offsetMap()).asJava(), group);
        producer().commitTransaction();
        log().debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", this.transactionalId, group, nonemptyTransactionBatch.offsets());
        batchOffsets_$eq(TransactionalProducerStage$TransactionBatch$.MODULE$.empty());
        nonemptyTransactionBatch.internalCommit().onComplete(r4 -> {
            $anonfun$commitTransaction$1(this, r4);
            return BoxedUnit.UNIT;
        }, materializer().executionContext());
        if (z) {
            beginTransaction();
            resumeDemand(resumeDemand$default$1());
        }
    }

    private AsyncCallback<BoxedUnit> onInternalCommitAckCb() {
        return this.onInternalCommitAckCb;
    }

    private void initTransactions() {
        log().debug("Initializing transactions");
        producer().initTransactions();
    }

    private void beginTransaction() {
        log().debug("Beginning new transaction");
        producer().beginTransaction();
    }

    private void abortTransaction(String str) {
        log().debug("Aborting transaction: {}", str);
        DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle = producerAssignmentLifecycle();
        DeferredProducer$Assigned$ deferredProducer$Assigned$ = DeferredProducer$Assigned$.MODULE$;
        if (producerAssignmentLifecycle == null) {
            if (deferredProducer$Assigned$ != null) {
                return;
            }
        } else if (!producerAssignmentLifecycle.equals(deferredProducer$Assigned$)) {
            return;
        }
        producer().abortTransaction();
    }

    public static final /* synthetic */ void $anonfun$commitTransaction$1(TransactionalProducerStageLogic transactionalProducerStageLogic, Try r4) {
        transactionalProducerStageLogic.onInternalCommitAckCb().invoke(BoxedUnit.UNIT);
    }

    public static final /* synthetic */ void $anonfun$onInternalCommitAckCb$1(TransactionalProducerStageLogic transactionalProducerStageLogic, BoxedUnit boxedUnit) {
        transactionalProducerStageLogic.scheduleOnce(transactionalProducerStageLogic.commitSchedulerKey(), transactionalProducerStageLogic.producerSettings().eosCommitInterval());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public TransactionalProducerStageLogic(TransactionalProducerStage<K, V, P> transactionalProducerStage, String str, Attributes attributes) {
        super(transactionalProducerStage, attributes);
        this.akka$kafka$internal$TransactionalProducerStageLogic$$stage = transactionalProducerStage;
        this.transactionalId = str;
        this.commitSchedulerKey = "commit";
        this.messageDrainInterval = new package.DurationInt(package$.MODULE$.DurationInt(10)).milliseconds();
        this.batchOffsets = TransactionalProducerStage$TransactionBatch$.MODULE$.empty();
        this.demandSuspended = false;
        this.firstMessage = None$.MODULE$;
        this.onInternalCommitAckCb = getAsyncCallback(boxedUnit -> {
            $anonfun$onInternalCommitAckCb$1(this, boxedUnit);
            return BoxedUnit.UNIT;
        });
    }
}
