package com.alibaba.ververica.connectors.hologres.sink;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.sink.HasRetryTimeout;
import com.alibaba.ververica.connectors.common.sink.Syncable;
import com.alibaba.ververica.connectors.common.source.resolver.DirtyDataStrategy;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Objects;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/sink/AbstractHologresOutputFormat.class */
/**
 * Base output format that hands records to an {@link AbstractHologresWriter}.
 *
 * <p>The class registers output-rate and error metrics in {@link #open(int, int)},
 * delegates the actual write to {@link #writeData(Object)}, and applies the
 * configured {@link DirtyDataStrategy} when a write or a flush fails: with
 * {@code SKIP} or {@code SKIP_SILENT} the failure is logged and dropped,
 * otherwise it is propagated as an {@link IOException}.
 *
 * @param <T> type of the records written by this format
 */
public abstract class AbstractHologresOutputFormat<T> extends RichOutputFormat<T> implements Syncable, HasRetryTimeout {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractHologresOutputFormat.class);
    private static final long serialVersionUID = 5113221824076190115L;

    /** Connection parameters supplied at construction time; never null. */
    protected final HologresConnectionParam param;
    /** Value of {@code param.isIgnoreDelete()} captured at construction time. */
    protected boolean ignoreDelete;
    /** Records-out rate meter; registered in {@link #open(int, int)}. */
    protected Meter outTps;
    /** Bytes-out rate meter; registered in {@link #open(int, int)}. */
    protected Meter outBps;
    /** Counter of records dropped because of a skip strategy; registered in {@link #open(int, int)}. */
    protected Counter sinkSkipCounter;
    /** Strategy applied to write and flush failures. */
    protected DirtyDataStrategy dirtyDataStrategy;
    /** Writer that performs the actual I/O. */
    protected AbstractHologresWriter<T> hologresIOClient;
    /**
     * Failure recorded by {@link #sync()} as (timestamp, exception). While set,
     * every {@link #writeRecord(Object)} call fails fast. Reset by {@link #open(int, int)}.
     */
    protected Tuple2<String, Exception> exception = null;

    /**
     * Creates the output format.
     *
     * @param hologresConnectionParam connection parameters; must not be null
     * @param abstractHologresWriter writer used for all I/O
     * @throws NullPointerException if {@code hologresConnectionParam} is null
     */
    public AbstractHologresOutputFormat(HologresConnectionParam hologresConnectionParam, AbstractHologresWriter<T> abstractHologresWriter) {
        this.param = (HologresConnectionParam) Preconditions.checkNotNull(hologresConnectionParam);
        this.ignoreDelete = hologresConnectionParam.isIgnoreDelete();
        this.dirtyDataStrategy = hologresConnectionParam.getDirtyDataStrategy();
        this.hologresIOClient = abstractHologresWriter;
    }

    /**
     * Clears any failure left by a previous {@link #sync()}, registers the metrics
     * and opens the underlying writer with the runtime context.
     *
     * @param i first argument of the framework callback (not used here)
     * @param i2 second argument of the framework callback (not used here)
     * @throws IOException if the writer cannot be opened
     */
    public void open(int i, int i2) throws IOException {
        this.exception = null;
        LOG.info("Opening {} for frontend: {}, database: {} and table: {}", getClass().getSimpleName(), this.param.getEndpoint(), this.param.getDatabase(), this.param.getTable());
        this.outTps = MetricUtils.registerNumRecordsOutRate(getRuntimeContext());
        this.outBps = MetricUtils.registerNumBytesOutRate(getRuntimeContext(), "hologres");
        this.sinkSkipCounter = MetricUtils.registerNumRecordsOutErrors(getRuntimeContext());
        this.hologresIOClient.open(getRuntimeContext());
        LOG.info("Finished opening {}", getClass().getSimpleName());
    }

    /**
     * Flushes pending data and closes the underlying writer.
     *
     * <p>The writer is closed even when the flush fails, so its resources are not
     * leaked. In that case the flush failure is the one propagated, and a failure
     * of the subsequent close is attached to it as a suppressed exception.
     *
     * @throws IOException if flushing or closing the writer fails
     */
    public void close() throws IOException {
        Throwable flushFailure = null;
        try {
            this.hologresIOClient.flush();
        } catch (Throwable t) {
            flushFailure = t;
            throw t;
        } finally {
            if (flushFailure == null) {
                this.hologresIOClient.close();
            } else {
                // Keep the flush failure as the primary error; do not let close() mask it.
                try {
                    this.hologresIOClient.close();
                } catch (Throwable closeFailure) {
                    flushFailure.addSuppressed(closeFailure);
                }
            }
        }
        LOG.info("Finished closing {}", getClass().getSimpleName());
    }

    /**
     * Writes one record through {@link #writeData(Object)} and updates the metrics.
     *
     * <p>If an earlier {@link #sync()} recorded a failure, the call fails
     * immediately with that failure as the cause. A write failure is logged and
     * then either skipped (counted in {@link #sinkSkipCounter}) or rethrown,
     * depending on the dirty data strategy.
     *
     * @param t record to write
     * @throws IOException if a previous sync failed, or if the write fails and the
     *     strategy is not a skip strategy
     */
    public void writeRecord(T t) throws IOException {
        if (this.exception != null) {
            throw new IOException(String.format("An exception occurred in the sync operation called during the last checkpoint. at %s.", this.exception.f0), (Throwable) this.exception.f1);
        }
        if (this.outTps != null) {
            this.outTps.markEvent();
        }
        try {
            long writtenBytes = writeData(t);
            if (this.outBps != null && writtenBytes > 0) {
                this.outBps.markEvent(writtenBytes);
            }
        } catch (IOException e) {
            LOG.error("Upsert data '{}' failed, caused by {}", t, ExceptionUtils.getStackTrace(e));
            if (!isSkipStrategy()) {
                throw new IOException(e);
            }
            // Guarded like the other metrics, which may be unset if open() was not run.
            if (this.sinkSkipCounter != null) {
                this.sinkSkipCounter.inc();
            }
        }
    }

    /**
     * Returns the retry timeout of this sink.
     *
     * @return always {@code 0}
     */
    @Override // com.alibaba.ververica.connectors.common.sink.HasRetryTimeout
    public long getRetryTimeout() {
        return 0L;
    }

    /**
     * Flushes the underlying writer and waits for it to return.
     *
     * <p>On failure with a skip strategy the error is logged and swallowed.
     * Otherwise the failure is recorded, together with the current local time, so
     * that later {@link #writeRecord(Object)} calls fail fast, and then rethrown.
     *
     * @throws IOException if the flush fails and the strategy is not a skip strategy
     */
    @Override // com.alibaba.ververica.connectors.common.sink.Syncable
    public void sync() throws IOException {
        LOG.info("start to wait request to finish");
        try {
            this.hologresIOClient.flush();
            LOG.info("end to wait request to finish");
        } catch (IOException e) {
            // SLF4J does not interpret printf-style "%s"; pass the throwable as the
            // last argument so the stack trace is logged.
            LOG.error("Flush messages failed", e);
            if (isSkipStrategy()) {
                return;
            }
            this.exception = new Tuple2<>(LocalDateTime.now().toString(), e);
            throw e;
        }
    }

    /**
     * No-op: this format takes all of its settings from the constructor.
     *
     * @param configuration ignored
     */
    public void configure(Configuration configuration) {
    }

    /**
     * Writes a single record to the sink.
     *
     * @param t record to write
     * @return a size that is reported to the bytes-out meter when it is positive
     * @throws IOException if the record cannot be written
     */
    public abstract long writeData(T t) throws IOException;

    /** Returns whether failures should be dropped; null-safe so a missing strategy never masks the real error. */
    private boolean isSkipStrategy() {
        return DirtyDataStrategy.SKIP.equals(this.dirtyDataStrategy)
                || DirtyDataStrategy.SKIP_SILENT.equals(this.dirtyDataStrategy);
    }
}
