package akka.kafka.internal;

import akka.Done;
import akka.actor.ActorRef;
import akka.event.LoggingAdapter;
import akka.kafka.internal.MetricsControl;
import akka.kafka.internal.PromiseControl;
import akka.kafka.scaladsl.Consumer;
import akka.stream.SourceShape;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SubSourceLogic.scala */
/* loaded from: input_file:akka/kafka/internal/SubSourceStage$$anon$1.class */
/**
 * Stage logic created by {@link SubSourceStage} (decompiled from the anonymous
 * {@code GraphStageLogic} in {@code SubSourceLogic.scala}).
 *
 * <p>What the visible code does:
 * <ul>
 *   <li>on start, reports itself to the outer stage via {@code subSourceStartedCb},
 *       obtains a stage actor and watches the consumer actor;</li>
 *   <li>on demand ({@code onPull}), pushes buffered records downstream and, when
 *       the buffer is empty, sends a single {@code RequestMessages} to the
 *       consumer actor;</li>
 *   <li>on downstream cancellation, reports the partition and the next buffered
 *       record (if any) via {@code subSourceCancelledCb}.</li>
 * </ul>
 *
 * <p>Most remaining members are compiler-generated accessors and forwarders to the
 * Scala trait implementation classes of {@code PromiseControl},
 * {@code MetricsControl}, {@code Consumer.Control} and {@code StageLogging}.
 * Their mangled names are part of the binary interface and are kept verbatim.
 *
 * <p>NOTE(review): the stage-actor message handler
 * ({@code SubSourceStage$$anon$1$$anonfun$preStart$2}) is not part of this file;
 * presumably it refills the buffer and resets the {@code requested} flag through
 * the public mangled setters below - confirm against that class.
 */
public final class SubSourceStage$$anon$1 extends GraphStageLogic implements PromiseControl, MetricsControl, StageLogging {
    private final SourceShape<Msg> shape;
    /** Request sent to the consumer actor whenever the buffer runs dry; built once in the constructor. */
    private final KafkaConsumerActor$Internal$RequestMessages requestMessages;
    /** True once a {@code RequestMessages} has been sent by {@code pump}; nothing in this file clears it. */
    private boolean akka$kafka$internal$SubSourceStage$$anon$$requested;
    private GraphStageLogic.StageActor subSourceActor;
    /** Records waiting to be pushed downstream; starts out as an empty iterator. */
    private Iterator<ConsumerRecord<K, V>> akka$kafka$internal$SubSourceStage$$anon$$buffer;
    private final /* synthetic */ SubSourceStage $outer;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    // The three PromiseControl fields below are assigned through the trait setter
    // methods further down rather than directly in the constructor, so they must
    // not be declared final in Java source.
    private Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise;
    private Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise;
    private AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback;

    // --- StageLogging trait plumbing ---------------------------------------

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public Class<?> logSource() {
        return StageLogging.class.logSource(this);
    }

    public LoggingAdapter log() {
        return StageLogging.class.log(this);
    }

    // --- MetricsControl / PromiseControl / Consumer.Control forwarders -----

