package akka.kafka.internal;

import akka.Done$;
import akka.actor.ActorRef;
import akka.kafka.internal.TransactionalSourceLogic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableView$;
import scala.collection.TraversableOnce;
import scala.collection.compat.MapViewExtensionMethods$;
import scala.collection.compat.package$;
import scala.collection.immutable.Set;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: TransactionalSources.scala */
/* loaded from: input_file:akka/kafka/internal/TransactionalSourceLogic$$anonfun$drainHandling$1.class */
public final class TransactionalSourceLogic$$anonfun$drainHandling$1 extends AbstractPartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ TransactionalSourceLogic $outer;

    public final <A1 extends Tuple2<ActorRef, Object>, B1> B1 applyOrElse(A1 a1, Function1<A1, B1> function1) {
        Object apply;
        BoxedUnit boxedUnit;
        if (a1 != null) {
            ActorRef actorRef = (ActorRef) a1._1();
            Object _2 = a1._2();
            if (_2 instanceof TransactionalSourceLogic.Committed) {
                this.$outer.akka$kafka$internal$TransactionalSourceLogic$$inFlightRecords().committed(((TraversableOnce) MapViewExtensionMethods$.MODULE$.mapValues$extension(package$.MODULE$.toMapViewExtensionMethods(((TransactionalSourceLogic.Committed) _2).offsets().view()), offsetAndMetadata -> {
                    return BoxesRunTime.boxToLong($anonfun$applyOrElse$1(offsetAndMetadata));
                }, IterableView$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
                actorRef.tell(Done$.MODULE$, this.$outer.sourceActor().ref());
                apply = BoxedUnit.UNIT;
                return (B1) apply;
            }
        }
        if (a1 != null) {
            if (TransactionalSourceLogic$CommittingFailure$.MODULE$.equals(a1._2())) {
                this.$outer.log().info("Committing failed, resetting in flight offsets");
                this.$outer.akka$kafka$internal$TransactionalSourceLogic$$inFlightRecords().reset();
                apply = BoxedUnit.UNIT;
                return (B1) apply;
            }
        }
        if (a1 != null) {
            final ActorRef actorRef2 = (ActorRef) a1._1();
            Object _22 = a1._2();
            if (_22 instanceof TransactionalSourceLogic.Drain) {
                TransactionalSourceLogic.Drain drain = (TransactionalSourceLogic.Drain) _22;
                final Set<TopicPartition> partitions = drain.partitions();
                final Option<ActorRef> drainedConfirmationRef = drain.drainedConfirmationRef();
                final Object drainedConfirmationMsg = drain.drainedConfirmationMsg();
                if (this.$outer.akka$kafka$internal$TransactionalSourceLogic$$inFlightRecords().empty(partitions)) {
                    this.$outer.log().debug(new StringBuilder(19).append("Partitions drained ").append(partitions.mkString(",")).toString());
                    ((ActorRef) drainedConfirmationRef.getOrElse(() -> {
                        return actorRef2;
                    })).tell(drainedConfirmationMsg, this.$outer.sourceActor().ref());
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    this.$outer.log().debug("Draining partitions {}", partitions);
                    this.$outer.materializer().scheduleOnce(this.$outer.akka$kafka$internal$TransactionalSourceLogic$$consumerSettings.drainingCheckInterval(), new Runnable(this, partitions, drainedConfirmationRef, actorRef2, drainedConfirmationMsg) { // from class: akka.kafka.internal.TransactionalSourceLogic$$anonfun$drainHandling$1$$anon$3
                        private final /* synthetic */ TransactionalSourceLogic$$anonfun$drainHandling$1 $outer;
                        private final Set partitions$1;
                        private final Option ack$1;
                        private final ActorRef sender$1;
                        private final Object msg$1;

                        @Override // java.lang.Runnable
                        public void run() {
                            this.$outer.akka$kafka$internal$TransactionalSourceLogic$$anonfun$$$outer().sourceActor().ref().tell(new TransactionalSourceLogic.Drain(this.partitions$1, this.ack$1.orElse(() -> {
                                return new Some(this.sender$1);
                            }), this.msg$1), this.$outer.akka$kafka$internal$TransactionalSourceLogic$$anonfun$$$outer().sourceActor().ref());
                        }

                        {
                            if (this == null) {
                                throw null;
                            }
                            this.$outer = this;
                            this.partitions$1 = partitions;
                            this.ack$1 = drainedConfirmationRef;
                            this.sender$1 = actorRef2;
                            this.msg$1 = drainedConfirmationMsg;
                        }
                    });
                    boxedUnit = BoxedUnit.UNIT;
                }
                apply = boxedUnit;
                return (B1) apply;
            }
        }
        apply = function1.apply(a1);
        return (B1) apply;
    }

    public final boolean isDefinedAt(Tuple2<ActorRef, Object> tuple2) {
        boolean z;
        if (tuple2 == null || !(tuple2._2() instanceof TransactionalSourceLogic.Committed)) {
            if (tuple2 != null) {
                if (TransactionalSourceLogic$CommittingFailure$.MODULE$.equals(tuple2._2())) {
                    z = true;
                }
            }
            z = tuple2 != null && (tuple2._2() instanceof TransactionalSourceLogic.Drain);
        } else {
            z = true;
        }
        return z;
    }

    public /* synthetic */ TransactionalSourceLogic akka$kafka$internal$TransactionalSourceLogic$$anonfun$$$outer() {
        return this.$outer;
    }

    public final /* bridge */ /* synthetic */ Object applyOrElse(Object obj, Function1 function1) {
        return applyOrElse((TransactionalSourceLogic$$anonfun$drainHandling$1) obj, (Function1<TransactionalSourceLogic$$anonfun$drainHandling$1, B1>) function1);
    }

    public static final /* synthetic */ long $anonfun$applyOrElse$1(OffsetAndMetadata offsetAndMetadata) {
        return offsetAndMetadata.offset() - 1;
    }

    public TransactionalSourceLogic$$anonfun$drainHandling$1(TransactionalSourceLogic transactionalSourceLogic) {
        if (transactionalSourceLogic == null) {
            throw null;
        }
        this.$outer = transactionalSourceLogic;
    }
}
