package akka.kafka.internal;

import akka.Done;
import akka.actor.ActorRef;
import akka.annotation.InternalApi;
import akka.event.LoggingAdapter;
import akka.kafka.ManualSubscription;
import akka.kafka.Subscriptions;
import akka.kafka.internal.PromiseControl;
import akka.kafka.scaladsl.Consumer;
import akka.kafka.scaladsl.PartitionAssignmentHandler;
import akka.stream.SourceShape;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: BaseSingleSourceLogic.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=h!B\u0011#\u0003\u0013I\u0003\u0002\u0003.\u0001\u0005\u000b\u0007I\u0011A.\t\u0011\u0001\u0004!\u0011!Q\u0001\nqCQ!\u0019\u0001\u0005\u0002\tDQ!\u001a\u0001\u0005R\u0019DQ!\u001c\u0001\u0007\u00129D\u0011\u0002\u001f\u0001A\u0002\u0003\u0007IQC=\t\u0013i\u0004\u0001\u0019!a\u0001\n+Y\bBCA\u0002\u0001\u0001\u0007\t\u0011)Q\u0007e\"Y\u0011Q\u0001\u0001A\u0002\u0003\u0007I\u0011CA\u0004\u0011-\tI\u0003\u0001a\u0001\u0002\u0004%\t\"a\u000b\t\u0017\u0005=\u0002\u00011A\u0001B\u0003&\u0011\u0011\u0002\u0005\n\u0003c\u0001\u0001\u0019!C\t\u0003gA\u0011\"a\u0017\u0001\u0001\u0004%\t\"!\u0018\t\u0011\u0005\u0005\u0004\u0001)Q\u0005\u0003kA\u0011\"a\u0019\u0001\u0001\u0004%I!!\u001a\t\u0013\u00055\u0004\u00011A\u0005\n\u0005=\u0004\u0002CA:\u0001\u0001\u0006K!a\u001a\t\u0013\u0005U\u0004\u00011A\u0005\n\u0005]\u0004\"CA@\u0001\u0001\u0007I\u0011BAA\u0011!\t)\t\u0001Q!\n\u0005e\u0004\"CAD\u0001\t\u0007I\u0011BAE\u0011!\ty\n\u0001Q\u0001\n\u0005-\u0005\"CAQ\u0001\t\u0007I\u0011BAE\u0011!\t\u0019\u000b\u0001Q\u0001\n\u0005-\u0005bBAS\u0001\u0011\u0005\u0013q\u0015\u0005\b\u0003S\u0003A\u0011CAV\u0011\u001d\tI\f\u0001D\t\u0003wCq!!0\u0001\t#\ny\fC\u0004\u0002N\u0002!I!a*\t\u000f\u0005u\u0007\u0001\"\u0005\u0002(\"9\u0011q\u001c\u0001\u0005B\u0005\u001d\u0006bBAq\u0001\u0011\u0005\u0011q\u0015\u0002\u0016\u0005\u0006\u001cXmU5oO2,7k\\;sG\u0016dunZ5d\u0015\t\u0019C%\u0001\u0005j]R,'O\\1m\u0015\t)c%A\u0003lC\u001a\\\u0017MC\u0001(\u0003\u0011\t7n[1\u0004\u0001U!!&\u0012*V'!\u00011fM\u001c;{\u0001;\u0006C\u0001\u00172\u001b\u0005i#B\u0001\u00180\u0003\u0015\u0019H/Y4f\u0015\t\u0001d%\u0001\u0004tiJ,\u0017-\\\u0005\u0003e5\u0012qb\u0012:ba\"\u001cF/Y4f\u0019><\u0017n\u0019\t\u0003iUj\u0011AI\u0005\u0003m\t\u0012a\u0002\u0015:p[&\u001cXmQ8oiJ|G\u000e\u0005\u00025q%\u0011\u0011H\t\u0002\u000f\u001b\u0016$(/[2t\u0007>tGO]8m!\t!4(\u0003\u0002=E\tq1\u000b^1hK&#Gj\\4hS:<\u0007C\u0001\u001b?\u0013\ty$EA\fT_V\u00148-\u001a'pO&\u001c7+\u001e2tGJL\u0
007\u000f^5p]B)A'Q\"R)&\u0011!I\t\u0002\u000f\u001b\u0016\u001c8/Y4f\u0005VLG\u000eZ3s!\t!U\t\u0004\u0001\u0005\u000b\u0019\u0003!\u0019A$\u0003\u0003-\u000b\"\u0001\u0013(\u0011\u0005%cU\"\u0001&\u000b\u0003-\u000bQa]2bY\u0006L!!\u0014&\u0003\u000f9{G\u000f[5oOB\u0011\u0011jT\u0005\u0003!*\u00131!\u00118z!\t!%\u000bB\u0003T\u0001\t\u0007qIA\u0001W!\t!U\u000bB\u0003W\u0001\t\u0007qIA\u0002Ng\u001e\u0004R\u0001\u000e-D#RK!!\u0017\u0012\u0003#M{WO]2f\u0019><\u0017n\u0019\"vM\u001a,'/A\u0003tQ\u0006\u0004X-F\u0001]!\rif\fV\u0007\u0002_%\u0011ql\f\u0002\f'>,(oY3TQ\u0006\u0004X-\u0001\u0004tQ\u0006\u0004X\rI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005\r$\u0007#\u0002\u001b\u0001\u0007F#\u0006\"\u0002.\u0004\u0001\u0004a\u0016\u0001E3yK\u000e,H/[8o\u0007>tG/\u001a=u+\u00059\u0007C\u00015l\u001b\u0005I'B\u00016K\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003Y&\u0014\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0002\u001d\r|gn];nKJ4U\u000f^;sKV\tq\u000eE\u0002iaJL!!]5\u0003\r\u0019+H/\u001e:f!\t\u0019h/D\u0001u\u0015\t)h%A\u0003bGR|'/\u0003\u0002xi\nA\u0011i\u0019;peJ+g-A\u0007d_:\u001cX/\\3s\u0003\u000e$xN]\u000b\u0002e\u0006\t2m\u001c8tk6,'/Q2u_J|F%Z9\u0015\u0005q|\bCA%~\u0013\tq(J\u0001\u0003V]&$\b\u0002CA\u0001\u000f\u0005\u0005\t\u0019\u0001:\u0002\u0007a$\u0013'\u0001\bd_:\u001cX/\\3s\u0003\u000e$xN\u001d\u0011\u0002\u0017M|WO]2f\u0003\u000e$xN]\u000b\u0003\u0003\u0013\u0001B!a\u0003\u0002$9!\u0011QBA\u0010\u001d\u0011\ty!!\b\u000f\t\u0005E\u00111\u0004\b\u0005\u0003'\tI\"\u0004\u0002\u0002\u0016)\u0019\u0011q\u0003\u0015\u0002\rq\u0012xn\u001c;?\u0013\u00059\u0013B\u0001\u0019'\u0013\tqs&C\u0002\u0002\"5\nqb\u0012:ba\"\u001cF/Y4f\u0019><\u0017nY\u0005\u0005\u0003K\t9C\u0001\u0006Ti\u0006<W-Q2u_JT1!!\t.\u0003=\u0019x.\u001e:dK\u0006\u001bGo\u001c:`I\u0015\fHc\u0001?\u0002.!I\u0011\u0011\u0001\u0006\u0002\u0002\u0003\u0007\u0011\u0011B\u0001\rg>,(oY3BGR|'\u000fI\u0001\u0004iB\u001cXCAA\u001b!\u0019\t9$!\u0011\u0002F5\u0011\u0011\u0011\b\u0006\u0005\u0003w\ti$A\u0005j[6,
H/\u00192mK*\u0019\u0011q\b&\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002D\u0005e\"aA*fiB!\u0011qIA,\u001b\t\tIE\u0003\u0003\u0002L\u00055\u0013AB2p[6|gNC\u0002&\u0003\u001fRA!!\u0015\u0002T\u00051\u0011\r]1dQ\u0016T!!!\u0016\u0002\u0007=\u0014x-\u0003\u0003\u0002Z\u0005%#A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\\\u0001\biB\u001cx\fJ3r)\ra\u0018q\f\u0005\n\u0003\u0003i\u0011\u0011!a\u0001\u0003k\tA\u0001\u001e9tA\u0005I!/Z9vKN$X\rZ\u000b\u0003\u0003O\u00022!SA5\u0013\r\tYG\u0013\u0002\b\u0005>|G.Z1o\u00035\u0011X-];fgR,Gm\u0018\u0013fcR\u0019A0!\u001d\t\u0013\u0005\u0005\u0001#!AA\u0002\u0005\u001d\u0014A\u0003:fcV,7\u000f^3eA\u0005I!/Z9vKN$\u0018\nZ\u000b\u0003\u0003s\u00022!SA>\u0013\r\tiH\u0013\u0002\u0004\u0013:$\u0018!\u0004:fcV,7\u000f^%e?\u0012*\u0017\u000fF\u0002}\u0003\u0007C\u0011\"!\u0001\u0014\u0003\u0003\u0005\r!!\u001f\u0002\u0015I,\u0017/^3ti&#\u0007%\u0001\u0006bgNLwM\\3e\u0007\n+\"!a#\u0011\u000b1\ni)!%\n\u0007\u0005=UFA\u0007Bgft7mQ1mY\n\f7m\u001b\t\u0007\u0003'\u000bY*!\u0012\u000f\t\u0005U\u0015q\u0013\t\u0004\u0003'Q\u0015bAAM\u0015\u00061\u0001K]3eK\u001aLA!a\u0011\u0002\u001e*\u0019\u0011\u0011\u0014&\u0002\u0017\u0005\u001c8/[4oK\u0012\u001c%\tI\u0001\ne\u00164xn[3e\u0007\n\u000b!B]3w_.,Gm\u0011\"!\u0003!\u0001(/Z*uCJ$H#\u0001?\u0002\u001f5,7o]1hK\"\u000bg\u000e\u001a7j]\u001e,\"!!,\u0011\r%\u000by+a-}\u0013\r\t\tL\u0013\u0002\u0010!\u0006\u0014H/[1m\rVt7\r^5p]B)\u0011*!.s\u001d&\u0019\u0011q\u0017&\u0003\rQ+\b\u000f\\33\u0003M\u0019'/Z1uK\u000e{gn];nKJ\f5\r^8s)\u0005\u0011\u0018aG2p]\u001aLw-\u001e:f\u001b\u0006tW/\u00197Tk\n\u001c8M]5qi&|g\u000eF\u0002}\u0003\u0003Dq!a1\u001d\u0001\u0004\t)-\u0001\u0007tk\n\u001c8M]5qi&|g\u000e\u0005\u0003\u0002H\u0006%W\"\u0001\u0013\n\u0007\u0005-GE\u0001\nNC:,\u0018\r\\*vEN\u001c'/\u001b9uS>t\u0017\u0001\u00029v[BD3!HAi!\u0011\t\u0019.!7\u000e\u0005\u0005U'bAAl\u0015\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005m\u0017Q\u001b\u0002\bi\u0006LGN]3d\u0003=\u0011X-];fgRlUm]:bO\u0016\u001c\u0018\u000
1\u00039pgR\u001cFo\u001c9\u0002\u001fA,'OZ8s[NCW\u000f\u001e3po:D3\u0001AAs!\u0011\t9/a;\u000e\u0005\u0005%(bAAlM%!\u0011Q^Au\u0005-Ie\u000e^3s]\u0006d\u0017\t]5")
@InternalApi
/* loaded from: input_file:akka/kafka/internal/BaseSingleSourceLogic.class */
public abstract class BaseSingleSourceLogic<K, V, Msg> extends GraphStageLogic implements PromiseControl, MetricsControl, StageIdLogging, SourceLogicSubscription, MessageBuilder<K, V, Msg>, SourceLogicBuffer<K, V, Msg> {
    // Shape of this source stage; handed to the GraphStageLogic constructor and returned by shape().
    private final SourceShape<Msg> shape;
    // Set in preStart() from createConsumerActor().
    private ActorRef consumerActor;
    // Set in preStart() from getStageActor(messageHandling()).
    private GraphStageLogic.StageActor sourceActor;
    // Partitions this stage currently requests messages for. Starts empty; grown by the assigned
    // callback and configureManualSubscription(), shrunk by the revoked callback.
    private Set<TopicPartition> tps;
    // Set to true by requestMessages(); while true, pump() does not issue another request.
    private boolean akka$kafka$internal$BaseSingleSourceLogic$$requested;
    // Incremented by requestMessages() and sent along with every request.
    private int akka$kafka$internal$BaseSingleSourceLogic$$requestId;
    // Async callbacks created in the constructor and passed to configureSubscription() in preStart().
    private final AsyncCallback<Set<TopicPartition>> assignedCB;
    private final AsyncCallback<Set<TopicPartition>> revokedCB;
    // Records waiting to be pushed downstream by pump().
    private Iterator<ConsumerRecord<K, V>> buffer;
    // The following trait-backing fields are deliberately NOT final: they are written through
    // the `..._setter_..._$eq` / `_$eq` methods of this class rather than directly in the
    // constructor, which Java does not allow for final fields.
    private AsyncCallback<Set<TopicPartition>> filterRevokedPartitionsCB;
    private LoggingAdapter akka$kafka$internal$StageIdLogging$$_log;
    private String akka$kafka$internal$InstanceId$$instanceId;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise;
    private Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise;
    private AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback;