    @Override // akka.kafka.scaladsl.Consumer.Control, akka.kafka.internal.MetricsControl
    public Future<Map<MetricName, Metric>> metrics() {
        return MetricsControl.Cclass.metrics(this);
    }

    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise() {
        return this.akka$kafka$internal$PromiseControl$$shutdownPromise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise() {
        return this.akka$kafka$internal$PromiseControl$$stopPromise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback() {
        return this.akka$kafka$internal$PromiseControl$$controlCallback;
    }

    @Override // akka.kafka.internal.PromiseControl
    public /* synthetic */ void akka$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super.setKeepGoing(z);
    }

    @Override // akka.kafka.internal.PromiseControl
    public void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise promise) {
        this.akka$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$stopPromise_$eq(Promise promise) {
        this.akka$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    @Override // akka.kafka.internal.PromiseControl
    public void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback asyncCallback) {
        this.akka$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    @Override // akka.kafka.internal.PromiseControl
    public void performStop() {
        PromiseControl.Cclass.performStop(this);
    }

    @Override // akka.kafka.internal.PromiseControl
    public boolean onStop() {
        return PromiseControl.Cclass.onStop(this);
    }

    @Override // akka.kafka.internal.PromiseControl
    public boolean onShutdown() {
        return PromiseControl.Cclass.onShutdown(this);
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> stop() {
        return PromiseControl.Cclass.stop(this);
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> shutdown() {
        return PromiseControl.Cclass.shutdown(this);
    }

    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> isShutdown() {
        return PromiseControl.Cclass.isShutdown(this);
    }

    @Override // akka.kafka.scaladsl.Consumer.Control
    public <S> Future<S> drainAndShutdown(Future<S> future, ExecutionContext executionContext) {
        return Consumer.Control.Cclass.drainAndShutdown(this, future, executionContext);
    }

    /** Returns the execution context of this stage's materializer. */
    @Override // akka.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    /** Returns an already-completed future holding the outer stage's consumer actor. */
    @Override // akka.kafka.internal.MetricsControl
    public Future<ActorRef> consumerFuture() {
        return Future$.MODULE$.successful(this.$outer.akka$kafka$internal$SubSourceStage$$consumerActor);
    }

    @Override // akka.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return this.shape;
    }

    // --- State accessors (Scala var/val accessors with mangled names) ------

    private KafkaConsumerActor$Internal$RequestMessages requestMessages() {
        return this.requestMessages;
    }

    private boolean akka$kafka$internal$SubSourceStage$$anon$$requested() {
        return this.akka$kafka$internal$SubSourceStage$$anon$$requested;
    }

    public void akka$kafka$internal$SubSourceStage$$anon$$requested_$eq(boolean z) {
        this.akka$kafka$internal$SubSourceStage$$anon$$requested = z;
    }

    private GraphStageLogic.StageActor subSourceActor() {
        return this.subSourceActor;
    }

    private void subSourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        this.subSourceActor = stageActor;
    }

    public Iterator<ConsumerRecord<K, V>> akka$kafka$internal$SubSourceStage$$anon$$buffer() {
        return this.akka$kafka$internal$SubSourceStage$$anon$$buffer;
    }

    public void akka$kafka$internal$SubSourceStage$$anon$$buffer_$eq(Iterator<ConsumerRecord<K, V>> iterator) {
        this.akka$kafka$internal$SubSourceStage$$anon$$buffer = iterator;
    }

    // --- Stage lifecycle ----------------------------------------------------

    /**
     * Logs the start, runs the superclass hook, reports the pair
     * (topic partition -> this control) through {@code subSourceStartedCb},
     * then creates the stage actor and makes it watch the consumer actor.
     */
    public void preStart() {
        log().debug("#{} Starting SubSource for partition {}", BoxesRunTime.boxToInteger(this.$outer.akka$kafka$internal$SubSourceStage$$actorNumber), this.$outer.akka$kafka$internal$SubSourceStage$$tp);
        super.preStart();
        this.$outer.akka$kafka$internal$SubSourceStage$$subSourceStartedCb.invoke(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.$outer.akka$kafka$internal$SubSourceStage$$tp), this));
        subSourceActor_$eq(getStageActor(new SubSourceStage$$anon$1$$anonfun$preStart$2(this)));
        subSourceActor().watch(this.$outer.akka$kafka$internal$SubSourceStage$$consumerActor);
    }

    /** Runs the {@code PromiseControl} shutdown hook before the superclass {@code postStop}. */
    public void postStop() {
        onShutdown();
        super.postStop();
    }

    /** Logs the completion and completes the stage. */
    @Override // akka.kafka.internal.PromiseControl
    public void performShutdown() {
        log().debug("#{} Completing SubSource for partition {}", BoxesRunTime.boxToInteger(this.$outer.akka$kafka$internal$SubSourceStage$$actorNumber), this.$outer.akka$kafka$internal$SubSourceStage$$tp);
        completeStage();
    }

