/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.format.avro;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.format.avro.AvroBulkFormat;
import org.apache.paimon.format.avro.AvroBulkWriter;
import org.apache.paimon.format.avro.AvroRowDatumWriter;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.format.avro.AvroSimpleStatsExtractor;
import org.apache.paimon.format.avro.AvroWriterFactory;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.shade.org.apache.avro.Schema;
import org.apache.paimon.shade.org.apache.avro.file.CodecFactory;
import org.apache.paimon.shade.org.apache.avro.file.DataFileWriter;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

/**
 * {@link FileFormat} implementation registered under the identifier {@code "avro"}.
 *
 * <p>Readers are created as {@link AvroBulkFormat} instances and writers wrap an Avro
 * {@link DataFileWriter} whose codec is resolved by {@link #createCodecFactory(String)}.
 */
public class AvroFileFormat extends FileFormat {
    /** Identifier passed to the {@link FileFormat} super constructor. */
    public static final String IDENTIFIER = "avro";

    /** Format option selecting the Avro compression codec; defaults to {@code "snappy"}. */
    public static final ConfigOption<String> AVRO_OUTPUT_CODEC =
            ConfigOptions.key("codec")
                    .stringType()
                    .defaultValue("snappy")
                    .withDescription("The compression codec for avro");

    /**
     * Format option holding a string-to-string mapping that is handed to
     * {@link AvroSchemaConverter#convertToSchema} when the writer schema is built; defaults to an
     * empty map.
     */
    public static final ConfigOption<Map<String, String>> AVRO_ROW_NAME_MAPPING =
            ConfigOptions.key("row-name-mapping")
                    .mapType()
                    .defaultValue(new HashMap<String, String>());

    /** Context supplying the format options and the zstd level used by this format. */
    private final FileFormatFactory.FormatContext context;

    /**
     * Creates the format bound to the given context.
     *
     * @param context source of the format options and zstd level
     */
    public AvroFileFormat(FileFormatFactory.FormatContext context) {
        super(IDENTIFIER);
        this.context = context;
    }

    /**
     * Creates a reader factory for the projected row type.
     *
     * @param projectedRowType row type the reader should produce
     * @param filters ignored by this implementation; may be {@code null}
     * @return a new {@link AvroBulkFormat}
     */
    @Override
    public FormatReaderFactory createReaderFactory(
            RowType projectedRowType, @Nullable List<Predicate> filters) {
        return new AvroBulkFormat(projectedRowType);
    }

    /**
     * Creates a writer factory that writes rows of the given type as Avro data files.
     *
     * @param type row type of the records to be written
     * @return a writer factory bound to this format's context
     */
    @Override
    public FormatWriterFactory createWriterFactory(RowType type) {
        return new RowAvroWriterFactory(type);
    }

    /**
     * Creates the statistics extractor for Avro files.
     *
     * @param type row type of the files
     * @param statsCollectors collector factories forwarded to the extractor
     * @return an always-present {@link Optional} holding an {@link AvroSimpleStatsExtractor}
     */
    @Override
    public Optional<SimpleStatsExtractor> createStatsExtractor(
            RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
        return Optional.of(new AvroSimpleStatsExtractor(type, statsCollectors));
    }

    /**
     * Runs every field type of {@code rowType} through the Avro schema converter and discards the
     * result.
     *
     * <p>NOTE(review): presumably the converter throws for types Avro cannot represent, which is
     * what makes this a validation; confirm against {@link AvroSchemaConverter}.
     *
     * @param rowType row type whose field types are checked
     */
    @Override
    public void validateDataFields(RowType rowType) {
        for (DataType fieldType : rowType.getFieldTypes()) {
            // A fresh, empty name mapping is used for each field, as no mapping applies here.
            AvroSchemaConverter.convertToSchema(fieldType, new HashMap<String, String>());
        }
    }

    /**
     * Resolves the Avro codec for a new data file.
     *
     * <p>Resolution order:
     *
     * <ol>
     *   <li>an explicitly configured {@link #AVRO_OUTPUT_CODEC} format option;
     *   <li>zstandard at the context's zstd level when {@code compression} is {@code "zstd"}
     *       (case-insensitive);
     *   <li>otherwise the value of {@link #AVRO_OUTPUT_CODEC}, which at this point is its default.
     * </ol>
     *
     * <p>NOTE(review): any {@code compression} value other than zstd is not consulted and the
     * option default is used instead; confirm this is intended.
     *
     * @param compression compression name requested by the caller; a {@code null} value is treated
     *     like any non-zstd value
     * @return the codec factory to install on the data file writer
     */
    private CodecFactory createCodecFactory(String compression) {
        Options options = this.context.formatOptions();
        if (options.contains(AVRO_OUTPUT_CODEC)) {
            return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
        }
        // Constant-first comparison so a null compression falls through instead of throwing.
        if ("zstd".equalsIgnoreCase(compression)) {
            return CodecFactory.zstandardCodec(this.context.zstdLevel());
        }
        return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
    }

    /**
     * {@link FormatWriterFactory} that adapts an {@link AvroWriterFactory} for {@link InternalRow}
     * records. It is an inner (non-static) class because it reads the enclosing format's context
     * and codec resolution.
     */
    private class RowAvroWriterFactory implements FormatWriterFactory {
        /** Underlying factory producing the Avro bulk writers. */
        private final AvroWriterFactory<InternalRow> factory;

        /**
         * Builds the underlying Avro writer factory for the given row type.
         *
         * @param rowType row type of the records to be written
         */
        private RowAvroWriterFactory(RowType rowType) {
            // The factory must be assigned here rather than in a field initializer: the builder
            // lambda captures the constructor parameter rowType.
            this.factory =
                    new AvroWriterFactory<InternalRow>(
                            (out, compression) -> {
                                Schema schema =
                                        AvroSchemaConverter.convertToSchema(
                                                rowType,
                                                AvroFileFormat.this
                                                        .context
                                                        .formatOptions()
                                                        .get(AVRO_ROW_NAME_MAPPING));
                                AvroRowDatumWriter datumWriter = new AvroRowDatumWriter(rowType);
                                DataFileWriter<InternalRow> dataFileWriter =
                                        new DataFileWriter<InternalRow>(datumWriter);
                                dataFileWriter.setCodec(
                                        AvroFileFormat.this.createCodecFactory(compression));
                                dataFileWriter.setFlushOnEveryBlock(false);
                                // create() must come last: codec and flush settings are
                                // configured before the file is opened for writing.
                                dataFileWriter.create(schema, out);
                                return dataFileWriter;
                            });
        }

        /**
         * Creates a {@link FormatWriter} that delegates to an Avro bulk writer on {@code out}.
         *
         * @param out stream the Avro file is written to
         * @param compression compression name forwarded to the underlying factory
         * @return a writer whose size check is based on the position of {@code out}
         * @throws IOException if the underlying writer cannot be created
         */
        @Override
        public FormatWriter create(final PositionOutputStream out, String compression)
                throws IOException {
            final AvroBulkWriter<InternalRow> writer = this.factory.create(out, compression);
            return new FormatWriter() {

                @Override
                public void addElement(InternalRow element) throws IOException {
                    writer.addElement(element);
                }

                @Override
                public void close() throws IOException {
                    writer.close();
                }

                /**
                 * Reports whether the stream position has reached {@code targetSize}; always
                 * {@code false} when {@code suggestedCheck} is {@code false}.
                 */
                @Override
                public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
                        throws IOException {
                    if (out != null) {
                        return suggestedCheck && out.getPos() >= targetSize;
                    }
                    throw new IOException("Failed to get stream length: no open stream");
                }
            };
        }
    }
}