    /**
     * Forwards to the {@code SourceLogicSubscription} trait implementation via its static
     * forwarder, passing this instance and both callbacks through unchanged.
     *
     * @param asyncCallback first callback; {@code preStart()} passes the assigned-partitions callback here
     * @param asyncCallback2 second callback; {@code preStart()} passes the revoked-partitions callback here
     */
    @Override // akka.kafka.internal.SourceLogicSubscription
    public void configureSubscription(AsyncCallback<Set<TopicPartition>> asyncCallback, AsyncCallback<Set<TopicPartition>> asyncCallback2) {
        SourceLogicSubscription.configureSubscription$(this, asyncCallback, asyncCallback2);
    }

    /**
     * Forwards to the {@code SourceLogicSubscription} trait implementation via its static
     * forwarder and returns whatever handler that implementation produces.
     *
     * @param partitionAssignmentHandler the handler passed through to the trait implementation
     * @return the handler returned by the trait implementation
     */
    @Override // akka.kafka.internal.SourceLogicSubscription
    public PartitionAssignmentHandler addToPartitionAssignmentHandler(PartitionAssignmentHandler partitionAssignmentHandler) {
        return SourceLogicSubscription.addToPartitionAssignmentHandler$(this, partitionAssignmentHandler);
    }

    /**
     * Synthetic super-accessor: returns the logger produced by the {@code StageLogging}
     * trait implementation ({@code StageLogging.log$}) for this instance.
     */
    @Override // akka.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter akka$kafka$internal$StageIdLogging$$super$log() {
        return StageLogging.log$(this);
    }

    /**
     * Returns the value computed by the {@code StageIdLogging} trait implementation.
     *
     * <p>Delegates through the trait's static forwarder, the same pattern this class uses for
     * {@code SourceLogicSubscription.configureSubscription$} and {@code StageLogging.log$}.
     * Calling {@code idLogPrefix()} on this instance here would recurse without end.
     *
     * @return the prefix string produced by {@code StageIdLogging}
     */
    @Override // akka.kafka.internal.StageIdLogging
    public String idLogPrefix() {
        return StageIdLogging.idLogPrefix$(this);
    }

    /**
     * Returns the logging adapter supplied by the {@code StageIdLogging} trait implementation.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code log()} on this
     * instance here would recurse without end.
     *
     * @return the adapter produced by {@code StageIdLogging}
     */
    @Override // akka.kafka.internal.StageIdLogging
    public LoggingAdapter log() {
        return StageIdLogging.log$(this);
    }

    /**
     * Returns the identifier computed by the {@code InstanceId} trait implementation.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code id()} on this
     * instance here would recurse without end.
     *
     * @return the id produced by {@code InstanceId}
     */
    @Override // akka.kafka.internal.InstanceId
    public String id() {
        return InstanceId.id$(this);
    }

    /**
     * Returns the log source class as determined by the {@code StageLogging} trait
     * implementation ({@code StageLogging.logSource$}).
     */
    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    /**
     * Returns the metrics future produced by the {@code MetricsControl} trait implementation.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code metrics()} on this
     * instance here would recurse without end.
     *
     * @return a future of the metric map, as computed by {@code MetricsControl}
     */
    @Override // akka.kafka.scaladsl.Consumer.Control, akka.kafka.internal.MetricsControl
    public Future<Map<MetricName, Metric>> metrics() {
        return MetricsControl.metrics$(this);
    }

    /**
     * Synthetic super-accessor: invokes the superclass {@code setKeepGoing} with the given flag.
     *
     * @param z value passed straight through to {@code super.setKeepGoing}
     */
    @Override // akka.kafka.internal.PromiseControl
    public /* synthetic */ void akka$kafka$internal$PromiseControl$$super$setKeepGoing(boolean z) {
        super.setKeepGoing(z);
    }

    /**
     * Runs the stop procedure implemented by the {@code PromiseControl} trait.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code performStop()} on
     * this instance here would recurse without end.
     */
    @Override // akka.kafka.internal.PromiseControl
    public void performStop() {
        PromiseControl.performStop$(this);
    }

    /**
     * Invokes the {@code PromiseControl} trait implementation of {@code onStop}.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code onStop()} on this
     * instance here would recurse without end.
     *
     * @return the boolean returned by the trait implementation
     */
    @Override // akka.kafka.internal.PromiseControl
    public boolean onStop() {
        return PromiseControl.onStop$(this);
    }

    /**
     * Invokes the {@code PromiseControl} trait implementation of {@code onShutdown}.
     * Called from {@code postStop()} in this class.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code onShutdown()} on
     * this instance here would recurse without end.
     *
     * @return the boolean returned by the trait implementation
     */
    @Override // akka.kafka.internal.PromiseControl
    public boolean onShutdown() {
        return PromiseControl.onShutdown$(this);
    }

    /**
     * Invokes the {@code PromiseControl} trait implementation of {@code stop}.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code stop()} on this
     * instance here would recurse without end.
     *
     * @return the future returned by the trait implementation
     */
    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> stop() {
        return PromiseControl.stop$(this);
    }

    /**
     * Invokes the {@code PromiseControl} trait implementation of {@code shutdown}.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code shutdown()} on this
     * instance here would recurse without end.
     *
     * @return the future returned by the trait implementation
     */
    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> shutdown() {
        return PromiseControl.shutdown$(this);
    }

    /**
     * Invokes the {@code PromiseControl} trait implementation of {@code isShutdown}.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code isShutdown()} on
     * this instance here would recurse without end.
     *
     * @return the future returned by the trait implementation
     */
    @Override // akka.kafka.internal.PromiseControl, akka.kafka.scaladsl.Consumer.Control
    public Future<Done> isShutdown() {
        return PromiseControl.isShutdown$(this);
    }

    /**
     * Invokes the {@code Consumer.Control} trait implementation of {@code drainAndShutdown}.
     *
     * <p>Delegates through the trait's static forwarder; calling {@code drainAndShutdown(...)}
     * on this instance here would recurse without end.
     *
     * @param future passed through unchanged to the trait implementation
     * @param executionContext passed through unchanged to the trait implementation
     * @param <S> element type of the supplied and returned futures
     * @return the future returned by the trait implementation
     */
    @Override // akka.kafka.scaladsl.Consumer.Control
    public <S> Future<S> drainAndShutdown(Future<S> future, ExecutionContext executionContext) {
        return Consumer.Control.drainAndShutdown$(this, future, executionContext);
    }

    /** Accessor for the iterator of buffered consumer records. */
    @Override // akka.kafka.internal.SourceLogicBuffer
    public Iterator<ConsumerRecord<K, V>> buffer() {
        return buffer;
    }

    /**
     * Replaces the iterator of buffered consumer records.
     *
     * @param iterator the new buffer contents
     */
    @Override // akka.kafka.internal.SourceLogicBuffer
    public void buffer_$eq(Iterator<ConsumerRecord<K, V>> iterator) {
        buffer = iterator;
    }

    /** Accessor for the stored {@code filterRevokedPartitionsCB} callback. */
    @Override // akka.kafka.internal.SourceLogicBuffer
    public AsyncCallback<Set<TopicPartition>> filterRevokedPartitionsCB() {
        return filterRevokedPartitionsCB;
    }

    /**
     * Trait setter that stores the {@code filterRevokedPartitionsCB} callback.
     *
     * @param asyncCallback the callback to store
     */
    @Override // akka.kafka.internal.SourceLogicBuffer
    public void akka$kafka$internal$SourceLogicBuffer$_setter_$filterRevokedPartitionsCB_$eq(AsyncCallback<Set<TopicPartition>> asyncCallback) {
        filterRevokedPartitionsCB = asyncCallback;
    }

    /** Accessor for the logging adapter slot owned by {@code StageIdLogging}. */
    @Override // akka.kafka.internal.StageIdLogging
    public LoggingAdapter akka$kafka$internal$StageIdLogging$$_log() {
        return akka$kafka$internal$StageIdLogging$$_log;
    }

    /**
     * Stores the logging adapter slot owned by {@code StageIdLogging}.
     *
     * @param loggingAdapter the adapter to store
     */
    @Override // akka.kafka.internal.StageIdLogging
    public void akka$kafka$internal$StageIdLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        akka$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    /** Accessor for the stored instance id string (set by the constructor through the trait setter). */
    @Override // akka.kafka.internal.InstanceId
    public String akka$kafka$internal$InstanceId$$instanceId() {
        return akka$kafka$internal$InstanceId$$instanceId;
    }

    /**
     * Trait setter that stores the instance id; the constructor calls it with the first
     * five characters of a random UUID string.
     *
     * @param str the instance id to store
     */
    @Override // akka.kafka.internal.InstanceId
    public final void akka$kafka$internal$InstanceId$_setter_$akka$kafka$internal$InstanceId$$instanceId_$eq(String str) {
        akka$kafka$internal$InstanceId$$instanceId = str;
    }

    /** Accessor for the logging adapter slot owned by {@code StageLogging}. */
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return akka$stream$stage$StageLogging$$_log;
    }

    /**
     * Stores the logging adapter slot owned by {@code StageLogging}.
     *
     * @param loggingAdapter the adapter to store
     */
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Accessor for the stored {@code PromiseControl} shutdown promise. */
    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$shutdownPromise() {
        return akka$kafka$internal$PromiseControl$$shutdownPromise;
    }

    /** Accessor for the stored {@code PromiseControl} stop promise. */
    @Override // akka.kafka.internal.PromiseControl
    public Promise<Done> akka$kafka$internal$PromiseControl$$stopPromise() {
        return akka$kafka$internal$PromiseControl$$stopPromise;
    }

    /** Accessor for the stored {@code PromiseControl} control-operation callback. */
    @Override // akka.kafka.internal.PromiseControl
    public AsyncCallback<PromiseControl.ControlOperation> akka$kafka$internal$PromiseControl$$controlCallback() {
        return akka$kafka$internal$PromiseControl$$controlCallback;
    }

    /**
     * Trait setter that stores the {@code PromiseControl} shutdown promise.
     *
     * @param promise the promise to store
     */
    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$shutdownPromise_$eq(Promise<Done> promise) {
        akka$kafka$internal$PromiseControl$$shutdownPromise = promise;
    }

    /**
     * Trait setter that stores the {@code PromiseControl} stop promise.
     *
     * @param promise the promise to store
     */
    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$stopPromise_$eq(Promise<Done> promise) {
        akka$kafka$internal$PromiseControl$$stopPromise = promise;
    }

    /**
     * Trait setter that stores the {@code PromiseControl} control-operation callback.
     *
     * @param asyncCallback the callback to store
     */
    @Override // akka.kafka.internal.PromiseControl
    public final void akka$kafka$internal$PromiseControl$_setter_$akka$kafka$internal$PromiseControl$$controlCallback_$eq(AsyncCallback<PromiseControl.ControlOperation> asyncCallback) {
        akka$kafka$internal$PromiseControl$$controlCallback = asyncCallback;
    }

    /** Returns the source shape this logic was constructed with. */
    @Override // akka.kafka.internal.PromiseControl
    public SourceShape<Msg> shape() {
        return shape;
    }

    /** Returns the execution context of this stage's materializer. */
    @Override // akka.kafka.internal.MetricsControl
    public ExecutionContext executionContext() {
        return materializer().executionContext();
    }

    /**
     * Supplies a future of an {@code ActorRef}; implemented by subclasses.
     * NOTE(review): presumably the consumer actor used by {@code MetricsControl} - confirm against the trait.
     */
    @Override // akka.kafka.internal.MetricsControl
    public abstract Future<ActorRef> consumerFuture();

    /** Returns the consumer actor reference; it is assigned in {@code preStart()}. */
    @Override // akka.kafka.internal.SourceLogicSubscription
    public final ActorRef consumerActor() {
        return consumerActor;
    }

    /**
     * Stores the consumer actor reference.
     *
     * @param actorRef the reference to store
     */
    public final void consumerActor_$eq(ActorRef actorRef) {
        consumerActor = actorRef;
    }

    /** Returns this stage's stage actor; it is assigned in {@code preStart()}. */
    @Override // akka.kafka.internal.SourceLogicSubscription
    public GraphStageLogic.StageActor sourceActor() {
        return sourceActor;
    }

    /**
     * Stores this stage's stage actor.
     *
     * @param stageActor the stage actor to store
     */
    public void sourceActor_$eq(GraphStageLogic.StageActor stageActor) {
        sourceActor = stageActor;
    }

    /** Returns the set of topic partitions currently tracked by this stage. */
    public Set<TopicPartition> tps() {
        return tps;
    }

    /**
     * Replaces the set of topic partitions tracked by this stage.
     *
     * @param set the new partition set
     */
    public void tps_$eq(Set<TopicPartition> set) {
        tps = set;
    }

    /** Returns the flag that {@code requestMessages()} sets to {@code true}. */
    private boolean akka$kafka$internal$BaseSingleSourceLogic$$requested() {
        return akka$kafka$internal$BaseSingleSourceLogic$$requested;
    }

    /**
     * Sets the "request outstanding" flag consulted by the pump loop.
     *
     * @param z the new flag value
     */
    public void akka$kafka$internal$BaseSingleSourceLogic$$requested_$eq(boolean z) {
        akka$kafka$internal$BaseSingleSourceLogic$$requested = z;
    }

    /** Returns the current request id; {@code requestMessages()} increments it on every call. */
    public int akka$kafka$internal$BaseSingleSourceLogic$$requestId() {
        return akka$kafka$internal$BaseSingleSourceLogic$$requestId;
    }

    /**
     * Stores a new request id.
     *
     * @param i the request id to store
     */
    private void akka$kafka$internal$BaseSingleSourceLogic$$requestId_$eq(int i) {
        akka$kafka$internal$BaseSingleSourceLogic$$requestId = i;
    }

    /** Returns the async callback that handles newly assigned partitions. */
    private AsyncCallback<Set<TopicPartition>> assignedCB() {
        return assignedCB;
    }

    /** Returns the async callback that handles revoked partitions. */
    private AsyncCallback<Set<TopicPartition>> revokedCB() {
        return revokedCB;
    }

    /**
     * Stage start-up hook. After the superclass hook it, in order: creates the stage actor
     * from {@code messageHandling()}, logs its ref, creates the consumer actor, watches the
     * consumer actor from the stage actor, and configures the subscription with the
     * assigned/revoked callbacks.
     */
    public void preStart() {
        super.preStart();

        // Stage actor first: its ref is the sender for every message sent to the consumer actor.
        final PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> handler = messageHandling();
        sourceActor_$eq(getStageActor(handler));
        log().info("Starting. StageActor {}", sourceActor().ref());

        final ActorRef createdConsumer = createConsumerActor();
        consumerActor_$eq(createdConsumer);
        sourceActor().watch(consumerActor());

        configureSubscription(assignedCB(), revokedCB());
    }

    /**
     * Builds the partial function used as the stage actor's message handler in
     * {@code preStart()}. The handling logic lives in the separately compiled
     * {@code BaseSingleSourceLogic$$anonfun$messageHandling$1} class, which is not part of this file.
     */
    public PartialFunction<Tuple2<ActorRef, Object>, BoxedUnit> messageHandling() {
        return new BaseSingleSourceLogic$$anonfun$messageHandling$1(this);
    }

    /** Creates the consumer actor for this stage; called once from {@code preStart()}. Implemented by subclasses. */
    public abstract ActorRef createConsumerActor();

    /**
     * Applies a manual subscription: sends the matching assignment message to the consumer
     * actor (with the stage actor's ref as sender) and adds the subscription's partitions to
     * the tracked partition set.
     *
     * @param manualSubscription one of {@code Assignment}, {@code AssignmentWithOffset} or
     *     {@code AssignmentOffsetsForTimes}
     * @throws MatchError if the subscription is none of the three handled kinds
     */
    @Override // akka.kafka.internal.SourceLogicSubscription
    public void configureManualSubscription(ManualSubscription manualSubscription) {
        if (manualSubscription instanceof Subscriptions.Assignment) {
            // Plain partition assignment.
            Set<TopicPartition> assigned = ((Subscriptions.Assignment) manualSubscription).tps();
            consumerActor().tell(new KafkaConsumerActor$Internal$Assign(assigned), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(assigned));
        } else if (manualSubscription instanceof Subscriptions.AssignmentWithOffset) {
            // Assignment keyed by partition; only the keys are tracked locally.
            Map<TopicPartition, Object> withOffsets = ((Subscriptions.AssignmentWithOffset) manualSubscription).tps();
            consumerActor().tell(new KafkaConsumerActor$Internal$AssignWithOffset(withOffsets), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(withOffsets.keySet()));
        } else if (manualSubscription instanceof Subscriptions.AssignmentOffsetsForTimes) {
            // Assignment keyed by partition with timestamps to search; only the keys are tracked locally.
            Map<TopicPartition, Object> byTimestamp = ((Subscriptions.AssignmentOffsetsForTimes) manualSubscription).timestampsToSearch();
            consumerActor().tell(new KafkaConsumerActor$Internal$AssignOffsetsForTimes(byTimestamp), sourceActor().ref());
            tps_$eq((Set) tps().$plus$plus(byTimestamp.keySet()));
        } else {
            throw new MatchError(manualSubscription);
        }
    }

    /**
     * Pushes buffered records downstream for as long as the outlet is available. When the
     * buffer runs dry, asks for more messages unless a request is already outstanding or no
     * partitions are tracked, then returns.
     */
    public void akka$kafka$internal$BaseSingleSourceLogic$$pump() {
        while (isAvailable(shape().out())) {
            if (buffer().hasNext()) {
                push(shape().out(), createMessage((ConsumerRecord) buffer().next()));
                continue;
            }
            // Buffer exhausted: request more only if nothing is in flight and there is something to read.
            if (!akka$kafka$internal$BaseSingleSourceLogic$$requested() && tps().nonEmpty()) {
                requestMessages();
            }
            return;
        }
    }

    /**
     * Marks a request as outstanding, increments the request id, logs the request at debug
     * level and sends a {@code RequestMessages} for the tracked partitions to the consumer
     * actor, with the stage actor's ref as sender.
     */
    public void requestMessages() {
        akka$kafka$internal$BaseSingleSourceLogic$$requested_$eq(true);

        final int nextRequestId = akka$kafka$internal$BaseSingleSourceLogic$$requestId() + 1;
        akka$kafka$internal$BaseSingleSourceLogic$$requestId_$eq(nextRequestId);

        log().debug("Requesting messages, requestId: {}, partitions: {}", BoxesRunTime.boxToInteger(nextRequestId), tps());
        consumerActor().tell(
                new KafkaConsumerActor$Internal$RequestMessages(nextRequestId, tps()),
                sourceActor().ref());
    }

    /** Stage shutdown hook: runs {@code onShutdown()} first, then the superclass hook. */
    public void postStop() {
        onShutdown();
        super.postStop();
    }

    /**
     * Shutdown action for this stage; in this class it only logs "Completing" at info level.
     * Also invoked by the outlet handler when downstream finishes.
     */
    @Override // akka.kafka.internal.PromiseControl
    public void performShutdown() {
        log().info("Completing");
    }

    /**
     * Body of the assigned-partitions callback: adds the given partitions to the tracked set,
     * logs the result at debug level and requests messages.
     *
     * @param baseSingleSourceLogic the stage logic the callback belongs to
     * @param set the newly assigned partitions
     */
    public static final /* synthetic */ void $anonfun$assignedCB$1(BaseSingleSourceLogic baseSingleSourceLogic, Set set) {
        Set combined = (Set) baseSingleSourceLogic.tps().$plus$plus(set);
        baseSingleSourceLogic.tps_$eq(combined);
        baseSingleSourceLogic.log().debug("Assigned partitions: {}. All partitions: {}", set, baseSingleSourceLogic.tps());
        baseSingleSourceLogic.requestMessages();
    }

    /**
     * Body of the revoked-partitions callback: removes the given partitions from the tracked
     * set and logs the result at debug level.
     *
     * @param baseSingleSourceLogic the stage logic the callback belongs to
     * @param set the revoked partitions
     */
    public static final /* synthetic */ void $anonfun$revokedCB$1(BaseSingleSourceLogic baseSingleSourceLogic, Set set) {
        Set remaining = (Set) baseSingleSourceLogic.tps().$minus$minus(set);
        baseSingleSourceLogic.tps_$eq(remaining);
        baseSingleSourceLogic.log().debug("Revoked partitions: {}. All partitions: {}", set, baseSingleSourceLogic.tps());
    }

    /**
     * Creates the stage logic for the given source shape.
     *
     * <p>Runs the trait initialisers, assigns an instance id (first five characters of a
     * random UUID string), initialises the partition/request state, creates the assigned and
     * revoked async callbacks, and installs the outlet handler.
     *
     * <p>The outlet handler is a plain anonymous class that reaches the enclosing instance
     * via {@code BaseSingleSourceLogic.this}. The decompiled form passed a constructor
     * argument to an interface and stored the handler itself in its {@code $outer} field,
     * which is not valid Java.
     *
     * @param sourceShape the shape of the source stage this logic belongs to
     */
    public BaseSingleSourceLogic(SourceShape<Msg> sourceShape) {
        super(sourceShape);
        this.shape = sourceShape;

        // Trait initialisers, kept in the original order.
        Consumer.Control.$init$(this);
        PromiseControl.$init$((PromiseControl) this);
        MetricsControl.$init$((MetricsControl) this);
        StageLogging.$init$(this);
        akka$kafka$internal$InstanceId$_setter_$akka$kafka$internal$InstanceId$$instanceId_$eq((String) new StringOps(Predef$.MODULE$.augmentString(UUID.randomUUID().toString())).take(5));
        StageIdLogging.$init$((StageIdLogging) this);
        SourceLogicSubscription.$init$(this);
        SourceLogicBuffer.$init$(this);

        // Own state: no partitions tracked, no request outstanding.
        this.tps = Predef$.MODULE$.Set().empty();
        this.akka$kafka$internal$BaseSingleSourceLogic$$requested = false;
        this.akka$kafka$internal$BaseSingleSourceLogic$$requestId = 0;

        this.assignedCB = getAsyncCallback(set -> {
            $anonfun$assignedCB$1(this, set);
            return BoxedUnit.UNIT;
        });
        this.revokedCB = getAsyncCallback(set2 -> {
            $anonfun$revokedCB$1(this, set2);
            return BoxedUnit.UNIT;
        });

        setHandler(sourceShape.out(), new OutHandler() { // from class: akka.kafka.internal.BaseSingleSourceLogic$$anon$1
            {
                OutHandler.$init$(this);
            }

            @Override
            public void onPull() {
                // Downstream demand: push buffered records or request more.
                BaseSingleSourceLogic.this.akka$kafka$internal$BaseSingleSourceLogic$$pump();
            }

            @Override
            public void onDownstreamFinish() {
                BaseSingleSourceLogic.this.performShutdown();
            }
        });
    }
}
