package io.rsocket.test;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/test/TestRSocket.class */
/**
 * {@link RSocket} implementation for tests that counts the interactions it receives.
 *
 * <p>Two counters are maintained:
 *
 * <ul>
 *   <li>{@code observedInteractions} is incremented once per call to any interaction method and
 *       never decremented;
 *   <li>{@code activeInteractions} is incremented when an interaction method is called and
 *       decremented from a {@code doFinally} hook on the returned publisher.
 * </ul>
 *
 * <p>Note that the increment happens when the method is called while the decrement happens per
 * subscription of the returned publisher, so a publisher that is never subscribed keeps the
 * active counter above zero and one that is subscribed more than once can drive it below zero.
 */
public class TestRSocket implements RSocket {
    /** Number of payloads emitted by each {@link #requestStream(Payload)} response. */
    private static final int STREAM_LENGTH = 10000;

    /** Pause between two polls of a counter in the {@code await*} methods, in nanoseconds. */
    private static final long POLL_INTERVAL_NANOS = 100L;

    private final String data;
    private final String metadata;
    private final AtomicLong observedInteractions = new AtomicLong();
    private final AtomicLong activeInteractions = new AtomicLong();

    /**
     * Creates a test socket whose generated response payloads carry the given content.
     *
     * @param data data string used for every generated response payload
     * @param metadata metadata string used for every generated response payload
     */
    public TestRSocket(String data, String metadata) {
        this.data = data;
        this.metadata = metadata;
    }

    /**
     * Releases the request and responds with a single payload built from the configured data and
     * metadata.
     *
     * @param payload the request payload; released by this method
     * @return a {@code Mono} emitting one response payload
     */
    public Mono<Payload> requestResponse(Payload payload) {
        interactionStarted(payload);
        return Mono.just(ByteBufPayload.create(this.data, this.metadata))
                .doFinally(signalType -> interactionFinished());
    }

    /**
     * Releases the request and responds with {@value #STREAM_LENGTH} payloads, each built from the
     * configured data and metadata.
     *
     * @param payload the request payload; released by this method
     * @return a {@code Flux} of response payloads
     */
    public Flux<Payload> requestStream(Payload payload) {
        interactionStarted(payload);
        return Flux.range(1, STREAM_LENGTH)
                .map(index -> ByteBufPayload.create(this.data, this.metadata))
                .doFinally(signalType -> interactionFinished());
    }

    /**
     * Releases the request and returns an empty {@code Mono}.
     *
     * @param payload the metadata payload; released by this method
     * @return an empty {@code Mono}
     */
    public Mono<Void> metadataPush(Payload payload) {
        interactionStarted(payload);
        // The explicit <Void> witness is required: without it the receiver of doFinally is
        // inferred as Mono<Object>, which is not assignable to the declared return type.
        return Mono.<Void>empty().doFinally(signalType -> interactionFinished());
    }

    /**
     * Releases the request and returns an empty {@code Mono}.
     *
     * @param payload the request payload; released by this method
     * @return an empty {@code Mono}
     */
    public Mono<Void> fireAndForget(Payload payload) {
        interactionStarted(payload);
        // Explicit <Void> witness needed for the same reason as in metadataPush.
        return Mono.<Void>empty().doFinally(signalType -> interactionFinished());
    }

    /**
     * Returns the inbound publisher as the response stream. Unlike the other interactions, the
     * payloads are passed through and are not released here.
     *
     * @param publisher the inbound payload stream
     * @return a {@code Flux} wrapping {@code publisher}
     */
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        this.activeInteractions.getAndIncrement();
        this.observedInteractions.getAndIncrement();
        return Flux.from(publisher).doFinally(signalType -> interactionFinished());
    }

    /**
     * Polls until the number of active interactions is no longer positive or the timeout elapses.
     *
     * @param duration maximum time to wait
     * @return {@code true} if the active counter reached exactly zero; {@code false} if the
     *     timeout elapsed first or the counter was observed below zero
     * @throws ArithmeticException if {@code duration} does not fit into a {@code long} number of
     *     nanoseconds
     */
    public boolean awaitAllInteractionTermination(Duration duration) {
        final long timeoutNanos = duration.toNanos();
        final long startNanos = System.nanoTime();
        while (true) {
            long active = this.activeInteractions.get();
            if (active <= 0) {
                // A negative count means more terminations than starts were recorded.
                return active == 0;
            }
            // Compare elapsed time rather than absolute timestamps: nanoTime values are only
            // meaningful as differences, and "now + timeout" may overflow for long timeouts.
            if (System.nanoTime() - startNanos >= timeoutNanos) {
                return false;
            }
            LockSupport.parkNanos(POLL_INTERVAL_NANOS);
        }
    }

    /**
     * Polls until at least {@code i} interactions have been observed or the timeout elapses.
     *
     * @param i number of interactions to wait for
     * @param duration maximum time to wait
     * @return {@code true} once the observed counter is at least {@code i}; {@code false} if the
     *     timeout elapsed first
     * @throws ArithmeticException if {@code duration} does not fit into a {@code long} number of
     *     nanoseconds
     */
    public boolean awaitUntilObserved(int i, Duration duration) {
        final long timeoutNanos = duration.toNanos();
        final long startNanos = System.nanoTime();
        while (this.observedInteractions.get() < i) {
            // Elapsed-time comparison; see awaitAllInteractionTermination for the rationale.
            if (System.nanoTime() - startNanos >= timeoutNanos) {
                return false;
            }
            LockSupport.parkNanos(POLL_INTERVAL_NANOS);
        }
        return true;
    }

    /** Records the start of a payload-carrying interaction and releases the request payload. */
    private void interactionStarted(Payload payload) {
        this.activeInteractions.getAndIncrement();
        payload.release();
        this.observedInteractions.getAndIncrement();
    }

    /** Records that a response publisher has terminated. */
    private void interactionFinished() {
        this.activeInteractions.getAndDecrement();
    }
}
