/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.format.avro;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.format.avro.AvroBulkFormat;
import org.apache.paimon.format.avro.AvroBulkWriter;
import org.apache.paimon.format.avro.AvroRowDatumWriter;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.format.avro.AvroTableStatsExtractor;
import org.apache.paimon.format.avro.AvroWriterFactory;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.shade.org.apache.avro.Schema;
import org.apache.paimon.shade.org.apache.avro.file.CodecFactory;
import org.apache.paimon.shade.org.apache.avro.file.DataFileWriter;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

/**
 * Avro implementation of {@link FileFormat}.
 *
 * <p>Readers are backed by {@link AvroBulkFormat}; writers are backed by an Avro
 * {@link DataFileWriter} whose compression codec is read from the format options
 * under {@link #AVRO_OUTPUT_CODEC}.
 */
public class AvroFileFormat
extends FileFormat {
    /** Identifier passed to the {@link FileFormat} super constructor. */
    public static final String IDENTIFIER = "avro";

    /** Option key {@code codec}; selects the Avro compression codec and defaults to {@code snappy}. */
    public static final ConfigOption<String> AVRO_OUTPUT_CODEC = ConfigOptions.key("codec").stringType().defaultValue("snappy").withDescription("The compression codec for avro");

    /** Format options from which the writer codec is looked up. */
    private final Options formatOptions;

    /**
     * Creates the Avro file format.
     *
     * @param formatOptions options consulted for {@link #AVRO_OUTPUT_CODEC} when a writer factory is created
     */
    public AvroFileFormat(Options formatOptions) {
        super(IDENTIFIER);
        this.formatOptions = formatOptions;
    }

    /**
     * Creates a reader factory for the given projection.
     *
     * @param projectedRowType row type handed to {@link AvroBulkFormat}
     * @param filters not used by this implementation; may be {@code null}
     * @return a new {@link AvroBulkFormat}
     */
    @Override
    public FormatReaderFactory createReaderFactory(RowType projectedRowType, @Nullable List<Predicate> filters) {
        return new AvroBulkFormat(projectedRowType);
    }

    /**
     * Creates a writer factory for rows of the given type, using the codec configured
     * under {@link #AVRO_OUTPUT_CODEC}.
     *
     * @param type row type of the records to write
     * @return a writer factory bound to {@code type} and the configured codec
     */
    @Override
    public FormatWriterFactory createWriterFactory(RowType type) {
        return new RowAvroWriterFactory(type, this.formatOptions.get(AVRO_OUTPUT_CODEC));
    }

    /**
     * Creates the statistics extractor for Avro files.
     *
     * @param type row type of the files
     * @param statsCollectors collector factories forwarded to the extractor
     * @return a non-empty {@link Optional} holding an {@link AvroTableStatsExtractor}
     */
    @Override
    public Optional<TableStatsExtractor> createStatsExtractor(RowType type, FieldStatsCollector.Factory[] statsCollectors) {
        return Optional.of(new AvroTableStatsExtractor(type, statsCollectors));
    }

    /**
     * Runs every field type of {@code rowType} through
     * {@link AvroSchemaConverter#convertToSchema}; the resulting schemas are discarded.
     *
     * <p>NOTE(review): presumably the converter throws for types Avro cannot represent,
     * which is what makes this a validation - confirm against {@code AvroSchemaConverter}.
     *
     * @param rowType row type whose field types are checked
     */
    @Override
    public void validateDataFields(RowType rowType) {
        List<DataType> fieldTypes = rowType.getFieldTypes();
        for (DataType dataType : fieldTypes) {
            AvroSchemaConverter.convertToSchema(dataType);
        }
    }

    /**
     * {@link FormatWriterFactory} that writes {@link InternalRow} records through an
     * Avro {@link DataFileWriter}.
     */
    private static class RowAvroWriterFactory
    implements FormatWriterFactory {
        /** Underlying Avro factory; built once in the constructor. */
        private final AvroWriterFactory<InternalRow> factory;

        /**
         * Builds the underlying Avro writer factory.
         *
         * <p>The factory is assigned here rather than in a field initializer because the
         * builder lambda captures {@code rowType} and {@code codec}, which are only in
         * scope inside the constructor.
         *
         * @param rowType row type used to derive the Avro schema and the datum writer
         * @param codec codec name passed to {@link CodecFactory#fromString}; when
         *     {@code null} no codec is set on the writer
         */
        private RowAvroWriterFactory(RowType rowType, String codec) {
            this.factory = new AvroWriterFactory<>(out -> {
                Schema schema = AvroSchemaConverter.convertToSchema(rowType);
                AvroRowDatumWriter datumWriter = new AvroRowDatumWriter(rowType);
                DataFileWriter<InternalRow> dataFileWriter = new DataFileWriter<>(datumWriter);
                if (codec != null) {
                    dataFileWriter.setCodec(CodecFactory.fromString(codec));
                }
                dataFileWriter.create(schema, out);
                return dataFileWriter;
            });
        }

        /**
         * Creates a {@link FormatWriter} that delegates to an Avro bulk writer on {@code out}.
         *
         * @param out stream the Avro file is written to
         * @param compression not used here; the codec is the one given to the constructor
         * @return a writer forwarding {@code addElement}, {@code flush} and {@code finish}
         *     to the bulk writer
         * @throws IOException if the underlying bulk writer cannot be created
         */
        @Override
        public FormatWriter create(final PositionOutputStream out, String compression) throws IOException {
            final AvroBulkWriter<InternalRow> writer = this.factory.create(out);
            return new FormatWriter(){

                @Override
                public void addElement(InternalRow element) throws IOException {
                    writer.addElement(element);
                }

                @Override
                public void flush() throws IOException {
                    writer.flush();
                }

                @Override
                public void finish() throws IOException {
                    writer.finish();
                }

                /**
                 * Reports whether the stream position has reached {@code targetSize}.
                 * The position is only queried when {@code suggestedCheck} is true.
                 *
                 * @throws IOException if there is no stream to query
                 */
                @Override
                public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
                    if (out != null) {
                        return suggestedCheck && out.getPos() >= targetSize;
                    }
                    throw new IOException("Failed to get stream length: no open stream");
                }
            };
        }
    }
}

