package io.cdap.plugin.format.plugin;

import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.common.batch.JobUtils;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.format.RegexPathFilter;
import io.cdap.plugin.format.SchemaDetector;
import io.cdap.plugin.format.input.EmptyInputFormat;
import io.cdap.plugin.format.plugin.FileSourceProperties;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/format-avro-2.11.0.jar:lib/format-common-2.11.0.jar:io/cdap/plugin/format/plugin/AbstractFileSource.class
 */
/* loaded from: input_file:lib/format-common-2.11.0.jar:io/cdap/plugin/format/plugin/AbstractFileSource.class */
/**
 * Base class for batch sources that read {@link StructuredRecord}s from files.
 *
 * <p>The actual parsing is delegated to a {@link ValidatingInputFormat} plugin that is looked up
 * by the format name taken from the plugin config. This class validates the config, resolves the
 * output schema, records lineage and wires the Hadoop input format into the run.
 *
 * @param <T> plugin config type; it must also expose the {@link FileSourceProperties} accessors
 */
public abstract class AbstractFileSource<T extends PluginConfig & FileSourceProperties> extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSource.class);
    /** Name of the config property holding the file format. */
    private static final String NAME_FORMAT = "format";
    /** Plugin type under which the format plugins are registered. */
    private static final String FORMAT_PLUGIN_TYPE = "validatingInputFormat";
    private final T config;

    /**
     * Creates a source backed by the given config.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected} to
     * {@code public} by the decompiler; it is left public so existing callers keep compiling.
     *
     * @param t the plugin config this source reads all of its settings from
     */
    public AbstractFileSource(T t) {
        this.config = t;
    }

    /**
     * Validates the config at deployment time, registers the format plugin(s) and sets the
     * output schema of the stage.
     *
     * <p>When the format is a macro every known {@link FileFormat} is registered and no schema is
     * set, because the format is not known yet. Otherwise the single configured format is
     * registered and the schema is taken from, in order: the config, the format plugin, and
     * finally schema detection on the configured path (only if {@link #shouldGetSchema()}).
     *
     * @param pipelineConfigurer configurer used to register plugins and publish the output schema
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        config.validate(failureCollector);
        // Stop right away if the basic config validation produced failures.
        failureCollector.getOrThrowException();

        if (config.containsMacro(NAME_FORMAT)) {
            registerAllFormats(pipelineConfigurer);
            return;
        }

        String formatName = config.getFormatName();
        Schema schema = config.getSchema();
        PluginProperties.Builder builder = PluginProperties.builder();
        builder.addAll(config.getRawProperties().getProperties());
        ValidatingInputFormat inputFormat =
            pipelineConfigurer.usePlugin(FORMAT_PLUGIN_TYPE, formatName, formatName, builder.build());

        FormatContext detectionContext = new FormatContext(failureCollector, null);
        if (inputFormat != null && schema == null && !config.containsMacro("schema")) {
            schema = inputFormat.getSchema(detectionContext);
            if (schema == null && shouldGetSchema()) {
                try {
                    schema = new SchemaDetector(inputFormat).detectSchema(
                        config.getPath(), config.getFilePattern(), detectionContext, getFileSystemProperties(null));
                } catch (IOException e) {
                    // Detection problems are reported as validation failures instead of aborting here.
                    detectionContext.getFailureCollector()
                        .addFailure("Error when trying to detect schema: " + e.getMessage(), null)
                        .withStacktrace(e.getStackTrace());
                }
            }
        }

        validateInputFormatProvider(new FormatContext(failureCollector, schema), formatName, inputFormat);
        validatePathField(failureCollector, schema);
        pipelineConfigurer.getStageConfigurer().setOutputSchema(schema);
    }

    /**
     * Validates the config for the run, resolves the schema, records lineage and registers the
     * Hadoop input for this source.
     *
     * @param batchSourceContext context of the current run
     * @throws IOException if the input path does not exist and empty input is not allowed
     * @throws IllegalArgumentException if the format plugin rejects its properties
     *     ({@link InvalidPluginConfigException} is wrapped and kept as the cause)
     * @throws Exception any other failure raised by validation, schema detection or Hadoop setup
     */
    @Override
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        FailureCollector failureCollector = batchSourceContext.getFailureCollector();
        config.validate(failureCollector);
        String formatName = config.getFormatName();
        try {
            ValidatingInputFormat inputFormat = batchSourceContext.newPluginInstance(formatName);
            FormatContext detectionContext = new FormatContext(failureCollector, null);
            Schema schema = batchSourceContext.getOutputSchema() == null
                ? inputFormat.getSchema(detectionContext)
                : batchSourceContext.getOutputSchema();
            Pattern filePattern = config.getFilePattern();
            if (schema == null) {
                schema = new SchemaDetector(inputFormat).detectSchema(
                    config.getPath(batchSourceContext), filePattern, detectionContext, getFileSystemProperties(null));
            }
            validateInputFormatProvider(new FormatContext(failureCollector, schema), formatName, inputFormat);
            validatePathField(failureCollector, schema);
            failureCollector.getOrThrowException();

            Job job = JobUtils.createInstance();
            Configuration conf = job.getConfiguration();
            if (filePattern != null) {
                RegexPathFilter.configure(conf, filePattern);
                FileInputFormat.setInputPathFilter(job, RegexPathFilter.class);
            }
            FileInputFormat.setInputDirRecursive(job, config.shouldReadRecursively());

            LineageRecorder lineageRecorder = getLineageRecorder(batchSourceContext);
            lineageRecorder.createExternalDataset(schema);
            if (schema != null && schema.getFields() != null) {
                List<String> fieldNames = schema.getFields().stream()
                    .map(Schema.Field::getName)
                    .collect(Collectors.toList());
                recordLineage(lineageRecorder, fieldNames);
            }

            // Must be in place before the FileSystem is obtained from this configuration.
            applyFileSystemProperties(conf, batchSourceContext);

            Path path = new Path(config.getPath(batchSourceContext));
            String inputFormatClassName;
            if (FileSystem.get(path.toUri(), conf).globStatus(path) != null) {
                FileInputFormat.addInputPath(job, path);
                FileInputFormat.setMaxInputSplitSize(job, config.getMaxSplitSize());
                inputFormatClassName = inputFormat.getInputFormatClassName();
                for (Map.Entry<String, String> entry : inputFormat.getInputFormatConfiguration().entrySet()) {
                    conf.set(entry.getKey(), entry.getValue());
                }
                if (schema != null) {
                    conf.set("schema", schema.toString());
                }
            } else {
                // globStatus returned null: nothing matches the configured path.
                if (!config.shouldAllowEmptyInput()) {
                    throw new IOException(String.format("Input path %s does not exist", path));
                }
                inputFormatClassName = EmptyInputFormat.class.getName();
            }

            // Re-applied last so these values win over same-named keys set by the format above.
            applyFileSystemProperties(conf, batchSourceContext);

            batchSourceContext.setInput(
                Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClassName, conf)));
        } catch (InvalidPluginConfigException e) {
            Set<String> badProperties = new HashSet<>(e.getMissingProperties());
            for (InvalidPluginProperty invalid : e.getInvalidProperties()) {
                badProperties.add(invalid.getName());
            }
            throw new IllegalArgumentException(String.format("Format '%s' cannot be used because properties %s were not provided or were invalid when the pipeline was deployed. Set the format to a different value, or re-create the pipeline with all required properties.", formatName, badProperties), e);
        }
    }

    /**
     * Emits the value of the incoming key/value pair unchanged; the key is ignored.
     *
     * <p>The erased {@code transform(Object, Emitter)} bridge is generated by the compiler and is
     * deliberately not declared in source.
     *
     * @param keyValue pair produced by the input format
     * @param emitter receiver of the record
     * @throws Exception declared for overriding implementations; nothing is thrown by this one
     *     beyond what {@code emitter.emit} raises
     */
    @Override
    public void transform(KeyValue<NullWritable, StructuredRecord> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        emitter.emit(keyValue.getValue());
    }

    /**
     * Returns extra properties to set on the Hadoop configuration and to pass to schema detection.
     * The default implementation returns an empty map; subclasses may override it.
     *
     * @param batchSourceContext context of the current run, or {@code null} when called for
     *     schema detection (this class always passes {@code null} in that case)
     * @return property names mapped to values, never {@code null} in this implementation
     */
    protected Map<String, String> getFileSystemProperties(@Nullable BatchSourceContext batchSourceContext) {
        return Collections.emptyMap();
    }

    /**
     * Records a "Read" lineage operation for the given output fields.
     *
     * @param lineageRecorder recorder to write the operation to
     * @param list names of the output schema fields that were read
     */
    protected void recordLineage(LineageRecorder lineageRecorder, List<String> list) {
        lineageRecorder.recordRead("Read", String.format("Read from %s files.", config.getFormatName()), list);
    }

    /**
     * Creates the lineage recorder for this source, keyed by the configured reference name.
     *
     * @param batchSourceContext context of the current run
     * @return a new recorder
     */
    protected LineageRecorder getLineageRecorder(BatchSourceContext batchSourceContext) {
        return new LineageRecorder((BatchContext) batchSourceContext, config.getReferenceName());
    }

    /**
     * Controls whether {@link #configurePipeline} falls back to schema detection when neither the
     * config nor the format plugin supplies a schema. Returns {@code true} by default.
     *
     * @return {@code true} if schema detection should be attempted at configure time
     */
    protected boolean shouldGetSchema() {
        return true;
    }

    /**
     * Registers every known {@link FileFormat} as a format plugin. Used when the format is a
     * macro and therefore unknown until the pipeline runs. A format whose properties are rejected
     * is logged and skipped so the remaining formats are still registered.
     */
    private void registerAllFormats(PipelineConfigurer pipelineConfigurer) {
        for (FileFormat fileFormat : FileFormat.values()) {
            // Locale.ROOT keeps the plugin name stable regardless of the JVM default locale
            // (e.g. Turkish would lower-case 'I' to a dotless 'ı').
            String pluginName = fileFormat.name().toLowerCase(Locale.ROOT);
            try {
                pipelineConfigurer.usePlugin(FORMAT_PLUGIN_TYPE, pluginName, pluginName, config.getRawProperties());
            } catch (InvalidPluginConfigException e) {
                List<String> invalidNames = e.getInvalidProperties().stream()
                    .map(InvalidPluginProperty::getName)
                    .collect(Collectors.toList());
                LOG.warn("Failed to register format '{}', which means it cannot be used when the pipeline is run. Missing properties: {}, invalid properties: {}", fileFormat.name(), e.getMissingProperties(), invalidNames);
            }
        }
    }

    /** Copies the properties from {@link #getFileSystemProperties} into the given configuration. */
    private void applyFileSystemProperties(Configuration conf, BatchSourceContext batchSourceContext) {
        for (Map.Entry<String, String> entry : getFileSystemProperties(batchSourceContext).entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Lets the format plugin validate itself, or records a "plugin not found" failure when the
     * plugin could not be resolved.
     */
    private void validateInputFormatProvider(FormatContext formatContext, String formatName, @Nullable ValidatingInputFormat validatingInputFormat) {
        FailureCollector failureCollector = formatContext.getFailureCollector();
        if (validatingInputFormat == null) {
            failureCollector.addFailure(String.format("Could not find the '%s' input format.", formatName), null)
                .withPluginNotFound(formatName, formatName, FORMAT_PLUGIN_TYPE);
        } else {
            validatingInputFormat.validate(formatContext);
        }
    }

    /**
     * Checks that the configured path field, if any, exists in the schema and is a (possibly
     * nullable) string. Nothing is checked when either the path field or the schema is absent.
     * A missing field is reported and thrown immediately; a wrong type is only collected.
     */
    private void validatePathField(FailureCollector failureCollector, Schema schema) {
        String pathField = config.getPathField();
        if (pathField == null || schema == null) {
            return;
        }
        Schema.Field field = schema.getField(pathField);
        if (field == null) {
            failureCollector.addFailure(String.format("Path field '%s' must exist in input schema.", pathField), null)
                .withConfigProperty(FileSourceProperties.PATH_FIELD);
            throw failureCollector.getOrThrowException();
        }
        Schema fieldSchema = field.getSchema();
        Schema nonNullable = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
        if (nonNullable.getType() != Schema.Type.STRING) {
            failureCollector.addFailure(String.format("Path field '%s' is of unsupported type '%s'.", pathField, nonNullable.getDisplayName()), "It must be of type 'string'.")
                .withConfigProperty(FileSourceProperties.PATH_FIELD)
                .withOutputSchemaField(field.getName());
        }
    }
}
