/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.plugin.aliyun.sls.client;

import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.common.LogStore;
import com.aliyun.openservices.log.exception.LogException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.plugin.aliyun.sls.config.AliyunLogCollectConfig;
import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;

public class AliyunSlsLogCollectClient
extends AbstractLogConsumeClient<AliyunLogCollectConfig.AliyunSlsLogConfig, ShenyuRequestLog> {
    private Client client;
    private String projectName;
    private String logStore;
    private String topic;
    private Producer producer;
    private ThreadPoolExecutor threadExecutor;

    public void initClient0(@NonNull AliyunLogCollectConfig.AliyunSlsLogConfig config) {
        String accessId = config.getAccessId();
        String accessKey = config.getAccessKey();
        String host = config.getHost();
        if (StringUtils.isBlank((CharSequence)accessId) || StringUtils.isBlank((CharSequence)accessKey) || StringUtils.isBlank((CharSequence)host)) {
            LOG.error("init aliyun sls client error, please check accessId, accessKey or host");
            return;
        }
        this.client = new Client(host, accessId, accessKey);
        this.projectName = config.getProjectName();
        this.topic = config.getTopic();
        this.logStore = config.getLogStoreName();
        int ttlInDay = config.getTtlInDay();
        int shardCount = config.getShardCount();
        ProjectConfig projectConfig = new ProjectConfig(this.projectName, host, accessId, accessKey);
        this.producer = AliyunSlsLogCollectClient.createProducer(config, projectConfig);
        LogStore store = new LogStore(this.logStore, ttlInDay, shardCount);
        this.threadExecutor = AliyunSlsLogCollectClient.createThreadPoolExecutor(config);
        try {
            this.client.CreateLogStore(this.projectName, store);
        }
        catch (LogException e) {
            LOG.warn("error code:{}, error message:{}", (Object)e.GetErrorCode(), (Object)e.GetErrorMessage());
        }
    }

    public void consume0(@NonNull List<ShenyuRequestLog> logs) {
        logs.forEach(this::sendLog);
    }

    public void close0() throws Exception {
        if (Objects.nonNull(this.client)) {
            this.client.shutdown();
            this.producer.close();
        }
    }

    private void sendLog(ShenyuRequestLog log) {
        ArrayList<LogItem> logGroup = new ArrayList<LogItem>();
        LogItem logItem = new LogItem((int)(System.currentTimeMillis() / 1000L));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", log.getRequestUri());
        logItem.PushBack("message", GsonUtils.getGson().toJson((Object)log));
        logGroup.add(logItem);
        try {
            ListenableFuture f = this.producer.send(this.projectName, this.logStore, this.topic, "shenyu-gateway", logGroup);
            Futures.addCallback((ListenableFuture)f, (FutureCallback)new ProducerFutureCallback(this.projectName, this.logStore), (Executor)this.threadExecutor);
        }
        catch (InterruptedException e) {
            LOG.warn("The current thread has been interrupted during send logs.");
        }
        catch (Exception e) {
            if (e instanceof MaxBatchCountExceedException) {
                LOG.error("The logs exceeds the maximum batch count, e={}", (Object)e.getMessage());
            }
            if (e instanceof LogSizeTooLargeException) {
                LOG.error("The size of log is larger than the maximum allowable size, e={}", (Object)e.getMessage());
            }
            LOG.error("Failed to send logs, e={}", (Object)e.getMessage());
        }
    }

    private static Producer createProducer(AliyunLogCollectConfig.AliyunSlsLogConfig config, ProjectConfig projectConfig) {
        int ioThreadCount = config.getIoThreadCount();
        if (ioThreadCount > GenericLoggingConstant.MAX_ALLOW_THREADS) {
            LOG.warn("io thread count number too large!");
            ioThreadCount = GenericLoggingConstant.MAX_ALLOW_THREADS;
        }
        ProducerConfig producerConfig = new ProducerConfig();
        producerConfig.setIoThreadCount(ioThreadCount);
        producerConfig.setLogFormat(ProducerConfig.LogFormat.JSON);
        LogProducer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(projectConfig);
        return producer;
    }

    private static ThreadPoolExecutor createThreadPoolExecutor(AliyunLogCollectConfig.AliyunSlsLogConfig config) {
        int sendThreadCount = config.getSendThreadCount();
        if (sendThreadCount > GenericLoggingConstant.MAX_ALLOW_THREADS) {
            LOG.warn("send thread count number too large!");
            sendThreadCount = GenericLoggingConstant.MAX_ALLOW_THREADS;
        }
        return new ThreadPoolExecutor(sendThreadCount, GenericLoggingConstant.MAX_ALLOW_THREADS, 60000L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(GenericLoggingConstant.MAX_QUEUE_NUMBER), ShenyuThreadFactory.create((String)"shenyu-aliyun-sls", (boolean)true), new ThreadPoolExecutor.AbortPolicy());
    }

    private static final class ProducerFutureCallback
    implements FutureCallback<Result> {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFutureCallback.class);
        private final String project;
        private final String logStore;

        ProducerFutureCallback(String project, String logStore) {
            this.project = project;
            this.logStore = logStore;
        }

        public void onSuccess(@Nullable Result result) {
            LOGGER.info("Send logs to aliyun sls successfully.");
        }

        public void onFailure(Throwable throwable) {
            if (throwable instanceof ResultFailedException) {
                Result result = ((ResultFailedException)throwable).getResult();
                LOGGER.error("Failed to send logs, project={}, logStore={}, result={}", new Object[]{this.project, this.logStore, result});
            } else {
                LOGGER.error("Failed to send log, e={}", (Object)throwable.getMessage());
            }
        }
    }
}

