package io.cdap.plugin.gcp.datastore.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.KindExpression;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.protobuf.Int32Value;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.datastore.util.DatastoreUtil;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch source plugin that reads {@link Entity} values from Google Cloud Datastore and emits
 * them as {@link StructuredRecord}s.
 *
 * <p>When no output schema is configured and the plugin is allowed to connect, the schema is
 * inferred from a single entity fetched with a one-row query (see
 * {@link #getSchema(FailureCollector)}).
 *
 * <p>The compiler-generated bridge for {@code transform(Object, Emitter)} is intentionally not
 * declared by hand: the typed {@link #transform(KeyValue, Emitter)} override is sufficient and
 * an explicit copy would clash with the synthetic one.
 */
@Name("Datastore")
@Description("Google Cloud Datastore is a NoSQL document database built for automatic scaling and high performance. Source plugin provides ability to read data from it by Kind with various filters usage.")
@Plugin(type = "batchsource")
public class DatastoreSource extends BatchSource<NullWritable, Entity, StructuredRecord> {
    public static final String NAME = "Datastore";

    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class);

    /** Datastore value kinds that map one-to-one onto a simple CDAP schema. */
    private static final Map<Value.ValueTypeCase, Schema> SUPPORTED_SIMPLE_TYPES =
        ImmutableMap.<Value.ValueTypeCase, Schema>builder()
            .put(Value.ValueTypeCase.STRING_VALUE, Schema.of(Schema.Type.STRING))
            .put(Value.ValueTypeCase.INTEGER_VALUE, Schema.of(Schema.Type.LONG))
            .put(Value.ValueTypeCase.DOUBLE_VALUE, Schema.of(Schema.Type.DOUBLE))
            .put(Value.ValueTypeCase.BOOLEAN_VALUE, Schema.of(Schema.Type.BOOLEAN))
            .put(Value.ValueTypeCase.TIMESTAMP_VALUE, Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))
            .put(Value.ValueTypeCase.BLOB_VALUE, Schema.of(Schema.Type.BYTES))
            .put(Value.ValueTypeCase.NULL_VALUE, Schema.of(Schema.Type.NULL))
            .build();

    private final DatastoreSourceConfig config;
    private EntityToRecordTransformer entityToRecordTransformer;

    public DatastoreSource(DatastoreSourceConfig datastoreSourceConfig) {
        this.config = datastoreSourceConfig;
    }

    /**
     * Validates the configuration and sets the stage output schema.
     *
     * <p>The configured schema is used as-is when the plugin should not connect or when a schema
     * is present; otherwise the schema is inferred from Datastore.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        LOG.debug("Validate config during `configurePipeline` stage: {}", this.config);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector failureCollector = stageConfigurer.getFailureCollector();
        this.config.validate(failureCollector);
        // Fail fast on validation errors before any schema work is attempted.
        failureCollector.getOrThrowException();

        Schema configuredSchema = this.config.getSchema(failureCollector);
        if (!this.config.shouldConnect()) {
            stageConfigurer.setOutputSchema(configuredSchema);
            return;
        }
        if (configuredSchema == null) {
            stageConfigurer.setOutputSchema(getSchema(failureCollector));
            return;
        }
        pipelineConfigurer.getStageConfigurer().setOutputSchema(configuredSchema);
    }

    /**
     * Re-validates the configuration, registers the Datastore input format for the run and
     * records lineage for the fields of the output schema.
     */
    @Override
    public void prepareRun(BatchSourceContext batchSourceContext) {
        LOG.debug("Validate config during `prepareRun` stage: {}", this.config);
        FailureCollector failureCollector = batchSourceContext.getFailureCollector();
        this.config.validate(failureCollector);
        failureCollector.getOrThrowException();

        batchSourceContext.setInput(Input.of(
            this.config.getReferenceName(),
            new DatastoreInputFormatProvider(
                this.config.getProject(),
                this.config.getServiceAccount(),
                this.config.isServiceAccountFilePath(),
                this.config.getNamespace(),
                this.config.getKind(),
                this.config.constructPbQuery(failureCollector).toString(),
                String.valueOf(this.config.getNumSplits()))));

        Schema outputSchema = batchSourceContext.getOutputSchema();
        LineageRecorder lineageRecorder =
            new LineageRecorder((BatchContext) batchSourceContext, this.config.getReferenceName());
        lineageRecorder.createExternalDataset(outputSchema);
        List<String> fieldNames = Objects.requireNonNull(outputSchema.getFields()).stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        lineageRecorder.recordRead("Read", "Read from Cloud Datastore.", fieldNames);
    }

    /** Creates the entity-to-record transformer from the runtime output schema and key settings. */
    @Override
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        this.entityToRecordTransformer = new EntityToRecordTransformer(
            batchRuntimeContext.getOutputSchema(),
            this.config.getKeyType(batchRuntimeContext.getFailureCollector()),
            this.config.getKeyAlias());
    }

    /** Converts the entity carried by {@code keyValue} into a record and emits it. */
    @Override
    public void transform(KeyValue<NullWritable, Entity> keyValue, Emitter<StructuredRecord> emitter) {
        emitter.emit(this.entityToRecordTransformer.transformEntity(keyValue.getValue()));
    }

    /**
     * Infers the output schema from the first entity returned by a limit-1 query on the
     * configured kind, restricted to the configured ancestor when one is set.
     *
     * <p>An empty result or a {@link DatastoreException} is reported through
     * {@code failureCollector}, whose exception is then thrown.
     */
    private Schema getSchema(FailureCollector failureCollector) {
        Query.Builder queryBuilder = Query.newBuilder()
            .addKind(KindExpression.newBuilder().setName(this.config.getKind()).build())
            .setLimit(Int32Value.of(1));
        Key ancestorKey = constructAncestorKey(this.config, failureCollector);
        if (ancestorKey != null) {
            queryBuilder.setFilter(DatastoreHelper.makeAncestorFilter(ancestorKey).build());
        }
        Query query = queryBuilder.build();
        LOG.debug("Executing query for `Get Schema`: {}", query);
        try {
            Iterator<EntityResult> results = DatastoreUtil
                .getDatastoreV1(
                    this.config.getServiceAccount(),
                    this.config.isServiceAccountFilePath(),
                    this.config.getProject())
                .runQuery(RunQueryRequest.newBuilder()
                    .setQuery(query)
                    .setPartitionId(PartitionId.newBuilder()
                        .setNamespaceId(this.config.getNamespace())
                        .setProjectId(this.config.getProject()))
                    .build())
                .getBatch()
                .getEntityResultsList()
                .iterator();
            if (results.hasNext()) {
                return constructSchema(
                    results.next().getEntity(),
                    this.config.isIncludeKey(failureCollector),
                    this.config.getKeyAlias());
            }
            failureCollector.addFailure(
                    "Cloud Datastore query did not return any results. ",
                    "Ensure Namespace, Kind and Ancestor properties are correct.")
                .withConfigProperty("namespace")
                .withConfigProperty("kind")
                .withConfigProperty("ancestor");
            throw failureCollector.getOrThrowException();
        } catch (DatastoreException e) {
            failureCollector.addFailure("Unable to fetch data from Datastore: " + e.getMessage(), null)
                .withStacktrace(e.getStackTrace());
            throw failureCollector.getOrThrowException();
        }
    }

    /**
     * Builds the ancestor key from the configured ancestor path.
     *
     * @return the key in the configured project and namespace, or {@code null} when no ancestor
     *     path elements are configured
     */
    @VisibleForTesting
    @Nullable
    Key constructAncestorKey(DatastoreSourceConfig datastoreSourceConfig, FailureCollector failureCollector) {
        List<Key.PathElement> ancestor = datastoreSourceConfig.getAncestor(failureCollector);
        // Only an empty path means "no ancestor": a single-element path is a valid key and
        // must still produce an ancestor filter.
        if (ancestor.isEmpty()) {
            return null;
        }

        int lastIndex = ancestor.size() - 1;
        Key.PathElement keyElement = ancestor.get(lastIndex);
        Key.Builder keyBuilder = Key.newBuilder()
            .setPartitionId(PartitionId.newBuilder()
                .setProjectId(datastoreSourceConfig.getProject())
                .setNamespaceId(datastoreSourceConfig.getNamespace()));
        ancestor.subList(0, lastIndex).forEach(keyBuilder::addPath);

        // The final element is rebuilt from its kind plus either its numeric id or its name.
        Key.PathElement.Builder lastElement = Key.PathElement.newBuilder();
        if (keyElement.getIdTypeCase() == Key.PathElement.IdTypeCase.ID) {
            lastElement.setId(keyElement.getId());
        } else {
            lastElement.setName(keyElement.getName());
        }
        keyBuilder.addPath(lastElement.setKind(keyElement.getKind()).build());
        return keyBuilder.build();
    }

    /**
     * Builds a record schema named {@code "schema"} from the entity's properties.
     *
     * @param entity entity whose properties define the fields
     * @param z whether to append a string field for the key
     * @param str name of the key field added when {@code z} is true
     */
    @VisibleForTesting
    Schema constructSchema(Entity entity, boolean z, String str) {
        List<Schema.Field> fields = constructSchemaFields(entity);
        if (z) {
            fields.add(Schema.Field.of(str, Schema.of(Schema.Type.STRING)));
        }
        return Schema.recordOf("schema", fields);
    }

    /** Maps each entity property to a schema field, dropping properties of unsupported type. */
    private List<Schema.Field> constructSchemaFields(Entity entity) {
        return entity.getPropertiesMap().entrySet().stream()
            .map(property -> transformToField(property.getKey(), property.getValue()))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    /**
     * Creates the field for one property: NULL-typed schemas are used directly, every other
     * schema is made nullable.
     *
     * @return the field, or {@code null} when the value type is unsupported
     */
    private Schema.Field transformToField(String name, Value value) {
        Schema fieldSchema = createSchema(name, value);
        if (fieldSchema == null) {
            return null;
        }
        if (Schema.Type.NULL == fieldSchema.getType()) {
            return Schema.Field.of(name, fieldSchema);
        }
        return Schema.Field.of(name, Schema.nullableOf(fieldSchema));
    }

    /**
     * Derives the schema for a Datastore value: simple types come from
     * {@link #SUPPORTED_SIMPLE_TYPES}, nested entities become records and arrays become arrays
     * of the distinct schemas of their elements.
     *
     * @return the schema, or {@code null} when the value (or any array element) is unsupported
     */
    private Schema createSchema(String name, Value value) {
        Schema simpleSchema = SUPPORTED_SIMPLE_TYPES.get(value.getValueTypeCase());
        if (simpleSchema != null) {
            return simpleSchema;
        }
        switch (value.getValueTypeCase()) {
            case ENTITY_VALUE:
                return Schema.recordOf(name, constructSchemaFields(value.getEntityValue()));
            case ARRAY_VALUE:
                HashSet<Schema> elementSchemas = new HashSet<>();
                for (Value element : value.getArrayValue().getValuesList()) {
                    Schema elementSchema = createSchema(name, element);
                    if (elementSchema == null) {
                        // One unsupported element makes the whole array unsupported.
                        return null;
                    }
                    elementSchemas.add(elementSchema);
                }
                if (elementSchemas.isEmpty()) {
                    return Schema.arrayOf(Schema.of(Schema.Type.NULL));
                }
                if (elementSchemas.size() == 1) {
                    Schema onlySchema = elementSchemas.iterator().next();
                    return Schema.Type.NULL == onlySchema.getType()
                        ? Schema.arrayOf(onlySchema)
                        : Schema.arrayOf(Schema.nullableOf(onlySchema));
                }
                LOG.debug("Field '{}' has several schemas in array, add them as union of schemas plus {} schema for null values", name, Schema.Type.NULL);
                elementSchemas.add(Schema.of(Schema.Type.NULL));
                return Schema.arrayOf(Schema.unionOf(elementSchemas));
            default:
                LOG.debug("Field '{}' is of unsupported type '{}', skipping field from the schema", name, value.getValueTypeCase());
                return null;
        }
    }
}
