package com.starrocks.connector.spark.backend;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.ConnectedFailedException;
import com.starrocks.connector.spark.exception.StarrocksException;
import com.starrocks.connector.spark.exception.StarrocksInternalException;
import com.starrocks.connector.spark.serialization.Routing;
import com.starrocks.connector.spark.util.ErrorMessages;
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransport;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.thrift.TScanBatchResult;
import com.starrocks.thrift.TScanCloseParams;
import com.starrocks.thrift.TScanCloseResult;
import com.starrocks.thrift.TScanNextBatchParams;
import com.starrocks.thrift.TScanOpenParams;
import com.starrocks.thrift.TScanOpenResult;
import com.starrocks.thrift.TStarrocksExternalService;
import com.starrocks.thrift.TStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/spark/backend/BackendClient.class */
public class BackendClient {
    private static Logger logger = LoggerFactory.getLogger(BackendClient.class);
    private Routing routing;
    private TStarrocksExternalService.Client client;
    private TTransport transport;
    private boolean isConnected = false;
    private final int retries;
    private final int socketTimeout;
    private final int connectTimeout;

    public BackendClient(Routing routing, Settings settings) throws ConnectedFailedException {
        this.routing = routing;
        this.connectTimeout = settings.getIntegerProperty(ConfigurationOptions.STARROCKS_REQUEST_CONNECT_TIMEOUT_MS, 30000);
        this.socketTimeout = settings.getIntegerProperty(ConfigurationOptions.STARROCKS_REQUEST_READ_TIMEOUT_MS, 30000);
        this.retries = settings.getIntegerProperty(ConfigurationOptions.STARROCKS_REQUEST_RETRIES, 3);
        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", new Object[]{Integer.valueOf(this.connectTimeout), Integer.valueOf(this.socketTimeout), Integer.valueOf(this.retries)});
        open();
    }

