/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.SchemaCompatibility;
import org.apache.hudi.org.apache.avro.generic.GenericDatumReader;
import org.apache.hudi.org.apache.avro.generic.GenericDatumWriter;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseMergeHelper;

public class HoodieMergeHelper<T extends HoodieRecordPayload>
extends BaseMergeHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private HoodieMergeHelper() {
    }

    public static HoodieMergeHelper newInstance() {
        return MergeHelperHolder.HOODIE_MERGE_HELPER;
    }

    @Override
    public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
        GenericDatumReader gReader;
        GenericDatumWriter gWriter;
        Schema readSchema;
        boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
        HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
        Configuration hadoopConf = new Configuration(table.getHadoopConf());
        HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
        HoodieFileReader bootstrapFileReader = null;
        if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
            readSchema = baseFileReader.getSchema();
            gWriter = new GenericDatumWriter(readSchema);
            gReader = new GenericDatumReader(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
        } else {
            gReader = null;
            gWriter = null;
            readSchema = mergeHandle.getWriterSchemaWithMetaFields();
        }
        BoundedInMemoryExecutor<Object, Object, Void> wrapper = null;
        Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
        boolean needToReWriteRecord = false;
        Map<String, String> renameCols = new HashMap<String, String>();
        if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
            InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
            long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
            InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
            if (writeInternalSchema.isEmptySchema()) {
                throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
            }
            List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
            List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
            List sameCols = colNamesFromWriteSchema.stream().filter(f -> colNamesFromQuerySchema.contains(f) && writeInternalSchema.findIdByName((String)f) == querySchema.findIdByName((String)f) && writeInternalSchema.findIdByName((String)f) != -1 && writeInternalSchema.findType(writeInternalSchema.findIdByName((String)f)).equals(querySchema.findType(writeInternalSchema.findIdByName((String)f)))).collect(Collectors.toList());
            readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
            Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
            boolean bl = needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
            if (needToReWriteRecord) {
                renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
            }
        }
        try {
            Iterator<Object> readerIterator;
            if (baseFile.getBootstrapBaseFile().isPresent()) {
                Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
                Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
                bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
                Schema bootstrapSchema = externalSchemaTransformation ? bootstrapFileReader.getSchema() : mergeHandle.getWriterSchema();
                readerIterator = new MergingIterator<GenericRecord>(baseFileReader.getRecordIterator(readSchema), bootstrapFileReader.getRecordIterator(bootstrapSchema), inputRecordPair -> HoodieAvroUtils.stitchRecords((GenericRecord)inputRecordPair.getLeft(), (GenericRecord)inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
            } else {
                readerIterator = needToReWriteRecord ? HoodieAvroUtils.rewriteRecordWithNewSchema(baseFileReader.getRecordIterator(), readSchema, renameCols) : baseFileReader.getRecordIterator(readSchema);
            }
            ThreadLocal encoderCache = new ThreadLocal();
            ThreadLocal decoderCache = new ThreadLocal();
            wrapper = new BoundedInMemoryExecutor<Object, Object, Void>((long)table.getConfig().getWriteBufferLimitBytes(), readerIterator, new BaseMergeHelper.UpdateHandler(mergeHandle), record -> {
                if (!externalSchemaTransformation) {
                    return record;
                }
                return this.transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord)record);
            }, table.getPreExecuteRunnable());
            wrapper.execute();
        }
        catch (Exception e) {
            throw new HoodieException(e);
        }
        finally {
            baseFileReader.close();
            if (bootstrapFileReader != null) {
                bootstrapFileReader.close();
            }
            if (null != wrapper) {
                wrapper.shutdownNow();
                wrapper.awaitTermination();
            }
            mergeHandle.close();
        }
    }

    private static class MergeHelperHolder {
        private static final HoodieMergeHelper HOODIE_MERGE_HELPER = new HoodieMergeHelper();

        private MergeHelperHolder() {
        }
    }
}

