package akka.kafka.internal;

import akka.Done;
import akka.actor.ActorRef;
import akka.annotation.InternalApi;
import akka.event.LoggingAdapter;
import akka.kafka.ManualSubscription;
import akka.kafka.Subscriptions;
import akka.kafka.internal.MetricsControl;
import akka.kafka.internal.PromiseControl;
import akka.kafka.scaladsl.Consumer;
import akka.stream.SourceShape;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: BaseSingleSourceLogic.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\rh!B\u0001\u0003\u0003\u0013I!!\u0006\"bg\u0016\u001c\u0016N\\4mKN{WO]2f\u0019><\u0017n\u0019\u0006\u0003\u0007\u0011\t\u0001\"\u001b8uKJt\u0017\r\u001c\u0006\u0003\u000b\u0019\tQa[1gW\u0006T\u0011aB\u0001\u0005C.\\\u0017m\u0001\u0001\u0016\t)\u0011sFM\n\u0007\u0001-\u0019rCG\u000f\u0011\u00051\tR\"A\u0007\u000b\u00059y\u0011!B:uC\u001e,'B\u0001\t\u0007\u0003\u0019\u0019HO]3b[&\u0011!#\u0004\u0002\u0010\u000fJ\f\u0007\u000f[*uC\u001e,Gj\\4jGB\u0011A#F\u0007\u0002\u0005%\u0011aC\u0001\u0002\u000f!J|W.[:f\u0007>tGO]8m!\t!\u0002$\u0003\u0002\u001a\u0005\tqQ*\u001a;sS\u000e\u001c8i\u001c8ue>d\u0007C\u0001\u0007\u001c\u0013\taRB\u0001\u0007Ti\u0006<W\rT8hO&tw\rE\u0003\u0015=\u0001r\u0013'\u0003\u0002 \u0005\tqQ*Z:tC\u001e,')^5mI\u0016\u0014\bCA\u0011#\u0019\u0001!Qa\t\u0001C\u0002\u0011\u0012\u0011aS\t\u0003K-\u0002\"AJ\u0015\u000e\u0003\u001dR\u0011\u0001K\u0001\u0006g\u000e\fG.Y\u0005\u0003U\u001d\u0012qAT8uQ&tw\r\u0005\u0002'Y%\u0011Qf\n\u0002\u0004\u0003:L\bCA\u00110\t\u0015\u0001\u0004A1\u0001%\u0005\u00051\u0006CA\u00113\t\u0015\u0019\u0004A1\u0001%\u0005\ri5o\u001a\u0005\tk\u0001\u0011)\u0019!C\u0001m\u0005)1\u000f[1qKV\tq\u0007E\u00029sEj\u0011aD\u0005\u0003u=\u00111bU8ve\u000e,7\u000b[1qK\"AA\b\u0001B\u0001B\u0003%q'\u0001\u0004tQ\u0006\u0004X\r\t\u0005\u0006}\u0001!\taP\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u0005\u0001\u000b\u0005#\u0002\u000b\u0001A9\n\u0004\"B\u001b>\u0001\u00049\u0004\"B\"\u0001\t#\"\u0015\u0001E3yK\u000e,H/[8o\u0007>tG/\u001a=u+\u0005)\u0005C\u0001$J\u001b\u00059%B\u0001%(\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003\u0015\u001e\u0013\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\t\u000b1\u0003a\u0011C'\u0002\u001d\r|gn];nKJ4U\u000f^;sKV\ta\nE\u0002G\u001fFK!\u0001U$\u0003\r\u0019+H/\u001e:f!\t\u0011V+D\u0001T\u0015\t!f!A\u0003bGR|'/\u0003\u0002W'\nA\u0011i\u0019;peJ+g\rC\u0005Y\u0001\u0001\u0007\t\u0019!C\u000b3\u0006i1m\u001c8tk6,'/Q2u_J,\u0012!\u0015\u0005\n7\u0002\u0001\r\u00111A\u0005\u0016q\u000b\u0011cY8ogVlWM]!di>\u0014x\fJ3r)\ti\u0006\r\u0005\u0002'=&\u0011ql\n\u0002\u0005+:LG\u000fC\u0004b5\u0006\u0005\t\u0019A)\u0002\u0007a$\u0013\u0007\u0003\u0004d\u0001\u0001\u0006k!U\u0001\u000fG>t7/^7fe\u0006\u001bGo\u001c:!\u0011%)\u0007\u00011AA\u0002\u0013Ea-A\u0006t_V\u00148-Z!di>\u0014X#A4\u0011\u0005!$hBA5s\u001d\tQ\u0017O\u0004\u0002la:\u0011An\\\u0007\u0002[*\u0011a\u000eC\u0001\u0007yI|w\u000e\u001e \n\u0003\u001dI!\u0001\u0005\u0004\n\u00059y\u0011BA:\u000e\u0003=9%/\u00199i'R\fw-\u001a'pO&\u001c\u0017BA;w\u0005)\u0019F/Y4f\u0003\u000e$xN\u001d\u0006\u0003g6A\u0011\u0002\u001f\u0001A\u0002\u0003\u0007I\u0011C=\u0002\u001fM|WO]2f\u0003\u000e$xN]0%KF$\"!\u0018>\t\u000f\u0005<\u0018\u0011!a\u0001O\"1A\u0010\u0001Q!\n\u001d\fAb]8ve\u000e,\u0017i\u0019;pe\u0002BqA 
\u0001A\u0002\u0013Eq0A\u0002uaN,\"!!\u0001\u0011\r\u0005\r\u0011QBA\t\u001b\t\t)A\u0003\u0003\u0002\b\u0005%\u0011!C5n[V$\u0018M\u00197f\u0015\r\tYaJ\u0001\u000bG>dG.Z2uS>t\u0017\u0002BA\b\u0003\u000b\u00111aU3u!\u0011\t\u0019\"a\t\u000e\u0005\u0005U!\u0002BA\f\u00033\taaY8n[>t'bA\u0003\u0002\u001c)!\u0011QDA\u0010\u0003\u0019\t\u0007/Y2iK*\u0011\u0011\u0011E\u0001\u0004_J<\u0017\u0002BA\u0013\u0003+\u0011a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eC\u0005\u0002*\u0001\u0001\r\u0011\"\u0005\u0002,\u00059A\u000f]:`I\u0015\fHcA/\u0002.!I\u0011-a\n\u0002\u0002\u0003\u0007\u0011\u0011\u0001\u0005\t\u0003c\u0001\u0001\u0015)\u0003\u0002\u0002\u0005!A\u000f]:!\u0011%\t)\u0004\u0001a\u0001\n\u0013\t9$\u0001\u0004ck\u001a4WM]\u000b\u0003\u0003s\u0001b!a\u000f\u0002F\u0005-c\u0002BA\u001f\u0003\u0003r1\u0001\\A \u0013\u0005A\u0013bAA\"O\u00059\u0001/Y2lC\u001e,\u0017\u0002BA$\u0003\u0013\u0012\u0001\"\u0013;fe\u0006$xN\u001d\u0006\u0004\u0003\u0007:\u0003CBA'\u0003/\u0002c&\u0004\u0002\u0002P)!\u0011\u0011KA*\u0003!\u0019wN\\:v[\u0016\u0014(\u0002BA+\u00033\tqa\u00197jK:$8/\u0003\u0003\u0002Z\u0005=#AD\"p]N,X.\u001a:SK\u000e|'\u000f\u001a\u0005\n\u0003;\u0002\u0001\u0019!C\u0005\u0003?\n!BY;gM\u0016\u0014x\fJ3r)\ri\u0016\u0011\r\u0005\nC\u0006m\u0013\u0011!a\u0001\u0003sA\u0001\"!\u001a\u0001A\u0003&\u0011\u0011H\u0001\bEV4g-\u001a:!\u0011%\tI\u0007\u0001a\u0001\n\u0013\tY'A\u0005sKF,Xm\u001d;fIV\u0011\u0011Q\u000e\t\u0004M\u0005=\u0014bAA9O\t9!i\\8mK\u0006t\u0007\"CA;\u0001\u0001\u0007I\u0011BA<\u00035\u0011X-];fgR,Gm\u0018\u0013fcR\u0019Q,!\u001f\t\u0013\u0005\f\u0019(!AA\u0002\u00055\u0004\u0002CA?\u0001\u0001\u0006K!!\u001c\u0002\u0015I,\u0017/^3ti\u0016$\u0007\u0005C\u0005\u0002\u0002\u0002\u0001\r\u0011\"\u0003\u0002\u0004\u0006I!/Z9vKN$\u0018\nZ\u000b\u0003\u0003\u000b\u00032AJAD\u0013\r\tIi\n\u0002\u0004\u0013:$\b\"CAG\u0001\u0001\u0007I\u0011BAH\u00035\u0011X-];fgRLEm\u0018\u0013fcR\u0019Q,!%\t\u0013\u0005\fY)!AA\u0002\u0005\u0015\u0005\u0002CAK\u0001\u0001\u00
06K!!\"\u0002\u0015I,\u0017/^3ti&#\u0007\u0005C\u0004\u0002\u001a\u0002!\t%a'\u0002\u0011A\u0014Xm\u0015;beR$\u0012!\u0018\u0005\b\u0003?\u0003a\u0011CAQ\u0003M\u0019'/Z1uK\u000e{gn];nKJ\f5\r^8s)\u0005\t\u0006bBAS\u0001\u0019E\u00111T\u0001\u0016G>tg-[4ve\u0016\u001cVOY:de&\u0004H/[8o\u0011\u001d\tI\u000b\u0001C\t\u0003W\u000b1dY8oM&<WO]3NC:,\u0018\r\\*vEN\u001c'/\u001b9uS>tGcA/\u0002.\"A\u0011qVAT\u0001\u0004\t\t,\u0001\u0007tk\n\u001c8M]5qi&|g\u000e\u0005\u0003\u00024\u0006UV\"\u0001\u0003\n\u0007\u0005]FA\u0001\nNC:,\u0018\r\\*vEN\u001c'/\u001b9uS>t\u0007bBA^\u0001\u0011%\u00111T\u0001\u0005aVl\u0007\u000f\u000b\u0003\u0002:\u0006}\u0006\u0003BAa\u0003\u000fl!!a1\u000b\u0007\u0005\u0015w%\u0001\u0006b]:|G/\u0019;j_:LA!!3\u0002D\n9A/Y5me\u0016\u001c\u0007bBAg\u0001\u0011E\u00111T\u0001\u0010e\u0016\fX/Z:u\u001b\u0016\u001c8/Y4fg\"9\u0011\u0011\u001b\u0001\u0005B\u0005m\u0015\u0001\u00039pgR\u001cFo\u001c9\t\u000f\u0005U\u0007A\"\u0001\u0002\u001c\u0006y\u0001/\u001a:g_Jl7\u000b[;uI><h\u000eK\u0002\u0001\u00033\u0004B!a7\u0002`6\u0011\u0011Q\u001c\u0006\u0004\u0003\u000b4\u0011\u0002BAq\u0003;\u00141\"\u00138uKJt\u0017\r\\!qS\u0002")
@InternalApi
/* loaded from: input_file:akka/kafka/internal/BaseSingleSourceLogic.class */
/*
 * Stage logic shared by single-source Kafka consumer stages (decompiled from
 * BaseSingleSourceLogic.scala).
 *
 * On start it obtains a stage actor, asks the subclass for a consumer actor, watches that
 * actor, and lets the subclass configure its subscription. Whenever downstream pulls, buffered
 * records are pushed; when the buffer is empty and partitions are known, a new batch is requested
 * from the consumer actor.
 *
 * The long `akka$kafka$internal$...` method names are compiler-generated accessors. They are kept
 * verbatim (including their visibility) because other compiled classes call them by name.
 */
