package akka.kafka.internal;

import akka.actor.ActorRef;
import akka.annotation.InternalApi;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerSettings;
import akka.kafka.ManualSubscription;
import akka.kafka.Subscription;
import akka.kafka.Subscriptions;
import akka.kafka.internal.KafkaConsumerActor;
import akka.stream.ActorMaterializerHelper$;
import akka.stream.SourceShape;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SingleSourceLogic.scala */
@ScalaSignature(bytes = "\u0006\u0001}4Q!\u0001\u0002\u0002\n%\u0011\u0011cU5oO2,7k\\;sG\u0016dunZ5d\u0015\t\u0019A!\u0001\u0005j]R,'O\\1m\u0015\t)a!A\u0003lC\u001a\\\u0017MC\u0001\b\u0003\u0011\t7n[1\u0004\u0001U!!\"\u0005\u0010\"'\t\u00011\u0002E\u0003\r\u001b=i\u0002%D\u0001\u0003\u0013\tq!AA\u000bCCN,7+\u001b8hY\u0016\u001cv.\u001e:dK2{w-[2\u0011\u0005A\tB\u0002\u0001\u0003\u0006%\u0001\u0011\ra\u0005\u0002\u0002\u0017F\u0011AC\u0007\t\u0003+ai\u0011A\u0006\u0006\u0002/\u0005)1oY1mC&\u0011\u0011D\u0006\u0002\b\u001d>$\b.\u001b8h!\t)2$\u0003\u0002\u001d-\t\u0019\u0011I\\=\u0011\u0005AqB!B\u0010\u0001\u0005\u0004\u0019\"!\u0001,\u0011\u0005A\tC!\u0002\u0012\u0001\u0005\u0004\u0019\"aA'tO\"IA\u0005\u0001B\u0001B\u0003%QeK\u0001\u0006g\"\f\u0007/\u001a\t\u0004M%\u0002S\"A\u0014\u000b\u0005!2\u0011AB:ue\u0016\fW.\u0003\u0002+O\tY1k\\;sG\u0016\u001c\u0006.\u00199f\u0013\t!S\u0002\u0003\u0005.\u0001\t\u0005\t\u0015!\u0003/\u0003!\u0019X\r\u001e;j]\u001e\u001c\b\u0003B\u00181\u001fui\u0011\u0001B\u0005\u0003c\u0011\u0011\u0001cQ8ogVlWM]*fiRLgnZ:\t\u0011M\u0002!\u0011!Q\u0001\nQ\nAb];cg\u000e\u0014\u0018\u000e\u001d;j_:\u0004\"aL\u001b\n\u0005Y\"!\u0001D*vEN\u001c'/\u001b9uS>t\u0007\"\u0002\u001d\u0001\t\u0003I\u0014A\u0002\u001fj]&$h\b\u0006\u0003;wqj\u0004#\u0002\u0007\u0001\u001fu\u0001\u0003\"\u0002\u00138\u0001\u0004)\u0003\"B\u00178\u0001\u0004q\u0003\"B\u001a8\u0001\u0004!\u0004\"B 
\u0001\t#\u0002\u0015!\u00037pON{WO]2f+\u0005\t\u0005G\u0001\"K!\r\u0019e)\u0013\b\u0003+\u0011K!!\u0012\f\u0002\rA\u0013X\rZ3g\u0013\t9\u0005JA\u0003DY\u0006\u001c8O\u0003\u0002F-A\u0011\u0001C\u0013\u0003\n\u0017z\n\t\u0011!A\u0003\u0002M\u00111a\u0018\u00132\u0011\u001di\u0005A1A\u0005\n9\u000bqbY8ogVlWM\u001d)s_6L7/Z\u000b\u0002\u001fB\u0019\u0001kU+\u000e\u0003ES!A\u0015\f\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002U#\n9\u0001K]8nSN,\u0007C\u0001,Z\u001b\u00059&B\u0001-\u0007\u0003\u0015\t7\r^8s\u0013\tQvK\u0001\u0005BGR|'OU3g\u0011\u0019a\u0006\u0001)A\u0005\u001f\u0006\u00012m\u001c8tk6,'\u000f\u0015:p[&\u001cX\r\t\u0005\b=\u0002\u0011\r\u0011\"\u0002`\u0003-\t7\r^8s\u001dVl'-\u001a:\u0016\u0003\u0001\u0004\"!F1\n\u0005\t4\"aA%oi\"1A\r\u0001Q\u0001\u000e\u0001\fA\"Y2u_JtU/\u001c2fe\u0002BQA\u001a\u0001\u0005\u0006\u001d\fabY8ogVlWM\u001d$viV\u0014X-F\u0001i!\r\u0001\u0016.V\u0005\u0003UF\u0013aAR;ukJ,\u0007\"\u00027\u0001\t\u000bi\u0017!F2p]\u001aLw-\u001e:f'V\u00147o\u0019:jaRLwN\u001c\u000b\u0002]B\u0011Qc\\\u0005\u0003aZ\u0011A!\u00168ji\")!\u000f\u0001C\u0003g\u0006\u00192M]3bi\u0016\u001cuN\\:v[\u0016\u0014\u0018i\u0019;peR\tQ\u000bC\u0003v\u0001\u0011\u0015S.\u0001\u0005q_N$8\u000b^8q\u0011\u00159\b\u0001\"\u0002n\u0003=\u0001XM\u001d4pe6\u001c\u0006.\u001e;e_^t\u0007F\u0001\u0001z!\tQX0D\u0001|\u0015\tah!\u0001\u0006b]:|G/\u0019;j_:L!A`>\u0003\u0017%sG/\u001a:oC2\f\u0005/\u001b")
@InternalApi
/* loaded from: input_file:akka/kafka/internal/SingleSourceLogic.class */
/*
 * Stage logic for a Kafka source that is served by one consumer actor
 * (decompiled from SingleSourceLogic.scala).
 *
 * The class creates the consumer actor, publishes its reference through a
 * promise, forwards the configured subscription to it and asks it to stop
 * when the stage stops or is shut down.
 *
 * Type parameters: K and V are the type arguments of the ConsumerSettings;
 * Msg is the element type of the source shape.
 */
public abstract class SingleSourceLogic<K, V, Msg> extends BaseSingleSourceLogic<K, V, Msg> {
    /** Consumer settings; used for the consumer actor props and for the stop timeout. */
    private final ConsumerSettings<K, V> settings;
    /** Subscription that {@link #configureSubscription()} dispatches on. */
    private final Subscription subscription;
    /** Completed with the consumer actor reference by {@link #createConsumerActor()}. */
    private final Promise<ActorRef> consumerPromise;
    /** Value taken from {@code KafkaConsumerActor.Internal.nextNumber()}; suffix of the actor name. */
    private final int actorNumber;

    /**
     * Returns the class used as log source.
     *
     * @return {@code SingleSourceLogic.class}
     */
    @Override // akka.kafka.internal.BaseSingleSourceLogic
    public Class<?> logSource() {
        return SingleSourceLogic.class;
    }

    /** Accessor for the promise that carries the consumer actor reference. */
    private Promise<ActorRef> consumerPromise() {
        return this.consumerPromise;
    }

    /**
     * Returns the number assigned to this stage at construction time.
     *
     * @return the number appended to the consumer actor name
     */
    public final int actorNumber() {
        return this.actorNumber;
    }

    /**
     * Returns the future side of the consumer promise.
     *
     * @return a future that is completed once {@link #createConsumerActor()} has run
     */
    @Override // akka.kafka.internal.BaseSingleSourceLogic, akka.kafka.internal.MetricsControl
    public final Future<ActorRef> consumerFuture() {
        return consumerPromise().future();
    }

    /**
     * Sends the subscription to the consumer actor.
     *
     * Topic and topic-pattern subscriptions are sent as {@code Subscribe} and
     * {@code SubscribePattern} messages together with the rebalance callbacks;
     * manual subscriptions are delegated to {@code configureManualSubscription}.
     *
     * @throws MatchError if the subscription is none of the handled kinds
     */
    @Override // akka.kafka.internal.BaseSingleSourceLogic
    public final void configureSubscription() {
        Subscription current = this.subscription;
        if (current instanceof Subscriptions.TopicSubscription) {
            Subscriptions.TopicSubscription byTopics = (Subscriptions.TopicSubscription) current;
            consumerActor().tell(
                    new KafkaConsumerActor$Internal$Subscribe(byTopics.tps(), rebalanceListener$1(byTopics)),
                    sourceActor().ref());
        } else if (current instanceof Subscriptions.TopicSubscriptionPattern) {
            Subscriptions.TopicSubscriptionPattern byPattern = (Subscriptions.TopicSubscriptionPattern) current;
            consumerActor().tell(
                    new KafkaConsumerActor$Internal$SubscribePattern(byPattern.pattern(), rebalanceListener$1(byPattern)),
                    sourceActor().ref());
        } else if (current instanceof ManualSubscription) {
            configureManualSubscription((ManualSubscription) current);
        } else {
            // Mirrors the non-exhaustive Scala pattern match this code was compiled from.
            throw new MatchError(current);
        }
    }

    /**
     * Creates the consumer actor as a system actor named {@code kafka-consumer-<actorNumber>}
     * and completes the consumer promise with its reference.
     *
     * @return the reference of the newly created consumer actor
     */
    @Override // akka.kafka.internal.BaseSingleSourceLogic
    public final ActorRef createConsumerActor() {
        // Same text the Scala interpolation s"kafka-consumer-$actorNumber" produced.
        String actorName = "kafka-consumer-" + actorNumber();
        ActorRef consumer = ActorMaterializerHelper$.MODULE$.downcast(materializer()).system().systemActorOf(
                akka.kafka.KafkaConsumerActor$.MODULE$.props(sourceActor().ref(), this.settings),
                actorName);
        consumerPromise().success(consumer);
        return consumer;
    }

    /**
     * Sends {@code Stop} to the consumer actor, then runs the superclass {@code postStop}.
     */
    @Override // akka.kafka.internal.BaseSingleSourceLogic
    public final void postStop() {
        consumerActor().tell(KafkaConsumerActor$Internal$.MODULE$.Stop(), sourceActor().ref());
        super.postStop();
    }

    /**
     * Starts shutting the stage down: completes the outlet if it is still open,
     * switches the stage actor to the shutdown behaviour and schedules a
     * {@code Stop} message for the consumer actor after the configured stop timeout.
     */
    @Override // akka.kafka.internal.BaseSingleSourceLogic, akka.kafka.internal.PromiseControl
    public final void performShutdown() {
        setKeepGoing(true);
        if (!isClosed(super.shape().out())) {
            complete(super.shape().out());
        }
        // NOTE(review): the handler class is defined outside this file; its behaviour is not visible here.
        sourceActor().become(new SingleSourceLogic$$anonfun$performShutdown$1(this));
        // The decompiler emitted an invalid anonymous class here (constructor argument on an
        // interface, "this == 0", self-assigned outer reference); the enclosing instance is
        // now reached through SingleSourceLogic.this.
        materializer().scheduleOnce(this.settings.stopTimeout(), new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                SingleSourceLogic.this.consumerActor().tell(
                        KafkaConsumerActor$Internal$.MODULE$.Stop(),
                        SingleSourceLogic.this.sourceActor().ref());
            }
        });
    }

    /**
     * Builds the pair of rebalance callbacks sent along with an automatic subscription.
     * Each callback wraps the subscription and an async callback obtained from this stage.
     *
     * @param autoSubscription the subscription the callbacks are created for
     * @return the listener callbacks to attach to the subscribe message
     */
    private final KafkaConsumerActor.ListenerCallbacks rebalanceListener$1(AutoSubscription autoSubscription) {
        return new KafkaConsumerActor.ListenerCallbacks(
                new SingleSourceLogic$$anonfun$rebalanceListener$1$1(
                        this, autoSubscription, getAsyncCallback(new SingleSourceLogic$$anonfun$1(this))),
                new SingleSourceLogic$$anonfun$rebalanceListener$1$2(
                        this, autoSubscription, getAsyncCallback(new SingleSourceLogic$$anonfun$2(this))));
    }

    /**
     * Creates the stage logic.
     *
     * @param sourceShape      shape of the source this logic belongs to
     * @param consumerSettings settings used for the consumer actor and the stop timeout
     * @param subscription     subscription sent to the consumer actor
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SingleSourceLogic(SourceShape<Msg> sourceShape, ConsumerSettings<K, V> consumerSettings, Subscription subscription) {
        super(sourceShape);
        this.settings = consumerSettings;
        this.subscription = subscription;
        this.consumerPromise = Promise$.MODULE$.apply();
        this.actorNumber = KafkaConsumerActor$Internal$.MODULE$.nextNumber();
    }
}
