package com.hotels.styx.server.netty.connectors;

import com.hotels.styx.api.Buffer;
import com.hotels.styx.api.Buffers;
import com.hotels.styx.api.LiveHttpResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpResponseWriter.class */
public class HttpResponseWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpResponseWriter.class);
    private final AtomicLong writeOps;
    private final AtomicLong contentBytesWritten;
    private final AtomicLong writeOpsAcked;
    private final AtomicLong contentBytesAcked;
    private final AtomicBoolean contentCompleted;
    private final ChannelHandlerContext ctx;
    private final ResponseTranslator responseTranslator;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HttpResponseWriter(ChannelHandlerContext channelHandlerContext) {
        this(channelHandlerContext, new StyxToNettyResponseTranslator());
    }

    HttpResponseWriter(ChannelHandlerContext channelHandlerContext, ResponseTranslator responseTranslator) {
        this.writeOps = new AtomicLong(0L);
        this.contentBytesWritten = new AtomicLong(0L);
        this.writeOpsAcked = new AtomicLong(0L);
        this.contentBytesAcked = new AtomicLong(0L);
        this.contentCompleted = new AtomicBoolean(false);
        this.ctx = (ChannelHandlerContext) Objects.requireNonNull(channelHandlerContext);
        this.responseTranslator = (ResponseTranslator) Objects.requireNonNull(responseTranslator);
    }

    public CompletableFuture<Void> write(final LiveHttpResponse liveHttpResponse) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            writeHeaders(liveHttpResponse).addListener(channelFuture -> {
                if (channelFuture.isSuccess()) {
                    this.writeOpsAcked.incrementAndGet();
                } else {
                    LOGGER.warn("Unable to send response headers. Written content bytes {}/{} (ackd/sent). Write events {}/{} (ackd/writes). Exception={}", new Object[]{Long.valueOf(this.contentBytesAcked.get()), Long.valueOf(this.contentBytesWritten.get()), Long.valueOf(this.writeOpsAcked.get()), Long.valueOf(this.writeOps.get()), channelFuture.cause()});
                    completableFuture.completeExceptionally(channelFuture.cause());
                }
            });
            liveHttpResponse.body().subscribe(new BaseSubscriber<Buffer>() { // from class: com.hotels.styx.server.netty.connectors.HttpResponseWriter.1
                public void hookOnSubscribe(Subscription subscription) {
                    CompletableFuture completableFuture2 = completableFuture;
                    CompletableFuture completableFuture3 = completableFuture;
                    completableFuture2.handle((r4, th) -> {
                        if (!completableFuture3.isCompletedExceptionally() || !(th instanceof CancellationException)) {
                            return null;
                        }
                        subscription.cancel();
                        return null;
                    });
                    subscription.request(1L);
                }

                public void hookOnComplete() {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    HttpResponseWriter.this.nettyWriteAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(this::onWriteEmptyLastChunkOutcome);
                    HttpResponseWriter.this.contentCompleted.set(true);
                    HttpResponseWriter.this.completeIfAllSent(completableFuture);
                }

                public void hookOnError(Throwable th) {
                    HttpResponseWriter.LOGGER.warn("Content observable error. Written content bytes {}/{} (ackd/sent). Write events {}/{} (ackd/writes). Exception={}", new Object[]{Long.valueOf(HttpResponseWriter.this.contentBytesAcked.get()), Long.valueOf(HttpResponseWriter.this.contentBytesWritten.get()), Long.valueOf(HttpResponseWriter.this.writeOpsAcked.get()), Long.valueOf(HttpResponseWriter.this.writeOps.get()), th});
                    completableFuture.completeExceptionally(th);
                }

                public void hookOnNext(Buffer buffer) {
                    ByteBuf byteBuf = Buffers.toByteBuf(buffer);
                    if (completableFuture.isDone()) {
                        byteBuf.release();
                        return;
                    }
                    long readableBytes = byteBuf.readableBytes();
                    HttpResponseWriter.this.contentBytesWritten.addAndGet(readableBytes);
                    HttpResponseWriter.this.nettyWriteAndFlush(new DefaultHttpContent(byteBuf)).addListener(future -> {
                        onWriteOutcome((ChannelFuture) future, readableBytes);
                    });
                }

                private void onWriteOutcome(ChannelFuture channelFuture2, long j) {
                    if (channelFuture2.isSuccess()) {
                        HttpResponseWriter.this.contentBytesAcked.addAndGet(j);
                        HttpResponseWriter.this.writeOpsAcked.incrementAndGet();
                        request(1L);
                        HttpResponseWriter.this.completeIfAllSent(completableFuture);
                        return;
                    }
                    if (completableFuture.isDone()) {
                        return;
                    }
                    cancel();
                    HttpResponseWriter.LOGGER.warn("Write error. Written content bytes {}/{} (ackd/sent). Write events {}/{} (ackd/writes), Exception={}", new Object[]{Long.valueOf(HttpResponseWriter.this.contentBytesAcked.get()), Long.valueOf(HttpResponseWriter.this.contentBytesWritten.get()), Long.valueOf(HttpResponseWriter.this.writeOpsAcked.get()), Long.valueOf(HttpResponseWriter.this.writeOps.get()), liveHttpResponse, channelFuture2.cause()});
                    completableFuture.completeExceptionally(channelFuture2.cause());
                }

                private void onWriteEmptyLastChunkOutcome(ChannelFuture channelFuture2) {
                    HttpResponseWriter.this.writeOpsAcked.incrementAndGet();
                    HttpResponseWriter.this.completeIfAllSent(completableFuture);
                    cancel();
                }
            });
            return completableFuture;
        } catch (Throwable th) {
            LOGGER.warn("Failed to convert response headers. response={}, Cause={}", new Object[]{liveHttpResponse, th});
            Flux.from(liveHttpResponse.body().drop()).subscribe();
            completableFuture.completeExceptionally(th);
            return completableFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeIfAllSent(CompletableFuture<Void> completableFuture) {
        if (this.contentCompleted.get() && this.writeOps.get() == this.writeOpsAcked.get()) {
            completableFuture.complete(null);
        }
    }

    private ChannelFuture writeHeaders(LiveHttpResponse liveHttpResponse) {
        HttpResponse nettyResponse = this.responseTranslator.toNettyResponse(liveHttpResponse);
        if (!liveHttpResponse.contentLength().isPresent() && !liveHttpResponse.chunked()) {
            HttpHeaders.setTransferEncodingChunked(nettyResponse);
        }
        return nettyWriteAndFlush(nettyResponse);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelFuture nettyWriteAndFlush(Object obj) {
        this.writeOps.incrementAndGet();
        return this.ctx.writeAndFlush(obj);
    }
}