    private void open() throws ConnectedFailedException {
        logger.debug("Open client to StarRocks BE '{}'.", this.routing);
        TTransportException tTransportException = null;
        int i = 0;
        while (true) {
            if (this.isConnected || i >= this.retries) {
                break;
            }
            logger.debug("Attempt {} to connect {}.", Integer.valueOf(i), this.routing);
            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
            try {
                this.transport = new TSocket(this.routing.getHost(), this.routing.getPort(), this.socketTimeout, this.connectTimeout);
                this.client = new TStarrocksExternalService.Client(factory.getProtocol(this.transport));
                logger.trace("Connect status before open transport to {} is '{}'.", this.routing, Boolean.valueOf(this.isConnected));
                if (!this.transport.isOpen()) {
                    this.transport.open();
                    this.isConnected = true;
                }
            } catch (TTransportException e) {
                logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing, e);
                tTransportException = e;
            }
            if (this.isConnected) {
                logger.info("Success connect to {}.", this.routing);
                break;
            }
            i++;
        }
        if (this.isConnected) {
            return;
        }
        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing);
        throw new ConnectedFailedException(this.routing.toString(), tTransportException);
    }

    private void close() {
        logger.trace("Connect status before close with '{}' is '{}'.", this.routing, Boolean.valueOf(this.isConnected));
        this.isConnected = false;
        if (null != this.client) {
            this.client = null;
        }
        if (this.transport == null || !this.transport.isOpen()) {
            return;
        }
        this.transport.close();
        logger.info("Closed a connection to {}.", this.routing);
    }

    public TScanOpenResult openScanner(TScanOpenParams tScanOpenParams) throws ConnectedFailedException {
        logger.debug("OpenScanner to '{}', parameter is '{}'.", this.routing, tScanOpenParams);
        if (!this.isConnected) {
            open();
        }
        TException tException = null;
        for (int i = 0; i < this.retries; i++) {
            logger.debug("Attempt {} to openScanner {}.", Integer.valueOf(i), this.routing);
            try {
                TScanOpenResult open_scanner = this.client.open_scanner(tScanOpenParams);
                if (open_scanner == null) {
                    logger.warn("Open scanner result from {} is null.", this.routing);
                } else {
                    if (TStatusCode.OK.equals(open_scanner.getStatus().getStatus_code())) {
                        return open_scanner;
                    }
                    logger.warn("The status of open scanner result from {} is '{}', error message is: {}.", new Object[]{this.routing, open_scanner.getStatus().getStatus_code(), open_scanner.getStatus().getError_msgs()});
                }
            } catch (TException e) {
                logger.warn("Open scanner from {} failed.", this.routing, e);
                tException = e;
            }
        }
        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing);
        throw new ConnectedFailedException(this.routing.toString(), tException);
    }

    public TScanBatchResult getNext(TScanNextBatchParams tScanNextBatchParams) throws StarrocksException {
        logger.debug("GetNext to '{}', parameter is '{}'.", this.routing, tScanNextBatchParams);
        if (!this.isConnected) {
            open();
        }
        TException tException = null;
        TScanBatchResult tScanBatchResult = null;
        for (int i = 0; i < this.retries; i++) {
            logger.debug("Attempt {} to getNext {}.", Integer.valueOf(i), this.routing);
            try {
                tScanBatchResult = this.client.get_next(tScanNextBatchParams);
                if (tScanBatchResult == null) {
                    logger.warn("GetNext result from {} is null.", this.routing);
                } else {
                    if (TStatusCode.OK.equals(tScanBatchResult.getStatus().getStatus_code())) {
                        return tScanBatchResult;
                    }
                    logger.warn("The status of get next result from {} is '{}', error message is: {}.", new Object[]{this.routing, tScanBatchResult.getStatus().getStatus_code(), tScanBatchResult.getStatus().getError_msgs()});
                }
            } catch (TException e) {
                logger.warn("Get next from {} failed.", this.routing, e);
                tException = e;
            }
        }
        if (tScanBatchResult == null || TStatusCode.OK == tScanBatchResult.getStatus().getStatus_code()) {
            logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing);
            throw new ConnectedFailedException(this.routing.toString(), tException);
        }
        logger.error(ErrorMessages.STARROCKS_INTERNAL_FAIL_MESSAGE, new Object[]{this.routing, tScanBatchResult.getStatus().getStatus_code(), tScanBatchResult.getStatus().getError_msgs()});
        throw new StarrocksInternalException(this.routing.toString(), tScanBatchResult.getStatus().getStatus_code(), tScanBatchResult.getStatus().getError_msgs());
    }

    public void closeScanner(TScanCloseParams tScanCloseParams) {
        logger.debug("CloseScanner to '{}', parameter is '{}'.", this.routing, tScanCloseParams);
        if (!this.isConnected) {
            try {
                open();
            } catch (ConnectedFailedException e) {
                logger.warn("Cannot connect to StarRocks BE {} when close scanner.", this.routing);
                return;
            }
        }
        for (int i = 0; i < this.retries; i++) {
            logger.debug("Attempt {} to closeScanner {}.", Integer.valueOf(i), this.routing);
            try {
                TScanCloseResult close_scanner = this.client.close_scanner(tScanCloseParams);
                if (close_scanner == null) {
                    logger.warn("CloseScanner result from {} is null.", this.routing);
                } else if (TStatusCode.OK.equals(close_scanner.getStatus().getStatus_code())) {
                    break;
                } else {
                    logger.warn("The status of get next result from {} is '{}', error message is: {}.", new Object[]{this.routing, close_scanner.getStatus().getStatus_code(), close_scanner.getStatus().getError_msgs()});
                }
            } catch (TException e2) {
                logger.warn("Close scanner from {} failed.", this.routing, e2);
            }
        }
        logger.info("CloseScanner to StarRocks BE '{}' success.", this.routing);
        close();
    }
}
