/*
 * Decompiled with CFR 0.152.
 */
package org.litote.kmongo.reactivestreams;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.litote.kmongo.reactivestreams.MongoSharedCollectionsKt;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Metadata(mv={1, 9, 0}, k=1, xi=48, d1={"\u0000H\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0000\n\u0002\u0010\u0003\n\u0002\b\u0002\n\u0002\u0010\t\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u000f\n\u0002\u0018\u0002\n\u0000\b\u0002\u0018\u0000*\u0004\b\u0000\u0010\u00012\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u0002H\u00010\u00030\u0002B\u008d\u0001\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00028\u00000\u0005\u0012\u001e\u0010\u0006\u001a\u001a\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00000\u0005\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00000\b0\u0007\u0012\u000e\b\u0002\u0010\t\u001a\b\u0012\u0004\u0012\u00020\u000b0\n\u0012\u0014\b\u0002\u0010\f\u001a\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\u000b0\u0007\u0012\u000e\b\u0002\u0010\u000e\u001a\b\u0012\u0004\u0012\u00020\u000b0\n\u0012\b\b\u0002\u0010\u000f\u001a\u00020\u0010\u0012\u0018\u0010\u0011\u001a\u0014\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00000\u0003\u0012\u0004\u0012\u00020\u000b0\u0007\u00a2\u0006\u0002\u0010\u0012J\b\u0010 \u001a\u00020\u000bH\u0016J\u0010\u0010!\u001a\u00020\u000b2\u0006\u0010\"\u001a\u00020\rH\u0016J\u0016\u0010#\u001a\u00020\u000b2\f\u0010\"\u001a\b\u0012\u0004\u0012\u00028\u00000\u0003H\u0016J\u0010\u0010$\u001a\u00020\u000b2\u0006\u0010%\u001a\u00020&H\u0016R\u0017\u0010\u0004\u001a\b\u0012\u0004\u0012\u00028\u00000\u0005\u00a2\u0006\b\n\u0000\u001a\u0004\b\u0013\u0010\u0014R\u000e\u0010\u0015\u001a\u00020\u0016X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001d\u0010\f\u001a\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\u000b0\u0007\u00a2\u0006\b\n\u0000\u001a\u0004\b\u0017\u0010\u0018R#\u0010\u0011\u001a\u0014\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00000\u0003\u0012\u0004\u0012\u00020\u000b0\u0007\u00a2\u0006\b\n\u0000\u001a\u0004\b\u0019\u0010\u0018R\u0011\u0010\u000f\u001a\u00020\u0010\u00a2\u0006\b\n\u0000\u001a\u0004\b\u001a\u0010\u001bR\u0017\u0010\u000e\u001a\b\u0012\u0004\u0012\u00020\u000b0\n\u00a2\u0006\b\n\u0000\u001a\u0004\b\u001c\u0010\u001dR\u0017\u0010\t\u001a\b\u0012\u0004\u0012\u00020\u000b0\n\u00a2\u0006\b\n\u0000\u001a\u0004\b\u001e\u0010\u001dR)\u0010\u0006\u001a\u001a\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00000\u0005\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00000\b0\u0007\u00a2\u0006\b\n\u0000\u001a\u0004\b\u001f\u0010\u0018\u00a8\u0006'"}, d2={"Lorg/litote/kmongo/reactivestreams/WatchSubscriber;", "T", "Lorg/reactivestreams/Subscriber;", "Lcom/mongodb/client/model/changestream/ChangeStreamDocument;", "col", "Lcom/mongodb/reactivestreams/client/MongoCollection;", "watchProvider", "Lkotlin/Function1;", "Lcom/mongodb/reactivestreams/client/ChangeStreamPublisher;", "subscribeListener", "Lkotlin/Function0;", "", "errorListener", "", "reopenListener", "reopenDelayInMS", "", "listener", "(Lcom/mongodb/reactivestreams/client/MongoCollection;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;JLkotlin/jvm/functions/Function1;)V", "getCol", "()Lcom/mongodb/reactivestreams/client/MongoCollection;", "complete", "Ljava/util/concurrent/atomic/AtomicBoolean;", "getErrorListener", "()Lkotlin/jvm/functions/Function1;", "getListener", "getReopenDelayInMS", "()J", "getReopenListener", "()Lkotlin/jvm/functions/Function0;", "getSubscribeListener", "getWatchProvider", "onComplete", "onError", "t", "onNext", "onSubscribe", "s", "Lorg/reactivestreams/Subscription;", "kmongo-async-shared"})
final class WatchSubscriber<T>
implements Subscriber<ChangeStreamDocument<T>> {
    @NotNull
    private final MongoCollection<T> col;
    @NotNull
    private final Function1<MongoCollection<T>, ChangeStreamPublisher<T>> watchProvider;
    @NotNull
    private final Function0<Unit> subscribeListener;
    @NotNull
    private final Function1<Throwable, Unit> errorListener;
    @NotNull
    private final Function0<Unit> reopenListener;
    private final long reopenDelayInMS;
    @NotNull
    private final Function1<ChangeStreamDocument<T>, Unit> listener;
    @NotNull
    private final AtomicBoolean complete;

    public WatchSubscriber(@NotNull MongoCollection<T> col, @NotNull Function1<? super MongoCollection<T>, ? extends ChangeStreamPublisher<T>> watchProvider, @NotNull Function0<Unit> subscribeListener, @NotNull Function1<? super Throwable, Unit> errorListener, @NotNull Function0<Unit> reopenListener, long reopenDelayInMS, @NotNull Function1<? super ChangeStreamDocument<T>, Unit> listener) {
        Intrinsics.checkNotNullParameter(col, (String)"col");
        Intrinsics.checkNotNullParameter(watchProvider, (String)"watchProvider");
        Intrinsics.checkNotNullParameter(subscribeListener, (String)"subscribeListener");
        Intrinsics.checkNotNullParameter(errorListener, (String)"errorListener");
        Intrinsics.checkNotNullParameter(reopenListener, (String)"reopenListener");
        Intrinsics.checkNotNullParameter(listener, (String)"listener");
        this.col = col;
        this.watchProvider = watchProvider;
        this.subscribeListener = subscribeListener;
        this.errorListener = errorListener;
        this.reopenListener = reopenListener;
        this.reopenDelayInMS = reopenDelayInMS;
        this.listener = listener;
        this.complete = new AtomicBoolean();
    }

    public /* synthetic */ WatchSubscriber(MongoCollection mongoCollection, Function1 function1, Function0 function0, Function1 function12, Function0 function02, long l, Function1 function13, int n, DefaultConstructorMarker defaultConstructorMarker) {
        if ((n & 4) != 0) {
            function0 = 1.INSTANCE;
        }
        if ((n & 8) != 0) {
            function12 = 2.INSTANCE;
        }
        if ((n & 0x10) != 0) {
            function02 = 3.INSTANCE;
        }
        if ((n & 0x20) != 0) {
            l = 5000L;
        }
        this(mongoCollection, function1, (Function0<Unit>)function0, (Function1<? super Throwable, Unit>)function12, (Function0<Unit>)function02, l, function13);
    }

    @NotNull
    public final MongoCollection<T> getCol() {
        return this.col;
    }

    @NotNull
    public final Function1<MongoCollection<T>, ChangeStreamPublisher<T>> getWatchProvider() {
        return this.watchProvider;
    }

    @NotNull
    public final Function0<Unit> getSubscribeListener() {
        return this.subscribeListener;
    }

    @NotNull
    public final Function1<Throwable, Unit> getErrorListener() {
        return this.errorListener;
    }

    @NotNull
    public final Function0<Unit> getReopenListener() {
        return this.reopenListener;
    }

    public final long getReopenDelayInMS() {
        return this.reopenDelayInMS;
    }

    @NotNull
    public final Function1<ChangeStreamDocument<T>, Unit> getListener() {
        return this.listener;
    }

    public void onComplete() {
        if (!this.complete.getAndSet(true)) {
            ScheduledExecutorService scheduledExecutorService;
            ScheduledExecutorService $this$onComplete_u24lambda_u241 = scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            boolean bl = false;
            $this$onComplete_u24lambda_u241.schedule(() -> WatchSubscriber.onComplete$lambda$1$lambda$0(this, $this$onComplete_u24lambda_u241), this.reopenDelayInMS, TimeUnit.MILLISECONDS);
        }
    }

    public void onSubscribe(@NotNull Subscription s) {
        Intrinsics.checkNotNullParameter((Object)s, (String)"s");
        s.request(Long.MAX_VALUE);
        this.subscribeListener.invoke();
    }

    public void onNext(@NotNull ChangeStreamDocument<T> t) {
        Intrinsics.checkNotNullParameter(t, (String)"t");
        this.listener.invoke(t);
    }

    public void onError(@NotNull Throwable t) {
        Intrinsics.checkNotNullParameter((Object)t, (String)"t");
        this.errorListener.invoke((Object)t);
        this.onComplete();
    }

    private static final void onComplete$lambda$1$lambda$0(WatchSubscriber this$0, ScheduledExecutorService $this_apply) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        this$0.reopenListener.invoke();
        MongoSharedCollectionsKt.watchIndefinitely(this$0.col, this$0.watchProvider, this$0.subscribeListener, this$0.errorListener, this$0.reopenListener, this$0.reopenDelayInMS, this$0.listener);
        $this_apply.awaitTermination(1L, TimeUnit.MINUTES);
    }
}