public abstract class BaseSingleSourceLogic<K, V, Msg> extends GraphStageLogic implements PromiseControl, MetricsControl, StageLogging, MessageBuilder<K, V, Msg> {
    /** Shape of the source; its single outlet is where messages are pushed. */
    private final SourceShape<Msg> shape;
    /** Consumer actor produced by {@link #createConsumerActor()} during {@link #preStart()}. */
    private ActorRef consumerActor;
    /** Stage actor obtained in {@link #preStart()}; used as the sender of messages to the consumer actor. */
    private GraphStageLogic.StageActor sourceActor;
    /** Topic partitions collected so far; sent along with every message request. */
    private Set<TopicPartition> tps;
    /** Records waiting to be pushed downstream. */
    private Iterator<ConsumerRecord<K, V>> akka$kafka$internal$BaseSingleSourceLogic$$buffer;
    /** Set to true by {@link #requestMessages()}; while true, the pump does not issue another request. */
    private boolean akka$kafka$internal$BaseSingleSourceLogic$$requested;
    /** Counter incremented by {@link #requestMessages()} before each request is sent. */
    private int akka$kafka$internal$BaseSingleSourceLogic$$requestId;
    /** Backing field for the StageLogging mixin. */
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    // The three PromiseControl fields below are written by the `_setter_` methods further down,
    // so they must not be declared final: Java rejects assignment to a final field outside a
    // constructor or initializer.
    private Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise;
    private Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise;
    private AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback;

