package io.cdap.plugin.salesforce.plugin.source.streaming;

import com.google.common.base.Strings;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.DatasetManagementException;
import io.cdap.cdap.api.dataset.DatasetProperties;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import java.util.List;
import java.util.stream.Collectors;
import javax.ws.rs.Path;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.tephra.TransactionFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Salesforce")
@Description(SalesforceStreamingSource.DESCRIPTION)
@Plugin(type = StreamingSource.PLUGIN_TYPE)
/* loaded from: input_file:io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSource.class */
public class SalesforceStreamingSource extends StreamingSource<StructuredRecord> {
    static final String NAME = "Salesforce";
    static final String DESCRIPTION = "Streams data updates from Salesforce using Salesforce Streaming API";
    private static final Logger LOG = LoggerFactory.getLogger(SalesforceStreamingSource.class);
    private SalesforceStreamingSourceConfig config;

    public SalesforceStreamingSource(SalesforceStreamingSourceConfig salesforceStreamingSourceConfig) {
        this.config = salesforceStreamingSourceConfig;
    }

    @Override // io.cdap.cdap.etl.api.streaming.StreamingSource
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        IdUtils.validateReferenceName(this.config.referenceName, failureCollector);
        pipelineConfigurer.createDataset(this.config.referenceName, Constants.EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY);
        try {
            this.config.validate(failureCollector);
            this.config.ensurePushTopicExistAndWithCorrectFields();
            String query = this.config.getQuery();
            if (!Strings.isNullOrEmpty(query) && !this.config.containsMacro("pushTopicQuery") && !this.config.containsMacro(SalesforceSourceConstants.PROPERTY_SOBJECT_NAME) && this.config.canAttemptToEstablishConnection()) {
                pipelineConfigurer.getStageConfigurer().setOutputSchema(SalesforceSchemaUtil.getSchema(this.config.getAuthenticatorCredentials(), SObjectDescriptor.fromQuery(query)));
            }
        } catch (ConnectionException e) {
            failureCollector.addFailure("There was issue communicating with Salesforce: " + e.getMessage(), (String) null).withStacktrace(e.getStackTrace());
        }
    }

    @Override // io.cdap.cdap.etl.api.streaming.StreamingSource
    public void prepareRun(StreamingSourceContext streamingSourceContext) throws Exception {
        Schema inputSchema = streamingSourceContext.getInputSchema();
        if (inputSchema == null || inputSchema.getFields() == null) {
            return;
        }
        recordLineage(streamingSourceContext, this.config.referenceName, inputSchema, "Read", String.format("Read from Salesforce Stream with push topic of %s.", this.config.getPushTopicName()));
    }

    @Override // io.cdap.cdap.etl.api.streaming.StreamingSource
    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws ConnectionException {
        FailureCollector failureCollector = streamingContext.getFailureCollector();
        this.config.validate(failureCollector);
        failureCollector.getOrThrowException();
        return SalesforceStreamingSourceUtil.getStructuredRecordJavaDStream(streamingContext, this.config);
    }

    @Path("outputSchema")
    public Schema outputSchema(SalesforceStreamingSourceConfig salesforceStreamingSourceConfig) throws Exception {
        AuthenticatorCredentials authenticatorCredentials = salesforceStreamingSourceConfig.getAuthenticatorCredentials();
        SObject fetchPushTopicByName = SalesforceStreamingSourceConfig.fetchPushTopicByName(new PartnerConnection(Authenticator.createConnectorConfig(authenticatorCredentials)), salesforceStreamingSourceConfig.getPushTopicName());
        return SalesforceSchemaUtil.getSchema(authenticatorCredentials, SObjectDescriptor.fromQuery(fetchPushTopicByName == null ? salesforceStreamingSourceConfig.getQuery() : (String) fetchPushTopicByName.getField("Query")));
    }

    private void recordLineage(StreamingSourceContext streamingSourceContext, String str, Schema schema, String str2, String str3) throws DatasetManagementException, TransactionFailureException {
        if (schema == null) {
            LOG.warn("Schema for output %s is null. Field-level lineage will not be recorded", str);
            return;
        }
        if (schema.getFields() == null) {
            LOG.warn("Schema fields for output %s is empty. Field-level lineage will not be recorded", str);
            return;
        }
        streamingSourceContext.registerLineage(str, schema);
        List<String> list = (List) schema.getFields().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        new LineageRecorder(streamingSourceContext, str).recordRead(str2, str3, list);
    }
}