    /**
     * Pushes buffered records downstream for as long as the outlet is available.
     *
     * <p>When the buffer is empty, a {@code RequestMessages} is sent to the consumer
     * actor (with the stage actor as sender) unless one has already been sent, and
     * the method returns without pushing.
     */
    public void akka$kafka$internal$SubSourceStage$$anon$$pump() {
        while (isAvailable(this.$outer.out())) {
            if (!akka$kafka$internal$SubSourceStage$$anon$$buffer().hasNext()) {
                // Nothing to emit: ask for more at most once, then wait.
                if (!akka$kafka$internal$SubSourceStage$$anon$$requested()) {
                    akka$kafka$internal$SubSourceStage$$anon$$requested_$eq(true);
                    this.$outer.akka$kafka$internal$SubSourceStage$$consumerActor.tell(requestMessages(), subSourceActor().ref());
                }
                return;
            }
            push(this.$outer.out(), this.$outer.akka$kafka$internal$SubSourceStage$$messageBuilder.createMessage((ConsumerRecord) akka$kafka$internal$SubSourceStage$$anon$$buffer().next()));
        }
    }

    public /* synthetic */ SubSourceStage akka$kafka$internal$SubSourceStage$$anon$$$outer() {
        return this.$outer;
    }

    /**
     * Wires the logic to its enclosing stage: runs the trait initializers, builds
     * the reusable {@code RequestMessages} for the stage's topic partition, starts
     * with an empty buffer and installs the outlet handler.
     *
     * @param subSourceStage the enclosing stage; must not be null
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SubSourceStage$$anon$1(SubSourceStage<K, V, Msg> subSourceStage) {
        super(subSourceStage.m111shape());
        // Outer-instance null check emitted by the Scala compiler. Because the
        // decompiler moved the super call above it, a null argument already fails
        // on the line above; the check is kept for fidelity with the original.
        if (subSourceStage == null) {
            throw new NullPointerException();
        }
        this.$outer = subSourceStage;
        Consumer.Control.Cclass.$init$(this);
        PromiseControl.Cclass.$init$(this);
        MetricsControl.Cclass.$init$(this);
        StageLogging.class.$init$(this);
        this.shape = subSourceStage.m111shape();
        // NOTE(review): the leading 0 is presumably a request id - confirm against RequestMessages.
        this.requestMessages = new KafkaConsumerActor$Internal$RequestMessages(0, Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{subSourceStage.akka$kafka$internal$SubSourceStage$$tp})));
        this.akka$kafka$internal$SubSourceStage$$anon$$requested = false;
        this.akka$kafka$internal$SubSourceStage$$anon$$buffer = package$.MODULE$.Iterator().empty();
        setHandler(subSourceStage.out(), new OutHandler() { // from class: akka.kafka.internal.SubSourceStage$$anon$1$$anon$4
            // Reference to the enclosing stage logic. It must be the enclosing
            // instance, not the handler itself, so a qualified 'this' is used.
            private final /* synthetic */ SubSourceStage$$anon$1 $outer = SubSourceStage$$anon$1.this;

            /** Downstream demand: emit buffered records or request more. */
            public void onPull() {
                this.$outer.akka$kafka$internal$SubSourceStage$$anon$$pump();
            }

            /**
             * Downstream cancelled: report (topic partition -> next buffered record,
             * or None when the buffer is empty) through subSourceCancelledCb, then
             * run the default OutHandler behaviour.
             */
            public void onDownstreamFinish() {
                this.$outer.akka$kafka$internal$SubSourceStage$$anon$$$outer().akka$kafka$internal$SubSourceStage$$subSourceCancelledCb.invoke(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.$outer.akka$kafka$internal$SubSourceStage$$anon$$$outer().akka$kafka$internal$SubSourceStage$$tp), this.$outer.akka$kafka$internal$SubSourceStage$$anon$$buffer().hasNext() ? new Some(this.$outer.akka$kafka$internal$SubSourceStage$$anon$$buffer().next()) : None$.MODULE$));
                OutHandler.class.onDownstreamFinish(this);
            }

            /* JADX WARN: Incorrect inner types in method signature: (Lakka/kafka/internal/SubSourceStage<TK;TV;TMsg;>.$anon$1;)V */
            {
                OutHandler.class.$init$(this);
            }
        });
    }
}
