package com.hotels.styx.server.netty.connectors;

import com.google.common.annotations.VisibleForTesting;
import com.hotels.styx.api.Buffer;
import com.hotels.styx.api.ByteStream;
import com.hotels.styx.api.ContentOverflowException;
import com.hotels.styx.api.HttpHandler;
import com.hotels.styx.api.HttpHeaderNames;
import com.hotels.styx.api.HttpResponseStatus;
import com.hotels.styx.api.HttpVersion;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.MetricRegistry;
import com.hotels.styx.api.exceptions.NoAvailableHostsException;
import com.hotels.styx.api.exceptions.OriginUnreachableException;
import com.hotels.styx.api.exceptions.ResponseTimeoutException;
import com.hotels.styx.api.exceptions.StyxException;
import com.hotels.styx.api.exceptions.TransportLostException;
import com.hotels.styx.api.metrics.codahale.CodaHaleMetricRegistry;
import com.hotels.styx.api.plugins.spi.PluginException;
import com.hotels.styx.client.BadHttpResponseException;
import com.hotels.styx.client.StyxClientException;
import com.hotels.styx.client.connectionpool.ResourceExhaustedException;
import com.hotels.styx.common.FsmEventProcessor;
import com.hotels.styx.common.QueueDrainingEventProcessor;
import com.hotels.styx.common.StateMachine;
import com.hotels.styx.common.content.ConsumerDisconnectedException;
import com.hotels.styx.server.BadRequestException;
import com.hotels.styx.server.HttpErrorStatusListener;
import com.hotels.styx.server.HttpInterceptorContext;
import com.hotels.styx.server.NoServiceConfiguredException;
import com.hotels.styx.server.RequestProgressListener;
import com.hotels.styx.server.RequestTimeoutException;
import com.hotels.styx.server.netty.connectors.ExceptionStatusMapper;
import com.hotels.styx.server.track.RequestTracker;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.LastHttpContent;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.net.ssl.SSLHandshakeException;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler.class */
public class HttpPipelineHandler extends SimpleChannelInboundHandler<LiveHttpRequest> {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpPipelineHandler.class);
    private static final ExceptionStatusMapper EXCEPTION_STATUSES = new ExceptionStatusMapper.Builder().add(HttpResponseStatus.REQUEST_TIMEOUT, RequestTimeoutException.class).add(HttpResponseStatus.BAD_GATEWAY, OriginUnreachableException.class, NoAvailableHostsException.class, NoServiceConfiguredException.class, BadHttpResponseException.class, ContentOverflowException.class).add(HttpResponseStatus.SERVICE_UNAVAILABLE, ResourceExhaustedException.class).add(HttpResponseStatus.GATEWAY_TIMEOUT, ResponseTimeoutException.class).add(HttpResponseStatus.INTERNAL_SERVER_ERROR, StyxClientException.class).build();
    private final HttpHandler httpPipeline;
    private final HttpErrorStatusListener httpErrorStatusListener;
    private final HttpResponseWriterFactory responseWriterFactory;
    private final RequestProgressListener statsSink;
    private final MetricRegistry metrics;
    private final StateMachine<State> stateMachine;
    private final ResponseEnhancer responseEnhancer;
    private final boolean secure;
    private final CharSequence originsHeaderName;
    private volatile Subscription subscription;
    private volatile LiveHttpRequest ongoingRequest;
    private volatile LiveHttpResponse ongoingResponse;
    private volatile LiveHttpRequest prematureRequest;
    private volatile CompletableFuture<Void> future;
    private volatile QueueDrainingEventProcessor eventProcessor;
    private final RequestTracker tracker;

    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$Builder.class */
    public static class Builder {
        private final HttpHandler httpPipeline;
        private ResponseEnhancer responseEnhancer = ResponseEnhancer.DO_NOT_MODIFY_RESPONSE;
        private HttpErrorStatusListener httpErrorStatusListener = HttpErrorStatusListener.IGNORE_ERROR_STATUS;
        private RequestProgressListener progressListener = RequestProgressListener.IGNORE_REQUEST_PROGRESS;
        private HttpResponseWriterFactory responseWriterFactory = HttpResponseWriter::new;
        private Supplier<MetricRegistry> metricRegistry = CodaHaleMetricRegistry::new;
        private RequestTracker tracker = RequestTracker.NO_OP;
        private boolean secure;
        private CharSequence originsHeaderName;

        public Builder(HttpHandler httpHandler) {
            this.httpPipeline = (HttpHandler) Objects.requireNonNull(httpHandler);
        }

        public Builder responseEnhancer(ResponseEnhancer responseEnhancer) {
            this.responseEnhancer = (ResponseEnhancer) Objects.requireNonNull(responseEnhancer);
            return this;
        }

        public Builder errorStatusListener(HttpErrorStatusListener httpErrorStatusListener) {
            this.httpErrorStatusListener = (HttpErrorStatusListener) Objects.requireNonNull(httpErrorStatusListener);
            return this;
        }

        public Builder progressListener(RequestProgressListener requestProgressListener) {
            this.progressListener = (RequestProgressListener) Objects.requireNonNull(requestProgressListener);
            return this;
        }

        Builder responseWriterFactory(HttpResponseWriterFactory httpResponseWriterFactory) {
            this.responseWriterFactory = (HttpResponseWriterFactory) Objects.requireNonNull(httpResponseWriterFactory);
            return this;
        }

        public Builder metricRegistry(MetricRegistry metricRegistry) {
            Objects.requireNonNull(metricRegistry);
            this.metricRegistry = () -> {
                return metricRegistry;
            };
            return this;
        }

        public Builder secure(boolean z) {
            this.secure = z;
            return this;
        }

        public Builder requestTracker(RequestTracker requestTracker) {
            this.tracker = (RequestTracker) Objects.requireNonNull(requestTracker);
            return this;
        }

        public Builder xOriginsHeader(CharSequence charSequence) {
            this.originsHeaderName = charSequence;
            return this;
        }

        public HttpPipelineHandler build() {
            return new HttpPipelineHandler(this, this.tracker);
        }

        HttpPipelineHandler buildForIoExceptionTest() {
            return new HttpPipelineHandler(this, this.tracker) { // from class: com.hotels.styx.server.netty.connectors.HttpPipelineHandler.Builder.1
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // com.hotels.styx.server.netty.connectors.HttpPipelineHandler
                public void channelRead0(ChannelHandlerContext channelHandlerContext, LiveHttpRequest liveHttpRequest) throws Exception {
                    throw new IOException("Connection reset by peer");
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ChannelExceptionEvent.class */
    public static class ChannelExceptionEvent {
        ChannelHandlerContext ctx;
        Throwable cause;

        ChannelExceptionEvent(ChannelHandlerContext channelHandlerContext, Throwable th) {
            this.ctx = channelHandlerContext;
            this.cause = th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ChannelInactiveEvent.class */
    public static class ChannelInactiveEvent {
        private ChannelInactiveEvent() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$HttpResponseWriterFactory.class */
    public interface HttpResponseWriterFactory {
        HttpResponseWriter create(ChannelHandlerContext channelHandlerContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$RequestReceivedEvent.class */
    public static class RequestReceivedEvent {
        final LiveHttpRequest request;
        final ChannelHandlerContext ctx;

        RequestReceivedEvent(LiveHttpRequest liveHttpRequest, ChannelHandlerContext channelHandlerContext) {
            this.request = liveHttpRequest;
            this.ctx = channelHandlerContext;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ResponseObservableCompletedEvent.class */
    public static class ResponseObservableCompletedEvent {
        private final ChannelHandlerContext ctx;
        private final Object requestId;

        ResponseObservableCompletedEvent(ChannelHandlerContext channelHandlerContext, Object obj) {
            this.ctx = channelHandlerContext;
            this.requestId = obj;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ResponseObservableErrorEvent.class */
    public static class ResponseObservableErrorEvent {
        private final ChannelHandlerContext ctx;
        private final Throwable cause;
        private final Object requestId;

        ResponseObservableErrorEvent(ChannelHandlerContext channelHandlerContext, Throwable th, Object obj) {
            this.ctx = channelHandlerContext;
            this.cause = th;
            this.requestId = obj;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ResponseReceivedEvent.class */
    public static class ResponseReceivedEvent {
        private final LiveHttpResponse response;
        private final ChannelHandlerContext ctx;

        ResponseReceivedEvent(LiveHttpResponse liveHttpResponse, ChannelHandlerContext channelHandlerContext) {
            this.response = liveHttpResponse;
            this.ctx = channelHandlerContext;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ResponseSentEvent.class */
    public static class ResponseSentEvent {
        private final ChannelHandlerContext ctx;

        ResponseSentEvent(ChannelHandlerContext channelHandlerContext) {
            this.ctx = channelHandlerContext;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$ResponseWriteErrorEvent.class */
    public static class ResponseWriteErrorEvent {
        private final Throwable cause;
        private final ChannelHandlerContext ctx;

        ResponseWriteErrorEvent(ChannelHandlerContext channelHandlerContext, Throwable th) {
            this.ctx = channelHandlerContext;
            this.cause = th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hotels/styx/server/netty/connectors/HttpPipelineHandler$State.class */
    public enum State {
        ACCEPTING_REQUESTS,
        WAITING_FOR_RESPONSE,
        SENDING_RESPONSE,
        SENDING_RESPONSE_CLIENT_CLOSED,
        TERMINATED
    }

    private HttpPipelineHandler(Builder builder, RequestTracker requestTracker) {
        this.responseEnhancer = (ResponseEnhancer) Objects.requireNonNull(builder.responseEnhancer);
        this.httpPipeline = (HttpHandler) Objects.requireNonNull(builder.httpPipeline);
        this.httpErrorStatusListener = (HttpErrorStatusListener) Objects.requireNonNull(builder.httpErrorStatusListener);
        this.responseWriterFactory = (HttpResponseWriterFactory) Objects.requireNonNull(builder.responseWriterFactory);
        this.statsSink = (RequestProgressListener) Objects.requireNonNull(builder.progressListener);
        this.stateMachine = createStateMachine();
        this.metrics = (MetricRegistry) builder.metricRegistry.get();
        this.secure = builder.secure;
        this.tracker = requestTracker;
        this.originsHeaderName = builder.originsHeaderName;
    }

    private StateMachine<State> createStateMachine() {
        return new StateMachine.Builder().initialState(State.ACCEPTING_REQUESTS).transition(State.ACCEPTING_REQUESTS, RequestReceivedEvent.class, requestReceivedEvent -> {
            return onLegitimateRequest(requestReceivedEvent.request, requestReceivedEvent.ctx);
        }).transition(State.ACCEPTING_REQUESTS, ChannelInactiveEvent.class, channelInactiveEvent -> {
            return State.TERMINATED;
        }).transition(State.ACCEPTING_REQUESTS, ChannelExceptionEvent.class, channelExceptionEvent -> {
            return onChannelExceptionWhenAcceptingRequests(channelExceptionEvent.ctx, channelExceptionEvent.cause);
        }).transition(State.ACCEPTING_REQUESTS, ResponseObservableCompletedEvent.class, responseObservableCompletedEvent -> {
            return State.ACCEPTING_REQUESTS;
        }).transition(State.WAITING_FOR_RESPONSE, ResponseReceivedEvent.class, responseReceivedEvent -> {
            return onResponseReceived(responseReceivedEvent.response, responseReceivedEvent.ctx);
        }).transition(State.WAITING_FOR_RESPONSE, RequestReceivedEvent.class, requestReceivedEvent2 -> {
            return onSpuriousRequest(requestReceivedEvent2.request, State.WAITING_FOR_RESPONSE);
        }).transition(State.WAITING_FOR_RESPONSE, ChannelInactiveEvent.class, channelInactiveEvent2 -> {
            return onChannelInactive();
        }).transition(State.WAITING_FOR_RESPONSE, ChannelExceptionEvent.class, channelExceptionEvent2 -> {
            return onChannelExceptionWhenWaitingForResponse(channelExceptionEvent2.ctx, channelExceptionEvent2.cause);
        }).transition(State.WAITING_FOR_RESPONSE, ResponseObservableErrorEvent.class, responseObservableErrorEvent -> {
            return onResponseObservableError(responseObservableErrorEvent.ctx, responseObservableErrorEvent.cause, responseObservableErrorEvent.requestId);
        }).transition(State.WAITING_FOR_RESPONSE, ResponseObservableCompletedEvent.class, responseObservableCompletedEvent2 -> {
            return onResponseObservableCompletedTooSoon(responseObservableCompletedEvent2.ctx, responseObservableCompletedEvent2.requestId);
        }).transition(State.SENDING_RESPONSE, ResponseSentEvent.class, responseSentEvent -> {
            return onResponseSent(responseSentEvent.ctx);
        }).transition(State.SENDING_RESPONSE, ResponseWriteErrorEvent.class, responseWriteErrorEvent -> {
            return onResponseWriteError(responseWriteErrorEvent.ctx, responseWriteErrorEvent.cause);
        }).transition(State.SENDING_RESPONSE, ChannelInactiveEvent.class, channelInactiveEvent3 -> {
            return State.SENDING_RESPONSE_CLIENT_CLOSED;
        }).transition(State.SENDING_RESPONSE, ChannelExceptionEvent.class, channelExceptionEvent3 -> {
            return onChannelExceptionWhenSendingResponse(channelExceptionEvent3.ctx, channelExceptionEvent3.cause);
        }).transition(State.SENDING_RESPONSE, ResponseObservableErrorEvent.class, responseObservableErrorEvent2 -> {
            return logError(State.SENDING_RESPONSE, responseObservableErrorEvent2.cause);
        }).transition(State.SENDING_RESPONSE, ResponseObservableCompletedEvent.class, responseObservableCompletedEvent3 -> {
            return State.SENDING_RESPONSE;
        }).transition(State.SENDING_RESPONSE, RequestReceivedEvent.class, requestReceivedEvent3 -> {
            return onPrematureRequest(requestReceivedEvent3.request, requestReceivedEvent3.ctx);
        }).transition(State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseSentEvent.class, responseSentEvent2 -> {
            return onResponseSentAfterClientClosed(responseSentEvent2.ctx);
        }).transition(State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseWriteErrorEvent.class, responseWriteErrorEvent2 -> {
            return onResponseWriteError(responseWriteErrorEvent2.ctx, responseWriteErrorEvent2.cause);
        }).transition(State.SENDING_RESPONSE_CLIENT_CLOSED, ChannelExceptionEvent.class, channelExceptionEvent4 -> {
            return logError(State.SENDING_RESPONSE_CLIENT_CLOSED, channelExceptionEvent4.cause);
        }).transition(State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseObservableErrorEvent.class, responseObservableErrorEvent3 -> {
            return logError(State.SENDING_RESPONSE_CLIENT_CLOSED, responseObservableErrorEvent3.cause);
        }).transition(State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseObservableCompletedEvent.class, responseObservableCompletedEvent4 -> {
            return State.SENDING_RESPONSE_CLIENT_CLOSED;
        }).transition(State.TERMINATED, ChannelInactiveEvent.class, channelInactiveEvent4 -> {
            return State.TERMINATED;
        }).onInappropriateEvent((state, obj) -> {
            LOGGER.warn(warningMessage(obj.getClass().getSimpleName()));
            return state;
        }).build();
    }

    private State logError(State state, Throwable th) {
        this.httpErrorStatusListener.proxyingFailure(this.ongoingRequest, this.ongoingResponse, th);
        return state;
    }

    @VisibleForTesting
    State state() {
        return (State) this.stateMachine.currentState();
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.eventProcessor = new QueueDrainingEventProcessor(new FsmEventProcessor(this.stateMachine, (th, state) -> {
        }, String.format("%s -> %s", channelHandlerContext.channel().remoteAddress(), channelHandlerContext.channel().localAddress())));
        super.channelActive(channelHandlerContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // 
    public void channelRead0(ChannelHandlerContext channelHandlerContext, LiveHttpRequest liveHttpRequest) throws Exception {
        this.eventProcessor.submit(new RequestReceivedEvent(liveHttpRequest, channelHandlerContext));
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.eventProcessor.submit(new ChannelInactiveEvent());
        super.channelInactive(channelHandlerContext);
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.eventProcessor.submit(new ChannelExceptionEvent(channelHandlerContext, th));
    }

    private State onSpuriousRequest(LiveHttpRequest liveHttpRequest, State state) {
        LOGGER.warn(warningMessage("message='Spurious request received while handling another request', spuriousRequest=" + liveHttpRequest));
        this.metrics.counter("requests.cancelled.spuriousRequest").inc();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        cancelSubscription();
        return State.TERMINATED;
    }

    private State onPrematureRequest(LiveHttpRequest liveHttpRequest, ChannelHandlerContext channelHandlerContext) {
        if (this.prematureRequest == null) {
            this.prematureRequest = liveHttpRequest;
            return state();
        }
        LOGGER.warn(warningMessage("message='Spurious request received while handling another request', spuriousRequest=%s" + liveHttpRequest));
        this.metrics.counter("requests.cancelled.spuriousRequest").inc();
        cancelSubscription();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        this.future.cancel(false);
        channelHandlerContext.close();
        return State.TERMINATED;
    }

    private State onLegitimateRequest(final LiveHttpRequest liveHttpRequest, final ChannelHandlerContext channelHandlerContext) {
        this.statsSink.onRequest(liveHttpRequest.id());
        LiveHttpRequest build = liveHttpRequest.newBuilder().version(HttpVersion.HTTP_1_1).build();
        this.tracker.trackRequest(liveHttpRequest, () -> {
            return state().toString();
        });
        this.ongoingRequest = liveHttpRequest;
        try {
            this.httpPipeline.handle(build, new HttpInterceptorContext(this.secure, remoteAddress(channelHandlerContext), channelHandlerContext.executor())).subscribe(new BaseSubscriber<LiveHttpResponse>() { // from class: com.hotels.styx.server.netty.connectors.HttpPipelineHandler.1
                public void hookOnSubscribe(Subscription subscription) {
                    HttpPipelineHandler.this.subscription = subscription;
                    subscription.request(1L);
                }

                public void hookOnComplete() {
                    HttpPipelineHandler.this.eventProcessor.submit(new ResponseObservableCompletedEvent(channelHandlerContext, liveHttpRequest.id()));
                }

                public void hookOnError(Throwable th) {
                    HttpPipelineHandler.this.eventProcessor.submit(new ResponseObservableErrorEvent(channelHandlerContext, th, liveHttpRequest.id()));
                }

                public void hookOnNext(LiveHttpResponse liveHttpResponse) {
                    HttpPipelineHandler.this.eventProcessor.submit(new ResponseReceivedEvent(liveHttpResponse, channelHandlerContext));
                }
            });
            return State.WAITING_FOR_RESPONSE;
        } catch (Throwable th) {
            LiveHttpResponse exceptionToResponse = exceptionToResponse(th, liveHttpRequest, this.originsHeaderName);
            this.httpErrorStatusListener.proxyErrorOccurred(liveHttpRequest, remoteAddress(channelHandlerContext), exceptionToResponse.status(), th);
            this.statsSink.onTerminate(liveHttpRequest.id());
            this.tracker.endTrack(this.ongoingRequest);
            if (channelHandlerContext.channel().isActive()) {
                respondAndClose(channelHandlerContext, exceptionToResponse);
            }
            return State.TERMINATED;
        }
    }

    private State onResponseReceived(LiveHttpResponse liveHttpResponse, ChannelHandlerContext channelHandlerContext) {
        this.ongoingResponse = liveHttpResponse;
        this.future = this.responseWriterFactory.create(channelHandlerContext).write(this.responseEnhancer.enhance(this.ongoingResponse, this.ongoingRequest));
        this.future.handle((r8, th) -> {
            if (th != null) {
                this.eventProcessor.submit(new ResponseWriteErrorEvent(channelHandlerContext, th));
                return null;
            }
            this.eventProcessor.submit(new ResponseSentEvent(channelHandlerContext));
            return null;
        });
        return State.SENDING_RESPONSE;
    }

    private State onResponseSent(ChannelHandlerContext channelHandlerContext) {
        this.statsSink.onComplete(this.ongoingRequest.id(), this.ongoingResponse.status().code());
        this.tracker.endTrack(this.ongoingRequest);
        if (!this.ongoingRequest.keepAlive()) {
            this.ongoingRequest = null;
            channelHandlerContext.close();
            return State.TERMINATED;
        }
        this.ongoingRequest = null;
        this.ongoingResponse = null;
        if (this.prematureRequest != null) {
            this.eventProcessor.submit(new RequestReceivedEvent(this.prematureRequest, channelHandlerContext));
            this.prematureRequest = null;
        }
        return State.ACCEPTING_REQUESTS;
    }

    private State onResponseSentAfterClientClosed(ChannelHandlerContext channelHandlerContext) {
        this.statsSink.onComplete(this.ongoingRequest.id(), this.ongoingResponse.status().code());
        this.tracker.endTrack(this.ongoingRequest);
        this.ongoingRequest = null;
        channelHandlerContext.close();
        return State.TERMINATED;
    }

    private State onResponseWriteError(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.metrics.counter("requests.cancelled.responseWriteError").inc();
        cancelSubscription();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        channelHandlerContext.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
        this.httpErrorStatusListener.proxyWriteFailure(this.ongoingRequest, this.ongoingResponse, th);
        return State.TERMINATED;
    }

    private State onChannelInactive() {
        this.metrics.counter("requests.cancelled.channelInactive").inc();
        if (this.future != null) {
            LOGGER.warn(warningMessage("message=onChannelInactive"));
            this.future.cancel(false);
        }
        cancelSubscription();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        return State.TERMINATED;
    }

    private State onChannelExceptionWhenSendingResponse(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.metrics.counter("requests.cancelled.channelExceptionWhileSendingResponse").inc();
        cancelSubscription();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        channelHandlerContext.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
        this.httpErrorStatusListener.proxyErrorOccurred(th);
        return State.TERMINATED;
    }

    private State onChannelExceptionWhenWaitingForResponse(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.metrics.counter("requests.cancelled.channelExceptionWhileWaitingForResponse").inc();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        cancelSubscription();
        return handleChannelException(channelHandlerContext, th);
    }

    private State onChannelExceptionWhenAcceptingRequests(ChannelHandlerContext channelHandlerContext, Throwable th) {
        return handleChannelException(channelHandlerContext, th);
    }

    private State handleChannelException(ChannelHandlerContext channelHandlerContext, Throwable th) {
        Throwable sslException = sslException(th);
        if (sslException != null) {
            if (channelHandlerContext.channel().isActive()) {
                channelHandlerContext.channel().close();
            }
            LOGGER.info("SSL handshake failure from incoming connection cause=\"{}\", serverAddress={}, clientAddress={}", new Object[]{sslException.getMessage(), channelHandlerContext.channel().localAddress(), channelHandlerContext.channel().remoteAddress()});
            return State.TERMINATED;
        }
        if (!isIoException(th)) {
            LiveHttpResponse exceptionToResponse = exceptionToResponse(th, this.ongoingRequest, this.originsHeaderName);
            this.httpErrorStatusListener.proxyErrorOccurred(exceptionToResponse.status(), th);
            if (channelHandlerContext.channel().isActive()) {
                respondAndClose(channelHandlerContext, exceptionToResponse);
            }
        }
        return State.TERMINATED;
    }

    private static Throwable sslException(Throwable th) {
        if (th.getCause() == null || !(th.getCause() instanceof SSLHandshakeException)) {
            return null;
        }
        return th.getCause();
    }

    private void respondAndClose(ChannelHandlerContext channelHandlerContext, LiveHttpResponse liveHttpResponse) {
        CompletableFuture<Void> write = this.responseWriterFactory.create(channelHandlerContext).write(liveHttpResponse);
        write.handle((r8, th) -> {
            if (write.isCompletedExceptionally()) {
                LOGGER.error(warningMessage("message='Unable to send error', response=" + th));
            }
            channelHandlerContext.close();
            return null;
        });
    }

    private State onResponseObservableError(ChannelHandlerContext channelHandlerContext, Throwable th, Object obj) {
        if (!this.ongoingRequest.id().equals(obj)) {
            return state();
        }
        this.metrics.counter("requests.cancelled.responseError").inc();
        cancelSubscription();
        LOGGER.error(warningMessage(String.format("message='Error proxying request', requestId=%s cause=%s", obj, th)));
        if (th instanceof ConsumerDisconnectedException) {
            return State.TERMINATED;
        }
        LiveHttpResponse exceptionToResponse = exceptionToResponse(th, this.ongoingRequest, this.originsHeaderName);
        this.responseWriterFactory.create(channelHandlerContext).write(exceptionToResponse).handle((r10, th2) -> {
            if (th2 != null) {
                this.httpErrorStatusListener.proxyErrorOccurred(th);
                this.httpErrorStatusListener.proxyErrorOccurred(th2);
            } else {
                this.httpErrorStatusListener.proxyErrorOccurred(this.ongoingRequest, remoteAddress(channelHandlerContext), exceptionToResponse.status(), th);
                this.statsSink.onComplete(this.ongoingRequest.id(), exceptionToResponse.status().code());
                this.tracker.endTrack(this.ongoingRequest);
            }
            channelHandlerContext.close();
            return null;
        }).handle((BiFunction<? super U, Throwable, ? extends U>) (obj2, th3) -> {
            this.statsSink.onTerminate(this.ongoingRequest.id());
            this.tracker.endTrack(this.ongoingRequest);
            if (th3 == null) {
                return null;
            }
            LOGGER.error(warningMessage("message='Error during write completion handling'"), th3);
            return null;
        });
        return State.TERMINATED;
    }

    private State onResponseObservableCompletedTooSoon(ChannelHandlerContext channelHandlerContext, Object obj) {
        this.metrics.counter("requests.cancelled.observableCompletedTooSoon").inc();
        if (!this.ongoingRequest.id().equals(obj)) {
            return state();
        }
        cancelSubscription();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        this.responseWriterFactory.create(channelHandlerContext).write(LiveHttpResponse.response(HttpResponseStatus.INTERNAL_SERVER_ERROR).build()).handle((r3, th) -> {
            return channelHandlerContext.close();
        });
        return State.TERMINATED;
    }

    private static boolean isIoException(Throwable th) {
        return th instanceof IOException;
    }

    private LiveHttpResponse exceptionToResponse(Throwable th, LiveHttpRequest liveHttpRequest, CharSequence charSequence) {
        HttpResponseStatus status = status(th instanceof PluginException ? th.getCause() : th);
        String description = status.code() >= 500 ? "Site temporarily unavailable." : status.description();
        LiveHttpResponse.Transformer header = this.responseEnhancer.enhance(LiveHttpResponse.response(status).body(new ByteStream(Flux.just(new Buffer(description, StandardCharsets.UTF_8)))).build().newBuilder(), liveHttpRequest).header(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(description.getBytes(StandardCharsets.UTF_8).length)).header(HttpHeaderNames.CONNECTION, "close");
        return (charSequence == null || originFromException(th) == null) ? header.build() : header.header(charSequence, originFromException(th)).build();
    }

    private String originFromException(Throwable th) {
        if (!(th instanceof StyxException)) {
            return null;
        }
        StyxException styxException = (StyxException) th;
        return (String) styxException.origin().map((v0) -> {
            return v0.toString();
        }).orElse(Optional.ofNullable(styxException.application()).map((v0) -> {
            return v0.toString();
        }).orElse(null));
    }

    private static HttpResponseStatus status(Throwable th) {
        return EXCEPTION_STATUSES.statusFor(th).orElseGet(() -> {
            if (th instanceof DecoderException) {
                Throwable cause = th.getCause();
                if (cause instanceof BadRequestException) {
                    return cause.getCause() instanceof TooLongFrameException ? HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE : HttpResponseStatus.BAD_REQUEST;
                }
            } else if (th instanceof TransportLostException) {
                return HttpResponseStatus.BAD_GATEWAY;
            }
            return HttpResponseStatus.INTERNAL_SERVER_ERROR;
        });
    }

    private String warningMessage(String str) {
        return String.format("%s, state=%s, request=%s, ongoingResponse=%s, prematureRequest=%s", str, state(), this.ongoingRequest, this.ongoingResponse, this.prematureRequest);
    }

    private void cancelSubscription() {
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    private static InetSocketAddress remoteAddress(ChannelHandlerContext channelHandlerContext) {
        return channelHandlerContext.channel() instanceof EmbeddedChannel ? new InetSocketAddress(0) : (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
    }
}