    /** Returns the StageLogging backing field. */
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    /** Stores the StageLogging backing field. */
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Forwards to the StageLogging trait implementation. */
    public Class<?> logSource() {
        return StageLogging.class.logSource(this);
    }

    /** Forwards to the StageLogging trait implementation. */
    public LoggingAdapter log() {
        return StageLogging.class.log(this);
    }

    /** Forwards to the MetricsControl trait implementation. */
    @Override // akka.kafka.scaladsl.Consumer.Control, akka.kafka.internal.MetricsControl
    public Future<Map<MetricName, Metric>> metrics() {
        return MetricsControl.Cclass.metrics(this);
    }

    /** Returns the shutdown promise installed by the PromiseControl mixin. */
    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.akka$kafka$internal$PromiseControl$$shutdownPromise;
    }

    /** Returns the stop promise installed by the PromiseControl mixin. */
    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise() {
        return this.akka$kafka$internal$PromiseControl$$stopPromise;
    }

    /** Returns the control callback installed by the PromiseControl mixin. */
    @Override // akka.kafka.internal.PromiseControl
    public AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback() {
        return this.akka$kafka$internal$PromiseControl$$controlCallback;
    }

    /** Gives the PromiseControl mixin access to the superclass {@code setKeepGoing}. */
    @Override // akka.kafka.internal.PromiseControl
    public /* synthetic */ void akka$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super.setKeepGoing(z);
    }

    /** Mixin setter for the shutdown promise. */
    @Override // akka.kafka.internal.PromiseControl
    public void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise promise) {
        this.akka$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    /** Mixin setter for the stop promise. */
    @Override // akka.kafka.internal.PromiseControl
    public void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$stopPromise_$eq(Promise promise) {
        this.akka$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    /** Mixin setter for the control callback. */
    @Override // akka.kafka.internal.PromiseControl
    public void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback asyncCallback) {
        this.akka$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    /** Forwards to the PromiseControl trait implementation. */
    @Override // akka.kafka.internal.PromiseControl
    public void performStop() {
        PromiseControl.Cclass.performStop(this);
    }

    /** Forwards to the PromiseControl trait implementation. */
    @Override // akka.kafka.internal.PromiseControl
    public boolean onStop() {
        return PromiseControl.Cclass.onStop(this);
    }

    /** Forwards to the PromiseControl trait implementation. */
    @Override // akka.kafka.internal.PromiseControl
    public boolean onShutdown() {
        return PromiseControl.Cclass.onShutdown(this);
    }

    /** Forwards to the PromiseControl trait implementation. */
    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> stop() {
        return PromiseControl.Cclass.stop(this);
    }

    /** Forwards to the PromiseControl trait implementation. */
    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> shutdown() {
        return PromiseControl.Cclass.shutdown(this);
    }

    /** Forwards to the PromiseControl trait implementation. */
    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> isShutdown() {
        return PromiseControl.Cclass.isShutdown(this);
    }

    /** Forwards to the Consumer.Control trait implementation. */
    @Override // akka.kafka.scaladsl.Consumer.Control
    public <S> Future<S> drainAndShutdown(Future<S> future, ExecutionContext executionContext) {
        return Consumer.Control.Cclass.drainAndShutdown(this, future, executionContext);
    }

    /** Returns the source shape passed to the constructor. */
    @Override // akka.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return this.shape;
    }

    /** Returns the execution context of this stage's materializer. */
    @Override // akka.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    /** Supplied by subclasses; required by the MetricsControl mixin. */
    @Override // akka.kafka.internal.MetricsControl
    public abstract Future<ActorRef> consumerFuture();

    /** Returns the consumer actor; unset until {@link #preStart()} has run. */
    public final ActorRef consumerActor() {
        return this.consumerActor;
    }

    /** Replaces the consumer actor reference. */
    public final void consumerActor_$eq(ActorRef actorRef) {
        this.consumerActor = actorRef;
    }

    /** Returns the stage actor; unset until {@link #preStart()} has run. */
    public GraphStageLogic.StageActor sourceActor() {
        return this.sourceActor;
    }

    /** Replaces the stage actor reference. */
    public void sourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        this.sourceActor = stageActor;
    }

    /** Returns the topic partitions collected so far. */
    public Set<TopicPartition> tps() {
        return this.tps;
    }

    /** Replaces the set of topic partitions. */
    public void tps_$eq(Set<TopicPartition> set) {
        this.tps = set;
    }

    /** Returns the iterator over records waiting to be pushed. */
    public Iterator<ConsumerRecord<K, V>> akka$kafka$internal$BaseSingleSourceLogic$$buffer() {
        return this.akka$kafka$internal$BaseSingleSourceLogic$$buffer;
    }

    /** Replaces the iterator over records waiting to be pushed. */
    public void akka$kafka$internal$BaseSingleSourceLogic$$buffer_$eq(Iterator<ConsumerRecord<K, V>> iterator) {
        this.akka$kafka$internal$BaseSingleSourceLogic$$buffer = iterator;
    }

    /** Returns whether a message request has been flagged as outstanding. */
    private boolean akka$kafka$internal$BaseSingleSourceLogic$$requested() {
        return this.akka$kafka$internal$BaseSingleSourceLogic$$requested;
    }

    /** Sets or clears the outstanding-request flag. */
    public void akka$kafka$internal$BaseSingleSourceLogic$$requested_$eq(boolean z) {
        this.akka$kafka$internal$BaseSingleSourceLogic$$requested = z;
    }

    /** Returns the id used for the most recent message request (0 before any request). */
    public int akka$kafka$internal$BaseSingleSourceLogic$$requestId() {
        return this.akka$kafka$internal$BaseSingleSourceLogic$$requestId;
    }

    /** Stores a new request id. */
    private void akka$kafka$internal$BaseSingleSourceLogic$$requestId_$eq(int i) {
        this.akka$kafka$internal$BaseSingleSourceLogic$$requestId = i;
    }

    /**
     * Wires the stage up: obtains the stage actor, creates the consumer actor, watches it from
     * the stage actor, and finally lets the subclass configure its subscription.
     */
    @Override
    public void preStart() {
        super.preStart();
        sourceActor_$eq(getStageActor(new BaseSingleSourceLogic$$anonfun$preStart$1(this)));
        consumerActor_$eq(createConsumerActor());
        sourceActor().watch(consumerActor());
        configureSubscription();
    }

    /** Creates the consumer actor this stage talks to; called once from {@link #preStart()}. */
    public abstract ActorRef createConsumerActor();

    /** Sets up the subscription; called from {@link #preStart()} after the consumer actor is watched. */
    public abstract void configureSubscription();

    /**
     * Sends the assignment message matching the concrete kind of {@code manualSubscription} to
     * the consumer actor (with the stage actor as sender) and adds the affected partitions to
     * {@link #tps()}.
     *
     * @param manualSubscription the manual subscription to apply
     * @throws MatchError if the subscription is none of the three handled kinds
     */
    public void configureManualSubscription(ManualSubscription manualSubscription) {
        if (manualSubscription instanceof Subscriptions.Assignment) {
            Set<TopicPartition> assigned = ((Subscriptions.Assignment) manualSubscription).tps();
            consumerActor().tell(new KafkaConsumerActor$Internal$Assign(assigned), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(assigned));
        } else if (manualSubscription instanceof Subscriptions.AssignmentWithOffset) {
            Map<TopicPartition, Object> offsets = ((Subscriptions.AssignmentWithOffset) manualSubscription).tps();
            consumerActor().tell(new KafkaConsumerActor$Internal$AssignWithOffset(offsets), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(offsets.keySet()));
        } else if (manualSubscription instanceof Subscriptions.AssignmentOffsetsForTimes) {
            Map<TopicPartition, Object> timestampsToSearch = ((Subscriptions.AssignmentOffsetsForTimes) manualSubscription).timestampsToSearch();
            consumerActor().tell(new KafkaConsumerActor$Internal$AssignOffsetsForTimes(timestampsToSearch), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(timestampsToSearch.keySet()));
        } else {
            throw new MatchError(manualSubscription);
        }
    }

    /**
     * Pushes buffered records downstream for as long as the outlet is available. When the buffer
     * runs dry, requests more messages unless a request is already flagged as outstanding or no
     * partitions are known, and then stops.
     */
    public void akka$kafka$internal$BaseSingleSourceLogic$$pump() {
        while (isAvailable(shape().out())) {
            if (akka$kafka$internal$BaseSingleSourceLogic$$buffer().hasNext()) {
                push(shape().out(), createMessage((ConsumerRecord) akka$kafka$internal$BaseSingleSourceLogic$$buffer().next()));
            } else {
                // Nothing left to push: ask for more only if no request is pending and there is
                // at least one partition to ask for.
                if (!akka$kafka$internal$BaseSingleSourceLogic$$requested() && tps().nonEmpty()) {
                    requestMessages();
                }
                return;
            }
        }
    }

    /**
     * Flags a request as outstanding, increments the request id, logs at debug level, and asks
     * the consumer actor for messages for the current partitions (stage actor as sender).
     */
    public void requestMessages() {
        akka$kafka$internal$BaseSingleSourceLogic$$requested_$eq(true);
        akka$kafka$internal$BaseSingleSourceLogic$$requestId_$eq(akka$kafka$internal$BaseSingleSourceLogic$$requestId() + 1);
        log().debug("Requesting messages, requestId: {}, partitions: {}", BoxesRunTime.boxToInteger(akka$kafka$internal$BaseSingleSourceLogic$$requestId()), tps());
        consumerActor().tell(new KafkaConsumerActor$Internal$RequestMessages(akka$kafka$internal$BaseSingleSourceLogic$$requestId(), tps()), sourceActor().ref());
    }

    /** Runs the PromiseControl shutdown hook, then the superclass {@code postStop}. */
    @Override
    public void postStop() {
        onShutdown();
        super.postStop();
    }

    /** Supplied by subclasses; invoked when downstream finishes. */
    @Override // akka.kafka.internal.PromiseControl
    public abstract void performShutdown();

    /**
     * Stores the shape, runs the mixin initializers in their original order, resets the
     * partition set, buffer, request flag and request id, and installs the outlet handler.
     *
     * @param sourceShape shape whose outlet this logic serves
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public BaseSingleSourceLogic(SourceShape<Msg> sourceShape) {
        super(sourceShape);
        this.shape = sourceShape;
        Consumer.Control.Cclass.$init$(this);
        PromiseControl.Cclass.$init$(this);
        MetricsControl.Cclass.$init$(this);
        StageLogging.class.$init$(this);
        this.tps = Predef$.MODULE$.Set().empty();
        this.akka$kafka$internal$BaseSingleSourceLogic$$buffer = package$.MODULE$.Iterator().empty();
        this.akka$kafka$internal$BaseSingleSourceLogic$$requested = false;
        this.akka$kafka$internal$BaseSingleSourceLogic$$requestId = 0;
        // The handler reaches the enclosing logic through BaseSingleSourceLogic.this. The
        // decompiler's synthetic `$outer` field was assigned the handler itself and guarded by a
        // `this == 0` check, neither of which is valid Java.
        setHandler(sourceShape.out(), new OutHandler() { // from class: akka.kafka.internal.BaseSingleSourceLogic$$anon$1
            {
                // Trait initializer call as rendered by the decompiler; kept so the handler is
                // initialized exactly as before.
                OutHandler.class.$init$(this);
            }

            /** Downstream demand: push buffered records or request more. */
            public void onPull() {
                BaseSingleSourceLogic.this.akka$kafka$internal$BaseSingleSourceLogic$$pump();
            }

            /** Downstream cancelled: delegate to the subclass shutdown routine. */
            public void onDownstreamFinish() {
                BaseSingleSourceLogic.this.performShutdown();
            }
        });
    }
}
