package com.alibaba.ververica.connectors.hologres.jdbc;

import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.utils.Metrics;
import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter;
import com.alibaba.ververica.connectors.hologres.api.HologresRecordConverter;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.api.table.HologresRowDataConverter;
import com.alibaba.ververica.connectors.hologres.api.table.HologresSinkRecordConverter;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.utils.SchemaUtils;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/jdbc/HologresJDBCWriter.class */
public class HologresJDBCWriter<T> extends AbstractHologresWriter<T> {
    private static final transient Logger LOG = LoggerFactory.getLogger((Class<?>) HologresJDBCWriter.class);
    private transient HologresJDBCClientProvider clientProvider;
    private final HologresRecordConverter<T, Record> recordConverter;
    private SimpleGauge currentSendTime;
    ScheduledExecutorService backgroundExecutorService;

    public HologresJDBCWriter(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, Integer[] numArr, HologresRecordConverter<T, Record> hologresRecordConverter) {
        super(hologresConnectionParam, SchemaUtils.getTargetFieldNames(tableSchema, numArr));
        this.recordConverter = hologresRecordConverter;
    }

    public static HologresJDBCWriter<RowData> createRowDataWriter(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, HologresTableSchema hologresTableSchema, Integer[] numArr) {
        return new HologresJDBCWriter<>(hologresConnectionParam, tableSchema, numArr, new HologresRowDataConverter(tableSchema, numArr, hologresConnectionParam, new HologresJDBCRecordWriter(hologresConnectionParam), new HologresJDBCRecordReader(tableSchema.getFieldNames(), hologresTableSchema), hologresTableSchema));
    }

    public static HologresJDBCWriter<SinkRecord> createSinkRecordWriter(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema) {
        return new HologresJDBCWriter<>(hologresConnectionParam, tableSchema, new Integer[0], new HologresSinkRecordConverter(hologresConnectionParam, tableSchema));
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void open(RuntimeContext runtimeContext) {
        LOG.info("Initiating {} for database: {} and table: {}, table orientation {}, the whole configs is {}", getClass().getSimpleName(), this.param.getDatabase(), this.param.getTable(), HologresTableSchema.get(this.param).get().getOrientation(), this.param);
        this.clientProvider = new HologresJDBCClientProvider(this.param);
        this.currentSendTime = MetricUtils.registerCurrentSendTime(runtimeContext);
        this.recordConverter.open(runtimeContext);
        this.backgroundExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.backgroundExecutorService.scheduleAtFixedRate(this::reportWriteLatency, 0L, 30L, TimeUnit.SECONDS);
        LOG.info("Successfully initiated connection to database [{}] / table[{}]", this.param.getJdbcOptions().getDatabase(), this.param.getTable());
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter
    public long writeAddRecord(T t) throws IOException {
        Record convertFrom = this.recordConverter.convertFrom(t);
        convertFrom.setType(Put.MutationType.INSERT);
        try {
            this.clientProvider.getClient().put(new Put(convertFrom));
            return convertFrom.getByteSize();
        } catch (HoloClientException e) {
            throw new IOException(e);
        }
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter
    public long writeDeleteRecord(T t) throws IOException {
        Record convertFrom = this.recordConverter.convertFrom(t);
        convertFrom.setType(Put.MutationType.DELETE);
        try {
            this.clientProvider.getClient().put(new Put(convertFrom));
            return convertFrom.getByteSize();
        } catch (HoloClientException e) {
            throw new IOException(e);
        }
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter
    public void flush() throws IOException {
        try {
            this.clientProvider.getClient().flush();
        } catch (HoloClientException e) {
            throw new IOException(e);
        }
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void close() {
        this.backgroundExecutorService.shutdown();
        this.clientProvider.closeClient();
    }

    private void reportWriteLatency() {
        if (Metrics.registry().getHistograms().get(Metrics.METRICS_WRITE_LATENCY) == null || this.currentSendTime == null) {
            return;
        }
        this.currentSendTime.report(Metrics.registry().getHistograms().get(Metrics.METRICS_WRITE_LATENCY).getSnapshot().get999thPercentile());
    }
}
