package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.api.CopyInBuilder;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CopyData;
import io.r2dbc.postgresql.message.frontend.CopyDone;
import io.r2dbc.postgresql.message.frontend.CopyFail;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Query;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:io/r2dbc/postgresql/PostgresqlCopyIn.class */
final class PostgresqlCopyIn {
    private final ConnectionResources context;

    /* loaded from: input_file:io/r2dbc/postgresql/PostgresqlCopyIn$Builder.class */
    static final class Builder implements CopyInBuilder {
        private final ConnectionResources resources;
        private final String sql;

        @Nullable
        private Publisher<? extends Publisher<ByteBuf>> stdin;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder(ConnectionResources connectionResources, String str) {
            this.resources = connectionResources;
            this.sql = str;
        }

        @Override // io.r2dbc.postgresql.api.CopyInBuilder
        public CopyInBuilder fromMany(Publisher<? extends Publisher<ByteBuf>> publisher) {
            this.stdin = (Publisher) Assert.requireNonNull(publisher, "stdin must not be null");
            return this;
        }

        @Override // io.r2dbc.postgresql.api.CopyInBuilder
        public Mono<Long> build() {
            if (this.stdin == null) {
                throw new IllegalArgumentException("No stdin configured for COPY IN");
            }
            return new PostgresqlCopyIn(this.resources).copy(this.sql, this.stdin);
        }
    }

    PostgresqlCopyIn(ConnectionResources connectionResources) {
        this.context = (ConnectionResources) Assert.requireNonNull(connectionResources, "resources must not be null");
    }

    Mono<Long> copy(String str, Publisher<? extends Publisher<ByteBuf>> publisher) {
        ExceptionFactory withSql = ExceptionFactory.withSql(str);
        AtomicReference atomicReference = new AtomicReference();
        return ((Mono) Flux.from(publisher).concatMap(publisher2 -> {
            Mono map = Flux.from(publisher2).reduce(this.context.getClient().getByteBufAllocator().compositeBuffer(), (compositeByteBuf, byteBuf) -> {
                return compositeByteBuf.addComponent(true, byteBuf);
            }).map((v1) -> {
                return new CopyData(v1);
            });
            atomicReference.getClass();
            return map.doOnNext((v1) -> {
                r1.set(v1);
            }).doOnDiscard(ReferenceCounted.class, (v0) -> {
                ReferenceCountUtil.release(v0);
            });
        }).concatWithValues(new FrontendMessage[]{CopyDone.INSTANCE}).startWith(new FrontendMessage[]{new Query(str)}).as(flux -> {
            return copyIn(withSql, flux);
        })).doFinally(signalType -> {
            CopyData copyData = (CopyData) atomicReference.get();
            if (copyData == null || copyData.refCnt() <= 0) {
                return;
            }
            copyData.release();
        });
    }

    private Mono<Long> copyIn(ExceptionFactory exceptionFactory, Flux<FrontendMessage> flux) {
        Client client = this.context.getClient();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        Flux asFlux = onBackpressureBuffer.asFlux();
        onBackpressureBuffer.getClass();
        return (Mono) ((Flux) client.exchange(backendMessage -> {
            return backendMessage instanceof ReadyForQuery;
        }, asFlux.mergeWith(flux.doOnComplete(onBackpressureBuffer::tryEmitComplete).filter(frontendMessage -> {
            return !atomicBoolean.get();
        }).onErrorResume(th -> {
            copyFail(onBackpressureBuffer, atomicBoolean, "Copy operation failed: " + th.getMessage());
            return Mono.empty();
        }))).doOnNext(backendMessage2 -> {
            if (backendMessage2 instanceof ErrorResponse) {
                atomicBoolean.set(true);
                onBackpressureBuffer.tryEmitComplete();
            }
        }).doOnComplete(() -> {
            atomicBoolean.set(true);
            onBackpressureBuffer.tryEmitComplete();
        }).doOnError(th2 -> {
            copyFail(onBackpressureBuffer, atomicBoolean, "Copy operation failed: " + th2.getMessage());
        }).doOnCancel(() -> {
            copyFail(onBackpressureBuffer, atomicBoolean, "Copy operation failed: Cancelled");
        }).doOnDiscard(ReferenceCounted.class, (v0) -> {
            ReferenceCountUtil.release(v0);
        }).as(Operators::discardOnCancel)).doOnCancel(() -> {
            copyFail(onBackpressureBuffer, atomicBoolean, "Copy operation failed: Cancelled");
        }).as(flux2 -> {
            return PostgresqlResult.toResult(this.context, flux2, exceptionFactory).mo46getRowsUpdated();
        });
    }

    private void copyFail(Sinks.Many<FrontendMessage> many, AtomicBoolean atomicBoolean, String str) {
        many.tryEmitNext(new CopyFail(str));
        many.tryEmitComplete();
        atomicBoolean.set(true);
    }

    public String toString() {
        return "PostgresqlCopyIn{context=" + this.context + '}';
    }
}
