/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.plugin.httpclient;

import com.google.common.collect.Sets;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.common.enums.RetryEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.common.utils.LogUtils;
import org.apache.shenyu.loadbalancer.cache.UpstreamCacheManager;
import org.apache.shenyu.loadbalancer.entity.Upstream;
import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
import org.apache.shenyu.plugin.api.ShenyuPlugin;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.context.ShenyuContext;
import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
import org.apache.shenyu.plugin.api.utils.RequestUrlUtils;
import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
import org.apache.shenyu.plugin.httpclient.config.DuplicateResponseHeaderProperties;
import org.apache.shenyu.plugin.httpclient.exception.ShenyuTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public abstract class AbstractHttpClientPlugin<R>
implements ShenyuPlugin {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractHttpClientPlugin.class);

    public final Mono<Void> execute(ServerWebExchange exchange, ShenyuPluginChain chain) {
        ShenyuContext shenyuContext = (ShenyuContext)exchange.getAttribute("context");
        assert (shenyuContext != null);
        URI uri = (URI)exchange.getAttribute("httpUri");
        if (Objects.isNull(uri)) {
            Object error = ShenyuResultWrap.error((ServerWebExchange)exchange, (ShenyuResultEnum)ShenyuResultEnum.CANNOT_FIND_URL);
            return WebFluxResultUtils.result((ServerWebExchange)exchange, (Object)error);
        }
        long timeout = (Long)Optional.ofNullable(exchange.getAttribute("httpTimeOut")).orElse(3000L);
        Duration duration = Duration.ofMillis(timeout);
        int retryTimes = (Integer)Optional.ofNullable(exchange.getAttribute("httpRetry")).orElse(0);
        String retryStrategy = (String)Optional.ofNullable(exchange.getAttribute("retryStrategy")).orElseGet(() -> ((RetryEnum)RetryEnum.CURRENT).getName());
        LogUtils.debug((Logger)LOG, () -> String.format("The request urlPath is: %s, retryTimes is : %s, retryStrategy is : %s", uri, retryTimes, retryStrategy));
        Mono response = this.doRequest(exchange, exchange.getRequest().getMethodValue(), uri, (Flux<DataBuffer>)exchange.getRequest().getBody()).timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))).doOnError(e -> LOG.error(e.getMessage(), e));
        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
            RetryBackoffSpec retryBackoffSpec = Retry.backoff((long)retryTimes, (Duration)Duration.ofMillis(20L)).maxBackoff(Duration.ofSeconds(20L)).transientErrors(true).jitter(0.5).filter(t -> t instanceof TimeoutException || t instanceof ConnectTimeoutException || t instanceof ReadTimeoutException || t instanceof IllegalStateException).onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> {
                throw new ShenyuTimeoutException("Request timeout, the maximum number of retry times has been exceeded");
            });
            return response.retryWhen((Retry)retryBackoffSpec).onErrorMap(ShenyuTimeoutException.class, th -> new ResponseStatusException(HttpStatus.REQUEST_TIMEOUT, th.getMessage(), (Throwable)th)).onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), (Throwable)th)).flatMap(o -> chain.execute(exchange));
        }
        HashSet exclude = Sets.newHashSet((Object[])new URI[]{uri});
        return this.resend(response, exchange, duration, exclude, retryTimes).onErrorMap(ShenyuException.class, th -> new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg(), (Throwable)th)).onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), (Throwable)th)).flatMap(o -> chain.execute(exchange));
    }

    private Mono<R> resend(Mono<R> clientResponse, ServerWebExchange exchange, Duration duration, Set<URI> exclude, int retryTimes) {
        Mono<R> result = clientResponse;
        for (int i = 0; i < retryTimes; ++i) {
            result = this.resend(result, exchange, duration, exclude);
        }
        return result;
    }

    private Mono<R> resend(Mono<R> response, ServerWebExchange exchange, Duration duration, Set<URI> exclude) {
        return response.onErrorResume(th -> {
            String selectorId = (String)exchange.getAttribute("divideSelectorId");
            String loadBalance = (String)exchange.getAttribute("loadBalance");
            List upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId).stream().filter(data -> {
                String trimUri = data.getUrl().trim();
                for (URI needToExclude : exclude) {
                    if (!(needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) continue;
                    return false;
                }
                return true;
            }).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(upstreamList)) {
                return Mono.error((Throwable)new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
            }
            String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
            Upstream upstream = LoadBalancerFactory.selector(upstreamList, (String)loadBalance, (String)ip);
            if (Objects.isNull(upstream)) {
                return Mono.error((Throwable)new ShenyuException(ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER.getMsg()));
            }
            URI newUri = RequestUrlUtils.buildRequestUri((ServerWebExchange)exchange, (String)upstream.buildDomain());
            exclude.add(newUri);
            return this.doRequest(exchange, exchange.getRequest().getMethodValue(), newUri, (Flux<DataBuffer>)exchange.getRequest().getBody()).timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))).doOnError(e -> LOG.error(e.getMessage(), e));
        });
    }

    protected abstract Mono<R> doRequest(ServerWebExchange var1, String var2, URI var3, Flux<DataBuffer> var4);

    protected void duplicateHeaders(HttpHeaders headers, String header, DuplicateResponseHeaderProperties.DuplicateResponseHeaderStrategy strategy) {
        List headerValues = headers.get((Object)header);
        if (Objects.isNull(headerValues) || headerValues.size() <= 1) {
            return;
        }
        switch (strategy) {
            case RETAIN_FIRST: {
                headers.set(header, (String)headerValues.get(0));
                break;
            }
            case RETAIN_LAST: {
                headers.set(header, (String)headerValues.get(headerValues.size() - 1));
                break;
            }
            case RETAIN_UNIQUE: {
                headers.put(header, new ArrayList(new LinkedHashSet(headerValues)));
                break;
            }
            default: {
                throw new IllegalStateException("Unexpected value: " + (Object)((Object)strategy));
            }
        }
    }
}

