package io.cdap.plugin.gcp.bigtable.source;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Bigtable")
@Description("This source reads data from Google Cloud Bigtable. Cloud Bigtable is Google's NoSQL Big Data database service.")
@Plugin(type = "batchsource")
/* loaded from: input_file:io/cdap/plugin/gcp/bigtable/source/BigtableSource.class */
public final class BigtableSource extends BatchSource<ImmutableBytesWritable, Result, StructuredRecord> {
    public static final String NAME = "Bigtable";
    private static final Logger LOG = LoggerFactory.getLogger(BigtableSource.class);
    private final BigtableSourceConfig config;
    private HBaseResultToRecordTransformer resultToRecordTransformer;

    public BigtableSource(BigtableSourceConfig bigtableSourceConfig) {
        this.config = bigtableSourceConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector failureCollector = stageConfigurer.getFailureCollector();
        this.config.validate(failureCollector);
        Schema schema = this.config.getSchema(failureCollector);
        if (schema != null && this.config.connectionParamsConfigured()) {
            try {
                validateOutputSchema(getConfiguration(failureCollector), failureCollector);
            } catch (IOException e) {
                LOG.warn("Failed to validate output schema", e);
            }
        }
        stageConfigurer.setOutputSchema(schema);
    }

    public void prepareRun(BatchSourceContext batchSourceContext) {
        FailureCollector failureCollector = batchSourceContext.getFailureCollector();
        this.config.validate(failureCollector);
        Schema schema = this.config.getSchema(failureCollector);
        Configuration configuration = null;
        try {
            configuration = getConfiguration(failureCollector);
        } catch (IOException e) {
            failureCollector.addFailure(String.format("Failed to prepare configuration for job : %s", e.getMessage()), (String) null).withConfigProperty("bigtableOptions").withStacktrace(e.getStackTrace());
            failureCollector.getOrThrowException();
        }
        try {
            validateOutputSchema(configuration, failureCollector);
        } catch (IOException e2) {
            failureCollector.addFailure(String.format("Failed to connect to Bigtable : %s", e2.getMessage()), (String) null).withStacktrace(e2.getStackTrace());
            failureCollector.getOrThrowException();
        }
        emitLineage(batchSourceContext, schema);
        batchSourceContext.setInput(Input.of(this.config.referenceName, new SourceInputFormatProvider((Class<? extends InputFormat>) BigtableInputFormat.class, configuration)));
    }

    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        this.resultToRecordTransformer = new HBaseResultToRecordTransformer(batchRuntimeContext.getOutputSchema(), this.config.keyAlias, this.config.getColumnMappings());
    }

    public void transform(KeyValue<ImmutableBytesWritable, Result> keyValue, Emitter<StructuredRecord> emitter) {
        try {
            emitter.emit(this.resultToRecordTransformer.transform((Result) keyValue.getValue()));
        } catch (Exception e) {
            switch (this.config.getErrorHandling()) {
                case SKIP:
                    LOG.warn("Failed to process message, skipping it", e);
                    return;
                case FAIL_PIPELINE:
                    throw new RuntimeException("Failed to process message", e);
                default:
                    throw new IllegalStateException(String.format("Unknown error handling strategy '%s'", this.config.getErrorHandling()));
            }
        }
    }

    private Configuration getConfiguration(FailureCollector failureCollector) throws IOException {
        Configuration configuration = new Configuration();
        String serviceAccount = this.config.getServiceAccount();
        if (serviceAccount != null) {
            configuration.setBoolean(BigtableOptionsFactory.BIGTABLE_USE_SERVICE_ACCOUNTS_KEY, true);
            if (this.config.isServiceAccountFilePath().booleanValue()) {
                configuration.set(BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_JSON_KEYFILE_LOCATION_KEY, serviceAccount);
            } else {
                configuration.set(BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_JSON_VALUE_KEY, serviceAccount);
            }
        }
        BigtableConfiguration.configure(configuration, this.config.getProject(), this.config.instance);
        configuration.setBoolean(TableInputFormat.SHUFFLE_MAPS, true);
        configuration.set(TableInputFormat.INPUT_TABLE, this.config.table);
        Map<String, String> bigtableOptions = this.config.getBigtableOptions();
        configuration.getClass();
        bigtableOptions.forEach(configuration::set);
        configuration.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(getConfiguredScanForJob(failureCollector)));
        return configuration;
    }

    private void validateOutputSchema(Configuration configuration, FailureCollector failureCollector) throws IOException {
        TableName valueOf = TableName.valueOf(this.config.table);
        Connection connect = BigtableConfiguration.connect(configuration);
        Throwable th = null;
        try {
            Table table = connect.getTable(valueOf);
            Throwable th2 = null;
            try {
                try {
                    Set set = (Set) table.getTableDescriptor().getFamiliesKeys().stream().map(Bytes::toString).collect(Collectors.toSet());
                    for (HBaseColumn hBaseColumn : this.config.getRequestedColumns(failureCollector)) {
                        if (!set.contains(hBaseColumn.getFamily())) {
                            Map<String, String> columnMappings = this.config.getColumnMappings();
                            String qualifiedName = hBaseColumn.getQualifiedName();
                            failureCollector.addFailure(String.format("Column family '%s' does not exist.", hBaseColumn.getFamily()), "Specify correct column family.").withConfigElement("columnMappings", ConfigUtil.getKVPair(qualifiedName, columnMappings.get(qualifiedName), "="));
                        }
                    }
                    if (table != null) {
                        if (0 != 0) {
                            try {
                                table.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            table.close();
                        }
                    }
                    if (connect != null) {
                        if (0 == 0) {
                            connect.close();
                            return;
                        }
                        try {
                            connect.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (table != null) {
                    if (th2 != null) {
                        try {
                            table.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        table.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (connect != null) {
                if (0 != 0) {
                    try {
                        connect.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    connect.close();
                }
            }
            throw th8;
        }
    }

    private void emitLineage(BatchSourceContext batchSourceContext, Schema schema) {
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) batchSourceContext, this.config.referenceName);
        lineageRecorder.createExternalDataset(schema);
        List fields = ((Schema) Objects.requireNonNull(this.config.getSchema(batchSourceContext.getFailureCollector()))).getFields();
        if (fields != null) {
            lineageRecorder.recordRead("Read", String.format("Read from Bigtable. Project: '%s', Instance: '%s'. Table: '%s'", this.config.getProject(), this.config.instance, this.config.table), (List) fields.stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toList()));
        }
    }

    private Scan getConfiguredScanForJob(FailureCollector failureCollector) {
        Scan scan = new Scan();
        try {
            if (this.config.scanTimeRangeStart != null || this.config.scanTimeRangeStop != null) {
                scan.setTimeRange(((Long) ObjectUtils.defaultIfNull(this.config.scanTimeRangeStart, 0L)).longValue(), ((Long) ObjectUtils.defaultIfNull(this.config.scanTimeRangeStop, Long.MAX_VALUE)).longValue());
            }
        } catch (IOException e) {
            failureCollector.addFailure(String.format("Unable to set time range configuration : %s", e.getMessage()), (String) null).withConfigProperty(BigtableSourceConfig.SCAN_TIME_RANGE_START).withConfigProperty(BigtableSourceConfig.SCAN_TIME_RANGE_STOP).withStacktrace(e.getStackTrace());
        }
        scan.setCacheBlocks(false);
        if (this.config.scanRowStart != null) {
            scan.withStartRow(Bytes.toBytes(this.config.scanRowStart));
        }
        if (this.config.scanRowStop != null) {
            scan.withStopRow(Bytes.toBytes(this.config.scanRowStop));
        }
        for (HBaseColumn hBaseColumn : this.config.getRequestedColumns(failureCollector)) {
            scan.addColumn(Bytes.toBytes(hBaseColumn.getFamily()), Bytes.toBytes(hBaseColumn.getQualifier()));
        }
        return scan;
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<ImmutableBytesWritable, Result>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
