package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.rsocket.DuplexConnection;
import java.util.Objects;
import java.util.Queue;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:io/rsocket/transport/netty/WebsocketDuplexConnection.class */
public final class WebsocketDuplexConnection implements DuplexConnection {
    private final Connection connection;
    private final Disposable channelClosed;

    public WebsocketDuplexConnection(Connection connection) {
        this.connection = (Connection) Objects.requireNonNull(connection, "connection must not be null");
        this.channelClosed = FutureMono.from(connection.channel().closeFuture()).doFinally(signalType -> {
            if (isDisposed()) {
                return;
            }
            dispose();
        }).subscribe();
    }

    public void dispose() {
        this.connection.dispose();
    }

    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    public Mono<Void> onClose() {
        return this.connection.onDispose().doFinally(signalType -> {
            if (this.channelClosed.isDisposed()) {
                return;
            }
            this.channelClosed.dispose();
        });
    }

    public Flux<ByteBuf> receive() {
        return this.connection.inbound().receive().map((v0) -> {
            return v0.retain();
        });
    }

    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        return Flux.from(publisher).transform(flux -> {
            if (!(flux instanceof Fuseable.QueueSubscription)) {
                return new SendPublisher((Queue) Queues.small().get(), flux, this.connection.channel(), this::toBinaryWebSocketFrame, binaryWebSocketFrame -> {
                    return binaryWebSocketFrame.content().readableBytes();
                });
            }
            Fuseable.QueueSubscription queueSubscription = (Fuseable.QueueSubscription) flux;
            queueSubscription.requestFusion(2);
            return new SendPublisher(queueSubscription, flux, this.connection.channel(), this::toBinaryWebSocketFrame, binaryWebSocketFrame2 -> {
                return binaryWebSocketFrame2.content().readableBytes();
            });
        }).then();
    }

    private BinaryWebSocketFrame toBinaryWebSocketFrame(ByteBuf byteBuf) {
        return new BinaryWebSocketFrame(byteBuf.retain());
    }
}
